Source code for can.io.asc

from can.listener import Listener
from can.message import Message

from datetime import datetime
import time
CAN_MSG_EXT = 0x80000000
CAN_ID_MASK = 0x1FFFFFFF

[docs]class ASCReader(object): """ Iterator of CAN messages from a ASC Logging File. """ def __init__(self, filename): self.fp = open(filename, "r") def __iter__(self): def extractCanId(strCanId): if strCanId[-1:].lower() == "x": isExtended = True can_id = int(strCanId[0:-1], 16) else: isExtended = False can_id = int(strCanId, 16) return (can_id, isExtended) for line in self.fp: temp = line.strip() if len(temp) == 0 or not temp[0].isdigit(): continue (time, channel, dummy) = temp.split(None,2) # , frameType, dlc, frameData time = float(time) if dummy.strip()[0:10] == "ErrorFrame": time = float(time) msg = Message(timestamp=time, is_error_frame=True) yield msg continue if not channel.isdigit() or dummy.strip()[0:10] == "Statistic:": continue if dummy[-1:].lower() == "r": (canId, _) = dummy.split(None, 1) msg = Message(timestamp=time, arbitration_id=extractCanId(canId)[0] & CAN_ID_MASK, extended_id=extractCanId(canId)[1], is_remote_frame=True) yield msg else: (canId, direction,_,dlc,data) = dummy.split(None,4) dlc = int(dlc) frame = bytearray() data = data.split() for byte in data[0:dlc]: frame.append(int(byte,16)) msg = Message(timestamp=time, arbitration_id=extractCanId(canId)[0] & CAN_ID_MASK, extended_id=extractCanId(canId)[1], is_remote_frame=False, dlc=dlc, data=frame) yield msg
[docs]class ASCWriter(Listener): """Logs CAN data to an ASCII log file (.asc)""" LOG_STRING = "{time: 9.4f} {channel} {id:<15} Rx {dtype} {data}\n" EVENT_STRING = "{time: 9.4f} {message}\n" def __init__(self, filename, channel=1): now = datetime.now().strftime("%a %b %m %I:%M:%S %p %Y") self.channel = channel self.started = time.time() self.log_file = open(filename, "w") self.log_file.write("date %s\n" % now) self.log_file.write("base hex timestamps absolute\n") self.log_file.write("internal events logged\n") self.log_file.write("Begin Triggerblock %s\n" % now) self.log_event("Start of measurement")
[docs] def stop(self): """Stops logging and closes the file.""" if not self.log_file.closed: self.log_file.write("End TriggerBlock\n") self.log_file.close()
[docs] def log_event(self, message, timestamp=None): """Add an arbitrary message to the log file.""" timestamp = (timestamp or time.time()) if timestamp >= self.started: timestamp -= self.started line = self.EVENT_STRING.format(time=timestamp, message=message) if not self.log_file.closed: self.log_file.write(line)
def on_message_received(self, msg): if msg.is_error_frame: self.log_event("{} ErrorFrame".format(self.channel), msg.timestamp) return if msg.is_remote_frame: dtype = "r" data = [] else: dtype = "d {}".format(msg.dlc) data = ["{:02X}".format(byte) for byte in msg.data] arb_id = "{:X}".format(msg.arbitration_id) if msg.id_type: arb_id = arb_id + "x" timestamp = msg.timestamp if timestamp >= self.started: timestamp -= self.started channel = msg.channel if isinstance(msg.channel, int) else self.channel line = self.LOG_STRING.format(time=timestamp, channel=channel, id=arb_id, dtype=dtype, data=" ".join(data)) if not self.log_file.closed: self.log_file.write(line)