Drivers API Reference¶
The drivers module provides hardware abstraction for communication protocols and I/O operations.
Overview¶
The BESS RCU firmware uses a unified driver architecture organized by protocol type:
- Modbus: TCP and RTU variants for PLC and sensor communication
- CAN Bus: SocketCAN interface with DBC file support for PCS/BMS
- GPIO: Digital I/O for safety interlocks and status indicators
All drivers follow a consistent interface pattern supporting both server and client modes through the DriverMode enumeration.
Modbus Drivers¶
ModbusDriverBase¶
bess_rcu.drivers.modbus.base.ModbusDriverBase
¶
Bases: ABC
Abstract base class for Modbus drivers (TCP and RTU).
This unified interface supports both server and client modes, allowing implementations to act as either Modbus slave or master.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
slave_id
|
int
|
Modbus slave/unit ID (1-247) |
1
|
mode
|
DriverMode
|
Operation mode (SERVER or CLIENT) |
SERVER
|
timeout
|
float
|
Communication timeout in seconds |
5.0
|
Example
class ModbusTCPDriver(ModbusDriverBase): ... async def start(self): ... # Implementation ... pass
Source code in code/bess_rcu/drivers/modbus/base.py
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | |
Attributes¶
is_running
property
¶
Check if driver is running.
is_server
property
¶
Check if driver is in server mode.
is_client
property
¶
Check if driver is in client mode.
Functions¶
start()
abstractmethod
async
¶
Start the Modbus driver (server or client).
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If driver fails to start |
ConfigurationError
|
If configuration is invalid |
stop()
abstractmethod
async
¶
read_holding_registers(address, count, slave_id=None)
abstractmethod
async
¶
Read holding registers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Starting register address |
required |
count
|
int
|
Number of registers to read |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only, optional) |
None
|
Returns:
| Type | Description |
|---|---|
Optional[List[int]]
|
List of register values or None on error |
Raises:
| Type | Description |
|---|---|
CommunicationTimeoutError
|
If request times out |
DeviceError
|
If device returns error response |
Source code in code/bess_rcu/drivers/modbus/base.py
write_register(address, value, slave_id=None)
abstractmethod
async
¶
Write single holding register.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Register address |
required |
value
|
int
|
Value to write (0-65535) |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only, optional) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if successful, False otherwise |
Raises:
| Type | Description |
|---|---|
CommunicationTimeoutError
|
If request times out |
Source code in code/bess_rcu/drivers/modbus/base.py
write_multiple_registers(address, values, slave_id=None)
abstractmethod
async
¶
Write multiple holding registers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Starting register address |
required |
values
|
List[int]
|
List of values to write |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only, optional) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if successful, False otherwise |
Raises:
| Type | Description |
|---|---|
CommunicationTimeoutError
|
If request times out |
Source code in code/bess_rcu/drivers/modbus/base.py
ModbusTCPDriver¶
bess_rcu.drivers.modbus.tcp.ModbusTCPDriver
¶
Bases: ModbusDriverBase
Modbus TCP driver implementation.
Supports both server (slave) and client (master) modes over TCP/IP.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
slave_id
|
int
|
Modbus slave/unit ID (1-247) |
1
|
mode
|
DriverMode
|
Operation mode (SERVER or CLIENT) |
SERVER
|
host
|
str
|
IP address to bind (server) or connect to (client) |
DEFAULT_HOST
|
port
|
int
|
TCP port number (default: 502) |
DEFAULT_PORT
|
timeout
|
float
|
Communication timeout in seconds |
5.0
|
Example
Server mode:
driver = ModbusTCPDriver( ... slave_id=1, ... mode=DriverMode.SERVER, ... host='0.0.0.0', ... port=502 ... ) await driver.start()
Client mode:
driver = ModbusTCPDriver( ... slave_id=1, ... mode=DriverMode.CLIENT, ... host='192.168.1.100', ... port=502 ... ) await driver.start() values = await driver.read_holding_registers(0x1000, 10)
Source code in code/bess_rcu/drivers/modbus/tcp.py
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 | |
Attributes¶
is_running
property
¶
Check if driver is running.
is_server
property
¶
Check if driver is in server mode.
is_client
property
¶
Check if driver is in client mode.
Functions¶
start()
async
¶
Start the Modbus TCP driver.
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If driver fails to start |
ConfigurationError
|
If configuration is invalid |
Source code in code/bess_rcu/drivers/modbus/tcp.py
stop()
async
¶
Stop the Modbus TCP driver and release resources.
Source code in code/bess_rcu/drivers/modbus/tcp.py
read_holding_registers(address, count, slave_id=None)
async
¶
Read holding registers via TCP.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Starting register address |
required |
count
|
int
|
Number of registers to read |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only) |
None
|
Returns:
| Type | Description |
|---|---|
Optional[List[int]]
|
List of register values or None on error |
Source code in code/bess_rcu/drivers/modbus/tcp.py
write_register(address, value, slave_id=None)
async
¶
Write single holding register via TCP.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Register address |
required |
value
|
int
|
Value to write (0-65535) |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if successful, False otherwise |
Source code in code/bess_rcu/drivers/modbus/tcp.py
write_multiple_registers(address, values, slave_id=None)
async
¶
Write multiple holding registers via TCP.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Starting register address |
required |
values
|
List[int]
|
List of values to write |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if successful, False otherwise |
Source code in code/bess_rcu/drivers/modbus/tcp.py
ModbusRTUDriver¶
bess_rcu.drivers.modbus.rtu.ModbusRTUDriver
¶
Bases: ModbusDriverBase
Modbus RTU driver implementation.
Supports both server (slave) and client (master) modes over RS-485 serial.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
slave_id
|
int
|
Modbus slave/unit ID (1-247) |
1
|
mode
|
DriverMode
|
Operation mode (SERVER or CLIENT) |
SERVER
|
port
|
str
|
Serial port path (e.g., '/dev/ttyUSB0', 'COM3') |
'/dev/ttyUSB0'
|
baudrate
|
int
|
Serial baudrate (default: 9600) |
DEFAULT_BAUDRATE
|
bytesize
|
int
|
Number of data bits (default: 8) |
DEFAULT_BYTESIZE
|
parity
|
str
|
Parity checking ('N', 'E', 'O') (default: 'N') |
DEFAULT_PARITY
|
stopbits
|
int
|
Number of stop bits (default: 1) |
DEFAULT_STOPBITS
|
timeout
|
float
|
Communication timeout in seconds |
5.0
|
Example
Server mode:
driver = ModbusRTUDriver( ... slave_id=1, ... mode=DriverMode.SERVER, ... port='/dev/ttyUSB0', ... baudrate=9600 ... ) await driver.start()
Client mode:
driver = ModbusRTUDriver( ... slave_id=1, ... mode=DriverMode.CLIENT, ... port='/dev/ttyUSB0', ... baudrate=9600 ... ) await driver.start() values = await driver.read_holding_registers(0x1000, 10, slave_id=2)
Source code in code/bess_rcu/drivers/modbus/rtu.py
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 | |
Attributes¶
is_running
property
¶
Check if driver is running.
is_server
property
¶
Check if driver is in server mode.
is_client
property
¶
Check if driver is in client mode.
Functions¶
start()
async
¶
Start the Modbus RTU driver.
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If driver fails to start |
ConfigurationError
|
If serial port configuration is invalid |
Source code in code/bess_rcu/drivers/modbus/rtu.py
stop()
async
¶
Stop the Modbus RTU driver and release resources.
Source code in code/bess_rcu/drivers/modbus/rtu.py
read_holding_registers(address, count, slave_id=None)
async
¶
Read holding registers via RTU.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Starting register address |
required |
count
|
int
|
Number of registers to read |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only, required for RTU) |
None
|
Returns:
| Type | Description |
|---|---|
Optional[List[int]]
|
List of register values or None on error |
Source code in code/bess_rcu/drivers/modbus/rtu.py
write_register(address, value, slave_id=None)
async
¶
Write single holding register via RTU.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Register address |
required |
value
|
int
|
Value to write (0-65535) |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only, required for RTU) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if successful, False otherwise |
Source code in code/bess_rcu/drivers/modbus/rtu.py
write_multiple_registers(address, values, slave_id=None)
async
¶
Write multiple holding registers via RTU.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
int
|
Starting register address |
required |
values
|
List[int]
|
List of values to write |
required |
slave_id
|
Optional[int]
|
Target slave ID (client mode only, required for RTU) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if successful, False otherwise |
Source code in code/bess_rcu/drivers/modbus/rtu.py
CAN Bus Drivers¶
CANDriverBase¶
bess_rcu.drivers.can.base.CANDriverBase
¶
Bases: ABC
Abstract base class for CAN bus drivers.
This class defines the interface for CAN bus communication regardless of the underlying hardware (SocketCAN, PCAN, etc.).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel
|
str
|
CAN channel name (e.g., 'can0', 'vcan0') |
required |
bitrate
|
int
|
Bus bitrate in bps (default: 500000) |
DEFAULT_BITRATE
|
can_filters
|
Optional[List[Dict[str, int]]]
|
List of CAN ID filters (optional) |
None
|
Example
class SocketCANDriver(CANDriverBase): ... async def connect(self): ... # Implementation ... pass
Source code in code/bess_rcu/drivers/can/base.py
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 | |
Attributes¶
is_connected
property
¶
Check if connected to CAN bus.
Functions¶
connect()
abstractmethod
async
¶
Initialize and connect to CAN bus.
Returns:
| Type | Description |
|---|---|
bool
|
True if connection successful, False otherwise |
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If CAN interface initialization fails |
disconnect()
abstractmethod
async
¶
send(message)
abstractmethod
async
¶
Send CAN message on the bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to send |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if send successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
Source code in code/bess_rcu/drivers/can/base.py
receive(timeout=DEFAULT_TIMEOUT)
abstractmethod
async
¶
Receive CAN message from bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Maximum time to wait for message in seconds |
DEFAULT_TIMEOUT
|
Returns:
| Type | Description |
|---|---|
Optional[CANMessage]
|
Received CANMessage or None if timeout |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
Source code in code/bess_rcu/drivers/can/base.py
set_filters(filters)
abstractmethod
¶
Set CAN ID filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filters
|
List[Dict[str, int]]
|
List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}] |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if filters applied successfully |
Source code in code/bess_rcu/drivers/can/base.py
SocketCANDriver¶
bess_rcu.drivers.can.socketcan.SocketCANDriver
¶
Bases: CANDriverBase
SocketCAN driver implementation for Linux.
Supports both real CAN hardware (can0, can1) and virtual CAN (vcan0) using the python-can library with SocketCAN interface.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel
|
str
|
CAN channel name (e.g., 'can0', 'vcan0') |
required |
bitrate
|
int
|
Bus bitrate in bps (default: 500000) |
500000
|
can_filters
|
Optional[List[Dict[str, int]]]
|
List of CAN ID filters (optional) |
None
|
Example
driver = SocketCANDriver(channel='vcan0', bitrate=500000) await driver.connect() msg = CANMessage(arbitration_id=0x100, data=b'\x01\x02\x03') await driver.send(msg) received = await driver.receive(timeout=1.0)
Source code in code/bess_rcu/drivers/can/socketcan.py
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | |
Attributes¶
is_connected
property
¶
Check if connected to CAN bus.
Functions¶
connect()
async
¶
Initialize and connect to SocketCAN interface.
Returns:
| Type | Description |
|---|---|
bool
|
True if connection successful, False otherwise |
Raises:
| Type | Description |
|---|---|
CommunicationError
|
If CAN interface initialization fails |
Source code in code/bess_rcu/drivers/can/socketcan.py
disconnect()
async
¶
Disconnect from SocketCAN interface and cleanup resources.
Source code in code/bess_rcu/drivers/can/socketcan.py
send(message)
async
¶
Send CAN message on the bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to send |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if send successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
Source code in code/bess_rcu/drivers/can/socketcan.py
receive(timeout=DEFAULT_TIMEOUT)
async
¶
Receive CAN message from bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Maximum time to wait for message in seconds |
DEFAULT_TIMEOUT
|
Returns:
| Type | Description |
|---|---|
Optional[CANMessage]
|
Received CANMessage or None if timeout |
Raises:
| Type | Description |
|---|---|
DeviceNotConnectedError
|
If not connected to bus |
Source code in code/bess_rcu/drivers/can/socketcan.py
set_filters(filters)
¶
Set CAN ID filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filters
|
List[Dict[str, int]]
|
List of filter dicts with 'can_id' and 'can_mask' Example: [{'can_id': 0x100, 'can_mask': 0x7FF}] |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if filters applied successfully |
Source code in code/bess_rcu/drivers/can/socketcan.py
DBCLoaderBase¶
bess_rcu.drivers.can.base.DBCLoaderBase
¶
Bases: ABC
Abstract base class for DBC file loading and parsing.
This class defines the interface for loading CAN database files and encoding/decoding CAN messages based on signal definitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dbc_path
|
str
|
Path to DBC file |
required |
Example
class DBCLoader(DBCLoaderBase): ... def load(self): ... # Implementation ... pass
Source code in code/bess_rcu/drivers/can/base.py
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | |
Attributes¶
is_loaded
property
¶
Check if DBC file is loaded.
Functions¶
load()
abstractmethod
¶
Load and parse DBC file.
Returns:
| Type | Description |
|---|---|
bool
|
True if loaded successfully, False otherwise |
Raises:
| Type | Description |
|---|---|
FileNotFoundError
|
If DBC file not found |
ValueError
|
If DBC file format invalid |
Source code in code/bess_rcu/drivers/can/base.py
encode_message(message_name, data)
abstractmethod
¶
Encode signal data into CAN message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_name
|
str
|
Name of message in DBC |
required |
data
|
Dict[str, Any]
|
Dictionary of signal_name: value pairs |
required |
Returns:
| Type | Description |
|---|---|
Optional[CANMessage]
|
Encoded CANMessage or None if encoding fails |
Example
msg = loader.encode_message( ... 'PowerCommand', ... {'power_kw': 50.0, 'enable': True} ... )
Source code in code/bess_rcu/drivers/can/base.py
decode_message(message)
abstractmethod
¶
Decode CAN message into signal data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to decode |
required |
Returns:
| Type | Description |
|---|---|
Optional[Dict[str, Any]]
|
Dictionary of signal_name: value pairs or None if unknown message |
Example
signals = loader.decode_message(can_msg) power = signals.get('power_kw')
Source code in code/bess_rcu/drivers/can/base.py
get_message_id(message_name)
abstractmethod
¶
Get CAN ID for message name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_name
|
str
|
Name of message in DBC |
required |
Returns:
| Type | Description |
|---|---|
Optional[int]
|
CAN arbitration ID or None if not found |
DBCLoader¶
bess_rcu.drivers.can.dbc_loader.DBCLoader
¶
Bases: DBCLoaderBase
DBC file loader using cantools library.
Parses CAN database files (.dbc) and provides message encoding/decoding based on signal definitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dbc_path
|
str
|
Path to DBC file |
required |
Example
loader = DBCLoader(dbc_path='config/can/pcs.dbc') loader.load() msg = loader.encode_message('PowerCommand', {'power_kw': 50.0}) signals = loader.decode_message(received_msg)
Source code in code/bess_rcu/drivers/can/dbc_loader.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | |
Attributes¶
is_loaded
property
¶
Check if DBC file is loaded.
Functions¶
load()
¶
Load and parse DBC file.
Returns:
| Type | Description |
|---|---|
bool
|
True if loaded successfully, False otherwise |
Raises:
| Type | Description |
|---|---|
FileNotFoundError
|
If DBC file not found |
ValueError
|
If DBC file format invalid |
Source code in code/bess_rcu/drivers/can/dbc_loader.py
encode_message(message_name, data)
¶
Encode signal data into CAN message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_name
|
str
|
Name of message in DBC |
required |
data
|
Dict[str, Any]
|
Dictionary of signal_name: value pairs |
required |
Returns:
| Type | Description |
|---|---|
Optional[CANMessage]
|
Encoded CANMessage or None if encoding fails |
Example
msg = loader.encode_message( ... 'PowerCommand', ... {'power_kw': 50.0, 'enable': True} ... )
Source code in code/bess_rcu/drivers/can/dbc_loader.py
decode_message(message)
¶
Decode CAN message into signal data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
CANMessage
|
CANMessage to decode |
required |
Returns:
| Type | Description |
|---|---|
Optional[Dict[str, Any]]
|
Dictionary of signal_name: value pairs or None if unknown message |
Example
signals = loader.decode_message(can_msg) power = signals.get('power_kw')
Source code in code/bess_rcu/drivers/can/dbc_loader.py
get_message_id(message_name)
¶
Get CAN ID for message name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_name
|
str
|
Name of message in DBC |
required |
Returns:
| Type | Description |
|---|---|
Optional[int]
|
CAN arbitration ID or None if not found |
Source code in code/bess_rcu/drivers/can/dbc_loader.py
GPIO Drivers¶
GPIODriverBase¶
bess_rcu.drivers.gpio.base.GPIODriverBase
¶
Bases: ABC
Abstract base class for GPIO drivers.
This class defines the interface for digital I/O control on embedded Linux platforms using various GPIO libraries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chip
|
str
|
GPIO chip device (e.g., '/dev/gpiochip0') |
'/dev/gpiochip0'
|
Example
class LinuxGPIODriver(GPIODriverBase): ... def initialize(self): ... # Implementation ... pass
Source code in code/bess_rcu/drivers/gpio/base.py
Attributes¶
is_initialized
property
¶
Check if GPIO driver is initialized.
Functions¶
initialize()
abstractmethod
¶
Initialize GPIO subsystem.
Returns:
| Type | Description |
|---|---|
bool
|
True if initialization successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceError
|
If GPIO chip initialization fails |
cleanup()
abstractmethod
¶
setup_pin(pin, mode)
abstractmethod
¶
Configure GPIO pin mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
mode
|
PinMode
|
Pin mode (INPUT, OUTPUT, etc.) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if configuration successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin number is invalid |
Source code in code/bess_rcu/drivers/gpio/base.py
read_pin(pin)
abstractmethod
¶
Read GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
Returns:
| Type | Description |
|---|---|
Optional[PinState]
|
Current pin state (LOW/HIGH) or None on error |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as input |
Source code in code/bess_rcu/drivers/gpio/base.py
write_pin(pin, state)
abstractmethod
¶
Write GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
state
|
PinState
|
Desired pin state (LOW/HIGH) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if write successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as output |
Source code in code/bess_rcu/drivers/gpio/base.py
LinuxGPIODriver¶
bess_rcu.drivers.gpio.linux_gpio.LinuxGPIODriver
¶
Bases: GPIODriverBase
Linux GPIO driver using gpiod library.
Provides GPIO control on Linux using the modern character device interface (/dev/gpiochipX) instead of deprecated sysfs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chip
|
str
|
GPIO chip device path (default: '/dev/gpiochip0') |
'/dev/gpiochip0'
|
consumer
|
str
|
Consumer name for GPIO requests (default: 'bess_rcu') |
'bess_rcu'
|
Example
driver = LinuxGPIODriver(chip='/dev/gpiochip0') driver.initialize() driver.setup_pin(17, PinMode.OUTPUT) driver.write_pin(17, PinState.HIGH) state = driver.read_pin(18)
Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 | |
Attributes¶
is_initialized
property
¶
Check if GPIO driver is initialized.
Functions¶
initialize()
¶
Initialize GPIO subsystem.
Returns:
| Type | Description |
|---|---|
bool
|
True if initialization successful, False otherwise |
Raises:
| Type | Description |
|---|---|
DeviceError
|
If GPIO chip initialization fails |
Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
cleanup()
¶
Cleanup GPIO resources and release pins.
Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
setup_pin(pin, mode)
¶
Configure GPIO pin mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
mode
|
PinMode
|
Pin mode (INPUT, OUTPUT, etc.) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if configuration successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin number is invalid |
Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
read_pin(pin)
¶
Read GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
Returns:
| Type | Description |
|---|---|
Optional[PinState]
|
Current pin state (LOW/HIGH) or None on error |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as input |
Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
write_pin(pin, state)
¶
Write GPIO pin state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pin
|
int
|
GPIO pin number |
required |
state
|
PinState
|
Desired pin state (LOW/HIGH) |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if write successful, False otherwise |
Raises:
| Type | Description |
|---|---|
ValueError
|
If pin is not configured as output |
Source code in code/bess_rcu/drivers/gpio/linux_gpio.py
Usage Examples¶
Modbus TCP Server¶
from bess_rcu.drivers.modbus import ModbusTCPDriver, DriverMode
# Create Modbus TCP server for PLC
driver = ModbusTCPDriver(
host="0.0.0.0",
port=502,
mode=DriverMode.SERVER
)
# Connect and start serving
await driver.connect()
# Write coil
await driver.write_coil(address=0x0001, value=True)
# Read holding register
value = await driver.read_holding_register(address=0x2001)
CAN Bus with DBC¶
from bess_rcu.drivers.can import SocketCANDriver, DBCLoader
# Load DBC file
dbc_loader = DBCLoader()
database = dbc_loader.load("config/can/pcs.dbc")
# Create CAN driver
can_driver = SocketCANDriver(
channel="can0",
bitrate=500000,
dbc_database=database
)
# Connect
await can_driver.connect()
# Send message using DBC definition
await can_driver.send_message(
"PowerCommand",
{"power_kw": 75.0, "mode": 1}
)
# Receive and decode
msg = await can_driver.receive_message(timeout=1.0)
decoded = can_driver.decode_message(msg)
GPIO Control¶
from bess_rcu.drivers.gpio import LinuxGPIODriver, PinMode
# Create GPIO driver
gpio = LinuxGPIODriver(chip="/dev/gpiochip0")
# Initialize
await gpio.initialize()
# Configure output pin
await gpio.configure_pin(pin=17, mode=PinMode.OUTPUT)
# Set pin high
await gpio.write_pin(pin=17, value=True)
# Read input pin
value = await gpio.read_pin(pin=27)
Driver Mode Enumeration¶
All communication drivers support two operational modes:
from bess_rcu.drivers.modbus import DriverMode
# Server mode - listen for incoming connections
driver = ModbusTCPDriver(mode=DriverMode.SERVER)
# Client mode - initiate connections
driver = ModbusTCPDriver(mode=DriverMode.CLIENT)
Common Interfaces¶
All drivers implement these common methods:
connect()- Establish connection/initialize hardwaredisconnect()- Clean shutdown and resource cleanupis_connected()- Check connection status
Protocol-specific methods are documented in each driver class.