diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 39dc1f0ba..ae47fc048 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -53,7 +53,7 @@ def stop(self) -> None: """ -class CyclicSendTaskABC(CyclicTask): +class CyclicSendTaskABC(CyclicTask, abc.ABC): """ Message send task with defined period """ @@ -114,7 +114,7 @@ def _check_and_convert_messages( return messages -class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC): +class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC): def __init__( self, messages: Union[Sequence[Message], Message], @@ -136,7 +136,7 @@ def __init__( self.duration = duration -class RestartableCyclicTaskABC(CyclicSendTaskABC): +class RestartableCyclicTaskABC(CyclicSendTaskABC, abc.ABC): """Adds support for restarting a stopped cyclic task""" @abc.abstractmethod @@ -144,9 +144,7 @@ def start(self) -> None: """Restart a stopped periodic task.""" -class ModifiableCyclicTaskABC(CyclicSendTaskABC): - """Adds support for modifying a periodic message""" - +class ModifiableCyclicTaskABC(CyclicSendTaskABC, abc.ABC): def _check_modified_messages(self, messages: Tuple[Message, ...]) -> None: """Helper function to perform error checking when modifying the data in the cyclic task. @@ -190,7 +188,7 @@ def modify_data(self, messages: Union[Sequence[Message], Message]) -> None: self.messages = messages -class MultiRateCyclicSendTaskABC(CyclicSendTaskABC): +class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC): """A Cyclic send task that supports switches send frequency after a set time.""" def __init__( @@ -218,7 +216,7 @@ def __init__( class ThreadBasedCyclicSendTask( - ModifiableCyclicTaskABC, LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC + LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC ): """Fallback cyclic send task using daemon thread.""" @@ -230,6 +228,7 @@ def __init__( period: float, duration: Optional[float] = None, on_error: Optional[Callable[[Exception], bool]] = None, + modifier_callback: Optional[Callable[[Message], None]] = None, ) -> None: """Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`. @@ -255,6 +254,7 @@ def __init__( time.perf_counter() + duration if duration else None ) self.on_error = on_error + self.modifier_callback = modifier_callback if USE_WINDOWS_EVENTS: self.period_ms = int(round(period * 1000, 0)) @@ -301,14 +301,22 @@ def _run(self) -> None: # Prevent calling bus.send from multiple threads with self.send_lock: try: + if self.modifier_callback is not None: + self.modifier_callback(self.messages[msg_index]) self.bus.send(self.messages[msg_index]) except Exception as exc: # pylint: disable=broad-except log.exception(exc) - if self.on_error: - if not self.on_error(exc): - break - else: + + # stop if `on_error` callback was not given + if self.on_error is None: + self.stop() + raise exc + + # stop if `on_error` returns False + if not self.on_error(exc): + self.stop() break + msg_due_time_ns += self.period_ns if self.end_time is not None and time.perf_counter() >= self.end_time: break diff --git a/can/bus.py b/can/bus.py index b7a54dbb1..9c65ad52f 100644 --- a/can/bus.py +++ b/can/bus.py @@ -8,7 +8,7 @@ from abc import ABC, ABCMeta, abstractmethod from enum import Enum, auto from time import time -from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union, cast +from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union, cast import can import can.typechecking @@ -195,6 +195,7 @@ def send_periodic( period: float, duration: Optional[float] = None, store_task: bool = True, + modifier_callback: Optional[Callable[[Message], None]] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. @@ -216,6 +217,10 @@ def send_periodic( :param store_task: If True (the default) the task will be attached to this Bus instance. Disable to instead manage tasks manually. + :param modifier_callback: + Function which should be used to modify each message's data before + sending. The callback modifies the :attr:`~can.Message.data` of the + message and returns ``None``. :return: A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the task's @@ -230,7 +235,7 @@ def send_periodic( .. note:: - For extremely long running Bus instances with many short lived + For extremely long-running Bus instances with many short-lived tasks the default api with ``store_task==True`` may not be appropriate as the stopped tasks are still taking up memory as they are associated with the Bus instance. @@ -247,9 +252,8 @@ def send_periodic( # Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later task = cast( _SelfRemovingCyclicTask, - self._send_periodic_internal(msgs, period, duration), + self._send_periodic_internal(msgs, period, duration, modifier_callback), ) - # we wrap the task's stop method to also remove it from the Bus's list of tasks periodic_tasks = self._periodic_tasks original_stop_method = task.stop @@ -275,6 +279,7 @@ def _send_periodic_internal( msgs: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, + modifier_callback: Optional[Callable[[Message], None]] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Default implementation of periodic message sending using threading. @@ -298,7 +303,12 @@ def _send_periodic_internal( threading.Lock() ) task = ThreadBasedCyclicSendTask( - self, self._lock_send_periodic, msgs, period, duration + bus=self, + lock=self._lock_send_periodic, + messages=msgs, + period=period, + duration=duration, + modifier_callback=modifier_callback, ) return task diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index b28c93541..d47fc2d6a 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -1,9 +1,13 @@ -from typing import Optional +from typing import Callable, Optional, Sequence, Union import can.interfaces.ixxat.canlib_vcinpl as vcinpl import can.interfaces.ixxat.canlib_vcinpl2 as vcinpl2 -from can import BusABC, Message -from can.bus import BusState +from can import ( + BusABC, + BusState, + CyclicSendTaskABC, + Message, +) class IXXATBus(BusABC): @@ -145,8 +149,16 @@ def _recv_internal(self, timeout): def send(self, msg: Message, timeout: Optional[float] = None) -> None: return self.bus.send(msg, timeout) - def _send_periodic_internal(self, msgs, period, duration=None): - return self.bus._send_periodic_internal(msgs, period, duration) + def _send_periodic_internal( + self, + msgs: Union[Sequence[Message], Message], + period: float, + duration: Optional[float] = None, + modifier_callback: Optional[Callable[[Message], None]] = None, + ) -> CyclicSendTaskABC: + return self.bus._send_periodic_internal( + msgs, period, duration, modifier_callback + ) def shutdown(self) -> None: super().shutdown() diff --git a/can/interfaces/ixxat/canlib_vcinpl.py b/can/interfaces/ixxat/canlib_vcinpl.py index 5a366cc30..cbf2fb61c 100644 --- a/can/interfaces/ixxat/canlib_vcinpl.py +++ b/can/interfaces/ixxat/canlib_vcinpl.py @@ -13,14 +13,18 @@ import functools import logging import sys -from typing import Callable, Optional, Tuple - -from can import BusABC, CanProtocol, Message -from can.broadcastmanager import ( +import warnings +from typing import Callable, Optional, Sequence, Tuple, Union + +from can import ( + BusABC, + BusState, + CanProtocol, + CyclicSendTaskABC, LimitedDurationCyclicSendTaskABC, + Message, RestartableCyclicTaskABC, ) -from can.bus import BusState from can.ctypesutil import HANDLE, PHANDLE, CLibrary from can.ctypesutil import HRESULT as ctypes_HRESULT from can.exceptions import CanInitializationError, CanInterfaceNotImplementedError @@ -785,17 +789,39 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: # Want to log outgoing messages? # log.log(self.RECV_LOGGING_LEVEL, "Sent: %s", message) - def _send_periodic_internal(self, msgs, period, duration=None): + def _send_periodic_internal( + self, + msgs: Union[Sequence[Message], Message], + period: float, + duration: Optional[float] = None, + modifier_callback: Optional[Callable[[Message], None]] = None, + ) -> CyclicSendTaskABC: """Send a message using built-in cyclic transmit list functionality.""" - if self._scheduler is None: - self._scheduler = HANDLE() - _canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler) - caps = structures.CANCAPABILITIES() - _canlib.canSchedulerGetCaps(self._scheduler, caps) - self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor - _canlib.canSchedulerActivate(self._scheduler, constants.TRUE) - return CyclicSendTask( - self._scheduler, msgs, period, duration, self._scheduler_resolution + if modifier_callback is None: + if self._scheduler is None: + self._scheduler = HANDLE() + _canlib.canSchedulerOpen( + self._device_handle, self.channel, self._scheduler + ) + caps = structures.CANCAPABILITIES() + _canlib.canSchedulerGetCaps(self._scheduler, caps) + self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor + _canlib.canSchedulerActivate(self._scheduler, constants.TRUE) + return CyclicSendTask( + self._scheduler, msgs, period, duration, self._scheduler_resolution + ) + + # fallback to thread based cyclic task + warnings.warn( + f"{self.__class__.__name__} falls back to a thread-based cyclic task, " + "when the `modifier_callback` argument is given." + ) + return BusABC._send_periodic_internal( + self, + msgs=msgs, + period=period, + duration=duration, + modifier_callback=modifier_callback, ) def shutdown(self): diff --git a/can/interfaces/ixxat/canlib_vcinpl2.py b/can/interfaces/ixxat/canlib_vcinpl2.py index 446b3e35c..b796be744 100644 --- a/can/interfaces/ixxat/canlib_vcinpl2.py +++ b/can/interfaces/ixxat/canlib_vcinpl2.py @@ -13,11 +13,15 @@ import functools import logging import sys -from typing import Callable, Optional, Tuple +import warnings +from typing import Callable, Optional, Sequence, Tuple, Union -from can import BusABC, CanProtocol, Message -from can.broadcastmanager import ( +from can import ( + BusABC, + CanProtocol, + CyclicSendTaskABC, LimitedDurationCyclicSendTaskABC, + Message, RestartableCyclicTaskABC, ) from can.ctypesutil import HANDLE, PHANDLE, CLibrary @@ -931,19 +935,41 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: else: _canlib.canChannelPostMessage(self._channel_handle, message) - def _send_periodic_internal(self, msgs, period, duration=None): + def _send_periodic_internal( + self, + msgs: Union[Sequence[Message], Message], + period: float, + duration: Optional[float] = None, + modifier_callback: Optional[Callable[[Message], None]] = None, + ) -> CyclicSendTaskABC: """Send a message using built-in cyclic transmit list functionality.""" - if self._scheduler is None: - self._scheduler = HANDLE() - _canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler) - caps = structures.CANCAPABILITIES2() - _canlib.canSchedulerGetCaps(self._scheduler, caps) - self._scheduler_resolution = ( - caps.dwCmsClkFreq / caps.dwCmsDivisor - ) # TODO: confirm - _canlib.canSchedulerActivate(self._scheduler, constants.TRUE) - return CyclicSendTask( - self._scheduler, msgs, period, duration, self._scheduler_resolution + if modifier_callback is None: + if self._scheduler is None: + self._scheduler = HANDLE() + _canlib.canSchedulerOpen( + self._device_handle, self.channel, self._scheduler + ) + caps = structures.CANCAPABILITIES2() + _canlib.canSchedulerGetCaps(self._scheduler, caps) + self._scheduler_resolution = ( + caps.dwCmsClkFreq / caps.dwCmsDivisor + ) # TODO: confirm + _canlib.canSchedulerActivate(self._scheduler, constants.TRUE) + return CyclicSendTask( + self._scheduler, msgs, period, duration, self._scheduler_resolution + ) + + # fallback to thread based cyclic task + warnings.warn( + f"{self.__class__.__name__} falls back to a thread-based cyclic task, " + "when the `modifier_callback` argument is given." + ) + return BusABC._send_periodic_internal( + self, + msgs=msgs, + period=period, + duration=duration, + modifier_callback=modifier_callback, ) def shutdown(self): diff --git a/can/interfaces/socketcan/socketcan.py b/can/interfaces/socketcan/socketcan.py index a3f74bb82..44cecec76 100644 --- a/can/interfaces/socketcan/socketcan.py +++ b/can/interfaces/socketcan/socketcan.py @@ -14,7 +14,8 @@ import struct import threading import time -from typing import Dict, List, Optional, Sequence, Tuple, Type, Union +import warnings +from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type, Union log = logging.getLogger(__name__) log_tx = log.getChild("tx") @@ -806,7 +807,8 @@ def _send_periodic_internal( msgs: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, - ) -> CyclicSendTask: + modifier_callback: Optional[Callable[[Message], None]] = None, + ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. The Linux kernel's Broadcast Manager SocketCAN API is used to schedule @@ -838,15 +840,29 @@ def _send_periodic_internal( general the message will be sent at the given rate until at least *duration* seconds. """ - msgs = LimitedDurationCyclicSendTaskABC._check_and_convert_messages( # pylint: disable=protected-access - msgs - ) + if modifier_callback is None: + msgs = LimitedDurationCyclicSendTaskABC._check_and_convert_messages( # pylint: disable=protected-access + msgs + ) + + msgs_channel = str(msgs[0].channel) if msgs[0].channel else None + bcm_socket = self._get_bcm_socket(msgs_channel or self.channel) + task_id = self._get_next_task_id() + task = CyclicSendTask(bcm_socket, task_id, msgs, period, duration) + return task - msgs_channel = str(msgs[0].channel) if msgs[0].channel else None - bcm_socket = self._get_bcm_socket(msgs_channel or self.channel) - task_id = self._get_next_task_id() - task = CyclicSendTask(bcm_socket, task_id, msgs, period, duration) - return task + # fallback to thread based cyclic task + warnings.warn( + f"{self.__class__.__name__} falls back to a thread-based cyclic task, " + "when the `modifier_callback` argument is given." + ) + return BusABC._send_periodic_internal( + self, + msgs=msgs, + period=period, + duration=duration, + modifier_callback=modifier_callback, + ) def _get_next_task_id(self) -> int: with self._task_id_guard: diff --git a/examples/cyclic_checksum.py b/examples/cyclic_checksum.py new file mode 100644 index 000000000..3ab6c78ac --- /dev/null +++ b/examples/cyclic_checksum.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +""" +This example demonstrates how to send a periodic message containing +an automatically updating counter and checksum. + +Expects a virtual interface: + + python3 -m examples.cyclic_checksum +""" + +import logging +import time + +import can + +logging.basicConfig(level=logging.INFO) + + +def cyclic_checksum_send(bus: can.BusABC) -> None: + """ + Sends periodic messages every 1 s with no explicit timeout. + The message's counter and checksum is updated before each send. + Sleeps for 10 seconds then stops the task. + """ + message = can.Message(arbitration_id=0x78, data=[0, 1, 2, 3, 4, 5, 6, 0]) + print("Starting to send an auto-updating message every 100ms for 3 s") + task = bus.send_periodic(msgs=message, period=0.1, modifier_callback=update_message) + time.sleep(3) + task.stop() + print("stopped cyclic send") + + +def update_message(message: can.Message) -> None: + counter = increment_counter(message) + checksum = compute_xbr_checksum(message, counter) + message.data[7] = (checksum << 4) + counter + + +def increment_counter(message: can.Message) -> int: + counter = message.data[7] & 0x0F + counter += 1 + counter %= 16 + + return counter + + +def compute_xbr_checksum(message: can.Message, counter: int) -> int: + """ + Computes an XBR checksum as per SAE J1939 SPN 3188. + """ + checksum = sum(message.data[:7]) + checksum += sum(message.arbitration_id.to_bytes(length=4, byteorder="big")) + checksum += counter & 0x0F + xbr_checksum = ((checksum >> 4) + checksum) & 0x0F + + return xbr_checksum + + +if __name__ == "__main__": + with can.Bus(channel=0, interface="virtual", receive_own_messages=True) as _bus: + notifier = can.Notifier(bus=_bus, listeners=[print]) + cyclic_checksum_send(_bus) + notifier.stop() diff --git a/test/simplecyclic_test.py b/test/simplecyclic_test.py index 9e01be457..650a1fddf 100644 --- a/test/simplecyclic_test.py +++ b/test/simplecyclic_test.py @@ -5,8 +5,10 @@ """ import gc +import time import unittest from time import sleep +from typing import List from unittest.mock import MagicMock import can @@ -160,34 +162,73 @@ def test_thread_based_cyclic_send_task(self): # good case, bus is up on_error_mock = MagicMock(return_value=False) task = can.broadcastmanager.ThreadBasedCyclicSendTask( - bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + bus=bus, + lock=bus._lock_send_periodic, + messages=msg, + period=0.1, + duration=3, + on_error=on_error_mock, ) - task.start() sleep(1) on_error_mock.assert_not_called() task.stop() bus.shutdown() - # bus has been shutted down + # bus has been shut down on_error_mock = MagicMock(return_value=False) task = can.broadcastmanager.ThreadBasedCyclicSendTask( - bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + bus=bus, + lock=bus._lock_send_periodic, + messages=msg, + period=0.1, + duration=3, + on_error=on_error_mock, ) - task.start() sleep(1) - self.assertEqual(on_error_mock.call_count, 1) + self.assertEqual(1, on_error_mock.call_count) task.stop() - # bus is still shutted down, but on_error returns True + # bus is still shut down, but on_error returns True on_error_mock = MagicMock(return_value=True) task = can.broadcastmanager.ThreadBasedCyclicSendTask( - bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + bus=bus, + lock=bus._lock_send_periodic, + messages=msg, + period=0.1, + duration=3, + on_error=on_error_mock, ) - task.start() sleep(1) self.assertTrue(on_error_mock.call_count > 1) task.stop() + def test_modifier_callback(self) -> None: + msg_list: List[can.Message] = [] + + def increment_first_byte(msg: can.Message) -> None: + msg.data[0] += 1 + + original_msg = can.Message( + is_extended_id=False, arbitration_id=0x123, data=[0] * 8 + ) + + with can.ThreadSafeBus(interface="virtual", receive_own_messages=True) as bus: + notifier = can.Notifier(bus=bus, listeners=[msg_list.append]) + task = bus.send_periodic( + msgs=original_msg, period=0.001, modifier_callback=increment_first_byte + ) + time.sleep(0.2) + task.stop() + notifier.stop() + + self.assertEqual(b"\x01\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[0].data)) + self.assertEqual(b"\x02\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[1].data)) + self.assertEqual(b"\x03\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[2].data)) + self.assertEqual(b"\x04\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[3].data)) + self.assertEqual(b"\x05\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[4].data)) + self.assertEqual(b"\x06\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[5].data)) + self.assertEqual(b"\x07\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[6].data)) + if __name__ == "__main__": unittest.main()