# Source code for can.interfaces.ics_neovi.neovi_bus

#!/usr/bin/env python
# coding: utf-8

"""
ICS NeoVi interface module.

python-ics is a Python wrapper around the API provided by Intrepid Control
Systems for communicating with their NeoVI range of devices.

Implementation references:
* https://github.com/intrepidcs/python_ics
"""

import logging
from collections import deque

from can import Message, CanError, BusABC

logger = logging.getLogger(__name__)

# python-ics is an optional dependency. When it cannot be imported, log a
# warning and bind ``ics`` to None so that code in this module can test
# ``ics is None`` instead of failing at import time.
try:
    import ics
except ImportError as ie:
    logger.warning(
        "You won't be able to use the ICS NeoVi can backend without the "
        "python-ics module installed!: %s", ie
    )
    ics = None

class ICSApiError(CanError):
    """
    Indicates an error with the ICS API.

    Instances carry the five fields of an ICS error record: the numeric
    error code, a short and a long description, a severity level and a
    flag telling whether a restart is needed.
    """

    # Severity levels.
    # NOTE(review): the numeric values below are taken from the Intrepid
    # icsSpy error-severity constants; confirm against the vendor header.

    # A critical error which affects operation or accuracy.
    ICS_SPY_ERR_CRITICAL = 0x10
    # An error which is not understood.
    ICS_SPY_ERR_QUESTION = 0x20
    # An important error which may be critical depending on the application
    ICS_SPY_ERR_EXCLAMATION = 0x30
    # An error which probably does not need attention.
    ICS_SPY_ERR_INFORMATION = 0x40

    def __init__(
            self, error_number, description_short, description_long,
            severity, restart_needed
    ):
        """
        :param error_number: numeric error code
        :param description_short: short description; also used as the
            message passed to the base exception
        :param description_long: long description
        :param severity: severity level, compared against the
            ``ICS_SPY_ERR_*`` class constants
        :param restart_needed: stored as ``True`` only when equal to ``1``
        """
        super(ICSApiError, self).__init__(description_short)
        self.error_number = error_number
        self.description_short = description_short
        self.description_long = description_long
        self.severity = severity
        self.restart_needed = restart_needed == 1

    def __str__(self):
        """Return the short and long descriptions separated by a space."""
        return "{} {}".format(self.description_short, self.description_long)

    # NOTE(review): upstream may expose this as a read-only property; kept
    # as a plain method here -- confirm against callers.
    def is_critical(self):
        """Return True when the severity equals ``ICS_SPY_ERR_CRITICAL``."""
        return self.severity == self.ICS_SPY_ERR_CRITICAL

