Skip to content

Drivers API Reference

Hardware abstraction layer for communication protocols and I/O operations.

Overview

The BESS RCU firmware uses a unified driver architecture organized by protocol type:

  • Modbus: TCP and RTU variants for PLC and sensor communication
  • CAN Bus: SocketCAN interface with DBC file support for PCS/BMS
  • GPIO: Digital I/O for safety interlocks and status indicators

All drivers follow a consistent interface pattern supporting both server and client modes.


Modbus Drivers

ModbusWrapperServer

bess_rcu.drivers.modbus.modbus_wrapper_server.ModbusWrapperServer

Modbus TCP/RTU server wrapper with optional Redis synchronisation.

Functions

__init__(iface='tcp', *, layout=None, redis_client=None, **kwargs)

Initialize Modbus TCP/RTU server.

Parameters:

Name Type Description Default
iface str

Interface type - tcp or rtu/serial.

'tcp'
layout ModbusLayout | None

Custom :class:ModbusLayout or None for defaults.

None
redis_client RedisClient | None

Sync to Redis server

None
**kwargs

Interface-specific configuration

{}

Raises:

Type Description
ValueError

If unsupported interface or missing parameters.

start_server() async

Start the async Modbus server.

start_database_sync(interval=5.0) async

Periodically fetch values from Redis and update the Modbus server.

Parameters:

Name Type Description Default
interval float

Seconds to wait between each sync pass.

5.0

ModbusWrapperClient

bess_rcu.drivers.modbus.modbus_wrapper_client.ModbusWrapperClient

Modbus TCP/RTU client with optional Redis cache.

Functions

__init__(iface='tcp', redis_client=None, **kwargs)

Initialize Modbus client instance.

Parameters:

Name Type Description Default
iface str

"tcp" or "rtu"

'tcp'
redis_client RedisClient | None

Optional :class:~.database.RedisClient

None
**kwargs

Modbus client configs

{}

connect()

Connect to Modbus server.

close()

Close the Modbus connection.

write_holding_registers(address, values, device_id=1)

Write multiple values to holding registers.

poll(task)

Poll based on Function Code.

Parameters:

Name Type Description Default
device_id

Modbus slave/unit ID

required
fc

Function code

required
address

Starting register/coil address

required
count

Number of registers/coils to read

required

Returns:

Type Description
list[int] | None

List of raw bytes, or None on error

poll_and_update(task)

Poll and update values to Redis.

poll_log(data, task, mode, update)

Log the result of a poll / poll and update.

start_polling(tasks, interval=1.0, update=False, mode='conv')

Continuously poll all tasks in a loop for a given interval.

Parameters:

Name Type Description Default
tasks list[dict]

List of polling tasks

required
interval float

Polling interval in seconds (default: 1.0s)

1.0
update bool

If True, updates polled values to Redis

False
mode str

Log mode: "raw" or "conv"

'conv'

Raises:

Type Description
ConnectionError

If client fails to connect to server.

KeyboardInterrupt

If polling is interrupted by the user.

ModbusLayout

bess_rcu.drivers.modbus.modbus_wrapper_server.ModbusLayout dataclass

A dataclass representing the complete Modbus datamodel.

Attributes:

Name Type Description
co CustomDataBlock

Coils (Read-Write).

di CustomDataBlock

Discrete Inputs (Read-only)

hr CustomDataBlock

Holding Registers (Read-Write)

ir CustomDataBlock

Input Registers (Read-only)


CAN Bus Drivers

CANDriverBase

bess_rcu.drivers.can.base.CANDriverBase

Bases: ABC

Abstract base class for CAN bus drivers.

This class defines the interface for CAN bus communication regardless of the underlying hardware (SocketCAN, PCAN, etc.).

Parameters:

Name Type Description Default
channel str

CAN channel name (e.g., 'can0', 'vcan0')

required
bitrate int

Bus bitrate in bps (default: 500000)

DEFAULT_BITRATE
can_filters list[dict[str, int]] | None

List of CAN ID filters (optional)

None
Example

class SocketCANDriver(CANDriverBase): ... async def connect(self): ... # Implementation ... pass

Attributes

is_connected property

Check if connected to CAN bus.

Functions

connect() abstractmethod async

Initialize and connect to CAN bus.

Returns:

Type Description
bool

True if connection successful, False otherwise

Raises:

Type Description
CommunicationError

