# Source code for can.io.csv

"""
This module contains handling for CSV (comma separated values) files.

TODO: CAN FD messages are not yet supported.

TODO: This module could use https://docs.python.org/3/library/csv.html
      to allow different delimiters for writing, to use special escape characters
      instead of the base64 encoding, and to use csv.Sniffer to automatically
      deduce the delimiter of a CSV file.
"""

from base64 import b64encode, b64decode
from typing import TextIO, Generator, Union, Any

from can.message import Message
from .generic import FileIOMessageWriter, MessageReader
from ..typechecking import StringPathLike


class CSVReader(MessageReader):
    """Iterator over CAN messages from a .csv file that was
    generated by :class:`~can.CSVWriter` or that uses the same
    format as described there. Assumes that there is a header
    and thus skips the first line.

    Lines that contain only whitespace are skipped.

    Any line separator is accepted.
    """

    file: TextIO

    def __init__(
        self,
        file: Union[StringPathLike, TextIO],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        :param file: a path-like object or a file-like object to read from.
                     If this is a file-like object, it has to be opened in text
                     read mode, not binary read mode.
        :param args: accepted for signature compatibility; not forwarded
        :param kwargs: accepted for signature compatibility; not forwarded
        """
        super().__init__(file, mode="r")

    def __iter__(self) -> Generator[Message, None, None]:
        """Yield one :class:`~can.Message` per data row, then call
        :meth:`stop` once the input is exhausted.

        :raises ValueError: if a non-blank row does not consist of exactly
                            seven comma separated fields, or if a numeric
                            field cannot be parsed
        """
        # skip the header line
        try:
            next(self.file)
        except StopIteration:
            # The file has no lines at all (not even a header). There is
            # nothing to yield, but still call stop() exactly like the
            # regular end-of-file path below does.
            self.stop()
            return

        for line in self.file:
            # Tolerate blank lines (e.g. a trailing empty line) instead of
            # failing on the seven-way unpacking below.
            if not line.strip():
                continue

            timestamp, arbitration_id, extended, remote, error, dlc, data = line.split(
                ","
            )

            yield Message(
                timestamp=float(timestamp),
                is_remote_frame=(remote == "1"),
                is_extended_id=(extended == "1"),
                is_error_frame=(error == "1"),
                # int(..., base=16) accepts the "0x" prefix that hex() emits
                arbitration_id=int(arbitration_id, base=16),
                dlc=int(dlc),
                # the trailing line separator is not part of the base64
                # alphabet and is discarded by the non-validating decoder
                data=b64decode(data),
            )

        self.stop()


class CSVWriter(FileIOMessageWriter):
    """Writes a comma separated text file with a line for each message.
    Includes a header line.

    The columns are as follows:

    ================ ======================= ===============
    name of column   format description      example
    ================ ======================= ===============
    timestamp        decimal float           1483389946.197
    arbitration_id   hex                     0xdadada
    extended         1 == True, 0 == False   1
    remote           1 == True, 0 == False   0
    error            1 == True, 0 == False   0
    dlc              int                     6
    data             base64 encoded          WzQyLCA5XQ==
    ================ ======================= ===============

    Every row is terminated by writing ``"\\n"`` to the text file; how that
    is translated into a line separator is left to the underlying file
    object.
    """

    file: TextIO

    def __init__(
        self,
        file: Union[StringPathLike, TextIO],
        append: bool = False,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        :param file: a path-like object or a file-like object to write to.
                     If this is a file-like object, it has to be opened in
                     text write mode, not binary write mode.
        :param bool append: if set to `True` messages are appended to
                            the file and no header line is written, else
                            the file is truncated and starts with a newly
                            written header line
        :param args: accepted for signature compatibility; not forwarded
        :param kwargs: accepted for signature compatibility; not forwarded
        """
        open_mode = "w"
        if append:
            open_mode = "a"
        super().__init__(file, mode=open_mode)

        # When appending, the header row is not written again.
        if append:
            return

        self.file.write("timestamp,arbitration_id,extended,remote,error,dlc,data\n")

    def on_message_received(self, msg: Message) -> None:
        """Serialize *msg* into one CSV row and write it to the file.

        :param msg: the message whose fields are written as one row
        """

        def as_flag(value: Any) -> str:
            # booleans are stored as the single characters "1" / "0"
            if value:
                return "1"
            return "0"

        columns = (
            # repr() rather than str(): str() would round the timestamp
            repr(msg.timestamp),
            hex(msg.arbitration_id),
            as_flag(msg.is_extended_id),
            as_flag(msg.is_remote_frame),
            as_flag(msg.is_error_frame),
            str(msg.dlc),
            # base64 keeps the payload free of commas and line breaks
            b64encode(msg.data).decode("utf8"),
        )

        self.file.write(",".join(columns))
        self.file.write("\n")