class NeoViBus(BusABC):
    """
    The CAN Bus implemented for the python_ics interface
    """

    def __init__(self, channel, can_filters=None, **config):
        """
        :param int channel:
            The Channel id to create this bus with.
        :param list can_filters:
            See :meth:`can.BusABC.set_filters` for details.
        :param use_system_timestamp:
            Use system timestamp for can messages instead of the hardware time
            stamp
        :param type_filter:
            Forwarded to ``ics.find_devices`` to restrict the device search
            (optional)
        :param str serial:
            Serial to connect (optional, will use the first found if not
            supplied)
        :param int bitrate:
            Channel bitrate in bit/s. (optional, will enable the auto bitrate
            feature if not supplied)

        :raises ImportError: if python-ics is not installed
        :raises ValueError: if ``channel`` cannot be converted to an integer
        """
        if ics is None:
            raise ImportError('Please install python-ics')

        super(NeoViBus, self).__init__(
            channel=channel, can_filters=can_filters, **config)

        logger.info("CAN Filters: {}".format(can_filters))
        logger.info("Got configuration of: {}".format(config))

        self._use_system_timestamp = bool(
            config.get('use_system_timestamp', False)
        )

        try:
            channel = int(channel)
        except (TypeError, ValueError):
            # int(None) raises TypeError; report it the same way as any
            # other non-integer channel.
            raise ValueError('channel must be an integer')

        type_filter = config.get('type_filter')
        serial = config.get('serial')
        self.dev = self._find_device(type_filter, serial)
        ics.open_device(self.dev)

        if 'bitrate' in config:
            ics.set_bit_rate(self.dev, config.get('bitrate'), channel)

        self.channel_info = '%s %s CH:%s' % (
            self.dev.Name,
            self.get_serial_number(self.dev),
            channel
        )
        logger.info("Using device: {}".format(self.channel_info))

        # Messages fetched from the device but not yet handed to the caller.
        self.rx_buffer = deque()
        # Network id used to filter received messages and tag sent ones.
        self.network = channel

    @staticmethod
    def get_serial_number(device):
        """Decode (if needed) and return the ICS device serial string

        :param device: ics device
        :return: ics device serial string
        :rtype: str
        """
        # 604661760 == int("A00000", 36): serial numbers at or above this
        # value are rendered in base 36, smaller ones as plain decimal.
        a0000 = 604661760
        if device.SerialNumber >= a0000:
            return ics.base36enc(device.SerialNumber)
        return str(device.SerialNumber)

    def shutdown(self):
        """Run the base-class shutdown, then close the ICS device."""
        super(NeoViBus, self).shutdown()
        ics.close_device(self.dev)

    @staticmethod
    def _detect_available_configs():
        """Detect all configurations/channels that this interface could
        currently connect with.

        :rtype: Iterator[dict]
        :return: an iterable of dicts, each being a configuration suitable
                 for usage in the interface's bus constructor.
        """
        if ics is None:
            return []
        # TODO: add the channel(s)
        return [{
            'serial': NeoViBus.get_serial_number(device)
        } for device in ics.find_devices()]

    def _find_device(self, type_filter=None, serial=None):
        """Return the first device matching the given type and serial.

        :param type_filter: forwarded to ``ics.find_devices`` when not None
        :param serial: serial to match (compared as a string); when None the
            first device found is returned
        :raises Exception: if no matching device is found
        """
        if type_filter is not None:
            devices = ics.find_devices(type_filter)
        else:
            devices = ics.find_devices()

        for device in devices:
            if serial is None or self.get_serial_number(device) == str(serial):
                return device

        # Nothing matched: build a message naming the filters that were used.
        msg = ['No device']
        if type_filter is not None:
            msg.append('with type {}'.format(type_filter))
        if serial is not None:
            msg.append('with serial {}'.format(serial))
        msg.append('found.')
        raise Exception(' '.join(msg))

    def _process_msg_queue(self, timeout=0.1):
        """Fetch pending messages from the device into ``self.rx_buffer``.

        Only messages whose ``NetworkID`` equals ``self.network`` are kept.
        Errors reported by the device are logged as warnings. An
        ``ics.RuntimeError`` raised while fetching is swallowed and nothing
        is buffered.

        :param timeout: forwarded to ``ics.get_messages``
        """
        try:
            messages, errors = ics.get_messages(self.dev, False, timeout)
        except ics.RuntimeError:
            return
        for ics_msg in messages:
            if ics_msg.NetworkID != self.network:
                continue
            self.rx_buffer.append(ics_msg)
        if errors:
            logger.warning("%d error(s) found" % errors)

            for msg in ics.get_error_messages(self.dev):
                error = ICSApiError(*msg)
                logger.warning(error)

    def _get_timestamp_for_msg(self, ics_msg):
        """Return the system or hardware timestamp of ``ics_msg``.

        Which one is returned depends on the ``use_system_timestamp``
        option given to the constructor.
        """
        if self._use_system_timestamp:
            # This is the system time stamp.
            # TimeSystem is loaded with the value received from the timeGetTime
            # call in the WIN32 multimedia API.
            #
            # The timeGetTime accuracy is up to 1 millisecond. See the WIN32
            # API documentation for more information.
            #
            # This timestamp is useful for time comparing with other system
            # events or data which is not synced with the neoVI timestamp.
            #
            # Currently, TimeSystem2 is not used.
            return ics_msg.TimeSystem
        else:
            # This is the hardware time stamp.
            return ics.get_timestamp_for_msg(self.dev, ics_msg)

    def _ics_msg_to_message(self, ics_msg):
        """Convert an ICS message object into a :class:`can.Message`."""
        return Message(
            timestamp=self._get_timestamp_for_msg(ics_msg),
            arbitration_id=ics_msg.ArbIDOrHeader,
            data=ics_msg.Data[:ics_msg.NumberBytesData],
            dlc=ics_msg.NumberBytesData,
            extended_id=bool(
                ics_msg.StatusBitField & ics.SPY_STATUS_XTD_FRAME
            ),
            is_remote_frame=bool(
                ics_msg.StatusBitField & ics.SPY_STATUS_REMOTE_FRAME
            ),
            channel=ics_msg.NetworkID
        )

    def _recv_internal(self, timeout=0.1):
        """Return the next buffered message, polling the device if needed.

        :param timeout: forwarded to the device poll when the buffer is empty
        :return: ``(message, False)``, or ``(None, False)`` when no message
            is available. The second element is always False (presumably
            the "already filtered" flag of the BusABC contract -- confirm).
        """
        if not self.rx_buffer:
            self._process_msg_queue(timeout=timeout)
        try:
            ics_msg = self.rx_buffer.popleft()
            msg = self._ics_msg_to_message(ics_msg)
        except IndexError:
            return None, False
        return msg, False

    def send(self, msg, timeout=None):
        """Transmit ``msg`` on this bus's network.

        :param msg: message to send; its ``arbitration_id``, ``data``,
            ``is_extended_id`` and ``is_remote_frame`` attributes are used
        :param timeout: accepted but not used by this implementation
        :raises CanError: if the device is not open
        :raises ICSApiError: if the ICS API reports a transmit failure
        """
        if not self.dev.IsOpen:
            raise CanError("bus not open")

        flags = 0
        if msg.is_extended_id:
            flags |= ics.SPY_STATUS_XTD_FRAME
        if msg.is_remote_frame:
            flags |= ics.SPY_STATUS_REMOTE_FRAME

        message = ics.SpyMessage()
        message.ArbIDOrHeader = msg.arbitration_id
        message.NumberBytesData = len(msg.data)
        message.Data = tuple(msg.data)
        message.StatusBitField = flags
        message.StatusBitField2 = 0
        message.NetworkID = self.network

        try:
            ics.transmit_messages(self.dev, message)
        except ics.RuntimeError:
            raise ICSApiError(*ics.get_last_api_error(self.dev))