# coding: utf-8
"""
ICS NeoVi interface module.
python-ics is a Python wrapper around the API provided by Intrepid Control
Systems for communicating with their NeoVI range of devices.
Implementation references:
* https://github.com/intrepidcs/python_ics
"""
import logging
import os
import tempfile
from collections import deque
from can import Message, CanError, BusABC
logger = logging.getLogger(__name__)
# python-ics is an optional dependency.  Keep this module importable without
# it and leave ``ics`` bound to None; NeoViBus.__init__ checks for that and
# raises ImportError with an installation hint.
try:
    import ics
except ImportError as import_error:
    ics = None
    logger.warning(
        "You won't be able to use the ICS NeoVi can backend without the "
        "python-ics module installed!: %s",
        import_error,
    )
# filelock is optional as well.  When it is missing, a stand-in class with the
# same constructor signature and context-manager protocol is defined so the
# rest of the module can use ``with`` on the lock unconditionally.
try:
    from filelock import FileLock
except ImportError as import_error:
    logger.warning(
        "Using ICS NeoVi can backend without the "
        "filelock module installed may cause some issues!: %s",
        import_error,
    )

    class FileLock:
        """No-op replacement for ``filelock.FileLock``.

        It stores its arguments but performs no locking: entering returns
        the instance itself and exiting never suppresses exceptions.
        """

        def __init__(self, lock_file, timeout=-1):
            self._lock_file, self.timeout = lock_file, timeout

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Returning None lets any exception raised in the ``with`` body
            # propagate.
            return None
# Module-level lock held around ``ics.open_device`` in NeoViBus.__init__ to
# prevent concurrent device open.
# When neoVI server is enabled, there is an issue with concurrent device open.
# The lock file lives in the system temporary directory so that separate
# processes resolve the same path.  If the ``filelock`` package could not be
# imported, FileLock is the dummy class defined above and this does nothing.
open_lock = FileLock(os.path.join(tempfile.gettempdir(), "neovi.lock"))
class ICSApiError(CanError):
    """
    Indicates an error with the ICS API.

    Instances carry the five fields given to the constructor: the error
    number, a short and a long description, a severity code and a
    restart-needed flag.  The short description is also passed to the
    ``CanError`` base class.
    """

    # Severity codes; ``is_critical`` compares ``severity`` against the first.

    # A critical error which affects operation or accuracy.
    ICS_SPY_ERR_CRITICAL = 0x10
    # An error which is not understood.
    ICS_SPY_ERR_QUESTION = 0x20
    # An important error which may be critical depending on the application
    ICS_SPY_ERR_EXCLAMATION = 0x30
    # An error which probably does not need attention.
    ICS_SPY_ERR_INFORMATION = 0x40

    def __init__(
        self,
        error_number,
        description_short,
        description_long,
        severity,
        restart_needed,
    ):
        """
        :param error_number: error number, stored as ``error_number``
        :param description_short: short text; also the base-class message
        :param description_long: long text, stored as ``description_long``
        :param severity: severity code, stored as ``severity``
        :param restart_needed: stored as a bool that is True only when this
            argument equals 1
        """
        super().__init__(description_short)
        self.severity = severity
        self.error_number = error_number
        self.description_long = description_long
        self.description_short = description_short
        self.restart_needed = restart_needed == 1

    def __str__(self):
        """Return the short and long descriptions separated by a space."""
        return "{short} {long}".format(
            short=self.description_short, long=self.description_long
        )

    @property
    def is_critical(self):
        """True when ``severity`` equals ``ICS_SPY_ERR_CRITICAL``."""
        return self.severity == self.ICS_SPY_ERR_CRITICAL
