Source code for can.interfaces.ics_neovi.neovi_bus

# coding: utf-8

"""
ICS NeoVi interface module.

python-ics is a Python wrapper around the API provided by Intrepid Control
Systems for communicating with their NeoVI range of devices.

Implementation references:
* https://github.com/intrepidcs/python_ics
"""

import logging
import os
import tempfile
from collections import deque

from can import Message, CanError, BusABC

logger = logging.getLogger(__name__)

try:
    import ics
except ImportError as ie:
    logger.warning(
        "You won't be able to use the ICS NeoVi can backend without the "
        "python-ics module installed!: %s",
        ie,
    )
    ics = None


try:
    from filelock import FileLock
except ImportError as ie:

    logger.warning(
        "Using ICS NeoVi can backend without the "
        "filelock module installed may cause some issues!: %s",
        ie,
    )

    class FileLock:
        """Dummy file lock that does not actually do anything"""

        def __init__(self, lock_file, timeout=-1):
            self._lock_file = lock_file
            self.timeout = timeout

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            return None


# Use inter-process mutex to prevent concurrent device open.
# When neoVI server is enabled, there is an issue with concurrent device open.
open_lock = FileLock(os.path.join(tempfile.gettempdir(), "neovi.lock"))


class ICSApiError(CanError):
    """
    Indicates an error with the ICS API.
    """

    # A critical error which affects operation or accuracy.
    ICS_SPY_ERR_CRITICAL = 0x10
    # An error which is not understood.
    ICS_SPY_ERR_QUESTION = 0x20
    # An important error which may be critical depending on the application
    ICS_SPY_ERR_EXCLAMATION = 0x30
    # An error which probably does not need attention.
    ICS_SPY_ERR_INFORMATION = 0x40

    def __init__(
        self,
        error_number,
        description_short,
        description_long,
        severity,
        restart_needed,
    ):
        super().__init__(description_short)
        self.error_number = error_number
        self.description_short = description_short
        self.description_long = description_long
        self.severity = severity
        self.restart_needed = restart_needed == 1

    def __str__(self):
        return "{} {}".format(self.description_short, self.description_long)

    @property
    def is_critical(self):
        return self.severity == self.ICS_SPY_ERR_CRITICAL


