Drivers API Reference

The drivers module provides hardware abstraction for communication protocols and I/O operations.

Overview

The BESS RCU firmware uses a unified driver architecture organized by protocol type:

  • Modbus: TCP and RTU variants for PLC and sensor communication
  • CAN Bus: SocketCAN interface with DBC file support for PCS/BMS
  • GPIO: Digital I/O for safety interlocks and status indicators

Drivers follow a consistent interface pattern. The Modbus drivers additionally support both server and client modes, selected through the DriverMode enumeration.
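DriverMode itself is not shown on this page. As a rough sketch, assuming it is a standard Enum with SERVER and CLIENT members (the string values below are a guess, inferred only from the drivers logging mode.value), the mapping to Modbus roles could look like this:

```python
from enum import Enum


class DriverMode(Enum):
    """Stand-in for the DriverMode enumeration; the string values are assumed."""

    SERVER = "server"  # driver answers requests (Modbus slave)
    CLIENT = "client"  # driver issues requests (Modbus master)


def modbus_role(mode: DriverMode) -> str:
    """Return the classic Modbus role name for a driver mode."""
    return "slave" if mode is DriverMode.SERVER else "master"
```

The helper modbus_role is hypothetical and only illustrates the server/slave and client/master pairing used throughout this reference.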

Modbus Drivers

ModbusDriverBase

bess_rcu.drivers.modbus.base.ModbusDriverBase

Bases: ABC

Abstract base class for Modbus drivers (TCP and RTU).

This unified interface supports both server and client modes, so an implementation can act as either a Modbus slave (server) or a Modbus master (client).

Parameters:

Name      Type        Description                        Default
slave_id  int         Modbus slave/unit ID (1-247)       1
mode      DriverMode  Operation mode (SERVER or CLIENT)  SERVER
timeout   float       Communication timeout in seconds   5.0

Example

class ModbusTCPDriver(ModbusDriverBase):
    async def start(self):
        # Implementation
        pass
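A subclass has to implement all five abstract methods before it can be instantiated. The following is a minimal sketch of that, not the project's implementation: DriverMode and ModbusDriverBase are trimmed stand-ins so the block runs on its own, and InMemoryModbusDriver is a hypothetical driver that stores registers in a dict instead of talking to hardware.

```python
import asyncio
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, List, Optional


class DriverMode(Enum):  # stand-in; the real values are an assumption
    SERVER = "server"
    CLIENT = "client"


class ModbusDriverBase(ABC):  # trimmed stand-in for the documented base class
    def __init__(self, slave_id: int = 1,
                 mode: DriverMode = DriverMode.SERVER,
                 timeout: float = 5.0) -> None:
        if not 1 <= slave_id <= 247:
            raise ValueError("slave_id must be between 1 and 247")
        self.slave_id, self.mode, self.timeout = slave_id, mode, timeout
        self._running = False

    @abstractmethod
    async def start(self) -> None: ...

    @abstractmethod
    async def stop(self) -> None: ...

    @abstractmethod
    async def read_holding_registers(self, address, count, slave_id=None): ...

    @abstractmethod
    async def write_register(self, address, value, slave_id=None): ...

    @abstractmethod
    async def write_multiple_registers(self, address, values, slave_id=None): ...


class InMemoryModbusDriver(ModbusDriverBase):
    """Hypothetical driver that keeps registers in a dict (no I/O)."""

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._registers: Dict[int, int] = {}

    async def start(self) -> None:
        self._running = True

    async def stop(self) -> None:
        self._running = False

    async def read_holding_registers(self, address, count, slave_id=None):
        if not self._running:
            return None  # mirrors the documented "None on error" contract
        return [self._registers.get(address + i, 0) for i in range(count)]

    async def write_register(self, address, value, slave_id=None):
        if not self._running or not 0 <= value <= 0xFFFF:
            return False
        self._registers[address] = value
        return True

    async def write_multiple_registers(self, address, values, slave_id=None):
        if not self._running or any(not 0 <= v <= 0xFFFF for v in values):
            return False
        for offset, value in enumerate(values):
            self._registers[address + offset] = value
        return True


async def demo() -> Optional[List[int]]:
    """Write three registers, read them back, and always stop the driver."""
    driver = InMemoryModbusDriver(slave_id=1, mode=DriverMode.CLIENT)
    await driver.start()
    try:
        await driver.write_multiple_registers(0x1000, [10, 20, 30])
        return await driver.read_holding_registers(0x1000, 3)
    finally:
        await driver.stop()
```

Running asyncio.run(demo()) exercises the whole lifecycle. An in-memory double like this can also be handy in unit tests where no PLC or sensor is attached.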

Source code in code/bess_rcu/drivers/modbus/base.py
class ModbusDriverBase(ABC):
    """Abstract base class for Modbus drivers (TCP and RTU).

    This unified interface supports both server and client modes,
    allowing implementations to act as either Modbus slave or master.

    Args:
        slave_id: Modbus slave/unit ID (1-247)
        mode: Operation mode (SERVER or CLIENT)
        timeout: Communication timeout in seconds

    Example:
        >>> class ModbusTCPDriver(ModbusDriverBase):
        ...     async def start(self):
        ...         # Implementation
        ...         pass
    """

    def __init__(
        self,
        slave_id: int = 1,
        mode: DriverMode = DriverMode.SERVER,
        timeout: float = 5.0,
    ) -> None:
        if not 1 <= slave_id <= 247:
            raise ValueError("slave_id must be between 1 and 247")
        self.slave_id = slave_id
        self.mode = mode
        self.timeout = timeout
        self._running = False

    @abstractmethod
    async def start(self) -> None:
        """Start the Modbus driver (server or client).

        Raises:
            CommunicationError: If driver fails to start
            ConfigurationError: If configuration is invalid
        """

    @abstractmethod
    async def stop(self) -> None:
        """Stop the Modbus driver and release resources."""

    @abstractmethod
    async def read_holding_registers(
        self, address: int, count: int, slave_id: Optional[int] = None
    ) -> Optional[List[int]]:
        """Read holding registers.

        Args:
            address: Starting register address
            count: Number of registers to read
            slave_id: Target slave ID (client mode only, optional)

        Returns:
            List of register values or None on error

        Raises:
            CommunicationTimeoutError: If request times out
            DeviceError: If device returns error response
        """

    @abstractmethod
    async def write_register(
        self, address: int, value: int, slave_id: Optional[int] = None
    ) -> bool:
        """Write single holding register.

        Args:
            address: Register address
            value: Value to write (0-65535)
            slave_id: Target slave ID (client mode only, optional)

        Returns:
            True if successful, False otherwise

        Raises:
            CommunicationTimeoutError: If request times out
        """

    @abstractmethod
    async def write_multiple_registers(
        self, address: int, values: List[int], slave_id: Optional[int] = None
    ) -> bool:
        """Write multiple holding registers.

        Args:
            address: Starting register address
            values: List of values to write
            slave_id: Target slave ID (client mode only, optional)

        Returns:
            True if successful, False otherwise

        Raises:
            CommunicationTimeoutError: If request times out
        """

    @property
    def is_running(self) -> bool:
        """Check if driver is running."""
        return self._running

    @property
    def is_server(self) -> bool:
        """Check if driver is in server mode."""
        return self.mode == DriverMode.SERVER

    @property
    def is_client(self) -> bool:
        """Check if driver is in client mode."""
        return self.mode == DriverMode.CLIENT

Attributes

is_running property

Check if driver is running.

is_server property

Check if driver is in server mode.

is_client property

Check if driver is in client mode.

Functions

start() abstractmethod async

Start the Modbus driver (server or client).

Raises:

Type                Description
CommunicationError  If driver fails to start
ConfigurationError  If configuration is invalid

Source code in code/bess_rcu/drivers/modbus/base.py
@abstractmethod
async def start(self) -> None:
    """Start the Modbus driver (server or client).

    Raises:
        CommunicationError: If driver fails to start
        ConfigurationError: If configuration is invalid
    """

stop() abstractmethod async

Stop the Modbus driver and release resources.

Source code in code/bess_rcu/drivers/modbus/base.py
@abstractmethod
async def stop(self) -> None:
    """Stop the Modbus driver and release resources."""

read_holding_registers(address, count, slave_id=None) abstractmethod async

Read holding registers.

Parameters:

Name      Type           Description                                   Default
address   int            Starting register address                     required
count     int            Number of registers to read                   required
slave_id  Optional[int]  Target slave ID (client mode only, optional)  None

Returns:

Type                 Description
Optional[List[int]]  List of register values or None on error

Raises:

Type                       Description
CommunicationTimeoutError  If request times out
DeviceError                If device returns error response
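The contract above lets a read fail in two ways: by returning None or by raising CommunicationTimeoutError. A caller that wants to retry therefore has to handle both. The sketch below shows one possible approach. read_with_retry and FlakyDriver are hypothetical names, and CommunicationTimeoutError is redefined locally as a stand-in so the block is self-contained.

```python
import asyncio
from typing import List, Optional


class CommunicationTimeoutError(Exception):
    """Stand-in for the exception named in the Raises table."""


class FlakyDriver:
    """Hypothetical test double: times out, then returns None, then succeeds."""

    def __init__(self) -> None:
        self.calls = 0

    async def read_holding_registers(self, address, count, slave_id=None):
        self.calls += 1
        if self.calls == 1:
            raise CommunicationTimeoutError("no response")
        if self.calls == 2:
            return None
        return list(range(count))


async def read_with_retry(driver, address: int, count: int,
                          attempts: int = 3,
                          delay: float = 0.0) -> Optional[List[int]]:
    """Retry a read until it yields values or the attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            values = await driver.read_holding_registers(address, count)
        except CommunicationTimeoutError:
            values = None  # treat a timeout like a failed read
        if values is not None:
            return values
        if attempt < attempts:
            await asyncio.sleep(delay)  # back off before the next attempt
    return None
```

DeviceError is deliberately not caught here, on the assumption that an explicit error response from the device will not be fixed by asking again.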

Source code in code/bess_rcu/drivers/modbus/base.py
@abstractmethod
async def read_holding_registers(
    self, address: int, count: int, slave_id: Optional[int] = None
) -> Optional[List[int]]:
    """Read holding registers.

    Args:
        address: Starting register address
        count: Number of registers to read
        slave_id: Target slave ID (client mode only, optional)

    Returns:
        List of register values or None on error

    Raises:
        CommunicationTimeoutError: If request times out
        DeviceError: If device returns error response
    """

write_register(address, value, slave_id=None) abstractmethod async

Write single holding register.

Parameters:

Name      Type           Description                                   Default
address   int            Register address                              required
value     int            Value to write (0-65535)                      required
slave_id  Optional[int]  Target slave ID (client mode only, optional)  None

Returns:

Type  Description
bool  True if successful, False otherwise

Raises:

Type                       Description
CommunicationTimeoutError  If request times out
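Because value must fit the unsigned range 0-65535, signed quantities such as a negative power setpoint need to be encoded before they are written. The helpers below are a small sketch of that conversion. They are hypothetical (not part of the drivers module) and assume the target device interprets the register as a 16-bit two's complement number.

```python
def to_register(value: int) -> int:
    """Encode a signed or unsigned 16-bit integer as a register value (0-65535)."""
    if not -0x8000 <= value <= 0xFFFF:
        raise ValueError(f"{value} does not fit in a 16-bit register")
    return value & 0xFFFF  # two's complement wrap for negative inputs


def from_register(raw: int, signed: bool = False) -> int:
    """Decode a raw register value, optionally as a signed 16-bit integer."""
    if not 0 <= raw <= 0xFFFF:
        raise ValueError(f"{raw} is not a valid register value")
    if signed and raw >= 0x8000:
        return raw - 0x10000
    return raw
```

For example, to_register(-1) yields 65535, which from_register(65535, signed=True) decodes back to -1.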

Source code in code/bess_rcu/drivers/modbus/base.py
@abstractmethod
async def write_register(
    self, address: int, value: int, slave_id: Optional[int] = None
) -> bool:
    """Write single holding register.

    Args:
        address: Register address
        value: Value to write (0-65535)
        slave_id: Target slave ID (client mode only, optional)

    Returns:
        True if successful, False otherwise

    Raises:
        CommunicationTimeoutError: If request times out
    """

write_multiple_registers(address, values, slave_id=None) abstractmethod async

Write multiple holding registers.

Parameters:

Name      Type           Description                                   Default
address   int            Starting register address                     required
values    List[int]      List of values to write                       required
slave_id  Optional[int]  Target slave ID (client mode only, optional)  None

Returns:

Type  Description
bool  True if successful, False otherwise

Raises:

Type                       Description
CommunicationTimeoutError  If request times out
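The Modbus application protocol limits a single Write Multiple Registers request (function code 16) to 123 registers. This page does not say whether the drivers split longer lists internally, so a caller may want to chunk the data first. A minimal sketch, using a hypothetical helper named chunk_writes:

```python
from typing import Iterator, List, Tuple

# Per-request limit for function code 16 in the Modbus application protocol.
MAX_WRITE_REGISTERS = 123


def chunk_writes(address: int, values: List[int],
                 chunk_size: int = MAX_WRITE_REGISTERS
                 ) -> Iterator[Tuple[int, List[int]]]:
    """Yield (start_address, values) pairs that each fit in one request."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for offset in range(0, len(values), chunk_size):
        yield address + offset, values[offset:offset + chunk_size]
```

Each yielded pair can then be passed to write_multiple_registers in turn, stopping at the first call that returns False.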

Source code in code/bess_rcu/drivers/modbus/base.py
@abstractmethod
async def write_multiple_registers(
    self, address: int, values: List[int], slave_id: Optional[int] = None
) -> bool:
    """Write multiple holding registers.

    Args:
        address: Starting register address
        values: List of values to write
        slave_id: Target slave ID (client mode only, optional)

    Returns:
        True if successful, False otherwise

    Raises:
        CommunicationTimeoutError: If request times out
    """

ModbusTCPDriver

bess_rcu.drivers.modbus.tcp.ModbusTCPDriver

Bases: ModbusDriverBase

Modbus TCP driver implementation.

Supports both server (slave) and client (master) modes over TCP/IP.

Parameters:

Name      Type        Description                                         Default
slave_id  int         Modbus slave/unit ID (1-247)                        1
mode      DriverMode  Operation mode (SERVER or CLIENT)                   SERVER
host      str         IP address to bind (server) or connect to (client)  DEFAULT_HOST
port      int         TCP port number (default: 502)                      DEFAULT_PORT
timeout   float       Communication timeout in seconds                    5.0

Example

Server mode:

driver = ModbusTCPDriver(
    slave_id=1,
    mode=DriverMode.SERVER,
    host='0.0.0.0',
    port=502
)
await driver.start()

Client mode:

driver = ModbusTCPDriver(
    slave_id=1,
    mode=DriverMode.CLIENT,
    host='192.168.1.100',
    port=502
)
await driver.start()
values = await driver.read_holding_registers(0x1000, 10)
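Both examples start the driver but never call stop(). One way to pair the two calls, even when an exception interrupts the work in between, is an async context manager. This is a sketch only: running is a hypothetical helper, and FakeDriver is a stand-in that exposes just the documented start(), stop() and is_running members.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeDriver:
    """Hypothetical stand-in with the documented start/stop/is_running API."""

    def __init__(self) -> None:
        self._running = False

    @property
    def is_running(self) -> bool:
        return self._running

    async def start(self) -> None:
        self._running = True

    async def stop(self) -> None:
        self._running = False


@asynccontextmanager
async def running(driver):
    """Start the driver on entry and always stop it on exit."""
    await driver.start()
    try:
        yield driver
    finally:
        await driver.stop()


async def main() -> bool:
    driver = FakeDriver()
    async with running(driver) as active:
        assert active.is_running  # driver is started inside the block
    return driver.is_running      # False once the block has exited
```

With a real driver the body of the async with block would hold the register reads and writes.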

Source code in code/bess_rcu/drivers/modbus/tcp.py
class ModbusTCPDriver(ModbusDriverBase):
    """Modbus TCP driver implementation.

    Supports both server (slave) and client (master) modes over TCP/IP.

    Args:
        slave_id: Modbus slave/unit ID (1-247)
        mode: Operation mode (SERVER or CLIENT)
        host: IP address to bind (server) or connect to (client)
        port: TCP port number (default: 502)
        timeout: Communication timeout in seconds

    Example:
        Server mode:
        >>> driver = ModbusTCPDriver(
        ...     slave_id=1,
        ...     mode=DriverMode.SERVER,
        ...     host='0.0.0.0',
        ...     port=502
        ... )
        >>> await driver.start()

        Client mode:
        >>> driver = ModbusTCPDriver(
        ...     slave_id=1,
        ...     mode=DriverMode.CLIENT,
        ...     host='192.168.1.100',
        ...     port=502
        ... )
        >>> await driver.start()
        >>> values = await driver.read_holding_registers(0x1000, 10)
    """

    def __init__(
        self,
        slave_id: int = 1,
        mode: DriverMode = DriverMode.SERVER,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
        timeout: float = 5.0,
    ) -> None:
        super().__init__(slave_id=slave_id, mode=mode, timeout=timeout)
        self.host = host
        self.port = port
        self._server = None
        self._client = None

    async def start(self) -> None:
        """Start the Modbus TCP driver.

        Raises:
            CommunicationError: If driver fails to start
            ConfigurationError: If configuration is invalid
        """
        if self._running:
            logger.warning("Driver already running")
            return

        try:
            if self.is_server:
                await self._start_server()
            else:
                await self._start_client()

            self._running = True
            logger.info(
                "Modbus TCP driver started in %s mode on %s:%d",
                self.mode.value,
                self.host,
                self.port,
            )

        except Exception as e:
            logger.error("Failed to start Modbus TCP driver: %s", e)
            raise

    async def stop(self) -> None:
        """Stop the Modbus TCP driver and release resources."""
        if not self._running:
            return

        try:
            if self.is_server and self._server:
                # TODO: Stop server properly
                self._server = None
            elif self.is_client and self._client:
                # TODO: Disconnect client
                self._client = None

            self._running = False
            logger.info("Modbus TCP driver stopped")

        except Exception as e:
            logger.error("Error stopping Modbus TCP driver: %s", e)
            raise

    async def read_holding_registers(
        self, address: int, count: int, slave_id: Optional[int] = None
    ) -> Optional[List[int]]:
        """Read holding registers via TCP.

        Args:
            address: Starting register address
            count: Number of registers to read
            slave_id: Target slave ID (client mode only)

        Returns:
            List of register values or None on error
        """
        if not self._running:
            logger.error("Driver not running")
            return None

        # TODO: Implement using pymodbus
        logger.debug(
            "Reading %d registers from address 0x%04X (slave %d)",
            count,
            address,
            slave_id or self.slave_id,
        )
        return None

    async def write_register(
        self, address: int, value: int, slave_id: Optional[int] = None
    ) -> bool:
        """Write single holding register via TCP.

        Args:
            address: Register address
            value: Value to write (0-65535)
            slave_id: Target slave ID (client mode only)

        Returns:
            True if successful, False otherwise
        """
        if not self._running:
            logger.error("Driver not running")
            return False

        # TODO: Implement using pymodbus
        logger.debug(
            "Writing value %d to address 0x%04X (slave %d)",
            value,
            address,
            slave_id or self.slave_id,
        )
        return False

    async def write_multiple_registers(
        self, address: int, values: List[int], slave_id: Optional[int] = None
    ) -> bool:
        """Write multiple holding registers via TCP.

        Args:
            address: Starting register address
            values: List of values to write
            slave_id: Target slave ID (client mode only)

        Returns:
            True if successful, False otherwise
        """
        if not self._running:
            logger.error("Driver not running")
            return False

        # TODO: Implement using pymodbus
        logger.debug(
            "Writing %d registers to address 0x%04X (slave %d)",
            len(values),
            address,
            slave_id or self.slave_id,
        )
        return False

    async def _start_server(self) -> None:
        """Start Modbus TCP server (private method)."""
        # TODO: Implement server using pymodbus
        logger.debug("Starting Modbus TCP server...")

    async def _start_client(self) -> None:
        """Start Modbus TCP client (private method)."""
        # TODO: Implement client using pymodbus
        logger.debug("Starting Modbus TCP client...")

Attributes

is_running property

Check if driver is running.

is_server property

Check if driver is in server mode.

is_client property

Check if driver is in client mode.

Functions

start() async

Start the Modbus TCP driver.

Raises:

Type                Description
CommunicationError  If driver fails to start
ConfigurationError  If configuration is invalid

Source code in code/bess_rcu/drivers/modbus/tcp.py
async def start(self) -> None:
    """Start the Modbus TCP driver.

    Raises:
        CommunicationError: If driver fails to start
        ConfigurationError: If configuration is invalid
    """
    if self._running:
        logger.warning("Driver already running")
        return

    try:
        if self.is_server:
            await self._start_server()
        else:
            await self._start_client()

        self._running = True
        logger.info(
            "Modbus TCP driver started in %s mode on %s:%d",
            self.mode.value,
            self.host,
            self.port,
        )

    except Exception as e:
        logger.error("Failed to start Modbus TCP driver: %s", e)
        raise

stop() async

Stop the Modbus TCP driver and release resources.

Source code in code/bess_rcu/drivers/modbus/tcp.py
async def stop(self) -> None:
    """Stop the Modbus TCP driver and release resources."""
    if not self._running:
        return

    try:
        if self.is_server and self._server:
            # TODO: Stop server properly
            self._server = None
        elif self.is_client and self._client:
            # TODO: Disconnect client
            self._client = None

        self._running = False
        logger.info("Modbus TCP driver stopped")

    except Exception as e:
        logger.error("Error stopping Modbus TCP driver: %s", e)
        raise

read_holding_registers(address, count, slave_id=None) async

Read holding registers via TCP.

Parameters:

Name      Type           Description                         Default
address   int            Starting register address           required
count     int            Number of registers to read         required
slave_id  Optional[int]  Target slave ID (client mode only)  None

Returns:

Type                 Description
Optional[List[int]]  List of register values or None on error

Source code in code/bess_rcu/drivers/modbus/tcp.py
async def read_holding_registers(
    self, address: int, count: int, slave_id: Optional[int] = None
) -> Optional[List[int]]:
    """Read holding registers via TCP.

    Args:
        address: Starting register address
        count: Number of registers to read
        slave_id: Target slave ID (client mode only)

    Returns:
        List of register values or None on error
    """
    if not self._running:
        logger.error("Driver not running")
        return None

    # TODO: Implement using pymodbus
    logger.debug(
        "Reading %d registers from address 0x%04X (slave %d)",
        count,
        address,
        slave_id or self.slave_id,
    )
    return None

write_register(address, value, slave_id=None) async

Write single holding register via TCP.

Parameters:

Name      Type           Description                         Default
address   int            Register address                    required
value     int            Value to write (0-65535)            required
slave_id  Optional[int]  Target slave ID (client mode only)  None

Returns:

Type  Description
bool  True if successful, False otherwise

Source code in code/bess_rcu/drivers/modbus/tcp.py
async def write_register(
    self, address: int, value: int, slave_id: Optional[int] = None
) -> bool:
    """Write single holding register via TCP.

    Args:
        address: Register address
        value: Value to write (0-65535)
        slave_id: Target slave ID (client mode only)

    Returns:
        True if successful, False otherwise
    """
    if not self._running:
        logger.error("Driver not running")
        return False

    # TODO: Implement using pymodbus
    logger.debug(
        "Writing value %d to address 0x%04X (slave %d)",
        value,
        address,
        slave_id or self.slave_id,
    )
    return False

write_multiple_registers(address, values, slave_id=None) async

Write multiple holding registers via TCP.

Parameters:

Name      Type           Description                         Default
address   int            Starting register address           required
values    List[int]      List of values to write             required
slave_id  Optional[int]  Target slave ID (client mode only)  None

Returns:

Type  Description
bool  True if successful, False otherwise

Source code in code/bess_rcu/drivers/modbus/tcp.py
async def write_multiple_registers(
    self, address: int, values: List[int], slave_id: Optional[int] = None
) -> bool:
    """Write multiple holding registers via TCP.

    Args:
        address: Starting register address
        values: List of values to write
        slave_id: Target slave ID (client mode only)

    Returns:
        True if successful, False otherwise
    """
    if not self._running:
        logger.error("Driver not running")
        return False

    # TODO: Implement using pymodbus
    logger.debug(
        "Writing %d registers to address 0x%04X (slave %d)",
        len(values),
        address,
        slave_id or self.slave_id,
    )
    return False

ModbusRTUDriver

bess_rcu.drivers.modbus.rtu.ModbusRTUDriver

Bases: ModbusDriverBase

Modbus RTU driver implementation.

Supports both server (slave) and client (master) modes over RS-485 serial.

Parameters:

Name      Type        Description                                      Default
slave_id  int         Modbus slave/unit ID (1-247)                     1
mode      DriverMode  Operation mode (SERVER or CLIENT)                SERVER
port      str         Serial port path (e.g., '/dev/ttyUSB0', 'COM3')  '/dev/ttyUSB0'
baudrate  int         Serial baudrate (default: 9600)                  DEFAULT_BAUDRATE
bytesize  int         Number of data bits (default: 8)                 DEFAULT_BYTESIZE
parity    str         Parity checking ('N', 'E', 'O') (default: 'N')   DEFAULT_PARITY
stopbits  int         Number of stop bits (default: 1)                 DEFAULT_STOPBITS
timeout   float       Communication timeout in seconds                 5.0

Example

Server mode:

driver = ModbusRTUDriver(
    slave_id=1,
    mode=DriverMode.SERVER,
    port='/dev/ttyUSB0',
    baudrate=9600
)
await driver.start()

Client mode:

driver = ModbusRTUDriver(
    slave_id=1,
    mode=DriverMode.CLIENT,
    port='/dev/ttyUSB0',
    baudrate=9600
)
await driver.start()
values = await driver.read_holding_registers(0x1000, 10, slave_id=2)
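The constructor shown in this reference stores the serial settings without checking them, while start() is documented to raise ConfigurationError for an invalid serial configuration. A pre-flight check along the following lines could catch typos before the port is opened. It is a sketch under assumptions: validate_serial_config is a hypothetical helper, ConfigurationError is a local stand-in, and only the parity values come from the parameter table; the accepted data-bit and stop-bit sets are guesses.

```python
class ConfigurationError(Exception):
    """Stand-in for the ConfigurationError named in this reference."""


VALID_PARITIES = ("N", "E", "O")  # values listed in the parameter table
VALID_BYTESIZES = (7, 8)          # assumption: common serial framings
VALID_STOPBITS = (1, 2)           # assumption: common serial framings


def validate_serial_config(baudrate: int, bytesize: int,
                           parity: str, stopbits: int) -> None:
    """Raise ConfigurationError if any serial setting looks invalid."""
    if baudrate <= 0:
        raise ConfigurationError(f"baudrate must be positive, got {baudrate}")
    if bytesize not in VALID_BYTESIZES:
        raise ConfigurationError(f"unsupported bytesize: {bytesize}")
    if parity not in VALID_PARITIES:
        raise ConfigurationError(f"unsupported parity: {parity!r}")
    if stopbits not in VALID_STOPBITS:
        raise ConfigurationError(f"unsupported stopbits: {stopbits}")
```

Calling validate_serial_config(9600, 8, "N", 1) returns without error, whereas a parity of "X" raises ConfigurationError.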

Source code in code/bess_rcu/drivers/modbus/rtu.py
class ModbusRTUDriver(ModbusDriverBase):
    """Modbus RTU driver implementation.

    Supports both server (slave) and client (master) modes over RS-485 serial.

    Args:
        slave_id: Modbus slave/unit ID (1-247)
        mode: Operation mode (SERVER or CLIENT)
        port: Serial port path (e.g., '/dev/ttyUSB0', 'COM3')
        baudrate: Serial baudrate (default: 9600)
        bytesize: Number of data bits (default: 8)
        parity: Parity checking ('N', 'E', 'O') (default: 'N')
        stopbits: Number of stop bits (default: 1)
        timeout: Communication timeout in seconds

    Example:
        Server mode:
        >>> driver = ModbusRTUDriver(
        ...     slave_id=1,
        ...     mode=DriverMode.SERVER,
        ...     port='/dev/ttyUSB0',
        ...     baudrate=9600
        ... )
        >>> await driver.start()

        Client mode:
        >>> driver = ModbusRTUDriver(
        ...     slave_id=1,
        ...     mode=DriverMode.CLIENT,
        ...     port='/dev/ttyUSB0',
        ...     baudrate=9600
        ... )
        >>> await driver.start()
        >>> values = await driver.read_holding_registers(0x1000, 10, slave_id=2)
    """

    def __init__(
        self,
        slave_id: int = 1,
        mode: DriverMode = DriverMode.SERVER,
        port: str = "/dev/ttyUSB0",
        baudrate: int = DEFAULT_BAUDRATE,
        bytesize: int = DEFAULT_BYTESIZE,
        parity: str = DEFAULT_PARITY,
        stopbits: int = DEFAULT_STOPBITS,
        timeout: float = 5.0,
    ) -> None:
        super().__init__(slave_id=slave_id, mode=mode, timeout=timeout)
        self.port = port
        self.baudrate = baudrate
        self.bytesize = bytesize
        self.parity = parity
        self.stopbits = stopbits
        self._server = None
        self._client = None

    async def start(self) -> None:
        """Start the Modbus RTU driver.

        Raises:
            CommunicationError: If driver fails to start
            ConfigurationError: If serial port configuration is invalid
        """
        if self._running:
            logger.warning("Driver already running")
            return

        try:
            if self.is_server:
                await self._start_server()
            else:
                await self._start_client()

            self._running = True
            logger.info(
                "Modbus RTU driver started in %s mode on %s @ %d baud",
                self.mode.value,
                self.port,
                self.baudrate,
            )

        except Exception as e:
            logger.error("Failed to start Modbus RTU driver: %s", e)
            raise

    async def stop(self) -> None:
        """Stop the Modbus RTU driver and release resources."""
        if not self._running:
            return

        try:
            if self.is_server and self._server:
                # TODO: Stop server properly
                self._server = None
            elif self.is_client and self._client:
                # TODO: Disconnect client
                self._client = None

            self._running = False
            logger.info("Modbus RTU driver stopped")

        except Exception as e:
            logger.error("Error stopping Modbus RTU driver: %s", e)
            raise

    async def read_holding_registers(
        self, address: int, count: int, slave_id: Optional[int] = None
    ) -> Optional[List[int]]:
        """Read holding registers via RTU.

        Args:
            address: Starting register address
            count: Number of registers to read
            slave_id: Target slave ID (client mode only, required for RTU)

        Returns:
            List of register values or None on error
        """
        if not self._running:
            logger.error("Driver not running")
            return None

        # TODO: Implement using pymodbus
        logger.debug(
            "Reading %d registers from address 0x%04X (slave %d)",
            count,
            address,
            slave_id or self.slave_id,
        )
        return None

    async def write_register(
        self, address: int, value: int, slave_id: Optional[int] = None
    ) -> bool:
        """Write single holding register via RTU.

        Args:
            address: Register address
            value: Value to write (0-65535)
            slave_id: Target slave ID (client mode only, required for RTU)

        Returns:
            True if successful, False otherwise
        """
        if not self._running:
            logger.error("Driver not running")
            return False

        # TODO: Implement using pymodbus
        logger.debug(
            "Writing value %d to address 0x%04X (slave %d)",
            value,
            address,
            slave_id or self.slave_id,
        )
        return False

    async def write_multiple_registers(
        self, address: int, values: List[int], slave_id: Optional[int] = None
    ) -> bool:
        """Write multiple holding registers via RTU.

        Args:
            address: Starting register address
            values: List of values to write
            slave_id: Target slave ID (client mode only, required for RTU)

        Returns:
            True if successful, False otherwise
        """
        if not self._running:
            logger.error("Driver not running")
            return False

        # TODO: Implement using pymodbus
        logger.debug(
            "Writing %d registers to address 0x%04X (slave %d)",
            len(values),
            address,
            slave_id or self.slave_id,
        )
        return False

    async def _start_server(self) -> None:
        """Start Modbus RTU server (private method)."""
        # TODO: Implement server using pymodbus
        logger.debug("Starting Modbus RTU server...")

    async def _start_client(self) -> None:
        """Start Modbus RTU client (private method)."""
        # TODO: Implement client using pymodbus
        logger.debug("Starting Modbus RTU client...")

Attributes

is_running property

Check if driver is running.

is_server property

Check if driver is in server mode.

is_client property

Check if driver is in client mode.

Functions

start() async

Start the Modbus RTU driver.

Raises:

Type                Description
CommunicationError  If driver fails to start
ConfigurationError  If serial port configuration is invalid

Source code in code/bess_rcu/drivers/modbus/rtu.py
async def start(self) -> None:
    """Start the Modbus RTU driver.

    Raises:
        CommunicationError: If driver fails to start
        ConfigurationError: If serial port configuration is invalid
    """
    if self._running:
        logger.warning("Driver already running")
        return

    try:
        if self.is_server:
            await self._start_server()
        else:
            await self._start_client()

        self._running = True
        logger.info(
            "Modbus RTU driver started in %s mode on %s @ %d baud",
            self.mode.value,
            self.port,
            self.baudrate,
        )

    except Exception as e:
        logger.error("Failed to start Modbus RTU driver: %s", e)
        raise

stop() async

Stop the Modbus RTU driver and release resources.

Source code in code/bess_rcu/drivers/modbus/rtu.py
async def stop(self) -> None:
    """Stop the Modbus RTU driver and release resources."""
    if not self._running:
        return

    try:
        if self.is_server and self._server:
            # TODO: Stop server properly
            self._server = None
        elif self.is_client and self._client:
            # TODO: Disconnect client
            self._client = None

        self._running = False
        logger.info("Modbus RTU driver stopped")

    except Exception as e:
        logger.error("Error stopping Modbus RTU driver: %s", e)
        raise

read_holding_registers(address, count, slave_id=None) async

Read holding registers via RTU.

Parameters:

Name      Type           Description                                           Default
address   int            Starting register address                             required
count     int            Number of registers to read                           required
slave_id  Optional[int]  Target slave ID (client mode only, required for RTU)  None

Returns:

Type                 Description
Optional[List[int]]  List of register values or None on error

Source code in code/bess_rcu/drivers/modbus/rtu.py
async def read_holding_registers(
    self, address: int, count: int, slave_id: Optional[int] = None
) -> Optional[List[int]]:
    """Read holding registers via RTU.

    Args:
        address: Starting register address
        count: Number of registers to read
        slave_id: Target slave ID (client mode only, required for RTU)

    Returns:
        List of register values or None on error
    """
    if not self._running:
        logger.error("Driver not running")
        return None

    # TODO: Implement using pymodbus
    logger.debug(
        "Reading %d registers from address 0x%04X (slave %d)",
        count,
        address,
        slave_id or self.slave_id,
    )
    return None

write_register(address, value, slave_id=None) async

Write single holding register via RTU.

Parameters:

Name      Type           Description                                           Default
address   int            Register address                                      required
value     int            Value to write (0-65535)                              required
slave_id  Optional[int]  Target slave ID (client mode only, required for RTU)  None

Returns:

Type  Description
bool  True if successful, False otherwise

Source code in code/bess_rcu/drivers/modbus/rtu.py
async def write_register(
    self, address: int, value: int, slave_id: Optional[int] = None
) -> bool:
    """Write single holding register via RTU.

    Args:
        address: Register address
        value: Value to write (0-65535)
        slave_id: Target slave ID (client mode only, required for RTU)

    Returns:
        True if successful, False otherwise
    """
    if not self._running:
        logger.error("Driver not running")
        return False

    # TODO: Implement using pymodbus
    logger.debug(
        "Writing value %d to address 0x%04X (slave %d)",
        value,
        address,
        slave_id or self.slave_id,
    )
    return False

write_multiple_registers(address, values, slave_id=None) async

Write multiple holding registers via RTU.

Parameters:

Name      Type           Description                                           Default
address   int            Starting register address                             required
values    List[int]      List of values to write                               required
slave_id  Optional[int]  Target slave ID (client mode only, required for RTU)  None

Returns:

Type  Description
bool  True if successful, False otherwise

Source code in code/bess_rcu/drivers/modbus/rtu.py
async def write_multiple_registers(
    self, address: int, values: List[int], slave_id: Optional[int] = None
) -> bool:
    """Write multiple holding registers via RTU.

    Args:
        address: Starting register address
        values: List of values to write
        slave_id: Target slave ID (client mode only, required for RTU)

    Returns:
        True if successful, False otherwise
    """
    if not self._running:
        logger.error("Driver not running")
        return False

    # TODO: Implement using pymodbus
    logger.debug(
        "Writing %d registers to address 0x%04X (slave %d)",
        len(values),
        address,
        slave_id or self.slave_id,
    )
    return False
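Values wider than 16 bits have to be split across consecutive registers before they can go into the `values` list. The sketch below packs a 32-bit float into two register words and back using the standard `struct` module. The helper names are hypothetical, and the big-endian, high-word-first layout is an assumption: word order varies between devices and should be checked against the device's register map.

```python
import struct
from typing import List


def float_to_registers(value: float) -> List[int]:
    """Split an IEEE 754 float32 into two 16-bit words (high word first)."""
    high, low = struct.unpack(">HH", struct.pack(">f", value))
    return [high, low]


def registers_to_float(registers: List[int]) -> float:
    """Rebuild a float32 from two 16-bit words (high word first)."""
    return struct.unpack(">f", struct.pack(">HH", *registers))[0]
```

The resulting list can be passed directly as `values`, for example `float_to_registers(50.0)` for a 50.0 setpoint.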

CAN Bus Drivers

CANDriverBase

bess_rcu.drivers.can.base.CANDriverBase

Bases: ABC

Abstract base class for CAN bus drivers.

This class defines the interface for CAN bus communication regardless of the underlying hardware (SocketCAN, PCAN, etc.).

Parameters:

Name         Type                            Description                               Default
channel      str                             CAN channel name (e.g., 'can0', 'vcan0')  required
bitrate      int                             Bus bitrate in bps (default: 500000)      DEFAULT_BITRATE
can_filters  Optional[List[Dict[str, int]]]  List of CAN ID filters (optional)         None

Example

>>> class SocketCANDriver(CANDriverBase):
...     async def connect(self):
...         # Implementation
...         pass

Source code in code/bess_rcu/drivers/can/base.py
class CANDriverBase(ABC):
    """Abstract base class for CAN bus drivers.

    This class defines the interface for CAN bus communication
    regardless of the underlying hardware (SocketCAN, PCAN, etc.).

    Args:
        channel: CAN channel name (e.g., 'can0', 'vcan0')
        bitrate: Bus bitrate in bps (default: 500000)
        can_filters: List of CAN ID filters (optional)

    Example:
        >>> class SocketCANDriver(CANDriverBase):
        ...     async def connect(self):
        ...         # Implementation
        ...         pass
    """

    def __init__(
        self,
        channel: str,
        bitrate: int = DEFAULT_BITRATE,
        can_filters: Optional[List[Dict[str, int]]] = None,
    ) -> None:
        self.channel = channel
        self.bitrate = bitrate
        self.can_filters = can_filters or []
        self._connected = False

    @abstractmethod
    async def connect(self) -> bool:
        """Initialize and connect to CAN bus.

        Returns:
            True if connection successful, False otherwise

        Raises:
            CommunicationError: If CAN interface initialization fails
        """

    @abstractmethod
    async def disconnect(self) -> None:
        """Disconnect from CAN bus and cleanup resources."""

    @abstractmethod
    async def send(self, message: CANMessage) -> bool:
        """Send CAN message on the bus.

        Args:
            message: CANMessage to send

        Returns:
            True if send successful, False otherwise

        Raises:
            DeviceNotConnectedError: If not connected to bus
        """

    @abstractmethod
    async def receive(self, timeout: float = DEFAULT_TIMEOUT) -> Optional[CANMessage]:
        """Receive CAN message from bus.

        Args:
            timeout: Maximum time to wait for message in seconds

        Returns:
            Received CANMessage or None if timeout

        Raises:
            DeviceNotConnectedError: If not connected to bus
        """

    @abstractmethod
    def set_filters(self, filters: List[Dict[str, int]]) -> bool:
        """Set CAN ID filters.

        Args:
            filters: List of filter dicts with 'can_id' and 'can_mask'
                Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

        Returns:
            True if filters applied successfully
        """

    @property
    def is_connected(self) -> bool:
        """Check if connected to CAN bus."""
        return self._connected
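To illustrate the connect/send/receive contract without any hardware, here is a minimal in-memory loopback sketch. It is self-contained, so it does not import `bess_rcu`: `Frame` is a hypothetical stand-in for `CANMessage`, and `LoopbackDriver` only mirrors the method names and return conventions of `CANDriverBase`. A real test double would subclass `CANDriverBase` and also implement `set_filters`.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Frame:
    """Hypothetical stand-in for CANMessage."""
    arbitration_id: int
    data: bytes


class LoopbackDriver:
    """In-memory driver: every frame sent is queued for receive()."""

    def __init__(self) -> None:
        self._queue: "asyncio.Queue[Frame]" = asyncio.Queue()
        self._connected = False

    async def connect(self) -> bool:
        self._connected = True
        return True

    async def disconnect(self) -> None:
        self._connected = False

    async def send(self, message: Frame) -> bool:
        if not self._connected:
            return False
        await self._queue.put(message)
        return True

    async def receive(self, timeout: float = 1.0) -> Optional[Frame]:
        if not self._connected:
            return None
        try:
            # Wait for a frame, giving up after `timeout` seconds.
            return await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            return None


async def demo():
    driver = LoopbackDriver()
    await driver.connect()
    await driver.send(Frame(0x100, b"\x01\x02"))
    first = await driver.receive(timeout=0.1)    # the frame just sent
    second = await driver.receive(timeout=0.05)  # queue is empty again
    await driver.disconnect()
    return first, second
```

Running `asyncio.run(demo())` returns the echoed frame followed by `None`, matching the "None if timeout" convention documented for `receive`.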

Attributes

is_connected property

Check if connected to CAN bus.

Functions

connect() abstractmethod async

Initialize and connect to CAN bus.

Returns:

Type Description
bool

True if connection successful, False otherwise

Raises:

Type Description
CommunicationError

If CAN interface initialization fails

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
async def connect(self) -> bool:
    """Initialize and connect to CAN bus.

    Returns:
        True if connection successful, False otherwise

    Raises:
        CommunicationError: If CAN interface initialization fails
    """

disconnect() abstractmethod async

Disconnect from CAN bus and cleanup resources.

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
async def disconnect(self) -> None:
    """Disconnect from CAN bus and cleanup resources."""

send(message) abstractmethod async

Send CAN message on the bus.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to send

required

Returns:

Type Description
bool

True if send successful, False otherwise

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
async def send(self, message: CANMessage) -> bool:
    """Send CAN message on the bus.

    Args:
        message: CANMessage to send

    Returns:
        True if send successful, False otherwise

    Raises:
        DeviceNotConnectedError: If not connected to bus
    """

receive(timeout=DEFAULT_TIMEOUT) abstractmethod async

Receive CAN message from bus.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for message in seconds

DEFAULT_TIMEOUT

Returns:

Type Description
Optional[CANMessage]

Received CANMessage or None if timeout

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
async def receive(self, timeout: float = DEFAULT_TIMEOUT) -> Optional[CANMessage]:
    """Receive CAN message from bus.

    Args:
        timeout: Maximum time to wait for message in seconds

    Returns:
        Received CANMessage or None if timeout

    Raises:
        DeviceNotConnectedError: If not connected to bus
    """

set_filters(filters) abstractmethod

Set CAN ID filters.

Parameters:

Name Type Description Default
filters List[Dict[str, int]]

List of filter dicts with 'can_id' and 'can_mask' keys. Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

required

Returns:

Type Description
bool

True if filters applied successfully

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
def set_filters(self, filters: List[Dict[str, int]]) -> bool:
    """Set CAN ID filters.

    Args:
        filters: List of filter dicts with 'can_id' and 'can_mask'
            Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

    Returns:
        True if filters applied successfully
    """

SocketCANDriver

bess_rcu.drivers.can.socketcan.SocketCANDriver

Bases: CANDriverBase

SocketCAN driver implementation for Linux.

Supports both real CAN hardware (can0, can1) and virtual CAN (vcan0) using the python-can library with SocketCAN interface.

Parameters:

Name         Type                            Description                               Default
channel      str                             CAN channel name (e.g., 'can0', 'vcan0')  required
bitrate      int                             Bus bitrate in bps (default: 500000)      500000
can_filters  Optional[List[Dict[str, int]]]  List of CAN ID filters (optional)         None

Example

>>> driver = SocketCANDriver(channel='vcan0', bitrate=500000)
>>> await driver.connect()
>>> msg = CANMessage(arbitration_id=0x100, data=b'\x01\x02\x03')
>>> await driver.send(msg)
>>> received = await driver.receive(timeout=1.0)

Source code in code/bess_rcu/drivers/can/socketcan.py
class SocketCANDriver(CANDriverBase):
    """SocketCAN driver implementation for Linux.

    Supports both real CAN hardware (can0, can1) and virtual CAN (vcan0)
    using the python-can library with SocketCAN interface.

    Args:
        channel: CAN channel name (e.g., 'can0', 'vcan0')
        bitrate: Bus bitrate in bps (default: 500000)
        can_filters: List of CAN ID filters (optional)

    Example:
        >>> driver = SocketCANDriver(channel='vcan0', bitrate=500000)
        >>> await driver.connect()
        >>> msg = CANMessage(arbitration_id=0x100, data=b'\\x01\\x02\\x03')
        >>> await driver.send(msg)
        >>> received = await driver.receive(timeout=1.0)
    """

    def __init__(
        self,
        channel: str,
        bitrate: int = 500000,
        can_filters: Optional[List[Dict[str, int]]] = None,
    ) -> None:
        super().__init__(channel=channel, bitrate=bitrate, can_filters=can_filters)
        self._bus = None

    async def connect(self) -> bool:
        """Initialize and connect to SocketCAN interface.

        Returns:
            True if connection successful, False otherwise

        Raises:
            CommunicationError: If CAN interface initialization fails
        """
        if self._connected:
            logger.warning("Already connected to CAN bus")
            return True

        try:
            # TODO: Implement using python-can
            # self._bus = can.interface.Bus(
            #     channel=self.channel,
            #     interface='socketcan',  # 'bustype' is deprecated in recent python-can
            #     bitrate=self.bitrate
            # )

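            # Mark the driver connected before applying filters:
            # set_filters() returns False while self._connected is False.
            self._connected = True

            # Apply filters if provided
            if self.can_filters:
                self.set_filters(self.can_filters)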
            logger.info(
                "Connected to SocketCAN interface %s @ %d bps",
                self.channel,
                self.bitrate,
            )
            return True

        except Exception as e:
            logger.error("Failed to connect to SocketCAN interface: %s", e)
            return False

    async def disconnect(self) -> None:
        """Disconnect from SocketCAN interface and cleanup resources."""
        if not self._connected:
            return

        try:
            if self._bus:
                # TODO: Shutdown bus properly
                # self._bus.shutdown()
                self._bus = None

            self._connected = False
            logger.info("Disconnected from SocketCAN interface %s", self.channel)

        except Exception as e:
            logger.error("Error disconnecting from SocketCAN: %s", e)
            raise

    async def send(self, message: CANMessage) -> bool:
        """Send CAN message on the bus.

        Args:
            message: CANMessage to send

        Returns:
            True if send successful, False otherwise

        Raises:
            DeviceNotConnectedError: If not connected to bus
        """
        if not self._connected:
            logger.error("Not connected to CAN bus")
            return False

        try:
            # TODO: Implement using python-can
            # can_msg = can.Message(
            #     arbitration_id=message.arbitration_id,
            #     data=message.data,
            #     is_extended_id=message.is_extended
            # )
            # self._bus.send(can_msg)

            logger.debug(
                "Sent CAN message: ID=0x%03X, Data=%s",
                message.arbitration_id,
                message.data.hex(),
            )
            return True

        except Exception as e:
            logger.error("Failed to send CAN message: %s", e)
            return False

    async def receive(self, timeout: float = DEFAULT_TIMEOUT) -> Optional[CANMessage]:
        """Receive CAN message from bus.

        Args:
            timeout: Maximum time to wait for message in seconds

        Returns:
            Received CANMessage or None if timeout

        Raises:
            DeviceNotConnectedError: If not connected to bus
        """
        if not self._connected:
            logger.error("Not connected to CAN bus")
            return None

        try:
            # TODO: Implement using python-can
            # can_msg = self._bus.recv(timeout=timeout)
            # if can_msg is None:
            #     return None
            #
            # return CANMessage(
            #     arbitration_id=can_msg.arbitration_id,
            #     data=can_msg.data,
            #     is_extended=can_msg.is_extended_id,
            #     timestamp=can_msg.timestamp
            # )

            return None

        except Exception as e:
            logger.error("Error receiving CAN message: %s", e)
            return None

    def set_filters(self, filters: List[Dict[str, int]]) -> bool:
        """Set CAN ID filters.

        Args:
            filters: List of filter dicts with 'can_id' and 'can_mask'
                Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

        Returns:
            True if filters applied successfully
        """
        if not self._connected or not self._bus:
            logger.warning("Cannot set filters: not connected to bus")
            return False

        try:
            # TODO: Implement using python-can
            # self._bus.set_filters(filters)

            logger.info("Applied %d CAN filters", len(filters))
            return True

        except Exception as e:
            logger.error("Failed to set CAN filters: %s", e)
            return False

Attributes

is_connected property

Check if connected to CAN bus.

Functions

connect() async

Initialize and connect to SocketCAN interface.

Returns:

Type Description
bool

True if connection successful, False otherwise

Raises:

Type Description
CommunicationError

If CAN interface initialization fails

Source code in code/bess_rcu/drivers/can/socketcan.py
async def connect(self) -> bool:
    """Initialize and connect to SocketCAN interface.

    Returns:
        True if connection successful, False otherwise

    Raises:
        CommunicationError: If CAN interface initialization fails
    """
    if self._connected:
        logger.warning("Already connected to CAN bus")
        return True

    try:
        # TODO: Implement using python-can
        # self._bus = can.interface.Bus(
        #     channel=self.channel,
        #     interface='socketcan',  # 'bustype' is deprecated in recent python-can
        #     bitrate=self.bitrate
        # )

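        # Mark the driver connected before applying filters:
        # set_filters() returns False while self._connected is False.
        self._connected = True

        # Apply filters if provided
        if self.can_filters:
            self.set_filters(self.can_filters)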
        logger.info(
            "Connected to SocketCAN interface %s @ %d bps",
            self.channel,
            self.bitrate,
        )
        return True

    except Exception as e:
        logger.error("Failed to connect to SocketCAN interface: %s", e)
        return False

disconnect() async

Disconnect from SocketCAN interface and cleanup resources.

Source code in code/bess_rcu/drivers/can/socketcan.py
async def disconnect(self) -> None:
    """Disconnect from SocketCAN interface and cleanup resources."""
    if not self._connected:
        return

    try:
        if self._bus:
            # TODO: Shutdown bus properly
            # self._bus.shutdown()
            self._bus = None

        self._connected = False
        logger.info("Disconnected from SocketCAN interface %s", self.channel)

    except Exception as e:
        logger.error("Error disconnecting from SocketCAN: %s", e)
        raise

send(message) async

Send CAN message on the bus.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to send

required

Returns:

Type Description
bool

True if send successful, False otherwise

Raises:

Type Description
DeviceNotConnectedError

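If not connected to bus (base-class contract; the current implementation logs an error and returns False instead of raising)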

Source code in code/bess_rcu/drivers/can/socketcan.py
async def send(self, message: CANMessage) -> bool:
    """Send CAN message on the bus.

    Args:
        message: CANMessage to send

    Returns:
        True if send successful, False otherwise

    Raises:
        DeviceNotConnectedError: If not connected to bus
    """
    if not self._connected:
        logger.error("Not connected to CAN bus")
        return False

    try:
        # TODO: Implement using python-can
        # can_msg = can.Message(
        #     arbitration_id=message.arbitration_id,
        #     data=message.data,
        #     is_extended_id=message.is_extended
        # )
        # self._bus.send(can_msg)

        logger.debug(
            "Sent CAN message: ID=0x%03X, Data=%s",
            message.arbitration_id,
            message.data.hex(),
        )
        return True

    except Exception as e:
        logger.error("Failed to send CAN message: %s", e)
        return False

receive(timeout=DEFAULT_TIMEOUT) async

Receive CAN message from bus.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for message in seconds

DEFAULT_TIMEOUT

Returns:

Type Description
Optional[CANMessage]

Received CANMessage or None if timeout

Raises:

Type Description
DeviceNotConnectedError

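If not connected to bus (base-class contract; the current implementation logs an error and returns None instead of raising)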

Source code in code/bess_rcu/drivers/can/socketcan.py
async def receive(self, timeout: float = DEFAULT_TIMEOUT) -> Optional[CANMessage]:
    """Receive CAN message from bus.

    Args:
        timeout: Maximum time to wait for message in seconds

    Returns:
        Received CANMessage or None if timeout

    Raises:
        DeviceNotConnectedError: If not connected to bus
    """
    if not self._connected:
        logger.error("Not connected to CAN bus")
        return None

    try:
        # TODO: Implement using python-can
        # can_msg = self._bus.recv(timeout=timeout)
        # if can_msg is None:
        #     return None
        #
        # return CANMessage(
        #     arbitration_id=can_msg.arbitration_id,
        #     data=can_msg.data,
        #     is_extended=can_msg.is_extended_id,
        #     timestamp=can_msg.timestamp
        # )

        return None

    except Exception as e:
        logger.error("Error receiving CAN message: %s", e)
        return None
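The commented-out `self._bus.recv(timeout=timeout)` call is a blocking call, so invoking it directly inside the coroutine would stall the event loop for up to `timeout` seconds. One possible way to avoid that, sketched below, is to run the blocking call in a worker thread with `asyncio.to_thread` (Python 3.9+). `FakeBus` is a hypothetical stand-in for the real bus object, and the whole block is an assumption about how the TODO could be completed, not the project's implementation.

```python
import asyncio
import queue
from typing import Optional


class FakeBus:
    """Hypothetical stand-in for a bus object with a blocking recv()."""

    def __init__(self) -> None:
        self._frames: "queue.Queue[bytes]" = queue.Queue()

    def put(self, frame: bytes) -> None:
        self._frames.put(frame)

    def recv(self, timeout: float) -> Optional[bytes]:
        # Blocks the calling thread until a frame arrives or timeout expires.
        try:
            return self._frames.get(timeout=timeout)
        except queue.Empty:
            return None


async def receive(bus: FakeBus, timeout: float = 1.0) -> Optional[bytes]:
    """Await a frame without blocking the event loop."""
    return await asyncio.to_thread(bus.recv, timeout)


async def demo():
    bus = FakeBus()
    bus.put(b"\x01\x02\x03")
    first = await receive(bus, timeout=0.1)
    second = await receive(bus, timeout=0.05)  # nothing queued: times out
    return first, second
```

The trade-off is one worker thread per pending receive, which is usually acceptable for a single bus but worth revisiting if many interfaces are polled concurrently.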

set_filters(filters)

Set CAN ID filters.

Parameters:

Name Type Description Default
filters List[Dict[str, int]]

List of filter dicts with 'can_id' and 'can_mask' keys. Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

required

Returns:

Type Description
bool

True if filters applied successfully

Source code in code/bess_rcu/drivers/can/socketcan.py
def set_filters(self, filters: List[Dict[str, int]]) -> bool:
    """Set CAN ID filters.

    Args:
        filters: List of filter dicts with 'can_id' and 'can_mask'
            Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

    Returns:
        True if filters applied successfully
    """
    if not self._connected or not self._bus:
        logger.warning("Cannot set filters: not connected to bus")
        return False

    try:
        # TODO: Implement using python-can
        # self._bus.set_filters(filters)

        logger.info("Applied %d CAN filters", len(filters))
        return True

    except Exception as e:
        logger.error("Failed to set CAN filters: %s", e)
        return False
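Each filter dict pairs a `can_id` with a `can_mask`. Under SocketCAN semantics a frame passes a filter when `received_id & can_mask == can_id & can_mask`, so only the bits set in the mask are compared. The sketch below reproduces that rule in plain Python to make it easy to reason about a filter list before applying it. `passes_filters` is a hypothetical helper, and treating an empty list as "accept everything" is an assumption about the desired default.

```python
from typing import Dict, List


def passes_filters(arbitration_id: int, filters: List[Dict[str, int]]) -> bool:
    """Return True if the ID matches at least one filter.

    An empty filter list is assumed to accept every frame.
    """
    if not filters:
        return True
    return any(
        (arbitration_id & f["can_mask"]) == (f["can_id"] & f["can_mask"])
        for f in filters
    )
```

With `{'can_id': 0x100, 'can_mask': 0x7FF}` only ID 0x100 matches, while a mask of `0x7F0` ignores the low four bits and accepts 0x100 through 0x10F.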

DBCLoaderBase

bess_rcu.drivers.can.base.DBCLoaderBase

Bases: ABC

Abstract base class for DBC file loading and parsing.

This class defines the interface for loading CAN database files and encoding/decoding CAN messages based on signal definitions.

Parameters:

Name Type Description Default
dbc_path str

Path to DBC file

required
Example

>>> class DBCLoader(DBCLoaderBase):
...     def load(self):
...         # Implementation
...         pass

Source code in code/bess_rcu/drivers/can/base.py
class DBCLoaderBase(ABC):
    """Abstract base class for DBC file loading and parsing.

    This class defines the interface for loading CAN database files
    and encoding/decoding CAN messages based on signal definitions.

    Args:
        dbc_path: Path to DBC file

    Example:
        >>> class DBCLoader(DBCLoaderBase):
        ...     def load(self):
        ...         # Implementation
        ...         pass
    """

    def __init__(self, dbc_path: str) -> None:
        self.dbc_path = dbc_path
        self._loaded = False

    @abstractmethod
    def load(self) -> bool:
        """Load and parse DBC file.

        Returns:
            True if loaded successfully, False otherwise

        Raises:
            FileNotFoundError: If DBC file not found
            ValueError: If DBC file format invalid
        """

    @abstractmethod
    def encode_message(
        self, message_name: str, data: Dict[str, Any]
    ) -> Optional[CANMessage]:
        """Encode signal data into CAN message.

        Args:
            message_name: Name of message in DBC
            data: Dictionary of signal_name: value pairs

        Returns:
            Encoded CANMessage or None if encoding fails

        Example:
            >>> msg = loader.encode_message(
            ...     'PowerCommand',
            ...     {'power_kw': 50.0, 'enable': True}
            ... )
        """

    @abstractmethod
    def decode_message(self, message: CANMessage) -> Optional[Dict[str, Any]]:
        """Decode CAN message into signal data.

        Args:
            message: CANMessage to decode

        Returns:
            Dictionary of signal_name: value pairs or None if unknown message

        Example:
            >>> signals = loader.decode_message(can_msg)
            >>> power = signals.get('power_kw')
        """

    @abstractmethod
    def get_message_id(self, message_name: str) -> Optional[int]:
        """Get CAN ID for message name.

        Args:
            message_name: Name of message in DBC

        Returns:
            CAN arbitration ID or None if not found
        """

    @property
    def is_loaded(self) -> bool:
        """Check if DBC file is loaded."""
        return self._loaded

Attributes

is_loaded property

Check if DBC file is loaded.

Functions

load() abstractmethod

Load and parse DBC file.

Returns:

Type Description
bool

True if loaded successfully, False otherwise

Raises:

Type               Description
FileNotFoundError  If DBC file not found
ValueError         If DBC file format invalid

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
def load(self) -> bool:
    """Load and parse DBC file.

    Returns:
        True if loaded successfully, False otherwise

    Raises:
        FileNotFoundError: If DBC file not found
        ValueError: If DBC file format invalid
    """

encode_message(message_name, data) abstractmethod

Encode signal data into CAN message.

Parameters:

Name          Type            Description                             Default
message_name  str             Name of message in DBC                  required
data          Dict[str, Any]  Dictionary of signal_name: value pairs  required

Returns:

Type Description
Optional[CANMessage]

Encoded CANMessage or None if encoding fails

Example

>>> msg = loader.encode_message(
...     'PowerCommand',
...     {'power_kw': 50.0, 'enable': True}
... )

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
def encode_message(
    self, message_name: str, data: Dict[str, Any]
) -> Optional[CANMessage]:
    """Encode signal data into CAN message.

    Args:
        message_name: Name of message in DBC
        data: Dictionary of signal_name: value pairs

    Returns:
        Encoded CANMessage or None if encoding fails

    Example:
        >>> msg = loader.encode_message(
        ...     'PowerCommand',
        ...     {'power_kw': 50.0, 'enable': True}
        ... )
    """

decode_message(message) abstractmethod

Decode CAN message into signal data.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to decode

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary of signal_name: value pairs or None if unknown message

Example

>>> signals = loader.decode_message(can_msg)
>>> power = signals.get('power_kw')
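Because `decode_message` is documented to return `None` for an unknown message, calling `.get()` directly on its result can raise `AttributeError`. A small guard such as the following sketch avoids that. `read_signal` is a hypothetical helper, not part of the loader API.

```python
from typing import Any, Dict, Optional


def read_signal(
    signals: Optional[Dict[str, Any]], name: str, default: Any = None
) -> Any:
    """Return one decoded signal, or `default` when decoding produced nothing."""
    if signals is None:
        return default
    return signals.get(name, default)
```

Usage would look like `power = read_signal(loader.decode_message(can_msg), 'power_kw')`, which yields `None` for both unknown messages and missing signals.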

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
def decode_message(self, message: CANMessage) -> Optional[Dict[str, Any]]:
    """Decode CAN message into signal data.

    Args:
        message: CANMessage to decode

    Returns:
        Dictionary of signal_name: value pairs or None if unknown message

    Example:
        >>> signals = loader.decode_message(can_msg)
        >>> power = signals.get('power_kw')
    """

get_message_id(message_name) abstractmethod

Get CAN ID for message name.

Parameters:

Name Type Description Default
message_name str

Name of message in DBC

required

Returns:

Type Description
Optional[int]

CAN arbitration ID or None if not found

Source code in code/bess_rcu/drivers/can/base.py
@abstractmethod
def get_message_id(self, message_name: str) -> Optional[int]:
    """Get CAN ID for message name.

    Args:
        message_name: Name of message in DBC

    Returns:
        CAN arbitration ID or None if not found
    """

DBCLoader

bess_rcu.drivers.can.dbc_loader.DBCLoader

Bases: DBCLoaderBase

DBC file loader using cantools library.

Parses CAN database files (.dbc) and provides message encoding/decoding based on signal definitions.

Parameters:

Name Type Description Default
dbc_path str

Path to DBC file

required
Example

>>> loader = DBCLoader(dbc_path='config/can/pcs.dbc')
>>> loader.load()
>>> msg = loader.encode_message('PowerCommand', {'power_kw': 50.0})
>>> signals = loader.decode_message(received_msg)

Source code in code/bess_rcu/drivers/can/dbc_loader.py
class DBCLoader(DBCLoaderBase):
    """DBC file loader using cantools library.

    Parses CAN database files (.dbc) and provides message encoding/decoding
    based on signal definitions.

    Args:
        dbc_path: Path to DBC file

    Example:
        >>> loader = DBCLoader(dbc_path='config/can/pcs.dbc')
        >>> loader.load()
        >>> msg = loader.encode_message('PowerCommand', {'power_kw': 50.0})
        >>> signals = loader.decode_message(received_msg)
    """

    def __init__(self, dbc_path: str) -> None:
        super().__init__(dbc_path=dbc_path)
        self._database = None

    def load(self) -> bool:
        """Load and parse DBC file.

        Returns:
            True if loaded successfully, False otherwise

        Raises:
            FileNotFoundError: If DBC file not found
            ValueError: If DBC file format invalid
        """
        if self._loaded:
            logger.warning("DBC file already loaded")
            return True

        try:
            dbc_file = Path(self.dbc_path)
            if not dbc_file.exists():
                raise FileNotFoundError(f"DBC file not found: {self.dbc_path}")

            # TODO: Implement using cantools
            # self._database = cantools.database.load_file(self.dbc_path)

            self._loaded = True
            logger.info("Loaded DBC file: %s", self.dbc_path)
            # logger.info("  Messages: %d", len(self._database.messages))
            return True

        except FileNotFoundError as e:
            logger.error("DBC file not found: %s", e)
            raise
        except Exception as e:
            logger.error("Failed to load DBC file: %s", e)
            raise ValueError("Invalid DBC file format") from e

    def encode_message(
        self, message_name: str, data: Dict[str, Any]
    ) -> Optional[CANMessage]:
        """Encode signal data into CAN message.

        Args:
            message_name: Name of message in DBC
            data: Dictionary of signal_name: value pairs

        Returns:
            Encoded CANMessage or None if encoding fails

        Example:
            >>> msg = loader.encode_message(
            ...     'PowerCommand',
            ...     {'power_kw': 50.0, 'enable': True}
            ... )
        """
        if not self._loaded or not self._database:
            logger.error("DBC file not loaded")
            return None

        try:
            # TODO: Implement using cantools
            # message = self._database.get_message_by_name(message_name)
            # encoded_data = message.encode(data)
            #
            # return CANMessage(
            #     arbitration_id=message.frame_id,
            #     data=encoded_data,
            #     is_extended=message.is_extended_frame
            # )

            logger.debug("Encoded message '%s' with data: %s", message_name, data)
            return None

        except Exception as e:
            logger.error("Failed to encode message '%s': %s", message_name, e)
            return None

    def decode_message(self, message: CANMessage) -> Optional[Dict[str, Any]]:
        """Decode CAN message into signal data.

        Args:
            message: CANMessage to decode

        Returns:
            Dictionary of signal_name: value pairs or None if unknown message

        Example:
            >>> signals = loader.decode_message(can_msg)
            >>> power = signals.get('power_kw')
        """
        if not self._loaded or not self._database:
            logger.error("DBC file not loaded")
            return None

        try:
            # TODO: Implement using cantools
            # dbc_message = self._database.get_message_by_frame_id(
            #     message.arbitration_id
            # )
            # signals = dbc_message.decode(message.data)
            #
            # logger.debug(
            #     "Decoded message ID 0x%03X: %s",
            #     message.arbitration_id,
            #     signals
            # )
            # return signals

            return None

        except Exception as e:
            logger.debug(
                "Failed to decode message ID 0x%03X: %s", message.arbitration_id, e
            )
            return None

    def get_message_id(self, message_name: str) -> Optional[int]:
        """Get CAN ID for message name.

        Args:
            message_name: Name of message in DBC

        Returns:
            CAN arbitration ID or None if not found
        """
        if not self._loaded or not self._database:
            logger.error("DBC file not loaded")
            return None

        try:
            # TODO: Implement using cantools
            # message = self._database.get_message_by_name(message_name)
            # return message.frame_id

            return None

        except Exception as e:
            logger.error("Message '%s' not found in DBC: %s", message_name, e)
            return None

Attributes

is_loaded property

Check if DBC file is loaded.

Functions

load()

Load and parse DBC file.

Returns:

Type Description
bool

True if loaded successfully, False otherwise

Raises:

Type               Description
FileNotFoundError  If DBC file not found
ValueError         If DBC file format invalid

Source code in code/bess_rcu/drivers/can/dbc_loader.py
def load(self) -> bool:
    """Load and parse DBC file.

    Returns:
        True if loaded successfully, False otherwise

    Raises:
        FileNotFoundError: If DBC file not found
        ValueError: If DBC file format invalid
    """
    if self._loaded:
        logger.warning("DBC file already loaded")
        return True

    try:
        dbc_file = Path(self.dbc_path)
        if not dbc_file.exists():
            raise FileNotFoundError(f"DBC file not found: {self.dbc_path}")

        # TODO: Implement using cantools
        # self._database = cantools.database.load_file(self.dbc_path)

        self._loaded = True
        logger.info("Loaded DBC file: %s", self.dbc_path)
        # logger.info("  Messages: %d", len(self._database.messages))
        return True

    except FileNotFoundError as e:
        logger.error("DBC file not found: %s", e)
        raise
    except Exception as e:
        logger.error("Failed to load DBC file: %s", e)
        raise ValueError("Invalid DBC file format") from e
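Note that `load()` signals a missing or malformed file by raising rather than by returning `False`, so callers that want a simple success flag need to catch both documented exceptions. A minimal sketch of such a wrapper is shown below. `load_or_report` is a hypothetical helper, and it accepts any zero-argument callable so the example runs without the `bess_rcu` package.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def load_or_report(load: Callable[[], bool]) -> bool:
    """Call a load()-style function and map documented errors to False."""
    try:
        return load()
    except FileNotFoundError as exc:
        logger.error("DBC file missing: %s", exc)
    except ValueError as exc:
        logger.error("DBC file rejected: %s", exc)
    return False
```

With a real loader this would be called as `load_or_report(loader.load)`. Any other exception type still propagates, which keeps unexpected failures visible.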

encode_message(message_name, data)

Encode signal data into CAN message.

Parameters:

Name          Type            Description                             Default
message_name  str             Name of message in DBC                  required
data          Dict[str, Any]  Dictionary of signal_name: value pairs  required

Returns:

Type Description
Optional[CANMessage]

Encoded CANMessage or None if encoding fails

Example

>>> msg = loader.encode_message(
...     'PowerCommand',
...     {'power_kw': 50.0, 'enable': True}
... )

Source code in code/bess_rcu/drivers/can/dbc_loader.py
def encode_message(
    self, message_name: str, data: Dict[str, Any]
) -> Optional[CANMessage]:
    """Encode signal data into CAN message.

    Args:
        message_name: Name of message in DBC
        data: Dictionary of signal_name: value pairs

    Returns:
        Encoded CANMessage or None if encoding fails

    Example:
        >>> msg = loader.encode_message(
        ...     'PowerCommand',
        ...     {'power_kw': 50.0, 'enable': True}
        ... )
    """
    if not self._loaded or not self._database:
        logger.error("DBC file not loaded")
        return None

    try:
        # TODO: Implement using cantools
        # message = self._database.get_message_by_name(message_name)
        # encoded_data = message.encode(data)
        #
        # return CANMessage(
        #     arbitration_id=message.frame_id,
        #     data=encoded_data,
        #     is_extended=message.is_extended_frame
        # )

        logger.debug("Encoded message '%s' with data: %s", message_name, data)
        return None

    except Exception as e:
        logger.error("Failed to encode message '%s': %s", message_name, e)
        return None
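As listed, encode_message always returns None because the cantools calls are still commented out. To illustrate what packing signal values into a frame payload involves, here is a standard-library sketch. Everything specific in it is an assumption for illustration: the arbitration ID 0x100, the layout (power_kw as a signed 16-bit little-endian value at 0.1 kW per bit, enable in bit 0 of the third byte), and the `SketchCANMessage` type. A real layout would come from the DBC file.

```python
import struct
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class SketchCANMessage:
    """Hypothetical stand-in for the project's CANMessage type."""

    arbitration_id: int
    data: bytes
    is_extended: bool = False


POWER_COMMAND_ID = 0x100  # Hypothetical frame ID.
POWER_SCALE = 0.1  # Hypothetical scaling: kW per raw bit.


def encode_power_command(data: Dict[str, Any]) -> SketchCANMessage:
    """Pack power_kw and enable into an 8-byte payload."""
    raw_power = round(data["power_kw"] / POWER_SCALE)
    flags = 1 if data["enable"] else 0
    # "<hB" = little-endian int16 followed by uint8; pad with zeros to 8 bytes.
    payload = struct.pack("<hB", raw_power, flags).ljust(8, b"\x00")
    return SketchCANMessage(arbitration_id=POWER_COMMAND_ID, data=payload)
```

With these assumed constants, `encode_power_command({'power_kw': 50.0, 'enable': True}).data.hex()` yields `'f401010000000000'`: 500 raw counts (0x01F4) in little-endian order, then the enable flag.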

decode_message(message)

Decode CAN message into signal data.

Parameters:

Name     Type        Description           Default
message  CANMessage  CANMessage to decode  required

Returns:

Type                      Description
Optional[Dict[str, Any]]  Dictionary of signal_name: value pairs or None if unknown message

Example

signals = loader.decode_message(can_msg)
power = signals.get('power_kw')

Source code in code/bess_rcu/drivers/can/dbc_loader.py
def decode_message(self, message: CANMessage) -> Optional[Dict[str, Any]]:
    """Decode CAN message into signal data.

    Args:
        message: CANMessage to decode

    Returns:
        Dictionary of signal_name: value pairs or None if unknown message

    Example:
        >>> signals = loader.decode_message(can_msg)
        >>> power = signals.get('power_kw')
    """
    if not self._loaded or not self._database:
        logger.error("DBC file not loaded")
        return None

    try:
        # TODO: Implement using cantools
        # dbc_message = self._database.get_message_by_frame_id(
        #     message.arbitration_id
        # )
        # signals = dbc_message.decode(message.data)
        #
        # logger.debug(
        #     "Decoded message ID 0x%03X: %s",
        #     message.arbitration_id,
        #     signals
        # )
        # return signals

        return None

    except Exception as e:
        logger.debug(
            "Failed to decode message ID 0x%03X: %s", message.arbitration_id, e
        )
        return None
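Because decode_message returns None when the DBC is not loaded, the ID is unknown, or decoding fails, calling `.get()` directly on the result raises AttributeError in those cases. A minimal sketch of a None-safe accessor follows. `StubLoader` and `read_signal` are hypothetical names, and the stub is keyed by arbitration ID rather than taking a CANMessage, purely to keep the example self-contained.

```python
from typing import Any, Dict, Optional


class StubLoader:
    """Hypothetical loader whose decode_message() mimics the Optional return."""

    def __init__(self, known: Dict[int, Dict[str, Any]]) -> None:
        self._known = known

    def decode_message(self, arbitration_id: int) -> Optional[Dict[str, Any]]:
        # Simplification: look up by ID instead of decoding a CANMessage.
        return self._known.get(arbitration_id)


def read_signal(
    loader: StubLoader, arbitration_id: int, name: str, default: Any = None
) -> Any:
    """Return one decoded signal, or `default` when nothing was decoded."""
    signals = loader.decode_message(arbitration_id)
    if signals is None:
        # Unknown ID, DBC not loaded, or decode failure.
        return default
    return signals.get(name, default)
```

Returning a caller-supplied default keeps the decision about what a missing signal means (hold the last value, fall back to zero, raise an alarm) with the caller rather than inside the loader.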

get_message_id(message_name)

Get CAN ID for message name.

Parameters:

Name          Type  Description             Default
message_name  str   Name of message in DBC  required

Returns:

Type           Description
Optional[int]  CAN arbitration ID or None if not found

Source code in code/bess_rcu/drivers/can/dbc_loader.py
def get_message_id(self, message_name: str) -> Optional[int]:
    """Get CAN ID for message name.

    Args:
        message_name: Name of message in DBC

    Returns:
        CAN arbitration ID or None if not found
    """
    if not self._loaded or not self._database:
        logger.error("DBC file not loaded")
        return None

    try:
        # TODO: Implement using cantools
        # message = self._database.get_message_by_name(message_name)
        # return message.frame_id

        return None

    except Exception as e:
        logger.error("Message '%s' not found in DBC: %s", message_name, e)
        return None

GPIO Drivers

GPIODriverBase

bess_rcu.drivers.gpio.base.GPIODriverBase

Bases: ABC

Abstract base class for GPIO drivers.

This class defines the interface for digital I/O control on embedded Linux platforms using various GPIO libraries.

Parameters:

Name  Type  Description                                Default
chip  str   GPIO chip device (e.g., '/dev/gpiochip0')  '/dev/gpiochip0'

Example

class LinuxGPIODriver(GPIODriverBase):
    def initialize(self):
        # Implementation
        pass

Source code in code/bess_rcu/drivers/gpio/base.py
class GPIODriverBase(ABC):
    """Abstract base class for GPIO drivers.

    This class defines the interface for digital I/O control on
    embedded Linux platforms using various GPIO libraries.

    Args:
        chip: GPIO chip device (e.g., '/dev/gpiochip0')

    Example:
        >>> class LinuxGPIODriver(GPIODriverBase):
        ...     def initialize(self):
        ...         # Implementation
        ...         pass
    """

    def __init__(self, chip: str = "/dev/gpiochip0") -> None:
        self.chip = chip
        self._initialized = False

    @abstractmethod
    def initialize(self) -> bool:
        """Initialize GPIO subsystem.

        Returns:
            True if initialization successful, False otherwise

        Raises:
            DeviceError: If GPIO chip initialization fails
        """

    @abstractmethod
    def cleanup(self) -> None:
        """Cleanup GPIO resources and release pins."""

    @abstractmethod
    def setup_pin(self, pin: int, mode: PinMode) -> bool:
        """Configure GPIO pin mode.

        Args:
            pin: GPIO pin number
            mode: Pin mode (INPUT, OUTPUT, etc.)

        Returns:
            True if configuration successful, False otherwise

        Raises:
            ValueError: If pin number is invalid
        """

    @abstractmethod
    def read_pin(self, pin: int) -> Optional[PinState]:
        """Read GPIO pin state.

        Args:
            pin: GPIO pin number

        Returns:
            Current pin state (LOW/HIGH) or None on error

        Raises:
            ValueError: If pin is not configured as input
        """

    @abstractmethod
    def write_pin(self, pin: int, state: PinState) -> bool:
        """Write GPIO pin state.

        Args:
            pin: GPIO pin number
            state: Desired pin state (LOW/HIGH)

        Returns:
            True if write successful, False otherwise

        Raises:
            ValueError: If pin is not configured as output
        """

    @property
    def is_initialized(self) -> bool:
        """Check if GPIO driver is initialized."""
        return self._initialized

Attributes

is_initialized property

Check if GPIO driver is initialized.

Functions

initialize() abstractmethod

Initialize GPIO subsystem.

Returns:

Type  Description
bool  True if initialization successful, False otherwise

Raises:

Type         Description
DeviceError  If GPIO chip initialization fails

Source code in code/bess_rcu/drivers/gpio/base.py
@abstractmethod
def initialize(self) -> bool:
    """Initialize GPIO subsystem.

    Returns:
        True if initialization successful, False otherwise

    Raises:
        DeviceError: If GPIO chip initialization fails
    """

cleanup() abstractmethod

Cleanup GPIO resources and release pins.

Source code in code/bess_rcu/drivers/gpio/base.py
@abstractmethod
def cleanup(self) -> None:
    """Cleanup GPIO resources and release pins."""

setup_pin(pin, mode) abstractmethod

Configure GPIO pin mode.

Parameters:

Name  Type     Description                     Default
pin   int      GPIO pin number                 required
mode  PinMode  Pin mode (INPUT, OUTPUT, etc.)  required

Returns:

Type  Description
bool  True if configuration successful, False otherwise

Raises:

Type        Description
ValueError  If pin number is invalid

Source code in code/bess_rcu/drivers/gpio/base.py
@abstractmethod
def setup_pin(self, pin: int, mode: PinMode) -> bool:
    """Configure GPIO pin mode.

    Args:
        pin: GPIO pin number
        mode: Pin mode (INPUT, OUTPUT, etc.)

    Returns:
        True if configuration successful, False otherwise

    Raises:
        ValueError: If pin number is invalid
    """

read_pin(pin) abstractmethod

Read GPIO pin state.

Parameters:

Name  Type  Description      Default
pin   int   GPIO pin number  required

Returns:

Type                Description
Optional[PinState]  Current pin state (LOW/HIGH) or None on error

Raises:

Type        Description
ValueError  If pin is not configured as input

Source code in code/bess_rcu/drivers/gpio/base.py
@abstractmethod
def read_pin(self, pin: int) -> Optional[PinState]:
    """Read GPIO pin state.

    Args:
        pin: GPIO pin number

    Returns:
        Current pin state (LOW/HIGH) or None on error

    Raises:
        ValueError: If pin is not configured as input
    """

write_pin(pin, state) abstractmethod

Write GPIO pin state.

Parameters:

Name   Type      Description                   Default
pin    int       GPIO pin number               required
state  PinState  Desired pin state (LOW/HIGH)  required

Returns:

Type  Description
bool  True if write successful, False otherwise

Raises:

Type        Description
ValueError  If pin is not configured as output

Source code in code/bess_rcu/drivers/gpio/base.py
@abstractmethod
def write_pin(self, pin: int, state: PinState) -> bool:
    """Write GPIO pin state.

    Args:
        pin: GPIO pin number
        state: Desired pin state (LOW/HIGH)

    Returns:
        True if write successful, False otherwise

    Raises:
        ValueError: If pin is not configured as output
    """
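For unit tests that run without hardware, the method contract above can be satisfied by an in-memory double. The sketch below is one possible shape, not part of the project: `FakeGPIODriver`, `num_lines`, and `inject_input` are hypothetical, and `PinMode` / `PinState` are local stand-ins whose members and values may differ from the real enums. In the real code base the class would subclass GPIODriverBase; it is kept standalone here so the example runs on its own.

```python
from enum import Enum
from typing import Dict, Optional


class PinMode(Enum):  # Stand-in; the real enum may have other members/values.
    INPUT = "input"
    OUTPUT = "output"


class PinState(Enum):  # Stand-in; the real enum's values are an assumption.
    LOW = 0
    HIGH = 1


class FakeGPIODriver:
    """In-memory double that follows the documented method contract."""

    def __init__(self, num_lines: int = 32) -> None:
        self.num_lines = num_lines
        self._initialized = False
        self._modes: Dict[int, PinMode] = {}
        self._states: Dict[int, PinState] = {}

    @property
    def is_initialized(self) -> bool:
        return self._initialized

    def initialize(self) -> bool:
        self._initialized = True
        return True

    def cleanup(self) -> None:
        self._modes.clear()
        self._states.clear()
        self._initialized = False

    def setup_pin(self, pin: int, mode: PinMode) -> bool:
        if not 0 <= pin < self.num_lines:
            raise ValueError(f"invalid pin number: {pin}")
        if not self._initialized:
            return False
        self._modes[pin] = mode
        self._states.setdefault(pin, PinState.LOW)
        return True

    def read_pin(self, pin: int) -> Optional[PinState]:
        if self._modes.get(pin) is not PinMode.INPUT:
            raise ValueError(f"pin {pin} is not configured as input")
        return self._states[pin]

    def write_pin(self, pin: int, state: PinState) -> bool:
        if self._modes.get(pin) is not PinMode.OUTPUT:
            raise ValueError(f"pin {pin} is not configured as output")
        self._states[pin] = state
        return True

    def inject_input(self, pin: int, state: PinState) -> None:
        """Test helper: simulate an external level on an input pin."""
        self._states[pin] = state
```

A test can then drive an interlock input with `inject_input(...)` and assert on what the code under test reads, without touching `/dev/gpiochip0`.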

LinuxGPIODriver

bess_rcu.drivers.gpio.linux_gpio.LinuxGPIODriver

Bases: GPIODriverBase

Linux GPIO driver using gpiod library.

Provides GPIO control on Linux using the modern character device interface (/dev/gpiochipX) instead of deprecated sysfs.

Parameters:

Name      Type  Description                                            Default
chip      str   GPIO chip device path (default: '/dev/gpiochip0')      '/dev/gpiochip0'
consumer  str   Consumer name for GPIO requests (default: 'bess_rcu')  'bess_rcu'

Example

driver = LinuxGPIODriver(chip='/dev/gpiochip0')
driver.initialize()
driver.setup_pin(17, PinMode.OUTPUT)
driver.write_pin(17, PinState.HIGH)
state = driver.read_pin(18)

Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
class LinuxGPIODriver(GPIODriverBase):
    """Linux GPIO driver using gpiod library.

    Provides GPIO control on Linux using the modern character device
    interface (/dev/gpiochipX) instead of deprecated sysfs.

    Args:
        chip: GPIO chip device path (default: '/dev/gpiochip0')
        consumer: Consumer name for GPIO requests (default: 'bess_rcu')

    Example:
        >>> driver = LinuxGPIODriver(chip='/dev/gpiochip0')
        >>> driver.initialize()
        >>> driver.setup_pin(17, PinMode.OUTPUT)
        >>> driver.write_pin(17, PinState.HIGH)
        >>> state = driver.read_pin(18)
    """

    def __init__(
        self, chip: str = "/dev/gpiochip0", consumer: str = "bess_rcu"
    ) -> None:
        super().__init__(chip=chip)
        self.consumer = consumer
        self._chip_handle = None
        self._lines: Dict[int, Any] = {}  # Pin number -> gpiod line

    def initialize(self) -> bool:
        """Initialize GPIO subsystem.

        Returns:
            True if initialization successful, False otherwise

        Raises:
            DeviceError: If GPIO chip initialization fails
        """
        if self._initialized:
            logger.warning("GPIO driver already initialized")
            return True

        try:
            # TODO: Implement using gpiod
            # self._chip_handle = gpiod.Chip(self.chip)

            self._initialized = True
            logger.info("Initialized GPIO driver for chip: %s", self.chip)
            return True

        except Exception as e:
            logger.error("Failed to initialize GPIO driver: %s", e)
            return False

    def cleanup(self) -> None:
        """Cleanup GPIO resources and release pins."""
        if not self._initialized:
            return

        try:
            # Release all lines
            for pin, line in self._lines.items():
                # TODO: Release line
                # line.release()
                logger.debug("Released GPIO pin %d", pin)

            self._lines.clear()

            # Close chip
            if self._chip_handle:
                # TODO: Close chip
                # self._chip_handle.close()
                self._chip_handle = None

            self._initialized = False
            logger.info("GPIO driver cleanup complete")

        except Exception as e:
            logger.error("Error during GPIO cleanup: %s", e)

    def setup_pin(self, pin: int, mode: PinMode) -> bool:
        """Configure GPIO pin mode.

        Args:
            pin: GPIO pin number
            mode: Pin mode (INPUT, OUTPUT, etc.)

        Returns:
            True if configuration successful, False otherwise

        Raises:
            ValueError: If pin number is invalid
        """
        if not self._initialized:
            logger.error("GPIO driver not initialized")
            return False

        try:
            # TODO: Implement using gpiod
            # line = self._chip_handle.get_line(pin)
            #
            # if mode == PinMode.OUTPUT:
            #     line.request(consumer=self.consumer, type=gpiod.LINE_REQ_DIR_OUT)
            # elif mode == PinMode.INPUT:
            #     line.request(consumer=self.consumer, type=gpiod.LINE_REQ_DIR_IN)
            # elif mode == PinMode.INPUT_PULLUP:
            #     line.request(
            #         consumer=self.consumer,
            #         type=gpiod.LINE_REQ_DIR_IN,
            #         flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP
            #     )
            # elif mode == PinMode.INPUT_PULLDOWN:
            #     line.request(
            #         consumer=self.consumer,
            #         type=gpiod.LINE_REQ_DIR_IN,
            #         flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN
            #     )
            #
            # self._lines[pin] = line

            logger.info("Configured GPIO pin %d as %s", pin, mode.value)
            return True

        except Exception as e:
            logger.error("Failed to setup GPIO pin %d: %s", pin, e)
            return False

    def read_pin(self, pin: int) -> Optional[PinState]:
        """Read GPIO pin state.

        Args:
            pin: GPIO pin number

        Returns:
            Current pin state (LOW/HIGH) or None on error

        Raises:
            ValueError: If pin is not configured as input
        """
        if not self._initialized:
            logger.error("GPIO driver not initialized")
            return None

        if pin not in self._lines:
            logger.error("GPIO pin %d not configured", pin)
            return None

        try:
            # TODO: Implement using gpiod
            # line = self._lines[pin]
            # value = line.get_value()
            # return PinState.HIGH if value == 1 else PinState.LOW

            return None

        except Exception as e:
            logger.error("Failed to read GPIO pin %d: %s", pin, e)
            return None

    def write_pin(self, pin: int, state: PinState) -> bool:
        """Write GPIO pin state.

        Args:
            pin: GPIO pin number
            state: Desired pin state (LOW/HIGH)

        Returns:
            True if write successful, False otherwise

        Raises:
            ValueError: If pin is not configured as output
        """
        if not self._initialized:
            logger.error("GPIO driver not initialized")
            return False

        if pin not in self._lines:
            logger.error("GPIO pin %d not configured", pin)
            return False

        try:
            # TODO: Implement using gpiod
            # line = self._lines[pin]
            # line.set_value(state.value)

            logger.debug("Set GPIO pin %d to %s", pin, state.name)
            return True

        except Exception as e:
            logger.error("Failed to write GPIO pin %d: %s", pin, e)
            return False

Attributes

is_initialized property

Check if GPIO driver is initialized.

Functions

initialize()

Initialize GPIO subsystem.

Returns:

Type  Description
bool  True if initialization successful, False otherwise

Raises:

Type         Description
DeviceError  If GPIO chip initialization fails

Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
def initialize(self) -> bool:
    """Initialize GPIO subsystem.

    Returns:
        True if initialization successful, False otherwise

    Raises:
        DeviceError: If GPIO chip initialization fails
    """
    if self._initialized:
        logger.warning("GPIO driver already initialized")
        return True

    try:
        # TODO: Implement using gpiod
        # self._chip_handle = gpiod.Chip(self.chip)

        self._initialized = True
        logger.info("Initialized GPIO driver for chip: %s", self.chip)
        return True

    except Exception as e:
        logger.error("Failed to initialize GPIO driver: %s", e)
        return False

cleanup()

Cleanup GPIO resources and release pins.

Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
def cleanup(self) -> None:
    """Cleanup GPIO resources and release pins."""
    if not self._initialized:
        return

    try:
        # Release all lines
        for pin, line in self._lines.items():
            # TODO: Release line
            # line.release()
            logger.debug("Released GPIO pin %d", pin)

        self._lines.clear()

        # Close chip
        if self._chip_handle:
            # TODO: Close chip
            # self._chip_handle.close()
            self._chip_handle = None

        self._initialized = False
        logger.info("GPIO driver cleanup complete")

    except Exception as e:
        logger.error("Error during GPIO cleanup: %s", e)

setup_pin(pin, mode)

Configure GPIO pin mode.

Parameters:

Name  Type     Description                     Default
pin   int      GPIO pin number                 required
mode  PinMode  Pin mode (INPUT, OUTPUT, etc.)  required

Returns:

Type  Description
bool  True if configuration successful, False otherwise

Raises:

Type        Description
ValueError  If pin number is invalid

Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
def setup_pin(self, pin: int, mode: PinMode) -> bool:
    """Configure GPIO pin mode.

    Args:
        pin: GPIO pin number
        mode: Pin mode (INPUT, OUTPUT, etc.)

    Returns:
        True if configuration successful, False otherwise

    Raises:
        ValueError: If pin number is invalid
    """
    if not self._initialized:
        logger.error("GPIO driver not initialized")
        return False

    try:
        # TODO: Implement using gpiod
        # line = self._chip_handle.get_line(pin)
        #
        # if mode == PinMode.OUTPUT:
        #     line.request(consumer=self.consumer, type=gpiod.LINE_REQ_DIR_OUT)
        # elif mode == PinMode.INPUT:
        #     line.request(consumer=self.consumer, type=gpiod.LINE_REQ_DIR_IN)
        # elif mode == PinMode.INPUT_PULLUP:
        #     line.request(
        #         consumer=self.consumer,
        #         type=gpiod.LINE_REQ_DIR_IN,
        #         flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP
        #     )
        # elif mode == PinMode.INPUT_PULLDOWN:
        #     line.request(
        #         consumer=self.consumer,
        #         type=gpiod.LINE_REQ_DIR_IN,
        #         flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN
        #     )
        #
        # self._lines[pin] = line

        logger.info("Configured GPIO pin %d as %s", pin, mode.value)
        return True

    except Exception as e:
        logger.error("Failed to setup GPIO pin %d: %s", pin, e)
        return False

read_pin(pin)

Read GPIO pin state.

Parameters:

Name  Type  Description      Default
pin   int   GPIO pin number  required

Returns:

Type                Description
Optional[PinState]  Current pin state (LOW/HIGH) or None on error

Raises:

Type        Description
ValueError  If pin is not configured as input

Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
def read_pin(self, pin: int) -> Optional[PinState]:
    """Read GPIO pin state.

    Args:
        pin: GPIO pin number

    Returns:
        Current pin state (LOW/HIGH) or None on error

    Raises:
        ValueError: If pin is not configured as input
    """
    if not self._initialized:
        logger.error("GPIO driver not initialized")
        return None

    if pin not in self._lines:
        logger.error("GPIO pin %d not configured", pin)
        return None

    try:
        # TODO: Implement using gpiod
        # line = self._lines[pin]
        # value = line.get_value()
        # return PinState.HIGH if value == 1 else PinState.LOW

        return None

    except Exception as e:
        logger.error("Failed to read GPIO pin %d: %s", pin, e)
        return None

write_pin(pin, state)

Write GPIO pin state.

Parameters:

Name   Type      Description                   Default
pin    int       GPIO pin number               required
state  PinState  Desired pin state (LOW/HIGH)  required

Returns:

Type  Description
bool  True if write successful, False otherwise

Raises:

Type        Description
ValueError  If pin is not configured as output

Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
def write_pin(self, pin: int, state: PinState) -> bool:
    """Write GPIO pin state.

    Args:
        pin: GPIO pin number
        state: Desired pin state (LOW/HIGH)

    Returns:
        True if write successful, False otherwise

    Raises:
        ValueError: If pin is not configured as output
    """
    if not self._initialized:
        logger.error("GPIO driver not initialized")
        return False

    if pin not in self._lines:
        logger.error("GPIO pin %d not configured", pin)
        return False

    try:
        # TODO: Implement using gpiod
        # line = self._lines[pin]
        # line.set_value(state.value)

        logger.debug("Set GPIO pin %d to %s", pin, state.name)
        return True

    except Exception as e:
        logger.error("Failed to write GPIO pin %d: %s", pin, e)
        return False
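Since cleanup() releases the requested lines and closes the chip, it is worth guaranteeing that it runs even when the code between initialize() and cleanup() raises. One way to do that is a small context manager. This is a sketch, not an API the driver provides: `gpio_session` is a hypothetical helper, and `RecordingDriver` is a stand-in that only records calls so the example runs without hardware.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List


class RecordingDriver:
    """Hypothetical stand-in exposing initialize()/cleanup() like the driver."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def initialize(self) -> bool:
        self.calls.append("initialize")
        return True

    def cleanup(self) -> None:
        self.calls.append("cleanup")


@contextmanager
def gpio_session(driver: Any) -> Iterator[Any]:
    """Initialize the driver, yield it, and always clean up afterwards."""
    if not driver.initialize():
        raise RuntimeError("GPIO initialization failed")
    try:
        yield driver
    finally:
        # Runs on normal exit and when the with-body raises.
        driver.cleanup()
```

Usage would look like `with gpio_session(LinuxGPIODriver()) as gpio: ...`, with pin setup and I/O inside the block.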

Usage Examples

Modbus TCP Server

from bess_rcu.drivers.modbus import ModbusTCPDriver, DriverMode

# Create Modbus TCP server for PLC
driver = ModbusTCPDriver(
    host="0.0.0.0",
    port=502,
    mode=DriverMode.SERVER
)

# Connect and start serving
await driver.connect()

# Write coil
await driver.write_coil(address=0x0001, value=True)

# Read holding register
value = await driver.read_holding_register(address=0x2001)

CAN Bus with DBC

from bess_rcu.drivers.can import SocketCANDriver, DBCLoader

# Load DBC file
dbc_loader = DBCLoader()
database = dbc_loader.load("config/can/pcs.dbc")

# Create CAN driver
can_driver = SocketCANDriver(
    channel="can0",
    bitrate=500000,
    dbc_database=database
)

# Connect
await can_driver.connect()

# Send message using DBC definition
await can_driver.send_message(
    "PowerCommand",
    {"power_kw": 75.0, "mode": 1}
)

# Receive and decode
msg = await can_driver.receive_message(timeout=1.0)
decoded = can_driver.decode_message(msg)

GPIO Control

from bess_rcu.drivers.gpio import LinuxGPIODriver, PinMode, PinState

# Create GPIO driver
gpio = LinuxGPIODriver(chip="/dev/gpiochip0")

# Initialize (the GPIO driver methods are synchronous, so no await)
gpio.initialize()

# Configure an output pin and an input pin
gpio.setup_pin(17, PinMode.OUTPUT)
gpio.setup_pin(27, PinMode.INPUT)

# Set pin high
gpio.write_pin(17, PinState.HIGH)

# Read input pin (returns a PinState, or None on error)
state = gpio.read_pin(27)

# Release pins and close the chip
gpio.cleanup()

Driver Mode Enumeration

All communication drivers support two operational modes:

from bess_rcu.drivers.modbus import ModbusTCPDriver, DriverMode

# Server mode - listen for incoming connections
driver = ModbusTCPDriver(mode=DriverMode.SERVER)

# Client mode - initiate connections
driver = ModbusTCPDriver(mode=DriverMode.CLIENT)

Common Interfaces

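The GPIO drivers expose initialize(), cleanup(), and the is_initialized property, as documented above. The communication drivers (Modbus and CAN) implement these common methods: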

  • connect() - Establish connection/initialize hardware
  • disconnect() - Clean shutdown and resource cleanup
  • is_connected() - Check connection status

Protocol-specific methods are documented in each driver class.
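Code that should work with any driver offering connect(), disconnect(), and is_connected() can describe that shape with a structural type instead of a shared base class. The sketch below assumes connect() and disconnect() are coroutines (the usage examples await connect()) and that is_connected() is a plain method returning bool. Those signatures, and the names `ConnectableDriver`, `LoopbackDriver`, and `run_once`, are assumptions for illustration.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class ConnectableDriver(Protocol):
    """Assumed shape of the common connection interface."""

    async def connect(self) -> bool: ...

    async def disconnect(self) -> None: ...

    def is_connected(self) -> bool: ...


class LoopbackDriver:
    """Hypothetical driver with no I/O, used only to exercise the protocol."""

    def __init__(self) -> None:
        self._connected = False

    async def connect(self) -> bool:
        self._connected = True
        return True

    async def disconnect(self) -> None:
        self._connected = False

    def is_connected(self) -> bool:
        return self._connected


async def run_once(driver: ConnectableDriver) -> bool:
    """Connect, report the connection status, and always disconnect."""
    await driver.connect()
    try:
        return driver.is_connected()
    finally:
        await driver.disconnect()
```

Note that `isinstance(obj, ConnectableDriver)` only checks that the three method names exist; it does not verify that they are coroutines or that their signatures match.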