Source code for can.interfaces.nican

"""
NI-CAN interface module.

Implementation references:
* http://www.ni.com/pdf/manuals/370289c.pdf
* https://github.com/buendiya/NicanPython
"""
import ctypes
import logging
import sys

from can import CanError, BusABC, Message

logger = logging.getLogger(__name__)

NC_SUCCESS = 0
NC_ERR_TIMEOUT = 1
TIMEOUT_ERROR_CODE = -1074388991

NC_DURATION_INFINITE = 0xFFFFFFFF

NC_OP_START = 0x80000001
NC_OP_STOP = 0x80000002
NC_OP_RESET = 0x80000003

NC_FRMTYPE_REMOTE = 1
NC_FRMTYPE_COMM_ERR = 2

NC_ST_READ_AVAIL = 0x00000001
NC_ST_WRITE_SUCCESS = 0x00000002
NC_ST_ERROR = 0x00000010
NC_ST_WARNING = 0x00000020

NC_ATTR_BAUD_RATE = 0x80000007
NC_ATTR_START_ON_OPEN = 0x80000006
NC_ATTR_READ_Q_LEN = 0x80000013
NC_ATTR_WRITE_Q_LEN = 0x80000014
NC_ATTR_CAN_COMP_STD = 0x80010001
NC_ATTR_CAN_MASK_STD = 0x80010002
NC_ATTR_CAN_COMP_XTD = 0x80010003
NC_ATTR_CAN_MASK_XTD = 0x80010004
NC_ATTR_LOG_COMM_ERRS = 0x8001000A

NC_FL_CAN_ARBID_XTD = 0x20000000


CanData = ctypes.c_ubyte * 8

class RxMessageStruct(ctypes.Structure):
    _pack_ = 1
    _fields_ = [
        ("timestamp", ctypes.c_ulonglong),
        ("arb_id", ctypes.c_ulong),
        ("frame_type", ctypes.c_ubyte),
        ("dlc", ctypes.c_ubyte),
        ("data", CanData),
    ]

class TxMessageStruct(ctypes.Structure):
    _fields_ = [
        ("arb_id", ctypes.c_ulong),
        ("is_remote", ctypes.c_ubyte),
        ("dlc", ctypes.c_ubyte),
        ("data", CanData),
    ]


def check_status(result, function, arguments):
    if result > 0:
        logger.warning(get_error_message(result))
    elif result < 0:
        raise NicanError(function, result, arguments)
    return result


def get_error_message(status_code):
    """Convert status code to descriptive string."""
    errmsg = ctypes.create_string_buffer(1024)
    nican.ncStatusToString(status_code, len(errmsg), errmsg)
    return errmsg.value.decode("ascii")


if sys.platform == "win32":
    try:
        nican = ctypes.windll.LoadLibrary("nican")
    except Exception as e:
        nican = None
        logger.error("Failed to load NI-CAN driver: %s", e)
    else:
        nican.ncConfig.argtypes = [
            ctypes.c_char_p, ctypes.c_ulong, ctypes.c_void_p, ctypes.c_void_p]
        nican.ncConfig.errcheck = check_status
        nican.ncOpenObject.argtypes = [ctypes.c_char_p, ctypes.c_void_p]
        nican.ncOpenObject.errcheck = check_status
        nican.ncCloseObject.errcheck = check_status
        nican.ncAction.argtypes = [ctypes.c_ulong, ctypes.c_ulong, ctypes.c_ulong]
        nican.ncAction.errcheck = check_status
        nican.ncRead.errcheck = check_status
        nican.ncWrite.errcheck = check_status
        nican.ncWaitForState.argtypes = [
            ctypes.c_ulong, ctypes.c_ulong, ctypes.c_ulong, ctypes.c_void_p]
        nican.ncWaitForState.errcheck = check_status
        nican.ncStatusToString.argtypes = [
            ctypes.c_int, ctypes.c_uint, ctypes.c_char_p]
else:
    nican = None
    logger.warning("NI-CAN interface is only available on Windows systems")

