Source code for can.interfaces.socketcand.socketcand

"""
Interface to socketcand
see https://github.com/linux-can/socketcand

Authors: Marvin Seiler, Gerrit Telkamp

Copyright (C) 2021  DOMOLOGIC GmbH
http://www.domologic.de
"""
import logging
import os
import select
import socket
import time
import traceback
from collections import deque

import can

log = logging.getLogger(__name__)


def convert_ascii_message_to_can_message(ascii_msg: str) -> can.Message:
    """Parse one socketcand ``< frame ... >`` message into a :class:`can.Message`.

    The expected layout is ``< frame <id> <timestamp> <data> >`` where ``<id>``
    is hexadecimal, ``<timestamp>`` is a float and ``<data>`` is a hex string.
    The data bytes may be written back to back (``11223344``) or separated by
    whitespace (``11 22 33 44``); a missing or empty data field yields a
    message without payload.

    An ID field that is not exactly three characters long is treated as an
    extended (29 bit) identifier.

    :param ascii_msg: A single message, including the surrounding ``<``/``>``.
    :return: The parsed message (marked as received), or ``None`` if the
        string is not a well-formed frame message. A warning is logged in
        that case.
    """
    if not ascii_msg.startswith("< frame ") or not ascii_msg.endswith(" >"):
        log.warning(f"Could not parse ascii message: {ascii_msg}")
        return None

    # Strip the 8 character "< frame " prefix and the 2 character " >" suffix.
    frame_string = ascii_msg[8:-2]
    # Split off only the id and the timestamp so that the remainder holds the
    # complete data field, even when its bytes are separated by spaces.
    parts = frame_string.split(" ", 2)
    try:
        can_id, timestamp = int(parts[0], 16), float(parts[1])
        # bytearray.fromhex() skips whitespace between byte pairs.
        data = bytearray.fromhex(parts[2]) if len(parts) > 2 else bytearray()
    except (IndexError, ValueError):
        # Missing or malformed fields: report it like any other unparseable
        # message instead of leaking a low-level exception to the caller.
        log.warning(f"Could not parse ascii message: {ascii_msg}")
        return None

    is_ext = len(parts[0]) != 3
    can_dlc = len(data)
    can_message = can.Message(
        timestamp=timestamp,
        arbitration_id=can_id,
        data=data,
        dlc=can_dlc,
        is_extended_id=is_ext,
        is_rx=True,
    )
    return can_message


def convert_can_message_to_ascii_message(can_message: can.Message) -> str:
    """Render a :class:`can.Message` as a socketcand ``< send ... >`` command.

    The identifier is written as 8 upper-case hex digits (masked to 29 bits)
    for extended frames and as 3 upper-case hex digits (masked to 11 bits)
    otherwise. It is followed by the DLC in upper-case hex and the first
    ``dlc`` data bytes as unpadded lower-case hex values separated by spaces.

    :param can_message: The message to serialise.
    :return: The ASCII command string.
    """
    # Note: socketcan bus adds extended flag, remote_frame_flag & error_flag to id
    # not sure if that is necessary here
    arbitration_id = can_message.arbitration_id
    if can_message.is_extended_id:
        id_mask, id_width = 0x1FFFFFFF, 8
    else:
        id_mask, id_width = 0x7FF, 3
    id_field = f"{arbitration_id & id_mask:0{id_width}X}"

    # Note: seems like we cannot add CANFD_BRS (bitrate_switch) and CANFD_ESI (error_state_indicator) flags
    payload = can_message.data
    dlc = can_message.dlc
    hex_bytes = " ".join(format(byte, "x") for byte in payload[:dlc])
    return f"< send {id_field} {dlc:X} {hex_bytes} >"


