"""
This module implements an OS and hardware independent
virtual CAN interface for testing purposes.

Any VirtualBus instances that connect to the same channel
and reside in the same process will receive the same messages.
"""

import logging
import queue
import time
from copy import deepcopy
from random import randint
from threading import RLock
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from can import CanOperationError
from can.bus import BusABC, CanProtocol
from can.message import Message
from can.typechecking import AutoDetectedConfig

logger = logging.getLogger(__name__)


# Registry of all channels in this process, keyed by channel identifier.
# Channels are lists of queues, one for each connection.
if TYPE_CHECKING:
    # The subscripted annotation is only shown to the type checker, see
    # https://mypy.readthedocs.io/en/stable/runtime_troubles.html#using-classes-that-are-generic-in-stubs-but-not-at-runtime
    channels: Dict[Optional[Any], List[queue.Queue[Message]]] = {}
else:
    channels = {}

# Held while the `channels` registry (or one of its lists) is read or modified.
channels_lock = RLock()


class VirtualBus(BusABC):
    """
    A virtual CAN bus using an internal message queue. It can be
    used for example for testing.

    In this interface, a channel is an arbitrary object used as
    an identifier for connected buses.

    Implements :meth:`can.BusABC._detect_available_configs`; see
    :meth:`_detect_available_configs` for how it behaves here.

    .. note::
        The timeout when sending a message applies to each receiver
        individually. This means that sending can block up to 5 seconds
        if a message is sent to 5 receivers with the timeout set to 1.0.

    .. warning::
        This interface guarantees reliable delivery and message ordering, but
        does *not* implement rate limiting or ID arbitration/prioritization
        under high loads. Please refer to the section
        :ref:`virtual_interfaces_doc` for more information on this and a
        comparison to alternatives.
    """

    def __init__(
        self,
        channel: Any = None,
        receive_own_messages: bool = False,
        rx_queue_size: int = 0,
        preserve_timestamps: bool = False,
        protocol: CanProtocol = CanProtocol.CAN_20,
        **kwargs: Any,
    ) -> None:
        """
        The constructed instance has access to the bus identified by the
        channel parameter. It is able to see all messages transmitted on the
        bus by virtual instances constructed with the same channel identifier.

        :param channel: The channel identifier. This parameter can be an
            arbitrary value. The bus instance will be able to see messages
            from other virtual bus instances that were created with the same
            value. The value is used as a dictionary key, so it has to be
            hashable.
        :param receive_own_messages: If set to True, sent messages will be
            reflected back on the input queue.
        :param rx_queue_size: The size of the reception queue. The reception
            queue stores messages until they are read. Messages that are
            already queued are never dropped: while the queue is full, a
            sender blocks for up to its ``timeout`` (indefinitely if the
            timeout is ``None``) and :meth:`send` then raises
            :exc:`~can.exceptions.CanOperationError`. If set to 0, the queue
            has an infinite capacity. Be aware that this can cause memory
            leaks if messages are read with a lower frequency than they
            arrive on the bus.
        :param preserve_timestamps: If set to True, messages transmitted via
            :func:`~can.BusABC.send` will keep the timestamp set in the
            :class:`~can.Message` instance. Otherwise, the timestamp value
            will be replaced with the current system time.
        :param protocol: The protocol implemented by this bus instance. The
            value does not affect the operation of the bus instance and can
            be set to an arbitrary value for testing purposes.
        :param kwargs: Additional keyword arguments passed to the parent
            constructor.
        """
        super().__init__(
            channel=channel,
            receive_own_messages=receive_own_messages,
            **kwargs,
        )

        # the channel identifier may be an arbitrary object
        self.channel_id = channel
        self._can_protocol = protocol
        self.channel_info = f"Virtual bus channel {self.channel_id}"
        self.receive_own_messages = receive_own_messages
        self.preserve_timestamps = preserve_timestamps
        self._open = True

        with channels_lock:
            # Create a new channel if one does not exist
            if self.channel_id not in channels:
                channels[self.channel_id] = []
            # `self.channel` is the shared list of every queue on this channel
            self.channel = channels[self.channel_id]

            # Register this instance's own reception queue on the channel
            self.queue: queue.Queue[Message] = queue.Queue(rx_queue_size)
            self.channel.append(self.queue)

    def _check_if_open(self) -> None:
        """Raises :exc:`~can.exceptions.CanOperationError` if the bus is not open.

        Has to be called in every method that accesses the bus.
        """
        if not self._open:
            raise CanOperationError("Cannot operate on a closed bus")

    def _recv_internal(
        self, timeout: Optional[float]
    ) -> Tuple[Optional[Message], bool]:
        """Wait for the next message on this instance's reception queue.

        :param timeout: Seconds to wait for a message; passed unchanged to
            :meth:`queue.Queue.get`, so ``None`` blocks until one arrives.
        :return: ``(message, False)``, or ``(None, False)`` if no message
            arrived within the timeout. The second element is always
            ``False`` here (presumably the parent class' "already filtered"
            flag -- confirm against :class:`~can.BusABC`).
        :raises ~can.exceptions.CanOperationError: If the bus is closed.
        """
        self._check_if_open()
        try:
            msg = self.queue.get(block=True, timeout=timeout)
        except queue.Empty:
            return None, False
        else:
            return msg, False

    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """Deliver a copy of ``msg`` to every queue connected to this channel.

        The own queue is skipped unless ``receive_own_messages`` is set. Each
        receiver gets its own deep copy with the timestamp, ``channel`` and
        ``is_rx`` fields filled in; ``msg`` itself is not modified.

        :param msg: The message to transmit.
        :param timeout: Seconds to wait for room in *each* receiver's queue;
            ``None`` blocks until there is room.
        :raises ~can.exceptions.CanOperationError: If the bus is closed, or if
            at least one receiver's queue stayed full for the whole timeout.
            The remaining receivers are still served before the error is
            raised.
        """
        self._check_if_open()

        timestamp = msg.timestamp if self.preserve_timestamps else time.time()

        # Iterate over a snapshot: other threads add/remove queues on the
        # shared list (under channels_lock) when buses are created or shut
        # down, and mutating a list while iterating it can skip receivers.
        # The lock is released before the potentially blocking put() calls.
        with channels_lock:
            receivers = list(self.channel)

        # Add message to all listening on this channel
        all_sent = True
        for bus_queue in receivers:
            if bus_queue is self.queue and not self.receive_own_messages:
                continue
            msg_copy = deepcopy(msg)
            msg_copy.timestamp = timestamp
            msg_copy.channel = self.channel_id
            # Only the copy reflected to the sender itself is not "received"
            msg_copy.is_rx = bus_queue is not self.queue
            try:
                bus_queue.put(msg_copy, block=True, timeout=timeout)
            except queue.Full:
                # Keep going so that the other receivers still get the message
                all_sent = False

        if not all_sent:
            raise CanOperationError("Could not send message to one or more recipients")

    def shutdown(self) -> None:
        """Close the bus and disconnect its queue from the channel.

        The parent's ``shutdown`` is always called; the channel clean-up only
        runs the first time, while the bus is still marked as open.
        """
        super().shutdown()
        if self._open:
            self._open = False

            with channels_lock:
                self.channel.remove(self.queue)

                # remove if empty
                if not self.channel:
                    del channels[self.channel_id]

    @staticmethod
    def _detect_available_configs() -> List[AutoDetectedConfig]:
        """
        Returns all currently used channels as well as
        one other currently unused channel.

        .. note::

            This method will run into problems if thousands of
            autodetected buses are used at once.

        """
        with channels_lock:
            available_channels = list(channels.keys())

        # find a currently unused channel
        def get_extra() -> str:
            return f"channel-{randint(0, 9999)}"

        # Draw random names until one does not collide with a used channel
        extra = get_extra()
        while extra in available_channels:
            extra = get_extra()

        available_channels += [extra]

        return [
            {"interface": "virtual", "channel": channel}
            for channel in available_channels
        ]