Broadcast Manager

The broadcast manager isn’t yet supported by all interfaces. It allows the user to setup periodic message jobs.

This example shows the ctypes socketcan using the broadcast manager:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python3
"""
This example exercises the periodic sending capabilities.

Expects a vcan0 interface:

    python3 -m examples.cyclic

"""

import logging
import time

import can
logging.basicConfig(level=logging.INFO)


channel = 'vcan0'



def test_simple_periodic_send():
    print("Starting to send a message every 200ms. Initial data is zeros")
    msg = can.Message(arbitration_id=0x0cf02200, data=[0, 0, 0, 0, 0, 0])
    task = can.send_periodic('vcan0', msg, 0.20)
    time.sleep(2)
    task.stop()
    print("stopped cyclic send")


def test_periodic_send_with_modifying_data():
    print("Starting to send a message every 200ms. Initial data is ones")
    msg = can.Message(arbitration_id=0x0cf02200, data=[1, 1, 1, 1])
    task = can.send_periodic('vcan0', msg, 0.20)
    time.sleep(2)
    print("Changing data of running task to begin with 99")
    msg.data[0] = 0x99
    task.modify_data(msg)
    time.sleep(2)

    task.stop()
    print("stopped cyclic send")
    print("Changing data of stopped task to single ff byte")
    msg.data = bytearray([0xff])
    task.modify_data(msg)
    time.sleep(1)
    print("starting again")
    task.start()
    time.sleep(1)
    task.stop()
    print("done")


def test_dual_rate_periodic_send():
    """Send a message 10 times at 1ms intervals, then continue to send every 500ms"""
    msg = can.Message(arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5])
    print("Creating cyclic task to send message 10 times at 1ms, then every 500ms")
    task = can.interface.MultiRateCyclicSendTask('vcan0', msg, 10, 0.001, 0.50)
    time.sleep(2)

    print("Changing data[0] = 0x42")
    msg.data[0] = 0x42
    task.modify_data(msg)
    time.sleep(2)

    task.stop()
    print("stopped cyclic send")

    time.sleep(2)

    task.start()
    print("starting again")
    time.sleep(2)
    task.stop()
    print("done")


if __name__ == "__main__":

    for interface in {'socketcan_ctypes', 'socketcan_native'}:
        print("Carrying out cyclic tests with {} interface".format(interface))
        can.rc['interface'] = interface

        test_simple_periodic_send()

        test_periodic_send_with_modifying_data()

        print("Carrying out multirate cyclic test for {} interface".format(interface))
        can.rc['interface'] = interface
        test_dual_rate_periodic_send()

Functional API

can.send_periodic(channel, message, period)[source]

Send a message every period seconds on the given channel.

Class based API

class can.CyclicSendTaskABC(channel, message, period)[source]

Bases: can.broadcastmanager.CyclicTask

Parameters:
  • channel (str) – The name of the CAN channel to connect to.
  • message – The can.Message to be sent periodically.
  • period (float) – The rate in seconds at which to send the message.
modify_data(message)[source]

Update the contents of this periodically sent message without altering the timing.

Parameters:message – The Message with new Message.data. Note it must have the same arbitration_id.
stop()[source]

Send a TX_DELETE message to the broadcast manager to cancel this task.

This will delete the entry for the transmission of the CAN message specified.

class can.MultiRateCyclicSendTaskABC(channel, message, count, initial_period, subsequent_period)[source]

Bases: can.broadcastmanager.CyclicSendTaskABC

Exposes more of the full power of the TX_SETUP opcode.

Transmits a message count times at initial_period then continues to transmit message at subsequent_period.