diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index 3a25c99d8..adf7b6c5d 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -104,6 +104,7 @@ def __init__( sjw_dbr: int = 2, tseg1_dbr: int = 6, tseg2_dbr: int = 3, + listen_only: Optional[bool] = False, **kwargs: Any, ) -> None: """ @@ -157,6 +158,8 @@ def __init__( Bus timing value tseg1 (data) :param tseg2_dbr: Bus timing value tseg2 (data) + :param listen_only: + if the bus should be set to listen only mode. :raise ~can.exceptions.CanInterfaceNotImplementedError: If the current operating system is not supported or the driver could not be loaded. @@ -205,6 +208,8 @@ def __init__( self.index_to_channel: Dict[int, int] = {} self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20 + self._listen_only = listen_only + for channel in self.channels: if ( len(self.channels) == 1 @@ -232,7 +237,7 @@ def __init__( permission_mask = xlclass.XLaccess() # Set mask to request channel init permission if needed - if bitrate or fd or timing: + if bitrate or fd or timing or self._listen_only: permission_mask.value = self.mask interface_version = ( @@ -311,6 +316,9 @@ def __init__( if assert_timing: self._check_can_settings(channel_mask=self.mask, bitrate=bitrate) + if self._listen_only: + self._set_output_mode(channel_mask=self.mask, listen_only=True) + # Enable/disable TX receipts tx_receipts = 1 if receive_own_messages else 0 self.xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0) @@ -445,6 +453,28 @@ def _read_bus_params( f"Channel configuration for channel {channel} not found." ) + def _set_output_mode(self, channel_mask: int, listen_only: bool) -> None: + # set parameters for channels with init access + channel_mask = channel_mask & self.permission_mask + + if channel_mask: + if listen_only: + self.xldriver.xlCanSetChannelOutput( + self.port_handle, + channel_mask, + xldefine.XL_OutputMode.XL_OUTPUT_MODE_SILENT, + ) + else: + self.xldriver.xlCanSetChannelOutput( + self.port_handle, + channel_mask, + xldefine.XL_OutputMode.XL_OUTPUT_MODE_NORMAL, + ) + + LOG.info("xlCanSetChannelOutput: listen_only=%u", listen_only) + else: + LOG.warning("No channels with init access to set listen only mode") + def _set_bitrate(self, channel_mask: int, bitrate: int) -> None: # set parameters for channels with init access channel_mask = channel_mask & self.permission_mask @@ -643,9 +673,11 @@ def _apply_filters(self, filters: Optional[CanFilters]) -> None: self.mask, can_filter["can_id"], can_filter["can_mask"], - xldefine.XL_AcceptanceFilter.XL_CAN_EXT - if can_filter.get("extended") - else xldefine.XL_AcceptanceFilter.XL_CAN_STD, + ( + xldefine.XL_AcceptanceFilter.XL_CAN_EXT + if can_filter.get("extended") + else xldefine.XL_AcceptanceFilter.XL_CAN_STD + ), ) except VectorOperationError as exception: LOG.warning("Could not set filters: %s", exception) diff --git a/test/test_vector.py b/test/test_vector.py index 99756c41d..ca46526fb 100644 --- a/test/test_vector.py +++ b/test/test_vector.py @@ -48,6 +48,7 @@ def mock_xldriver() -> None: xldriver_mock.xlCanSetChannelAcceptance = Mock(return_value=0) xldriver_mock.xlCanSetChannelBitrate = Mock(return_value=0) xldriver_mock.xlSetNotification = Mock(side_effect=xlSetNotification) + xldriver_mock.xlCanSetChannelOutput = Mock(return_value=0) # bus deactivation functions xldriver_mock.xlDeactivateChannel = Mock(return_value=0) @@ -78,6 +79,44 @@ def mock_xldriver() -> None: canlib.HAS_EVENTS = real_has_events +def test_listen_only_mocked(mock_xldriver) -> None: + bus = can.Bus(channel=0, interface="vector", listen_only=True, _testing=True) + assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 + + can.interfaces.vector.canlib.xldriver.xlCanSetChannelOutput.assert_called() + xlCanSetChannelOutput_args = ( + can.interfaces.vector.canlib.xldriver.xlCanSetChannelOutput.call_args[0] + ) + assert xlCanSetChannelOutput_args[2] == xldefine.XL_OutputMode.XL_OUTPUT_MODE_SILENT + + +@pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable") +def test_listen_only() -> None: + bus = can.Bus( + channel=0, + serial=_find_virtual_can_serial(), + interface="vector", + receive_own_messages=True, + listen_only=True, + ) + assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 + + msg = can.Message( + arbitration_id=0xC0FFEF, data=[1, 2, 3, 4, 5, 6, 7, 8], is_extended_id=True + ) + + bus.send(msg) + + received_msg = bus.recv() + + assert received_msg.arbitration_id == msg.arbitration_id + assert received_msg.data == msg.data + + bus.shutdown() + + def test_bus_creation_mocked(mock_xldriver) -> None: bus = can.Bus(channel=0, interface="vector", _testing=True) assert isinstance(bus, canlib.VectorBus)