diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index b7ac9fbd4..c4bd2b76a 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -28,3 +28,4 @@ Jan Goeteyn "ykzheng" Lear Corporation Nick Black +Francisco Javier Burgos Macia diff --git a/can/interfaces/ixxat/__init__.py b/can/interfaces/ixxat/__init__.py index fc2cae0f3..347caed50 100644 --- a/can/interfaces/ixxat/__init__.py +++ b/can/interfaces/ixxat/__init__.py @@ -4,4 +4,7 @@ Copyright (C) 2016 Giuseppe Corbelli """ -from can.interfaces.ixxat.canlib import IXXATBus, get_ixxat_hwids +from can.interfaces.ixxat.canlib import IXXATBus +from can.interfaces.ixxat.canlib_vcinpl import ( + get_ixxat_hwids, +) # import this and not the one from vcinpl2 for backward compatibility diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index ced840ff3..4dc0d3e6e 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -1,848 +1,147 @@ -""" -Ctypes wrapper module for IXXAT Virtual CAN Interface V3 on win32 systems - -TODO: We could implement this interface such that setting other filters - could work when the initial filters were set to zero using the - software fallback. Or could the software filters even be changed - after the connection was opened? We need to document that bahaviour! - See also the NICAN interface. - -""" - -import ctypes -import functools -import logging -import sys -from typing import Optional +import can.interfaces.ixxat.canlib_vcinpl as vcinpl +import can.interfaces.ixxat.canlib_vcinpl2 as vcinpl2 from can import BusABC, Message -from can.exceptions import CanInterfaceNotImplementedError, CanInitializationError -from can.broadcastmanager import ( - LimitedDurationCyclicSendTaskABC, - RestartableCyclicTaskABC, -) -from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT as ctypes_HRESULT - -from . import constants, structures -from .exceptions import * - -__all__ = [ - "VCITimeout", - "VCIError", - "VCIBusOffError", - "VCIDeviceNotFoundError", - "IXXATBus", - "vciFormatError", -] - -log = logging.getLogger("can.ixxat") - -from time import perf_counter as _timer_function - -# Hack to have vciFormatError as a free function, see below -vciFormatError = None - -# main ctypes instance -_canlib = None -# TODO: Use ECI driver for linux -if sys.platform == "win32" or sys.platform == "cygwin": - try: - _canlib = CLibrary("vcinpl.dll") - except Exception as e: - log.warning("Cannot load IXXAT vcinpl library: %s", e) -else: - # Will not work on other systems, but have it importable anyway for - # tests/sphinx - log.warning("IXXAT VCI library does not work on %s platform", sys.platform) - - -def __vciFormatErrorExtended(library_instance, function, HRESULT, arguments): - """Format a VCI error and attach failed function, decoded HRESULT and arguments - :param CLibrary library_instance: - Mapped instance of IXXAT vcinpl library - :param callable function: - Failed function - :param HRESULT HRESULT: - HRESULT returned by vcinpl call - :param arguments: - Arbitrary arguments tuple - :return: - Formatted string - """ - # TODO: make sure we don't generate another exception - return "{} - arguments were {}".format( - __vciFormatError(library_instance, function, HRESULT), arguments - ) - - -def __vciFormatError(library_instance, function, HRESULT): - """Format a VCI error and attach failed function and decoded HRESULT - :param CLibrary library_instance: - Mapped instance of IXXAT vcinpl library - :param callable function: - Failed function - :param HRESULT HRESULT: - HRESULT returned by vcinpl call - :return: - Formatted string - """ - buf = ctypes.create_string_buffer(constants.VCI_MAX_ERRSTRLEN) - ctypes.memset(buf, 0, constants.VCI_MAX_ERRSTRLEN) - library_instance.vciFormatError(HRESULT, buf, constants.VCI_MAX_ERRSTRLEN) - return "function {} failed ({})".format( - function._name, buf.value.decode("utf-8", "replace") - ) - - -def __check_status(result, function, arguments): - """ - Check the result of a vcinpl function call and raise appropriate exception - in case of an error. Used as errcheck function when mapping C functions - with ctypes. - :param result: - Function call numeric result - :param callable function: - Called function - :param arguments: - Arbitrary arguments tuple - :raise: - :class:VCITimeout - :class:VCIRxQueueEmptyError - :class:StopIteration - :class:VCIError - """ - if isinstance(result, int): - # Real return value is an unsigned long - result = ctypes.c_ulong(result).value - - if result == constants.VCI_E_TIMEOUT: - raise VCITimeout("Function {} timed out".format(function._name)) - elif result == constants.VCI_E_RXQUEUE_EMPTY: - raise VCIRxQueueEmptyError() - elif result == constants.VCI_E_NO_MORE_ITEMS: - raise StopIteration() - elif result == constants.VCI_E_ACCESSDENIED: - pass # not a real error, might happen if another program has initialized the bus - elif result != constants.VCI_OK: - raise VCIError(vciFormatError(function, result)) - - return result - - -try: - # Map all required symbols and initialize library --------------------------- - # HRESULT VCIAPI vciInitialize ( void ); - _canlib.map_symbol("vciInitialize", ctypes.c_long, (), __check_status) - - # void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize); - _canlib.map_symbol( - "vciFormatError", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32) - ) - # Hack to have vciFormatError as a free function - vciFormatError = functools.partial(__vciFormatError, _canlib) - - # HRESULT VCIAPI vciEnumDeviceOpen( OUT PHANDLE hEnum ); - _canlib.map_symbol("vciEnumDeviceOpen", ctypes.c_long, (PHANDLE,), __check_status) - # HRESULT VCIAPI vciEnumDeviceClose ( IN HANDLE hEnum ); - _canlib.map_symbol("vciEnumDeviceClose", ctypes.c_long, (HANDLE,), __check_status) - # HRESULT VCIAPI vciEnumDeviceNext( IN HANDLE hEnum, OUT PVCIDEVICEINFO pInfo ); - _canlib.map_symbol( - "vciEnumDeviceNext", - ctypes.c_long, - (HANDLE, structures.PVCIDEVICEINFO), - __check_status, - ) - - # HRESULT VCIAPI vciDeviceOpen( IN REFVCIID rVciid, OUT PHANDLE phDevice ); - _canlib.map_symbol( - "vciDeviceOpen", ctypes.c_long, (structures.PVCIID, PHANDLE), __check_status - ) - # HRESULT vciDeviceClose( HANDLE hDevice ) - _canlib.map_symbol("vciDeviceClose", ctypes.c_long, (HANDLE,), __check_status) - - # HRESULT VCIAPI canChannelOpen( IN HANDLE hDevice, IN UINT32 dwCanNo, IN BOOL fExclusive, OUT PHANDLE phCanChn ); - _canlib.map_symbol( - "canChannelOpen", - ctypes.c_long, - (HANDLE, ctypes.c_uint32, ctypes.c_long, PHANDLE), - __check_status, - ) - # EXTERN_C HRESULT VCIAPI canChannelInitialize( IN HANDLE hCanChn, IN UINT16 wRxFifoSize, IN UINT16 wRxThreshold, IN UINT16 wTxFifoSize, IN UINT16 wTxThreshold ); - _canlib.map_symbol( - "canChannelInitialize", - ctypes.c_long, - (HANDLE, ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint16), - __check_status, - ) - # EXTERN_C HRESULT VCIAPI canChannelActivate( IN HANDLE hCanChn, IN BOOL fEnable ); - _canlib.map_symbol( - "canChannelActivate", ctypes.c_long, (HANDLE, ctypes.c_long), __check_status - ) - # HRESULT canChannelClose( HANDLE hChannel ) - _canlib.map_symbol("canChannelClose", ctypes.c_long, (HANDLE,), __check_status) - # EXTERN_C HRESULT VCIAPI canChannelReadMessage( IN HANDLE hCanChn, IN UINT32 dwMsTimeout, OUT PCANMSG pCanMsg ); - _canlib.map_symbol( - "canChannelReadMessage", - ctypes.c_long, - (HANDLE, ctypes.c_uint32, structures.PCANMSG), - __check_status, - ) - # HRESULT canChannelPeekMessage(HANDLE hChannel,PCANMSG pCanMsg ); - _canlib.map_symbol( - "canChannelPeekMessage", - ctypes.c_long, - (HANDLE, structures.PCANMSG), - __check_status, - ) - # HRESULT canChannelWaitTxEvent (HANDLE hChannel UINT32 dwMsTimeout ); - _canlib.map_symbol( - "canChannelWaitTxEvent", - ctypes.c_long, - (HANDLE, ctypes.c_uint32), - __check_status, - ) - # HRESULT canChannelWaitRxEvent (HANDLE hChannel, UINT32 dwMsTimeout ); - _canlib.map_symbol( - "canChannelWaitRxEvent", - ctypes.c_long, - (HANDLE, ctypes.c_uint32), - __check_status, - ) - # HRESULT canChannelPostMessage (HANDLE hChannel, PCANMSG pCanMsg ); - _canlib.map_symbol( - "canChannelPostMessage", - ctypes.c_long, - (HANDLE, structures.PCANMSG), - __check_status, - ) - # HRESULT canChannelSendMessage (HANDLE hChannel, UINT32 dwMsTimeout, PCANMSG pCanMsg ); - _canlib.map_symbol( - "canChannelSendMessage", - ctypes.c_long, - (HANDLE, ctypes.c_uint32, structures.PCANMSG), - __check_status, - ) - - # EXTERN_C HRESULT VCIAPI canControlOpen( IN HANDLE hDevice, IN UINT32 dwCanNo, OUT PHANDLE phCanCtl ); - _canlib.map_symbol( - "canControlOpen", - ctypes.c_long, - (HANDLE, ctypes.c_uint32, PHANDLE), - __check_status, - ) - # EXTERN_C HRESULT VCIAPI canControlInitialize( IN HANDLE hCanCtl, IN UINT8 bMode, IN UINT8 bBtr0, IN UINT8 bBtr1 ); - _canlib.map_symbol( - "canControlInitialize", - ctypes.c_long, - (HANDLE, ctypes.c_uint8, ctypes.c_uint8, ctypes.c_uint8), - __check_status, - ) - # EXTERN_C HRESULT VCIAPI canControlClose( IN HANDLE hCanCtl ); - _canlib.map_symbol("canControlClose", ctypes.c_long, (HANDLE,), __check_status) - # EXTERN_C HRESULT VCIAPI canControlReset( IN HANDLE hCanCtl ); - _canlib.map_symbol("canControlReset", ctypes.c_long, (HANDLE,), __check_status) - # EXTERN_C HRESULT VCIAPI canControlStart( IN HANDLE hCanCtl, IN BOOL fStart ); - _canlib.map_symbol( - "canControlStart", ctypes.c_long, (HANDLE, ctypes.c_long), __check_status - ) - # EXTERN_C HRESULT VCIAPI canControlGetStatus( IN HANDLE hCanCtl, OUT PCANLINESTATUS pStatus ); - _canlib.map_symbol( - "canControlGetStatus", - ctypes.c_long, - (HANDLE, structures.PCANLINESTATUS), - __check_status, - ) - # EXTERN_C HRESULT VCIAPI canControlGetCaps( IN HANDLE hCanCtl, OUT PCANCAPABILITIES pCanCaps ); - _canlib.map_symbol( - "canControlGetCaps", - ctypes.c_long, - (HANDLE, structures.PCANCAPABILITIES), - __check_status, - ) - # EXTERN_C HRESULT VCIAPI canControlSetAccFilter( IN HANDLE hCanCtl, IN BOOL fExtend, IN UINT32 dwCode, IN UINT32 dwMask ); - _canlib.map_symbol( - "canControlSetAccFilter", - ctypes.c_long, - (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), - __check_status, - ) - # EXTERN_C HRESULT canControlAddFilterIds (HANDLE hControl, BOOL fExtended, UINT32 dwCode, UINT32 dwMask); - _canlib.map_symbol( - "canControlAddFilterIds", - ctypes.c_long, - (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), - __check_status, - ) - # EXTERN_C HRESULT canControlRemFilterIds (HANDLE hControl, BOOL fExtendend, UINT32 dwCode, UINT32 dwMask ); - _canlib.map_symbol( - "canControlRemFilterIds", - ctypes.c_long, - (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), - __check_status, - ) - # EXTERN_C HRESULT canSchedulerOpen (HANDLE hDevice, UINT32 dwCanNo, PHANDLE phScheduler ); - _canlib.map_symbol( - "canSchedulerOpen", - ctypes.c_long, - (HANDLE, ctypes.c_uint32, PHANDLE), - __check_status, - ) - # EXTERN_C HRESULT canSchedulerClose (HANDLE hScheduler ); - _canlib.map_symbol("canSchedulerClose", ctypes.c_long, (HANDLE,), __check_status) - # EXTERN_C HRESULT canSchedulerGetCaps (HANDLE hScheduler, PCANCAPABILITIES pCaps ); - _canlib.map_symbol( - "canSchedulerGetCaps", - ctypes.c_long, - (HANDLE, structures.PCANCAPABILITIES), - __check_status, - ) - # EXTERN_C HRESULT canSchedulerActivate ( HANDLE hScheduler, BOOL fEnable ); - _canlib.map_symbol( - "canSchedulerActivate", ctypes.c_long, (HANDLE, ctypes.c_int), __check_status - ) - # EXTERN_C HRESULT canSchedulerAddMessage (HANDLE hScheduler, PCANCYCLICTXMSG pMessage, PUINT32 pdwIndex ); - _canlib.map_symbol( - "canSchedulerAddMessage", - ctypes.c_long, - (HANDLE, structures.PCANCYCLICTXMSG, ctypes.POINTER(ctypes.c_uint32)), - __check_status, - ) - # EXTERN_C HRESULT canSchedulerRemMessage (HANDLE hScheduler, UINT32 dwIndex ); - _canlib.map_symbol( - "canSchedulerRemMessage", - ctypes.c_long, - (HANDLE, ctypes.c_uint32), - __check_status, - ) - # EXTERN_C HRESULT canSchedulerStartMessage (HANDLE hScheduler, UINT32 dwIndex, UINT16 dwCount ); - _canlib.map_symbol( - "canSchedulerStartMessage", - ctypes.c_long, - (HANDLE, ctypes.c_uint32, ctypes.c_uint16), - __check_status, - ) - # EXTERN_C HRESULT canSchedulerStopMessage (HANDLE hScheduler, UINT32 dwIndex ); - _canlib.map_symbol( - "canSchedulerStopMessage", - ctypes.c_long, - (HANDLE, ctypes.c_uint32), - __check_status, - ) - _canlib.vciInitialize() -except AttributeError: - # In case _canlib == None meaning we're not on win32/no lib found - pass -except Exception as e: - log.warning("Could not initialize IXXAT VCI library: %s", e) -# --------------------------------------------------------------------------- - - -CAN_INFO_MESSAGES = { - constants.CAN_INFO_START: "CAN started", - constants.CAN_INFO_STOP: "CAN stopped", - constants.CAN_INFO_RESET: "CAN reset", -} - -CAN_ERROR_MESSAGES = { - constants.CAN_ERROR_STUFF: "CAN bit stuff error", - constants.CAN_ERROR_FORM: "CAN form error", - constants.CAN_ERROR_ACK: "CAN acknowledgment error", - constants.CAN_ERROR_BIT: "CAN bit error", - constants.CAN_ERROR_CRC: "CAN CRC error", - constants.CAN_ERROR_OTHER: "Other (unknown) CAN error", -} - -CAN_STATUS_FLAGS = { - constants.CAN_STATUS_TXPEND: "transmission pending", - constants.CAN_STATUS_OVRRUN: "data overrun occurred", - constants.CAN_STATUS_ERRLIM: "error warning limit exceeded", - constants.CAN_STATUS_BUSOFF: "bus off", - constants.CAN_STATUS_ININIT: "init mode active", - constants.CAN_STATUS_BUSCERR: "bus coupling error", -} -# ---------------------------------------------------------------------------- +from typing import Optional class IXXATBus(BusABC): """The CAN Bus implemented for the IXXAT interface. - .. warning:: - - This interface does implement efficient filtering of messages, but - the filters have to be set in :meth:`~can.interfaces.ixxat.IXXATBus.__init__` - using the ``can_filters`` parameter. Using :meth:`~can.interfaces.ixxat.IXXATBus.set_filters` - does not work. + Based on the C implementation of IXXAT, two different dlls are provided by IXXAT, one to work with CAN, + the other with CAN-FD. + This class only delegates to related implementation (in calib_vcinpl or canlib_vcinpl2) class depending on fd user option. """ - CHANNEL_BITRATES = { - 0: { - 10000: constants.CAN_BT0_10KB, - 20000: constants.CAN_BT0_20KB, - 50000: constants.CAN_BT0_50KB, - 100000: constants.CAN_BT0_100KB, - 125000: constants.CAN_BT0_125KB, - 250000: constants.CAN_BT0_250KB, - 500000: constants.CAN_BT0_500KB, - 666000: constants.CAN_BT0_667KB, - 666666: constants.CAN_BT0_667KB, - 666667: constants.CAN_BT0_667KB, - 667000: constants.CAN_BT0_667KB, - 800000: constants.CAN_BT0_800KB, - 1000000: constants.CAN_BT0_1000KB, - }, - 1: { - 10000: constants.CAN_BT1_10KB, - 20000: constants.CAN_BT1_20KB, - 50000: constants.CAN_BT1_50KB, - 100000: constants.CAN_BT1_100KB, - 125000: constants.CAN_BT1_125KB, - 250000: constants.CAN_BT1_250KB, - 500000: constants.CAN_BT1_500KB, - 666000: constants.CAN_BT1_667KB, - 666666: constants.CAN_BT1_667KB, - 666667: constants.CAN_BT1_667KB, - 667000: constants.CAN_BT1_667KB, - 800000: constants.CAN_BT1_800KB, - 1000000: constants.CAN_BT1_1000KB, - }, - } - - def __init__(self, channel, can_filters=None, **kwargs): + def __init__( + self, + channel: int, + can_filters=None, + receive_own_messages: bool = False, + unique_hardware_id: Optional[int] = None, + extended: bool = True, + fd: bool = False, + rx_fifo_size: int = None, + tx_fifo_size: int = None, + bitrate: int = 500000, + data_bitrate: int = 2000000, + sjw_abr: int = None, + tseg1_abr: int = None, + tseg2_abr: int = None, + sjw_dbr: int = None, + tseg1_dbr: int = None, + tseg2_dbr: int = None, + ssp_dbr: int = None, + **kwargs + ): """ - :param int channel: + :param channel: The Channel id to create this bus with. - :param list can_filters: + :param can_filters: See :meth:`can.BusABC.set_filters`. - :param bool receive_own_messages: + :param receive_own_messages: Enable self-reception of sent messages. - :param int UniqueHardwareId: + :param unique_hardware_id: UniqueHardwareId to connect (optional, will use the first found if not supplied) - :param int bitrate: - Channel bitrate in bit/s - """ - if _canlib is None: - raise CanInterfaceNotImplementedError( - "The IXXAT VCI library has not been initialized. Check the logs for more details." - ) - log.info("CAN Filters: %s", can_filters) - log.info("Got configuration of: %s", kwargs) - # Configuration options - bitrate = kwargs.get("bitrate", 500000) - UniqueHardwareId = kwargs.get("UniqueHardwareId", None) - rxFifoSize = kwargs.get("rxFifoSize", 16) - txFifoSize = kwargs.get("txFifoSize", 16) - self._receive_own_messages = kwargs.get("receive_own_messages", False) - # Usually comes as a string from the config file - channel = int(channel) - - if bitrate not in self.CHANNEL_BITRATES[0]: - raise ValueError("Invalid bitrate {}".format(bitrate)) - - if rxFifoSize <= 0: - raise ValueError("rxFifoSize must be > 0") - - if txFifoSize <= 0: - raise ValueError("txFifoSize must be > 0") - - if channel < 0: - raise ValueError("channel number must be >= 0") - - self._device_handle = HANDLE() - self._device_info = structures.VCIDEVICEINFO() - self._control_handle = HANDLE() - self._channel_handle = HANDLE() - self._channel_capabilities = structures.CANCAPABILITIES() - self._message = structures.CANMSG() - self._payload = (ctypes.c_byte * 8)() - - # Search for supplied device - if UniqueHardwareId is None: - log.info("Searching for first available device") - else: - log.info("Searching for unique HW ID %s", UniqueHardwareId) - _canlib.vciEnumDeviceOpen(ctypes.byref(self._device_handle)) - while True: - try: - _canlib.vciEnumDeviceNext( - self._device_handle, ctypes.byref(self._device_info) - ) - except StopIteration: - if UniqueHardwareId is None: - raise VCIDeviceNotFoundError( - "No IXXAT device(s) connected or device(s) in use by other process(es)." - ) - else: - raise VCIDeviceNotFoundError( - "Unique HW ID {} not connected or not available.".format( - UniqueHardwareId - ) - ) - else: - if (UniqueHardwareId is None) or ( - self._device_info.UniqueHardwareId.AsChar - == bytes(UniqueHardwareId, "ascii") - ): - break - else: - log.debug( - "Ignoring IXXAT with hardware id '%s'.", - self._device_info.UniqueHardwareId.AsChar.decode("ascii"), - ) - _canlib.vciEnumDeviceClose(self._device_handle) + :param extended: + Default True, enables the capability to use extended IDs. - try: - _canlib.vciDeviceOpen( - ctypes.byref(self._device_info.VciObjectId), - ctypes.byref(self._device_handle), - ) - except Exception as exception: - raise CanInitializationError(f"Could not open device: {exception}") + :param fd: + Default False, enables CAN-FD usage. - log.info("Using unique HW ID %s", self._device_info.UniqueHardwareId.AsChar) + :param rx_fifo_size: + Receive fifo size (default 1024 for fd, else 16) - log.info( - "Initializing channel %d in shared mode, %d rx buffers, %d tx buffers", - channel, - rxFifoSize, - txFifoSize, - ) + :param tx_fifo_size: + Transmit fifo size (default 128 for fd, else 16) - try: - _canlib.canChannelOpen( - self._device_handle, - channel, - constants.FALSE, - ctypes.byref(self._channel_handle), - ) - except Exception as exception: - raise CanInitializationError( - f"Could not open and initialize channel: {exception}" - ) + :param bitrate: + Channel bitrate in bit/s - # Signal TX/RX events when at least one frame has been handled - _canlib.canChannelInitialize(self._channel_handle, rxFifoSize, 1, txFifoSize, 1) - _canlib.canChannelActivate(self._channel_handle, constants.TRUE) + :param data_bitrate: + Channel bitrate in bit/s (only in CAN-Fd if baudrate switch enabled). - log.info("Initializing control %d bitrate %d", channel, bitrate) - _canlib.canControlOpen( - self._device_handle, channel, ctypes.byref(self._control_handle) - ) - _canlib.canControlInitialize( - self._control_handle, - constants.CAN_OPMODE_STANDARD - | constants.CAN_OPMODE_EXTENDED - | constants.CAN_OPMODE_ERRFRAME, - self.CHANNEL_BITRATES[0][bitrate], - self.CHANNEL_BITRATES[1][bitrate], - ) - _canlib.canControlGetCaps( - self._control_handle, ctypes.byref(self._channel_capabilities) - ) + :param sjw_abr: + Bus timing value sample jump width (arbitration). Only takes effect with fd enabled. - # With receive messages, this field contains the relative reception time of - # the message in ticks. The resolution of a tick can be calculated from the fields - # dwClockFreq and dwTscDivisor of the structure CANCAPABILITIES in accordance with the following formula: - # frequency [1/s] = dwClockFreq / dwTscDivisor - # We explicitly cast to float for Python 2.x users - self._tick_resolution = float( - self._channel_capabilities.dwClockFreq - / self._channel_capabilities.dwTscDivisor - ) + :param tseg1_abr: + Bus timing value tseg1 (arbitration). Only takes effect with fd enabled. - # Setup filters before starting the channel - if can_filters: - log.info("The IXXAT VCI backend is filtering messages") - # Disable every message coming in - for extended in (0, 1): - _canlib.canControlSetAccFilter( - self._control_handle, - extended, - constants.CAN_ACC_CODE_NONE, - constants.CAN_ACC_MASK_NONE, - ) - for can_filter in can_filters: - # Filters define what messages are accepted - code = int(can_filter["can_id"]) - mask = int(can_filter["can_mask"]) - extended = can_filter.get("extended", False) - _canlib.canControlAddFilterIds( - self._control_handle, 1 if extended else 0, code << 1, mask << 1 - ) - log.info("Accepting ID: 0x%X MASK: 0x%X", code, mask) + :param tseg2_abr: + Bus timing value tseg2 (arbitration). Only takes effect with fd enabled. - # Start the CAN controller. Messages will be forwarded to the channel - _canlib.canControlStart(self._control_handle, constants.TRUE) + :param sjw_dbr: + Bus timing value sample jump width (data). Only takes effect with fd and baudrate switch enabled. - # For cyclic transmit list. Set when .send_periodic() is first called - self._scheduler = None - self._scheduler_resolution = None - self.channel = channel + :param tseg1_dbr: + Bus timing value tseg1 (data). Only takes effect with fd and bitrate switch enabled. - # Usually you get back 3 messages like "CAN initialized" ecc... - # Clear the FIFO by filter them out with low timeout - for _ in range(rxFifoSize): - try: - _canlib.canChannelReadMessage( - self._channel_handle, 0, ctypes.byref(self._message) - ) - except (VCITimeout, VCIRxQueueEmptyError): - break + :param tseg2_dbr: + Bus timing value tseg2 (data). Only takes effect with fd and bitrate switch enabled. - super().__init__(channel=channel, can_filters=None, **kwargs) + :param ssp_dbr: + Secondary sample point (data). Only takes effect with fd and bitrate switch enabled. - def _inWaiting(self): - try: - _canlib.canChannelWaitRxEvent(self._channel_handle, 0) - except VCITimeout: - return 0 + """ + if fd: + if rx_fifo_size is None: + rx_fifo_size = 1024 + if tx_fifo_size is None: + tx_fifo_size = 128 + self.bus = vcinpl2.IXXATBus( + channel=channel, + can_filters=can_filters, + receive_own_messages=receive_own_messages, + unique_hardware_id=unique_hardware_id, + extended=extended, + rx_fifo_size=rx_fifo_size, + tx_fifo_size=tx_fifo_size, + bitrate=bitrate, + data_bitrate=data_bitrate, + sjw_abr=sjw_abr, + tseg1_abr=tseg1_abr, + tseg2_abr=tseg2_abr, + sjw_dbr=sjw_dbr, + tseg1_dbr=tseg1_dbr, + tseg2_dbr=tseg2_dbr, + ssp_dbr=ssp_dbr, + **kwargs + ) else: - return 1 + if rx_fifo_size is None: + rx_fifo_size = 16 + if tx_fifo_size is None: + tx_fifo_size = 16 + self.bus = vcinpl.IXXATBus( + channel=channel, + can_filters=can_filters, + receive_own_messages=receive_own_messages, + unique_hardware_id=unique_hardware_id, + extended=extended, + rx_fifo_size=rx_fifo_size, + tx_fifo_size=tx_fifo_size, + bitrate=bitrate, + **kwargs + ) def flush_tx_buffer(self): """Flushes the transmit buffer on the IXXAT""" - # TODO #64: no timeout? - _canlib.canChannelWaitTxEvent(self._channel_handle, constants.INFINITE) + return self.bus.flush_tx_buffer() def _recv_internal(self, timeout): """Read a message from IXXAT device.""" - - # TODO: handling CAN error messages? - data_received = False - - if timeout == 0: - # Peek without waiting - try: - _canlib.canChannelPeekMessage( - self._channel_handle, ctypes.byref(self._message) - ) - except (VCITimeout, VCIRxQueueEmptyError): - return None, True - else: - if self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_DATA: - data_received = True - else: - # Wait if no message available - if timeout is None or timeout < 0: - remaining_ms = constants.INFINITE - t0 = None - else: - timeout_ms = int(timeout * 1000) - remaining_ms = timeout_ms - t0 = _timer_function() - - while True: - try: - _canlib.canChannelReadMessage( - self._channel_handle, remaining_ms, ctypes.byref(self._message) - ) - except (VCITimeout, VCIRxQueueEmptyError): - # Ignore the 2 errors, the timeout is handled manually with the _timer_function() - pass - else: - # See if we got a data or info/error messages - if self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_DATA: - data_received = True - break - elif self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_INFO: - log.info( - CAN_INFO_MESSAGES.get( - self._message.abData[0], - "Unknown CAN info message code {}".format( - self._message.abData[0] - ), - ) - ) - - elif ( - self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_ERROR - ): - log.warning( - CAN_ERROR_MESSAGES.get( - self._message.abData[0], - "Unknown CAN error message code {}".format( - self._message.abData[0] - ), - ) - ) - - elif ( - self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_STATUS - ): - log.info(_format_can_status(self._message.abData[0])) - if self._message.abData[0] & constants.CAN_STATUS_BUSOFF: - raise VCIBusOffError() - - elif ( - self._message.uMsgInfo.Bits.type - == constants.CAN_MSGTYPE_TIMEOVR - ): - pass - else: - log.warning("Unexpected message info type") - - if t0 is not None: - remaining_ms = timeout_ms - int((_timer_function() - t0) * 1000) - if remaining_ms < 0: - break - - if not data_received: - # Timed out / can message type is not DATA - return None, True - - # The _message.dwTime is a 32bit tick value and will overrun, - # so expect to see the value restarting from 0 - rx_msg = Message( - timestamp=self._message.dwTime - / self._tick_resolution, # Relative time in s - is_remote_frame=bool(self._message.uMsgInfo.Bits.rtr), - is_extended_id=bool(self._message.uMsgInfo.Bits.ext), - arbitration_id=self._message.dwMsgId, - dlc=self._message.uMsgInfo.Bits.dlc, - data=self._message.abData[: self._message.uMsgInfo.Bits.dlc], - channel=self.channel, - ) - - return rx_msg, True + return self.bus._recv_internal(timeout) def send(self, msg: Message, timeout: Optional[float] = None) -> None: - """ - Sends a message on the bus. The interface may buffer the message. - - :param msg: - The message to send. - :param timeout: - Timeout after some time. - :raise: - :class:CanTimeoutError - :class:CanOperationError - """ - # This system is not designed to be very efficient - message = structures.CANMSG() - message.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA - message.uMsgInfo.Bits.rtr = 1 if msg.is_remote_frame else 0 - message.uMsgInfo.Bits.ext = 1 if msg.is_extended_id else 0 - message.uMsgInfo.Bits.srr = 1 if self._receive_own_messages else 0 - message.dwMsgId = msg.arbitration_id - if msg.dlc: - message.uMsgInfo.Bits.dlc = msg.dlc - adapter = (ctypes.c_uint8 * len(msg.data)).from_buffer(msg.data) - ctypes.memmove(message.abData, adapter, len(msg.data)) - - if timeout: - _canlib.canChannelSendMessage( - self._channel_handle, int(timeout * 1000), message - ) - - else: - _canlib.canChannelPostMessage(self._channel_handle, message) + return self.bus.send(msg, timeout) def _send_periodic_internal(self, msg, period, duration=None): - """Send a message using built-in cyclic transmit list functionality.""" - if self._scheduler is None: - self._scheduler = HANDLE() - _canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler) - caps = structures.CANCAPABILITIES() - _canlib.canSchedulerGetCaps(self._scheduler, caps) - self._scheduler_resolution = float(caps.dwClockFreq) / caps.dwCmsDivisor - _canlib.canSchedulerActivate(self._scheduler, constants.TRUE) - return CyclicSendTask( - self._scheduler, msg, period, duration, self._scheduler_resolution - ) + return self.bus._send_periodic_internal(msg, period, duration) def shutdown(self): - if self._scheduler is not None: - _canlib.canSchedulerClose(self._scheduler) - _canlib.canChannelClose(self._channel_handle) - _canlib.canControlStart(self._control_handle, constants.FALSE) - _canlib.canControlClose(self._control_handle) - _canlib.vciDeviceClose(self._device_handle) - - -class CyclicSendTask(LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC): - """A message in the cyclic transmit list.""" - - def __init__(self, scheduler, msgs, period, duration, resolution): - super().__init__(msgs, period, duration) - if len(self.messages) != 1: - raise ValueError( - "IXXAT Interface only supports periodic transmission of 1 element" - ) - - self._scheduler = scheduler - self._index = None - self._count = int(duration / period) if duration else 0 - - self._msg = structures.CANCYCLICTXMSG() - self._msg.wCycleTime = int(round(period * resolution)) - self._msg.dwMsgId = self.messages[0].arbitration_id - self._msg.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA - self._msg.uMsgInfo.Bits.ext = 1 if self.messages[0].is_extended_id else 0 - self._msg.uMsgInfo.Bits.rtr = 1 if self.messages[0].is_remote_frame else 0 - self._msg.uMsgInfo.Bits.dlc = self.messages[0].dlc - for i, b in enumerate(self.messages[0].data): - self._msg.abData[i] = b - self.start() - - def start(self): - """Start transmitting message (add to list if needed).""" - if self._index is None: - self._index = ctypes.c_uint32() - _canlib.canSchedulerAddMessage(self._scheduler, self._msg, self._index) - _canlib.canSchedulerStartMessage(self._scheduler, self._index, self._count) - - def pause(self): - """Pause transmitting message (keep it in the list).""" - _canlib.canSchedulerStopMessage(self._scheduler, self._index) - - def stop(self): - """Stop transmitting message (remove from list).""" - # Remove it completely instead of just stopping it to avoid filling up - # the list with permanently stopped messages - _canlib.canSchedulerRemMessage(self._scheduler, self._index) - self._index = None - - -def _format_can_status(status_flags: int): - """ - Format a status bitfield found in CAN_MSGTYPE_STATUS messages or in dwStatus - field in CANLINESTATUS. - - Valid states are defined in the CAN_STATUS_* constants in cantype.h - """ - states = [] - for flag, description in CAN_STATUS_FLAGS.items(): - if status_flags & flag: - states.append(description) - status_flags &= ~flag - - if status_flags: - states.append("unknown state 0x{:02x}".format(status_flags)) - - if states: - return "CAN status message: {}".format(", ".join(states)) - else: - return "Empty CAN status message" - - -def get_ixxat_hwids(): - """Get a list of hardware ids of all available IXXAT devices.""" - hwids = [] - device_handle = HANDLE() - device_info = structures.VCIDEVICEINFO() - - _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) - while True: - try: - _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info)) - except StopIteration: - break - else: - hwids.append(device_info.UniqueHardwareId.AsChar.decode("ascii")) - _canlib.vciEnumDeviceClose(device_handle) - - return hwids + return self.bus.shutdown() diff --git a/can/interfaces/ixxat/canlib_vcinpl.py b/can/interfaces/ixxat/canlib_vcinpl.py new file mode 100644 index 000000000..cb0447b49 --- /dev/null +++ b/can/interfaces/ixxat/canlib_vcinpl.py @@ -0,0 +1,877 @@ +""" +Ctypes wrapper module for IXXAT Virtual CAN Interface V3 on win32 systems + +TODO: We could implement this interface such that setting other filters + could work when the initial filters were set to zero using the + software fallback. Or could the software filters even be changed + after the connection was opened? We need to document that bahaviour! + See also the NICAN interface. + +""" + +import ctypes +import functools +import logging +import sys +from typing import Optional, Callable, Tuple + +from can import BusABC, Message +from can.exceptions import CanInterfaceNotImplementedError, CanInitializationError +from can.broadcastmanager import ( + LimitedDurationCyclicSendTaskABC, + RestartableCyclicTaskABC, +) +from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT as ctypes_HRESULT +from can.util import deprecated_args_alias + +from . import constants, structures +from .exceptions import * + +__all__ = [ + "VCITimeout", + "VCIError", + "VCIBusOffError", + "VCIDeviceNotFoundError", + "IXXATBus", + "vciFormatError", +] + +log = logging.getLogger("can.ixxat") + +from time import perf_counter + +# Hack to have vciFormatError as a free function, see below +vciFormatError = None + +# main ctypes instance +_canlib = None +# TODO: Use ECI driver for linux +if sys.platform == "win32" or sys.platform == "cygwin": + try: + _canlib = CLibrary("vcinpl.dll") + except Exception as e: + log.warning("Cannot load IXXAT vcinpl library: %s", e) +else: + # Will not work on other systems, but have it importable anyway for + # tests/sphinx + log.warning("IXXAT VCI library does not work on %s platform", sys.platform) + + +def __vciFormatErrorExtended( + library_instance: CLibrary, function: Callable, vret: int, args: Tuple +): + """Format a VCI error and attach failed function, decoded HRESULT and arguments + :param CLibrary library_instance: + Mapped instance of IXXAT vcinpl library + :param callable function: + Failed function + :param HRESULT vret: + HRESULT returned by vcinpl call + :param args: + Arbitrary arguments tuple + :return: + Formatted string + """ + # TODO: make sure we don't generate another exception + return "{} - arguments were {}".format( + __vciFormatError(library_instance, function, vret), args + ) + + +def __vciFormatError(library_instance: CLibrary, function: Callable, vret: int): + """Format a VCI error and attach failed function and decoded HRESULT + :param CLibrary library_instance: + Mapped instance of IXXAT vcinpl library + :param callable function: + Failed function + :param HRESULT vret: + HRESULT returned by vcinpl call + :return: + Formatted string + """ + buf = ctypes.create_string_buffer(constants.VCI_MAX_ERRSTRLEN) + ctypes.memset(buf, 0, constants.VCI_MAX_ERRSTRLEN) + library_instance.vciFormatError(vret, buf, constants.VCI_MAX_ERRSTRLEN) + return "function {} failed ({})".format( + function._name, buf.value.decode("utf-8", "replace") + ) + + +def __check_status(result, function, args): + """ + Check the result of a vcinpl function call and raise appropriate exception + in case of an error. Used as errcheck function when mapping C functions + with ctypes. + :param result: + Function call numeric result + :param callable function: + Called function + :param args: + Arbitrary arguments tuple + :raise: + :class:VCITimeout + :class:VCIRxQueueEmptyError + :class:StopIteration + :class:VCIError + """ + if isinstance(result, int): + # Real return value is an unsigned long, the following line converts the number to unsigned + result = ctypes.c_ulong(result).value + + if result == constants.VCI_E_TIMEOUT: + raise VCITimeout("Function {} timed out".format(function._name)) + elif result == constants.VCI_E_RXQUEUE_EMPTY: + raise VCIRxQueueEmptyError() + elif result == constants.VCI_E_NO_MORE_ITEMS: + raise StopIteration() + elif result == constants.VCI_E_ACCESSDENIED: + pass # not a real error, might happen if another program has initialized the bus + elif result != constants.VCI_OK: + raise VCIError(vciFormatError(function, result)) + + return result + + +try: + # Map all required symbols and initialize library --------------------------- + # HRESULT VCIAPI vciInitialize ( void ); + _canlib.map_symbol("vciInitialize", ctypes.c_long, (), __check_status) + + # void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize); + _canlib.map_symbol( + "vciFormatError", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32) + ) + # Hack to have vciFormatError as a free function + vciFormatError = functools.partial(__vciFormatError, _canlib) + + # HRESULT VCIAPI vciEnumDeviceOpen( OUT PHANDLE hEnum ); + _canlib.map_symbol("vciEnumDeviceOpen", ctypes.c_long, (PHANDLE,), __check_status) + # HRESULT VCIAPI vciEnumDeviceClose ( IN HANDLE hEnum ); + _canlib.map_symbol("vciEnumDeviceClose", ctypes.c_long, (HANDLE,), __check_status) + # HRESULT VCIAPI vciEnumDeviceNext( IN HANDLE hEnum, OUT PVCIDEVICEINFO pInfo ); + _canlib.map_symbol( + "vciEnumDeviceNext", + ctypes.c_long, + (HANDLE, structures.PVCIDEVICEINFO), + __check_status, + ) + + # HRESULT VCIAPI vciDeviceOpen( IN REFVCIID rVciid, OUT PHANDLE phDevice ); + _canlib.map_symbol( + "vciDeviceOpen", ctypes.c_long, (structures.PVCIID, PHANDLE), __check_status + ) + # HRESULT vciDeviceClose( HANDLE hDevice ) + _canlib.map_symbol("vciDeviceClose", ctypes.c_long, (HANDLE,), __check_status) + + # HRESULT VCIAPI canChannelOpen( IN HANDLE hDevice, IN UINT32 dwCanNo, IN BOOL fExclusive, OUT PHANDLE phCanChn ); + _canlib.map_symbol( + "canChannelOpen", + ctypes.c_long, + (HANDLE, ctypes.c_uint32, ctypes.c_long, PHANDLE), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canChannelInitialize( IN HANDLE hCanChn, IN UINT16 wRxFifoSize, IN UINT16 wRxThreshold, IN UINT16 wTxFifoSize, IN UINT16 wTxThreshold ); + _canlib.map_symbol( + "canChannelInitialize", + ctypes.c_long, + (HANDLE, ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint16), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canChannelActivate( IN HANDLE hCanChn, IN BOOL fEnable ); + _canlib.map_symbol( + "canChannelActivate", ctypes.c_long, (HANDLE, ctypes.c_long), __check_status + ) + # HRESULT canChannelClose( HANDLE hChannel ) + _canlib.map_symbol("canChannelClose", ctypes.c_long, (HANDLE,), __check_status) + # EXTERN_C HRESULT VCIAPI canChannelReadMessage( IN HANDLE hCanChn, IN UINT32 dwMsTimeout, OUT PCANMSG pCanMsg ); + _canlib.map_symbol( + "canChannelReadMessage", + ctypes.c_long, + (HANDLE, ctypes.c_uint32, structures.PCANMSG), + __check_status, + ) + # HRESULT canChannelPeekMessage(HANDLE hChannel,PCANMSG pCanMsg ); + _canlib.map_symbol( + "canChannelPeekMessage", + ctypes.c_long, + (HANDLE, structures.PCANMSG), + __check_status, + ) + # HRESULT canChannelWaitTxEvent (HANDLE hChannel UINT32 dwMsTimeout ); + _canlib.map_symbol( + "canChannelWaitTxEvent", + ctypes.c_long, + (HANDLE, ctypes.c_uint32), + __check_status, + ) + # HRESULT canChannelWaitRxEvent (HANDLE hChannel, UINT32 dwMsTimeout ); + _canlib.map_symbol( + "canChannelWaitRxEvent", + ctypes.c_long, + (HANDLE, ctypes.c_uint32), + __check_status, + ) + # HRESULT canChannelPostMessage (HANDLE hChannel, PCANMSG pCanMsg ); + _canlib.map_symbol( + "canChannelPostMessage", + ctypes.c_long, + (HANDLE, structures.PCANMSG), + __check_status, + ) + # HRESULT canChannelSendMessage (HANDLE hChannel, UINT32 dwMsTimeout, PCANMSG pCanMsg ); + _canlib.map_symbol( + "canChannelSendMessage", + ctypes.c_long, + (HANDLE, ctypes.c_uint32, structures.PCANMSG), + __check_status, + ) + + # EXTERN_C HRESULT VCIAPI canControlOpen( IN HANDLE hDevice, IN UINT32 dwCanNo, OUT PHANDLE phCanCtl ); + _canlib.map_symbol( + "canControlOpen", + ctypes.c_long, + (HANDLE, ctypes.c_uint32, PHANDLE), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canControlInitialize( IN HANDLE hCanCtl, IN UINT8 bMode, IN UINT8 bBtr0, IN UINT8 bBtr1 ); + _canlib.map_symbol( + "canControlInitialize", + ctypes.c_long, + (HANDLE, ctypes.c_uint8, ctypes.c_uint8, ctypes.c_uint8), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canControlClose( IN HANDLE hCanCtl ); + _canlib.map_symbol("canControlClose", ctypes.c_long, (HANDLE,), __check_status) + # EXTERN_C HRESULT VCIAPI canControlReset( IN HANDLE hCanCtl ); + _canlib.map_symbol("canControlReset", ctypes.c_long, (HANDLE,), __check_status) + # EXTERN_C HRESULT VCIAPI canControlStart( IN HANDLE hCanCtl, IN BOOL fStart ); + _canlib.map_symbol( + "canControlStart", ctypes.c_long, (HANDLE, ctypes.c_long), __check_status + ) + # EXTERN_C HRESULT VCIAPI canControlGetStatus( IN HANDLE hCanCtl, OUT PCANLINESTATUS pStatus ); + _canlib.map_symbol( + "canControlGetStatus", + ctypes.c_long, + (HANDLE, structures.PCANLINESTATUS), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canControlGetCaps( IN HANDLE hCanCtl, OUT PCANCAPABILITIES pCanCaps ); + _canlib.map_symbol( + "canControlGetCaps", + ctypes.c_long, + (HANDLE, structures.PCANCAPABILITIES), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canControlSetAccFilter( IN HANDLE hCanCtl, IN BOOL fExtend, IN UINT32 dwCode, IN UINT32 dwMask ); + _canlib.map_symbol( + "canControlSetAccFilter", + ctypes.c_long, + (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), + __check_status, + ) + # EXTERN_C HRESULT canControlAddFilterIds (HANDLE hControl, BOOL fExtended, UINT32 dwCode, UINT32 dwMask); + _canlib.map_symbol( + "canControlAddFilterIds", + ctypes.c_long, + (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), + __check_status, + ) + # EXTERN_C HRESULT canControlRemFilterIds (HANDLE hControl, BOOL fExtendend, UINT32 dwCode, UINT32 dwMask ); + _canlib.map_symbol( + "canControlRemFilterIds", + ctypes.c_long, + (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerOpen (HANDLE hDevice, UINT32 dwCanNo, PHANDLE phScheduler ); + _canlib.map_symbol( + "canSchedulerOpen", + ctypes.c_long, + (HANDLE, ctypes.c_uint32, PHANDLE), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerClose (HANDLE hScheduler ); + _canlib.map_symbol("canSchedulerClose", ctypes.c_long, (HANDLE,), __check_status) + # EXTERN_C HRESULT canSchedulerGetCaps (HANDLE hScheduler, PCANCAPABILITIES pCaps ); + _canlib.map_symbol( + "canSchedulerGetCaps", + ctypes.c_long, + (HANDLE, structures.PCANCAPABILITIES), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerActivate ( HANDLE hScheduler, BOOL fEnable ); + _canlib.map_symbol( + "canSchedulerActivate", ctypes.c_long, (HANDLE, ctypes.c_int), __check_status + ) + # EXTERN_C HRESULT canSchedulerAddMessage (HANDLE hScheduler, PCANCYCLICTXMSG pMessage, PUINT32 pdwIndex ); + _canlib.map_symbol( + "canSchedulerAddMessage", + ctypes.c_long, + (HANDLE, structures.PCANCYCLICTXMSG, ctypes.POINTER(ctypes.c_uint32)), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerRemMessage (HANDLE hScheduler, UINT32 dwIndex ); + _canlib.map_symbol( + "canSchedulerRemMessage", + ctypes.c_long, + (HANDLE, ctypes.c_uint32), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerStartMessage (HANDLE hScheduler, UINT32 dwIndex, UINT16 dwCount ); + _canlib.map_symbol( + "canSchedulerStartMessage", + ctypes.c_long, + (HANDLE, ctypes.c_uint32, ctypes.c_uint16), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerStopMessage (HANDLE hScheduler, UINT32 dwIndex ); + _canlib.map_symbol( + "canSchedulerStopMessage", + ctypes.c_long, + (HANDLE, ctypes.c_uint32), + __check_status, + ) + _canlib.vciInitialize() +except AttributeError: + # In case _canlib == None meaning we're not on win32/no lib found + pass +except Exception as e: + log.warning("Could not initialize IXXAT VCI library: %s", e) +# --------------------------------------------------------------------------- + + +CAN_INFO_MESSAGES = { + constants.CAN_INFO_START: "CAN started", + constants.CAN_INFO_STOP: "CAN stopped", + constants.CAN_INFO_RESET: "CAN reset", +} + +CAN_ERROR_MESSAGES = { + constants.CAN_ERROR_STUFF: "CAN bit stuff error", + constants.CAN_ERROR_FORM: "CAN form error", + constants.CAN_ERROR_ACK: "CAN acknowledgment error", + constants.CAN_ERROR_BIT: "CAN bit error", + constants.CAN_ERROR_CRC: "CAN CRC error", + constants.CAN_ERROR_OTHER: "Other (unknown) CAN error", +} + +CAN_STATUS_FLAGS = { + constants.CAN_STATUS_TXPEND: "transmission pending", + constants.CAN_STATUS_OVRRUN: "data overrun occurred", + constants.CAN_STATUS_ERRLIM: "error warning limit exceeded", + constants.CAN_STATUS_BUSOFF: "bus off", + constants.CAN_STATUS_ININIT: "init mode active", + constants.CAN_STATUS_BUSCERR: "bus coupling error", +} +# ---------------------------------------------------------------------------- + + +class IXXATBus(BusABC): + """The CAN Bus implemented for the IXXAT interface. + + .. warning:: + + This interface does implement efficient filtering of messages, but + the filters have to be set in :meth:`~can.interfaces.ixxat.IXXATBus.__init__` + using the ``can_filters`` parameter. Using :meth:`~can.interfaces.ixxat.IXXATBus.set_filters` + does not work. + + """ + + CHANNEL_BITRATES = { + 0: { + 10000: constants.CAN_BT0_10KB, + 20000: constants.CAN_BT0_20KB, + 50000: constants.CAN_BT0_50KB, + 100000: constants.CAN_BT0_100KB, + 125000: constants.CAN_BT0_125KB, + 250000: constants.CAN_BT0_250KB, + 500000: constants.CAN_BT0_500KB, + 666000: constants.CAN_BT0_667KB, + 666666: constants.CAN_BT0_667KB, + 666667: constants.CAN_BT0_667KB, + 667000: constants.CAN_BT0_667KB, + 800000: constants.CAN_BT0_800KB, + 1000000: constants.CAN_BT0_1000KB, + }, + 1: { + 10000: constants.CAN_BT1_10KB, + 20000: constants.CAN_BT1_20KB, + 50000: constants.CAN_BT1_50KB, + 100000: constants.CAN_BT1_100KB, + 125000: constants.CAN_BT1_125KB, + 250000: constants.CAN_BT1_250KB, + 500000: constants.CAN_BT1_500KB, + 666000: constants.CAN_BT1_667KB, + 666666: constants.CAN_BT1_667KB, + 666667: constants.CAN_BT1_667KB, + 667000: constants.CAN_BT1_667KB, + 800000: constants.CAN_BT1_800KB, + 1000000: constants.CAN_BT1_1000KB, + }, + } + + @deprecated_args_alias( + UniqueHardwareId="unique_hardware_id", + rxFifoSize="rx_fifo_size", + txFifoSize="tx_fifo_size", + ) + def __init__( + self, + channel: int, + can_filters=None, + receive_own_messages: bool = False, + unique_hardware_id: Optional[int] = None, + extended: bool = True, + rx_fifo_size: int = 16, + tx_fifo_size: int = 16, + bitrate: int = 500000, + **kwargs, + ): + """ + :param channel: + The Channel id to create this bus with. + + :param can_filters: + See :meth:`can.BusABC.set_filters`. + + :param receive_own_messages: + Enable self-reception of sent messages. + + :param unique_hardware_id: + unique_hardware_id to connect (optional, will use the first found if not supplied) + + :param extended: + Default True, enables the capability to use extended IDs. + + :param rx_fifo_size: + Receive fifo size (default 16) + + :param tx_fifo_size: + Transmit fifo size (default 16) + + :param bitrate: + Channel bitrate in bit/s + """ + if _canlib is None: + raise CanInterfaceNotImplementedError( + "The IXXAT VCI library has not been initialized. Check the logs for more details." + ) + log.info("CAN Filters: %s", can_filters) + # Configuration options + self._receive_own_messages = receive_own_messages + # Usually comes as a string from the config file + channel = int(channel) + + if bitrate not in self.CHANNEL_BITRATES[0]: + raise ValueError("Invalid bitrate {}".format(bitrate)) + + if rx_fifo_size <= 0: + raise ValueError("rx_fifo_size must be > 0") + + if tx_fifo_size <= 0: + raise ValueError("tx_fifo_size must be > 0") + + if channel < 0: + raise ValueError("channel number must be >= 0") + + self._device_handle = HANDLE() + self._device_info = structures.VCIDEVICEINFO() + self._control_handle = HANDLE() + self._channel_handle = HANDLE() + self._channel_capabilities = structures.CANCAPABILITIES() + self._message = structures.CANMSG() + self._payload = (ctypes.c_byte * 8)() + + # Search for supplied device + if unique_hardware_id is None: + log.info("Searching for first available device") + else: + log.info("Searching for unique HW ID %s", unique_hardware_id) + _canlib.vciEnumDeviceOpen(ctypes.byref(self._device_handle)) + while True: + try: + _canlib.vciEnumDeviceNext( + self._device_handle, ctypes.byref(self._device_info) + ) + except StopIteration: + if unique_hardware_id is None: + raise VCIDeviceNotFoundError( + "No IXXAT device(s) connected or device(s) in use by other process(es)." + ) + else: + raise VCIDeviceNotFoundError( + "Unique HW ID {} not connected or not available.".format( + unique_hardware_id + ) + ) + else: + if (unique_hardware_id is None) or ( + self._device_info.UniqueHardwareId.AsChar + == bytes(unique_hardware_id, "ascii") + ): + break + else: + log.debug( + "Ignoring IXXAT with hardware id '%s'.", + self._device_info.UniqueHardwareId.AsChar.decode("ascii"), + ) + _canlib.vciEnumDeviceClose(self._device_handle) + + try: + _canlib.vciDeviceOpen( + ctypes.byref(self._device_info.VciObjectId), + ctypes.byref(self._device_handle), + ) + except Exception as exception: + raise CanInitializationError(f"Could not open device: {exception}") + + log.info("Using unique HW ID %s", self._device_info.UniqueHardwareId.AsChar) + + log.info( + "Initializing channel %d in shared mode, %d rx buffers, %d tx buffers", + channel, + rx_fifo_size, + tx_fifo_size, + ) + + try: + _canlib.canChannelOpen( + self._device_handle, + channel, + constants.FALSE, + ctypes.byref(self._channel_handle), + ) + except Exception as exception: + raise CanInitializationError( + f"Could not open and initialize channel: {exception}" + ) + + # Signal TX/RX events when at least one frame has been handled + _canlib.canChannelInitialize( + self._channel_handle, rx_fifo_size, 1, tx_fifo_size, 1 + ) + _canlib.canChannelActivate(self._channel_handle, constants.TRUE) + + log.info("Initializing control %d bitrate %d", channel, bitrate) + _canlib.canControlOpen( + self._device_handle, channel, ctypes.byref(self._control_handle) + ) + + # compute opmode before control initialize + opmode = constants.CAN_OPMODE_STANDARD | constants.CAN_OPMODE_ERRFRAME + if extended: + opmode |= constants.CAN_OPMODE_EXTENDED + + # control initialize + _canlib.canControlInitialize( + self._control_handle, + opmode, + self.CHANNEL_BITRATES[0][bitrate], + self.CHANNEL_BITRATES[1][bitrate], + ) + _canlib.canControlGetCaps( + self._control_handle, ctypes.byref(self._channel_capabilities) + ) + + # With receive messages, this field contains the relative reception time of + # the message in ticks. The resolution of a tick can be calculated from the fields + # dwClockFreq and dwTscDivisor of the structure CANCAPABILITIES in accordance with the following formula: + # frequency [1/s] = dwClockFreq / dwTscDivisor + self._tick_resolution = ( + self._channel_capabilities.dwClockFreq + / self._channel_capabilities.dwTscDivisor + ) + + # Setup filters before starting the channel + if can_filters: + log.info("The IXXAT VCI backend is filtering messages") + # Disable every message coming in + for extended in (0, 1): + _canlib.canControlSetAccFilter( + self._control_handle, + extended, + constants.CAN_ACC_CODE_NONE, + constants.CAN_ACC_MASK_NONE, + ) + for can_filter in can_filters: + # Filters define what messages are accepted + code = int(can_filter["can_id"]) + mask = int(can_filter["can_mask"]) + extended = can_filter.get("extended", False) + _canlib.canControlAddFilterIds( + self._control_handle, 1 if extended else 0, code << 1, mask << 1 + ) + log.info("Accepting ID: 0x%X MASK: 0x%X", code, mask) + + # Start the CAN controller. Messages will be forwarded to the channel + _canlib.canControlStart(self._control_handle, constants.TRUE) + + # For cyclic transmit list. Set when .send_periodic() is first called + self._scheduler = None + self._scheduler_resolution = None + self.channel = channel + + # Usually you get back 3 messages like "CAN initialized" ecc... + # Clear the FIFO by filter them out with low timeout + for _ in range(rx_fifo_size): + try: + _canlib.canChannelReadMessage( + self._channel_handle, 0, ctypes.byref(self._message) + ) + except (VCITimeout, VCIRxQueueEmptyError): + break + + super().__init__(channel=channel, can_filters=None, **kwargs) + + def _inWaiting(self): + try: + _canlib.canChannelWaitRxEvent(self._channel_handle, 0) + except VCITimeout: + return 0 + else: + return 1 + + def flush_tx_buffer(self): + """Flushes the transmit buffer on the IXXAT""" + # TODO #64: no timeout? + _canlib.canChannelWaitTxEvent(self._channel_handle, constants.INFINITE) + + def _recv_internal(self, timeout): + """Read a message from IXXAT device.""" + + # TODO: handling CAN error messages? + data_received = False + + if timeout == 0: + # Peek without waiting + try: + _canlib.canChannelPeekMessage( + self._channel_handle, ctypes.byref(self._message) + ) + except (VCITimeout, VCIRxQueueEmptyError): + return None, True + else: + if self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_DATA: + data_received = True + else: + # Wait if no message available + if timeout is None or timeout < 0: + remaining_ms = constants.INFINITE + t0 = None + else: + timeout_ms = int(timeout * 1000) + remaining_ms = timeout_ms + t0 = perf_counter() + + while True: + try: + _canlib.canChannelReadMessage( + self._channel_handle, remaining_ms, ctypes.byref(self._message) + ) + except (VCITimeout, VCIRxQueueEmptyError): + # Ignore the 2 errors, the timeout is handled manually with the perf_counter() + pass + else: + # See if we got a data or info/error messages + if self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_DATA: + data_received = True + break + elif self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_INFO: + log.info( + CAN_INFO_MESSAGES.get( + self._message.abData[0], + "Unknown CAN info message code {}".format( + self._message.abData[0] + ), + ) + ) + + elif ( + self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_ERROR + ): + log.warning( + CAN_ERROR_MESSAGES.get( + self._message.abData[0], + "Unknown CAN error message code {}".format( + self._message.abData[0] + ), + ) + ) + + elif ( + self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_STATUS + ): + log.info(_format_can_status(self._message.abData[0])) + if self._message.abData[0] & constants.CAN_STATUS_BUSOFF: + raise VCIBusOffError() + + elif ( + self._message.uMsgInfo.Bits.type + == constants.CAN_MSGTYPE_TIMEOVR + ): + pass + else: + log.warning("Unexpected message info type") + + if t0 is not None: + remaining_ms = timeout_ms - int((perf_counter() - t0) * 1000) + if remaining_ms < 0: + break + + if not data_received: + # Timed out / can message type is not DATA + return None, True + + # The _message.dwTime is a 32bit tick value and will overrun, + # so expect to see the value restarting from 0 + rx_msg = Message( + timestamp=self._message.dwTime + / self._tick_resolution, # Relative time in s + is_remote_frame=bool(self._message.uMsgInfo.Bits.rtr), + is_extended_id=bool(self._message.uMsgInfo.Bits.ext), + arbitration_id=self._message.dwMsgId, + dlc=self._message.uMsgInfo.Bits.dlc, + data=self._message.abData[: self._message.uMsgInfo.Bits.dlc], + channel=self.channel, + ) + + return rx_msg, True + + def send(self, msg: Message, timeout: Optional[float] = None) -> None: + """ + Sends a message on the bus. The interface may buffer the message. + + :param msg: + The message to send. + :param timeout: + Timeout after some time. + :raise: + :class:CanTimeoutError + :class:CanOperationError + """ + # This system is not designed to be very efficient + message = structures.CANMSG() + message.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA + message.uMsgInfo.Bits.rtr = 1 if msg.is_remote_frame else 0 + message.uMsgInfo.Bits.ext = 1 if msg.is_extended_id else 0 + message.uMsgInfo.Bits.srr = 1 if self._receive_own_messages else 0 + message.dwMsgId = msg.arbitration_id + if msg.dlc: + message.uMsgInfo.Bits.dlc = msg.dlc + adapter = (ctypes.c_uint8 * len(msg.data)).from_buffer(msg.data) + ctypes.memmove(message.abData, adapter, len(msg.data)) + + if timeout: + _canlib.canChannelSendMessage( + self._channel_handle, int(timeout * 1000), message + ) + + else: + _canlib.canChannelPostMessage(self._channel_handle, message) + + def _send_periodic_internal(self, msg, period, duration=None): + """Send a message using built-in cyclic transmit list functionality.""" + if self._scheduler is None: + self._scheduler = HANDLE() + _canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler) + caps = structures.CANCAPABILITIES() + _canlib.canSchedulerGetCaps(self._scheduler, caps) + self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor + _canlib.canSchedulerActivate(self._scheduler, constants.TRUE) + return CyclicSendTask( + self._scheduler, msg, period, duration, self._scheduler_resolution + ) + + def shutdown(self): + if self._scheduler is not None: + _canlib.canSchedulerClose(self._scheduler) + _canlib.canChannelClose(self._channel_handle) + _canlib.canControlStart(self._control_handle, constants.FALSE) + _canlib.canControlClose(self._control_handle) + _canlib.vciDeviceClose(self._device_handle) + + +class CyclicSendTask(LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC): + """A message in the cyclic transmit list.""" + + def __init__(self, scheduler, msgs, period, duration, resolution): + super().__init__(msgs, period, duration) + if len(self.messages) != 1: + raise ValueError( + "IXXAT Interface only supports periodic transmission of 1 element" + ) + + self._scheduler = scheduler + self._index = None + self._count = int(duration / period) if duration else 0 + + self._msg = structures.CANCYCLICTXMSG() + self._msg.wCycleTime = int(round(period * resolution)) + self._msg.dwMsgId = self.messages[0].arbitration_id + self._msg.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA + self._msg.uMsgInfo.Bits.ext = 1 if self.messages[0].is_extended_id else 0 + self._msg.uMsgInfo.Bits.rtr = 1 if self.messages[0].is_remote_frame else 0 + self._msg.uMsgInfo.Bits.dlc = self.messages[0].dlc + for i, b in enumerate(self.messages[0].data): + self._msg.abData[i] = b + self.start() + + def start(self): + """Start transmitting message (add to list if needed).""" + if self._index is None: + self._index = ctypes.c_uint32() + _canlib.canSchedulerAddMessage(self._scheduler, self._msg, self._index) + _canlib.canSchedulerStartMessage(self._scheduler, self._index, self._count) + + def pause(self): + """Pause transmitting message (keep it in the list).""" + _canlib.canSchedulerStopMessage(self._scheduler, self._index) + + def stop(self): + """Stop transmitting message (remove from list).""" + # Remove it completely instead of just stopping it to avoid filling up + # the list with permanently stopped messages + _canlib.canSchedulerRemMessage(self._scheduler, self._index) + self._index = None + + +def _format_can_status(status_flags: int): + """ + Format a status bitfield found in CAN_MSGTYPE_STATUS messages or in dwStatus + field in CANLINESTATUS. + + Valid states are defined in the CAN_STATUS_* constants in cantype.h + """ + states = [] + for flag, description in CAN_STATUS_FLAGS.items(): + if status_flags & flag: + states.append(description) + status_flags &= ~flag + + if status_flags: + states.append("unknown state 0x{:02x}".format(status_flags)) + + if states: + return "CAN status message: {}".format(", ".join(states)) + else: + return "Empty CAN status message" + + +def get_ixxat_hwids(): + """Get a list of hardware ids of all available IXXAT devices.""" + hwids = [] + device_handle = HANDLE() + device_info = structures.VCIDEVICEINFO() + + _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) + while True: + try: + _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info)) + except StopIteration: + break + else: + hwids.append(device_info.UniqueHardwareId.AsChar.decode("ascii")) + _canlib.vciEnumDeviceClose(device_handle) + + return hwids diff --git a/can/interfaces/ixxat/canlib_vcinpl2.py b/can/interfaces/ixxat/canlib_vcinpl2.py new file mode 100644 index 000000000..37085d74a --- /dev/null +++ b/can/interfaces/ixxat/canlib_vcinpl2.py @@ -0,0 +1,1043 @@ +""" +Ctypes wrapper module for IXXAT Virtual CAN Interface V3 on win32 systems + +TODO: We could implement this interface such that setting other filters + could work when the initial filters were set to zero using the + software fallback. Or could the software filters even be changed + after the connection was opened? We need to document that bahaviour! + See also the NICAN interface. + +""" + +import ctypes +import functools +import logging +import sys +from typing import Optional, Callable, Tuple + +from can import BusABC, Message +from can.exceptions import CanInterfaceNotImplementedError, CanInitializationError +from can.broadcastmanager import ( + LimitedDurationCyclicSendTaskABC, + RestartableCyclicTaskABC, +) +from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT as ctypes_HRESULT + +import can.util +from can.util import deprecated_args_alias + +from . import constants, structures +from .exceptions import * + +__all__ = [ + "VCITimeout", + "VCIError", + "VCIBusOffError", + "VCIDeviceNotFoundError", + "IXXATBus", + "vciFormatError", +] + +log = logging.getLogger("can.ixxat") + +from time import perf_counter + +# Hack to have vciFormatError as a free function, see below +vciFormatError = None + +# main ctypes instance +_canlib = None +# TODO: Use ECI driver for linux +if sys.platform == "win32" or sys.platform == "cygwin": + try: + _canlib = CLibrary("vcinpl2.dll") + except Exception as e: + log.warning("Cannot load IXXAT vcinpl library: %s", e) +else: + # Will not work on other systems, but have it importable anyway for + # tests/sphinx + log.warning("IXXAT VCI library does not work on %s platform", sys.platform) + + +def __vciFormatErrorExtended( + library_instance: CLibrary, function: Callable, vret: int, args: Tuple +): + """Format a VCI error and attach failed function, decoded HRESULT and arguments + :param CLibrary library_instance: + Mapped instance of IXXAT vcinpl library + :param callable function: + Failed function + :param HRESULT vret: + HRESULT returned by vcinpl call + :param args: + Arbitrary arguments tuple + :return: + Formatted string + """ + # TODO: make sure we don't generate another exception + return "{} - arguments were {}".format( + __vciFormatError(library_instance, function, vret), args + ) + + +def __vciFormatError(library_instance: CLibrary, function: Callable, vret: int): + """Format a VCI error and attach failed function and decoded HRESULT + :param CLibrary library_instance: + Mapped instance of IXXAT vcinpl library + :param callable function: + Failed function + :param HRESULT vret: + HRESULT returned by vcinpl call + :return: + Formatted string + """ + buf = ctypes.create_string_buffer(constants.VCI_MAX_ERRSTRLEN) + ctypes.memset(buf, 0, constants.VCI_MAX_ERRSTRLEN) + library_instance.vciFormatError(vret, buf, constants.VCI_MAX_ERRSTRLEN) + return "function {} failed ({})".format( + function._name, buf.value.decode("utf-8", "replace") + ) + + +def __check_status(result, function, args): + """ + Check the result of a vcinpl function call and raise appropriate exception + in case of an error. Used as errcheck function when mapping C functions + with ctypes. + :param result: + Function call numeric result + :param callable function: + Called function + :param args: + Arbitrary arguments tuple + :raise: + :class:VCITimeout + :class:VCIRxQueueEmptyError + :class:StopIteration + :class:VCIError + """ + if result == constants.VCI_E_TIMEOUT: + raise VCITimeout("Function {} timed out".format(function._name)) + elif result == constants.VCI_E_RXQUEUE_EMPTY: + raise VCIRxQueueEmptyError() + elif result == constants.VCI_E_NO_MORE_ITEMS: + raise StopIteration() + elif result == constants.VCI_E_ACCESSDENIED: + pass # not a real error, might happen if another program has initialized the bus + elif result != constants.VCI_OK: + raise VCIError(vciFormatError(function, result)) + + return result + + +try: + hresult_type = ctypes.c_ulong + # Map all required symbols and initialize library --------------------------- + # HRESULT VCIAPI vciInitialize ( void ); + _canlib.map_symbol("vciInitialize", hresult_type, (), __check_status) + + # void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize); + try: + _canlib.map_symbol( + "vciFormatError", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32) + ) + except: + _canlib.map_symbol( + "vciFormatErrorA", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32) + ) + _canlib.vciFormatError = _canlib.vciFormatErrorA + # Hack to have vciFormatError as a free function + vciFormatError = functools.partial(__vciFormatError, _canlib) + + # HRESULT VCIAPI vciEnumDeviceOpen( OUT PHANDLE hEnum ); + _canlib.map_symbol("vciEnumDeviceOpen", hresult_type, (PHANDLE,), __check_status) + # HRESULT VCIAPI vciEnumDeviceClose ( IN HANDLE hEnum ); + _canlib.map_symbol("vciEnumDeviceClose", hresult_type, (HANDLE,), __check_status) + # HRESULT VCIAPI vciEnumDeviceNext( IN HANDLE hEnum, OUT PVCIDEVICEINFO pInfo ); + _canlib.map_symbol( + "vciEnumDeviceNext", + hresult_type, + (HANDLE, structures.PVCIDEVICEINFO), + __check_status, + ) + + # HRESULT VCIAPI vciDeviceOpen( IN REFVCIID rVciid, OUT PHANDLE phDevice ); + _canlib.map_symbol( + "vciDeviceOpen", hresult_type, (structures.PVCIID, PHANDLE), __check_status + ) + # HRESULT vciDeviceClose( HANDLE hDevice ) + _canlib.map_symbol("vciDeviceClose", hresult_type, (HANDLE,), __check_status) + + # HRESULT VCIAPI canChannelOpen( IN HANDLE hDevice, IN UINT32 dwCanNo, IN BOOL fExclusive, OUT PHANDLE phCanChn ); + _canlib.map_symbol( + "canChannelOpen", + hresult_type, + (HANDLE, ctypes.c_uint32, ctypes.c_long, PHANDLE), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI + # canChannelInitialize( IN HANDLE hCanChn, + # IN UINT16 wRxFifoSize, + # IN UINT16 wRxThreshold, + # IN UINT16 wTxFifoSize, + # IN UINT16 wTxThreshold, + # IN UINT32 dwFilterSize, + # IN UINT8 bFilterMode ); + _canlib.map_symbol( + "canChannelInitialize", + hresult_type, + ( + HANDLE, + ctypes.c_uint16, + ctypes.c_uint16, + ctypes.c_uint16, + ctypes.c_uint16, + ctypes.c_uint32, + ctypes.c_uint8, + ), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canChannelActivate( IN HANDLE hCanChn, IN BOOL fEnable ); + _canlib.map_symbol( + "canChannelActivate", hresult_type, (HANDLE, ctypes.c_long), __check_status + ) + # HRESULT canChannelClose( HANDLE hChannel ) + _canlib.map_symbol("canChannelClose", hresult_type, (HANDLE,), __check_status) + # EXTERN_C HRESULT VCIAPI canChannelReadMessage( IN HANDLE hCanChn, IN UINT32 dwMsTimeout, OUT PCANMSG2 pCanMsg ); + _canlib.map_symbol( + "canChannelReadMessage", + hresult_type, + (HANDLE, ctypes.c_uint32, structures.PCANMSG2), + __check_status, + ) + # HRESULT canChannelPeekMessage(HANDLE hChannel,PCANMSG2 pCanMsg ); + _canlib.map_symbol( + "canChannelPeekMessage", + hresult_type, + (HANDLE, structures.PCANMSG2), + __check_status, + ) + # HRESULT canChannelWaitTxEvent (HANDLE hChannel UINT32 dwMsTimeout ); + _canlib.map_symbol( + "canChannelWaitTxEvent", + hresult_type, + (HANDLE, ctypes.c_uint32), + __check_status, + ) + # HRESULT canChannelWaitRxEvent (HANDLE hChannel, UINT32 dwMsTimeout ); + _canlib.map_symbol( + "canChannelWaitRxEvent", + hresult_type, + (HANDLE, ctypes.c_uint32), + __check_status, + ) + # HRESULT canChannelPostMessage (HANDLE hChannel, PCANMSG2 pCanMsg ); + _canlib.map_symbol( + "canChannelPostMessage", + hresult_type, + (HANDLE, structures.PCANMSG2), + __check_status, + ) + # HRESULT canChannelSendMessage (HANDLE hChannel, UINT32 dwMsTimeout, PCANMSG2 pCanMsg ); + _canlib.map_symbol( + "canChannelSendMessage", + hresult_type, + (HANDLE, ctypes.c_uint32, structures.PCANMSG2), + __check_status, + ) + + # EXTERN_C HRESULT VCIAPI canControlOpen( IN HANDLE hDevice, IN UINT32 dwCanNo, OUT PHANDLE phCanCtl ); + _canlib.map_symbol( + "canControlOpen", + hresult_type, + (HANDLE, ctypes.c_uint32, PHANDLE), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI + # canControlInitialize( IN HANDLE hCanCtl, + # IN UINT8 bOpMode, + # IN UINT8 bExMode, + # IN UINT8 bSFMode, + # IN UINT8 bEFMode, + # IN UINT32 dwSFIds, + # IN UINT32 dwEFIds, + # IN PCANBTP pBtpSDR, + # IN PCANBTP pBtpFDR ); + _canlib.map_symbol( + "canControlInitialize", + hresult_type, + ( + HANDLE, + ctypes.c_uint8, + ctypes.c_uint8, + ctypes.c_uint8, + ctypes.c_uint8, + ctypes.c_uint32, + ctypes.c_uint32, + structures.PCANBTP, + structures.PCANBTP, + ), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canControlClose( IN HANDLE hCanCtl ); + _canlib.map_symbol("canControlClose", hresult_type, (HANDLE,), __check_status) + # EXTERN_C HRESULT VCIAPI canControlReset( IN HANDLE hCanCtl ); + _canlib.map_symbol("canControlReset", hresult_type, (HANDLE,), __check_status) + # EXTERN_C HRESULT VCIAPI canControlStart( IN HANDLE hCanCtl, IN BOOL fStart ); + _canlib.map_symbol( + "canControlStart", hresult_type, (HANDLE, ctypes.c_long), __check_status + ) + # EXTERN_C HRESULT VCIAPI canControlGetStatus( IN HANDLE hCanCtl, OUT PCANLINESTATUS2 pStatus ); + _canlib.map_symbol( + "canControlGetStatus", + hresult_type, + (HANDLE, structures.PCANLINESTATUS2), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canControlGetCaps( IN HANDLE hCanCtl, OUT PCANCAPABILITIES2 pCanCaps ); + _canlib.map_symbol( + "canControlGetCaps", + hresult_type, + (HANDLE, structures.PCANCAPABILITIES2), + __check_status, + ) + # EXTERN_C HRESULT VCIAPI canControlSetAccFilter( IN HANDLE hCanCtl, IN BOOL fExtend, IN UINT32 dwCode, IN UINT32 dwMask ); + _canlib.map_symbol( + "canControlSetAccFilter", + hresult_type, + (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), + __check_status, + ) + # EXTERN_C HRESULT canControlAddFilterIds (HANDLE hControl, BOOL fExtended, UINT32 dwCode, UINT32 dwMask); + _canlib.map_symbol( + "canControlAddFilterIds", + hresult_type, + (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), + __check_status, + ) + # EXTERN_C HRESULT canControlRemFilterIds (HANDLE hControl, BOOL fExtendend, UINT32 dwCode, UINT32 dwMask ); + _canlib.map_symbol( + "canControlRemFilterIds", + hresult_type, + (HANDLE, ctypes.c_int, ctypes.c_uint32, ctypes.c_uint32), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerOpen (HANDLE hDevice, UINT32 dwCanNo, PHANDLE phScheduler ); + _canlib.map_symbol( + "canSchedulerOpen", + hresult_type, + (HANDLE, ctypes.c_uint32, PHANDLE), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerClose (HANDLE hScheduler ); + _canlib.map_symbol("canSchedulerClose", hresult_type, (HANDLE,), __check_status) + # EXTERN_C HRESULT canSchedulerGetCaps (HANDLE hScheduler, PCANCAPABILITIES2 pCaps ); + _canlib.map_symbol( + "canSchedulerGetCaps", + hresult_type, + (HANDLE, structures.PCANCAPABILITIES2), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerActivate ( HANDLE hScheduler, BOOL fEnable ); + _canlib.map_symbol( + "canSchedulerActivate", hresult_type, (HANDLE, ctypes.c_int), __check_status + ) + # EXTERN_C HRESULT canSchedulerAddMessage (HANDLE hScheduler, PCANCYCLICTXMSG2 pMessage, PUINT32 pdwIndex ); + _canlib.map_symbol( + "canSchedulerAddMessage", + hresult_type, + (HANDLE, structures.PCANCYCLICTXMSG2, ctypes.POINTER(ctypes.c_uint32)), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerRemMessage (HANDLE hScheduler, UINT32 dwIndex ); + _canlib.map_symbol( + "canSchedulerRemMessage", + hresult_type, + (HANDLE, ctypes.c_uint32), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerStartMessage (HANDLE hScheduler, UINT32 dwIndex, UINT16 dwCount ); + _canlib.map_symbol( + "canSchedulerStartMessage", + hresult_type, + (HANDLE, ctypes.c_uint32, ctypes.c_uint16), + __check_status, + ) + # EXTERN_C HRESULT canSchedulerStopMessage (HANDLE hScheduler, UINT32 dwIndex ); + _canlib.map_symbol( + "canSchedulerStopMessage", + hresult_type, + (HANDLE, ctypes.c_uint32), + __check_status, + ) + _canlib.vciInitialize() +except AttributeError: + # In case _canlib == None meaning we're not on win32/no lib found + pass +except Exception as e: + log.warning("Could not initialize IXXAT VCI library: %s", e) +# --------------------------------------------------------------------------- + + +CAN_INFO_MESSAGES = { + constants.CAN_INFO_START: "CAN started", + constants.CAN_INFO_STOP: "CAN stopped", + constants.CAN_INFO_RESET: "CAN reset", +} + +CAN_ERROR_MESSAGES = { + constants.CAN_ERROR_STUFF: "CAN bit stuff error", + constants.CAN_ERROR_FORM: "CAN form error", + constants.CAN_ERROR_ACK: "CAN acknowledgment error", + constants.CAN_ERROR_BIT: "CAN bit error", + constants.CAN_ERROR_CRC: "CAN CRC error", + constants.CAN_ERROR_OTHER: "Other (unknown) CAN error", +} + +CAN_STATUS_FLAGS = { + constants.CAN_STATUS_TXPEND: "transmission pending", + constants.CAN_STATUS_OVRRUN: "data overrun occurred", + constants.CAN_STATUS_ERRLIM: "error warning limit exceeded", + constants.CAN_STATUS_BUSOFF: "bus off", + constants.CAN_STATUS_ININIT: "init mode active", + constants.CAN_STATUS_BUSCERR: "bus coupling error", +} +# ---------------------------------------------------------------------------- + + +class IXXATBus(BusABC): + """The CAN Bus implemented for the IXXAT interface. + + .. warning:: + + This interface does implement efficient filtering of messages, but + the filters have to be set in :meth:`~can.interfaces.ixxat.IXXATBus.__init__` + using the ``can_filters`` parameter. Using :meth:`~can.interfaces.ixxat.IXXATBus.set_filters` + does not work. + + """ + + @deprecated_args_alias( + UniqueHardwareId="unique_hardware_id", + rxFifoSize="rx_fifo_size", + txFifoSize="tx_fifo_size", + ) + def __init__( + self, + channel: int, + can_filters=None, + receive_own_messages: int = False, + unique_hardware_id: Optional[int] = None, + extended: bool = True, + rx_fifo_size: int = 1024, + tx_fifo_size: int = 128, + bitrate: int = 500000, + data_bitrate: int = 2000000, + sjw_abr: int = None, + tseg1_abr: int = None, + tseg2_abr: int = None, + sjw_dbr: int = None, + tseg1_dbr: int = None, + tseg2_dbr: int = None, + ssp_dbr: int = None, + **kwargs, + ): + """ + :param channel: + The Channel id to create this bus with. + + :param can_filters: + See :meth:`can.BusABC.set_filters`. + + :param receive_own_messages: + Enable self-reception of sent messages. + + :param unique_hardware_id: + unique_hardware_id to connect (optional, will use the first found if not supplied) + + :param extended: + Default True, enables the capability to use extended IDs. + + :param rx_fifo_size: + Receive fifo size (default 1024) + + :param tx_fifo_size: + Transmit fifo size (default 128) + + :param bitrate: + Channel bitrate in bit/s + + :param data_bitrate: + Channel bitrate in bit/s (only in CAN-Fd if baudrate switch enabled). + + :param sjw_abr: + Bus timing value sample jump width (arbitration). + + :param tseg1_abr: + Bus timing value tseg1 (arbitration) + + :param tseg2_abr: + Bus timing value tseg2 (arbitration) + + :param sjw_dbr: + Bus timing value sample jump width (data) + + :param tseg1_dbr: + Bus timing value tseg1 (data). Only takes effect with fd and bitrate switch enabled. + + :param tseg2_dbr: + Bus timing value tseg2 (data). Only takes effect with fd and bitrate switch enabled. + + :param ssp_dbr: + Secondary sample point (data). Only takes effect with fd and bitrate switch enabled. + + """ + if _canlib is None: + raise CanInterfaceNotImplementedError( + "The IXXAT VCI library has not been initialized. Check the logs for more details." + ) + log.info("CAN Filters: %s", can_filters) + # Configuration options + self._receive_own_messages = receive_own_messages + # Usually comes as a string from the config file + channel = int(channel) + + if bitrate not in constants.CAN_BITRATE_PRESETS and ( + tseg1_abr is None or tseg2_abr is None or sjw_abr is None + ): + raise ValueError( + "To use bitrate {} (that has not predefined preset) is mandatory to use also parameters tseg1_abr, tseg2_abr and swj_abr".format( + bitrate + ) + ) + if data_bitrate not in constants.CAN_DATABITRATE_PRESETS and ( + tseg1_dbr is None or tseg2_dbr is None or sjw_dbr is None + ): + raise ValueError( + "To use data_bitrate {} (that has not predefined preset) is mandatory to use also parameters tseg1_dbr, tseg2_dbr and swj_dbr".format( + data_bitrate + ) + ) + + if rx_fifo_size <= 0: + raise ValueError("rx_fifo_size must be > 0") + + if tx_fifo_size <= 0: + raise ValueError("tx_fifo_size must be > 0") + + if channel < 0: + raise ValueError("channel number must be >= 0") + + self._device_handle = HANDLE() + self._device_info = structures.VCIDEVICEINFO() + self._control_handle = HANDLE() + self._channel_handle = HANDLE() + self._channel_capabilities = structures.CANCAPABILITIES2() + self._message = structures.CANMSG2() + self._payload = (ctypes.c_byte * 64)() + + # Search for supplied device + if unique_hardware_id is None: + log.info("Searching for first available device") + else: + log.info("Searching for unique HW ID %s", unique_hardware_id) + _canlib.vciEnumDeviceOpen(ctypes.byref(self._device_handle)) + while True: + try: + _canlib.vciEnumDeviceNext( + self._device_handle, ctypes.byref(self._device_info) + ) + except StopIteration: + if unique_hardware_id is None: + raise VCIDeviceNotFoundError( + "No IXXAT device(s) connected or device(s) in use by other process(es)." + ) + else: + raise VCIDeviceNotFoundError( + "Unique HW ID {} not connected or not available.".format( + unique_hardware_id + ) + ) + else: + if (unique_hardware_id is None) or ( + self._device_info.UniqueHardwareId.AsChar + == bytes(unique_hardware_id, "ascii") + ): + break + else: + log.debug( + "Ignoring IXXAT with hardware id '%s'.", + self._device_info.UniqueHardwareId.AsChar.decode("ascii"), + ) + _canlib.vciEnumDeviceClose(self._device_handle) + + try: + _canlib.vciDeviceOpen( + ctypes.byref(self._device_info.VciObjectId), + ctypes.byref(self._device_handle), + ) + except Exception as exception: + raise CanInitializationError(f"Could not open device: {exception}") + + log.info("Using unique HW ID %s", self._device_info.UniqueHardwareId.AsChar) + + log.info( + "Initializing channel %d in shared mode, %d rx buffers, %d tx buffers", + channel, + rx_fifo_size, + tx_fifo_size, + ) + + try: + _canlib.canChannelOpen( + self._device_handle, + channel, + constants.FALSE, + ctypes.byref(self._channel_handle), + ) + except Exception as exception: + raise CanInitializationError( + f"Could not open and initialize channel: {exception}" + ) + + # Signal TX/RX events when at least one frame has been handled + _canlib.canChannelInitialize( + self._channel_handle, + rx_fifo_size, + 1, + tx_fifo_size, + 1, + 0, + constants.CAN_FILTER_PASS, + ) + _canlib.canChannelActivate(self._channel_handle, constants.TRUE) + + pBtpSDR = IXXATBus._canptb_build( + defaults=constants.CAN_BITRATE_PRESETS, + bitrate=bitrate, + tseg1=tseg1_abr, + tseg2=tseg2_abr, + sjw=sjw_abr, + ssp=0, + ) + pBtpFDR = IXXATBus._canptb_build( + defaults=constants.CAN_DATABITRATE_PRESETS, + bitrate=data_bitrate, + tseg1=tseg1_dbr, + tseg2=tseg2_dbr, + sjw=sjw_dbr, + ssp=ssp_dbr if ssp_dbr is not None else tseg1_dbr, + ) + + log.info( + "Initializing control %d with SDR={%s}, FDR={%s}", + channel, + pBtpSDR, + pBtpFDR, + ) + _canlib.canControlOpen( + self._device_handle, channel, ctypes.byref(self._control_handle) + ) + + _canlib.canControlGetCaps( + self._control_handle, ctypes.byref(self._channel_capabilities) + ) + + # check capabilities + bOpMode = constants.CAN_OPMODE_UNDEFINED + if ( + self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_STDANDEXT + ) != 0: + # controller supportes CAN_OPMODE_STANDARD and CAN_OPMODE_EXTENDED at the same time + bOpMode |= constants.CAN_OPMODE_STANDARD # enable both 11 bits reception + if extended: # parameter from configuration + bOpMode |= constants.CAN_OPMODE_EXTENDED # enable 29 bits reception + elif ( + self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_STDANDEXT + ) != 0: + log.warning( + "Channel %d capabilities allow either basic or extended IDs, but not both. using %s according to parameter [extended=%s]", + channel, + "extended" if extended else "basic", + "True" if extended else "False", + ) + bOpMode |= ( + constants.CAN_OPMODE_EXTENDED + if extended + else constants.CAN_OPMODE_STANDARD + ) + + if ( + self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_ERRFRAME + ) != 0: + bOpMode |= constants.CAN_OPMODE_ERRFRAME + + bExMode = constants.CAN_EXMODE_DISABLED + if (self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_EXTDATA) != 0: + bExMode |= constants.CAN_EXMODE_EXTDATALEN + + if ( + self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_FASTDATA + ) != 0: + bExMode |= constants.CAN_EXMODE_FASTDATA + + _canlib.canControlInitialize( + self._control_handle, + bOpMode, + bExMode, + constants.CAN_FILTER_PASS, + constants.CAN_FILTER_PASS, + 0, + 0, + ctypes.byref(pBtpSDR), + ctypes.byref(pBtpFDR), + ) + + # With receive messages, this field contains the relative reception time of + # the message in ticks. The resolution of a tick can be calculated from the fields + # dwClockFreq and dwTscDivisor of the structure CANCAPABILITIES in accordance with the following formula: + # frequency [1/s] = dwClockFreq / dwTscDivisor + self._tick_resolution = ( + self._channel_capabilities.dwTscClkFreq + / self._channel_capabilities.dwTscDivisor + ) + + # Setup filters before starting the channel + if can_filters: + log.info("The IXXAT VCI backend is filtering messages") + # Disable every message coming in + for extended in (0, 1): + _canlib.canControlSetAccFilter( + self._control_handle, + extended, + constants.CAN_ACC_CODE_NONE, + constants.CAN_ACC_MASK_NONE, + ) + for can_filter in can_filters: + # Filters define what messages are accepted + code = int(can_filter["can_id"]) + mask = int(can_filter["can_mask"]) + extended = can_filter.get("extended", False) + _canlib.canControlAddFilterIds( + self._control_handle, 1 if extended else 0, code << 1, mask << 1 + ) + log.info("Accepting ID: 0x%X MASK: 0x%X", code, mask) + + # Start the CAN controller. Messages will be forwarded to the channel + _canlib.canControlStart(self._control_handle, constants.TRUE) + + # For cyclic transmit list. Set when .send_periodic() is first called + self._scheduler = None + self._scheduler_resolution = None + self.channel = channel + + # Usually you get back 3 messages like "CAN initialized" ecc... + # Clear the FIFO by filter them out with low timeout + for _ in range(rx_fifo_size): + try: + _canlib.canChannelReadMessage( + self._channel_handle, 0, ctypes.byref(self._message) + ) + except (VCITimeout, VCIRxQueueEmptyError): + break + + super().__init__(channel=channel, can_filters=None, **kwargs) + + @staticmethod + def _canptb_build(defaults, bitrate, tseg1, tseg2, sjw, ssp): + if bitrate in defaults: + d = defaults[bitrate] + if tseg1 is None: + tseg1 = d.wTS1 + if tseg2 is None: + tseg2 = d.wTS2 + if sjw is None: + sjw = d.wSJW + if ssp is None: + ssp = d.wTDO + dw_mode = d.dwMode + else: + dw_mode = 0 + + return structures.CANBTP( + dwMode=dw_mode, + dwBPS=bitrate, + wTS1=tseg1, + wTS2=tseg2, + wSJW=sjw, + wTDO=ssp, + ) + + def _inWaiting(self): + try: + _canlib.canChannelWaitRxEvent(self._channel_handle, 0) + except VCITimeout: + return 0 + else: + return 1 + + def flush_tx_buffer(self): + """Flushes the transmit buffer on the IXXAT""" + # TODO #64: no timeout? + _canlib.canChannelWaitTxEvent(self._channel_handle, constants.INFINITE) + + def _recv_internal(self, timeout): + """Read a message from IXXAT device.""" + + # TODO: handling CAN error messages? + data_received = False + + if timeout == 0: + # Peek without waiting + try: + _canlib.canChannelPeekMessage( + self._channel_handle, ctypes.byref(self._message) + ) + except (VCITimeout, VCIRxQueueEmptyError, VCIError): + # VCIError means no frame available (canChannelPeekMessage returned different from zero) + return None, True + else: + if self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_DATA: + data_received = True + else: + # Wait if no message available + if timeout is None or timeout < 0: + remaining_ms = constants.INFINITE + t0 = None + else: + timeout_ms = int(timeout * 1000) + remaining_ms = timeout_ms + t0 = perf_counter() + + while True: + try: + _canlib.canChannelReadMessage( + self._channel_handle, remaining_ms, ctypes.byref(self._message) + ) + except (VCITimeout, VCIRxQueueEmptyError): + # Ignore the 2 errors, the timeout is handled manually with the perf_counter() + pass + else: + # See if we got a data or info/error messages + if self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_DATA: + data_received = True + break + elif self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_INFO: + log.info( + CAN_INFO_MESSAGES.get( + self._message.abData[0], + "Unknown CAN info message code {}".format( + self._message.abData[0] + ), + ) + ) + + elif ( + self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_ERROR + ): + log.warning( + CAN_ERROR_MESSAGES.get( + self._message.abData[0], + "Unknown CAN error message code {}".format( + self._message.abData[0] + ), + ) + ) + + elif ( + self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_STATUS + ): + log.info(_format_can_status(self._message.abData[0])) + if self._message.abData[0] & constants.CAN_STATUS_BUSOFF: + raise VCIBusOffError() + + elif ( + self._message.uMsgInfo.Bits.type + == constants.CAN_MSGTYPE_TIMEOVR + ): + pass + else: + log.warning("Unexpected message info type") + + if t0 is not None: + remaining_ms = timeout_ms - int((perf_counter() - t0) * 1000) + if remaining_ms < 0: + break + + if not data_received: + # Timed out / can message type is not DATA + return None, True + + data_len = can.util.dlc2len(self._message.uMsgInfo.Bits.dlc) + # The _message.dwTime is a 32bit tick value and will overrun, + # so expect to see the value restarting from 0 + rx_msg = Message( + timestamp=self._message.dwTime + / self._tick_resolution, # Relative time in s + is_remote_frame=bool(self._message.uMsgInfo.Bits.rtr), + is_fd=bool(self._message.uMsgInfo.Bits.edl), + is_rx=True, + is_error_frame=bool( + self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_ERROR + ), + bitrate_switch=bool(self._message.uMsgInfo.Bits.fdr), + error_state_indicator=bool(self._message.uMsgInfo.Bits.esi), + is_extended_id=bool(self._message.uMsgInfo.Bits.ext), + arbitration_id=self._message.dwMsgId, + dlc=data_len, + data=self._message.abData[:data_len], + channel=self.channel, + ) + + return rx_msg, True + + def send(self, msg: Message, timeout: Optional[float] = None) -> None: + """ + Sends a message on the bus. The interface may buffer the message. + + :param msg: + The message to send. + :param timeout: + Timeout after some time. + :raise: + :class:CanTimeoutError + :class:CanOperationError + """ + # This system is not designed to be very efficient + message = structures.CANMSG2() + message.uMsgInfo.Bits.type = ( + constants.CAN_MSGTYPE_ERROR + if msg.is_error_frame + else constants.CAN_MSGTYPE_DATA + ) + message.uMsgInfo.Bits.rtr = 1 if msg.is_remote_frame else 0 + message.uMsgInfo.Bits.ext = 1 if msg.is_extended_id else 0 + message.uMsgInfo.Bits.srr = 1 if self._receive_own_messages else 0 + message.uMsgInfo.Bits.fdr = 1 if msg.bitrate_switch else 0 + message.uMsgInfo.Bits.esi = 1 if msg.error_state_indicator else 0 + message.uMsgInfo.Bits.edl = 1 if msg.is_fd else 0 + message.dwMsgId = msg.arbitration_id + if msg.dlc: # this dlc means number of bytes of payload + message.uMsgInfo.Bits.dlc = can.util.len2dlc(msg.dlc) + data_len_dif = msg.dlc - len(msg.data) + data = msg.data + bytearray( + [0] * data_len_dif + ) # pad with zeros until required length + adapter = (ctypes.c_uint8 * msg.dlc).from_buffer(data) + ctypes.memmove(message.abData, adapter, msg.dlc) + + if timeout: + _canlib.canChannelSendMessage( + self._channel_handle, int(timeout * 1000), message + ) + + else: + _canlib.canChannelPostMessage(self._channel_handle, message) + + def _send_periodic_internal(self, msg, period, duration=None): + """Send a message using built-in cyclic transmit list functionality.""" + if self._scheduler is None: + self._scheduler = HANDLE() + _canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler) + caps = structures.CANCAPABILITIES2() + _canlib.canSchedulerGetCaps(self._scheduler, caps) + self._scheduler_resolution = ( + caps.dwCmsClkFreq / caps.dwCmsDivisor + ) # TODO: confirm + _canlib.canSchedulerActivate(self._scheduler, constants.TRUE) + return CyclicSendTask( + self._scheduler, msg, period, duration, self._scheduler_resolution + ) + + def shutdown(self): + if self._scheduler is not None: + _canlib.canSchedulerClose(self._scheduler) + _canlib.canChannelClose(self._channel_handle) + _canlib.canControlStart(self._control_handle, constants.FALSE) + _canlib.canControlClose(self._control_handle) + _canlib.vciDeviceClose(self._device_handle) + + +class CyclicSendTask(LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC): + """A message in the cyclic transmit list.""" + + def __init__(self, scheduler, msgs, period, duration, resolution): + super().__init__(msgs, period, duration) + if len(self.messages) != 1: + raise ValueError( + "IXXAT Interface only supports periodic transmission of 1 element" + ) + + self._scheduler = scheduler + self._index = None + self._count = int(duration / period) if duration else 0 + + self._msg = structures.CANCYCLICTXMSG2() + self._msg.wCycleTime = int(round(period * resolution)) + self._msg.dwMsgId = self.messages[0].arbitration_id + self._msg.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA + self._msg.uMsgInfo.Bits.ext = 1 if self.messages[0].is_extended_id else 0 + self._msg.uMsgInfo.Bits.rtr = 1 if self.messages[0].is_remote_frame else 0 + self._msg.uMsgInfo.Bits.dlc = self.messages[0].dlc + for i, b in enumerate(self.messages[0].data): + self._msg.abData[i] = b + self.start() + + def start(self): + """Start transmitting message (add to list if needed).""" + if self._index is None: + self._index = ctypes.c_uint32() + _canlib.canSchedulerAddMessage(self._scheduler, self._msg, self._index) + _canlib.canSchedulerStartMessage(self._scheduler, self._index, self._count) + + def pause(self): + """Pause transmitting message (keep it in the list).""" + _canlib.canSchedulerStopMessage(self._scheduler, self._index) + + def stop(self): + """Stop transmitting message (remove from list).""" + # Remove it completely instead of just stopping it to avoid filling up + # the list with permanently stopped messages + _canlib.canSchedulerRemMessage(self._scheduler, self._index) + self._index = None + + +def _format_can_status(status_flags: int): + """ + Format a status bitfield found in CAN_MSGTYPE_STATUS messages or in dwStatus + field in CANLINESTATUS. + + Valid states are defined in the CAN_STATUS_* constants in cantype.h + """ + states = [] + for flag, description in CAN_STATUS_FLAGS.items(): + if status_flags & flag: + states.append(description) + status_flags &= ~flag + + if status_flags: + states.append("unknown state 0x{:02x}".format(status_flags)) + + if states: + return "CAN status message: {}".format(", ".join(states)) + else: + return "Empty CAN status message" + + +def get_ixxat_hwids(): + """Get a list of hardware ids of all available IXXAT devices.""" + hwids = [] + device_handle = HANDLE() + device_info = structures.VCIDEVICEINFO() + + _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) + while True: + try: + _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info)) + except StopIteration: + break + else: + hwids.append(device_info.UniqueHardwareId.AsChar.decode("ascii")) + _canlib.vciEnumDeviceClose(device_handle) + + return hwids diff --git a/can/interfaces/ixxat/constants.py b/can/interfaces/ixxat/constants.py index e911a580a..1dbc22a44 100644 --- a/can/interfaces/ixxat/constants.py +++ b/can/interfaces/ixxat/constants.py @@ -4,6 +4,8 @@ Copyright (C) 2016 Giuseppe Corbelli """ +from . import structures + FALSE = 0 TRUE = 1 @@ -119,6 +121,12 @@ CAN_OPMODE_LISTONLY = 0x08 CAN_OPMODE_LOWSPEED = 0x10 +# Extended operating modes +CAN_EXMODE_DISABLED = 0x00 +CAN_EXMODE_EXTDATALEN = 0x01 +CAN_EXMODE_FASTDATA = 0x02 +CAN_EXMODE_NONISOCANFD = 0x04 + # Message types CAN_MSGTYPE_DATA = 0 CAN_MSGTYPE_INFO = 1 @@ -146,3 +154,104 @@ # acceptance code and mask to reject all CAN IDs CAN_ACC_MASK_NONE = 0xFFFFFFFF CAN_ACC_CODE_NONE = 0x80000000 + +# BTMODEs +CAN_BTMODE_RAW = 0x00000001 # raw mode +CAN_BTMODE_TSM = 0x00000002 # triple sampling mode + + +CAN_FILTER_VOID = 0x00 # invalid or unknown filter mode (do not use for initialization) +CAN_FILTER_LOCK = 0x01 # lock filter (inhibit all IDs) +CAN_FILTER_PASS = 0x02 # bypass filter (pass all IDs) +CAN_FILTER_INCL = 0x03 # inclusive filtering (pass registered IDs) +CAN_FILTER_EXCL = 0x04 # exclusive filtering (inhibit registered IDs) + + +CAN_MSGFLAGS_DLC = 0x0F # [bit 0] data length code +CAN_MSGFLAGS_OVR = 0x10 # [bit 4] data overrun flag +CAN_MSGFLAGS_SRR = 0x20 # [bit 5] self reception request +CAN_MSGFLAGS_RTR = 0x40 # [bit 6] remote transmission request +CAN_MSGFLAGS_EXT = 0x80 # [bit 7] frame format (0=11-bit, 1=29-bit) + + +CAN_MSGFLAGS2_SSM = 0x01 # [bit 0] single shot mode +CAN_MSGFLAGS2_HPM = 0x02 # [bit 1] high priority message +CAN_MSGFLAGS2_EDL = 0x04 # [bit 2] extended data length +CAN_MSGFLAGS2_FDR = 0x08 # [bit 3] fast data bit rate +CAN_MSGFLAGS2_ESI = 0x10 # [bit 4] error state indicator +CAN_MSGFLAGS2_RES = 0xE0 # [bit 5..7] reserved bits + + +CAN_ACCEPT_REJECT = 0x00 # message not accepted +CAN_ACCEPT_ALWAYS = 0xFF # message always accepted +CAN_ACCEPT_FILTER_1 = 0x01 # message accepted by filter 1 +CAN_ACCEPT_FILTER_2 = 0x02 # message accepted by filter 2 +CAN_ACCEPT_PASSEXCL = 0x03 # message passes exclusion filter + + +CAN_FEATURE_STDOREXT = 0x00000001 # 11 OR 29 bit (exclusive) +CAN_FEATURE_STDANDEXT = 0x00000002 # 11 AND 29 bit (simultaneous) +CAN_FEATURE_RMTFRAME = 0x00000004 # reception of remote frames +CAN_FEATURE_ERRFRAME = 0x00000008 # reception of error frames +CAN_FEATURE_BUSLOAD = 0x00000010 # bus load measurement +CAN_FEATURE_IDFILTER = 0x00000020 # exact message filter +CAN_FEATURE_LISTONLY = 0x00000040 # listen only mode +CAN_FEATURE_SCHEDULER = 0x00000080 # cyclic message scheduler +CAN_FEATURE_GENERRFRM = 0x00000100 # error frame generation +CAN_FEATURE_DELAYEDTX = 0x00000200 # delayed message transmitter +CAN_FEATURE_SINGLESHOT = 0x00000400 # single shot mode +CAN_FEATURE_HIGHPRIOR = 0x00000800 # high priority message +CAN_FEATURE_AUTOBAUD = 0x00001000 # automatic bit rate detection +CAN_FEATURE_EXTDATA = 0x00002000 # extended data length (CANFD) +CAN_FEATURE_FASTDATA = 0x00004000 # fast data bit rate (CANFD) +CAN_FEATURE_ISOFRAME = 0x00008000 # ISO conform frame (CANFD) +CAN_FEATURE_NONISOFRM = ( + 0x00010000 # non ISO conform frame (CANFD) (different CRC computation) +) +CAN_FEATURE_64BITTSC = 0x00020000 # 64-bit time stamp counter + + +CAN_BITRATE_PRESETS = { + 250000: structures.CANBTP( + dwMode=0, dwBPS=250000, wTS1=6400, wTS2=1600, wSJW=1600, wTDO=0 + ), # SP = 80,0% + 500000: structures.CANBTP( + dwMode=0, dwBPS=500000, wTS1=6400, wTS2=1600, wSJW=1600, wTDO=0 + ), # SP = 80,0% + 1000000: structures.CANBTP( + dwMode=0, dwBPS=1000000, wTS1=6400, wTS2=1600, wSJW=1600, wTDO=0 + ), # SP = 80,0% +} + +CAN_DATABITRATE_PRESETS = { + 500000: structures.CANBTP( + dwMode=0, dwBPS=500000, wTS1=6400, wTS2=1600, wSJW=1600, wTDO=6400 + ), # SP = 80,0% + 833333: structures.CANBTP( + dwMode=0, dwBPS=833333, wTS1=1600, wTS2=400, wSJW=400, wTDO=1620 + ), # SP = 80,0% + 1000000: structures.CANBTP( + dwMode=0, dwBPS=1000000, wTS1=1600, wTS2=400, wSJW=400, wTDO=1600 + ), # SP = 80,0% + 1538461: structures.CANBTP( + dwMode=0, dwBPS=1538461, wTS1=1000, wTS2=300, wSJW=300, wTDO=1040 + ), # SP = 76,9% + 2000000: structures.CANBTP( + dwMode=0, dwBPS=2000000, wTS1=1600, wTS2=400, wSJW=400, wTDO=1600 + ), # SP = 80,0% + 4000000: structures.CANBTP( + dwMode=0, dwBPS=4000000, wTS1=800, wTS2=200, wSJW=200, wTDO=800 + ), # SP = 80,0% + 5000000: structures.CANBTP( + dwMode=0, dwBPS=5000000, wTS1=600, wTS2=200, wSJW=200, wTDO=600 + ), # SP = 75,0% + 6666666: structures.CANBTP( + dwMode=0, dwBPS=6666666, wTS1=400, wTS2=200, wSJW=200, wTDO=402 + ), # SP = 66,7% + 8000000: structures.CANBTP( + dwMode=0, dwBPS=8000000, wTS1=400, wTS2=100, wSJW=100, wTDO=250 + ), # SP = 80,0% + 10000000: structures.CANBTP( + dwMode=0, dwBPS=10000000, wTS1=300, wTS2=100, wSJW=100, wTDO=200 + ), # SP = 75,0% +} diff --git a/can/interfaces/ixxat/structures.py b/can/interfaces/ixxat/structures.py index 73c01823d..f76a39a38 100644 --- a/can/interfaces/ixxat/structures.py +++ b/can/interfaces/ixxat/structures.py @@ -114,24 +114,34 @@ class CANCAPABILITIES(ctypes.Structure): class CANMSGINFO(ctypes.Union): class Bytes(ctypes.Structure): _fields_ = [ - ("bType", ctypes.c_uint8), - ("bAddFlags", ctypes.c_uint8), - ("bFlags", ctypes.c_uint8), - ("bAccept", ctypes.c_uint8), + ("bType", ctypes.c_uint8), # type (see CAN_MSGTYPE_ constants) + ( + "bAddFlags", + ctypes.c_uint8, + ), # extended flags (see CAN_MSGFLAGS2_ constants) + ("bFlags", ctypes.c_uint8), # flags (see CAN_MSGFLAGS_ constants) + ("bAccept", ctypes.c_uint8), # accept code (see CAN_ACCEPT_ constants) ] class Bits(ctypes.Structure): _fields_ = [ - ("type", ctypes.c_uint32, 8), - ("ssm", ctypes.c_uint32, 1), - ("hi", ctypes.c_uint32, 2), - ("res", ctypes.c_uint32, 5), - ("dlc", ctypes.c_uint32, 4), - ("ovr", ctypes.c_uint32, 1), - ("srr", ctypes.c_uint32, 1), - ("rtr", ctypes.c_uint32, 1), - ("ext", ctypes.c_uint32, 1), - ("afc", ctypes.c_uint32, 8), + ("type", ctypes.c_uint32, 8), # type (see CAN_MSGTYPE_ constants) + ("ssm", ctypes.c_uint32, 1), # single shot mode + ("hpm", ctypes.c_uint32, 1), # high priority message + ("edl", ctypes.c_uint32, 1), # extended data length + ("fdr", ctypes.c_uint32, 1), # fast data bit rate + ("esi", ctypes.c_uint32, 1), # error state indicator + ("res", ctypes.c_uint32, 3), # reserved set to 0 + ("dlc", ctypes.c_uint32, 4), # data length code + ("ovr", ctypes.c_uint32, 1), # data overrun + ("srr", ctypes.c_uint32, 1), # self reception request + ("rtr", ctypes.c_uint32, 1), # remote transmission request + ( + "ext", + ctypes.c_uint32, + 1, + ), # extended frame format (0=standard, 1=extended) + ("afc", ctypes.c_uint32, 8), # accept code (see CAN_ACCEPT_ constants) ] _fields_ = [("Bytes", Bytes), ("Bits", Bits)] @@ -164,3 +174,123 @@ class CANCYCLICTXMSG(ctypes.Structure): PCANCYCLICTXMSG = ctypes.POINTER(CANCYCLICTXMSG) + + +class CANBTP(ctypes.Structure): + _fields_ = [ + ("dwMode", ctypes.c_uint32), # timing mode (see CAN_BTMODE_ const) + ("dwBPS", ctypes.c_uint32), # bits per second or prescaler (see CAN_BTMODE_RAW) + ("wTS1", ctypes.c_uint16), # length of time segment 1 in quanta + ("wTS2", ctypes.c_uint16), # length of time segment 2 in quanta + ("wSJW", ctypes.c_uint16), # re-synchronization jump width im quanta + ( + "wTDO", + ctypes.c_uint16, + ), # transceiver delay offset (SSP offset) in quanta (0 = disabled, 0xFFFF = simplified SSP positioning) + ] + + def __str__(self): + return "dwMode=%d, dwBPS=%d, wTS1=%d, wTS2=%d, wSJW=%d, wTDO=%d" % ( + self.dwMode, + self.dwBPS, + self.wTS1, + self.wTS2, + self.wSJW, + self.wTDO, + ) + + +PCANBTP = ctypes.POINTER(CANBTP) + + +class CANCAPABILITIES2(ctypes.Structure): + _fields_ = [ + ("wCtrlType", ctypes.c_uint16), # Type of CAN controller (see CAN_CTRL_ const) + ("wBusCoupling", ctypes.c_uint16), # Type of Bus coupling (see CAN_BUSC_ const) + ( + "dwFeatures", + ctypes.c_uint32, + ), # supported features (see CAN_FEATURE_ constants) + ("dwCanClkFreq", ctypes.c_uint32), # CAN clock frequency [Hz] + ("sSdrRangeMin", CANBTP), # minimum bit timing values for standard bit rate + ("sSdrRangeMax", CANBTP), # maximum bit timing values for standard bit rate + ("sFdrRangeMin", CANBTP), # minimum bit timing values for fast data bit rate + ("sFdrRangeMax", CANBTP), # maximum bit timing values for fast data bit rate + ( + "dwTscClkFreq", + ctypes.c_uint32, + ), # clock frequency of the time stamp counter [Hz] + ("dwTscDivisor", ctypes.c_uint32), # divisor for the message time stamp counter + ( + "dwCmsClkFreq", + ctypes.c_uint32, + ), # clock frequency of cyclic message scheduler [Hz] + ("dwCmsDivisor", ctypes.c_uint32), # divisor for the cyclic message scheduler + ( + "dwCmsMaxTicks", + ctypes.c_uint32, + ), # maximum tick count value of the cyclic message + ( + "dwDtxClkFreq", + ctypes.c_uint32, + ), # clock frequency of the delayed message transmitter [Hz] + ( + "dwDtxDivisor", + ctypes.c_uint32, + ), # divisor for the delayed message transmitter + ( + "dwDtxMaxTicks", + ctypes.c_uint32, + ), # maximum tick count value of the delayed message transmitter + ] + + +PCANCAPABILITIES2 = ctypes.POINTER(CANCAPABILITIES2) + + +class CANLINESTATUS2(ctypes.Structure): + _fields_ = [ + ("bOpMode", ctypes.c_uint8), # current CAN operating mode + ("bExMode", ctypes.c_uint8), # current CAN extended operating mode + ("bBusLoad", ctypes.c_uint8), # average bus load in percent (0..100) + ("bReserved", ctypes.c_uint8), # reserved set to 0 + ("sBtpSdr", ctypes.c_uint8), # standard bit rate timing + ("sBtpFdr", ctypes.c_uint8), # fast data bit rate timing + ("dwStatus", ctypes.c_uint32), # status of the CAN controller (see CAN_STATUS_) + ] + + +PCANLINESTATUS2 = ctypes.POINTER(CANLINESTATUS2) + + +class CANMSG2(ctypes.Structure): + _fields_ = [ + ("dwTime", ctypes.c_uint32), # time stamp for receive message + ("rsvd", ctypes.c_uint32), # reserved (set to 0) + ("dwMsgId", ctypes.c_uint32), # CAN message identifier (INTEL format) + ("uMsgInfo", CANMSGINFO), # message information (bit field) + ("abData", ctypes.c_uint8 * 64), # message data + ] + + +PCANMSG2 = ctypes.POINTER(CANMSG2) + + +class CANCYCLICTXMSG2(ctypes.Structure): + _fields_ = [ + ("wCycleTime", ctypes.c_uint16), # cycle time for the message in ticks + ( + "bIncrMode", + ctypes.c_uint8, + ), # auto increment mode (see CAN_CTXMSG_INC_ const) + ( + "bByteIndex", + ctypes.c_uint8, + ), # index of the byte within abData[] to increment + ("dwMsgId", ctypes.c_uint32), # message identifier (INTEL format) + ("uMsgInfo", CANMSGINFO), # message information (bit field) + ("abData", ctypes.c_uint8 * 64), # message data + ] + + +PCANCYCLICTXMSG2 = ctypes.POINTER(CANCYCLICTXMSG2) diff --git a/doc/interfaces/ixxat.rst b/doc/interfaces/ixxat.rst index 929221733..28fb6f314 100644 --- a/doc/interfaces/ixxat.rst +++ b/doc/interfaces/ixxat.rst @@ -36,11 +36,22 @@ Python-can will search for the first IXXAT device available and open the first c ``interface`` and ``channel`` parameters are interpreted by frontend ``can.interfaces.interface`` module, while the following parameters are optional and are interpreted by IXXAT implementation. -* ``bitrate`` (default 500000) Channel bitrate -* ``UniqueHardwareId`` (default first device) Unique hardware ID of the IXXAT device -* ``rxFifoSize`` (default 16) Number of RX mailboxes -* ``txFifoSize`` (default 16) Number of TX mailboxes -* ``extended`` (default False) Allow usage of extended IDs +* ``receive_own_messages`` (default False) Enable self-reception of sent messages. +* ``unique_hardware_id`` (default first device) Unique hardware ID of the IXXAT device. +* ``extended`` (default True) Allow usage of extended IDs. +* ``fd`` (default False) Enable CAN-FD capabilities. +* ``rx_fifo_size`` (default 16 for CAN, 1024 for CAN-FD) Number of RX mailboxes. +* ``tx_fifo_size`` (default 16 for CAN, 128 for CAN-FD) Number of TX mailboxes. +* ``bitrate`` (default 500000) Channel bitrate. +* ``data_bitrate`` (defaults to 2Mbps) Channel data bitrate (only canfd, to use when message bitrate_switch is used). +* ``sjw_abr`` (optional, only canfd) Bus timing value sample jump width (arbitration). +* ``tseg1_abr`` (optional, only canfd) Bus timing value tseg1 (arbitration). +* ``tseg2_abr`` (optional, only canfd) Bus timing value tseg2 (arbitration). +* ``sjw_dbr`` (optional, only used if baudrate switch enabled) Bus timing value sample jump width (data). +* ``tseg1_dbr`` (optional, only used if baudrate switch enabled) Bus timing value tseg1 (data). +* ``tseg2_dbr`` (optional, only used if baudrate switch enabled) Bus timing value tseg2 (data). +* ``ssp_dbr`` (optional, only used if baudrate switch enabled) Secondary sample point (data). + Filtering @@ -78,5 +89,6 @@ explicitly instantiated by the caller. - ``recv()`` is a blocking call with optional timeout. - ``send()`` is not blocking but may raise a VCIError if the TX FIFO is full -RX and TX FIFO sizes are configurable with ``rxFifoSize`` and ``txFifoSize`` +RX and TX FIFO sizes are configurable with ``rx_fifo_size`` and ``tx_fifo_size`` options, defaulting to 16 for both. + diff --git a/test/test_interface_ixxat.py b/test/test_interface_ixxat.py index 55618c769..ccd985051 100644 --- a/test/test_interface_ixxat.py +++ b/test/test_interface_ixxat.py @@ -26,13 +26,13 @@ def test_bus_creation(self): with self.assertRaises(ValueError): can.Bus(interface="ixxat", channel=-1) - # rxFifoSize must be > 0 + # rx_fifo_size must be > 0 with self.assertRaises(ValueError): - can.Bus(interface="ixxat", channel=0, rxFifoSize=0) + can.Bus(interface="ixxat", channel=0, rx_fifo_size=0) - # txFifoSize must be > 0 + # tx_fifo_size must be > 0 with self.assertRaises(ValueError): - can.Bus(interface="ixxat", channel=0, txFifoSize=0) + can.Bus(interface="ixxat", channel=0, tx_fifo_size=0) class HardwareTestCase(unittest.TestCase): diff --git a/test/test_interface_ixxat_fd.py b/test/test_interface_ixxat_fd.py new file mode 100644 index 000000000..0aa999a21 --- /dev/null +++ b/test/test_interface_ixxat_fd.py @@ -0,0 +1,62 @@ +""" +Unittest for ixxat interface using fd option. + +Run only this test: +python setup.py test --addopts "--verbose -s test/test_interface_ixxat_fd.py" +""" + +import unittest +import can + + +class SoftwareTestCase(unittest.TestCase): + """ + Test cases that test the software only and do not rely on an existing/connected hardware. + """ + + def setUp(self): + try: + bus = can.Bus(interface="ixxat", fd=True, channel=0) + bus.shutdown() + except can.CanInterfaceNotImplementedError: + raise unittest.SkipTest("not available on this platform") + + def test_bus_creation(self): + # channel must be >= 0 + with self.assertRaises(ValueError): + can.Bus(interface="ixxat", fd=True, channel=-1) + + # rx_fifo_size must be > 0 + with self.assertRaises(ValueError): + can.Bus(interface="ixxat", fd=True, channel=0, rx_fifo_size=0) + + # tx_fifo_size must be > 0 + with self.assertRaises(ValueError): + can.Bus(interface="ixxat", fd=True, channel=0, tx_fifo_size=0) + + +class HardwareTestCase(unittest.TestCase): + """ + Test cases that rely on an existing/connected hardware. + """ + + def setUp(self): + try: + bus = can.Bus(interface="ixxat", fd=True, channel=0) + bus.shutdown() + except can.CanInterfaceNotImplementedError: + raise unittest.SkipTest("not available on this platform") + + def test_bus_creation(self): + # non-existent channel -> use arbitrary high value + with self.assertRaises(can.CanInitializationError): + can.Bus(interface="ixxat", fd=True, channel=0xFFFF) + + def test_send_after_shutdown(self): + with can.Bus(interface="ixxat", fd=True, channel=0) as bus: + with self.assertRaises(can.CanOperationError): + bus.send(can.Message(arbitration_id=0x3FF, dlc=0)) + + +if __name__ == "__main__": + unittest.main()