# -*- coding: utf-8 -*-
"""
Ctypes wrapper module for Vector CAN Interface on win32/win64 systems
Authors: Julien Grave <grave.jul@gmail.com>, Christian Sandberg
"""
# Import Standard Python Modules
# ==============================
import ctypes
import logging
import sys
import time
# Import Modules
# ==============
from can import BusABC, Message
from .exceptions import VectorError
# Define Module Logger
# ====================
LOG = logging.getLogger(__name__)

# Import the Vector API module defensively so that this module can still be
# imported (e.g. for Travis tests) when the Vector driver is unavailable.
# ``vxlapi`` stays ``None`` in that case; VectorBus.__init__ checks for this
# and raises ImportError.
vxlapi = None
try:
    from . import vxlapi
except Exception as exc:
    # Deliberately broad: a failed import is only logged here, never raised.
    # NOTE(review): presumably loading the driver can fail with errors other
    # than ImportError (e.g. a missing DLL) -- confirm before narrowing.
    LOG.warning('Could not import vxlapi: %s', exc)
class VectorBus(BusABC):
    """The CAN Bus implemented for the Vector interface."""

    def __init__(self, channel, can_filters=None, poll_interval=0.01,
                 bitrate=None, rx_queue_size=256, app_name="CANalyzer", **config):
        """
        :param list channel:
            The channel indexes to create this bus with.
            Can also be a single integer or a comma separated string.
        :param list can_filters:
            Optional acceptance filters, forwarded to :meth:`set_filters`.
        :param float poll_interval:
            Poll interval in seconds.
        :param int bitrate:
            Bitrate in bits/s. When given, init access is requested for the
            selected channels and the bitrate is only applied if that access
            was granted for all of them.
        :param int rx_queue_size:
            Number of messages in receive queue.
        :param str app_name:
            Name of application in Hardware Config.
        :param config:
            Additional keyword arguments; accepted and ignored by this
            constructor.

        :raises ImportError:
            If the ``vxlapi`` module could not be imported.
        """
        if vxlapi is None:
            raise ImportError("The Vector API has not been loaded")
        self.poll_interval = poll_interval

        # Normalise ``channel`` into a sequence of integer channel indexes.
        if isinstance(channel, (list, tuple)):
            self.channels = channel
        elif isinstance(channel, int):
            self.channels = [channel]
        else:
            # Assume comma separated string of channels
            self.channels = [int(ch.strip()) for ch in channel.split(',')]

        self._app_name = app_name.encode()
        # Human readable description; channels are displayed 1-based.
        self.channel_info = 'Application %s: %s' % (
            app_name, ', '.join('CAN %d' % (ch + 1) for ch in self.channels))

        vxlapi.xlOpenDriver()
        self.port_handle = vxlapi.XLportHandle(vxlapi.XL_INVALID_PORTHANDLE)
        self.mask = 0

        # Get channels masks: OR together the mask of every requested channel.
        for ch_index in self.channels:
            hw_type = ctypes.c_uint(0)
            hw_index = ctypes.c_uint(0)
            hw_channel = ctypes.c_uint(0)
            vxlapi.xlGetApplConfig(self._app_name, ch_index, hw_type, hw_index,
                                   hw_channel, vxlapi.XL_BUS_TYPE_CAN)
            LOG.debug('Channel index %d found', ch_index)
            mask = vxlapi.xlGetChannelMask(hw_type.value, hw_index.value,
                                           hw_channel.value)
            LOG.debug('Channel %d, Type: %d, Mask: %d',
                      hw_channel.value, hw_type.value, mask)
            self.mask |= mask

        permission_mask = vxlapi.XLaccess()
        # Set mask to request channel init permission if needed
        if bitrate:
            permission_mask.value = self.mask
        # NOTE(review): permission_mask is read back after this call, so the
        # driver presumably updates it in place with the access actually
        # granted -- confirm against the XL Driver Library documentation.
        vxlapi.xlOpenPort(self.port_handle, self._app_name, self.mask,
                          permission_mask, rx_queue_size,
                          vxlapi.XL_INTERFACE_VERSION, vxlapi.XL_BUS_TYPE_CAN)
        LOG.debug(
            'Open Port: PortHandle: %d, PermissionMask: 0x%X',
            self.port_handle.value, permission_mask.value)

        if bitrate:
            if permission_mask.value == self.mask:
                vxlapi.xlCanSetChannelBitrate(self.port_handle, permission_mask, bitrate)
            else:
                # Without init access the bitrate cannot be changed, so do
                # not attempt the driver call at all.
                LOG.warning('Can not set bitrate since no init access')

        self.set_filters(can_filters)

        try:
            vxlapi.xlActivateChannel(self.port_handle, self.mask,
                                     vxlapi.XL_BUS_TYPE_CAN, 0)
        except VectorError:
            # Release the port and driver before propagating the error.
            self.shutdown()
            raise

        # Calculate time offset for absolute timestamps
        # (the driver value is divided by 1e9, i.e. presumably nanoseconds).
        offset = vxlapi.XLuint64()
        vxlapi.xlGetSyncTime(self.port_handle, offset)
        self._time_offset = time.time() - offset.value / 1000000000.0

        super(VectorBus, self).__init__()

    def set_filters(self, can_filters=None):
        """Apply acceptance filters to the channels of this bus.

        Only one filter per ID type (standard / extended) is allowed, so
        at most two filters are accepted and they must differ in their
        ``"extended"`` setting. Otherwise a warning is logged and no filter
        is set. Driver errors while setting a filter are logged as warnings
        and not raised.

        :param list can_filters:
            List of dicts with the keys ``"can_id"`` and ``"can_mask"`` and
            an optional ``"extended"`` flag. ``None`` or an empty list leaves
            the current configuration untouched.
        """
        if not can_filters:
            return

        # Only one filter per ID type allowed
        one_per_id_type = len(can_filters) == 1 or (
            len(can_filters) == 2 and
            can_filters[0].get("extended") != can_filters[1].get("extended"))
        if not one_per_id_type:
            LOG.warning("Only one filter per extended or standard ID allowed")
            return

        for can_filter in can_filters:
            id_range = (vxlapi.XL_CAN_EXT if can_filter.get("extended")
                        else vxlapi.XL_CAN_STD)
            try:
                vxlapi.xlCanSetChannelAcceptance(
                    self.port_handle, self.mask,
                    can_filter["can_id"], can_filter["can_mask"],
                    id_range)
            except VectorError as exc:
                LOG.warning("Could not set filters: %s", exc)

    def recv(self, timeout=None):
        """Poll the receive queue until a CAN message arrives or time runs out.

        Events other than ``XL_RECEIVE_MSG`` are discarded.

        :param float timeout:
            Maximum time to wait in seconds; ``None`` waits indefinitely.
        :return:
            The received :class:`Message`, or ``None`` if the timeout expired.
        :raises VectorError:
            For any driver error other than an empty receive queue.
        """
        end_time = time.time() + timeout if timeout is not None else None
        event = vxlapi.XLevent(0)
        while True:
            event_count = ctypes.c_uint(1)
            try:
                vxlapi.xlReceive(self.port_handle, event_count, event)
            except VectorError as exc:
                # An empty queue just means "nothing yet": keep polling.
                if exc.error_code != vxlapi.XL_ERR_QUEUE_IS_EMPTY:
                    raise
            else:
                if event.tag == vxlapi.XL_RECEIVE_MSG:
                    can_msg = event.tagData.msg
                    msg_id = can_msg.id
                    dlc = can_msg.dlc
                    flags = can_msg.flags
                    # Driver timestamp scaled by 1e9 (presumably ns -> s) and
                    # shifted by the offset computed in __init__.
                    timestamp = event.timeStamp / 1000000000.0
                    return Message(
                        timestamp=timestamp + self._time_offset,
                        arbitration_id=msg_id & 0x1FFFFFFF,
                        extended_id=bool(msg_id & vxlapi.XL_CAN_EXT_MSG_ID),
                        is_remote_frame=bool(flags & vxlapi.XL_CAN_MSG_FLAG_REMOTE_FRAME),
                        is_error_frame=bool(flags & vxlapi.XL_CAN_MSG_FLAG_ERROR_FRAME),
                        dlc=dlc,
                        data=can_msg.data[:dlc],
                        channel=event.chanIndex)

            if end_time is None:
                time.sleep(self.poll_interval)
                continue

            remaining = end_time - time.time()
            if remaining < 0:
                return None
            # Never sleep past the deadline, so the timeout is honoured even
            # when it is shorter than the poll interval.
            time.sleep(min(self.poll_interval, remaining))

    def send(self, msg, timeout=None):
        """Transmit a single CAN message using this bus's channel mask.

        :param msg:
            Message to send. Its ``arbitration_id``, ``id_type``,
            ``is_remote_frame``, ``dlc`` and ``data`` attributes are used.
        :param timeout:
            Not used by this implementation.
        """
        message_count = ctypes.c_uint(1)

        msg_id = msg.arbitration_id
        if msg.id_type:
            # Mark the identifier as extended.
            msg_id |= vxlapi.XL_CAN_EXT_MSG_ID

        flags = 0
        if msg.is_remote_frame:
            flags |= vxlapi.XL_CAN_MSG_FLAG_REMOTE_FRAME

        xl_event = vxlapi.XLevent()
        xl_event.tag = vxlapi.XL_TRANSMIT_MSG
        xl_event.tagData.msg.id = msg_id
        xl_event.tagData.msg.dlc = msg.dlc
        xl_event.tagData.msg.flags = flags
        for idx, value in enumerate(msg.data):
            xl_event.tagData.msg.data[idx] = value

        vxlapi.xlCanTransmit(self.port_handle, self.mask, message_count, xl_event)

    def flush_tx_buffer(self):
        """Flush the driver's transmit queue for the channels of this bus."""
        vxlapi.xlCanFlushTransmitQueue(self.port_handle, self.mask)

    def shutdown(self):
        """Deactivate the channels, then close the port and the driver.

        The port and the driver are closed even if an earlier step raises,
        so a failing deactivation does not leak the open port.
        """
        try:
            vxlapi.xlDeactivateChannel(self.port_handle, self.mask)
        finally:
            try:
                vxlapi.xlClosePort(self.port_handle)
            finally:
                vxlapi.xlCloseDriver()

    def reset(self):
        """Deactivate and re-activate the channels of this bus."""
        vxlapi.xlDeactivateChannel(self.port_handle, self.mask)
        vxlapi.xlActivateChannel(self.port_handle, self.mask,
                                 vxlapi.XL_BUS_TYPE_CAN, 0)