Source code for can.interfaces.serial.serial_can

"""
A text based interface. For example use over serial ports like
"/dev/ttyS1" or "/dev/ttyUSB0" on Linux machines or "COM1" on Windows.
The interface is a simple implementation that has been used for
recording CAN traces.

See the interface documentation for the format being used.
"""

import io
import logging
import struct
from typing import Any, List, Tuple, Optional

from can import BusABC, Message
from can import (
    CanInterfaceNotImplementedError,
    CanInitializationError,
    CanOperationError,
    CanTimeoutError,
)
from can.typechecking import AutoDetectedConfig

logger = logging.getLogger("can.serial")

try:
    import serial
except ImportError:
    logger.warning(
        "You won't be able to use the serial can backend without "
        "the serial module installed!"
    )
    serial = None

try:
    from serial.tools.list_ports import comports as list_comports
except ImportError:
    # If unavailable on some platform, just return nothing
    def list_comports() -> List[Any]:
        return []


[docs]class SerialBus(BusABC): """ Enable basic can communication over a serial device. .. note:: See :meth:`~_recv_internal` for some special semantics. """ def __init__( self, channel: str, baudrate: int = 115200, timeout: float = 0.1, rtscts: bool = False, *args, **kwargs, ) -> None: """ :param channel: The serial device to open. For example "/dev/ttyS1" or "/dev/ttyUSB0" on Linux or "COM1" on Windows systems. :param baudrate: Baud rate of the serial device in bit/s (default 115200). .. warning:: Some serial port implementations don't care about the baudrate. :param timeout: Timeout for the serial device in seconds (default 0.1). :param rtscts: turn hardware handshake (RTS/CTS) on and off :raises ~can.exceptions.CanInitializationError: If the given parameters are invalid. :raises ~can.exceptions.CanInterfaceNotImplementedError: If the serial module is not installed. """ if not serial: raise CanInterfaceNotImplementedError("the serial module is not installed") if not channel: raise TypeError("Must specify a serial port.") self.channel_info = f"Serial interface: {channel}" try: self._ser = serial.serial_for_url( channel, baudrate=baudrate, timeout=timeout, rtscts=rtscts ) except ValueError as error: raise CanInitializationError( "could not create the serial device" ) from error super().__init__(channel, *args, **kwargs) def shutdown(self) -> None: """ Close the serial interface. """ super().shutdown() self._ser.close() def send(self, msg: Message, timeout: Optional[float] = None) -> None: """ Send a message over the serial device. :param msg: Message to send. .. note:: Flags like ``extended_id``, ``is_remote_frame`` and ``is_error_frame`` will be ignored. .. note:: If the timestamp is a float value it will be converted to an integer. :param timeout: This parameter will be ignored. The timeout value of the channel is used instead. """ # Pack timestamp try: timestamp = struct.pack("<I", int(msg.timestamp * 1000)) except struct.error: raise ValueError("Timestamp is out of range") # Pack arbitration ID try: arbitration_id = struct.pack("<I", msg.arbitration_id) except struct.error: raise ValueError("Arbitration ID is out of range") # Assemble message byte_msg = bytearray() byte_msg.append(0xAA) byte_msg += timestamp byte_msg.append(msg.dlc) byte_msg += arbitration_id byte_msg += msg.data byte_msg.append(0xBB) # Write to serial device try: self._ser.write(byte_msg) except serial.PortNotOpenError as error: raise CanOperationError("writing to closed port") from error except serial.SerialTimeoutException as error: raise CanTimeoutError() from error
[docs] def _recv_internal( self, timeout: Optional[float] ) -> Tuple[Optional[Message], bool]: """ Read a message from the serial device. :param timeout: .. warning:: This parameter will be ignored. The timeout value of the channel is used. :returns: Received message and :obj:`False` (because no filtering as taken place). .. warning:: Flags like ``is_extended_id``, ``is_remote_frame`` and ``is_error_frame`` will not be set over this function, the flags in the return message are the default values. """ try: rx_byte = self._ser.read() if rx_byte and ord(rx_byte) == 0xAA: s = self._ser.read(4) timestamp = struct.unpack("<I", s)[0] dlc = ord(self._ser.read()) if dlc > 8: raise ValueError("received DLC may not exceed 8 bytes") s = self._ser.read(4) arbitration_id = struct.unpack("<I", s)[0] if arbitration_id >= 0x20000000: raise ValueError( "received arbitration id may not exceed 2^29 (0x20000000)" ) data = self._ser.read(dlc) delimiter_byte = ord(self._ser.read()) if delimiter_byte == 0xBB: # received message data okay msg = Message( # TODO: We are only guessing that they are milliseconds timestamp=timestamp / 1000, arbitration_id=arbitration_id, dlc=dlc, data=data, ) return msg, False else: raise CanOperationError( f"invalid delimiter byte while reading message: {delimiter_byte}" ) else: return None, False except serial.SerialException as error: raise CanOperationError("could not read from serial") from error
def fileno(self) -> int: try: return self._ser.fileno() except io.UnsupportedOperation: raise NotImplementedError( "fileno is not implemented using current CAN bus on this platform" ) except Exception as exception: raise CanOperationError("Cannot fetch fileno") from exception @staticmethod def _detect_available_configs() -> List[AutoDetectedConfig]: return [ {"interface": "serial", "channel": port.device} for port in list_comports() ]