Skip to content

Drivers API Reference

Hardware abstraction layer for communication protocols and I/O operations.

Overview

The BESS RCU firmware uses a unified driver architecture organized by protocol type:

  • Modbus: TCP and RTU variants for PLC and sensor communication
  • CAN Bus: SocketCAN interface with DBC file support for PCS/BMS
  • GPIO: Digital I/O for safety interlocks and status indicators

All drivers follow a consistent interface pattern supporting both server and client modes.


Modbus Drivers

ModbusWrapperServer

bess_rcu.drivers.modbus.modbus_wrapper_server.ModbusWrapperServer

Functions

__init__(iface='tcp', *, layout=None, **kwargs)

Initialize Modbus TCP/RTU server.

:param iface: Interface type - 'tcp' or 'rtu'/'serial' :param layout: Custom ModbusLayout or None for defaults :param **kwargs: Interface config

:raises ValueError: If unsupported interface or missing parameters

ModbusWrapperClient

bess_rcu.drivers.modbus.modbus_wrapper_client.ModbusWrapperClient

Initializes Modbus Client instance and handles polling and writing operations

Functions

__init__(iface='tcp', **kwargs)

:param iface: "tcp" or "rtu" :param kwargs: Server configs

poll(device_id, fc, address, count)

Polling based on Function Code

ModbusLayout

bess_rcu.drivers.modbus.modbus_wrapper_server.ModbusLayout dataclass

A dataclass representing the complete Modbus datamodel.

Attributes:

Name Type Description
co BaseModbusDataBlock

Coils (Read-Write).

di BaseModbusDataBlock

Discrete Inputs (Read-only)

hr BaseModbusDataBlock

Holding Registers (Read-Write)

ir BaseModbusDataBlock

Input Registers (Read-only)


CAN Bus Drivers

CANDriverBase

bess_rcu.drivers.can.base.CANDriverBase

Bases: ABC

Abstract base class for CAN bus drivers.

This class defines the interface for CAN bus communication regardless of the underlying hardware (SocketCAN, PCAN, etc.).

Parameters:

Name Type Description Default
channel str

CAN channel name (e.g., 'can0', 'vcan0')

required
bitrate int

Bus bitrate in bps (default: 500000)

DEFAULT_BITRATE
can_filters Optional[List[Dict[str, int]]]

List of CAN ID filters (optional)

None
Example

class SocketCANDriver(CANDriverBase): ... async def connect(self): ... # Implementation ... pass

Attributes

is_connected property

Check if connected to CAN bus.

Functions

connect() abstractmethod async

Initialize and connect to CAN bus.

Returns:

Type Description
bool

True if connection successful, False otherwise

Raises:

Type Description
CommunicationError

If CAN interface initialization fails

disconnect() abstractmethod async

Disconnect from CAN bus and cleanup resources.

send(message) abstractmethod async

Send CAN message on the bus.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to send

required

Returns:

Type Description
bool

True if send successful, False otherwise

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

receive(timeout=DEFAULT_TIMEOUT) abstractmethod async

Receive CAN message from bus.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for message in seconds

DEFAULT_TIMEOUT

Returns:

Type Description
Optional[CANMessage]

Received CANMessage or None if timeout

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

set_filters(filters) abstractmethod

Set CAN ID filters.

Parameters:

Name Type Description Default
filters List[Dict[str, int]]

List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

required

Returns:

Type Description
bool

True if filters applied successfully

CANMessage

bess_rcu.drivers.can.base.CANMessage dataclass

CAN message data structure.

Attributes:

Name Type Description
arbitration_id int

CAN message ID

data bytes

Message payload (up to 8 bytes)

is_extended bool

True for 29-bit ID, False for 11-bit

timestamp Optional[float]

Message timestamp (optional)

SocketCANDriver

bess_rcu.drivers.can.socketcan.SocketCANDriver

Bases: CANDriverBase

SocketCAN driver implementation for Linux.

Supports both real CAN hardware (can0, can1) and virtual CAN (vcan0) using the python-can library with SocketCAN interface.

Parameters:

Name Type Description Default
channel str

CAN channel name (e.g., 'can0', 'vcan0')

required
bitrate int

Bus bitrate in bps (default: 500000)

500000
can_filters Optional[List[Dict[str, int]]]

List of CAN ID filters (optional)

None
Example

driver = SocketCANDriver(channel='vcan0', bitrate=500000) await driver.connect() msg = CANMessage(arbitration_id=0x100, data=b'\x01\x02\x03') await driver.send(msg) received = await driver.receive(timeout=1.0)

Attributes

is_connected property

Check if connected to CAN bus.

Functions

connect() async

Initialize and connect to SocketCAN interface.

Returns:

Type Description
bool

True if connection successful, False otherwise

Raises:

Type Description
CommunicationError

If CAN interface initialization fails

disconnect() async

Disconnect from SocketCAN interface and cleanup resources.

send(message) async

Send CAN message on the bus.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to send

required

Returns:

Type Description
bool

True if send successful, False otherwise

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

receive(timeout=DEFAULT_TIMEOUT) async

Receive CAN message from bus.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for message in seconds

DEFAULT_TIMEOUT

Returns:

Type Description
Optional[CANMessage]

Received CANMessage or None if timeout

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

set_filters(filters)

Set CAN ID filters.

Parameters:

Name Type Description Default
filters List[Dict[str, int]]

List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

required

Returns:

Type Description
bool

True if filters applied successfully

DBCLoader

bess_rcu.drivers.can.dbc_loader.DBCLoader

Bases: DBCLoaderBase

