"""A text based interface. For example use over serial ports like"/dev/ttyS1" or "/dev/ttyUSB0" on Linux machines or "COM1" on Windows.The interface is a simple implementation that has been used forrecording CAN traces.See the interface documentation for the format being used."""importioimportloggingimportstructfromtypingimportAny,List,Optional,Tuplefromcanimport(BusABC,CanInitializationError,CanInterfaceNotImplementedError,CanOperationError,CanProtocol,CanTimeoutError,Message,)fromcan.typecheckingimportAutoDetectedConfiglogger=logging.getLogger("can.serial")try:importserialexceptImportError:logger.warning("You won't be able to use the serial can backend without ""the serial module installed!")serial=Nonetry:fromserial.tools.list_portsimportcomportsaslist_comportsexceptImportError:# If unavailable on some platform, just return nothingdeflist_comports()->List[Any]:return[]
[docs]classSerialBus(BusABC):""" Enable basic can communication over a serial device. .. note:: See :meth:`~_recv_internal` for some special semantics. """def__init__(self,channel:str,baudrate:int=115200,timeout:float=0.1,rtscts:bool=False,*args,**kwargs,)->None:""" :param channel: The serial device to open. For example "/dev/ttyS1" or "/dev/ttyUSB0" on Linux or "COM1" on Windows systems. :param baudrate: Baud rate of the serial device in bit/s (default 115200). .. warning:: Some serial port implementations don't care about the baudrate. :param timeout: Timeout for the serial device in seconds (default 0.1). :param rtscts: turn hardware handshake (RTS/CTS) on and off :raises ~can.exceptions.CanInitializationError: If the given parameters are invalid. :raises ~can.exceptions.CanInterfaceNotImplementedError: If the serial module is not installed. """ifnotserial:raiseCanInterfaceNotImplementedError("the serial module is not installed")ifnotchannel:raiseTypeError("Must specify a serial port.")self.channel_info=f"Serial interface: {channel}"self._can_protocol=CanProtocol.CAN_20try:self._ser=serial.serial_for_url(channel,baudrate=baudrate,timeout=timeout,rtscts=rtscts)exceptValueErroraserror:raiseCanInitializationError("could not create the serial device")fromerrorsuper().__init__(channel,*args,**kwargs)defshutdown(self)->None:""" Close the serial interface. """super().shutdown()self._ser.close()defsend(self,msg:Message,timeout:Optional[float]=None)->None:""" Send a message over the serial device. :param msg: Message to send. .. note:: Flags like ``extended_id``, ``is_remote_frame`` and ``is_error_frame`` will be ignored. .. note:: If the timestamp is a float value it will be converted to an integer. :param timeout: This parameter will be ignored. The timeout value of the channel is used instead. """# Pack timestamptry:timestamp=struct.pack("<I",int(msg.timestamp*1000))exceptstruct.error:raiseValueError(f"Timestamp is out of range: {msg.timestamp}")fromNone# Pack arbitration IDtry:arbitration_id=struct.pack("<I",msg.arbitration_id)exceptstruct.error:raiseValueError(f"Arbitration ID is out of range: {msg.arbitration_id}")fromNone# Assemble messagebyte_msg=bytearray()byte_msg.append(0xAA)byte_msg+=timestampbyte_msg.append(msg.dlc)byte_msg+=arbitration_idbyte_msg+=msg.databyte_msg.append(0xBB)# Write to serial devicetry:self._ser.write(byte_msg)exceptserial.PortNotOpenErroraserror:raiseCanOperationError("writing to closed port")fromerrorexceptserial.SerialTimeoutExceptionaserror:raiseCanTimeoutError()fromerror
[docs]def_recv_internal(self,timeout:Optional[float])->Tuple[Optional[Message],bool]:""" Read a message from the serial device. :param timeout: .. warning:: This parameter will be ignored. The timeout value of the channel is used. :returns: Received message and :obj:`False` (because no filtering as taken place). .. warning:: Flags like ``is_extended_id``, ``is_remote_frame`` and ``is_error_frame`` will not be set over this function, the flags in the return message are the default values. """try:rx_byte=self._ser.read()ifrx_byteandord(rx_byte)==0xAA:s=self._ser.read(4)timestamp=struct.unpack("<I",s)[0]dlc=ord(self._ser.read())ifdlc>8:raiseValueError("received DLC may not exceed 8 bytes")s=self._ser.read(4)arbitration_id=struct.unpack("<I",s)[0]ifarbitration_id>=0x20000000:raiseValueError("received arbitration id may not exceed 2^29 (0x20000000)")data=self._ser.read(dlc)delimiter_byte=ord(self._ser.read())ifdelimiter_byte==0xBB:# received message data okaymsg=Message(# TODO: We are only guessing that they are millisecondstimestamp=timestamp/1000,arbitration_id=arbitration_id,dlc=dlc,data=data,)returnmsg,Falseelse:raiseCanOperationError(f"invalid delimiter byte while reading message: {delimiter_byte}")else:returnNone,Falseexceptserial.SerialExceptionaserror:raiseCanOperationError("could not read from serial")fromerror
deffileno(self)->int:try:returnself._ser.fileno()exceptio.UnsupportedOperation:raiseNotImplementedError("fileno is not implemented using current CAN bus on this platform")fromNoneexceptExceptionasexception:raiseCanOperationError("Cannot fetch fileno")fromexception@staticmethoddef_detect_available_configs()->List[AutoDetectedConfig]:return[{"interface":"serial","channel":port.device}forportinlist_comports()]