Source code for can.interfaces.socketcan.socketcan_ctypes

from __future__ import print_function

import ctypes
import threading
import logging
import select
import sys
from ctypes.util import find_library

import can
from can.broadcastmanager import CyclicSendTaskABC, RestartableCyclicTaskABC, ModifiableCyclicTaskABC
from can.bus import BusABC
from can.message import Message
from can.interfaces.socketcan.socketcan_constants import *  # CAN_RAW
from can.interfaces.socketcan.socketcan_common import * # parseCanFilters

# Set up logging
log = logging.getLogger('can.socketcan.ctypes')
log.info("Loading socketcan ctypes backend")


if not sys.platform.startswith("win32"):
    libc = ctypes.cdll.LoadLibrary(find_library("c"))
    log.info("Loading libc with ctypes")
else:
    log.warning("libc is unavailable")
    libc = None


start_sec = 0
start_usec = 0
SEC_USEC = 1000000


[docs]class SocketcanCtypes_Bus(BusABC): """ An implementation of the :class:`can.bus.BusABC` for SocketCAN using :mod:`ctypes`. """ channel_info = "ctypes socketcan channel" def __init__(self, channel='vcan0', receive_own_messages=False, *args, **kwargs): """ :param str channel: The can interface name with which to create this bus. An example channel would be 'vcan0'. """ self.socket = createSocket() self.channel = channel log.debug("Result of createSocket was %d", self.socket) # Add any socket options such as can frame filters if 'can_filters' in kwargs and len(kwargs['can_filters']) > 0: log.debug("Creating a filtered can bus") self.set_filters(kwargs['can_filters']) error = bindSocket(self.socket, channel) if error < 0: m = u'bindSocket failed for channel {} with error {}'.format( channel, error) raise can.CanError(m) if receive_own_messages: error1 = recv_own_msgs(self.socket) super(SocketcanCtypes_Bus, self).__init__(*args, **kwargs)
[docs] def set_filters(self, can_filters=None): """Apply filtering to all messages received by this Bus. Calling without passing any filters will reset the applied filters. :param list can_filters: A list of dictionaries each containing a "can_id" and a "can_mask". >>> [{"can_id": 0x11, "can_mask": 0x21}] A filter matches, when ``<received_can_id> & can_mask == can_id & can_mask`` """ filter_struct = pack_filters(can_filters) res = libc.setsockopt(self.socket, SOL_CAN_RAW, CAN_RAW_FILTER, filter_struct, len(filter_struct) ) # TODO Is this serious enough to raise a CanError exception? if res != 0: log.error('Setting filters failed: ' + str(res))
def recv(self, timeout=None): log.debug("Trying to read a msg") if timeout is None or len(select.select([self.socket], [], [], timeout)[0]) > 0: packet = capturePacket(self.socket) else: # socket wasn't readable or timeout occurred return None log.debug("Receiving a message") arbitration_id = packet['CAN ID'] & MSK_ARBID # Flags: EXT, RTR, ERR flags = (packet['CAN ID'] & MSK_FLAGS) >> 29 rx_msg = Message( timestamp=packet['Timestamp'], is_remote_frame=bool(flags & SKT_RTRFLG), extended_id=bool(flags & EXTFLG), is_error_frame=bool(flags & SKT_ERRFLG), arbitration_id=arbitration_id, dlc=packet['DLC'], data=packet['Data'] ) return rx_msg def send(self, msg, timeout=None): frame = _build_can_frame(msg) if timeout: # Wait for write availability. write will fail below on timeout select.select([], [self.socket], [], timeout) bytes_sent = libc.write(self.socket, ctypes.byref(frame), ctypes.sizeof(frame)) if bytes_sent == -1: log.debug("Error sending frame :-/") raise can.CanError("can.socketcan.ctypes failed to transmit") elif bytes_sent == 0: raise can.CanError("Transmit buffer overflow") log.debug("Frame transmitted with %s bytes", bytes_sent) def send_periodic(self, msg, period, duration=None): task = CyclicSendTask(self.channel, msg, period) if duration is not None: threading.Timer(duration, task.stop).start() return task
class SOCKADDR(ctypes.Structure): # See /usr/include/i386-linux-gnu/bits/socket.h for original struct _fields_ = [("sa_family", ctypes.c_uint16), ("sa_data", (ctypes.c_char)*14)] class TP(ctypes.Structure): # This struct is only used within the SOCKADDR_CAN struct _fields_ = [("rx_id", ctypes.c_uint32), ("tx_id", ctypes.c_uint32)] class ADDR_INFO(ctypes.Union): # This struct is only used within the SOCKADDR_CAN struct # This union is to future proof for future can address information _fields_ = [("tp", TP)] class SOCKADDR_CAN(ctypes.Structure): # See /usr/include/linux/can.h for original struct _fields_ = [("can_family", ctypes.c_uint16), ("can_ifindex", ctypes.c_int), ("can_addr", ADDR_INFO)] class IFREQ(ctypes.Structure): # The two fields in this struct were originally unions. # See /usr/include/net/if.h for original struct _fields_ = [("ifr_name", ctypes.c_char*16), ("ifr_ifindex", ctypes.c_int)] class CAN_FRAME(ctypes.Structure): # See /usr/include/linux/can.h for original struct # The 32bit can id is directly followed by the 8bit data link count # The data field is aligned on an 8 byte boundary, hence the padding. # Aligns the data field to an 8 byte boundary _fields_ = [("can_id", ctypes.c_uint32), ("can_dlc", ctypes.c_uint8), ("padding", ctypes.c_ubyte * 3), ("data", ctypes.c_uint8 * 8) ] class TIME_VALUE(ctypes.Structure): # See usr/include/linux/time.h for original struct _fields_ = [("tv_sec", ctypes.c_ulong), ("tv_usec", ctypes.c_ulong)] class BCM_HEADER(ctypes.Structure): # See usr/include/linux/can/bcm.h for original struct _fields_ = [ ("opcode", ctypes.c_uint32), ("flags", ctypes.c_uint32), ("count", ctypes.c_uint32), ("ival1", TIME_VALUE), ("ival2", TIME_VALUE), ("can_id", ctypes.c_uint32), ("nframes", ctypes.c_uint32), ("frames", CAN_FRAME) ]
[docs]def createSocket(protocol=CAN_RAW): """ This function creates a RAW CAN socket. The socket returned needs to be bound to an interface by calling :func:`bindSocket`. :param int protocol: The type of the socket to be bound. Valid values include CAN_RAW and CAN_BCM :return: +-----------+----------------------------+ | 0 |protocol invalid | +-----------+----------------------------+ | -1 |socket creation unsuccessful| +-----------+----------------------------+ | socketID | successful creation | +-----------+----------------------------+ """ if protocol == CAN_RAW: socketID = libc.socket(PF_CAN, SOCK_RAW, CAN_RAW) elif protocol == CAN_BCM: socketID = libc.socket(PF_CAN, SOCK_DGRAM, CAN_BCM) else: socketID = -1 return socketID
[docs]def bindSocket(socketID, channel_name): """ Binds the given socket to the given interface. :param int socketID: The ID of the socket to be bound :param str channel_name: The interface name to find and bind. :return: The error code from the bind call. +----+----------------------------+ | 0 |protocol invalid | +----+----------------------------+ | -1 |socket creation unsuccessful| +----+----------------------------+ """ log.debug('Binding socket with id %d to channel %s', socketID, channel_name) socketID = ctypes.c_int(socketID) ifr = IFREQ() ifr.ifr_name = channel_name.encode('ascii') log.debug('calling ioctl SIOCGIFINDEX') # ifr.ifr_ifindex gets filled with that device's index ret = libc.ioctl(socketID, SIOCGIFINDEX, ctypes.byref(ifr)) if ret < 0: m = u'Failure while getting "{}" interface index.'.format(channel_name) raise can.CanError(m) log.info('ifr.ifr_ifindex: %d', ifr.ifr_ifindex) # select the CAN interface and bind the socket to it addr = SOCKADDR_CAN(AF_CAN, ifr.ifr_ifindex) error = libc.bind(socketID, ctypes.byref(addr), ctypes.sizeof(addr)) if error < 0: log.error("Couldn't bind socket") log.debug('bind returned: %d', error) return error
[docs]def connectSocket(socketID, channel_name): """Connects the given socket to the given interface. :param int socketID: The ID of the socket to be bound :param str channel_name: The interface name to find and bind. :return: The error code from the bind call. """ log.debug('Connecting socket with id %d to channel %s', socketID, channel_name) socketID = ctypes.c_int(socketID) ifr = IFREQ() ifr.ifr_name = channel_name.encode('ascii') log.debug('calling ioctl SIOCGIFINDEX') # ifr.ifr_ifindex gets filled with that device's index libc.ioctl(socketID, SIOCGIFINDEX, ctypes.byref(ifr)) log.info('ifr.ifr_ifindex: %d', ifr.ifr_ifindex) # select the CAN interface and bind the socket to it addr = SOCKADDR_CAN(AF_CAN, ifr.ifr_ifindex) error = libc.connect(socketID, ctypes.byref(addr), ctypes.sizeof(addr)) if error < 0: log.error("Couldn't connect socket") log.debug('connect returned: %d', error) return error
def recv_own_msgs(socket_id): setting = ctypes.c_int(1) error = libc.setsockopt(socket_id, SOL_CAN_RAW, CAN_RAW_RECV_OWN_MSGS, ctypes.byref(setting), ctypes.sizeof(setting)) if error < 0: log.error("Couldn't set recv own msgs") return error def _build_can_frame(message): log.debug("Packing a can frame") arbitration_id = message.arbitration_id if message.id_type: log.debug("sending an extended id type message") arbitration_id |= 0x80000000 if message.is_remote_frame: log.debug("requesting a remote frame") arbitration_id |= 0x40000000 if message.is_error_frame: log.debug("sending error frame") arbitration_id |= 0x20000000 log.debug("Data: %s", message.data) log.debug("Type: %s", type(message.data)) # TODO need to understand the extended frame format frame = CAN_FRAME() frame.can_id = arbitration_id frame.can_dlc = len(message.data) frame.data[0:frame.can_dlc] = message.data log.debug("sizeof frame: %d", ctypes.sizeof(frame)) return frame
[docs]def capturePacket(socketID): """ Captures a packet of data from the given socket. :param int socketID: The socket to read from :return: A dictionary with the following keys: - `"CAN ID"` (int) - `"DLC"` (int) - `"Data"` (list) - `"Timestamp"` (float) """ packet = {} frame = CAN_FRAME() time = TIME_VALUE() # Fetching the Arb ID, DLC and Data bytes_read = libc.read(socketID, ctypes.byref(frame), ctypes.sizeof(frame)) # Fetching the timestamp error = libc.ioctl(socketID, SIOCGSTAMP, ctypes.byref(time)) packet['CAN ID'] = frame.can_id packet['DLC'] = frame.can_dlc packet["Data"] = [frame.data[i] for i in range(frame.can_dlc)] timestamp = time.tv_sec + (time.tv_usec / 1000000.0) packet['Timestamp'] = timestamp return packet
def _create_bcm_frame(opcode, flags, count, ival1_seconds, ival1_usec, ival2_seconds, ival2_usec, can_id, nframes, msg_frame): frame = BCM_HEADER() frame.opcode = opcode frame.flags = flags frame.count = count frame.ival1.tv_sec = ival1_seconds frame.ival1.tv_usec = ival1_usec frame.ival2.tv_sec = ival2_seconds frame.ival2.tv_usec = ival2_usec frame.can_id = can_id frame.nframes = nframes frame.frames = msg_frame return frame class SocketCanCtypesBCMBase(object): """Mixin to add a BCM socket""" def __init__(self, channel, *args, **kwargs): log.debug("Creating bcm socket on channel '%s'", channel) # Set up the bcm socket using ctypes self.bcm_socket = createSocket(protocol=CAN_BCM) log.debug("Created bcm socket (un-connected fd=%d)", self.bcm_socket) connectSocket(self.bcm_socket, channel) log.debug("Connected bcm socket") super(SocketCanCtypesBCMBase, self).__init__(*args, **kwargs)
[docs]class CyclicSendTask(SocketCanCtypesBCMBase, RestartableCyclicTaskABC, ModifiableCyclicTaskABC): def __init__(self, channel, message, period): """ :param channel: The name of the CAN channel to connect to. :param message: The message to be sent periodically. :param period: The rate in seconds at which to send the message. """ super(CyclicSendTask, self).__init__(channel, message, period) self.message = message # Send the bcm message with opcode TX_SETUP to start the cyclic transmit self._tx_setup() def _tx_setup(self): message = self.message # Create a low level packed frame to pass to the kernel msg_frame = _build_can_frame(message) frame = _create_bcm_frame(opcode=CAN_BCM_TX_SETUP, flags=SETTIMER | STARTTIMER, count=0, ival1_seconds=0, ival1_usec=0, ival2_seconds=int(self.period), ival2_usec=int(1e6 * (self.period - int(self.period))), can_id=message.arbitration_id, nframes=1, msg_frame=msg_frame) log.info("Sending BCM TX_SETUP command") bytes_sent = libc.send(self.bcm_socket, ctypes.byref(frame), ctypes.sizeof(frame)) if bytes_sent == -1: log.debug("Error sending frame :-/") def start(self): self._tx_setup()
[docs] def stop(self): """Send a TX_DELETE message to cancel this task. This will delete the entry for the transmission of the CAN-message with the specified can_id CAN identifier. The message length for the command TX_DELETE is {[bcm_msg_head]} (only the header). """ frame = _create_bcm_frame( opcode=CAN_BCM_TX_DELETE, flags=0, count=0, ival1_seconds=0, ival1_usec=0, ival2_seconds=0, ival2_usec=0, can_id=self.can_id, nframes=0, msg_frame=CAN_FRAME() ) bytes_sent = libc.send(self.bcm_socket, ctypes.byref(frame), ctypes.sizeof(frame)) if bytes_sent == -1: log.debug("Error sending frame to stop cyclic message:-/")
[docs] def modify_data(self, message): """Update the contents of this periodically sent message. """ assert message.arbitration_id == self.can_id, "You cannot modify the can identifier" self.message = message self._tx_setup()
class MultiRateCyclicSendTask(CyclicSendTask): """Exposes more of the full power of the TX_SETUP opcode. Transmits a message `count` times at `initial_period` then continues to transmit message at `subsequent_period`. """ def __init__(self, channel, message, count, initial_period, subsequent_period): super(MultiRateCyclicSendTask, self).__init__(channel, message, subsequent_period) msg_frame = _build_can_frame(message) frame = _create_bcm_frame(opcode=CAN_BCM_TX_SETUP, flags=SETTIMER | STARTTIMER, count=count, ival1_seconds=int(initial_period), ival1_usec=int(1e6 * (initial_period - int(initial_period))), ival2_seconds=int(subsequent_period), ival2_usec=int(1e6 * (subsequent_period - int(subsequent_period))), can_id=message.arbitration_id, nframes=1, msg_frame=msg_frame) log.info("Sending BCM TX_SETUP command") bytes_sent = libc.send(self.bcm_socket, ctypes.byref(frame), ctypes.sizeof(frame)) if bytes_sent == -1: log.debug("Error sending frame :-/")