#!/usr/bin/env python
# coding: utf-8
"""
ICS NeoVi interface module.
python-ics is a Python wrapper around the API provided by Intrepid Control
Systems for communicating with their NeoVI range of devices.
Implementation references:
* https://github.com/intrepidcs/python_ics
"""
import logging
from collections import deque
from can import Message, CanError, BusABC
logger = logging.getLogger(__name__)

# python-ics is an optional dependency.  When it cannot be imported the
# module must still load, so ``ics`` is bound to ``None`` and the classes
# below check for that before touching the driver.
try:
    import ics
except ImportError as ie:
    logger.warning(
        "You won't be able to use the ICS NeoVi can backend without the "
        "python-ics module installed!: %s", ie
    )
    ics = None
class ICSApiError(CanError):
    """
    Indicates an error with the ICS API.

    Instances carry the individual fields of an ICS error record (number,
    short and long description, severity and restart flag) as attributes.
    """

    # A critical error which affects operation or accuracy.
    ICS_SPY_ERR_CRITICAL = 0x10
    # An error which is not understood.
    ICS_SPY_ERR_QUESTION = 0x20
    # An important error which may be critical depending on the application
    ICS_SPY_ERR_EXCLAMATION = 0x30
    # An error which probably does not need attention.
    ICS_SPY_ERR_INFORMATION = 0x40

    def __init__(
            self, error_number, description_short, description_long,
            severity, restart_needed
    ):
        """
        :param error_number:
            Error number as reported by the ICS API.
        :param description_short:
            Short description; also passed to the base exception as its
            message.
        :param description_long:
            Long description of the error.
        :param severity:
            Severity code; compared against ``ICS_SPY_ERR_CRITICAL`` by
            :attr:`is_critical`.
        :param restart_needed:
            Restart flag; stored as ``True`` only when it equals ``1``.
        """
        super(ICSApiError, self).__init__(description_short)
        self.error_number = error_number
        self.description_short = description_short
        self.description_long = description_long
        self.severity = severity
        # Normalise the API's integer flag to a real boolean.
        self.restart_needed = restart_needed == 1

    def __str__(self):
        """Return the short and long descriptions separated by a space."""
        return "{} {}".format(self.description_short, self.description_long)

    @property
    def is_critical(self):
        """``True`` when the severity equals ``ICS_SPY_ERR_CRITICAL``."""
        return self.severity == self.ICS_SPY_ERR_CRITICAL
