"""
Interface to socketcand
see https://github.com/linux-can/socketcand
Authors: Marvin Seiler, Gerrit Telkamp
Copyright (C) 2021 DOMOLOGIC GmbH
http://www.domologic.de
"""
import logging
import os
import select
import socket
import time
import traceback
from collections import deque
import can
log = logging.getLogger(__name__)
def convert_ascii_message_to_can_message(ascii_msg: str) -> can.Message:
if not ascii_msg.startswith("< frame ") or not ascii_msg.endswith(" >"):
log.warning(f"Could not parse ascii message: {ascii_msg}")
return None
else:
# frame_string = ascii_msg.removeprefix("< frame ").removesuffix(" >")
frame_string = ascii_msg[8:-2]
parts = frame_string.split(" ", 3)
can_id, timestamp = int(parts[0], 16), float(parts[1])
is_ext = len(parts[0]) != 3
data = bytearray.fromhex(parts[2])
can_dlc = len(data)
can_message = can.Message(
timestamp=timestamp,
arbitration_id=can_id,
data=data,
dlc=can_dlc,
is_extended_id=is_ext,
is_rx=True,
)
return can_message
def convert_can_message_to_ascii_message(can_message: can.Message) -> str:
# Note: socketcan bus adds extended flag, remote_frame_flag & error_flag to id
# not sure if that is necessary here
can_id = can_message.arbitration_id
if can_message.is_extended_id:
can_id_string = f"{(can_id&0x1FFFFFFF):08X}"
else:
can_id_string = f"{(can_id&0x7FF):03X}"
# Note: seems like we cannot add CANFD_BRS (bitrate_switch) and CANFD_ESI (error_state_indicator) flags
data = can_message.data
length = can_message.dlc
bytes_string = " ".join(f"{x:x}" for x in data[0:length])
return f"< send {can_id_string} {length:X} {bytes_string} >"
def connect_to_server(s, host, port):
timeout_ms = 10000
now = time.time() * 1000
end_time = now + timeout_ms
while now < end_time:
try:
s.connect((host, port))
return
except Exception as e:
log.warning(f"Failed to connect to server: {type(e)} Message: {e}")
now = time.time() * 1000
raise TimeoutError(
f"connect_to_server: Failed to connect server for {timeout_ms} ms"
)
[docs]
class SocketCanDaemonBus(can.BusABC):
def __init__(self, channel, host, port, tcp_tune=False, can_filters=None, **kwargs):
"""Connects to a CAN bus served by socketcand.
It will attempt to connect to the server for up to 10s, after which a
TimeoutError exception will be thrown.
If the handshake with the socketcand server fails, a CanError exception
is thrown.
:param channel:
The can interface name served by socketcand.
An example channel would be 'vcan0' or 'can0'.
:param host:
The host address of the socketcand server.
:param port:
The port of the socketcand server.
:param tcp_tune:
This tunes the TCP socket for low latency (TCP_NODELAY, and
TCP_QUICKACK).
This option is not available under windows.
:param can_filters:
See :meth:`can.BusABC.set_filters`.
"""
self.__host = host
self.__port = port
self.__tcp_tune = tcp_tune
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.__tcp_tune:
if os.name == "nt":
self.__tcp_tune = False
log.warning("'tcp_tune' not available in Windows. Setting to False")
else:
self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.__message_buffer = deque()
self.__receive_buffer = "" # i know string is not the most efficient here
self.channel = channel
self.channel_info = f"socketcand on {channel}@{host}:{port}"
connect_to_server(self.__socket, self.__host, self.__port)
self._expect_msg("< hi >")
log.info(
f"SocketCanDaemonBus: connected with address {self.__socket.getsockname()}"
)
self._tcp_send(f"< open {channel} >")
self._expect_msg("< ok >")
self._tcp_send("< rawmode >")
self._expect_msg("< ok >")
super().__init__(channel=channel, can_filters=can_filters, **kwargs)
def _recv_internal(self, timeout):
if len(self.__message_buffer) != 0:
can_message = self.__message_buffer.popleft()
return can_message, False
try:
# get all sockets that are ready (can be a list with a single value
# being self.socket or an empty list if self.socket is not ready)
ready_receive_sockets, _, _ = select.select(
[self.__socket], [], [], timeout
)
except OSError as exc:
# something bad happened (e.g. the interface went down)
log.error(f"Failed to receive: {exc}")
raise can.CanError(f"Failed to receive: {exc}")
try:
if not ready_receive_sockets:
# socket wasn't readable or timeout occurred
log.debug("Socket not ready")
return None, False
ascii_msg = self.__socket.recv(1024).decode(
"ascii"
) # may contain multiple messages
if self.__tcp_tune:
self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
self.__receive_buffer += ascii_msg
log.debug(f"Received Ascii Message: {ascii_msg}")
buffer_view = self.__receive_buffer
chars_processed_successfully = 0
while True:
if len(buffer_view) == 0:
break
start = buffer_view.find("<")
if start == -1:
log.warning(
f"Bad data: No opening < found => discarding entire buffer '{buffer_view}'"
)
chars_processed_successfully = len(self.__receive_buffer)
break
end = buffer_view.find(">")
if end == -1:
log.warning("Got incomplete message => waiting for more data")
if len(buffer_view) > 200:
log.warning(
"Incomplete message exceeds 200 chars => Discarding"
)
chars_processed_successfully = len(self.__receive_buffer)
break
chars_processed_successfully += end + 1
single_message = buffer_view[start : end + 1]
parsed_can_message = convert_ascii_message_to_can_message(
single_message
)
if parsed_can_message is None:
log.warning(f"Invalid Frame: {single_message}")
else:
parsed_can_message.channel = self.channel
self.__message_buffer.append(parsed_can_message)
buffer_view = buffer_view[end + 1 :]
self.__receive_buffer = self.__receive_buffer[chars_processed_successfully:]
can_message = (
None
if len(self.__message_buffer) == 0
else self.__message_buffer.popleft()
)
return can_message, False
except Exception as exc:
log.error(f"Failed to receive: {exc} {traceback.format_exc()}")
raise can.CanError(f"Failed to receive: {exc} {traceback.format_exc()}")
def _tcp_send(self, msg: str):
log.debug(f"Sending TCP Message: '{msg}'")
self.__socket.sendall(msg.encode("ascii"))
if self.__tcp_tune:
self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
def _expect_msg(self, msg):
ascii_msg = self.__socket.recv(256).decode("ascii")
if self.__tcp_tune:
self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
if not ascii_msg == msg:
raise can.CanError(f"{msg} message expected!")
[docs]
def send(self, msg, timeout=None):
"""Transmit a message to the CAN bus.
:param msg: A message object.
:param timeout: Ignored
"""
ascii_msg = convert_can_message_to_ascii_message(msg)
self._tcp_send(ascii_msg)
[docs]
def shutdown(self):
"""Stops all active periodic tasks and closes the socket."""
super().shutdown()
self.__socket.close()