[docs]class NeoViBus(BusABC): """ The CAN Bus implemented for the python_ics interface https://github.com/intrepidcs/python_ics """ def __init__(self, channel, can_filters=None, **kwargs): """ :param channel: The channel ids to create this bus with. Can also be a single integer, netid name or a comma separated string. :type channel: int or str or list(int) or list(str) :param list can_filters: See :meth:`can.BusABC.set_filters` for details. :param bool receive_own_messages: If transmitted messages should also be received by this bus. :param bool use_system_timestamp: Use system timestamp for can messages instead of the hardware time stamp :param str serial: Serial to connect (optional, will use the first found if not supplied) :param int bitrate: Channel bitrate in bit/s. (optional, will enable the auto bitrate feature if not supplied) :param bool fd: If CAN-FD frames should be supported. :param int data_bitrate: Which bitrate to use for data phase in CAN FD. Defaults to arbitration bitrate. :param override_library_name: Absolute path or relative path to the library including filename. """ if ics is None: raise ImportError("Please install python-ics") super().__init__(channel=channel, can_filters=can_filters, **kwargs) logger.info("CAN Filters: {}".format(can_filters)) logger.info("Got configuration of: {}".format(kwargs)) if "override_library_name" in kwargs: ics.override_library_name(kwargs.get("override_library_name")) if isinstance(channel, (list, tuple)): self.channels = channel elif isinstance(channel, int): self.channels = [channel] else: # Assume comma separated string of channels self.channels = [ch.strip() for ch in channel.split(",")] self.channels = [NeoViBus.channel_to_netid(ch) for ch in self.channels] type_filter = kwargs.get("type_filter") serial = kwargs.get("serial") self.dev = self._find_device(type_filter, serial) with open_lock: ics.open_device(self.dev) if "bitrate" in kwargs: for channel in self.channels: ics.set_bit_rate(self.dev, kwargs.get("bitrate"), channel) fd = kwargs.get("fd", False) if fd: if "data_bitrate" in kwargs: for channel in self.channels: ics.set_fd_bit_rate(self.dev, kwargs.get("data_bitrate"), channel) self._use_system_timestamp = bool(kwargs.get("use_system_timestamp", False)) self._receive_own_messages = kwargs.get("receive_own_messages", True) self.channel_info = "%s %s CH:%s" % ( self.dev.Name, self.get_serial_number(self.dev), self.channels, ) logger.info("Using device: {}".format(self.channel_info)) self.rx_buffer = deque() @staticmethod def channel_to_netid(channel_name_or_id): try: channel = int(channel_name_or_id) except ValueError: netid = "NETID_{}".format(channel_name_or_id.upper()) if hasattr(ics, netid): channel = getattr(ics, netid) else: raise ValueError( "channel must be an integer or " "a valid ICS channel name" ) return channel
[docs] @staticmethod def get_serial_number(device): """Decode (if needed) and return the ICS device serial string :param device: ics device :return: ics device serial string :rtype: str """ a0000 = 604661760 if device.SerialNumber >= a0000: return ics.base36enc(device.SerialNumber) return str(device.SerialNumber)
[docs] def shutdown(self): super().shutdown() ics.close_device(self.dev)
@staticmethod def _detect_available_configs(): """Detect all configurations/channels that this interface could currently connect with. :rtype: Iterator[dict] :return: an iterable of dicts, each being a configuration suitable for usage in the interface's bus constructor. """ if ics is None: return [] try: devices = ics.find_devices() except Exception as e: logger.debug("Failed to detect configs: %s", e) return [] # TODO: add the channel(s) return [ {"interface": "neovi", "serial": NeoViBus.get_serial_number(device)} for device in devices ] def _find_device(self, type_filter=None, serial=None): if type_filter is not None: devices = ics.find_devices(type_filter) else: devices = ics.find_devices() for device in devices: if serial is None or self.get_serial_number(device) == str(serial): dev = device break else: msg = ["No device"] if type_filter is not None: msg.append("with type {}".format(type_filter)) if serial is not None: msg.append("with serial {}".format(serial)) msg.append("found.") raise Exception(" ".join(msg)) return dev def _process_msg_queue(self, timeout=0.1): try: messages, errors = ics.get_messages(self.dev, False, timeout) except ics.RuntimeError: return for ics_msg in messages: if ics_msg.NetworkID not in self.channels: continue is_tx = bool(ics_msg.StatusBitField & ics.SPY_STATUS_TX_MSG) if not self._receive_own_messages and is_tx: continue self.rx_buffer.append(ics_msg) if errors: logger.warning("%d error(s) found", errors) for msg in ics.get_error_messages(self.dev): error = ICSApiError(*msg) logger.warning(error) def _get_timestamp_for_msg(self, ics_msg): if self._use_system_timestamp: # This is the system time stamp. # TimeSystem is loaded with the value received from the timeGetTime # call in the WIN32 multimedia API. # # The timeGetTime accuracy is up to 1 millisecond. See the WIN32 # API documentation for more information. # # This timestamp is useful for time comparing with other system # events or data which is not synced with the neoVI timestamp. # # Currently, TimeSystem2 is not used. return ics_msg.TimeSystem else: # This is the hardware time stamp. return ics.get_timestamp_for_msg(self.dev, ics_msg) def _ics_msg_to_message(self, ics_msg): is_fd = ics_msg.Protocol == ics.SPY_PROTOCOL_CANFD if is_fd: if ics_msg.ExtraDataPtrEnabled: data = ics_msg.ExtraDataPtr[: ics_msg.NumberBytesData] else: data = ics_msg.Data[: ics_msg.NumberBytesData] return Message( timestamp=self._get_timestamp_for_msg(ics_msg), arbitration_id=ics_msg.ArbIDOrHeader, data=data, dlc=ics_msg.NumberBytesData, is_extended_id=bool(ics_msg.StatusBitField & ics.SPY_STATUS_XTD_FRAME), is_fd=is_fd, is_remote_frame=bool( ics_msg.StatusBitField & ics.SPY_STATUS_REMOTE_FRAME ), error_state_indicator=bool( ics_msg.StatusBitField3 & ics.SPY_STATUS3_CANFD_ESI ), bitrate_switch=bool( ics_msg.StatusBitField3 & ics.SPY_STATUS3_CANFD_BRS ), channel=ics_msg.NetworkID, ) else: return Message( timestamp=self._get_timestamp_for_msg(ics_msg), arbitration_id=ics_msg.ArbIDOrHeader, data=ics_msg.Data[: ics_msg.NumberBytesData], dlc=ics_msg.NumberBytesData, is_extended_id=bool(ics_msg.StatusBitField & ics.SPY_STATUS_XTD_FRAME), is_fd=is_fd, is_remote_frame=bool( ics_msg.StatusBitField & ics.SPY_STATUS_REMOTE_FRAME ), channel=ics_msg.NetworkID, ) def _recv_internal(self, timeout=0.1): if not self.rx_buffer: self._process_msg_queue(timeout=timeout) try: ics_msg = self.rx_buffer.popleft() msg = self._ics_msg_to_message(ics_msg) except IndexError: return None, False return msg, False
[docs] def send(self, msg, timeout=None): if not ics.validate_hobject(self.dev): raise CanError("bus not open") message = ics.SpyMessage() flag0 = 0 if msg.is_extended_id: flag0 |= ics.SPY_STATUS_XTD_FRAME if msg.is_remote_frame: flag0 |= ics.SPY_STATUS_REMOTE_FRAME flag3 = 0 if msg.is_fd: message.Protocol = ics.SPY_PROTOCOL_CANFD if msg.bitrate_switch: flag3 |= ics.SPY_STATUS3_CANFD_BRS if msg.error_state_indicator: flag3 |= ics.SPY_STATUS3_CANFD_ESI message.ArbIDOrHeader = msg.arbitration_id message.NumberBytesData = len(msg.data) message.Data = tuple(msg.data[:8]) if msg.is_fd and len(msg.data) > 8: message.ExtraDataPtrEnabled = 1 message.ExtraDataPtr = tuple(msg.data) message.StatusBitField = flag0 message.StatusBitField2 = 0 message.StatusBitField3 = flag3 if msg.channel is not None: message.NetworkID = msg.channel elif len(self.channels) == 1: message.NetworkID = self.channels[0] else: raise ValueError("msg.channel must be set when using multiple channels.") try: ics.transmit_messages(self.dev, message) except ics.RuntimeError: raise ICSApiError(*ics.get_last_api_error(self.dev))