Source code for can.interfaces.ics_neovi.neovi_bus

# coding: utf-8

"""
ICS NeoVi interface module.

python-ics is a Python wrapper around the API provided by Intrepid Control
Systems for communicating with their NeoVI range of devices.

Implementation references:
* https://github.com/intrepidcs/python_ics
"""

import logging
from collections import deque

from can import Message, CanError, BusABC

logger = logging.getLogger(__name__)

try:
    import ics
except ImportError as ie:
    logger.warning(
        "You won't be able to use the ICS NeoVi can backend without the "
        "python-ics module installed!: %s", ie
    )
    ics = None


class ICSApiError(CanError):
    """
    Error reported by the ICS API.

    Instances keep the error number, the short and long descriptions, the
    severity and the restart flag that were handed to the constructor.
    """

    # Severity values that ``severity`` is compared against.
    # Critical: affects operation or accuracy.
    ICS_SPY_ERR_CRITICAL = 0x10
    # Question: the error is not understood.
    ICS_SPY_ERR_QUESTION = 0x20
    # Exclamation: important, may be critical depending on the application.
    ICS_SPY_ERR_EXCLAMATION = 0x30
    # Information: probably does not need attention.
    ICS_SPY_ERR_INFORMATION = 0x40

    def __init__(self, error_number, description_short, description_long,
                 severity, restart_needed):
        """
        :param error_number: number identifying the error
        :param description_short: short description, also used as the
            message of the underlying :class:`CanError`
        :param description_long: long description
        :param severity: severity value (see the ``ICS_SPY_ERR_*`` constants)
        :param restart_needed: stored as ``True`` only when equal to ``1``
        """
        super(ICSApiError, self).__init__(description_short)
        self.restart_needed = restart_needed == 1
        self.severity = severity
        self.description_long = description_long
        self.description_short = description_short
        self.error_number = error_number

    def __str__(self):
        """Return the short and the long description joined by a space."""
        return "{brief} {detail}".format(
            brief=self.description_short,
            detail=self.description_long,
        )

    @property
    def is_critical(self):
        """``True`` when the severity equals ``ICS_SPY_ERR_CRITICAL``."""
        return self.severity == self.ICS_SPY_ERR_CRITICAL


