"""This module contains the implementation of :class:`~can.Notifier`."""importasyncioimportfunctoolsimportloggingimportthreadingimporttimefromtypingimportAwaitable,Callable,Iterable,List,Optional,Union,castfromcan.busimportBusABCfromcan.listenerimportListenerfromcan.messageimportMessagelogger=logging.getLogger("can.Notifier")MessageRecipient=Union[Listener,Callable[[Message],Union[Awaitable[None],None]]]
class Notifier:
    """Dispatch :class:`~can.Message` objects from one or more buses to listeners.

    Every bus is read either through an :mod:`asyncio` reader callback (when an
    event loop was supplied and the bus exposes a file descriptor) or through a
    dedicated daemon thread.
    """

    def __init__(
        self,
        bus: Union[BusABC, List[BusABC]],
        listeners: Iterable[MessageRecipient],
        timeout: float = 1.0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Manages the distribution of :class:`~can.Message` instances to listeners.

        Supports multiple buses and listeners.

        .. Note::

            Remember to call `stop()` after all messages are received as
            many listeners carry out flush operations to persist data.

        :param bus: A :ref:`bus` or a list of buses to listen to.
        :param listeners:
            An iterable of :class:`~can.Listener` or callables that receive a
            :class:`~can.Message` and return nothing.
        :param timeout:
            An optional maximum number of seconds to wait for any
            :class:`~can.Message`.
        :param loop:
            An :mod:`asyncio` event loop to schedule the ``listeners`` in.
        """
        # Copy into a list so listeners can be added/removed later on.
        self.listeners: List[MessageRecipient] = list(listeners)
        self.bus = bus
        self.timeout = timeout
        self._loop = loop

        #: Exception raised in thread
        self.exception: Optional[Exception] = None

        # Reader threads keep polling while this flag is set; stop() clears it.
        self._running = True
        # Held by a reader thread while it hands one message over.
        self._lock = threading.Lock()

        # One entry per bus: a file descriptor registered with the event loop,
        # or the thread that polls the bus.
        self._readers: List[Union[int, threading.Thread]] = []
        buses = self.bus if isinstance(self.bus, list) else [self.bus]
        for each_bus in buses:
            self.add_bus(each_bus)

    def add_bus(self, bus: BusABC) -> None:
        """Add a bus for notification.

        :param bus:
            CAN bus instance.
        """
        reader: int = -1
        try:
            reader = bus.fileno()
        except NotImplementedError:
            # Bus doesn't support fileno, we fall back to thread based reader
            pass

        if self._loop is not None and reader >= 0:
            # Use bus file descriptor to watch for messages
            self._loop.add_reader(reader, self._on_message_available, bus)
            self._readers.append(reader)
        else:
            reader_thread = threading.Thread(
                target=self._rx_thread,
                args=(bus,),
                name=f'can.notifier for bus "{bus.channel_info}"',
            )
            reader_thread.daemon = True
            reader_thread.start()
            self._readers.append(reader_thread)

    def stop(self, timeout: float = 5) -> None:
        """Stop notifying Listeners when new :class:`~can.Message` objects arrive
        and call :meth:`~can.Listener.stop` on each Listener.

        :param timeout:
            Max time in seconds to wait for receive threads to finish.
            Should be longer than timeout given at instantiation.
        """
        self._running = False
        # The deadline is shared by all reader threads. A monotonic clock is
        # used so that wall-clock adjustments cannot stretch or shrink the wait.
        deadline = time.monotonic() + timeout
        for reader in self._readers:
            if isinstance(reader, threading.Thread):
                remaining = deadline - time.monotonic()
                if remaining > 0:
                    reader.join(remaining)
            elif self._loop:
                # reader is a file descriptor
                self._loop.remove_reader(reader)
        for listener in self.listeners:
            # Plain callables have no stop(); only stop what supports it.
            if hasattr(listener, "stop"):
                listener.stop()

    def _rx_thread(self, bus: BusABC) -> None:
        """Poll ``bus`` until the notifier is stopped or an unhandled error occurs.

        Runs as the target of the reader thread created in :meth:`add_bus`.

        :param bus: The bus this thread receives from.
        """
        # determine message handling callable early, not inside while loop
        handle_message = cast(
            Callable[[Message], None],
            self._on_message_received
            if self._loop is None
            else functools.partial(
                self._loop.call_soon_threadsafe, self._on_message_received
            ),
        )

        while self._running:
            try:
                if msg := bus.recv(self.timeout):
                    with self._lock:
                        handle_message(msg)
            except Exception as exc:  # pylint: disable=broad-except
                # Keep the exception so it can be inspected from outside.
                self.exception = exc
                if self._loop is not None:
                    self._loop.call_soon_threadsafe(self._on_error, exc)
                    # Raise anyway
                    raise
                elif not self._on_error(exc):
                    # If it was not handled, raise the exception here
                    raise
                else:
                    # It was handled, so only log it
                    logger.debug("suppressed exception: %s", exc)

    def _on_message_available(self, bus: BusABC) -> None:
        """Event-loop reader callback: fetch one pending message and dispatch it.

        :param bus: The bus whose file descriptor became readable.
        """
        if msg := bus.recv(0):
            self._on_message_received(msg)

    def _on_message_received(self, msg: Message) -> None:
        """Deliver ``msg`` to every registered listener.

        :param msg: The received message.
        """
        # Iterate over a snapshot: a listener may add or remove listeners
        # (including itself) from inside its callback, and mutating the list
        # being iterated would make the loop skip the following listener.
        for callback in tuple(self.listeners):
            res = callback(msg)
            if res and self._loop and asyncio.iscoroutine(res):
                # Schedule coroutine
                self._loop.create_task(res)

    def _on_error(self, exc: Exception) -> bool:
        """Calls ``on_error()`` for all listeners if they implement it.

        :param exc: The exception to report.
        :returns: ``True`` if at least one error handler was called.
        """
        was_handled = False

        # Snapshot for the same reason as in _on_message_received().
        for listener in tuple(self.listeners):
            if hasattr(listener, "on_error"):
                try:
                    listener.on_error(exc)
                except NotImplementedError:
                    # This listener declined to handle errors.
                    pass
                else:
                    was_handled = True

        return was_handled

    def add_listener(self, listener: MessageRecipient) -> None:
        """Add new Listener to the notification list.
        If it is already present, it will be called two times
        each time a message arrives.

        :param listener: Listener to be added to the list to be notified
        """
        self.listeners.append(listener)

    def remove_listener(self, listener: MessageRecipient) -> None:
        """Remove a listener from the notification list. This method
        throws an exception if the given listener is not part of the
        stored listeners.

        :param listener: Listener to be removed from the list to be notified
        :raises ValueError: if `listener` was never added to this notifier
        """
        self.listeners.remove(listener)