def connect_to_server(s, host, port, timeout_ms=10000, retry_interval_ms=100):
    """Connect socket *s* to ``(host, port)``, retrying until a deadline.

    Every failed attempt is logged as a warning and followed by a short pause
    so that an immediately refused connection does not turn into a busy loop
    that saturates a CPU core and floods the log.

    :param s: The socket (or socket-like object providing ``connect``).
    :param host: The host address of the server.
    :param port: The port of the server.
    :param timeout_ms: Total time budget for all attempts in milliseconds.
    :param retry_interval_ms: Pause between two attempts in milliseconds.
    :raises TimeoutError: If no attempt succeeded within *timeout_ms*. The
        last connection error, if any, is attached as ``__cause__``.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_ms / 1000
    last_error = None
    while time.monotonic() < deadline:
        try:
            s.connect((host, port))
            return
        except Exception as e:
            # Deliberately broad: any failure simply triggers another attempt
            # until the deadline expires.
            last_error = e
            log.warning(f"Failed to connect to server: {type(e)} Message: {e}")
        # Never sleep past the deadline (and never for a negative duration).
        pause = min(retry_interval_ms / 1000, deadline - time.monotonic())
        if pause > 0:
            time.sleep(pause)
    raise TimeoutError(
        f"connect_to_server: Failed to connect server for {timeout_ms} ms"
    ) from last_error


class SocketCanDaemonBus(can.BusABC):
    """CAN bus backed by a TCP connection to a socketcand server.

    Frames are exchanged as ASCII messages of the form ``< ... >``. Incoming
    TCP data is accumulated in a text buffer, split into single messages and
    the resulting :class:`can.Message` objects are queued until they are
    handed out by :meth:`_recv_internal`.
    """

    def __init__(self, channel, host, port, tcp_tune=False, can_filters=None, **kwargs):
        """Connects to a CAN bus served by socketcand.

        It will attempt to connect to the server for up to 10s, after which a
        TimeoutError exception will be thrown.

        If the handshake with the socketcand server fails, a CanError exception
        is thrown. In both cases the underlying socket is closed before the
        exception propagates.

        :param channel:
            The can interface name served by socketcand.
            An example channel would be 'vcan0' or 'can0'.
        :param host:
            The host address of the socketcand server.
        :param port:
            The port of the socketcand server.
        :param tcp_tune:
            This tunes the TCP socket for low latency (TCP_NODELAY, and
            TCP_QUICKACK).
            This option is not available under windows.
        :param can_filters:
            See :meth:`can.BusABC.set_filters`.
        """
        self.__host = host
        self.__port = port
        self.__tcp_tune = tcp_tune
        self.__message_buffer = deque()
        self.__receive_buffer = ""  # i know string is not the most efficient here
        self.channel = channel
        self.channel_info = f"socketcand on {channel}@{host}:{port}"

        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if self.__tcp_tune:
                if os.name == "nt":
                    self.__tcp_tune = False
                    log.warning("'tcp_tune' not available in Windows. Setting to False")
                else:
                    self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            connect_to_server(self.__socket, self.__host, self.__port)
            self._expect_msg("< hi >")

            log.info(
                f"SocketCanDaemonBus: connected with address {self.__socket.getsockname()}"
            )
            # Handshake: select the CAN interface, then switch to raw mode.
            self._tcp_send(f"< open {channel} >")
            self._expect_msg("< ok >")
            self._tcp_send("< rawmode >")
            self._expect_msg("< ok >")
            super().__init__(channel=channel, can_filters=can_filters, **kwargs)
        except Exception:
            # Do not leak the socket when the bus could not be set up.
            self.__socket.close()
            raise

    def _recv_internal(self, timeout):
        """Return the next received message, reading from the socket if needed.

        Messages that are already queued are returned first. Otherwise the
        socket is polled for up to *timeout* seconds, the received text is
        appended to the receive buffer and every complete ``< ... >`` message
        in it is parsed and queued.

        :param timeout: Passed on to :func:`select.select`.
        :return: ``(message, False)``; ``message`` is ``None`` if nothing
            could be received. The second element is always ``False``.
        :raises can.CanError: If polling or reading the socket fails, or if
            the server closed the connection.
        """
        if len(self.__message_buffer) != 0:
            can_message = self.__message_buffer.popleft()
            return can_message, False

        try:
            # get all sockets that are ready (can be a list with a single value
            # being self.socket or an empty list if self.socket is not ready)
            ready_receive_sockets, _, _ = select.select(
                [self.__socket], [], [], timeout
            )
        except OSError as exc:
            # something bad happened (e.g. the interface went down)
            log.error(f"Failed to receive: {exc}")
            raise can.CanError(f"Failed to receive: {exc}") from exc

        try:
            if not ready_receive_sockets:
                # socket wasn't readable or timeout occurred
                log.debug("Socket not ready")
                return None, False

            received = self.__socket.recv(1024)
            if not received:
                # An empty read on a readable socket means the peer closed the
                # connection. Report it; otherwise the socket stays readable
                # and every later call would return immediately with no data.
                raise ConnectionError("socketcand server closed the connection")
            ascii_msg = received.decode("ascii")  # may contain multiple messages
            if self.__tcp_tune:
                self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
            self.__receive_buffer += ascii_msg
            log.debug(f"Received Ascii Message: {ascii_msg}")

            buffer_view = self.__receive_buffer
            chars_processed_successfully = 0
            while True:
                if len(buffer_view) == 0:
                    break

                start = buffer_view.find("<")
                if start == -1:
                    log.warning(
                        f"Bad data: No opening < found => discarding entire buffer '{buffer_view}'"
                    )
                    chars_processed_successfully = len(self.__receive_buffer)
                    break
                # Look for the closing bracket after the opening one, so that
                # a stray ">" in leading garbage cannot end the message early.
                end = buffer_view.find(">", start)
                if end == -1:
                    log.warning("Got incomplete message => waiting for more data")
                    if len(buffer_view) > 200:
                        log.warning(
                            "Incomplete message exceeds 200 chars => Discarding"
                        )
                        chars_processed_successfully = len(self.__receive_buffer)
                    break
                chars_processed_successfully += end + 1
                single_message = buffer_view[start : end + 1]
                try:
                    parsed_can_message = convert_ascii_message_to_can_message(
                        single_message
                    )
                except (IndexError, ValueError):
                    # Malformed fields inside the frame: skip this one message.
                    # Raising here would leave it in the receive buffer and
                    # make every following call fail on it again.
                    parsed_can_message = None
                if parsed_can_message is None:
                    log.warning(f"Invalid Frame: {single_message}")
                else:
                    parsed_can_message.channel = self.channel
                    self.__message_buffer.append(parsed_can_message)
                buffer_view = buffer_view[end + 1 :]

            # Keep only the unprocessed tail (an incomplete message, if any).
            self.__receive_buffer = self.__receive_buffer[chars_processed_successfully:]
            can_message = (
                None
                if len(self.__message_buffer) == 0
                else self.__message_buffer.popleft()
            )
            return can_message, False

        except Exception as exc:
            log.error(f"Failed to receive: {exc} {traceback.format_exc()}")
            raise can.CanError(
                f"Failed to receive: {exc} {traceback.format_exc()}"
            ) from exc

    def _tcp_send(self, msg: str):
        """Send *msg* ASCII-encoded over the TCP connection.

        With ``tcp_tune`` enabled, TCP_QUICKACK is set again after sending.
        """
        log.debug(f"Sending TCP Message: '{msg}'")
        self.__socket.sendall(msg.encode("ascii"))
        if self.__tcp_tune:
            self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)

    def _expect_msg(self, msg):
        """Read from the socket once and require the data to equal *msg*.

        :raises can.CanError: If the received text differs from *msg*.
        """
        ascii_msg = self.__socket.recv(256).decode("ascii")
        if self.__tcp_tune:
            self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
        if ascii_msg != msg:
            raise can.CanError(f"{msg} message expected!")

    def send(self, msg, timeout=None):
        """Transmit a message to the CAN bus.

        :param msg: A message object.
        :param timeout: Ignored
        """
        ascii_msg = convert_can_message_to_ascii_message(msg)
        self._tcp_send(ascii_msg)

    def shutdown(self):
        """Stops all active periodic tasks and closes the socket."""
        super().shutdown()
        self.__socket.close()