class GsUsbBus(can.BusABC):
    """CAN bus backed by a ``GsUsb`` device object."""

    def __init__(
        self,
        channel,
        bitrate,
        index=None,
        bus=None,
        address=None,
        can_filters=None,
        **kwargs,
    ):
        """
        :param channel: usb device name
        :param index: device number if using automatic scan, starting from 0.
            If specified, bus/address shall not be provided.
        :param bus: number of the bus that the device is connected to
        :param address: address of the device on the bus it is connected to
        :param can_filters: not supported
        :param bitrate: CAN network bandwidth (bits/s)
        :raises CanInitializationError: if ``index`` is combined with
            ``bus``/``address``, or if no matching device is found
        """
        # Test each argument against None separately: a USB bus or address of
        # 0 is falsy, so ``(bus or address) is not None`` could miss a
        # conflict (e.g. bus=0, address=None).
        if index is not None and (bus is not None or address is not None):
            raise CanInitializationError(
                "index and bus/address cannot be used simultaneously"
            )

        if index is not None:
            devs = GsUsb.scan()
            if len(devs) <= index:
                raise CanInitializationError(
                    f"Cannot find device {index}. Devices found: {len(devs)}"
                )
            gs_usb = devs[index]
        else:
            gs_usb = GsUsb.find(bus=bus, address=address)
            if not gs_usb:
                raise CanInitializationError(f"Cannot find device {channel}")

        self.gs_usb = gs_usb
        self.channel_info = channel
        self._can_protocol = can.CanProtocol.CAN_20

        # NOTE(review): the return value of set_bitrate() is not checked here;
        # confirm whether the device library signals failure through it.
        self.gs_usb.set_bitrate(bitrate)
        self.gs_usb.start()

        super().__init__(
            channel=channel,
            can_filters=can_filters,
            **kwargs,
        )

    def send(self, msg: can.Message, timeout: Optional[float] = None):
        """Transmit a message to the CAN bus.

        The caller's message is not modified; padding is applied to a copy of
        its data.

        :param Message msg: A message object.
        :param timeout: timeout is not supported.
            The function won't return until message is sent or exception is raised.

        :raises CanOperationError:
            if the message could not be sent
        """
        can_id = msg.arbitration_id

        if msg.is_extended_id:
            can_id = can_id | CAN_EFF_FLAG

        if msg.is_remote_frame:
            can_id = can_id | CAN_RTR_FLAG

        if msg.is_error_frame:
            can_id = can_id | CAN_ERR_FLAG

        # Pad a *copy* of the payload up to CAN_MAX_DLC bytes. Extending
        # msg.data directly would leave the caller's message permanently
        # padded after the call.
        payload = list(msg.data)
        payload.extend([0x00] * (CAN_MAX_DLC - len(payload)))

        frame = GsUsbFrame()
        frame.can_id = can_id
        frame.can_dlc = msg.dlc
        frame.timestamp_us = int(msg.timestamp * 1000000)
        frame.data = payload

        try:
            self.gs_usb.send(frame)
        except usb.core.USBError as exc:
            raise CanOperationError("The message could not be sent") from exc

    def _recv_internal(
        self, timeout: Optional[float]
    ) -> Tuple[Optional[can.Message], bool]:
        """
        Read a message from the bus and tell whether it was filtered.
        This method may be called by :meth:`~can.BusABC.recv`
        to read a message multiple times if the filters set by
        :meth:`~can.BusABC.set_filters` do not match and the call has
        not yet timed out.

        NOTE(review): no exception is caught in this method; whether the
        underlying device read can raise should be confirmed against the
        device library.

        :param float timeout: seconds to wait for a message,
            see :meth:`~can.BusABC.send`
            0, None and any value that rounds below 1 ms are converted to the
            minimum value of 1 ms.

        :return:
            1.  a message that was read or None on timeout
            2.  a bool that is True if message filtering has already
                been done and else False. In this interface it is always False
                since filtering is not available
        """
        frame = GsUsbFrame()

        # Never hand the device a zero (or None) timeout, to avoid blocking.
        # Clamping also covers sub-millisecond timeouts, which round() would
        # otherwise turn into 0.
        timeout_ms = max(1, round(timeout * 1000)) if timeout else 1
        if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms):
            return None, False

        msg = can.Message(
            timestamp=frame.timestamp,
            arbitration_id=frame.arbitration_id,
            is_extended_id=frame.is_extended_id,
            is_remote_frame=frame.is_remote_frame,
            is_error_frame=frame.is_error_frame,
            channel=self.channel_info,
            dlc=frame.can_dlc,
            # Only the first can_dlc bytes of the frame buffer are payload.
            data=bytearray(frame.data)[0 : frame.can_dlc],
            is_rx=frame.echo_id == GS_USB_NONE_ECHO_ID,
        )

        return msg, False