If CAN interface initialization fails

disconnect() abstractmethod async

Disconnect from CAN bus and cleanup resources.

send(message) abstractmethod async

Send CAN message on the bus.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to send

required

Returns:

Type Description
bool

True if send successful, False otherwise

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

receive(timeout=DEFAULT_TIMEOUT) abstractmethod async

Receive CAN message from bus.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for message in seconds

DEFAULT_TIMEOUT

Returns:

Type Description
CANMessage | None

Received CANMessage or None if timeout

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

set_filters(filters) abstractmethod

Set CAN ID filters.

Parameters:

Name Type Description Default
filters list[dict[str, int]]

List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

required

Returns:

Type Description
bool

True if filters applied successfully

CANMessage

bess_rcu.drivers.can.base.CANMessage dataclass

CAN message data structure.

Attributes:

Name Type Description
arbitration_id int

CAN message ID

data bytes

Message payload (up to 8 bytes)

is_extended bool

True for 29-bit ID, False for 11-bit

timestamp float | None

Message timestamp (optional)

SocketCANDriver

bess_rcu.drivers.can.socketcan.SocketCANDriver

Bases: CANDriverBase

SocketCAN driver implementation for Linux.

Supports both real CAN hardware (can0, can1) and virtual CAN (vcan0) using the python-can library with SocketCAN interface.

Parameters:

Name Type Description Default
channel str

CAN channel name (e.g., 'can0', 'vcan0')

required
bitrate int

Bus bitrate in bps (default: 500000)

500000
can_filters list[dict[str, int]] | None

List of CAN ID filters (optional)

None
Example

driver = SocketCANDriver(channel="vcan0", bitrate=500000) await driver.connect() msg = CANMessage(arbitration_id=0x100, data=b"\x01\x02\x03") await driver.send(msg) received = await driver.receive(timeout=1.0)

Attributes

is_connected property

Check if connected to CAN bus.

Functions

connect() async

Initialize and connect to SocketCAN interface.

Returns:

Type Description
bool

True if connection successful, False otherwise

Raises:

Type Description
CommunicationError

If CAN interface initialization fails

disconnect() async

Disconnect from SocketCAN interface and cleanup resources.

send(message) async

Send CAN message on the bus.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to send

required

Returns:

Type Description
bool

True if send successful, False otherwise

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

receive(timeout=DEFAULT_TIMEOUT) async

Receive CAN message from bus.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for message in seconds

DEFAULT_TIMEOUT

Returns:

Type Description
CANMessage | None

Received CANMessage or None if timeout

Raises:

Type Description
DeviceNotConnectedError

If not connected to bus

set_filters(filters)

Set CAN ID filters.

Parameters:

Name Type Description Default
filters list[dict[str, int]]

List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}]

required

Returns:

Type Description
bool

True if filters applied successfully

DBCLoader

bess_rcu.drivers.can.dbc_loader.DBCLoader

Bases: DBCLoaderBase

DBC file loader using cantools library.

Parses CAN database files (.dbc) and provides message encoding/decoding based on signal definitions.

Parameters:

Name Type Description Default
dbc_path str

Path to DBC file

required
Example

loader = DBCLoader(dbc_path="config/can/pcs.dbc") loader.load() msg = loader.encode_message("PowerCommand", {"power_kw": 50.0}) signals = loader.decode_message(received_msg)

Functions

load()

Load and parse DBC file.

Returns:

Type Description
bool

True if loaded successfully, False otherwise

Raises:

Type Description
FileNotFoundError

If DBC file not found

ValueError

If DBC file format invalid

encode_message(message_name, data)

Encode signal data into CAN message.

Parameters:

Name Type Description Default
message_name str

Name of message in DBC

required
data dict[str, Any]

Dictionary of signal_name: value pairs

required

Returns:

Type Description
CANMessage | None

Encoded CANMessage or None if encoding fails

Example

msg = loader.encode_message( ... "PowerCommand", {"power_kw": 50.0, "enable": True} ... )

decode_message(message)

Decode CAN message into signal data.

Parameters:

Name Type Description Default
message CANMessage

CANMessage to decode

required

Returns:

Type Description
dict[str, Any] | None

Dictionary of signal_name: value pairs or None if unknown message

Example

signals = loader.decode_message(can_msg) power = signals.get("power_kw")

get_message_id(message_name)