class NeoViBus(BusABC):
    """
    The CAN Bus implemented for the python_ics interface
    https://github.com/intrepidcs/python_ics
    """

    def __init__(self, channel, can_filters=None, **config):
        """
        :param int channel:
            The Channel id to create this bus with.
        :param list can_filters:
            See :meth:`can.BusABC.set_filters` for details.
        :param use_system_timestamp:
            Use system timestamp for can messages instead of the hardware time
            stamp
        :param type_filter:
            Optional value forwarded to ``ics.find_devices`` to restrict the
            device search.
        :param str serial:
            Serial to connect (optional, will use the first found if not
            supplied)
        :param int bitrate:
            Channel bitrate in bit/s. (optional, will enable the auto bitrate
            feature if not supplied)

        :raises ImportError:
            If the python-ics module is not installed.
        :raises ValueError:
            If ``channel`` cannot be converted to an integer.
        :raises Exception:
            If no device matching ``type_filter`` / ``serial`` is found.
        """
        if ics is None:
            raise ImportError('Please install python-ics')

        super(NeoViBus, self).__init__(
            channel=channel, can_filters=can_filters, **config)

        logger.info("CAN Filters: %s", can_filters)
        logger.info("Got configuration of: %s", config)

        self._use_system_timestamp = bool(
            config.get('use_system_timestamp', False)
        )

        try:
            channel = int(channel)
        except (TypeError, ValueError):
            # int(None) raises TypeError rather than ValueError; report both
            # failure modes with the same, documented error.
            raise ValueError('channel must be an integer')

        type_filter = config.get('type_filter')
        serial = config.get('serial')
        self.dev = self._find_device(type_filter, serial)
        ics.open_device(self.dev)

        # The bitrate is only set when the caller supplied one.
        if 'bitrate' in config:
            ics.set_bit_rate(self.dev, config.get('bitrate'), channel)

        self.channel_info = '%s %s CH:%s' % (
            self.dev.Name,
            self.get_serial_number(self.dev),
            channel
        )
        logger.info("Using device: %s", self.channel_info)

        # Messages fetched from the device but not yet handed to the caller.
        self.rx_buffer = deque()
        # Used to filter received messages by NetworkID and to address
        # outgoing ones.
        self.network = channel

    @staticmethod
    def get_serial_number(device):
        """Decode (if needed) and return the ICS device serial string

        :param device: ics device
        :return: ics device serial string
        :rtype: str
        """
        # 604661760 == 10 * 36**5, i.e. "A00000" in base 36.  Serial numbers
        # at or above this value are rendered in base 36, smaller ones as
        # plain decimal strings.
        base36_threshold = 604661760
        if device.SerialNumber >= base36_threshold:
            return ics.base36enc(device.SerialNumber)
        return str(device.SerialNumber)

    def shutdown(self):
        """Run the base class shutdown, then close the ICS device."""
        super(NeoViBus, self).shutdown()
        ics.close_device(self.dev)

    @staticmethod
    def _detect_available_configs():
        """Detect all configurations/channels that this interface could
        currently connect with.

        :rtype: Iterator[dict]
        :return: an iterable of dicts, each being a configuration suitable
                 for usage in the interface's bus constructor.
        """
        if ics is None:
            return []
        # TODO: add the channel(s)
        return [{
            'serial': NeoViBus.get_serial_number(device)
        } for device in ics.find_devices()]

    def _find_device(self, type_filter=None, serial=None):
        """Return the first ICS device matching the given criteria.

        :param type_filter:
            If not ``None``, passed to ``ics.find_devices`` to narrow the
            search.
        :param serial:
            If not ``None``, only a device whose serial string (see
            :meth:`get_serial_number`) equals ``str(serial)`` is accepted.
        :return: the first matching device
        :raises Exception: if no device matches
        """
        if type_filter is not None:
            devices = ics.find_devices(type_filter)
        else:
            devices = ics.find_devices()

        for device in devices:
            if serial is None or self.get_serial_number(device) == str(serial):
                return device

        # Nothing matched: build a message naming the criteria that were used.
        msg = ['No device']
        if type_filter is not None:
            msg.append('with type {}'.format(type_filter))
        if serial is not None:
            msg.append('with serial {}'.format(serial))
        msg.append('found.')
        raise Exception(' '.join(msg))

    def _process_msg_queue(self, timeout=0.1):
        """Fetch pending messages from the device into ``rx_buffer``.

        Only messages whose ``NetworkID`` equals ``self.network`` are kept.
        Errors reported alongside the messages are logged as warnings, not
        raised.

        :param float timeout:
            Forwarded to ``ics.get_messages`` (presumably seconds to wait for
            data -- confirm against the python-ics documentation).
        """
        try:
            messages, errors = ics.get_messages(self.dev, False, timeout)
        except ics.RuntimeError:
            # Best effort: a failed read simply yields no new messages.
            return

        for ics_msg in messages:
            if ics_msg.NetworkID != self.network:
                continue
            self.rx_buffer.append(ics_msg)

        if errors:
            logger.warning("%d error(s) found", errors)
            for msg in ics.get_error_messages(self.dev):
                error = ICSApiError(*msg)
                logger.warning(error)

    def _get_timestamp_for_msg(self, ics_msg):
        """Return the timestamp to report for ``ics_msg``.

        The system timestamp is used when ``use_system_timestamp`` was
        requested at construction time, otherwise the hardware timestamp.
        """
        if self._use_system_timestamp:
            # This is the system time stamp.
            # TimeSystem is loaded with the value received from the timeGetTime
            # call in the WIN32 multimedia API.
            #
            # The timeGetTime accuracy is up to 1 millisecond. See the WIN32
            # API documentation for more information.
            #
            # This timestamp is useful for time comparing with other system
            # events or data which is not synced with the neoVI timestamp.
            #
            # Currently, TimeSystem2 is not used.
            return ics_msg.TimeSystem
        else:
            # This is the hardware time stamp.
            return ics.get_timestamp_for_msg(self.dev, ics_msg)

    def _ics_msg_to_message(self, ics_msg):
        """Convert an ICS message object into a :class:`can.Message`."""
        return Message(
            timestamp=self._get_timestamp_for_msg(ics_msg),
            arbitration_id=ics_msg.ArbIDOrHeader,
            # Only the first NumberBytesData entries of Data are payload.
            data=ics_msg.Data[:ics_msg.NumberBytesData],
            dlc=ics_msg.NumberBytesData,
            extended_id=bool(
                ics_msg.StatusBitField & ics.SPY_STATUS_XTD_FRAME
            ),
            is_remote_frame=bool(
                ics_msg.StatusBitField & ics.SPY_STATUS_REMOTE_FRAME
            ),
            channel=ics_msg.NetworkID
        )

    def _recv_internal(self, timeout=0.1):
        """Return the next buffered message, polling the device if needed.

        :param float timeout:
            Forwarded to :meth:`_process_msg_queue` when the buffer is empty.
        :return:
            ``(message, False)``, or ``(None, False)`` when no message is
            available.  (The second element is presumably the BusABC
            "already filtered" flag -- confirm against python-can.)
        """
        if not self.rx_buffer:
            self._process_msg_queue(timeout=timeout)

        # Guard only the pop: an IndexError raised while converting a message
        # must not be mistaken for "buffer empty".
        try:
            ics_msg = self.rx_buffer.popleft()
        except IndexError:
            return None, False
        return self._ics_msg_to_message(ics_msg), False

    def send(self, msg, timeout=None):
        """Transmit ``msg`` on this bus's network.

        :param can.Message msg: message to transmit
        :param timeout: accepted for interface compatibility; not used here
        :raises CanError: if the device is not open
        :raises ICSApiError: if the ICS API reports a transmit failure
        """
        if not self.dev.IsOpen:
            raise CanError("bus not open")

        flags = 0
        if msg.is_extended_id:
            flags |= ics.SPY_STATUS_XTD_FRAME
        if msg.is_remote_frame:
            flags |= ics.SPY_STATUS_REMOTE_FRAME

        message = ics.SpyMessage()
        message.ArbIDOrHeader = msg.arbitration_id
        message.NumberBytesData = len(msg.data)
        message.Data = tuple(msg.data)
        message.StatusBitField = flags
        message.StatusBitField2 = 0
        message.NetworkID = self.network

        try:
            ics.transmit_messages(self.dev, message)
        except ics.RuntimeError:
            raise ICSApiError(*ics.get_last_api_error(self.dev))