"""This module contains the generic :class:`LogReader` aswell as :class:`MessageSync` which plays back messagesin the recorded order and time intervals."""importgzipimportpathlibimporttimefromtypingimport(Any,Dict,Final,Generator,Iterable,Tuple,Type,Union,)from.._entry_pointsimportread_entry_pointsfrom..messageimportMessagefrom..typecheckingimportAcceptedIOType,FileLike,StringPathLikefrom.ascimportASCReaderfrom.blfimportBLFReaderfrom.canutilsimportCanutilsLogReaderfrom.csvimportCSVReaderfrom.genericimportBinaryIOMessageReader,MessageReaderfrom.mf4importMF4Readerfrom.sqliteimportSqliteReaderfrom.trcimportTRCReader#: A map of file suffixes to their corresponding#: :class:`can.io.generic.MessageReader` classMESSAGE_READERS:Final[Dict[str,Type[MessageReader]]]={".asc":ASCReader,".blf":BLFReader,".csv":CSVReader,".db":SqliteReader,".log":CanutilsLogReader,".mf4":MF4Reader,".trc":TRCReader,}def_update_reader_plugins()->None:"""Update available message reader plugins from entry points."""forentry_pointinread_entry_points("can.io.message_reader"):ifentry_point.keyinMESSAGE_READERS:continuereader_class=entry_point.load()ifissubclass(reader_class,MessageReader):MESSAGE_READERS[entry_point.key]=reader_classdef_get_logger_for_suffix(suffix:str)->Type[MessageReader]:"""Find MessageReader class for given suffix."""try:returnMESSAGE_READERS[suffix]exceptKeyError:raiseValueError(f'No read support for unknown log format "{suffix}"')fromNonedef_decompress(filename:StringPathLike,)->Tuple[Type[MessageReader],Union[str,FileLike]]:""" Return the suffix and io object of the decompressed file. """suffixes=pathlib.Path(filename).suffixesiflen(suffixes)!=2:raiseValueError(f"No write support for unknown log format \"{''.join(suffixes)}\"")fromNonereal_suffix=suffixes[-2].lower()reader_type=_get_logger_for_suffix(real_suffix)mode="rb"ifissubclass(reader_type,BinaryIOMessageReader)else"rt"returnreader_type,gzip.open(filename,mode)
def LogReader(filename: StringPathLike, **kwargs: Any) -> MessageReader:  # noqa: N802
    """Find and return the appropriate :class:`~can.io.generic.MessageReader` instance
    for a given file suffix.

    The format is determined from the file suffix which can be one of:
      * .asc :class:`can.ASCReader`
      * .blf :class:`can.BLFReader`
      * .csv :class:`can.CSVReader`
      * .db :class:`can.SqliteReader`
      * .log :class:`can.CanutilsLogReader`
      * .mf4 :class:`can.MF4Reader`
        (optional, depends on `asammdf <https://github.com/danielhrisca/asammdf>`_)
      * .trc :class:`can.TRCReader`

    Gzip compressed files can be used as long as the original
    file's suffix is one of the above (e.g. filename.asc.gz).

    Exposes a simple iterator interface, to use simply::

        for msg in can.LogReader("some/path/to/my_file.log"):
            print(msg)

    :param filename:
        the filename/path of the file to read from
    :raises ValueError:
        if the filename's suffix is of an unknown file type

    .. note::
        There are no time delays, if you want to reproduce the measured
        delays between messages look at the :class:`can.MessageSync` class.

    .. note::
        This function itself is just a dispatcher, and any keyword
        arguments are passed on to the returned instance.
    """
    # Make sure reader classes contributed by plugins are registered
    # before the suffix lookup happens.
    _update_reader_plugins()

    outer_suffix = pathlib.PurePath(filename).suffix.lower()

    if outer_suffix != ".gz":
        # Plain file: the reader opens the path itself.
        reader_cls = _get_logger_for_suffix(outer_suffix)
        return reader_cls(file=filename, **kwargs)

    # Compressed file: hand the reader an already opened gzip stream.
    reader_cls, stream = _decompress(filename)
    return reader_cls(file=stream, **kwargs)

class MessageSync:
    """
    Used to iterate over some given messages in the recorded time.
    """

    def __init__(
        self,
        messages: Iterable[Message],
        timestamps: bool = True,
        gap: float = 0.0001,
        skip: float = 60.0,
    ) -> None:
        """Create a new **MessageSync** instance.

        :param messages:
            An iterable of :class:`can.Message` instances.
        :param timestamps:
            Use the messages' timestamps to schedule playback. If False,
            the *gap* parameter is used as the time between messages instead.
        :param gap:
            Time in seconds by which the wake-up time is advanced for each
            message when *timestamps* is False.
        :param skip:
            Skip periods of inactivity greater than this (in seconds).
            A falsy value disables skipping.

        Example::

            import can

            with can.LogReader("my_logfile.asc") as reader, can.Bus(interface="virtual") as bus:
                for msg in can.MessageSync(messages=reader):
                    print(msg)
                    bus.send(msg)

        """
        self.raw_messages = messages
        self.timestamps = timestamps
        self.gap = gap
        self.skip = skip

    def __iter__(self) -> Generator[Message, None, None]:
        """Yield the messages one by one, sleeping in between as required."""
        playback_origin = time.perf_counter()
        wake_at = playback_origin
        first_stamp = None
        skipped_total = 0.0

        for msg in self.raw_messages:
            # Determine the point in (playback) time at which this message
            # is due.
            if not self.timestamps:
                wake_at += self.gap
            else:
                if first_stamp is None:
                    first_stamp = msg.timestamp
                # Recorded offset from the first message, shortened by all
                # the idle time that has been skipped so far.
                recorded_offset = msg.timestamp - skipped_total - first_stamp
                wake_at = playback_origin + recorded_offset

            delay = wake_at - time.perf_counter()

            # Cap overly long pauses and remember how much was cut off so
            # that later messages are shifted forward accordingly.
            if self.skip and delay > self.skip:
                skipped_total += delay - self.skip
                delay = self.skip

            # Waits of 0.1 ms or less are not worth a sleep call.
            if delay > 1e-4:
                time.sleep(delay)

            yield msg