[docs]class NicanBus(BusABC): """ The CAN Bus implemented for the NI-CAN interface. """ def __init__(self, channel, can_filters=None, bitrate=None, log_errors=True, **kwargs): """ :param str channel: Name of the object to open (e.g. 'CAN0') :param int bitrate: Bitrate in bits/s :param list can_filters: A list of dictionaries each containing a "can_id" and a "can_mask". >>> [{"can_id": 0x11, "can_mask": 0x21}] :param bool log_errors: If True, communication errors will appear as CAN messages with ``is_error_frame`` set to True and ``arbitration_id`` will identify the error (default True) :raises can.interfaces.nican.NicanError: If starting communication fails """ if nican is None: raise ImportError("The NI-CAN driver could not be loaded. " "Check that you are using 32-bit Python on Windows.") self.channel_info = "NI-CAN: " + channel if not isinstance(channel, bytes): channel = channel.encode() config = [ (NC_ATTR_START_ON_OPEN, True), (NC_ATTR_LOG_COMM_ERRS, log_errors) ] if not can_filters: logger.info("Filtering has been disabled") config.extend([ (NC_ATTR_CAN_COMP_STD, 0), (NC_ATTR_CAN_MASK_STD, 0), (NC_ATTR_CAN_COMP_XTD, 0), (NC_ATTR_CAN_MASK_XTD, 0) ]) else: for can_filter in can_filters: can_id = can_filter["can_id"] can_mask = can_filter["can_mask"] logger.info("Filtering on ID 0x%X, mask 0x%X", can_id, can_mask) if can_filter.get("extended"): config.extend([ (NC_ATTR_CAN_COMP_XTD, can_id | NC_FL_CAN_ARBID_XTD), (NC_ATTR_CAN_MASK_XTD, can_mask) ]) else: config.extend([ (NC_ATTR_CAN_COMP_STD, can_id), (NC_ATTR_CAN_MASK_STD, can_mask), ]) if bitrate: config.append((NC_ATTR_BAUD_RATE, bitrate)) AttrList = ctypes.c_ulong * len(config) attr_id_list = AttrList(*(row[0] for row in config)) attr_value_list = AttrList(*(row[1] for row in config)) nican.ncConfig(channel, len(config), ctypes.byref(attr_id_list), ctypes.byref(attr_value_list)) self.handle = ctypes.c_ulong() nican.ncOpenObject(channel, ctypes.byref(self.handle))
[docs] def recv(self, timeout=None): """ Read a message from NI-CAN. :param float timeout: Max time to wait in seconds or None if infinite :returns: The CAN message or None if timeout :rtype: can.Message :raises can.interfaces.nican.NicanError: If reception fails """ if timeout is None: timeout = NC_DURATION_INFINITE else: timeout = int(timeout * 1000) state = ctypes.c_ulong() try: nican.ncWaitForState( self.handle, NC_ST_READ_AVAIL, timeout, ctypes.byref(state)) except NicanError as e: if e.error_code == TIMEOUT_ERROR_CODE: return None else: raise raw_msg = RxMessageStruct() nican.ncRead(self.handle, ctypes.sizeof(raw_msg), ctypes.byref(raw_msg)) # http://stackoverflow.com/questions/6161776/convert-windows-filetime-to-second-in-unix-linux timestamp = raw_msg.timestamp / 10000000.0 - 11644473600 is_remote_frame = raw_msg.frame_type == NC_FRMTYPE_REMOTE is_error_frame = raw_msg.frame_type == NC_FRMTYPE_COMM_ERR is_extended = bool(raw_msg.arb_id & NC_FL_CAN_ARBID_XTD) arb_id = raw_msg.arb_id if not is_error_frame: arb_id &= 0x1FFFFFFF dlc = raw_msg.dlc msg = Message(timestamp=timestamp, is_remote_frame=is_remote_frame, is_error_frame=is_error_frame, extended_id=is_extended, arbitration_id=arb_id, dlc=dlc, data=raw_msg.data[:dlc]) return msg
[docs] def send(self, msg, timeout=None): """ Send a message to NI-CAN. :param can.Message msg: Message to send :raises can.interfaces.nican.NicanError: If writing to transmit buffer fails. It does not wait for message to be ACKed currently. """ arb_id = msg.arbitration_id if msg.id_type: arb_id |= NC_FL_CAN_ARBID_XTD raw_msg = TxMessageStruct(arb_id, bool(msg.is_remote_frame), msg.dlc, CanData(*msg.data)) nican.ncWrite( self.handle, ctypes.sizeof(raw_msg), ctypes.byref(raw_msg))
# ncWaitForState can not be called here if the recv() method is called # from a different thread, which is a very common use case. # Maybe it is possible to use ncCreateNotification instead but seems a # bit overkill at the moment. #state = ctypes.c_ulong() #nican.ncWaitForState( # self.handle, NC_ST_WRITE_SUCCESS, int(timeout * 1000), ctypes.byref(state))
[docs] def flush_tx_buffer(self): """ Resets the CAN chip which includes clearing receive and transmit queues. """ nican.ncAction(self.handle, NC_OP_RESET, 0)
[docs] def shutdown(self): """Close object.""" nican.ncCloseObject(self.handle)
[docs]class NicanError(CanError): """Error from NI-CAN driver.""" def __init__(self, function, error_code, arguments): super(NicanError, self).__init__() #: Status code self.error_code = error_code #: Function that failed self.function = function #: Arguments passed to function self.arguments = arguments def __str__(self): return "Function %s failed:\n%s" % ( self.function.__name__, get_error_message(self.error_code))