Drivers API Reference¶
Hardware abstraction layer for communication protocols and I/O operations.
Overview¶
The BESS RCU firmware uses a unified driver architecture organized by protocol type:
- Modbus: TCP and RTU variants for PLC and sensor communication
- CAN Bus: SocketCAN interface with DBC file support for PCS/BMS
- GPIO: Digital I/O for safety interlocks and status indicators
All drivers follow a consistent interface pattern supporting both server and client modes.
Modbus Drivers¶
ModbusWrapperServer¶
bess_rcu.drivers.modbus.modbus_wrapper_server.ModbusWrapperServer
¶
Functions¶
__init__(iface='tcp', *, layout=None, **kwargs)
¶
Initialize Modbus TCP/RTU server.
:param iface: Interface type - 'tcp' or 'rtu'/'serial' :param layout: Custom ModbusLayout or None for defaults :param **kwargs: Interface config
:raises ValueError: If unsupported interface or missing parameters
ModbusWrapperClient¶
bess_rcu.drivers.modbus.modbus_wrapper_client.ModbusWrapperClient
¶
ModbusLayout¶
bess_rcu.drivers.modbus.modbus_wrapper_server.ModbusLayout
dataclass
¶
A dataclass representing the complete Modbus datamodel.
Attributes:
| Name | Type | Description |
|---|---|---|
co |
BaseModbusDataBlock
|
Coils (Read-Write). |
di |
BaseModbusDataBlock
|
Discrete Inputs (Read-only) |
hr |
BaseModbusDataBlock
|
Holding Registers (Read-Write) |
ir |
BaseModbusDataBlock
|
Input Registers (Read-only) |
CAN Bus Drivers¶
CANDriverBase¶
bess_rcu.drivers.can.base.CANDriverBase
¶
Bases: ABC
Abstract base class for CAN bus drivers.
This class defines the interface for CAN bus communication regardless of the underlying hardware (SocketCAN, PCAN, etc.).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel
|
str
|
CAN channel name (e.g., 'can0', 'vcan0') |
required |
bitrate
|
int
|
Bus bitrate in bps (default: 500000) |
DEFAULT_BITRATE
|
can_filters
|
Optional[List[Dict[str, int]]]
|
List of CAN ID filters (optional) |
None
|
Example
class SocketCANDriver(CANDriverBase): ... async def connect(self): ... # Implementation ... pass
Attributes¶
is_connected
property
¶
Check if connected to CAN bus.
Functions¶
connect()
abstractmethod
async
¶
Initialize and connect to CAN bus.
Returns:
| Type | Description |
|---|---|
bool
|
True if connection successful, False otherwise |
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If CAN interface initialization fails |
disconnect()
abstractmethod
async
¶
Disconnect from CAN bus and cleanup resources.
send(message)
abstractmethod
async
¶
Send CAN message on the bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to send |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if send successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
receive(timeout=DEFAULT_TIMEOUT)
abstractmethod
async
¶
Receive CAN message from bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Maximum time to wait for message in seconds |
DEFAULT_TIMEOUT
|
Returns:
| Type | Description |
|---|---|
Optional[CANMessage]
|
Received CANMessage or None if timeout |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
set_filters(filters)
abstractmethod
¶
Set CAN ID filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filters
|
List[Dict[str, int]]
|
List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}] |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if filters applied successfully |
CANMessage¶
bess_rcu.drivers.can.base.CANMessage
dataclass
¶
CAN message data structure.
Attributes:
| Name | Type | Description |
|---|---|---|
arbitration_id |
int
|
CAN message ID |
data |
bytes
|
Message payload (up to 8 bytes) |
is_extended |
bool
|
True for 29-bit ID, False for 11-bit |
timestamp |
Optional[float]
|
Message timestamp (optional) |
SocketCANDriver¶
bess_rcu.drivers.can.socketcan.SocketCANDriver
¶
Bases: CANDriverBase
SocketCAN driver implementation for Linux.
Supports both real CAN hardware (can0, can1) and virtual CAN (vcan0) using the python-can library with SocketCAN interface.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel
|
str
|
CAN channel name (e.g., 'can0', 'vcan0') |
required |
bitrate
|
int
|
Bus bitrate in bps (default: 500000) |
500000
|
can_filters
|
Optional[List[Dict[str, int]]]
|
List of CAN ID filters (optional) |
None
|
Example
driver = SocketCANDriver(channel='vcan0', bitrate=500000) await driver.connect() msg = CANMessage(arbitration_id=0x100, data=b'\x01\x02\x03') await driver.send(msg) received = await driver.receive(timeout=1.0)
Attributes¶
is_connected
property
¶
Check if connected to CAN bus.
Functions¶
connect()
async
¶
Initialize and connect to SocketCAN interface.
Returns:
| Type | Description |
|---|---|
bool
|
True if connection successful, False otherwise |
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If CAN interface initialization fails |
disconnect()
async
¶
Disconnect from SocketCAN interface and cleanup resources.
send(message)
async
¶
Send CAN message on the bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to send |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if send successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
receive(timeout=DEFAULT_TIMEOUT)
async
¶
Receive CAN message from bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Maximum time to wait for message in seconds |
DEFAULT_TIMEOUT
|
Returns:
| Type | Description |
|---|---|
Optional[CANMessage]
|
Received CANMessage or None if timeout |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
set_filters(filters)
¶
Set CAN ID filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filters
|
List[Dict[str, int]]
|
List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}] |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if filters applied successfully |
DBCLoader¶
bess_rcu.drivers.can.dbc_loader.DBCLoader
¶
Bases: DBCLoaderBase
DBC file loader using cantools library.
Parses CAN database files (.dbc) and provides message encoding/decoding based on signal definitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dbc_path
|
str
|
Path to DBC file |
required |
Example
loader = DBCLoader(dbc_path='config/can/pcs.dbc') loader.load() msg = loader.encode_message('PowerCommand', {'power_kw': 50.0}) signals = loader.decode_message(received_msg)
Functions¶
load()
¶
Load and parse DBC file.
Returns:
| Type | Description |
|---|---|
bool
|
True if loaded successfully, False otherwise |
Raises:
| Type | Description |
|---|---|
FileNotFoundError
|
If DBC file not found |
ValueError
|
If DBC file format invalid |
encode_message(message_name, data)
¶
Encode signal data into CAN message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_name
|
str
|
Name of message in DBC |
required |
data
|
Dict[str, Any]
|
Dictionary of signal_name: value pairs |
required |
Returns:
| Type | Description |
|---|---|
Optional[CANMessage]
|
Encoded CANMessage or None if encoding fails |
Example
msg = loader.encode_message( ... 'PowerCommand', ... {'power_kw': 50.0, 'enable': True} ... )
decode_message(message)
¶
Decode CAN message into signal data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to decode |
required |
Returns:
| Type | Description |
|---|---|
Optional[Dict[str, Any]]
|
Dictionary of signal_name: value pairs or None if unknown message |
Example
signals = loader.decode_message(can_msg) power = signals.get('power_kw')
get_message_id(message_name)
¶
Get CAN ID for message name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_name
|
str
|
Name of message in DBC |
required |
Returns:
| Type | Description |
|---|---|
Optional[int]
|
CAN arbitration ID or None if not found |
GPIO Drivers¶
GPIODriverBase¶
bess_rcu.drivers.gpio.base.GPIODriverBase
¶
Bases: ABC
Abstract base class for GPIO drivers.
This class defines the interface for digital I/O control on embedded Linux platforms using various GPIO libraries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chip
|
str
|
GPIO chip device (e.g., '/dev/gpiochip0') |
'/dev/gpiochip0'
|
Example
class LinuxGPIODriver(GPIODriverBase): ... def initialize(self): ... # Implementation ... pass
Attributes¶
is_initialized
property
¶
Check if GPIO driver is initialized.
Functions¶
initialize()
abstractmethod
¶
Initialize GPIO subsystem.
Returns:
| Type | Description |
|---|---|
bool
|
True if initialization successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceError
|
If GPIO chip initialization fails |
cleanup()
abstractmethod
¶
Cleanup GPIO resources and release pins.
setup_pin(pin, mode)
abstractmethod
¶
Configure GPIO pin mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
mode
|
PinMode
|
Pin mode (INPUT, OUTPUT, etc.) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if configuration successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin number is invalid |
read_pin(pin)
abstractmethod
¶
Read GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
Returns:
| Type | Description |
|---|---|
Optional[PinState]
|
Current pin state (LOW/HIGH) or None on error |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as input |
write_pin(pin, state)
abstractmethod
¶
Write GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
state
|
PinState
|
Desired pin state (LOW/HIGH) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if write successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as output |
LinuxGPIODriver¶
bess_rcu.drivers.gpio.linux_gpio.LinuxGPIODriver
¶
Bases: GPIODriverBase
Linux GPIO driver using gpiod library.
Provides GPIO control on Linux using the modern character device interface (/dev/gpiochipX) instead of deprecated sysfs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chip
|
str
|
GPIO chip device path (default: '/dev/gpiochip0') |
'/dev/gpiochip0'
|
consumer
|
str
|
Consumer name for GPIO requests (default: 'bess_rcu') |
'bess_rcu'
|
Example
driver = LinuxGPIODriver(chip='/dev/gpiochip0') driver.initialize() driver.setup_pin(17, PinMode.OUTPUT) driver.write_pin(17, PinState.HIGH) state = driver.read_pin(18)
Attributes¶
is_initialized
property
¶
Check if GPIO driver is initialized.
Functions¶
initialize()
¶
Initialize GPIO subsystem.
Returns:
| Type | Description |
|---|---|
bool
|
True if initialization successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceError
|
If GPIO chip initialization fails |
cleanup()
¶
Cleanup GPIO resources and release pins.
setup_pin(pin, mode)
¶
Configure GPIO pin mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
mode
|
PinMode
|
Pin mode (INPUT, OUTPUT, etc.) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if configuration successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin number is invalid |
read_pin(pin)
¶
Read GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
Returns:
| Type | Description |
|---|---|
Optional[PinState]
|
Current pin state (LOW/HIGH) or None on error |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as input |
write_pin(pin, state)
¶
Write GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
state
|
PinState
|
Desired pin state (LOW/HIGH) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if write successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as output |
PinMode¶
bess_rcu.drivers.gpio.base.PinMode
¶
Bases: Enum
GPIO pin modes.
PinState¶
bess_rcu.drivers.gpio.base.PinState
¶
Bases: Enum
GPIO pin states.
Driver Architecture¶
All drivers follow these principles:
- Unified Interface: Consistent methods across all driver types
- Async/Await: Non-blocking operations for concurrent communication (where applicable)
- Type Hints: Full type annotations for IDE support
- Error Handling: Comprehensive exception hierarchy
- Logging: Structured logging for debugging
For implementation details and usage examples, see the module README files in the code/bess_rcu/drivers directory.