Drivers API Reference¶
Hardware abstraction layer for communication protocols and I/O operations.
Overview¶
The BESS RCU firmware uses a unified driver architecture organized by protocol type:
- Modbus: TCP and RTU variants for PLC and sensor communication
- CAN Bus: SocketCAN interface with DBC file support for PCS/BMS
- GPIO: Digital I/O for safety interlocks and status indicators
All drivers follow a consistent interface pattern supporting both server and client modes.
Modbus Drivers¶
ModbusWrapperServer¶
bess_rcu.drivers.modbus.modbus_wrapper_server.ModbusWrapperServer
¶
Modbus TCP/RTU server wrapper with optional Redis synchronisation.
Functions¶
__init__(iface='tcp', *, layout=None, redis_client=None, **kwargs)
¶
Initialize Modbus TCP/RTU server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
iface
|
str
|
Interface type - |
'tcp'
|
layout
|
ModbusLayout | None
|
Custom :class: |
None
|
redis_client
|
RedisClient | None
|
Sync to Redis server |
None
|
**kwargs
|
Interface-specific configuration |
{}
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If unsupported interface or missing parameters. |
start_server()
async
¶
Start the async Modbus server.
start_database_sync(interval=5.0)
async
¶
Periodically fetch values from Redis and update the Modbus server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
interval
|
float
|
Seconds to wait between each sync pass. |
5.0
|
ModbusWrapperClient¶
bess_rcu.drivers.modbus.modbus_wrapper_client.ModbusWrapperClient
¶
Modbus TCP/RTU client with optional Redis cache.
Functions¶
__init__(iface='tcp', redis_client=None, **kwargs)
¶
Initialize Modbus client instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
iface
|
str
|
"tcp" or "rtu" |
'tcp'
|
redis_client
|
RedisClient | None
|
Optional :class: |
None
|
**kwargs
|
Modbus client configs |
{}
|
connect()
¶
Connect to Modbus server.
close()
¶
Close the Modbus connection.
write_holding_registers(address, values, device_id=1)
¶
Write multiple values to holding registers.
poll(task)
¶
Poll based on Function Code.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
device_id
|
Modbus slave/unit ID |
required | |
fc
|
Function code |
required | |
address
|
Starting register/coil address |
required | |
count
|
Number of registers/coils to read |
required |
Returns:
| Type | Description |
|---|---|
list[int] | None
|
List of raw bytes, or |
poll_and_update(task)
¶
Poll and update values to Redis.
poll_log(data, task, mode, update)
¶
Log the result of a poll / poll and update.
start_polling(tasks, interval=1.0, update=False, mode='conv')
¶
Continuously poll all tasks in a loop for a given interval.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tasks
|
list[dict]
|
List of polling tasks |
required |
interval
|
float
|
Polling interval in seconds (default: 1.0s) |
1.0
|
update
|
bool
|
If True, updates polled values to Redis |
False
|
mode
|
str
|
Log mode: "raw" or "conv" |
'conv'
|
Raises:
| Type | Description |
|---|---|
ConnectionError
|
If client fails to connect to server. |
KeyboardInterrupt
|
If polling is interrupted by the user. |
ModbusLayout¶
bess_rcu.drivers.modbus.modbus_wrapper_server.ModbusLayout
dataclass
¶
A dataclass representing the complete Modbus datamodel.
Attributes:
| Name | Type | Description |
|---|---|---|
co |
CustomDataBlock
|
Coils (Read-Write). |
di |
CustomDataBlock
|
Discrete Inputs (Read-only) |
hr |
CustomDataBlock
|
Holding Registers (Read-Write) |
ir |
CustomDataBlock
|
Input Registers (Read-only) |
CAN Bus Drivers¶
CANDriverBase¶
bess_rcu.drivers.can.base.CANDriverBase
¶
Bases: ABC
Abstract base class for CAN bus drivers.
This class defines the interface for CAN bus communication regardless of the underlying hardware (SocketCAN, PCAN, etc.).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel
|
str
|
CAN channel name (e.g., 'can0', 'vcan0') |
required |
bitrate
|
int
|
Bus bitrate in bps (default: 500000) |
DEFAULT_BITRATE
|
can_filters
|
list[dict[str, int]] | None
|
List of CAN ID filters (optional) |
None
|
Example
class SocketCANDriver(CANDriverBase): ... async def connect(self): ... # Implementation ... pass
Attributes¶
is_connected
property
¶
Check if connected to CAN bus.
Functions¶
connect()
abstractmethod
async
¶
Initialize and connect to CAN bus.
Returns:
| Type | Description |
|---|---|
bool
|
True if connection successful, False otherwise |
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If CAN interface initialization fails |
disconnect()
abstractmethod
async
¶
Disconnect from CAN bus and cleanup resources.
send(message)
abstractmethod
async
¶
Send CAN message on the bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to send |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if send successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
receive(timeout=DEFAULT_TIMEOUT)
abstractmethod
async
¶
Receive CAN message from bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Maximum time to wait for message in seconds |
DEFAULT_TIMEOUT
|
Returns:
| Type | Description |
|---|---|
CANMessage | None
|
Received CANMessage or None if timeout |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
set_filters(filters)
abstractmethod
¶
Set CAN ID filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filters
|
list[dict[str, int]]
|
List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}] |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if filters applied successfully |
CANMessage¶
bess_rcu.drivers.can.base.CANMessage
dataclass
¶
CAN message data structure.
Attributes:
| Name | Type | Description |
|---|---|---|
arbitration_id |
int
|
CAN message ID |
data |
bytes
|
Message payload (up to 8 bytes) |
is_extended |
bool
|
True for 29-bit ID, False for 11-bit |
timestamp |
float | None
|
Message timestamp (optional) |
SocketCANDriver¶
bess_rcu.drivers.can.socketcan.SocketCANDriver
¶
Bases: CANDriverBase
SocketCAN driver implementation for Linux.
Supports both real CAN hardware (can0, can1) and virtual CAN (vcan0) using the python-can library with SocketCAN interface.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel
|
str
|
CAN channel name (e.g., 'can0', 'vcan0') |
required |
bitrate
|
int
|
Bus bitrate in bps (default: 500000) |
500000
|
can_filters
|
list[dict[str, int]] | None
|
List of CAN ID filters (optional) |
None
|
Example
driver = SocketCANDriver(channel="vcan0", bitrate=500000) await driver.connect() msg = CANMessage(arbitration_id=0x100, data=b"\x01\x02\x03") await driver.send(msg) received = await driver.receive(timeout=1.0)
Attributes¶
is_connected
property
¶
Check if connected to CAN bus.
Functions¶
connect()
async
¶
Initialize and connect to SocketCAN interface.
Returns:
| Type | Description |
|---|---|
bool
|
True if connection successful, False otherwise |
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If CAN interface initialization fails |
disconnect()
async
¶
Disconnect from SocketCAN interface and cleanup resources.
send(message)
async
¶
Send CAN message on the bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to send |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if send successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
receive(timeout=DEFAULT_TIMEOUT)
async
¶
Receive CAN message from bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Maximum time to wait for message in seconds |
DEFAULT_TIMEOUT
|
Returns:
| Type | Description |
|---|---|
CANMessage | None
|
Received CANMessage or None if timeout |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
set_filters(filters)
¶
Set CAN ID filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filters
|
list[dict[str, int]]
|
List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}] |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if filters applied successfully |
DBCLoader¶
bess_rcu.drivers.can.dbc_loader.DBCLoader
¶
Bases: DBCLoaderBase
DBC file loader using cantools library.
Parses CAN database files (.dbc) and provides message encoding/decoding based on signal definitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dbc_path
|
str
|
Path to DBC file |
required |
Example
loader = DBCLoader(dbc_path="config/can/pcs.dbc") loader.load() msg = loader.encode_message("PowerCommand", {"power_kw": 50.0}) signals = loader.decode_message(received_msg)
Functions¶
load()
¶
Load and parse DBC file.
Returns:
| Type | Description |
|---|---|
bool
|
True if loaded successfully, False otherwise |
Raises:
| Type | Description |
|---|---|
FileNotFoundError
|
If DBC file not found |
ValueError
|
If DBC file format invalid |
encode_message(message_name, data)
¶
Encode signal data into CAN message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_name
|
str
|
Name of message in DBC |
required |
data
|
dict[str, Any]
|
Dictionary of signal_name: value pairs |
required |
Returns:
| Type | Description |
|---|---|
CANMessage | None
|
Encoded CANMessage or None if encoding fails |
Example
msg = loader.encode_message( ... "PowerCommand", {"power_kw": 50.0, "enable": True} ... )
decode_message(message)
¶
Decode CAN message into signal data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to decode |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
Dictionary of signal_name: value pairs or None if unknown message |
Example
signals = loader.decode_message(can_msg) power = signals.get("power_kw")
get_message_id(message_name)
¶
Get CAN ID for message name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_name
|
str
|
Name of message in DBC |
required |
Returns:
| Type | Description |
|---|---|
int | None
|
CAN arbitration ID or None if not found |
GPIO Drivers¶
GPIODriverBase¶
bess_rcu.drivers.gpio.base.GPIODriverBase
¶
Bases: ABC
Abstract base class for GPIO drivers.
This class defines the interface for digital I/O control on embedded Linux platforms using various GPIO libraries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chip
|
str
|
GPIO chip device (e.g., '/dev/gpiochip0') |
'/dev/gpiochip0'
|
Example
class LinuxGPIODriver(GPIODriverBase): ... def initialize(self): ... # Implementation ... pass
Attributes¶
is_initialized
property
¶
Check if GPIO driver is initialized.
Functions¶
initialize()
abstractmethod
¶
Initialize GPIO subsystem.
Returns:
| Type | Description |
|---|---|
bool
|
True if initialization successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceError
|
If GPIO chip initialization fails |
cleanup()
abstractmethod
¶
Cleanup GPIO resources and release pins.
setup_pin(pin, mode)
abstractmethod
¶
Configure GPIO pin mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
mode
|
PinMode
|
Pin mode (INPUT, OUTPUT, etc.) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if configuration successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin number is invalid |
read_pin(pin)
abstractmethod
¶
Read GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
Returns:
| Type | Description |
|---|---|
PinState | None
|
Current pin state (LOW/HIGH) or None on error |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as input |
write_pin(pin, state)
abstractmethod
¶
Write GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
state
|
PinState
|
Desired pin state (LOW/HIGH) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if write successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as output |
LinuxGPIODriver¶
bess_rcu.drivers.gpio.linux_gpio.LinuxGPIODriver
¶
Bases: GPIODriverBase
Linux GPIO driver using gpiod library.
Provides GPIO control on Linux using the modern character device interface (/dev/gpiochipX) instead of deprecated sysfs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chip
|
str
|
GPIO chip device path (default: '/dev/gpiochip0') |
'/dev/gpiochip0'
|
consumer
|
str
|
Consumer name for GPIO requests (default: 'bess_rcu') |
'bess_rcu'
|
Example
driver = LinuxGPIODriver(chip="/dev/gpiochip0") driver.initialize() driver.setup_pin(17, PinMode.OUTPUT) driver.write_pin(17, PinState.HIGH) state = driver.read_pin(18)
Attributes¶
is_initialized
property
¶
Check if GPIO driver is initialized.
Functions¶
initialize()
¶
Initialize GPIO subsystem.
Returns:
| Type | Description |
|---|---|
bool
|
True if initialization successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceError
|
If GPIO chip initialization fails |
cleanup()
¶
Cleanup GPIO resources and release pins.
setup_pin(pin, mode)
¶
Configure GPIO pin mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
mode
|
PinMode
|
Pin mode (INPUT, OUTPUT, etc.) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if configuration successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin number is invalid |
read_pin(pin)
¶
Read GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
Returns:
| Type | Description |
|---|---|
PinState | None
|
Current pin state (LOW/HIGH) or None on error |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as input |
write_pin(pin, state)
¶
Write GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
state
|
PinState
|
Desired pin state (LOW/HIGH) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if write successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as output |
PinMode¶
bess_rcu.drivers.gpio.base.PinMode
¶
Bases: Enum
GPIO pin modes.
PinState¶
bess_rcu.drivers.gpio.base.PinState
¶
Bases: Enum
GPIO pin states.
Driver Architecture¶
All drivers follow these principles:
- Unified Interface: Consistent methods across all driver types
- Async/Await: Non-blocking operations for concurrent communication (where applicable)
- Type Hints: Full type annotations for IDE support
- Error Handling: Comprehensive exception hierarchy
- Logging: Structured logging for debugging
For implementation details and usage examples, see the module README files in the code/bess_rcu/drivers directory.