Source code for can.interfaces.vector.canlib

# coding: utf-8

"""
Ctypes wrapper module for Vector CAN Interface on win32/win64 systems.

Authors: Julien Grave <grave.jul@gmail.com>, Christian Sandberg
"""

# Import Standard Python Modules
# ==============================
import ctypes
import logging
import time
import os

try:
    # Try builtin Python 3 Windows API
    from _winapi import WaitForSingleObject, INFINITE

    HAS_EVENTS = True
except ImportError:
    try:
        # Try pywin32 package
        from win32event import WaitForSingleObject, INFINITE

        HAS_EVENTS = True
    except ImportError:
        # Use polling instead
        HAS_EVENTS = False

# Import Modules
# ==============
from can import BusABC, Message
from can.util import len2dlc, dlc2len
from .exceptions import VectorError

# Define Module Logger
# ====================
LOG = logging.getLogger(__name__)

# Import Vector API module
# ========================
from . import xldefine, xlclass

# Import safely Vector API module for Travis tests
xldriver = None
try:
    from . import xldriver
except Exception as exc:
    LOG.warning("Could not import vxlapi: %s", exc)


[docs]class VectorBus(BusABC): """The CAN Bus implemented for the Vector interface.""" def __init__( self, channel, can_filters=None, poll_interval=0.01, receive_own_messages=False, bitrate=None, rx_queue_size=2 ** 14, app_name="CANalyzer", serial=None, fd=False, data_bitrate=None, sjwAbr=2, tseg1Abr=6, tseg2Abr=3, sjwDbr=2, tseg1Dbr=6, tseg2Dbr=3, **kwargs, ): """ :param list channel: The channel indexes to create this bus with. Can also be a single integer or a comma separated string. :param float poll_interval: Poll interval in seconds. :param int bitrate: Bitrate in bits/s. :param int rx_queue_size: Number of messages in receive queue (power of 2). CAN: range 16…32768 CAN-FD: range 8192…524288 :param str app_name: Name of application in Hardware Config. If set to None, the channel should be a global channel index. :param int serial: Serial number of the hardware to be used. If set, the channel parameter refers to the channels ONLY on the specified hardware. If set, the app_name is unused. :param bool fd: If CAN-FD frames should be supported. :param int data_bitrate: Which bitrate to use for data phase in CAN FD. Defaults to arbitration bitrate. """ if os.name != "nt" and not kwargs.get("_testing", False): raise OSError( f'The Vector interface is only supported on Windows, but you are running "{os.name}"' ) if xldriver is None: raise ImportError("The Vector API has not been loaded") self.poll_interval = poll_interval if isinstance(channel, (list, tuple)): self.channels = channel elif isinstance(channel, int): self.channels = [channel] else: # Assume comma separated string of channels self.channels = [int(ch.strip()) for ch in channel.split(",")] self._app_name = app_name.encode() if app_name is not None else "" self.channel_info = "Application %s: %s" % ( app_name, ", ".join("CAN %d" % (ch + 1) for ch in self.channels), ) if serial is not None: app_name = None channel_index = [] channel_configs = get_channel_configs() for channel_config in channel_configs: if channel_config.serialNumber == serial: if channel_config.hwChannel in self.channels: channel_index.append(channel_config.channelIndex) if channel_index: if len(channel_index) != len(self.channels): LOG.info( "At least one defined channel wasn't found on the specified hardware." ) self.channels = channel_index else: # Is there any better way to raise the error? raise Exception( "None of the configured channels could be found on the specified hardware." ) xldriver.xlOpenDriver() self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE) self.mask = 0 self.fd = fd # Get channels masks self.channel_masks = {} self.index_to_channel = {} for channel in self.channels: if app_name: # Get global channel index from application channel hw_type = ctypes.c_uint(0) hw_index = ctypes.c_uint(0) hw_channel = ctypes.c_uint(0) xldriver.xlGetApplConfig( self._app_name, channel, hw_type, hw_index, hw_channel, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value, ) LOG.debug("Channel index %d found", channel) idx = xldriver.xlGetChannelIndex( hw_type.value, hw_index.value, hw_channel.value ) if idx < 0: # Undocumented behavior! See issue #353. # If hardware is unavailable, this function returns -1. # Raise an exception as if the driver # would have signalled XL_ERR_HW_NOT_PRESENT. raise VectorError( xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT.value, "XL_ERR_HW_NOT_PRESENT", "xlGetChannelIndex", ) else: # Channel already given as global channel idx = channel mask = 1 << idx self.channel_masks[channel] = mask self.index_to_channel[idx] = channel self.mask |= mask permission_mask = xlclass.XLaccess() # Set mask to request channel init permission if needed if bitrate or fd: permission_mask.value = self.mask if fd: xldriver.xlOpenPort( self.port_handle, self._app_name, self.mask, permission_mask, rx_queue_size, xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4.value, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value, ) else: xldriver.xlOpenPort( self.port_handle, self._app_name, self.mask, permission_mask, rx_queue_size, xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION.value, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value, ) LOG.debug( "Open Port: PortHandle: %d, PermissionMask: 0x%X", self.port_handle.value, permission_mask.value, ) if permission_mask.value == self.mask: if fd: self.canFdConf = xlclass.XLcanFdConf() if bitrate: self.canFdConf.arbitrationBitRate = ctypes.c_uint(bitrate) else: self.canFdConf.arbitrationBitRate = ctypes.c_uint(500000) self.canFdConf.sjwAbr = ctypes.c_uint(sjwAbr) self.canFdConf.tseg1Abr = ctypes.c_uint(tseg1Abr) self.canFdConf.tseg2Abr = ctypes.c_uint(tseg2Abr) if data_bitrate: self.canFdConf.dataBitRate = ctypes.c_uint(data_bitrate) else: self.canFdConf.dataBitRate = self.canFdConf.arbitrationBitRate self.canFdConf.sjwDbr = ctypes.c_uint(sjwDbr) self.canFdConf.tseg1Dbr = ctypes.c_uint(tseg1Dbr) self.canFdConf.tseg2Dbr = ctypes.c_uint(tseg2Dbr) xldriver.xlCanFdSetConfiguration( self.port_handle, self.mask, self.canFdConf ) LOG.info( "SetFdConfig.: ABaudr.=%u, DBaudr.=%u", self.canFdConf.arbitrationBitRate, self.canFdConf.dataBitRate, ) LOG.info( "SetFdConfig.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u", self.canFdConf.sjwAbr, self.canFdConf.tseg1Abr, self.canFdConf.tseg2Abr, ) LOG.info( "SetFdConfig.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u", self.canFdConf.sjwDbr, self.canFdConf.tseg1Dbr, self.canFdConf.tseg2Dbr, ) else: if bitrate: xldriver.xlCanSetChannelBitrate( self.port_handle, permission_mask, bitrate ) LOG.info("SetChannelBitrate: baudr.=%u", bitrate) else: LOG.info("No init access!") # Enable/disable TX receipts tx_receipts = 1 if receive_own_messages else 0 xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0) if HAS_EVENTS: self.event_handle = xlclass.XLhandle() xldriver.xlSetNotification(self.port_handle, self.event_handle, 1) else: LOG.info("Install pywin32 to avoid polling") try: xldriver.xlActivateChannel( self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value, 0, ) except VectorError: self.shutdown() raise # Calculate time offset for absolute timestamps offset = xlclass.XLuint64() xldriver.xlGetSyncTime(self.port_handle, offset) self._time_offset = time.time() - offset.value * 1e-9 self._is_filtered = False super().__init__(channel=channel, can_filters=can_filters, **kwargs) def _apply_filters(self, filters): if filters: # Only up to one filter per ID type allowed if len(filters) == 1 or ( len(filters) == 2 and filters[0].get("extended") != filters[1].get("extended") ): try: for can_filter in filters: xldriver.xlCanSetChannelAcceptance( self.port_handle, self.mask, can_filter["can_id"], can_filter["can_mask"], xldefine.XL_AcceptanceFilter.XL_CAN_EXT.value if can_filter.get("extended") else xldefine.XL_AcceptanceFilter.XL_CAN_STD.value, ) except VectorError as exc: LOG.warning("Could not set filters: %s", exc) # go to fallback else: self._is_filtered = True return else: LOG.warning("Only up to one filter per extended or standard ID allowed") # go to fallback # fallback: reset filters self._is_filtered = False try: xldriver.xlCanSetChannelAcceptance( self.port_handle, self.mask, 0x0, 0x0, xldefine.XL_AcceptanceFilter.XL_CAN_EXT.value, ) xldriver.xlCanSetChannelAcceptance( self.port_handle, self.mask, 0x0, 0x0, xldefine.XL_AcceptanceFilter.XL_CAN_STD.value, ) except VectorError as exc: LOG.warning("Could not reset filters: %s", exc) def _recv_internal(self, timeout): end_time = time.time() + timeout if timeout is not None else None if self.fd: event = xlclass.XLcanRxEvent() else: event = xlclass.XLevent() event_count = ctypes.c_uint() while True: if self.fd: try: xldriver.xlCanReceive(self.port_handle, event) except VectorError as exc: if exc.error_code != xldefine.XL_Status.XL_ERR_QUEUE_IS_EMPTY.value: raise else: if ( event.tag == xldefine.XL_CANFD_RX_EventTags.XL_CAN_EV_TAG_RX_OK.value or event.tag == xldefine.XL_CANFD_RX_EventTags.XL_CAN_EV_TAG_TX_OK.value ): msg_id = event.tagData.canRxOkMsg.canId dlc = dlc2len(event.tagData.canRxOkMsg.dlc) flags = event.tagData.canRxOkMsg.msgFlags timestamp = event.timeStamp * 1e-9 channel = self.index_to_channel.get(event.chanIndex) msg = Message( timestamp=timestamp + self._time_offset, arbitration_id=msg_id & 0x1FFFFFFF, is_extended_id=bool( msg_id & xldefine.XL_MessageFlagsExtended.XL_CAN_EXT_MSG_ID.value ), is_remote_frame=bool( flags & xldefine.XL_CANFD_RX_MessageFlags.XL_CAN_RXMSG_FLAG_RTR.value ), is_error_frame=bool( flags & xldefine.XL_CANFD_RX_MessageFlags.XL_CAN_RXMSG_FLAG_EF.value ), is_fd=bool( flags & xldefine.XL_CANFD_RX_MessageFlags.XL_CAN_RXMSG_FLAG_EDL.value ), error_state_indicator=bool( flags & xldefine.XL_CANFD_RX_MessageFlags.XL_CAN_RXMSG_FLAG_ESI.value ), bitrate_switch=bool( flags & xldefine.XL_CANFD_RX_MessageFlags.XL_CAN_RXMSG_FLAG_BRS.value ), dlc=dlc, data=event.tagData.canRxOkMsg.data[:dlc], channel=channel, ) return msg, self._is_filtered else: event_count.value = 1 try: xldriver.xlReceive(self.port_handle, event_count, event) except VectorError as exc: if exc.error_code != xldefine.XL_Status.XL_ERR_QUEUE_IS_EMPTY.value: raise else: if event.tag == xldefine.XL_EventTags.XL_RECEIVE_MSG.value: msg_id = event.tagData.msg.id dlc = event.tagData.msg.dlc flags = event.tagData.msg.flags timestamp = event.timeStamp * 1e-9 channel = self.index_to_channel.get(event.chanIndex) msg = Message( timestamp=timestamp + self._time_offset, arbitration_id=msg_id & 0x1FFFFFFF, is_extended_id=bool( msg_id & xldefine.XL_MessageFlagsExtended.XL_CAN_EXT_MSG_ID.value ), is_remote_frame=bool( flags & xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_REMOTE_FRAME.value ), is_error_frame=bool( flags & xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_ERROR_FRAME.value ), is_fd=False, dlc=dlc, data=event.tagData.msg.data[:dlc], channel=channel, ) return msg, self._is_filtered if end_time is not None and time.time() > end_time: return None, self._is_filtered if HAS_EVENTS: # Wait for receive event to occur if timeout is None: time_left_ms = INFINITE else: time_left = end_time - time.time() time_left_ms = max(0, int(time_left * 1000)) WaitForSingleObject(self.event_handle.value, time_left_ms) else: # Wait a short time until we try again time.sleep(self.poll_interval)
[docs] def send(self, msg, timeout=None): msg_id = msg.arbitration_id if msg.is_extended_id: msg_id |= xldefine.XL_MessageFlagsExtended.XL_CAN_EXT_MSG_ID.value flags = 0 # If channel has been specified, try to send only to that one. # Otherwise send to all channels mask = self.channel_masks.get(msg.channel, self.mask) if self.fd: if msg.is_fd: flags |= xldefine.XL_CANFD_TX_MessageFlags.XL_CAN_TXMSG_FLAG_EDL.value if msg.bitrate_switch: flags |= xldefine.XL_CANFD_TX_MessageFlags.XL_CAN_TXMSG_FLAG_BRS.value if msg.is_remote_frame: flags |= xldefine.XL_CANFD_TX_MessageFlags.XL_CAN_TXMSG_FLAG_RTR.value message_count = 1 MsgCntSent = ctypes.c_uint(1) XLcanTxEvent = xlclass.XLcanTxEvent() XLcanTxEvent.tag = xldefine.XL_CANFD_TX_EventTags.XL_CAN_EV_TAG_TX_MSG.value XLcanTxEvent.transId = 0xFFFF XLcanTxEvent.tagData.canMsg.canId = msg_id XLcanTxEvent.tagData.canMsg.msgFlags = flags XLcanTxEvent.tagData.canMsg.dlc = len2dlc(msg.dlc) for idx, value in enumerate(msg.data): XLcanTxEvent.tagData.canMsg.data[idx] = value xldriver.xlCanTransmitEx( self.port_handle, mask, message_count, MsgCntSent, XLcanTxEvent ) else: if msg.is_remote_frame: flags |= xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_REMOTE_FRAME.value message_count = ctypes.c_uint(1) xl_event = xlclass.XLevent() xl_event.tag = xldefine.XL_EventTags.XL_TRANSMIT_MSG.value xl_event.tagData.msg.id = msg_id xl_event.tagData.msg.dlc = msg.dlc xl_event.tagData.msg.flags = flags for idx, value in enumerate(msg.data): xl_event.tagData.msg.data[idx] = value xldriver.xlCanTransmit(self.port_handle, mask, message_count, xl_event)
[docs] def flush_tx_buffer(self): xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask)
[docs] def shutdown(self): xldriver.xlDeactivateChannel(self.port_handle, self.mask) xldriver.xlClosePort(self.port_handle) xldriver.xlCloseDriver()
def reset(self): xldriver.xlDeactivateChannel(self.port_handle, self.mask) xldriver.xlActivateChannel( self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value, 0 ) @staticmethod def _detect_available_configs(): configs = [] channel_configs = get_channel_configs() LOG.info("Found %d channels", len(channel_configs)) for channel_config in channel_configs: if ( not channel_config.channelBusCapabilities & xldefine.XL_BusCapabilities.XL_BUS_ACTIVE_CAP_CAN.value ): continue LOG.info( "Channel index %d: %s", channel_config.channelIndex, channel_config.name.decode("ascii"), ) configs.append( { "interface": "vector", "app_name": None, "channel": channel_config.channelIndex, "supports_fd": bool( channel_config.channelBusCapabilities & xldefine.XL_ChannelCapabilities.XL_CHANNEL_FLAG_CANFD_ISO_SUPPORT.value ), } ) return configs
def get_channel_configs(): if xldriver is None: return [] driver_config = xlclass.XLdriverConfig() try: xldriver.xlOpenDriver() xldriver.xlGetDriverConfig(driver_config) xldriver.xlCloseDriver() except Exception: pass return [driver_config.channel[i] for i in range(driver_config.channelCount)]