Source code for can.interfaces.socketcan_native

# -*- coding: utf-8 -*-

"""
This implementation is for versions of Python that have native
can socket and can bcm socket support: >3.4
"""

import socket
import struct
import logging
from collections import namedtuple
import select


log = logging.getLogger('can.socketcan.native')
#log.setLevel(logging.DEBUG)
log.debug("Loading native socket can implementation")

try:
    import fcntl
except ImportError:
    log.warning("fcntl not available on this platform")

try:
    socket.CAN_RAW
except:
    log.debug("CAN_* properties not found in socket module. These are required to use native socketcan")


from can import Message
from can.interfaces.socketcan_constants import *  # CAN_RAW
from ..bus import BusABC

from ..broadcastmanager import CyclicSendTaskABC

# struct module defines a binary packing format:
# https://docs.python.org/3/library/struct.html#struct-format-strings
# The 32bit can id is directly followed by the 8bit data link count
# The data field is aligned on an 8 byte boundary, hence we add padding
# which aligns the data field to an 8 byte boundary.
can_frame_fmt = "=IB3x8s"
can_frame_size = struct.calcsize(can_frame_fmt)


def build_can_frame(can_id, data):
    """ CAN frame packing/unpacking (see 'struct can_frame' in <linux/can.h>)
    /**
     * struct can_frame - basic CAN frame structure
     * @can_id:  the CAN ID of the frame and CAN_*_FLAG flags, see above.
     * @can_dlc: the data length field of the CAN frame
     * @data:    the CAN frame payload.
     */
    struct can_frame {
        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
        __u8    can_dlc; /* data length code: 0 .. 8 */
        __u8    data[8] __attribute__((aligned(8)));
    };
    """
    can_dlc = len(data)
    data = data.ljust(8, b'\x00')
    return struct.pack(can_frame_fmt, can_id, can_dlc, data)


def build_bcm_header(opcode, flags, count, ival1_seconds, ival1_usec, ival2_seconds, ival2_usec, can_id, nframes):
    # == Must use native not standard types for packing ==
    # struct bcm_msg_head {
    #     __u32 opcode; -> I
    #     __u32 flags;  -> I
    #     __u32 count;  -> I
    #     struct timeval ival1, ival2; ->  llll ...
    #     canid_t can_id; -> I
    #     __u32 nframes; -> I
    bcm_cmd_msg_fmt = "@IIIllllII"

    return struct.pack(bcm_cmd_msg_fmt,
                       opcode,
                       flags,
                       count,
                       ival1_seconds,
                       ival1_usec,
                       ival2_seconds,
                       ival2_usec,
                       can_id,
                       nframes)


def build_bcm_tx_delete_header(can_id):
    opcode = CAN_BCM_TX_DELETE
    return build_bcm_header(opcode, 0, 0, 0, 0, 0, 0, can_id, 1)


def build_bcm_transmit_header(can_id, count, initial_period, subsequent_period):
    opcode = CAN_BCM_TX_SETUP

    flags = SETTIMER | STARTTIMER

    if initial_period > 0:
        # Note `TX_COUNTEVT` creates the message TX_EXPIRED when count expires
        flags |= TX_COUNTEVT

    def split_time(value):
        """Given seconds as a float, return whole seconds and microseconds"""
        seconds = int(value)
        microseconds = int(1e6 * (value - seconds))
        return seconds, microseconds

    ival1_seconds, ival1_usec = split_time(initial_period)
    ival2_seconds, ival2_usec = split_time(subsequent_period)
    nframes = 1

    return build_bcm_header(opcode, flags, count, ival1_seconds, ival1_usec, ival2_seconds, ival2_usec, can_id, nframes)


def dissect_can_frame(frame):
    can_id, can_dlc, data = struct.unpack(can_frame_fmt, frame)
    return (can_id, can_dlc, data[:can_dlc])


def create_bcm_socket(channel):
    """create a broadcast manager socket and connect to the given interface"""
    try:
        s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
    except AttributeError:
        raise SystemExit("To use BCM sockets you need Python3.4 or higher")
    try:
        s.connect((channel,))
    except OSError as e:
        log.error("Couldn't connect a broadcast manager socket")
        raise e
    return s


