Source code for can.interfaces.udp_multicast.bus

import logging
import select
import socket
import struct

from typing import List, Optional, Tuple, Union

log = logging.getLogger(__name__)

import can
from can import BusABC
from can.typechecking import AutoDetectedConfig

from .utils import pack_message, unpack_message, check_msgpack_installed

# see socket.getaddrinfo()
IPv4_ADDRESS_INFO = Tuple[str, int]  # address, port
IPv6_ADDRESS_INFO = Tuple[str, int, int, int]  # address, port, flowinfo, scope_id

# Additional constants for the interaction with Unix kernels

[docs]class UdpMulticastBus(BusABC): """A virtual interface for CAN communications between multiple processes using UDP over Multicast IP. It supports IPv4 and IPv6, specified via the channel (which really is just a multicast IP address as a string). You can also specify the port and the IPv6 *hop limit*/the IPv4 *time to live* (TTL). This bus does not support filtering based on message IDs on the kernel level but instead provides it in user space (in Python) as a fallback. Both default addresses should allow for multi-host CAN networks in a normal local area network (LAN) where multicast is enabled. .. note:: The auto-detection of available interfaces (see) is implemented using heuristic that checks if the required socket operations are available. It then returns two configurations, one based on the :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv6` address and another one based on the :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv4` address. .. warning:: The parameter `receive_own_messages` is currently unsupported and setting it to `True` will raise an exception. .. warning:: This interface does not make guarantees on reliable delivery and message ordering, and also does not implement rate limiting or ID arbitration/prioritization under high loads. Please refer to the section :ref:`other_virtual_interfaces` for more information on this and a comparison to alternatives. :param channel: A multicast IPv4 address (in ``) or an IPv6 address (in `ff00::/8`). This defines which version of IP is used. See `Wikipedia ("Multicast address") <>`__ for more details on the addressing schemes. Defaults to :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv6`. :param port: The IP port to read from and write to. :param hop_limit: The hop limit in IPv6 or in IPv4 the time to live (TTL). :param receive_own_messages: If transmitted messages should also be received by this bus. CURRENTLY UNSUPPORTED. :param fd: If CAN-FD frames should be supported. If set to false, an error will be raised upon sending such a frame and such received frames will be ignored. :param can_filters: See :meth:`~can.BusABC.set_filters`. :raises RuntimeError: If the *msgpack*-dependency is not available. It should be installed on all non Windows platforms via the `` requirements. :raises NotImplementedError: If the `receive_own_messages` is passed as `True`. """ #: An arbitrary IPv6 multicast address with "site-local" scope, i.e. only to be routed within the local #: physical network and not beyond it. It should allow for multi-host CAN networks in a normal IPv6 LAN. #: This is the default channel and should work with most modern routers if multicast is allowed. DEFAULT_GROUP_IPv6 = "ff15:7079:7468:6f6e:6465:6d6f:6d63:6173" #: An arbitrary IPv4 multicast address with "administrative" scope, i.e. only to be routed within #: administrative organizational boundaries and not beyond it. #: It should allow for multi-host CAN networks in a normal IPv4 LAN. #: This is provided as a default fallback channel if IPv6 is (still) not supported. DEFAULT_GROUP_IPv4 = "" def __init__( self, channel: str = DEFAULT_GROUP_IPv6, port: int = 43113, hop_limit: int = 1, receive_own_messages: bool = False, fd: bool = True, **kwargs, ) -> None: check_msgpack_installed() if receive_own_messages: raise can.CanInterfaceNotImplementedError( "receiving own messages is not yet implemented" ) super().__init__(channel, **kwargs) self.is_fd = fd self._multicast = GeneralPurposeUdpMulticastBus(channel, port, hop_limit) def _recv_internal(self, timeout: Optional[float]): result = self._multicast.recv(timeout) if not result: return None, False data, _, timestamp = result try: can_message = unpack_message( data, replace={"timestamp": timestamp}, check=True ) except Exception as exception: raise can.CanOperationError( "could not unpack received message" ) from exception if not self.is_fd and can_message.is_fd: return None, False return can_message, False def send(self, msg: can.Message, timeout: Optional[float] = None) -> None: if not self.is_fd and msg.is_fd: raise can.CanOperationError( "cannot send FD message over bus with CAN FD disabled" ) data = pack_message(msg) self._multicast.send(data, timeout)
[docs] def fileno(self) -> int: """Provides the internally used file descriptor of the socket or `-1` if not available.""" return self._multicast.fileno()
[docs] def shutdown(self) -> None: """Close all sockets and free up any resources. Never throws errors and only logs them. """ super().shutdown() self._multicast.shutdown()
@staticmethod def _detect_available_configs() -> List[AutoDetectedConfig]: if hasattr(socket, "CMSG_SPACE"): return [ { "interface": "udp_multicast", "channel": UdpMulticastBus.DEFAULT_GROUP_IPv6, }, { "interface": "udp_multicast", "channel": UdpMulticastBus.DEFAULT_GROUP_IPv4, }, ] # else, this interface cannot be used return []
class GeneralPurposeUdpMulticastBus: """A general purpose send and receive handler for multicast over IP/UDP. However, it raises CAN-specific exceptions for convenience. """ def __init__( self, group: str, port: int, hop_limit: int, max_buffer: int = 4096 ) -> None: = group self.port = port self.hop_limit = hop_limit self.max_buffer = max_buffer # Look up multicast group address in name server and find out IP version of the first suitable target # and then get the address family of it (socket.AF_INET or socket.AF_INET6) connection_candidates = socket.getaddrinfo( # type: ignore group, self.port, type=socket.SOCK_DGRAM ) sock = None for connection_candidate in connection_candidates: address_family: socket.AddressFamily = connection_candidate[0] self.ip_version = 4 if address_family == socket.AF_INET else 6 try: sock = self._create_socket(address_family) except OSError as error: "could not connect to the multicast IP network of candidate %s; reason: %s", connection_candidates, error, ) if sock is not None: self._socket = sock else: raise can.CanInitializationError( "could not connect to a multicast IP network" ) # used in recv() self.received_timestamp_struct = "@ll" ancillary_data_size = struct.calcsize(self.received_timestamp_struct) self.received_ancillary_buffer_size = socket.CMSG_SPACE(ancillary_data_size) # used by send() self._send_destination = (, self.port) self._last_send_timeout: Optional[float] = None def _create_socket(self, address_family: socket.AddressFamily) -> socket.socket: """Creates a new socket. This might fail and raise an exception! :param address_family: whether this is of type `socket.AF_INET` or `socket.AF_INET6` :raises can.CanInitializationError: if the socket could not be opened or configured correctly; in this case, it is guaranteed to be closed/cleaned up """ # create the UDP socket # this might already fail but then there is nothing to clean up sock = socket.socket(address_family, socket.SOCK_DGRAM) # configure the socket try: # set hop limit / TTL ttl_as_binary = struct.pack("@I", self.hop_limit) if self.ip_version == 4: sock.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_as_binary ) else: sock.setsockopt( socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_as_binary ) # Allow multiple programs to access that address + port sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # set how to receive timestamps sock.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMPNS, 1) # Bind it to the port (on any interface) sock.bind(("", self.port)) # Join the multicast group group_as_binary = socket.inet_pton(address_family, if self.ip_version == 4: request = group_as_binary + struct.pack("@I", socket.INADDR_ANY) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, request) else: request = group_as_binary + struct.pack("@I", 0) sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, request) return sock except OSError as error: # clean up the incompletely configured but opened socket try: sock.close() except OSError as close_error: # ignore but log any failures in here log.warning("Could not close partly configured socket: %s", close_error) # still raise the error raise can.CanInitializationError( "could not create or configure socket" ) from error def send(self, data: bytes, timeout: Optional[float] = None) -> None: """Send data to all group members. This call blocks. :param timeout: the timeout in seconds after which an Exception is raised is sending has failed :param data: the data to be sent :raises OSError: if an error occurred while writing to the underlying socket :raises socket.timeout: if the timeout ran out before sending was completed (this is a subclass of *OSError*) """ if timeout != self._last_send_timeout: self._last_send_timeout = timeout # this applies to all blocking calls on the socket, but sending is the only one that is blocking self._socket.settimeout(timeout) bytes_sent = self._socket.sendto(data, self._send_destination) if bytes_sent < len(data): raise socket.timeout() def recv( self, timeout: Optional[float] = None ) -> Optional[Tuple[bytes, IP_ADDRESS_INFO, float]]: """ Receive up to **max_buffer** bytes. :param timeout: the timeout in seconds after which `None` is returned if no data arrived :returns: `None` on timeout, or a 3-tuple comprised of: - received data, - the sender of the data, and - a timestamp in seconds """ # get all sockets that are ready (can be a list with a single value # being self.socket or an empty list if self.socket is not ready) try: # get all sockets that are ready (can be a list with a single value # being self.socket or an empty list if self.socket is not ready) ready_receive_sockets, _, _ =[self._socket], [], [], timeout) except socket.error as exc: # something bad (not a timeout) happened (e.g. the interface went down) raise can.CanOperationError( f"Failed to wait for IP/UDP socket: {exc}" ) from exc if ready_receive_sockets: # not empty # fetch data & source address ( raw_message_data, ancillary_data, _, # flags sender_address, ) = self._socket.recvmsg( self.max_buffer, self.received_ancillary_buffer_size ) # fetch timestamp; this is configured in in _create_socket() assert len(ancillary_data) == 1, "only requested a single extra field" cmsg_level, cmsg_type, cmsg_data = ancillary_data[0] assert ( cmsg_level == socket.SOL_SOCKET and cmsg_type == SO_TIMESTAMPNS ), "received control message type that was not requested" # see -> struct timespec for details seconds, nanoseconds = struct.unpack( self.received_timestamp_struct, cmsg_data ) if nanoseconds >= 1e9: raise can.CanError( f"Timestamp nanoseconds field was out of range: {nanoseconds} not less than 1e9" ) timestamp = seconds + nanoseconds * 1.0e-9 return raw_message_data, sender_address, timestamp # socket wasn't readable or timeout occurred return None def fileno(self) -> int: """Provides the internally used file descriptor of the socket or `-1` if not available.""" return self._socket.fileno() def shutdown(self) -> None: """Close all sockets and free up any resources. Never throws errors and only logs them. """ try: self._socket.close() except OSError as exception: log.error("could not close IP socket: %s", exception)