Broadcast Manager
The broadcast manager is not yet supported by all interfaces. It allows the user to set up periodic message jobs.
This example uses the broadcast manager with the socketcan interfaces (socketcan_ctypes and socketcan_native):
#!/usr/bin/env python3
"""
This example exercises the periodic sending capabilities.

Expects a vcan0 interface:

    python3 -m examples.cyclic
"""
import logging
import time

import can

logging.basicConfig(level=logging.INFO)

channel = 'vcan0'


def test_simple_periodic_send():
    print("Starting to send a message every 200ms. Initial data is zeros")
    msg = can.Message(arbitration_id=0x0cf02200, data=[0, 0, 0, 0, 0, 0])
    task = can.send_periodic('vcan0', msg, 0.20)
    time.sleep(2)
    task.stop()
    print("stopped cyclic send")


def test_periodic_send_with_modifying_data():
    print("Starting to send a message every 200ms. Initial data is ones")
    msg = can.Message(arbitration_id=0x0cf02200, data=[1, 1, 1, 1])
    task = can.send_periodic('vcan0', msg, 0.20)
    time.sleep(2)
    print("Changing data of running task to begin with 99")
    msg.data[0] = 0x99
    task.modify_data(msg)
    time.sleep(2)

    task.stop()
    print("stopped cyclic send")
    print("Changing data of stopped task to single ff byte")
    msg.data = bytearray([0xff])
    task.modify_data(msg)
    time.sleep(1)
    print("starting again")
    task.start()
    time.sleep(1)
    task.stop()
    print("done")


def test_dual_rate_periodic_send():
    """Send a message 10 times at 1ms intervals, then continue to send every 500ms"""
    msg = can.Message(arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5])
    print("Creating cyclic task to send message 10 times at 1ms, then every 500ms")
    task = can.interface.MultiRateCyclicSendTask('vcan0', msg, 10, 0.001, 0.50)
    time.sleep(2)

    print("Changing data[0] = 0x42")
    msg.data[0] = 0x42
    task.modify_data(msg)
    time.sleep(2)

    task.stop()
    print("stopped cyclic send")

    time.sleep(2)

    task.start()
    print("starting again")
    time.sleep(2)
    task.stop()
    print("done")


if __name__ == "__main__":

    for interface in {'socketcan_ctypes', 'socketcan_native'}:
        print("Carrying out cyclic tests with {} interface".format(interface))
        can.rc['interface'] = interface

        test_simple_periodic_send()

        test_periodic_send_with_modifying_data()

        print("Carrying out multirate cyclic test for {} interface".format(interface))
        can.rc['interface'] = interface
        test_dual_rate_periodic_send()
Functional API
Class based API
class can.CyclicSendTaskABC(channel, message, period)
    Bases: can.broadcastmanager.CyclicTask

    Parameters:
    - channel (str) – The name of the CAN channel to connect to.
    - message – The can.Message to be sent periodically.
    - period (float) – The interval in seconds at which to send the message.

    modify_data(message)
        Update the contents of this periodically sent message without altering the timing.

        Parameters: message – The Message with the new Message.data. Note that it must have the same arbitration_id.
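The behaviour described for modify_data can be illustrated with a small pure-Python model. This is only a sketch under stated assumptions: `Frame` and `SketchCyclicTask` are hypothetical names invented for this illustration, the thread-based loop is not how python-can or the kernel broadcast manager implements cyclic tasks, and raising `ValueError` on a mismatched identifier is an assumption of the sketch rather than documented library behaviour.

```python
import threading
from dataclasses import dataclass


@dataclass
class Frame:
    """Hypothetical stand-in for can.Message (not the python-can class)."""
    arbitration_id: int
    data: bytes


class SketchCyclicTask:
    """Illustrative thread-based periodic sender (not python-can's code)."""

    def __init__(self, send, frame, period):
        self._send = send        # callable that transmits one frame
        self._frame = frame
        self._period = period    # interval between sends, in seconds
        self._lock = threading.Lock()
        self._stopped = threading.Event()
        self._thread = None
        self.start()

    def start(self):
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._stopped.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        self._stopped.set()
        if self._thread is not None:
            self._thread.join()

    def modify_data(self, frame):
        # Only the payload may change; the identifier must stay the same.
        # Raising here is an assumption made for this sketch.
        if frame.arbitration_id != self._frame.arbitration_id:
            raise ValueError("arbitration_id must match the running task")
        with self._lock:
            self._frame = Frame(frame.arbitration_id, bytes(frame.data))

    def _run(self):
        while not self._stopped.is_set():
            with self._lock:
                frame = self._frame
            self._send(frame)
            # wait() returns early when stop() is called, so the period
            # itself is never changed by modify_data.
            self._stopped.wait(self._period)
```

In this model the payload swap happens under a lock while the sending loop keeps its own schedule, which mirrors the "without altering the timing" wording above. A stopped task can also have its data modified and then be restarted with start(), as the example script does.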
class can.MultiRateCyclicSendTaskABC(channel, message, count, initial_period, subsequent_period)
    Bases: can.broadcastmanager.CyclicSendTaskABC

    Exposes more of the full power of the TX_SETUP opcode.

    Transmits a message count times at initial_period, then continues to transmit the message at subsequent_period.
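To make the two-rate timing concrete, the following sketch computes when frames would be sent for a given count, initial_period and subsequent_period. The helper `multirate_send_times` is hypothetical and not part of python-can. It assumes that the first frame goes out at time zero and that the gap following the last of the initial frames is already subsequent_period; the exact behaviour at that boundary depends on the underlying broadcast manager and is not stated here.

```python
def multirate_send_times(count, initial_period, subsequent_period, duration):
    """Return send offsets in seconds (from task start) up to `duration`.

    Hypothetical helper: models `count` sends spaced by `initial_period`,
    followed by sends spaced by `subsequent_period`.
    """
    times = []
    t = 0.0
    sent = 0
    while t <= duration:
        times.append(t)
        sent += 1
        # Use the fast interval until `count` frames have been sent.
        t += initial_period if sent < count else subsequent_period
    return times


# Three frames 0.25 s apart, then one frame every 0.5 s, over 2 seconds.
print(multirate_send_times(3, 0.25, 0.5, 2.0))
# prints [0.0, 0.25, 0.5, 1.0, 1.5, 2.0]
```

With the values from the example script (count 10, initial_period 0.001, subsequent_period 0.50), the same model gives a short burst of ten frames followed by a steady 500 ms cycle.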