class SocketCanBCMBase(object):
    """Mixin to add a BCM socket"""

    def __init__(self, channel, *args, **kwargs):
        self.bcm_socket = create_bcm_socket(channel)
        super(SocketCanBCMBase, self).__init__(channel, *args, **kwargs)


class CyclicSendTask(SocketCanBCMBase, CyclicSendTaskABC):

    def __init__(self, channel, message, period):
        """

        :param channel: The name of the CAN channel to connect to.
        :param message: The message to be sent periodically.
        :param period: The rate in seconds at which to send the message.
        """
        super(CyclicSendTask, self).__init__(channel, message, period)
        self._tx_setup(message)
        self.message = message

    def _tx_setup(self, message):
        # Create a low level packed frame to pass to the kernel
        header = build_bcm_transmit_header(self.can_id, 0, 0.0, self.period)
        frame = build_can_frame(self.can_id, message.data)
        log.info("Sending BCM command")
        self.bcm_socket.send(header + frame)

    def stop(self):
        """Send a TX_DELETE message to cancel this task.

        This will delete the entry for the transmission of the CAN-message
        with the specified can_id CAN identifier. The message length for the command
        TX_DELETE is {[bcm_msg_head]} (only the header).
        """
        try:
            self.bcm_socket.send(build_bcm_tx_delete_header(self.can_id))
        except:
            pass

    def modify_data(self, message):
        """Update the contents of this periodically sent message.
        """
        assert message.arbitration_id == self.can_id, "You cannot modify the can identifier"
        self._tx_setup(message)

    def start(self):
        self._tx_setup(self.message)


class MultiRateCyclicSendTask(CyclicSendTask):

    """Exposes more of the full power of the TX_SETUP opcode.

    Transmits a message `count` times at `initial_period` then
    continues to transmit message at `subsequent_period`.
    """

    def __init__(self, channel, message, count, initial_period, subsequent_period):
        super(MultiRateCyclicSendTask, self).__init__(channel, message, subsequent_period)

        # Create a low level packed frame to pass to the kernel
        frame = build_can_frame(self.can_id, message.data)
        header = build_bcm_transmit_header(
            self.can_id,
            count,
            initial_period,
            subsequent_period)

        log.info("Sending BCM TX_SETUP command")
        self.bcm_socket.send(header + frame)


