from __future__ import print_function
import ctypes
import threading
import logging
import select
import sys
from ctypes.util import find_library
import can
from can.broadcastmanager import CyclicSendTaskABC, RestartableCyclicTaskABC, ModifiableCyclicTaskABC
from can.bus import BusABC
from can.message import Message
from can.interfaces.socketcan.socketcan_constants import * # CAN_RAW
from can.interfaces.socketcan.socketcan_common import * # parseCanFilters
# Set up logging
log = logging.getLogger('can.socketcan.ctypes')
log.info("Loading socketcan ctypes backend")
if not sys.platform.startswith("win32"):
libc = ctypes.cdll.LoadLibrary(find_library("c"))
log.info("Loading libc with ctypes")
else:
log.warning("libc is unavailable")
libc = None
start_sec = 0
start_usec = 0
SEC_USEC = 1000000
[docs]class SocketcanCtypes_Bus(BusABC):
"""
An implementation of the :class:`can.bus.BusABC` for SocketCAN using :mod:`ctypes`.
"""
channel_info = "ctypes socketcan channel"
def __init__(self,
channel='vcan0',
receive_own_messages=False,
*args, **kwargs):
"""
:param str channel:
The can interface name with which to create this bus. An example channel
would be 'vcan0'.
"""
self.socket = createSocket()
self.channel = channel
log.debug("Result of createSocket was %d", self.socket)
# Add any socket options such as can frame filters
if 'can_filters' in kwargs and len(kwargs['can_filters']) > 0:
log.debug("Creating a filtered can bus")
self.set_filters(kwargs['can_filters'])
error = bindSocket(self.socket, channel)
if error < 0:
m = u'bindSocket failed for channel {} with error {}'.format(
channel, error)
raise can.CanError(m)
if receive_own_messages:
error1 = recv_own_msgs(self.socket)
super(SocketcanCtypes_Bus, self).__init__(*args, **kwargs)
[docs] def set_filters(self, can_filters=None):
"""Apply filtering to all messages received by this Bus.
Calling without passing any filters will reset the applied filters.
:param list can_filters:
A list of dictionaries each containing a "can_id" and a "can_mask".
>>> [{"can_id": 0x11, "can_mask": 0x21}]
A filter matches, when ``<received_can_id> & can_mask == can_id & can_mask``
"""
filter_struct = pack_filters(can_filters)
res = libc.setsockopt(self.socket,
SOL_CAN_RAW,
CAN_RAW_FILTER,
filter_struct, len(filter_struct)
)
# TODO Is this serious enough to raise a CanError exception?
if res != 0:
log.error('Setting filters failed: ' + str(res))
def recv(self, timeout=None):
log.debug("Trying to read a msg")
if timeout is None or len(select.select([self.socket],
[], [], timeout)[0]) > 0:
packet = capturePacket(self.socket)
else:
# socket wasn't readable or timeout occurred
return None
log.debug("Receiving a message")
arbitration_id = packet['CAN ID'] & MSK_ARBID
# Flags: EXT, RTR, ERR
flags = (packet['CAN ID'] & MSK_FLAGS) >> 29
rx_msg = Message(
timestamp=packet['Timestamp'],
is_remote_frame=bool(flags & SKT_RTRFLG),
extended_id=bool(flags & EXTFLG),
is_error_frame=bool(flags & SKT_ERRFLG),
arbitration_id=arbitration_id,
dlc=packet['DLC'],
data=packet['Data']
)
return rx_msg
def send(self, msg, timeout=None):
frame = _build_can_frame(msg)
if timeout:
# Wait for write availability. write will fail below on timeout
select.select([], [self.socket], [], timeout)
bytes_sent = libc.write(self.socket, ctypes.byref(frame), ctypes.sizeof(frame))
if bytes_sent == -1:
log.debug("Error sending frame :-/")
raise can.CanError("can.socketcan.ctypes failed to transmit")
elif bytes_sent == 0:
raise can.CanError("Transmit buffer overflow")
log.debug("Frame transmitted with %s bytes", bytes_sent)
def send_periodic(self, msg, period, duration=None):
task = CyclicSendTask(self.channel, msg, period)
if duration is not None:
threading.Timer(duration, task.stop).start()
return task
class SOCKADDR(ctypes.Structure):
# See /usr/include/i386-linux-gnu/bits/socket.h for original struct
_fields_ = [("sa_family", ctypes.c_uint16),
("sa_data", (ctypes.c_char)*14)]
class TP(ctypes.Structure):
# This struct is only used within the SOCKADDR_CAN struct
_fields_ = [("rx_id", ctypes.c_uint32),
("tx_id", ctypes.c_uint32)]
class ADDR_INFO(ctypes.Union):
# This struct is only used within the SOCKADDR_CAN struct
# This union is to future proof for future can address information
_fields_ = [("tp", TP)]
class SOCKADDR_CAN(ctypes.Structure):
# See /usr/include/linux/can.h for original struct
_fields_ = [("can_family", ctypes.c_uint16),
("can_ifindex", ctypes.c_int),
("can_addr", ADDR_INFO)]
class IFREQ(ctypes.Structure):
# The two fields in this struct were originally unions.
# See /usr/include/net/if.h for original struct
_fields_ = [("ifr_name", ctypes.c_char*16),
("ifr_ifindex", ctypes.c_int)]
class CAN_FRAME(ctypes.Structure):
# See /usr/include/linux/can.h for original struct
# The 32bit can id is directly followed by the 8bit data link count
# The data field is aligned on an 8 byte boundary, hence the padding.
# Aligns the data field to an 8 byte boundary
_fields_ = [("can_id", ctypes.c_uint32),
("can_dlc", ctypes.c_uint8),
("padding", ctypes.c_ubyte * 3),
("data", ctypes.c_uint8 * 8)
]
class TIME_VALUE(ctypes.Structure):
# See usr/include/linux/time.h for original struct
_fields_ = [("tv_sec", ctypes.c_ulong),
("tv_usec", ctypes.c_ulong)]
class BCM_HEADER(ctypes.Structure):
# See usr/include/linux/can/bcm.h for original struct
_fields_ = [
("opcode", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("count", ctypes.c_uint32),
("ival1", TIME_VALUE),
("ival2", TIME_VALUE),
("can_id", ctypes.c_uint32),
("nframes", ctypes.c_uint32),
("frames", CAN_FRAME)
]
[docs]def createSocket(protocol=CAN_RAW):
"""
This function creates a RAW CAN socket.
The socket returned needs to be bound to an interface by calling
:func:`bindSocket`.
:param int protocol:
The type of the socket to be bound. Valid values
include CAN_RAW and CAN_BCM
:return:
+-----------+----------------------------+
| 0 |protocol invalid |
+-----------+----------------------------+
| -1 |socket creation unsuccessful|
+-----------+----------------------------+
| socketID | successful creation |
+-----------+----------------------------+
"""
if protocol == CAN_RAW:
socketID = libc.socket(PF_CAN, SOCK_RAW, CAN_RAW)
elif protocol == CAN_BCM:
socketID = libc.socket(PF_CAN, SOCK_DGRAM, CAN_BCM)
else:
socketID = -1
return socketID
[docs]def bindSocket(socketID, channel_name):
"""
Binds the given socket to the given interface.
:param int socketID:
The ID of the socket to be bound
:param str channel_name:
The interface name to find and bind.
:return:
The error code from the bind call.
+----+----------------------------+
| 0 |protocol invalid |
+----+----------------------------+
| -1 |socket creation unsuccessful|
+----+----------------------------+
"""
log.debug('Binding socket with id %d to channel %s', socketID, channel_name)
socketID = ctypes.c_int(socketID)
ifr = IFREQ()
ifr.ifr_name = channel_name.encode('ascii')
log.debug('calling ioctl SIOCGIFINDEX')
# ifr.ifr_ifindex gets filled with that device's index
ret = libc.ioctl(socketID, SIOCGIFINDEX, ctypes.byref(ifr))
if ret < 0:
m = u'Failure while getting "{}" interface index.'.format(channel_name)
raise can.CanError(m)
log.info('ifr.ifr_ifindex: %d', ifr.ifr_ifindex)
# select the CAN interface and bind the socket to it
addr = SOCKADDR_CAN(AF_CAN, ifr.ifr_ifindex)
error = libc.bind(socketID, ctypes.byref(addr), ctypes.sizeof(addr))
if error < 0:
log.error("Couldn't bind socket")
log.debug('bind returned: %d', error)
return error
[docs]def connectSocket(socketID, channel_name):
"""Connects the given socket to the given interface.
:param int socketID:
The ID of the socket to be bound
:param str channel_name:
The interface name to find and bind.
:return:
The error code from the bind call.
"""
log.debug('Connecting socket with id %d to channel %s', socketID, channel_name)
socketID = ctypes.c_int(socketID)
ifr = IFREQ()
ifr.ifr_name = channel_name.encode('ascii')
log.debug('calling ioctl SIOCGIFINDEX')
# ifr.ifr_ifindex gets filled with that device's index
libc.ioctl(socketID, SIOCGIFINDEX, ctypes.byref(ifr))
log.info('ifr.ifr_ifindex: %d', ifr.ifr_ifindex)
# select the CAN interface and bind the socket to it
addr = SOCKADDR_CAN(AF_CAN, ifr.ifr_ifindex)
error = libc.connect(socketID, ctypes.byref(addr), ctypes.sizeof(addr))
if error < 0:
log.error("Couldn't connect socket")
log.debug('connect returned: %d', error)
return error
def recv_own_msgs(socket_id):
setting = ctypes.c_int(1)
error = libc.setsockopt(socket_id, SOL_CAN_RAW, CAN_RAW_RECV_OWN_MSGS, ctypes.byref(setting), ctypes.sizeof(setting))
if error < 0:
log.error("Couldn't set recv own msgs")
return error
def _build_can_frame(message):
log.debug("Packing a can frame")
arbitration_id = message.arbitration_id
if message.id_type:
log.debug("sending an extended id type message")
arbitration_id |= 0x80000000
if message.is_remote_frame:
log.debug("requesting a remote frame")
arbitration_id |= 0x40000000
if message.is_error_frame:
log.debug("sending error frame")
arbitration_id |= 0x20000000
log.debug("Data: %s", message.data)
log.debug("Type: %s", type(message.data))
# TODO need to understand the extended frame format
frame = CAN_FRAME()
frame.can_id = arbitration_id
frame.can_dlc = len(message.data)
frame.data[0:frame.can_dlc] = message.data
log.debug("sizeof frame: %d", ctypes.sizeof(frame))
return frame
[docs]def capturePacket(socketID):
"""
Captures a packet of data from the given socket.
:param int socketID:
The socket to read from
:return:
A dictionary with the following keys:
- `"CAN ID"` (int)
- `"DLC"` (int)
- `"Data"` (list)
- `"Timestamp"` (float)
"""
packet = {}
frame = CAN_FRAME()
time = TIME_VALUE()
# Fetching the Arb ID, DLC and Data
bytes_read = libc.read(socketID, ctypes.byref(frame), ctypes.sizeof(frame))
# Fetching the timestamp
error = libc.ioctl(socketID, SIOCGSTAMP, ctypes.byref(time))
packet['CAN ID'] = frame.can_id
packet['DLC'] = frame.can_dlc
packet["Data"] = [frame.data[i] for i in range(frame.can_dlc)]
timestamp = time.tv_sec + (time.tv_usec / 1000000.0)
packet['Timestamp'] = timestamp
return packet
def _create_bcm_frame(opcode, flags, count, ival1_seconds, ival1_usec, ival2_seconds, ival2_usec, can_id, nframes, msg_frame):
frame = BCM_HEADER()
frame.opcode = opcode
frame.flags = flags
frame.count = count
frame.ival1.tv_sec = ival1_seconds
frame.ival1.tv_usec = ival1_usec
frame.ival2.tv_sec = ival2_seconds
frame.ival2.tv_usec = ival2_usec
frame.can_id = can_id
frame.nframes = nframes
frame.frames = msg_frame
return frame
class SocketCanCtypesBCMBase(object):
"""Mixin to add a BCM socket"""
def __init__(self, channel, *args, **kwargs):
log.debug("Creating bcm socket on channel '%s'", channel)
# Set up the bcm socket using ctypes
self.bcm_socket = createSocket(protocol=CAN_BCM)
log.debug("Created bcm socket (un-connected fd=%d)", self.bcm_socket)
connectSocket(self.bcm_socket, channel)
log.debug("Connected bcm socket")
super(SocketCanCtypesBCMBase, self).__init__(*args, **kwargs)
[docs]class CyclicSendTask(SocketCanCtypesBCMBase, RestartableCyclicTaskABC, ModifiableCyclicTaskABC):
def __init__(self, channel, message, period):
"""
:param channel: The name of the CAN channel to connect to.
:param message: The message to be sent periodically.
:param period: The rate in seconds at which to send the message.
"""
super(CyclicSendTask, self).__init__(channel, message, period)
self.message = message
# Send the bcm message with opcode TX_SETUP to start the cyclic transmit
self._tx_setup()
def _tx_setup(self):
message = self.message
# Create a low level packed frame to pass to the kernel
msg_frame = _build_can_frame(message)
frame = _create_bcm_frame(opcode=CAN_BCM_TX_SETUP,
flags=SETTIMER | STARTTIMER,
count=0,
ival1_seconds=0,
ival1_usec=0,
ival2_seconds=int(self.period),
ival2_usec=int(1e6 * (self.period - int(self.period))),
can_id=message.arbitration_id,
nframes=1,
msg_frame=msg_frame)
log.info("Sending BCM TX_SETUP command")
bytes_sent = libc.send(self.bcm_socket, ctypes.byref(frame), ctypes.sizeof(frame))
if bytes_sent == -1:
log.debug("Error sending frame :-/")
def start(self):
self._tx_setup()
[docs] def stop(self):
"""Send a TX_DELETE message to cancel this task.
This will delete the entry for the transmission of the CAN-message
with the specified can_id CAN identifier. The message length for the command
TX_DELETE is {[bcm_msg_head]} (only the header).
"""
frame = _create_bcm_frame(
opcode=CAN_BCM_TX_DELETE,
flags=0,
count=0,
ival1_seconds=0,
ival1_usec=0,
ival2_seconds=0,
ival2_usec=0,
can_id=self.can_id,
nframes=0,
msg_frame=CAN_FRAME()
)
bytes_sent = libc.send(self.bcm_socket, ctypes.byref(frame), ctypes.sizeof(frame))
if bytes_sent == -1:
log.debug("Error sending frame to stop cyclic message:-/")
[docs] def modify_data(self, message):
"""Update the contents of this periodically sent message.
"""
assert message.arbitration_id == self.can_id, "You cannot modify the can identifier"
self.message = message
self._tx_setup()
class MultiRateCyclicSendTask(CyclicSendTask):
"""Exposes more of the full power of the TX_SETUP opcode.
Transmits a message `count` times at `initial_period` then
continues to transmit message at `subsequent_period`.
"""
def __init__(self, channel, message, count, initial_period, subsequent_period):
super(MultiRateCyclicSendTask, self).__init__(channel, message, subsequent_period)
msg_frame = _build_can_frame(message)
frame = _create_bcm_frame(opcode=CAN_BCM_TX_SETUP,
flags=SETTIMER | STARTTIMER,
count=count,
ival1_seconds=int(initial_period),
ival1_usec=int(1e6 * (initial_period - int(initial_period))),
ival2_seconds=int(subsequent_period),
ival2_usec=int(1e6 * (subsequent_period - int(subsequent_period))),
can_id=message.arbitration_id,
nframes=1,
msg_frame=msg_frame)
log.info("Sending BCM TX_SETUP command")
bytes_sent = libc.send(self.bcm_socket, ctypes.byref(frame), ctypes.sizeof(frame))
if bytes_sent == -1:
log.debug("Error sending frame :-/")