DBC file loader using cantools library.

Parses CAN database files (.dbc) and provides message encoding/decoding based on signal definitions.

Parameters:

Name Type Description Default
dbc_path str

Path to DBC file

required
Example

loader = DBCLoader(dbc_path='config/can/pcs.dbc') loader.load() msg = loader.encode_message('PowerCommand', {'power_kw': 50.0}) signals = loader.decode_message(received_msg)

Functions

load()

Load and parse DBC file.

Returns:

Type Description
bool

True if loaded successfully, False otherwise

Raises:

Type Description
FileNotFoundError

If DBC file not found

ValueError

If DBC file format invalid

encode_message(message_name, data)

Encode signal data into CAN message.

Parameters:

Name Type Description Default
message_name str

Name of message in DBC

required
data Dict[str, Any]

Dictionary of signal_name: value pairs

required

Returns:

Type Description
Optional[CANMessage]

Encoded CANMessage or None if encoding fails

Example

msg = loader.encode_message( ... 'PowerCommand', ... {'power_kw': 50.0, 'enable': True} ... )

decode_message(message)

Decode CAN message into signal data.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to decode

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary of signal_name: value pairs or None if unknown message

Example

signals = loader.decode_message(can_msg) power = signals.get('power_kw')

get_message_id(message_name)

Get CAN ID for message name.

Parameters:

Name Type Description Default
message_name str

Name of message in DBC

required

Returns:

Type Description
Optional[int]

CAN arbitration ID or None if not found


GPIO Drivers

GPIODriverBase

bess_rcu.drivers.gpio.base.GPIODriverBase

Bases: ABC

Abstract base class for GPIO drivers.

This class defines the interface for digital I/O control on embedded Linux platforms using various GPIO libraries.

Parameters:

Name Type Description Default
chip str

GPIO chip device (e.g., '/dev/gpiochip0')

'/dev/gpiochip0'
Example

class LinuxGPIODriver(GPIODriverBase): ... def initialize(self): ... # Implementation ... pass

Attributes

is_initialized property

Check if GPIO driver is initialized.

Functions

initialize() abstractmethod

Initialize GPIO subsystem.

Returns:

Type Description
bool

True if initialization successful, False otherwise

Raises:

Type Description
DeviceError

If GPIO chip initialization fails

cleanup() abstractmethod

Cleanup GPIO resources and release pins.

setup_pin(pin, mode) abstractmethod

Configure GPIO pin mode.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required
mode PinMode

Pin mode (INPUT, OUTPUT, etc.)

required

Returns:

Type Description
bool

True if configuration successful, False otherwise

Raises:

Type Description
ValueError

If pin number is invalid

read_pin(pin) abstractmethod

Read GPIO pin state.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required

Returns:

Type Description
Optional[PinState]

Current pin state (LOW/HIGH) or None on error

Raises:

Type Description
ValueError

If pin is not configured as input

write_pin(pin, state) abstractmethod

Write GPIO pin state.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required
state PinState

Desired pin state (LOW/HIGH)

required

Returns:

Type Description
bool

True if write successful, False otherwise

Raises:

Type Description
ValueError

If pin is not configured as output

LinuxGPIODriver

bess_rcu.drivers.gpio.linux_gpio.LinuxGPIODriver

Bases: GPIODriverBase

Linux GPIO driver using gpiod library.

Provides GPIO control on Linux using the modern character device interface (/dev/gpiochipX) instead of deprecated sysfs.

Parameters:

Name Type Description Default
chip str

GPIO chip device path (default: '/dev/gpiochip0')

'/dev/gpiochip0'
consumer str

Consumer name for GPIO requests (default: 'bess_rcu')

'bess_rcu'
Example

driver = LinuxGPIODriver(chip='/dev/gpiochip0') driver.initialize() driver.setup_pin(17, PinMode.OUTPUT) driver.write_pin(17, PinState.HIGH) state = driver.read_pin(18)

Attributes

is_initialized property

Check if GPIO driver is initialized.

Functions

initialize()

Initialize GPIO subsystem.

Returns:

Type Description
bool

True if initialization successful, False otherwise

Raises:

Type Description
DeviceError

If GPIO chip initialization fails

cleanup()

Cleanup GPIO resources and release pins.

setup_pin(pin, mode)

Configure GPIO pin mode.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required
mode PinMode

Pin mode (INPUT, OUTPUT, etc.)

required

Returns:

Type Description
bool

True if configuration successful, False otherwise

Raises:

Type Description
ValueError

If pin number is invalid

read_pin(pin)

Read GPIO pin state.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required

Returns:

Type Description
Optional[PinState]

Current pin state (LOW/HIGH) or None on error

Raises:

Type Description
ValueError

If pin is not configured as input

write_pin(pin, state)

Write GPIO pin state.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required
state PinState

Desired pin state (LOW/HIGH)

required

Returns:

Type Description
bool

True if write successful, False otherwise

Raises:

Type Description
ValueError

If pin is not configured as output

PinMode

bess_rcu.drivers.gpio.base.PinMode

Bases: Enum

GPIO pin modes.

PinState

bess_rcu.drivers.gpio.base.PinState

Bases: Enum

GPIO pin states.


Driver Architecture

All drivers follow these principles:

  • Unified Interface: Consistent methods across all driver types
  • Async/Await: Non-blocking operations for concurrent communication (where applicable)
  • Type Hints: Full type annotations for IDE support
  • Error Handling: Comprehensive exception hierarchy
  • Logging: Structured logging for debugging

For implementation details and usage examples, see the module README files in the code/bess_rcu/drivers directory.