class NeoViBus(BusABC):
    """
    The CAN Bus implemented for the python_ics interface
    https://github.com/intrepidcs/python_ics

    Messages fetched from the device are queued in ``rx_buffer`` as raw
    ``ics`` messages and converted to :class:`can.Message` one at a time by
    ``_recv_internal``.
    """

    def __init__(self, channel, can_filters=None, **kwargs):
        """
        :param channel:
            The channel ids to create this bus with.
            Can also be a single integer, netid name or a comma separated
            string.
        :type channel: int or str or list(int) or list(str)
        :param list can_filters:
            See :meth:`can.BusABC.set_filters` for details.
        :param bool receive_own_messages:
            If transmitted messages should also be received by this bus.
            Defaults to True in this implementation.
        :param bool use_system_timestamp:
            Use system timestamp for can messages instead of the hardware time
            stamp
        :param str serial:
            Serial to connect (optional, will use the first found if not
            supplied)
        :param type_filter:
            Optional value forwarded to ``ics.find_devices`` to restrict the
            device search.
        :param int bitrate:
            Channel bitrate in bit/s. Optional; when not supplied this
            constructor does not set a bitrate (the original documentation
            states the auto bitrate feature is used in that case).
        :param bool fd:
            If CAN-FD frames should be supported.
        :param int data_bitrate:
            Which bitrate to use for data phase in CAN FD. Only applied when
            ``fd`` is true; when not supplied this constructor does not set
            it (the original documentation states it defaults to the
            arbitration bitrate).
        :param override_library_name:
            Absolute path or relative path to the library including filename.

        :raises ImportError: if python-ics is not installed
        :raises ValueError: if a channel is neither an integer nor a known
            ``ics.NETID_*`` name
        :raises Exception: if no matching device is found
        """
        if ics is None:
            raise ImportError("Please install python-ics")

        super().__init__(channel=channel, can_filters=can_filters, **kwargs)

        logger.info("CAN Filters: %s", can_filters)
        logger.info("Got configuration of: %s", kwargs)

        if "override_library_name" in kwargs:
            ics.override_library_name(kwargs.get("override_library_name"))

        if isinstance(channel, (list, tuple)):
            requested = channel
        elif isinstance(channel, int):
            requested = [channel]
        else:
            # Assume comma separated string of channels
            requested = [ch.strip() for ch in channel.split(",")]
        self.channels = [NeoViBus.channel_to_netid(ch) for ch in requested]

        type_filter = kwargs.get("type_filter")
        serial = kwargs.get("serial")
        self.dev = self._find_device(type_filter, serial)

        with open_lock:
            ics.open_device(self.dev)

        # From here on the device handle is open: release it again if any of
        # the remaining setup fails, otherwise the caller has no bus object
        # on which to call shutdown().
        try:
            if "bitrate" in kwargs:
                for netid in self.channels:
                    ics.set_bit_rate(self.dev, kwargs.get("bitrate"), netid)

            if kwargs.get("fd", False) and "data_bitrate" in kwargs:
                for netid in self.channels:
                    ics.set_fd_bit_rate(
                        self.dev, kwargs.get("data_bitrate"), netid
                    )

            self._use_system_timestamp = bool(
                kwargs.get("use_system_timestamp", False)
            )
            self._receive_own_messages = kwargs.get("receive_own_messages", True)

            self.channel_info = "%s %s CH:%s" % (
                self.dev.Name,
                self.get_serial_number(self.dev),
                self.channels,
            )
            logger.info("Using device: %s", self.channel_info)

            self.rx_buffer = deque()
        except Exception:
            ics.close_device(self.dev)
            raise

    @staticmethod
    def channel_to_netid(channel_name_or_id):
        """Translate a channel given as integer or name into a network id.

        :param channel_name_or_id:
            Either something ``int()`` accepts, or a string naming an
            ``ics.NETID_<NAME>`` attribute (case-insensitive).
        :return: the integer value, or the matching ``ics.NETID_*`` attribute
        :raises ValueError: if the name has no ``ics.NETID_*`` counterpart
        """
        try:
            channel = int(channel_name_or_id)
        except ValueError:
            netid = "NETID_{}".format(channel_name_or_id.upper())
            if not hasattr(ics, netid):
                raise ValueError(
                    "channel must be an integer or a valid ICS channel name"
                )
            channel = getattr(ics, netid)
        return channel

    @staticmethod
    def get_serial_number(device):
        """Decode (if needed) and return the ICS device serial string

        :param device: ics device
        :return: ics device serial string
        :rtype: str
        """
        # 604661760 == 10 * 36**5.  Serial numbers at or above this value are
        # rendered with ``ics.base36enc``; smaller ones are printed as plain
        # decimal.
        a0000 = 604661760
        if device.SerialNumber >= a0000:
            return ics.base36enc(device.SerialNumber)
        return str(device.SerialNumber)

    def shutdown(self):
        """Run the base-class shutdown, then close the ICS device.

        The device is closed even if the base-class shutdown raises.
        """
        try:
            super().shutdown()
        finally:
            ics.close_device(self.dev)

    @staticmethod
    def _detect_available_configs():
        """Detect all configurations/channels that this interface could
        currently connect with.

        :rtype: list[dict]
        :return: a list of dicts, each being a configuration suitable
            for usage in the interface's bus constructor. Empty when
            python-ics is missing or the device search fails.
        """
        if ics is None:
            return []

        try:
            devices = ics.find_devices()
        except Exception as e:
            # Detection is best effort: report the problem at debug level
            # and behave as if no device is present.
            logger.debug("Failed to detect configs: %s", e)
            return []

        # TODO: add the channel(s)
        return [
            {"interface": "neovi", "serial": NeoViBus.get_serial_number(device)}
            for device in devices
        ]

    def _find_device(self, type_filter=None, serial=None):
        """Return the first device matching the given criteria.

        :param type_filter:
            If not None, forwarded to ``ics.find_devices``.
        :param serial:
            If not None, only a device whose serial string equals
            ``str(serial)`` is accepted; otherwise the first device found
            is returned.
        :return: the matching ics device
        :raises Exception: if no device matches
        """
        if type_filter is not None:
            devices = ics.find_devices(type_filter)
        else:
            devices = ics.find_devices()

        for device in devices:
            if serial is None or self.get_serial_number(device) == str(serial):
                return device

        msg = ["No device"]
        if type_filter is not None:
            msg.append("with type {}".format(type_filter))
        if serial is not None:
            msg.append("with serial {}".format(serial))
        msg.append("found.")
        raise Exception(" ".join(msg))

    def _process_msg_queue(self, timeout=0.1):
        """Fetch pending messages from the device into ``rx_buffer``.

        Messages on networks not in ``self.channels`` are dropped, as are
        transmitted messages when ``receive_own_messages`` is disabled.
        Reported API errors are logged as warnings, not raised.

        :param timeout: forwarded as the third argument of
            ``ics.get_messages``
        """
        try:
            # NOTE(review): the second positional argument is passed as False
            # exactly as before; confirm its meaning in the python-ics docs.
            messages, errors = ics.get_messages(self.dev, False, timeout)
        except ics.RuntimeError:
            # Deliberately treated as "nothing received".
            return

        for ics_msg in messages:
            if ics_msg.NetworkID not in self.channels:
                continue
            is_tx = bool(ics_msg.StatusBitField & ics.SPY_STATUS_TX_MSG)
            if is_tx and not self._receive_own_messages:
                continue
            self.rx_buffer.append(ics_msg)

        if errors:
            logger.warning("%d error(s) found", errors)
            for msg in ics.get_error_messages(self.dev):
                error = ICSApiError(*msg)
                logger.warning(error)

    def _get_timestamp_for_msg(self, ics_msg):
        """Return the system or hardware timestamp of ``ics_msg``.

        Which one is used depends on the ``use_system_timestamp`` option
        given to the constructor.
        """
        if self._use_system_timestamp:
            # This is the system time stamp.
            # TimeSystem is loaded with the value received from the timeGetTime
            # call in the WIN32 multimedia API.
            #
            # The timeGetTime accuracy is up to 1 millisecond. See the WIN32
            # API documentation for more information.
            #
            # This timestamp is useful for time comparing with other system
            # events or data which is not synced with the neoVI timestamp.
            #
            # Currently, TimeSystem2 is not used.
            return ics_msg.TimeSystem

        # This is the hardware time stamp.
        return ics.get_timestamp_for_msg(self.dev, ics_msg)

    def _ics_msg_to_message(self, ics_msg):
        """Convert a raw ``ics`` message into a :class:`can.Message`.

        For CAN FD messages the payload is read from ``ExtraDataPtr`` when
        ``ExtraDataPtrEnabled`` is set, and the ESI/BRS flags are taken from
        ``StatusBitField3``. For all other messages the payload comes from
        ``Data`` and those two flags are left at the ``Message`` defaults.
        """
        is_fd = ics_msg.Protocol == ics.SPY_PROTOCOL_CANFD
        length = ics_msg.NumberBytesData

        if is_fd and ics_msg.ExtraDataPtrEnabled:
            data = ics_msg.ExtraDataPtr[:length]
        else:
            data = ics_msg.Data[:length]

        fields = {
            "timestamp": self._get_timestamp_for_msg(ics_msg),
            "arbitration_id": ics_msg.ArbIDOrHeader,
            "data": data,
            "dlc": length,
            "is_extended_id": bool(
                ics_msg.StatusBitField & ics.SPY_STATUS_XTD_FRAME
            ),
            "is_fd": is_fd,
            "is_remote_frame": bool(
                ics_msg.StatusBitField & ics.SPY_STATUS_REMOTE_FRAME
            ),
            "channel": ics_msg.NetworkID,
        }
        if is_fd:
            fields["error_state_indicator"] = bool(
                ics_msg.StatusBitField3 & ics.SPY_STATUS3_CANFD_ESI
            )
            fields["bitrate_switch"] = bool(
                ics_msg.StatusBitField3 & ics.SPY_STATUS3_CANFD_BRS
            )
        return Message(**fields)

    def _recv_internal(self, timeout=0.1):
        """Return the next buffered message, polling the device if needed.

        :param timeout: forwarded to ``_process_msg_queue`` when the buffer
            is empty
        :return: ``(message, False)``, or ``(None, False)`` when nothing is
            available. The second element is always False here
            (NOTE(review): see the BusABC documentation for its meaning).
        """
        if not self.rx_buffer:
            self._process_msg_queue(timeout=timeout)

        # Only an empty buffer should produce the "no message" result, so the
        # conversion is kept outside the try block.
        try:
            ics_msg = self.rx_buffer.popleft()
        except IndexError:
            return None, False
        return self._ics_msg_to_message(ics_msg), False

    def send(self, msg, timeout=None):
        """Transmit ``msg`` on the device.

        :param msg: the :class:`can.Message` to send. Its ``channel`` is used
            as network id; it may be None only when this bus was created with
            exactly one channel.
        :param timeout: accepted for interface compatibility; not used by
            this implementation.
        :raises CanError: if the device handle is not valid ("bus not open")
        :raises ValueError: if ``msg.channel`` is None and the bus has
            several channels
        :raises ICSApiError: if the transmit call fails
        """
        if not ics.validate_hobject(self.dev):
            raise CanError("bus not open")
        message = ics.SpyMessage()

        flag0 = 0
        if msg.is_extended_id:
            flag0 |= ics.SPY_STATUS_XTD_FRAME
        if msg.is_remote_frame:
            flag0 |= ics.SPY_STATUS_REMOTE_FRAME

        flag3 = 0
        if msg.is_fd:
            message.Protocol = ics.SPY_PROTOCOL_CANFD
            if msg.bitrate_switch:
                flag3 |= ics.SPY_STATUS3_CANFD_BRS
            if msg.error_state_indicator:
                flag3 |= ics.SPY_STATUS3_CANFD_ESI

        message.ArbIDOrHeader = msg.arbitration_id
        message.NumberBytesData = len(msg.data)
        # ``Data`` receives at most the first 8 bytes; longer FD payloads are
        # additionally supplied in full through ``ExtraDataPtr``.
        message.Data = tuple(msg.data[:8])
        if msg.is_fd and len(msg.data) > 8:
            message.ExtraDataPtrEnabled = 1
            message.ExtraDataPtr = tuple(msg.data)
        message.StatusBitField = flag0
        message.StatusBitField2 = 0
        message.StatusBitField3 = flag3

        if msg.channel is not None:
            message.NetworkID = msg.channel
        elif len(self.channels) == 1:
            message.NetworkID = self.channels[0]
        else:
            raise ValueError("msg.channel must be set when using multiple channels.")

        try:
            ics.transmit_messages(self.dev, message)
        except ics.RuntimeError as error:
            # Keep the low-level error attached as __cause__ for debugging.
            raise ICSApiError(*ics.get_last_api_error(self.dev)) from error