Source code for can.io.mf4

"""
Contains handling of MF4 logging files.

MF4 files represent Measurement Data Format (MDF) version 4 as specified by
the ASAM MDF standard (see https://www.asam.net/standards/detail/mdf/)
"""
import logging
from datetime import datetime
from hashlib import md5
from io import BufferedIOBase, BytesIO
from pathlib import Path
from typing import Any, BinaryIO, Generator, Optional, Union, cast

from ..message import Message
from ..typechecking import StringPathLike
from ..util import channel2int, dlc2len, len2dlc
from .generic import BinaryIOMessageReader, BinaryIOMessageWriter

logger = logging.getLogger("can.io.mf4")

try:
    import asammdf
    import numpy as np
    from asammdf import Signal
    from asammdf.blocks.mdf_v4 import MDF4
    from asammdf.blocks.v4_blocks import SourceInformation
    from asammdf.blocks.v4_constants import BUS_TYPE_CAN, SOURCE_BUS
    from asammdf.mdf import MDF

    STD_DTYPE = np.dtype(
        [
            ("CAN_DataFrame.BusChannel", "<u1"),
            ("CAN_DataFrame.ID", "<u4"),
            ("CAN_DataFrame.IDE", "<u1"),
            ("CAN_DataFrame.DLC", "<u1"),
            ("CAN_DataFrame.DataLength", "<u1"),
            ("CAN_DataFrame.DataBytes", "(64,)u1"),
            ("CAN_DataFrame.Dir", "<u1"),
            ("CAN_DataFrame.EDL", "<u1"),
            ("CAN_DataFrame.BRS", "<u1"),
            ("CAN_DataFrame.ESI", "<u1"),
        ]
    )

    ERR_DTYPE = np.dtype(
        [
            ("CAN_ErrorFrame.BusChannel", "<u1"),
            ("CAN_ErrorFrame.ID", "<u4"),
            ("CAN_ErrorFrame.IDE", "<u1"),
            ("CAN_ErrorFrame.DLC", "<u1"),
            ("CAN_ErrorFrame.DataLength", "<u1"),
            ("CAN_ErrorFrame.DataBytes", "(64,)u1"),
            ("CAN_ErrorFrame.Dir", "<u1"),
            ("CAN_ErrorFrame.EDL", "<u1"),
            ("CAN_ErrorFrame.BRS", "<u1"),
            ("CAN_ErrorFrame.ESI", "<u1"),
        ]
    )

    RTR_DTYPE = np.dtype(
        [
            ("CAN_RemoteFrame.BusChannel", "<u1"),
            ("CAN_RemoteFrame.ID", "<u4"),
            ("CAN_RemoteFrame.IDE", "<u1"),
            ("CAN_RemoteFrame.DLC", "<u1"),
            ("CAN_RemoteFrame.DataLength", "<u1"),
            ("CAN_RemoteFrame.Dir", "<u1"),
        ]
    )
except ImportError:
    asammdf = None  # type: ignore


CAN_MSG_EXT = 0x80000000
CAN_ID_MASK = 0x1FFFFFFF


