Broadcast Manager#

The broadcast manager allows the user to setup periodic message jobs. For example sending a particular message at a given period. The broadcast manager supported natively by several interfaces and a software thread based scheduler is used as a fallback.

This example shows the socketcan backend using the broadcast manager:

  1#!/usr/bin/env python
  2
  3"""
  4This example exercises the periodic sending capabilities.
  5
  6Expects a vcan0 interface:
  7
  8    python3 -m examples.cyclic
  9
 10"""
 11
 12import logging
 13import time
 14
 15import can
 16
 17logging.basicConfig(level=logging.INFO)
 18
 19
 20def simple_periodic_send(bus):
 21    """
 22    Sends a message every 20ms with no explicit timeout
 23    Sleeps for 2 seconds then stops the task.
 24    """
 25    print("Starting to send a message every 200ms for 2s")
 26    msg = can.Message(
 27        arbitration_id=0x123, data=[1, 2, 3, 4, 5, 6], is_extended_id=False
 28    )
 29    task = bus.send_periodic(msg, 0.20)
 30    assert isinstance(task, can.CyclicSendTaskABC)
 31    time.sleep(2)
 32    task.stop()
 33    print("stopped cyclic send")
 34
 35
 36def limited_periodic_send(bus):
 37    """Send using LimitedDurationCyclicSendTaskABC."""
 38    print("Starting to send a message every 200ms for 1s")
 39    msg = can.Message(
 40        arbitration_id=0x12345678, data=[0, 0, 0, 0, 0, 0], is_extended_id=True
 41    )
 42    task = bus.send_periodic(msg, 0.20, 1, store_task=False)
 43    if not isinstance(task, can.LimitedDurationCyclicSendTaskABC):
 44        print("This interface doesn't seem to support LimitedDurationCyclicSendTaskABC")
 45        task.stop()
 46        return
 47
 48    time.sleep(2)
 49    print("Cyclic send should have stopped as duration expired")
 50    # Note the (finished) task will still be tracked by the Bus
 51    # unless we pass `store_task=False` to bus.send_periodic
 52    # alternatively calling stop removes the task from the bus
 53    # task.stop()
 54
 55
 56def test_periodic_send_with_modifying_data(bus):
 57    """Send using ModifiableCyclicTaskABC."""
 58    print("Starting to send a message every 200ms. Initial data is four consecutive 1s")
 59    msg = can.Message(arbitration_id=0x0CF02200, data=[1, 1, 1, 1])
 60    task = bus.send_periodic(msg, 0.20)
 61    if not isinstance(task, can.ModifiableCyclicTaskABC):
 62        print("This interface doesn't seem to support modification")
 63        task.stop()
 64        return
 65    time.sleep(2)
 66    print("Changing data of running task to begin with 99")
 67    msg.data[0] = 0x99
 68    task.modify_data(msg)
 69    time.sleep(2)
 70
 71    task.stop()
 72    print("stopped cyclic send")
 73    print("Changing data of stopped task to single ff byte")
 74    msg.data = bytearray([0xFF])
 75    msg.dlc = 1
 76    task.modify_data(msg)
 77    time.sleep(1)
 78    print("starting again")
 79    task.start()
 80    time.sleep(1)
 81    task.stop()
 82    print("done")
 83
 84
 85# Will have to consider how to expose items like this. The socketcan
 86# interfaces will continue to support it... but the top level api won't.
 87# def test_dual_rate_periodic_send():
 88#     """Send a message 10 times at 1ms intervals, then continue to send every 500ms"""
 89#     msg = can.Message(arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5])
 90#     print("Creating cyclic task to send message 10 times at 1ms, then every 500ms")
 91#     task = can.interface.MultiRateCyclicSendTask('vcan0', msg, 10, 0.001, 0.50)
 92#     time.sleep(2)
 93#
 94#     print("Changing data[0] = 0x42")
 95#     msg.data[0] = 0x42
 96#     task.modify_data(msg)
 97#     time.sleep(2)
 98#
 99#     task.stop()
