Broadcast Manager

The broadcast manager allows the user to setup periodic message jobs. For example sending a particular message at a given period. The broadcast manager supported natively by several interfaces and a software thread based scheduler is used as a fallback.

This example shows the socketcan backend using the broadcast manager:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python

"""
This example exercises the periodic sending capabilities.

Expects a vcan0 interface:

    python3 -m examples.cyclic

"""

import logging
import time

import can

logging.basicConfig(level=logging.INFO)


def simple_periodic_send(bus):
    """
    Sends a message every 20ms with no explicit timeout
    Sleeps for 2 seconds then stops the task.
    """
    print("Starting to send a message every 200ms for 2s")
    msg = can.Message(
        arbitration_id=0x123, data=[1, 2, 3, 4, 5, 6], is_extended_id=False
    )
    task = bus.send_periodic(msg, 0.20)
    assert isinstance(task, can.CyclicSendTaskABC)
    time.sleep(2)
    task.stop()
    print("stopped cyclic send")


def limited_periodic_send(bus):
    """Send using LimitedDurationCyclicSendTaskABC."""
    print("Starting to send a message every 200ms for 1s")
    msg = can.Message(
        arbitration_id=0x12345678, data=[0, 0, 0, 0, 0, 0], is_extended_id=True
    )
    task = bus.send_periodic(msg, 0.20, 1, store_task=False)
    if not isinstance(task, can.LimitedDurationCyclicSendTaskABC):
        print("This interface doesn't seem to support LimitedDurationCyclicSendTaskABC")
        task.stop()
        return

    time.sleep(2)
    print("Cyclic send should have stopped as duration expired")
    # Note the (finished) task will still be tracked by the Bus
    # unless we pass `store_task=False` to bus.send_periodic
    # alternatively calling stop removes the task from the bus
    # task.stop()


def test_periodic_send_with_modifying_data(bus):
    """Send using ModifiableCyclicTaskABC."""
    print("Starting to send a message every 200ms. Initial data is four consecutive 1s")
    msg = can.Message(arbitration_id=0x0CF02200, data=[1, 1, 1, 1])
    task = bus.send_periodic(msg, 0.20)
    if not isinstance(task, can.ModifiableCyclicTaskABC):
        print("This interface doesn't seem to support modification")
        task.stop()
        return
    time.sleep(2)
    print("Changing data of running task to begin with 99")
    msg.data[0] = 0x99
    task.modify_data(msg)
    time.sleep(2)

    task.stop()
    print("stopped cyclic send")
    print("Changing data of stopped task to single ff byte")
    msg.data = bytearray([0xFF])
    msg.dlc = 1
    task.modify_data(msg)
    time.sleep(1)
    print("starting again")
    task.start()
    time.sleep(1)
    task.stop()
    print("done")


# Will have to consider how to expose items like this. The socketcan
# interfaces will continue to support it... but the top level api won't.
# def test_dual_rate_periodic_send():
#     """Send a message 10 times at 1ms intervals, then continue to send every 500ms"""
#     msg = can.Message(arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5])
#     print("Creating cyclic task to send message 10 times at 1ms, then every 500ms")
#     task = can.interface.MultiRateCyclicSendTask('vcan0', msg, 10, 0.001, 0.50)
#     time.sleep(2)
#
#     print("Changing data[0] = 0x42")
#     msg.data[0] = 0x42
#     task.modify_data(msg)
#     time.sleep(2)
#
#     task.stop()
#     print("stopped cyclic send")
#
#     time.sleep(2)
#
#     task.start()
#     print("starting again")
#     time.sleep(2)
#     task.stop()
#     print("done")


def main():
    """Test different cyclic sending tasks."""
    reset_msg = can.Message(
        arbitration_id=0x00, data=[0, 0, 0, 0, 0, 0], is_extended_id=False
    )

    with can.Bus(interface="virtual") as bus:
        bus.send(reset_msg)

        simple_periodic_send(bus)

        bus.send(reset_msg)

        limited_periodic_send(bus)

        test_periodic_send_with_modifying_data(bus)

        # print("Carrying out multirate cyclic test for {} interface".format(interface))
        # can.rc['interface'] = interface
        # test_dual_rate_periodic_send()

    time.sleep(2)


if __name__ == "__main__":
    main()

Message Sending Tasks

The class based api for the broadcast manager uses a series of mixin classes. All mixins inherit from CyclicSendTaskABC which inherits from CyclicTask.

class can.broadcastmanager.CyclicTask[source]

Bases: object

Abstract Base for all cyclic tasks.

stop()[source]

Cancel this periodic task.

Raises:can.CanError – If stop is called on an already stopped task.
class can.broadcastmanager.CyclicSendTaskABC(messages, period)[source]

Bases: can.broadcastmanager.CyclicTask

Message send task with defined period

Parameters:
  • messages (Union[Sequence[Message], Message]) – The messages to be sent periodically.
  • period (float) – The rate in seconds at which to send the messages.
class can.broadcastmanager.LimitedDurationCyclicSendTaskABC(messages, period, duration)[source]

Bases: can.broadcastmanager.CyclicSendTaskABC

Message send task with a defined duration and period.

Parameters:
  • messages (Union[Sequence[Message], Message]) – The messages to be sent periodically.
  • period (float) – The rate in seconds at which to send the messages.
  • duration (Optional[float]) – Approximate duration in seconds to continue sending messages. If no duration is provided, the task will continue indefinitely.
class can.broadcastmanager.MultiRateCyclicSendTaskABC(channel, messages, count, initial_period, subsequent_period)[source]

Bases: can.broadcastmanager.CyclicSendTaskABC

A Cyclic send task that supports switches send frequency after a set time.

Transmits a message count times at initial_period then continues to transmit messages at subsequent_period.

Parameters:
  • channel (Union[int, str]) – See interface specific documentation.
  • messages (Union[Sequence[Message], Message]) –
  • count (int) –
  • initial_period (float) –
  • subsequent_period (float) –
class can.ModifiableCyclicTaskABC(messages, period)[source]

Bases: can.broadcastmanager.CyclicSendTaskABC

Adds support for modifying a periodic message

Parameters:
  • messages (Union[Sequence[Message], Message]) – The messages to be sent periodically.
  • period (float) – The rate in seconds at which to send the messages.
modify_data(messages)[source]

Update the contents of the periodically sent messages, without altering the timing.

Parameters:messages (Union[Sequence[Message], Message]) –

The messages with the new Message.data.

Note: The arbitration ID cannot be changed.

Note: The number of new cyclic messages to be sent must be equal to the original number of messages originally specified for this task.

class can.RestartableCyclicTaskABC(messages, period)[source]

Bases: can.broadcastmanager.CyclicSendTaskABC

Adds support for restarting a stopped cyclic task

Parameters:
  • messages (Union[Sequence[Message], Message]) – The messages to be sent periodically.
  • period (float) – The rate in seconds at which to send the messages.
start()[source]

Restart a stopped periodic task.