from contextlib import nullcontext
from threading import RLock

# wrapt is an optional dependency: a missing install must not break importing
# this module. The failure is remembered in ``import_exc`` and is only raised
# once a ThreadSafeBus is actually instantiated.
try:
    from wrapt import ObjectProxy
except ImportError as exc:
    ObjectProxy = object
    import_exc = exc
else:
    import_exc = None

from .interface import Bus
class ThreadSafeBus(ObjectProxy):  # pylint: disable=abstract-method
    """
    Contains a thread safe :class:`can.BusABC` implementation that
    wraps around an existing interface instance. All public methods
    of that base class are now safe to be called from multiple threads.
    The send and receive methods are synchronized separately.

    Use this as a drop-in replacement for :class:`~can.BusABC`.

    .. note::

        This approach assumes that both :meth:`~can.BusABC.send` and
        :meth:`~can.BusABC._recv_internal` of the underlying bus instance
        can be called simultaneously, and that the methods use
        :meth:`~can.BusABC._recv_internal` instead of
        :meth:`~can.BusABC.recv` directly.
    """

    def __init__(self, *args, **kwargs):
        """Create the underlying bus via ``Bus(*args, **kwargs)`` and wrap it.

        :raises ImportError:
            the exception recorded at module import time in ``import_exc``
            when ``wrapt`` could not be imported
        """
        if import_exc is not None:
            raise import_exc

        super().__init__(Bus(*args, **kwargs))

        # now, BusABC.send_periodic() does not need a lock anymore, but the
        # implementation still requires a context manager
        self.__wrapped__._lock_send_periodic = nullcontext()

        # init locks for sending and receiving separately
        self._lock_send = RLock()
        self._lock_recv = RLock()

    def recv(self, timeout=None, *args, **kwargs):  # pylint: disable=keyword-arg-before-vararg
        """Call ``recv`` of the wrapped bus while holding the receive lock.

        ``timeout`` is forwarded positionally, ahead of ``*args``, so that
        extra positional arguments reach the wrapped method in the order the
        caller gave them. Passing it as ``timeout=...`` next to ``*args``
        would let the first extra positional argument take the ``timeout``
        slot and fail with "got multiple values for argument".
        """
        with self._lock_recv:
            return self.__wrapped__.recv(timeout, *args, **kwargs)

    def send(self, msg, timeout=None, *args, **kwargs):  # pylint: disable=keyword-arg-before-vararg
        """Call ``send`` of the wrapped bus while holding the send lock.

        ``msg`` and ``timeout`` are forwarded positionally, ahead of
        ``*args``, to keep the caller's positional argument order intact.
        """
        with self._lock_send:
            return self.__wrapped__.send(msg, timeout, *args, **kwargs)

    # send_periodic does not need a lock, since the underlying
    # `send` method is already synchronized

    @property
    def filters(self):
        """The ``filters`` attribute of the wrapped bus, read under the receive lock."""
        with self._lock_recv:
            return self.__wrapped__.filters

    @filters.setter
    def filters(self, filters):
        with self._lock_recv:
            self.__wrapped__.filters = filters

    def set_filters(self, filters=None, *args, **kwargs):  # pylint: disable=keyword-arg-before-vararg
        """Call ``set_filters`` of the wrapped bus while holding the receive lock.

        ``filters`` is forwarded positionally, ahead of ``*args``, to keep
        the caller's positional argument order intact.
        """
        with self._lock_recv:
            return self.__wrapped__.set_filters(filters, *args, **kwargs)

    def flush_tx_buffer(self, *args, **kwargs):
        """Call ``flush_tx_buffer`` of the wrapped bus while holding the send lock."""
        with self._lock_send:
            return self.__wrapped__.flush_tx_buffer(*args, **kwargs)

    # The members below take both locks. They are always acquired in the
    # same order (send first, then receive).

    def shutdown(self, *args, **kwargs):
        """Call ``shutdown`` of the wrapped bus while holding both locks."""
        with self._lock_send, self._lock_recv:
            return self.__wrapped__.shutdown(*args, **kwargs)

    @property
    def state(self):
        """The ``state`` attribute of the wrapped bus, accessed under both locks."""
        with self._lock_send, self._lock_recv:
            return self.__wrapped__.state

    @state.setter
    def state(self, new_state):
        with self._lock_send, self._lock_recv:
            self.__wrapped__.state = new_state