# coding: utf-8
from ctypes import *
import logging
import platform
from can import BusABC, Message
logger = logging.getLogger(__name__)
class VCI_INIT_CONFIG(Structure):
_fields_ = [
("AccCode", c_int32),
("AccMask", c_int32),
("Reserved", c_int32),
("Filter", c_ubyte),
("Timing0", c_ubyte),
("Timing1", c_ubyte),
("Mode", c_ubyte),
]
class VCI_CAN_OBJ(Structure):
_fields_ = [
("ID", c_uint),
("TimeStamp", c_int),
("TimeFlag", c_byte),
("SendType", c_byte),
("RemoteFlag", c_byte),
("ExternFlag", c_byte),
("DataLen", c_byte),
("Data", c_ubyte * 8),
("Reserved", c_byte * 3),
]
VCI_USBCAN2 = 4
STATUS_OK = 0x01
STATUS_ERR = 0x00
TIMING_DICT = {
5000: (0xBF, 0xFF),
10000: (0x31, 0x1C),
20000: (0x18, 0x1C),
33330: (0x09, 0x6F),
40000: (0x87, 0xFF),
50000: (0x09, 0x1C),
66660: (0x04, 0x6F),
80000: (0x83, 0xFF),
83330: (0x03, 0x6F),
100000: (0x04, 0x1C),
125000: (0x03, 0x1C),
200000: (0x81, 0xFA),
250000: (0x01, 0x1C),
400000: (0x80, 0xFA),
500000: (0x00, 0x1C),
666000: (0x80, 0xB6),
800000: (0x00, 0x16),
1000000: (0x00, 0x14),
}
try:
if platform.system() == "Windows":
CANalystII = WinDLL("./ControlCAN.dll")
else:
CANalystII = CDLL("./libcontrolcan.so")
logger.info("Loaded CANalystII library")
except OSError as e:
CANalystII = None
logger.info("Cannot load CANalystII library")
[docs]class CANalystIIBus(BusABC):
def __init__(
self,
channel,
device=0,
bitrate=None,
Timing0=None,
Timing1=None,
can_filters=None,
**kwargs,
):
"""
:param channel: channel number
:param device: device number
:param bitrate: CAN network bandwidth (bits/s)
:param Timing0: customize the timing register if bitrate is not specified
:param Timing1:
:param can_filters: filters for packet
"""
super().__init__(channel=channel, can_filters=can_filters, **kwargs)
if isinstance(channel, (list, tuple)):
self.channels = channel
elif isinstance(channel, int):
self.channels = [channel]
else:
# Assume comma separated string of channels
self.channels = [int(ch.strip()) for ch in channel.split(",")]
self.device = device
self.channel_info = "CANalyst-II: device {}, channels {}".format(
self.device, self.channels
)
if bitrate is not None:
try:
Timing0, Timing1 = TIMING_DICT[bitrate]
except KeyError:
raise ValueError("Bitrate is not supported")
if Timing0 is None or Timing1 is None:
raise ValueError("Timing registers are not set")
self.init_config = VCI_INIT_CONFIG(0, 0xFFFFFFFF, 0, 1, Timing0, Timing1, 0)
if CANalystII.VCI_OpenDevice(VCI_USBCAN2, self.device, 0) == STATUS_ERR:
logger.error("VCI_OpenDevice Error")
for channel in self.channels:
status = CANalystII.VCI_InitCAN(
VCI_USBCAN2, self.device, channel, byref(self.init_config)
)
if status == STATUS_ERR:
logger.error("VCI_InitCAN Error")
self.shutdown()
return
if CANalystII.VCI_StartCAN(VCI_USBCAN2, self.device, channel) == STATUS_ERR:
logger.error("VCI_StartCAN Error")
self.shutdown()
return
[docs] def send(self, msg, timeout=None):
"""
:param msg: message to send
:param timeout: timeout is not used here
:return:
"""
extern_flag = 1 if msg.is_extended_id else 0
raw_message = VCI_CAN_OBJ(
msg.arbitration_id,
0,
0,
1,
msg.is_remote_frame,
extern_flag,
msg.dlc,
(c_ubyte * 8)(*msg.data),
(c_byte * 3)(*[0, 0, 0]),
)
if msg.channel is not None:
channel = msg.channel
elif len(self.channels) == 1:
channel = self.channels[0]
else:
raise ValueError("msg.channel must be set when using multiple channels.")
CANalystII.VCI_Transmit(
VCI_USBCAN2, self.device, channel, byref(raw_message), 1
)
def _recv_internal(self, timeout=None):
"""
:param timeout: float in seconds
:return:
"""
raw_message = VCI_CAN_OBJ()
timeout = -1 if timeout is None else int(timeout * 1000)
status = CANalystII.VCI_Receive(
VCI_USBCAN2, self.device, self.channels[0], byref(raw_message), 1, timeout
)
if status <= STATUS_ERR:
return None, False
else:
return (
Message(
timestamp=raw_message.TimeStamp if raw_message.TimeFlag else 0.0,
arbitration_id=raw_message.ID,
is_remote_frame=raw_message.RemoteFlag,
channel=0,
dlc=raw_message.DataLen,
data=raw_message.Data,
),
False,
)
[docs] def flush_tx_buffer(self):
for channel in self.channels:
CANalystII.VCI_ClearBuffer(VCI_USBCAN2, self.device, channel)
[docs] def shutdown(self):
CANalystII.VCI_CloseDevice(VCI_USBCAN2, self.device)