class NeoViBus(BusABC):
    """
    The CAN Bus implemented for the python_ics interface
    https://github.com/intrepidcs/python_ics
    """

    def __init__(self, channel, can_filters=None, **kwargs):
        """
        :param channel:
            The channel ids to create this bus with.
            Can also be a single integer, netid name or a comma separated
            string.
        :type channel: int or str or list(int) or list(str)
        :param list can_filters:
            See :meth:`can.BusABC.set_filters` for details.
        :param bool receive_own_messages:
            If transmitted messages should also be received by this bus.
        :param bool use_system_timestamp:
            Use system timestamp for can messages instead of the hardware time
            stamp
        :param str serial:
            Serial to connect (optional, will use the first found if not
            supplied)
        :param type_filter:
            Forwarded to ``ics.find_devices`` to restrict the device search
            (optional, all devices are searched if not supplied)
        :param int bitrate:
            Channel bitrate in bit/s. (optional, will enable the auto bitrate
            feature if not supplied)
        :param bool fd:
            If CAN-FD frames should be supported.
        :param int data_bitrate:
            Which bitrate to use for data phase in CAN FD.
            Defaults to arbitration bitrate.
        :param override_library_name:
            Absolute path or relative path to the library including filename.
        :raises ImportError: if the python-ics module is not installed
        """
        if ics is None:
            raise ImportError('Please install python-ics')

        super(NeoViBus, self).__init__(
            channel=channel, can_filters=can_filters, **kwargs)

        logger.info("CAN Filters: %s", can_filters)
        logger.info("Got configuration of: %s", kwargs)

        if 'override_library_name' in kwargs:
            ics.override_library_name(kwargs.get('override_library_name'))

        if isinstance(channel, (list, tuple)):
            self.channels = channel
        elif isinstance(channel, int):
            self.channels = [channel]
        else:
            # Assume comma separated string of channels
            self.channels = [ch.strip() for ch in channel.split(',')]
        self.channels = [NeoViBus.channel_to_netid(ch) for ch in self.channels]

        type_filter = kwargs.get('type_filter')
        serial = kwargs.get('serial')
        self.dev = self._find_device(type_filter, serial)
        ics.open_device(self.dev)

        try:
            if 'bitrate' in kwargs:
                for netid in self.channels:
                    ics.set_bit_rate(self.dev, kwargs.get('bitrate'), netid)

            if kwargs.get('fd', False) and 'data_bitrate' in kwargs:
                for netid in self.channels:
                    ics.set_fd_bit_rate(
                        self.dev, kwargs.get('data_bitrate'), netid)
        except Exception:
            # The constructor is about to fail, so the caller never gets a
            # bus object to call shutdown() on: release the device here
            # instead of leaking the open handle.
            ics.close_device(self.dev)
            raise

        self._use_system_timestamp = bool(
            kwargs.get('use_system_timestamp', False)
        )
        self._receive_own_messages = kwargs.get('receive_own_messages', True)

        self.channel_info = '%s %s CH:%s' % (
            self.dev.Name,
            self.get_serial_number(self.dev),
            self.channels
        )
        logger.info("Using device: %s", self.channel_info)

        # ICS messages fetched from the device but not yet handed out by
        # _recv_internal().
        self.rx_buffer = deque()

    @staticmethod
    def channel_to_netid(channel_name_or_id):
        """Convert a channel number or name into an ICS network id.

        :param channel_name_or_id:
            Anything ``int()`` accepts, or a name such that
            ``ics.NETID_<NAME>`` (upper-cased) exists.
        :return: the network id
        :raises ValueError:
            if the value is neither an integer nor a known ``NETID_*`` name
        """
        try:
            channel = int(channel_name_or_id)
        except ValueError:
            netid = "NETID_{}".format(channel_name_or_id.upper())
            if hasattr(ics, netid):
                channel = getattr(ics, netid)
            else:
                raise ValueError(
                    'channel must be an integer or '
                    'a valid ICS channel name'
                )
        return channel

    @staticmethod
    def get_serial_number(device):
        """Decode (if needed) and return the ICS device serial string

        :param device: ics device
        :return: ics device serial string
        :rtype: str
        """
        # 604661760 == int('A00000', 36): serial numbers at or above this
        # value are returned base36 encoded, smaller ones as plain decimal.
        a0000 = 604661760
        if device.SerialNumber >= a0000:
            return ics.base36enc(device.SerialNumber)
        return str(device.SerialNumber)

    def shutdown(self):
        """Shut down the bus and close the ICS device."""
        super(NeoViBus, self).shutdown()
        ics.close_device(self.dev)

    @staticmethod
    def _detect_available_configs():
        """Detect all configurations/channels that this interface could
        currently connect with.

        :rtype: Iterator[dict]
        :return: an iterable of dicts, each being a configuration suitable
                 for usage in the interface's bus constructor.
        """
        if ics is None:
            return []

        try:
            devices = ics.find_devices()
        except Exception as e:
            # Detection is best effort: report "nothing found" on failure.
            logger.debug("Failed to detect configs: %s", e)
            return []

        # TODO: add the channel(s)
        return [{
            'interface': 'neovi',
            'serial': NeoViBus.get_serial_number(device)
        } for device in devices]

    def _find_device(self, type_filter=None, serial=None):
        """Return the first device matching the given criteria.

        :param type_filter:
            forwarded to ``ics.find_devices`` when not ``None``
        :param serial:
            serial to look for (compared as a string); ``None`` accepts the
            first device found
        :return: the matching ics device
        :raises Exception: if no device matches
        """
        if type_filter is not None:
            devices = ics.find_devices(type_filter)
        else:
            devices = ics.find_devices()

        for device in devices:
            if serial is None or self.get_serial_number(device) == str(serial):
                return device

        msg = ['No device']
        if type_filter is not None:
            msg.append('with type {}'.format(type_filter))
        if serial is not None:
            msg.append('with serial {}'.format(serial))
        msg.append('found.')
        raise Exception(' '.join(msg))

    def _process_msg_queue(self, timeout=0.1):
        """Fetch pending messages from the device into ``self.rx_buffer``.

        Messages from networks not in ``self.channels`` are dropped, as are
        transmitted messages when ``receive_own_messages`` is disabled.
        Errors reported by the device are logged as warnings.

        :param timeout: forwarded to ``ics.get_messages``
        """
        try:
            messages, errors = ics.get_messages(self.dev, False, timeout)
        except ics.RuntimeError:
            return

        for ics_msg in messages:
            if ics_msg.NetworkID not in self.channels:
                continue
            is_tx = bool(ics_msg.StatusBitField & ics.SPY_STATUS_TX_MSG)
            if not self._receive_own_messages and is_tx:
                continue
            self.rx_buffer.append(ics_msg)

        if errors:
            logger.warning("%d error(s) found", errors)
            for msg in ics.get_error_messages(self.dev):
                error = ICSApiError(*msg)
                logger.warning(error)

    def _get_timestamp_for_msg(self, ics_msg):
        """Return the system or hardware timestamp of ``ics_msg``, depending
        on the ``use_system_timestamp`` setting."""
        if self._use_system_timestamp:
            # This is the system time stamp.
            # TimeSystem is loaded with the value received from the
            # timeGetTime call in the WIN32 multimedia API.
            #
            # The timeGetTime accuracy is up to 1 millisecond. See the WIN32
            # API documentation for more information.
            #
            # This timestamp is useful for time comparing with other system
            # events or data which is not synced with the neoVI timestamp.
            #
            # Currently, TimeSystem2 is not used.
            return ics_msg.TimeSystem
        else:
            # This is the hardware time stamp.
            return ics.get_timestamp_for_msg(self.dev, ics_msg)

    def _ics_msg_to_message(self, ics_msg):
        """Convert an ICS message into a :class:`can.Message`.

        CAN FD messages additionally carry the ESI and BRS flags, and take
        their payload from ``ExtraDataPtr`` when ``ExtraDataPtrEnabled`` is
        set.
        """
        is_fd = ics_msg.Protocol == ics.SPY_PROTOCOL_CANFD
        length = ics_msg.NumberBytesData

        if is_fd and ics_msg.ExtraDataPtrEnabled:
            payload = ics_msg.ExtraDataPtr
        else:
            payload = ics_msg.Data

        fields = dict(
            timestamp=self._get_timestamp_for_msg(ics_msg),
            arbitration_id=ics_msg.ArbIDOrHeader,
            data=payload[:length],
            dlc=length,
            is_extended_id=bool(
                ics_msg.StatusBitField & ics.SPY_STATUS_XTD_FRAME
            ),
            is_fd=is_fd,
            is_remote_frame=bool(
                ics_msg.StatusBitField & ics.SPY_STATUS_REMOTE_FRAME
            ),
            channel=ics_msg.NetworkID
        )
        if is_fd:
            # These flags are only passed for FD frames; classic frames keep
            # the Message defaults.
            fields['error_state_indicator'] = bool(
                ics_msg.StatusBitField3 & ics.SPY_STATUS3_CANFD_ESI
            )
            fields['bitrate_switch'] = bool(
                ics_msg.StatusBitField3 & ics.SPY_STATUS3_CANFD_BRS
            )
        return Message(**fields)

    def _recv_internal(self, timeout=0.1):
        """Return the next buffered message, polling the device first when
        the buffer is empty.

        :param timeout: forwarded to :meth:`_process_msg_queue`
        :return:
            ``(message, False)``, or ``(None, False)`` when nothing is
            available; the second item is always ``False``
        """
        if not self.rx_buffer:
            self._process_msg_queue(timeout=timeout)
        try:
            # Only the pop may legitimately fail (empty buffer); conversion
            # errors must not be mistaken for "no message".
            ics_msg = self.rx_buffer.popleft()
        except IndexError:
            return None, False
        return self._ics_msg_to_message(ics_msg), False

    def send(self, msg, timeout=None):
        """Transmit a message on the device.

        :param msg:
            message to send; ``msg.channel`` selects the network and must be
            set when the bus was created with more than one channel
        :param timeout: accepted for interface compatibility, not used here
        :raises CanError: if the device handle is not valid
        :raises ValueError:
            if ``msg.channel`` is ``None`` while several channels are in use
        :raises ICSApiError: if the ICS API fails to transmit the message
        """
        if not ics.validate_hobject(self.dev):
            raise CanError("bus not open")
        message = ics.SpyMessage()

        flag0 = 0
        if msg.is_extended_id:
            flag0 |= ics.SPY_STATUS_XTD_FRAME
        if msg.is_remote_frame:
            flag0 |= ics.SPY_STATUS_REMOTE_FRAME

        flag3 = 0
        if msg.is_fd:
            message.Protocol = ics.SPY_PROTOCOL_CANFD
            if msg.bitrate_switch:
                flag3 |= ics.SPY_STATUS3_CANFD_BRS
            if msg.error_state_indicator:
                flag3 |= ics.SPY_STATUS3_CANFD_ESI

        message.ArbIDOrHeader = msg.arbitration_id
        msg_data = msg.data
        message.NumberBytesData = len(msg_data)
        message.Data = tuple(msg_data[:8])
        if msg.is_fd and len(msg_data) > 8:
            # Payloads longer than 8 bytes go through ExtraDataPtr.
            message.ExtraDataPtrEnabled = 1
            message.ExtraDataPtr = tuple(msg_data)
        message.StatusBitField = flag0
        message.StatusBitField2 = 0
        message.StatusBitField3 = flag3

        if msg.channel is not None:
            message.NetworkID = msg.channel
        elif len(self.channels) == 1:
            message.NetworkID = self.channels[0]
        else:
            raise ValueError(
                "msg.channel must be set when using multiple channels.")

        try:
            ics.transmit_messages(self.dev, message)
        except ics.RuntimeError:
            raise ICSApiError(*ics.get_last_api_error(self.dev))