Source code for can.interfaces.ics_neovi.neovi_bus

ICS NeoVi interface module.

python-ics is a Python wrapper around the API provided by Intrepid Control 
Systems for communicating with their NeoVI range of devices.

Implementation references:

import logging
from collections import deque

from can import Message, CanError
from can.bus import BusABC

logger = logging.getLogger(__name__)

    import ics
except ImportError as ie:
        "You won't be able to use the ICS NeoVi can backend without the "
        "python-ics module installed!: %s", ie
    ics = None

class ICSApiError(CanError):
    # A critical error which affects operation or accuracy.
    # An error which is not understood.
    # An important error which may be critical depending on the application
    # An error which probably does not need attention.

    def __init__(
            self, error_number, description_short, description_long,
            severity, restart_needed
        super(ICSApiError, self).__init__(description_short)
        self.error_number = error_number
        self.description_short = description_short
        self.description_long = description_long
        self.severity = severity
        self.restart_needed = restart_needed == 1

    def __str__(self):
        return "{} {}".format(self.description_short, self.description_long)

    def is_critical(self):
        return self.severity == self.ICS_SPY_ERR_CRITICAL

[docs]class NeoViBus(BusABC): """ The CAN Bus implemented for the python_ics interface """ def __init__(self, channel=None, can_filters=None, **config): """ :param int channel: The Channel id to create this bus with. :param list can_filters: A list of dictionaries each containing a "can_id" and a "can_mask". :param use_system_timestamp: Use system timestamp for can messages instead of the hardware time stamp >>> [{"can_id": 0x11, "can_mask": 0x21}] """ super(NeoViBus, self).__init__(channel, can_filters, **config) if ics is None: raise ImportError('Please install python-ics')"CAN Filters: {}".format(can_filters))"Got configuration of: {}".format(config)) self._use_system_timestamp = bool( config.get('use_system_timestamp', False) ) # TODO: Add support for multiples channels try: channel = int(channel) except ValueError: raise ValueError('channel must be an integer') type_filter = config.get('type_filter') serial = config.get('serial') = self._open_device(type_filter, serial) self.channel_info = '%s %s CH:%s' % (, self.get_serial_number(, channel )"Using device: {}".format(self.channel_info)) ics.load_default_settings( self.sw_filters = None self.set_filters(can_filters) self.rx_buffer = deque() self.opened = True = int(channel) if channel is not None else None # TODO: Change the scaling based on the device type self.ts_scaling = ( ics.NEOVI6_VCAN_TIMESTAMP_1, ics.NEOVI6_VCAN_TIMESTAMP_2 )
[docs] @staticmethod def get_serial_number(device): """Decode (if needed) and return the ICS device serial string :param device: ics device :return: ics device serial string :rtype: str """ def to_base36(n, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"): return (to_base36(n // 36) + alphabet[n % 36]).lstrip("0") \ if n > 0 else "0" a0000 = 604661760 if device.SerialNumber >= a0000: return to_base36(device.SerialNumber) return str(device.SerialNumber)
def shutdown(self): super(NeoViBus, self).shutdown() self.opened = False ics.close_device( def _open_device(self, type_filter=None, serial=None): if type_filter is not None: devices = ics.find_devices(type_filter) else: devices = ics.find_devices() for device in devices: if serial is None or self.get_serial_number(device) == str(serial): dev = device break else: msg = ['No device'] if type_filter is not None: msg.append('with type {}'.format(type_filter)) if serial is not None: msg.append('with serial {}'.format(serial)) msg.append('found.') raise Exception(' '.join(msg)) ics.open_device(dev) return dev def _process_msg_queue(self, timeout=None): try: messages, errors = ics.get_messages(, False, timeout) except ics.RuntimeError: return for ics_msg in messages: if ics_msg.NetworkID != continue if not self._is_filter_match(ics_msg.ArbIDOrHeader): continue self.rx_buffer.append(ics_msg) if errors: logger.warning("%d error(s) found" % errors) for msg in ics.get_error_messages( error = ICSApiError(*msg) if error.is_critical: raise error logger.warning(error) def _is_filter_match(self, arb_id): """ If SW filtering is used, checks if the `arb_id` matches any of the filters setup. :param int arb_id: CAN ID to check against. :return: True if `arb_id` matches any filters (or if SW filtering is not used). """ if not self.sw_filters: # Filtering done on HW or driver level or no filtering return True for can_filter in self.sw_filters: if not (arb_id ^ can_filter['can_id']) & can_filter['can_mask']: return True return False def _get_timestamp_for_msg(self, ics_msg): if self._use_system_timestamp: # This is the system time stamp. # TimeSystem is loaded with the value received from the timeGetTime # call in the WIN32 multimedia API. # # The timeGetTime accuracy is up to 1 millisecond. See the WIN32 # API documentation for more information. # # This timestamp is useful for time comparing with other system # events or data which is not synced with the neoVI timestamp. # # Currently, TimeSystem2 is not used. return ics_msg.TimeSystem else: # This is the hardware time stamp. # The TimeStamp is reset to zero every time the OpenPort method is # called. return \ float(ics_msg.TimeHardware2) * self.ts_scaling[1] + \ float(ics_msg.TimeHardware) * self.ts_scaling[0] def _ics_msg_to_message(self, ics_msg): return Message( timestamp=self._get_timestamp_for_msg(ics_msg), arbitration_id=ics_msg.ArbIDOrHeader, data=ics_msg.Data[:ics_msg.NumberBytesData], dlc=ics_msg.NumberBytesData, extended_id=bool( ics_msg.StatusBitField & ics.SPY_STATUS_XTD_FRAME ), is_remote_frame=bool( ics_msg.StatusBitField & ics.SPY_STATUS_REMOTE_FRAME ), channel=ics_msg.NetworkID ) def recv(self, timeout=None): msg = None if not self.rx_buffer: self._process_msg_queue(timeout=timeout) try: ics_msg = self.rx_buffer.popleft() msg = self._ics_msg_to_message(ics_msg) except IndexError: pass return msg def send(self, msg, timeout=None): if not self.opened: return data = tuple( flags = 0 if msg.is_extended_id: flags |= ics.SPY_STATUS_XTD_FRAME if msg.is_remote_frame: flags |= ics.SPY_STATUS_REMOTE_FRAME message = ics.SpyMessage() message.ArbIDOrHeader = msg.arbitration_id message.NumberBytesData = len(data) message.Data = data message.StatusBitField = flags message.StatusBitField2 = 0 message.NetworkID = try: ics.transmit_messages(, message) except ics.RuntimeError: raise ICSApiError(*ics.get_last_api_error(
[docs] def set_filters(self, can_filters=None): """Apply filtering to all messages received by this Bus. Calling without passing any filters will reset the applied filters. :param list can_filters: A list of dictionaries each containing a "can_id" and a "can_mask". >>> [{"can_id": 0x11, "can_mask": 0x21}] A filter matches, when ``<received_can_id> & can_mask == can_id & can_mask`` """ self.sw_filters = can_filters or [] if not len(self.sw_filters):"Filtering has been disabled") else: for can_filter in can_filters: can_id = can_filter["can_id"] can_mask = can_filter["can_mask"] "Filtering on ID 0x%X, mask 0x%X", can_id, can_mask)