"""
Interface for CANtact devices from Linklayer Labs

Source code for can.interfaces.cantact
"""

import time
import logging
from unittest.mock import Mock

from can import BusABC, Message
from ..exceptions import (
    CanInitializationError,
    CanInterfaceNotImplementedError,
    error_check,
)

logger = logging.getLogger(__name__)

# The vendor driver is optional: keep the module importable without it and
# record its absence as ``cantact = None`` so callers can test for it.
try:
    import cantact
except ImportError:
    cantact = None
    logger.warning(
        "The CANtact module is not installed. Install it using `pip install cantact`"
    )

class CantactBus(BusABC):
    """CANtact interface"""

    @staticmethod
    def _detect_available_configs():
        """List one configuration per channel reported by the device.

        :return: A list of ``{"interface": "cantact", "channel": "ch:<n>"}``
            dicts, or an empty list when the driver cannot be instantiated.
        """
        try:
            interface = cantact.Interface()
        except (NameError, SystemError, AttributeError):
            # AttributeError covers the case where the optional import
            # failed and ``cantact`` is ``None``.
            logger.debug(
                "Could not import or instantiate cantact, so no configurations are available"
            )
            return []

        return [
            {"interface": "cantact", "channel": f"ch:{i}"}
            for i in range(interface.channel_count())
        ]

    def __init__(
        self,
        channel,
        bitrate=500000,
        poll_interval=0.01,
        monitor=False,
        bit_timing=None,
        _testing=False,
        **kwargs,
    ):
        """
        :param int channel:
            Channel number (zero indexed, labeled on multi-channel devices)
        :param int bitrate:
            Bitrate in bits/s
        :param poll_interval:
            Forwarded unchanged to ``BusABC.__init__``.
        :param bool monitor:
            If true, operate in listen-only monitoring mode
        :param BitTiming bit_timing:
            Optional BitTiming to use for custom bit timing setting. Overrides bitrate if not None.
        :param bool _testing:
            If true, use ``MockInterface`` instead of a real device.
        :raises CanInterfaceNotImplementedError:
            If the ``cantact`` module is not installed (and not ``_testing``).
        """
        if _testing:
            self.interface = MockInterface()
        else:
            if cantact is None:
                raise CanInterfaceNotImplementedError(
                    "The CANtact module is not installed. "
                    "Install it using `python -m pip install cantact`"
                )
            with error_check(
                "Cannot create the cantact.Interface", CanInitializationError
            ):
                self.interface = cantact.Interface()

        self.channel = int(channel)
        self.channel_info = f"CANtact: ch:{channel}"

        # Configure the interface
        with error_check("Cannot setup the cantact.Interface", CanInitializationError):
            if bit_timing is None:
                # use bitrate
                self.interface.set_bitrate(self.channel, int(bitrate))
            else:
                # use custom bit timing
                self.interface.set_bit_timing(
                    self.channel,
                    int(bit_timing.brp),
                    int(bit_timing.tseg1),
                    int(bit_timing.tseg2),
                    int(bit_timing.sjw),
                )
            self.interface.set_enabled(self.channel, True)
            self.interface.set_monitor(self.channel, monitor)
            self.interface.start()

        super().__init__(
            channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs
        )

    def _recv_internal(self, timeout):
        """Poll the device for one frame.

        :param timeout: Seconds to wait; converted to whole milliseconds.
        :return: ``(Message, False)`` when a frame was read, ``(None, False)``
            when the device returned nothing within the timeout.
        """
        # NOTE(review): ``timeout`` must be numeric; ``None`` fails in the
        # multiplication below -- confirm how callers handle that case.
        with error_check("Cannot receive message"):
            frame = self.interface.recv(int(timeout * 1000))
        if frame is None:
            # timeout occurred
            return None, False

        msg = Message(
            arbitration_id=frame["id"],
            is_extended_id=frame["extended"],
            timestamp=frame["timestamp"],
            is_remote_frame=frame["rtr"],
            dlc=frame["dlc"],
            # keep only the first ``dlc`` bytes of the reported payload
            data=frame["data"][: frame["dlc"]],
            channel=frame["channel"],
            is_rx=(not frame["loopback"]),  # received if not loopback frame
        )
        return msg, False

    def send(self, msg, timeout=None):
        """Transmit ``msg`` on this bus's channel.

        :param Message msg: Message to send.
        :param timeout: Accepted but not used by this implementation.
        """
        with error_check("Cannot send message"):
            self.interface.send(
                self.channel,
                msg.arbitration_id,
                bool(msg.is_extended_id),
                bool(msg.is_remote_frame),
                msg.dlc,
                msg.data,
            )

    def shutdown(self):
        """Run the base-class shutdown, then stop the device interface."""
        super().shutdown()
        with error_check("Cannot shutdown interface"):
            self.interface.stop()
def mock_recv(timeout):
    """Stand-in for the device ``recv`` call used by ``MockInterface``.

    :param timeout: Any positive value yields a fixed frame; zero or a
        negative value simulates a receive timeout.
    :return: A frame dict (standard ID ``0x123``, 8 data bytes, channel 0,
        timestamped with the current time) or ``None`` on simulated timeout.
    """
    if timeout > 0:
        return {
            "id": 0x123,
            "extended": False,
            "timestamp": time.time(),
            "loopback": False,
            "rtr": False,
            "dlc": 8,
            "data": [1, 2, 3, 4, 5, 6, 7, 8],
            "channel": 0,
        }
    else:
        # simulate timeout when timeout = 0
        return None


class MockInterface:
    """
    Mock interface to replace real interface when testing.
    This allows for tests to run without actual hardware.
    """

    # The mocks are class attributes, so every instance (and the class
    # itself) shares the same Mock objects and their recorded calls.
    start = Mock()
    set_bitrate = Mock()
    set_bit_timing = Mock()
    set_enabled = Mock()
    set_monitor = Mock()
    stop = Mock()
    send = Mock()
    channel_count = Mock(return_value=1)
    recv = Mock(side_effect=mock_recv)