Source code for can.interfaces.slcan

"""
Interface for slcan-compatible serial CAN adapters (win32/linux).
(On Linux, slcand together with socketcan can be used as an alternative.)
"""
from __future__ import absolute_import

import serial
import io
import time
import logging

from can import CanError, BusABC, Message


logger = logging.getLogger(__name__)

class slcanBus(BusABC):
    """CAN bus backed by an slcan (ASCII serial-line CAN) adapter.

    Commands are written to the adapter as text terminated by a carriage
    return, and received frames are read back as carriage-return terminated
    text lines.
    """

    # Bitrate in bits/s -> slcan bitrate command sent to the adapter.
    _BITRATES = {
        10000: 'S0',
        20000: 'S1',
        50000: 'S2',
        100000: 'S3',
        125000: 'S4',
        250000: 'S5',
        500000: 'S6',
        750000: 'S7',
        1000000: 'S8',
        83300: 'S9',
    }

    # Pause (seconds) after opening the serial port before the first command.
    # NOTE(review): presumably gives the adapter time to reset/boot - confirm.
    _SLEEP_AFTER_SERIAL_OPEN = 2

    def write(self, str):
        """Send one command string to the adapter.

        A trailing carriage return is appended when missing, and the port is
        flushed afterwards.

        :param str: command text; byte strings (e.g. Python 2 ``str``) are
            decoded first because the text wrapper only accepts text.
        """
        # The parameter name shadows the builtin but is kept so existing
        # keyword callers keep working.
        if isinstance(str, bytes):
            str = str.decode()
        if not str.endswith("\r"):
            str += "\r"
        self.serialPort.write(str)
        self.serialPort.flush()

    def open(self):
        """Send the slcan ``O`` command (open the CAN channel)."""
        self.write("O")

    def close(self):
        """Send the slcan ``C`` command (close the CAN channel)."""
        self.write("C")

    def __init__(self, channel, ttyBaudrate=115200, timeout=1, bitrate=None,
                 **kwargs):
        """
        :param string channel:
            port of underlying serial or usb device (e.g. /dev/ttyUSB0,
            COM8, ...). May be given as ``port@baudrate`` to override
            ``ttyBaudrate``.
        :param int ttyBaudrate:
            baudrate of underlying serial or usb device
        :param float timeout:
            timeout in seconds when reading message
        :param int bitrate:
            Bitrate in bits/s; when ``None`` the adapter's bitrate is left
            untouched
        :param kwargs:
            forwarded unchanged to ``BusABC.__init__``
        :raises TypeError: if no serial port is given
        :raises ValueError: if ``bitrate`` is not one of the supported values
        """
        if not channel:
            raise TypeError("Must specify a serial port.")

        if '@' in channel:
            (channel, ttyBaudrate) = channel.split('@')
            # split() yields text; the serial layer expects a number.
            ttyBaudrate = int(ttyBaudrate)

        # Validate before touching the hardware so a bad bitrate does not
        # leave an opened serial port behind.
        if bitrate is not None and bitrate not in self._BITRATES:
            raise ValueError("Invalid bitrate, choose one of 10000 20000 50000 100000 125000 250000 500000 750000 1000000 83300")

        self.serialPortOrig = serial.Serial(channel, baudrate=ttyBaudrate,
                                            timeout=timeout)
        self.serialPort = io.TextIOWrapper(
            io.BufferedRWPair(self.serialPortOrig, self.serialPortOrig, 1),
            newline='\r', line_buffering=True)

        time.sleep(self._SLEEP_AFTER_SERIAL_OPEN)

        if bitrate is not None:
            # The channel is closed before the bitrate command is sent.
            self.close()
            self.write(self._BITRATES[bitrate])

        self.open()
        super(slcanBus, self).__init__(channel, **kwargs)

    @staticmethod
    def _parse_frame(line):
        """Decode one slcan frame line.

        :param line: non-empty text line read from the adapter
        :return: ``(can_id, extended, remote, dlc, data)`` or ``None`` when
            the line does not start with a frame marker (``t``/``T``/``r``/``R``)
        """
        marker = line[0]
        if marker in ('T', 'R'):
            # extended frame: 8 hex digits of identifier
            extended = True
            id_end = 9
        elif marker in ('t', 'r'):
            # normal frame: 3 hex digits of identifier
            extended = False
            id_end = 4
        else:
            return None

        remote = marker in ('r', 'R')
        can_id = int(line[1:id_end], 16)
        # One DLC digit follows the identifier, for remote frames as well.
        dlc = int(line[id_end])

        data = []
        if not remote:
            # Data frames carry two hex digits per payload byte.
            first = id_end + 1
            data = [int(line[first + 2 * i:first + 2 * i + 2], 16)
                    for i in range(dlc)]
        return can_id, extended, remote, dlc, data

    def recv(self, timeout=None):
        """Read one line from the adapter and turn it into a message.

        :param float timeout:
            when not ``None``, becomes the read timeout of the underlying
            serial port (and stays in effect for later calls)
        :return: a ``Message``, or ``None`` if nothing was read or the line
            was not a CAN frame
        """
        if timeout is not None:
            self.serialPortOrig.timeout = timeout

        line = self.serialPort.readline()
        if not line:
            return None

        parsed = self._parse_frame(line)
        if parsed is None:
            return None

        can_id, extended, remote, dlc, data = parsed
        return Message(arbitration_id=can_id,
                       extended_id=extended,
                       timestamp=time.time(),   # Better than nothing...
                       is_remote_frame=remote,
                       dlc=dlc,
                       data=data)

    def send(self, msg, timeout=None):
        """Encode ``msg`` as an slcan frame command and write it.

        :param msg: message to transmit; its ``arbitration_id``, ``dlc``,
            ``data``, ``is_extended_id`` and ``is_remote_frame`` are used
        :param timeout: accepted for interface compatibility; not used
        """
        if msg.is_extended_id:
            header = "R%08X%d" if msg.is_remote_frame else "T%08X%d"
        else:
            header = "r%03X%d" if msg.is_remote_frame else "t%03X%d"
        sendStr = header % (msg.arbitration_id, msg.dlc)

        if not msg.is_remote_frame:
            # Remote frames carry no payload, only the requested length.
            sendStr += "".join("%02X" % msg.data[i] for i in range(msg.dlc))

        self.write(sendStr)

    def shutdown(self):
        """Close the CAN channel and release the underlying serial port."""
        try:
            self.close()
        finally:
            # Release the OS handle even if the close command failed.
            self.serialPortOrig.close()