[docs]def createSocket(can_protocol=None): """Creates a CAN socket. The socket can be BCM or RAW. The socket will be returned unbound to any interface. :param int can_protocol: The protocol to use for the CAN socket, either: * socket.CAN_RAW * socket.CAN_BCM. :return: * -1 if socket creation unsuccessful * socketID - successful creation """ if can_protocol is None or can_protocol == socket.CAN_RAW: can_protocol = socket.CAN_RAW socket_type = socket.SOCK_RAW elif can_protocol == socket.CAN_BCM: can_protocol = socket.CAN_BCM socket_type = socket.SOCK_DGRAM sock = socket.socket(socket.PF_CAN, socket_type, can_protocol) log.info('Created a socket') return sock
[docs]def bindSocket(sock, channel='can0'): """ Binds the given socket to the given interface. :param Socket socketID: The ID of the socket to be bound :raise: :class:`OSError` if the specified interface isn't found. """ log.debug('Binding socket to channel=%s', channel) sock.bind((channel,)) log.debug('Bound socket.')
_CanPacket = namedtuple('_CanPacket', ['timestamp', 'arbitration_id', 'is_error_frame', 'is_extended_frame_format', 'is_remote_transmission_request', 'dlc', 'data'])
[docs]def capturePacket(sock): """ Captures a packet of data from the given socket. :param socket sock: The socket to read a packet from. :return: A namedtuple with the following fields: * timestamp * arbitration_id * is_extended_frame_format * is_remote_transmission_request * is_error_frame * dlc * data """ # Fetching the Arb ID, DLC and Data try: cf, addr = sock.recvfrom(can_frame_size) except BlockingIOError: log.debug('Captured no data, socket in non-blocking mode.') return None except socket.timeout: log.debug('Captured no data, socket read timed out.') return None can_id, can_dlc, data = dissect_can_frame(cf) log.debug('Received: can_id=%x, can_dlc=%x, data=%s', can_id, can_dlc, data) # Fetching the timestamp binary_structure = "@LL" res = fcntl.ioctl(sock, SIOCGSTAMP, struct.pack(binary_structure, 0, 0)) seconds, microseconds = struct.unpack(binary_structure, res) timestamp = seconds + microseconds / 1000000 # EXT, RTR, ERR flags -> boolean attributes # /* special address description flags for the CAN_ID */ # #define CAN_EFF_FLAG 0x80000000U /* EFF/SFF is set in the MSB */ # #define CAN_RTR_FLAG 0x40000000U /* remote transmission request */ # #define CAN_ERR_FLAG 0x20000000U /* error frame */ CAN_EFF_FLAG = bool(can_id & 0x80000000) CAN_RTR_FLAG = bool(can_id & 0x40000000) CAN_ERR_FLAG = bool(can_id & 0x20000000) if CAN_EFF_FLAG: log.debug("CAN: Extended") # TODO does this depend on SFF or EFF? arbitration_id = can_id & 0x1FFFFFFF else: log.debug("CAN: Standard") arbitration_id = can_id & 0x000007FF return _CanPacket(timestamp, arbitration_id, CAN_ERR_FLAG, CAN_EFF_FLAG, CAN_RTR_FLAG, can_dlc, data)
[docs]class SocketcanNative_Bus(BusABC): channel_info = "native socketcan channel" def __init__(self, channel, **kwargs): """ :param str channel: The can interface name with which to create this bus. An example channel would be 'vcan0'. :param list can_filters: A list of dictionaries, each containing a "can_id" and a "can_mask". """ self.socket = createSocket(CAN_RAW) # Add any socket options such as can frame filters if 'can_filters' in kwargs and len(kwargs['can_filters']) > 0: log.debug("Creating a filtered can bus") self.set_filters(kwargs['can_filters']) bindSocket(self.socket, channel) super(SocketcanNative_Bus, self).__init__() def shutdown(self): self.socket.close() def recv(self, timeout=None): if timeout is None or len(select.select([self.socket], [], [], timeout)[0]) > 0: packet = capturePacket(self.socket) # The capturePacket function can return None if # self.socket.settimeout has been called. if packet is None: return None else: # socket wasn't readable or timeout occurred return None rx_msg = Message(timestamp=packet.timestamp, arbitration_id=packet.arbitration_id, extended_id=packet.is_extended_frame_format, is_remote_frame=packet.is_remote_transmission_request, is_error_frame=packet.is_error_frame, dlc=packet.dlc, data=packet.data ) return rx_msg def send(self, msg): log.debug("We've been asked to write a message to the bus") arbitration_id = msg.arbitration_id if msg.id_type: log.debug("sending an extended id type message") arbitration_id |= 0x80000000 if msg.is_remote_frame: log.debug("requesting a remote frame") arbitration_id |= 0x40000000 if msg.is_error_frame: log.warning("Trying to send an error frame - this won't work") arbitration_id |= 0x20000000 l = log.getChild("tx") l.debug("sending: %s", msg) try: self.socket.send(build_can_frame(arbitration_id, msg.data)) except OSError: l.warning("Failed to send: %s", msg) def set_filters(self, can_filters=None): if can_filters is None: # Pass all messages can_filters=[{ 'can_id': 0, 'can_mask': 0 }] can_filter_fmt = "={}I".format(2 * len(can_filters)) filter_data = [] for can_filter in can_filters: filter_data.append(can_filter['can_id']) filter_data.append(can_filter['can_mask']) self.socket.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, struct.pack(can_filter_fmt, *filter_data) )
if __name__ == "__main__": # Create two sockets on vcan0 to test send and receive # # If you want to try it out you can do the following: # # modprobe vcan # ip link add dev vcan0 type vcan # ifconfig vcan0 up log.setLevel(logging.DEBUG) def receiver(e): receiver_socket = createSocket() bindSocket(receiver_socket, 'vcan0') print("Receiver is waiting for a message...") e.set() print("Receiver got: ", capturePacket(receiver_socket)) def sender(e): e.wait() sender_socket = createSocket() bindSocket(sender_socket, 'vcan0') sender_socket.send(build_can_frame(0x01, b'\x01\x02\x03')) print("Sender sent a message.") import threading e = threading.Event() threading.Thread(target=receiver, args=(e,)).start() threading.Thread(target=sender, args=(e,)).start()