Get CAN ID for message name.

Parameters:

Name Type Description Default
message_name str

Name of message in DBC

required

Returns:

Type Description
int | None

CAN arbitration ID or None if not found


GPIO Drivers

GPIODriverBase

bess_rcu.drivers.gpio.base.GPIODriverBase

Bases: ABC

Abstract base class for GPIO drivers.

This class defines the interface for digital I/O control on embedded Linux platforms using various GPIO libraries.

Parameters:

Name Type Description Default
chip str

GPIO chip device (e.g., '/dev/gpiochip0')

'/dev/gpiochip0'
Example

class LinuxGPIODriver(GPIODriverBase): ... def initialize(self): ... # Implementation ... pass

Attributes

is_initialized property

Check if GPIO driver is initialized.

Functions

initialize() abstractmethod

Initialize GPIO subsystem.

Returns:

Type Description
bool

True if initialization successful, False otherwise

Raises:

Type Description
DeviceError

If GPIO chip initialization fails

cleanup() abstractmethod

Cleanup GPIO resources and release pins.

setup_pin(pin, mode) abstractmethod

Configure GPIO pin mode.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required
mode PinMode

Pin mode (INPUT, OUTPUT, etc.)

required

Returns:

Type Description
bool

True if configuration successful, False otherwise

Raises:

Type Description
ValueError

If pin number is invalid

read_pin(pin) abstractmethod

Read GPIO pin state.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required

Returns:

Type Description
PinState | None

Current pin state (LOW/HIGH) or None on error

Raises:

Type Description
ValueError

If pin is not configured as input

write_pin(pin, state) abstractmethod

Write GPIO pin state.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required
state PinState

Desired pin state (LOW/HIGH)

required

Returns:

Type Description
bool

True if write successful, False otherwise

Raises:

Type Description
ValueError

If pin is not configured as output

LinuxGPIODriver

bess_rcu.drivers.gpio.linux_gpio.LinuxGPIODriver

Bases: GPIODriverBase

Linux GPIO driver using gpiod library.

Provides GPIO control on Linux using the modern character device interface (/dev/gpiochipX) instead of deprecated sysfs.

Parameters:

Name Type Description Default
chip str

GPIO chip device path (default: '/dev/gpiochip0')

'/dev/gpiochip0'
consumer str

Consumer name for GPIO requests (default: 'bess_rcu')

'bess_rcu'
Example

driver = LinuxGPIODriver(chip="/dev/gpiochip0") driver.initialize() driver.setup_pin(17, PinMode.OUTPUT) driver.write_pin(17, PinState.HIGH) state = driver.read_pin(18)

Attributes

is_initialized property

Check if GPIO driver is initialized.

Functions

initialize()

Initialize GPIO subsystem.

Returns:

Type Description
bool

True if initialization successful, False otherwise

Raises:

Type Description
DeviceError

If GPIO chip initialization fails

cleanup()

Cleanup GPIO resources and release pins.

setup_pin(pin, mode)

Configure GPIO pin mode.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required
mode PinMode

Pin mode (INPUT, OUTPUT, etc.)

required

Returns:

Type Description
bool

True if configuration successful, False otherwise

Raises:

Type Description
ValueError

If pin number is invalid

read_pin(pin)

Read GPIO pin state.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required

Returns:

Type Description
PinState | None

Current pin state (LOW/HIGH) or None on error

Raises:

Type Description
ValueError

If pin is not configured as input

write_pin(pin, state)

Write GPIO pin state.

Parameters:

Name Type Description Default
pin int

GPIO pin number

required
state PinState

Desired pin state (LOW/HIGH)

required

Returns:

Type Description
bool

True if write successful, False otherwise

Raises:

Type Description
ValueError

If pin is not configured as output

PinMode

bess_rcu.drivers.gpio.base.PinMode

Bases: Enum

GPIO pin modes.

PinState

bess_rcu.drivers.gpio.base.PinState

Bases: Enum

GPIO pin states.


Driver Architecture

All drivers follow these principles:

  • Unified Interface: Consistent methods across all driver types
  • Async/Await: Non-blocking operations for concurrent communication (where applicable)
  • Type Hints: Full type annotations for IDE support
  • Error Handling: Comprehensive exception hierarchy
  • Logging: Structured logging for debugging

For implementation details and usage examples, see the module README files in the code/bess_rcu/drivers directory.