Source code for can.io.csv

#!/usr/bin/env python
# coding: utf-8

"""
This module contains handling for CSV (comma seperated values) files.

TODO: CAN FD messages are not yet supported.
TODO: This module could use https://docs.python.org/2/library/csv.html#module-csv
      to allow different delimiters for writing, special escape chars to circumvent
      the base64 encoding and use csv.Sniffer to automatically deduce the delimiters
      of a CSV file.
"""

from base64 import b64encode, b64decode

from can.message import Message
from can.listener import Listener


[docs]class CSVWriter(Listener): """Writes a comma separated text file with a line for each message. The columns are as follows: ================ ======================= =============== name of column format description example ================ ======================= =============== timestamp decimal float 1483389946.197 arbitration_id hex 0x00dadada extended 1 == True, 0 == False 1 remote 1 == True, 0 == False 0 error 1 == True, 0 == False 0 dlc int 6 data base64 encoded WzQyLCA5XQ== ================ ======================= =============== Each line is terminated with a platform specific line seperator. """ def __init__(self, filename): self.csv_file = open(filename, 'wt') # Write a header row self.csv_file.write("timestamp,arbitration_id,extended,remote,error,dlc,data\n") def on_message_received(self, msg): row = ','.join([ repr(msg.timestamp), # cannot use str() here because that is rounding hex(msg.arbitration_id), '1' if msg.id_type else '0', '1' if msg.is_remote_frame else '0', '1' if msg.is_error_frame else '0', str(msg.dlc), b64encode(msg.data).decode('utf8') ]) self.csv_file.write(row + '\n')
[docs] def stop(self): self.csv_file.flush() self.csv_file.close()
class CSVReader(): """Iterator over CAN messages from a .csv file that was generated by :class:`~can.CSVWriter` or that uses the same format that is described there. """ def __init__(self, filename): self.csv_file = open(filename, 'rt') # skip the header line self.header_line = next(self.csv_file).split(',') def __iter__(self): for line in self.csv_file: timestamp, arbitration_id, extended, remote, error, dlc, data = line.split(',') yield Message( timestamp=float(timestamp), is_remote_frame=(remote == '1'), extended_id=(extended == '1'), is_error_frame=(error == '1'), arbitration_id=int(arbitration_id, base=16), dlc=int(dlc), data=b64decode(data), ) self.csv_file.close()