100#     print("stopped cyclic send")
101#
102#     time.sleep(2)
103#
104#     task.start()
105#     print("starting again")
106#     time.sleep(2)
107#     task.stop()
108#     print("done")
109
110
111def main():
112    """Test different cyclic sending tasks."""
113    reset_msg = can.Message(
114        arbitration_id=0x00, data=[0, 0, 0, 0, 0, 0], is_extended_id=False
115    )
116
117    # this uses the default configuration (for example from environment variables, or a
118    # config file) see https://python-can.readthedocs.io/en/stable/configuration.html
119    with can.Bus() as bus:
120        bus.send(reset_msg)
121
122        simple_periodic_send(bus)
123
124        bus.send(reset_msg)
125
126        limited_periodic_send(bus)
127
128        test_periodic_send_with_modifying_data(bus)
129
130        # print("Carrying out multirate cyclic test for {} interface".format(interface))
131        # can.rc['interface'] = interface
132        # test_dual_rate_periodic_send()
133
134    time.sleep(2)
135
136
137if __name__ == "__main__":
138    main()

Message Sending Tasks#

The class based api for the broadcast manager uses a series of mixin classes. All mixins inherit from CyclicSendTaskABC which inherits from CyclicTask.

class can.broadcastmanager.CyclicTask[source]#

Abstract Base for all cyclic tasks.

abstract stop()[source]#

Cancel this periodic task.

Raises:

CanError – If stop is called on an already stopped task.

Return type:

None

class can.broadcastmanager.CyclicSendTaskABC(messages, period)[source]#

Message send task with defined period

Parameters:
  • messages (Sequence[Message] | Message) – The messages to be sent periodically.

  • period (float) – The rate in seconds at which to send the messages.

Raises:

ValueError – If the given messages are invalid

class can.broadcastmanager.LimitedDurationCyclicSendTaskABC(messages, period, duration)[source]#

Message send task with a defined duration and period.

Parameters:
  • messages (Sequence[Message] | Message) – The messages to be sent periodically.

  • period (float) – The rate in seconds at which to send the messages.

  • duration (float | None) – Approximate duration in seconds to continue sending messages. If no duration is provided, the task will continue indefinitely.

Raises:

ValueError – If the given messages are invalid

class can.broadcastmanager.MultiRateCyclicSendTaskABC(channel, messages, count, initial_period, subsequent_period)[source]#

A Cyclic send task that supports switches send frequency after a set time.

Transmits a message count times at initial_period then continues to transmit messages at subsequent_period.

Parameters:
Raises:

ValueError – If the given messages are invalid

class can.ModifiableCyclicTaskABC(messages, period)[source]#

Adds support for modifying a periodic message

Parameters:
  • messages (Sequence[Message] | Message) – The messages to be sent periodically.

  • period (float) – The rate in seconds at which to send the messages.

Raises:

ValueError – If the given messages are invalid

modify_data(messages)[source]#

Update the contents of the periodically sent messages, without altering the timing.

Parameters:

messages (Sequence[Message] | Message) –

The messages with the new Message.data.

Note: The arbitration ID cannot be changed.

Note: The number of new cyclic messages to be sent must be equal to the original number of messages originally specified for this task.

Raises:

ValueError – If the given messages are invalid

Return type:

None

class can.RestartableCyclicTaskABC(messages, period)[source]#

Adds support for restarting a stopped cyclic task

Parameters:
  • messages (Sequence[Message] | Message) – The messages to be sent periodically.

  • period (float) – The rate in seconds at which to send the messages.

Raises:

ValueError – If the given messages are invalid

abstract start()[source]#

Restart a stopped periodic task.

Return type:

None

class can.broadcastmanager.ThreadBasedCyclicSendTask(bus, lock, messages, period, duration=None, on_error=None)[source]#

Fallback cyclic send task using daemon thread.

Transmits messages with a period seconds for duration seconds on a bus.

The on_error is called if any error happens on bus while sending messages. If on_error present, and returns False when invoked, thread is stopped immediately, otherwise, thread continuously tries to send messages ignoring errors on a bus. Absence of on_error means that thread exits immediately on error.

Parameters:
  • on_error (Callable[[Exception], bool] | None) – The callable that accepts an exception if any error happened on a bus while sending messages, it shall return either True or False depending on desired behaviour of ThreadBasedCyclicSendTask.

  • bus (BusABC) –

  • lock (allocate_lock) –

  • messages (Sequence[Message] | Message) –

  • period (float) –

  • duration (float | None) –

Raises:

ValueError – If the given messages are invalid

start()[source]#

Restart a stopped periodic task.

Return type:

None

stop()[source]#

Cancel this periodic task.

Raises:

CanError – If stop is called on an already stopped task.

Return type:

None