[docs] class MF4Writer(BinaryIOMessageWriter): """Logs CAN data to an ASAM Measurement Data File v4 (.mf4). MF4Writer does not support append mode. If a message has a timestamp smaller than the previous one or None, it gets assigned the timestamp that was written for the last message. It the first message does not have a timestamp, it is set to zero. """ def __init__( self, file: Union[StringPathLike, BinaryIO], database: Optional[StringPathLike] = None, compression_level: int = 2, **kwargs: Any, ) -> None: """ :param file: A path-like object or as file-like object to write to. If this is a file-like object, is has to be opened in binary write mode, not text write mode. :param database: optional path to a DBC or ARXML file that contains message description. :param compression_level: compression option as integer (default 2) * 0 - no compression * 1 - deflate (slower, but produces smaller files) * 2 - transposition + deflate (slowest, but produces the smallest files) """ if asammdf is None: raise NotImplementedError( "The asammdf package was not found. Install python-can with " "the optional dependency [mf4] to use the MF4Writer." ) if kwargs.get("append", False): raise ValueError( f"{self.__class__.__name__} is currently not equipped to " f"append messages to an existing file." ) super().__init__(file, mode="w+b") now = datetime.now() self._mdf = cast(MDF4, MDF(version="4.10")) self._mdf.header.start_time = now self.last_timestamp = self._start_time = now.timestamp() self._compression_level = compression_level if database: database = Path(database).resolve() if database.exists(): data = database.read_bytes() attachment = data, database.name, md5(data).digest() else: attachment = None else: attachment = None acquisition_source = SourceInformation( source_type=SOURCE_BUS, bus_type=BUS_TYPE_CAN ) # standard frames group self._mdf.append( Signal( name="CAN_DataFrame", samples=np.array([], dtype=STD_DTYPE), timestamps=np.array([], dtype="<f8"), attachment=attachment, source=acquisition_source, ) ) # error frames group self._mdf.append( Signal( name="CAN_ErrorFrame", samples=np.array([], dtype=ERR_DTYPE), timestamps=np.array([], dtype="<f8"), attachment=attachment, source=acquisition_source, ) ) # remote frames group self._mdf.append( Signal( name="CAN_RemoteFrame", samples=np.array([], dtype=RTR_DTYPE), timestamps=np.array([], dtype="<f8"), attachment=attachment, source=acquisition_source, ) ) self._std_buffer = np.zeros(1, dtype=STD_DTYPE) self._err_buffer = np.zeros(1, dtype=ERR_DTYPE) self._rtr_buffer = np.zeros(1, dtype=RTR_DTYPE)
[docs] def file_size(self) -> int: """Return an estimate of the current file size in bytes.""" # TODO: find solution without accessing private attributes of asammdf return cast(int, self._mdf._tempfile.tell()) # pylint: disable=protected-access
[docs] def stop(self) -> None: self._mdf.save(self.file, compression=self._compression_level) self._mdf.close() super().stop()
[docs] def on_message_received(self, msg: Message) -> None: channel = channel2int(msg.channel) timestamp = msg.timestamp if timestamp is None: timestamp = self.last_timestamp else: self.last_timestamp = max(self.last_timestamp, timestamp) timestamp -= self._start_time if msg.is_remote_frame: if channel is not None: self._rtr_buffer["CAN_RemoteFrame.BusChannel"] = channel self._rtr_buffer["CAN_RemoteFrame.ID"] = msg.arbitration_id self._rtr_buffer["CAN_RemoteFrame.IDE"] = int(msg.is_extended_id) self._rtr_buffer["CAN_RemoteFrame.Dir"] = 0 if msg.is_rx else 1 self._rtr_buffer["CAN_RemoteFrame.DLC"] = msg.dlc sigs = [(np.array([timestamp]), None), (self._rtr_buffer, None)] self._mdf.extend(2, sigs) elif msg.is_error_frame: if channel is not None: self._err_buffer["CAN_ErrorFrame.BusChannel"] = channel self._err_buffer["CAN_ErrorFrame.ID"] = msg.arbitration_id self._err_buffer["CAN_ErrorFrame.IDE"] = int(msg.is_extended_id) self._err_buffer["CAN_ErrorFrame.Dir"] = 0 if msg.is_rx else 1 data = msg.data size = len(data) self._err_buffer["CAN_ErrorFrame.DataLength"] = size self._err_buffer["CAN_ErrorFrame.DataBytes"][0, :size] = data if msg.is_fd: self._err_buffer["CAN_ErrorFrame.DLC"] = len2dlc(msg.dlc) self._err_buffer["CAN_ErrorFrame.ESI"] = int(msg.error_state_indicator) self._err_buffer["CAN_ErrorFrame.BRS"] = int(msg.bitrate_switch) self._err_buffer["CAN_ErrorFrame.EDL"] = 1 else: self._err_buffer["CAN_ErrorFrame.DLC"] = msg.dlc self._err_buffer["CAN_ErrorFrame.ESI"] = 0 self._err_buffer["CAN_ErrorFrame.BRS"] = 0 self._err_buffer["CAN_ErrorFrame.EDL"] = 0 sigs = [(np.array([timestamp]), None), (self._err_buffer, None)] self._mdf.extend(1, sigs) else: if channel is not None: self._std_buffer["CAN_DataFrame.BusChannel"] = channel self._std_buffer["CAN_DataFrame.ID"] = msg.arbitration_id self._std_buffer["CAN_DataFrame.IDE"] = int(msg.is_extended_id) self._std_buffer["CAN_DataFrame.Dir"] = 0 if msg.is_rx else 1 data = msg.data size = len(data) self._std_buffer["CAN_DataFrame.DataLength"] = size self._std_buffer["CAN_DataFrame.DataBytes"][0, :size] = data if msg.is_fd: self._std_buffer["CAN_DataFrame.DLC"] = len2dlc(msg.dlc) self._std_buffer["CAN_DataFrame.ESI"] = int(msg.error_state_indicator) self._std_buffer["CAN_DataFrame.BRS"] = int(msg.bitrate_switch) self._std_buffer["CAN_DataFrame.EDL"] = 1 else: self._std_buffer["CAN_DataFrame.DLC"] = msg.dlc self._std_buffer["CAN_DataFrame.ESI"] = 0 self._std_buffer["CAN_DataFrame.BRS"] = 0 self._std_buffer["CAN_DataFrame.EDL"] = 0 sigs = [(np.array([timestamp]), None), (self._std_buffer, None)] self._mdf.extend(0, sigs) # reset buffer structure self._std_buffer = np.zeros(1, dtype=STD_DTYPE) self._err_buffer = np.zeros(1, dtype=ERR_DTYPE) self._rtr_buffer = np.zeros(1, dtype=RTR_DTYPE)
[docs] class MF4Reader(BinaryIOMessageReader): """ Iterator of CAN messages from a MF4 logging file. The MF4Reader only supports MF4 files that were recorded with python-can. """ def __init__( self, file: Union[StringPathLike, BinaryIO], **kwargs: Any, ) -> None: """ :param file: a path-like object or as file-like object to read from If this is a file-like object, is has to be opened in binary read mode, not text read mode. """ if asammdf is None: raise NotImplementedError( "The asammdf package was not found. Install python-can with " "the optional dependency [mf4] to use the MF4Reader." ) super().__init__(file, mode="rb") self._mdf: MDF4 if isinstance(file, BufferedIOBase): self._mdf = MDF(BytesIO(file.read())) else: self._mdf = MDF(file) self.start_timestamp = self._mdf.header.start_time.timestamp() masters = [self._mdf.get_master(i) for i in range(3)] masters = [ np.core.records.fromarrays((master, np.ones(len(master)) * i)) for i, master in enumerate(masters) ] self.masters = np.sort(np.concatenate(masters)) def __iter__(self) -> Generator[Message, None, None]: standard_counter = 0 error_counter = 0 rtr_counter = 0 for timestamp, group_index in self.masters: # standard frames if group_index == 0: sample = self._mdf.get( "CAN_DataFrame", group=group_index, raw=True, record_offset=standard_counter, record_count=1, ) try: channel = int(sample["CAN_DataFrame.BusChannel"][0]) except ValueError: channel = None if sample["CAN_DataFrame.EDL"] == 0: is_extended_id = bool(sample["CAN_DataFrame.IDE"][0]) arbitration_id = int(sample["CAN_DataFrame.ID"][0]) is_rx = int(sample["CAN_DataFrame.Dir"][0]) == 0 size = int(sample["CAN_DataFrame.DataLength"][0]) dlc = int(sample["CAN_DataFrame.DLC"][0]) data = sample["CAN_DataFrame.DataBytes"][0, :size].tobytes() msg = Message( timestamp=timestamp + self.start_timestamp, is_error_frame=False, is_remote_frame=False, is_fd=False, is_extended_id=is_extended_id, channel=channel, is_rx=is_rx, arbitration_id=arbitration_id, data=data, dlc=dlc, ) else: is_extended_id = bool(sample["CAN_DataFrame.IDE"][0]) arbitration_id = int(sample["CAN_DataFrame.ID"][0]) is_rx = int(sample["CAN_DataFrame.Dir"][0]) == 0 size = int(sample["CAN_DataFrame.DataLength"][0]) dlc = dlc2len(sample["CAN_DataFrame.DLC"][0]) data = sample["CAN_DataFrame.DataBytes"][0, :size].tobytes() error_state_indicator = bool(sample["CAN_DataFrame.ESI"][0]) bitrate_switch = bool(sample["CAN_DataFrame.BRS"][0]) msg = Message( timestamp=timestamp + self.start_timestamp, is_error_frame=False, is_remote_frame=False, is_fd=True, is_extended_id=is_extended_id, channel=channel, arbitration_id=arbitration_id, is_rx=is_rx, data=data, dlc=dlc, bitrate_switch=bitrate_switch, error_state_indicator=error_state_indicator, ) yield msg standard_counter += 1 # error frames elif group_index == 1: sample = self._mdf.get( "CAN_ErrorFrame", group=group_index, raw=True, record_offset=error_counter, record_count=1, ) try: channel = int(sample["CAN_ErrorFrame.BusChannel"][0]) except ValueError: channel = None if sample["CAN_ErrorFrame.EDL"] == 0: is_extended_id = bool(sample["CAN_ErrorFrame.IDE"][0]) arbitration_id = int(sample["CAN_ErrorFrame.ID"][0]) is_rx = int(sample["CAN_ErrorFrame.Dir"][0]) == 0 size = int(sample["CAN_ErrorFrame.DataLength"][0]) dlc = int(sample["CAN_ErrorFrame.DLC"][0]) data = sample["CAN_ErrorFrame.DataBytes"][0, :size].tobytes() msg = Message( timestamp=timestamp + self.start_timestamp, is_error_frame=True, is_remote_frame=False, is_fd=False, is_extended_id=is_extended_id, channel=channel, arbitration_id=arbitration_id, is_rx=is_rx, data=data, dlc=dlc, ) else: is_extended_id = bool(sample["CAN_ErrorFrame.IDE"][0]) arbitration_id = int(sample["CAN_ErrorFrame.ID"][0]) is_rx = int(sample["CAN_ErrorFrame.Dir"][0]) == 0 size = int(sample["CAN_ErrorFrame.DataLength"][0]) dlc = dlc2len(sample["CAN_ErrorFrame.DLC"][0]) data = sample["CAN_ErrorFrame.DataBytes"][0, :size].tobytes() error_state_indicator = bool(sample["CAN_ErrorFrame.ESI"][0]) bitrate_switch = bool(sample["CAN_ErrorFrame.BRS"][0]) msg = Message( timestamp=timestamp + self.start_timestamp, is_error_frame=True, is_remote_frame=False, is_fd=True, is_extended_id=is_extended_id, channel=channel, arbitration_id=arbitration_id, is_rx=is_rx, data=data, dlc=dlc, bitrate_switch=bitrate_switch, error_state_indicator=error_state_indicator, ) yield msg error_counter += 1 # remote frames else: sample = self._mdf.get( "CAN_RemoteFrame", group=group_index, raw=True, record_offset=rtr_counter, record_count=1, ) try: channel = int(sample["CAN_RemoteFrame.BusChannel"][0]) except ValueError: channel = None is_extended_id = bool(sample["CAN_RemoteFrame.IDE"][0]) arbitration_id = int(sample["CAN_RemoteFrame.ID"][0]) is_rx = int(sample["CAN_RemoteFrame.Dir"][0]) == 0 dlc = int(sample["CAN_RemoteFrame.DLC"][0]) msg = Message( timestamp=timestamp + self.start_timestamp, is_error_frame=False, is_remote_frame=True, is_fd=False, is_extended_id=is_extended_id, channel=channel, arbitration_id=arbitration_id, is_rx=is_rx, dlc=dlc, ) yield msg rtr_counter += 1 self.stop()
[docs] def stop(self) -> None: self._mdf.close() super().stop()