From c240aed85c4fdf9ecc021367d77f51ed5621c9aa Mon Sep 17 00:00:00 2001 From: bri3d Date: Thu, 11 Feb 2021 17:05:05 -0700 Subject: [PATCH 1/4] usb2can-libusb driver for native 8devices CAN on OSX and other platforms --- can/interfaces/__init__.py | 1 + can/interfaces/usb2can_libusb/__init__.py | 5 + .../usb2can_libusb/can_8dev_usb_device.py | 303 ++++++++++++++++++ .../usb2can_libusb/usb2can_libusb_bus.py | 103 ++++++ 4 files changed, 412 insertions(+) create mode 100644 can/interfaces/usb2can_libusb/__init__.py create mode 100644 can/interfaces/usb2can_libusb/can_8dev_usb_device.py create mode 100644 can/interfaces/usb2can_libusb/usb2can_libusb_bus.py diff --git a/can/interfaces/__init__.py b/can/interfaces/__init__.py index ae088c4bc..bdda5def7 100644 --- a/can/interfaces/__init__.py +++ b/can/interfaces/__init__.py @@ -13,6 +13,7 @@ "serial": ("can.interfaces.serial.serial_can", "SerialBus"), "pcan": ("can.interfaces.pcan", "PcanBus"), "usb2can": ("can.interfaces.usb2can", "Usb2canBus"), + "usb2can_libusb": ("can.interfaces.usb2can_libusb", "Usb2CanLibUsbBus"), "ixxat": ("can.interfaces.ixxat", "IXXATBus"), "nican": ("can.interfaces.nican", "NicanBus"), "iscan": ("can.interfaces.iscan", "IscanBus"), diff --git a/can/interfaces/usb2can_libusb/__init__.py b/can/interfaces/usb2can_libusb/__init__.py new file mode 100644 index 000000000..b314b4398 --- /dev/null +++ b/can/interfaces/usb2can_libusb/__init__.py @@ -0,0 +1,5 @@ +""" +""" + +from .usb2can_libusb_bus import Usb2CanLibUsbBus +from .can_8dev_usb_device import Can8DevUSBDevice diff --git a/can/interfaces/usb2can_libusb/can_8dev_usb_device.py b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py new file mode 100644 index 000000000..9673b7cb7 --- /dev/null +++ b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py @@ -0,0 +1,303 @@ +from enum import Enum +import queue +from threading import Thread + +import usb.core +import usb.util + +MAX_8DEV_RECV_QUEUE = 128 # Maximum number of slots in the recv queue + +USB_8DEV_VENDOR_ID = 0x0483 # Unfortunately this is actually the ST Microelectronics Vendor ID +USB_8DEV_PRODUCT_ID = 0x1234 # Unfortunately this is pretty bogus +USB_8DEV_PRODUCT_STRING = "USB2CAN converter" # So we use this instead. Not great. + +USB_8DEV_ABP_CLOCK = 32000000 + +# USB Bulk Endpoint identifiers + +USB_8DEV_ENDP_DATA_RX = 0x81 +USB_8DEV_ENDP_DATA_TX = 0x2 +USB_8DEV_ENDP_CMD_RX = 0x83 +USB_8DEV_ENDP_CMD_TX = 0x4 + +# Open Device Options + +USB_8DEV_SILENT = 0x01 +USB_8DEV_LOOPBACK = 0x02 +USB_8DEV_DISABLE_AUTO_RESTRANS = 0x04 +USB_8DEV_STATUS_FRAME = 0x08 + +# Command options +USB_8DEV_BAUD_MANUAL = 0x09 +USB_8DEV_CMD_START = 0x11 +USB_8DEV_CMD_END = 0x22 + +USB_8DEV_CMD_SUCCESS = 0 +USB_8DEV_CMD_ERROR = 255 + +USB_8DEV_CMD_TIMEOUT = 1000 + +# Framing definitions +USB_8DEV_DATA_START = 0x55 +USB_8DEV_DATA_END = 0xAA + +USB_8DEV_TYPE_CAN_FRAME = 0 +USB_8DEV_TYPE_ERROR_FRAME = 3 + +USB_8DEV_EXTID = 0x01 +USB_8DEV_RTR = 0x02 +USB_8DEV_ERR_FLAG = 0x04 + +# Status messages +USB_8DEV_STATUSMSG_OK = 0x00 +USB_8DEV_STATUSMSG_OVERRUN = 0x01 # Overrun occured when sending */ +USB_8DEV_STATUSMSG_BUSLIGHT = 0x02 # Error counter has reached 96 */ +USB_8DEV_STATUSMSG_BUSHEAVY = 0x03 # Error count. has reached 128 */ +USB_8DEV_STATUSMSG_BUSOFF = 0x04 # Device is in BUSOFF */ +USB_8DEV_STATUSMSG_STUFF = 0x20 # Stuff Error */ +USB_8DEV_STATUSMSG_FORM = 0x21 # Form Error */ +USB_8DEV_STATUSMSG_ACK = 0x23 # Ack Error */ +USB_8DEV_STATUSMSG_BIT0 = 0x24 # Bit1 Error */ +USB_8DEV_STATUSMSG_BIT1 = 0x25 # Bit0 Error */ +USB_8DEV_STATUSMSG_CRC = 0x27 # CRC Error */ + +USB_8DEV_RP_MASK = 0x7F # Mask for Receive Error Bit */ + +# Available Commands + +class Can8DevCommand(Enum): + USB_8DEV_RESET = 1 # Reset Device + USB_8DEV_OPEN = 2 # Open Port + USB_8DEV_CLOSE = 3 # Close Port + USB_8DEV_SET_SPEED = 4 + USB_8DEV_SET_MASK_FILTER = 5 # Unfortunately unknown parameters and supposedly un-implemented on early firmwares + USB_8DEV_GET_STATUS = 6 + USB_8DEV_GET_STATISTICS = 7 + USB_8DEV_GET_SERIAL = 8 + USB_8DEV_GET_SOFTW_VER = 9 + USB_8DEV_GET_HARDW_VER = 0xA + USB_8DEV_RESET_TIMESTAMP = 0xB + USB_8DEV_GET_SOFTW_HARDW_VER = 0xC + +class Can8DevTxFrame(): + flags: int + id: int + dlc: int + data: bytes + + def __init__(self, can_id: int, dlc: int, data: bytes, is_ext: bool, is_remote:bool): + self.can_id = can_id + self.dlc = dlc + self.data = data + self.flags = 0 + if is_ext: + self.flags |= USB_8DEV_EXTID + if is_remote: + self.flags |= USB_8DEV_RTR + + def _pad_data(self, data: bytes): + data_bytes = bytearray(8) + for i in range(0, 7): + if i < len(data): + data_bytes[i] = data[i] + return bytes(data_bytes) + + def to_bytes(self): + cmd_buf = bytearray() + cmd_buf.append(USB_8DEV_DATA_START) + cmd_buf.append(self.flags) + id_bytes = self.can_id.to_bytes(4, byteorder='big') + cmd_buf.extend(id_bytes) + cmd_buf.append(self.dlc) + cmd_buf.extend(self._pad_data(self.data)) + cmd_buf.append(USB_8DEV_DATA_END) + return bytes(cmd_buf) + +class Can8DevRxFrame(): + data: bytes + id: int + dlc: int + timestamp: int + ext_id: bool + is_error: bool + is_remote: bool + + def __init__(self, bytes_in: bytes): + if(len(bytes_in) != 21): + raise ValueError("Did not receive 21 bytes for 8Dev Data Frame") + if(bytes_in[0] != USB_8DEV_DATA_START): + raise ValueError("Did not receive a valid 8Dev Data Frame") + if(bytes_in[1] == USB_8DEV_TYPE_CAN_FRAME): + self.data = bytes_in[8:16] + self.dlc = bytes_in[7] + self.ext_id = bytes_in[2] & USB_8DEV_EXTID + self.is_remote = bytes_in[2] & USB_8DEV_RTR + self.id = int.from_bytes(bytes_in[3:7], byteorder='big') + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder='big') + self.is_error = False + elif(bytes_in[1] == USB_8DEV_TYPE_ERROR_FRAME): + self.is_error = True + self.data = bytes_in[7:15] + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder='big') + else: + raise ValueError("8Dev Data Frame with Unknown Type") + + + +class Can8DevCommandFrame(): + command: Can8DevCommand + opt1: int + opt2: int + data: bytes + + def __init__(self, command, data = bytes(), opt1 = 0, opt2 = 0): + self.command = command + self.data = data + self.opt1 = opt1 + self.opt2 = opt2 + + def _pad_data(self, data: bytes): + data_bytes = bytearray(10) + for i in range(0, 9): + if i < len(data): + data_bytes[i] = data[i] + return bytes(data_bytes) + + def to_bytes(self): + cmd_buf = bytearray() + cmd_buf.append(USB_8DEV_CMD_START) + cmd_buf.append(0) # Supposedly could be a channel value, but unknown + cmd_buf.append(self.command.value) + cmd_buf.append(self.opt1) + cmd_buf.append(self.opt2) + cmd_buf.extend(self._pad_data(self.data)) + cmd_buf.append(USB_8DEV_CMD_END) + return bytes(cmd_buf) + + def from_bytes(byte_input: bytes): + if(len(byte_input) != 16): + raise ValueError("Did not receive 16 bytes for 8Dev Command Frame") + return Can8DevCommandFrame(Can8DevCommand(byte_input[2]), byte_input[5:15], byte_input[3], byte_input[4]) + +class Can8DevUSBDevice(): + cmd_rx_ep: usb.core.Endpoint + cmd_tx_ep: usb.core.Endpoint + data_rx_ep: usb.core.Endpoint + data_tx_ep: usb.core.Endpoint + serial_number: str + _close: bool + _rx_queue: queue.Queue + _recv_thread: Thread + + def __init__(self, serial_number=None): + if (serial_number is not None): + dev = usb.core.find(idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID, serial_number=serial_number) + else: + dev = usb.core.find(idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID) + + if dev is None: + raise ValueError('8Devices CAN interface not found! Serial number provided: %s' % serial_number) + + self.serial_number = dev.serial_number + + dev.reset() + # set the active configuration. With no arguments, the first + # configuration will be the active one + dev.set_configuration() + + # get an endpoint instance + cfg = dev.get_active_configuration() + intf = cfg[(0,0)] + + self.cmd_rx_ep: usb.core.Endpoint = usb.util.find_descriptor(intf, bEndpointAddress=USB_8DEV_ENDP_CMD_RX) + self.cmd_tx_ep: usb.core.Endpoint = usb.util.find_descriptor(intf, bEndpointAddress=USB_8DEV_ENDP_CMD_TX) + self.data_rx_ep: usb.core.Endpoint = usb.util.find_descriptor(intf, bEndpointAddress=USB_8DEV_ENDP_DATA_RX) + self.data_tx_ep: usb.core.Endpoint = usb.util.find_descriptor(intf, bEndpointAddress=USB_8DEV_ENDP_DATA_TX) + + if self.cmd_rx_ep is None or self.cmd_tx_ep is None or self.data_rx_ep is None or self.data_tx_ep is None: + raise ValueError('Could not configure 8Devices CAN endpoints!') + + + self._rx_queue = queue.Queue(MAX_8DEV_RECV_QUEUE) + + def _recv_thread_loop(self): + while True: + byte_buffer = bytes() + try: + # We must read the full possible buffer size each iteration or we risk a buffer overrun exception losing data. + byte_buffer = self.data_rx_ep.read(512, 0).tobytes() + except Exception: + pass + for i in range(0,len(byte_buffer),21): + # We could have read multiple frames in a single bulk xfer + self._rx_queue.put(Can8DevRxFrame(byte_buffer[i:i+21])) + if(self._close): + return + + def _start_recv_thread(self): + self._close = False + self._recv_thread = Thread(target = self._recv_thread_loop, daemon=True) + self._recv_thread.start() + + def _stop_recv_thread(self): + self._close = True + + def send_command(self, cmd: Can8DevCommand, data: bytes = bytes(), opt1 = 0, opt2= 0): + frame = Can8DevCommandFrame(cmd, data, opt1, opt2) + self.cmd_tx_ep.write(frame.to_bytes()) + return Can8DevCommandFrame.from_bytes(self.cmd_rx_ep.read(16)) + + def open(self, phase_seg1: int, phase_seg2: int, sjw: int, brp: int, loopback: bool = False, listenonly: bool = False, oneshot: bool = False): + self.send_command(Can8DevCommand.USB_8DEV_RESET) + open_command = Can8DevCommand.USB_8DEV_OPEN + opt1 = USB_8DEV_BAUD_MANUAL + flags = 0 + if loopback: + flags |= USB_8DEV_LOOPBACK + if listenonly: + flags |= USB_8DEV_SILENT + if oneshot: + flags |= USB_8DEV_DISABLE_AUTO_RESTRANS + flags_bytes = flags.to_bytes(4, 'big') + brp_bytes = brp.to_bytes(2, 'big') + data = bytearray(10) + data[0] = phase_seg1 + data[1] = phase_seg2 + data[2] = sjw + data[3] = brp_bytes[0] + data[4] = brp_bytes[1] + data[5] = flags_bytes[0] + data[6] = flags_bytes[1] + data[7] = flags_bytes[2] + data[8] = flags_bytes[3] + if (self.send_command(open_command, data, opt1).opt1 == 0): + self._start_recv_thread() + return True + else: + return False + + def close(self): + self._stop_recv_thread() + close_command = Can8DevCommand.USB_8DEV_CLOSE + self.send_command(close_command) + + def recv(self, timeout = None): + try: + return self._rx_queue.get(True, timeout = timeout/1000) + except queue.Empty: + return None + + def send(self, tx_frame: Can8DevTxFrame, timeout = None): + self.data_tx_ep.write(tx_frame.to_bytes(), timeout) + + def get_version(self): + cmd_response = self.send_command(Can8DevCommand.USB_8DEV_GET_SOFTW_HARDW_VER) + version = int.from_bytes(cmd_response.data[0:4], byteorder='big') + return version + + def get_firmware_version(self): + version = self.get_version() + return "%d.%d" % ((version >> 24) & 0xFF, (version >> 16) & 0xFF) + + def get_serial_number(self): + return self.serial_number \ No newline at end of file diff --git a/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py new file mode 100644 index 000000000..216083969 --- /dev/null +++ b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py @@ -0,0 +1,103 @@ +""" +This interface requires LibUSB and `pyusb` to be installed on your system. +The interface will bind by default to the first device with VID +""" + +import logging +from ctypes import byref + +from can import BusABC, Message, CanError, BitTiming +from .can_8dev_usb_device import * + +# Set up logging +log = logging.getLogger("can.usb2can_libusb") + +def message_convert_tx(msg): + """convert message from PythonCAN Message to 8Devices frame""" + return Can8DevTxFrame(can_id = msg.arbitration_id, dlc = msg.dlc, data = msg.data, is_ext = msg.is_extended_id, is_remote = msg.is_remote_frame) + +def message_convert_rx(message_rx: Can8DevRxFrame): + """convert message from 8Devices frame to PythonCAN Message""" + + if(message_rx.is_error): + return Message( + timestamp=message_rx.timestamp / 1000, + is_error_frame=message_rx.is_error, + data=message_rx.data + ) + + return Message( + timestamp=message_rx.timestamp / 1000, + is_remote_frame=message_rx.is_remote, + is_extended_id=message_rx.ext_id, + is_error_frame=message_rx.is_error, + arbitration_id=message_rx.id, + dlc=message_rx.dlc, + data=message_rx.data[: message_rx.dlc], + ) + + +class Usb2CanLibUsbBus(BusABC): + """Interface to an 8Devices USB2CAN Bus. + + This device should work on any platform with a working LibUSB and PyUSB. It was tested with a "Korlan USB2Can" but should work with the older module as well. + + Hardware filtering is not provided, if anyone knows how the 8Devices filtering command works, this would be valuable. + + Based on the in-tree Linux kernel SocketCAN driver for USB2CAN. + + :param str channel (optional): + The device's serial number. If not provided, the first matching VID/DID will match (WARNING: 8Devices reuse a random ST VID/DID, so other devices may match!) + + :param int bitrate (optional): + Bitrate of channel in bit/s. Values will be limited to a maximum of 1000 Kb/s. + Default is 500 Kbs + + :param int flags (optional): + Flags to directly pass to open function of the usb2can abstraction layer. + """ + + def __init__( + self, + channel=None, + *args, + bitrate=500000, + **kwargs + ): + + self.can = Can8DevUSBDevice() + + # convert to kb/s and cap: max rate is 1000 kb/s + baudrate = min(int(bitrate // 1000), 1000) + + self.channel_info = "USB2CAN LibUSB device {}".format(self.can.get_serial_number()) + + connector = "{}; {}".format("USB2Can_LibUSB", baudrate) + + timing = BitTiming(tseg1=6, tseg2=1, sjw=1, bitrate = bitrate, f_clock = USB_8DEV_ABP_CLOCK) + self.can.open(timing.tseg1, timing.tseg2, timing.sjw, timing.brp) + + super().__init__( + channel=channel, bitrate=bitrate, *args, **kwargs + ) + + def send(self, msg, timeout=None): + tx = message_convert_tx(msg) + if(timeout is not None): + timeout *= 1000 + self.can.send(tx, timeout) + + def _recv_internal(self, timeout): + if(timeout is not None): + timeout *= 1000 + messagerx = self.can.recv(timeout) + rx = None + if(messagerx is not None): + rx = message_convert_rx(messagerx) + return rx, False + + def shutdown(self): + """ + Shuts down connection to the device. + """ + self.can.close() \ No newline at end of file From cdd24f68ed775701ea025d82839002d94c6cd163 Mon Sep 17 00:00:00 2001 From: bri3d Date: Fri, 12 Feb 2021 12:39:56 -0700 Subject: [PATCH 2/4] Satisfy the linter and add basic docs --- .../usb2can_libusb/can_8dev_usb_device.py | 162 +++++++++++------- .../usb2can_libusb/usb2can_libusb_bus.py | 44 ++--- doc/interfaces.rst | 1 + doc/interfaces/usb2can_libusb.rst | 57 ++++++ 4 files changed, 185 insertions(+), 79 deletions(-) create mode 100644 doc/interfaces/usb2can_libusb.rst diff --git a/can/interfaces/usb2can_libusb/can_8dev_usb_device.py b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py index 9673b7cb7..e43139e02 100644 --- a/can/interfaces/usb2can_libusb/can_8dev_usb_device.py +++ b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py @@ -5,11 +5,13 @@ import usb.core import usb.util -MAX_8DEV_RECV_QUEUE = 128 # Maximum number of slots in the recv queue +MAX_8DEV_RECV_QUEUE = 128 # Maximum number of slots in the recv queue -USB_8DEV_VENDOR_ID = 0x0483 # Unfortunately this is actually the ST Microelectronics Vendor ID -USB_8DEV_PRODUCT_ID = 0x1234 # Unfortunately this is pretty bogus -USB_8DEV_PRODUCT_STRING = "USB2CAN converter" # So we use this instead. Not great. +USB_8DEV_VENDOR_ID = ( + 0x0483 +) # Unfortunately this is actually the ST Microelectronics Vendor ID +USB_8DEV_PRODUCT_ID = 0x1234 # Unfortunately this is pretty bogus +USB_8DEV_PRODUCT_STRING = "USB2CAN converter" # So we use this instead. Not great. USB_8DEV_ABP_CLOCK = 32000000 @@ -65,12 +67,15 @@ # Available Commands + class Can8DevCommand(Enum): - USB_8DEV_RESET = 1 # Reset Device - USB_8DEV_OPEN = 2 # Open Port - USB_8DEV_CLOSE = 3 # Close Port - USB_8DEV_SET_SPEED = 4 - USB_8DEV_SET_MASK_FILTER = 5 # Unfortunately unknown parameters and supposedly un-implemented on early firmwares + USB_8DEV_RESET = 1 # Reset Device + USB_8DEV_OPEN = 2 # Open Port + USB_8DEV_CLOSE = 3 # Close Port + USB_8DEV_SET_SPEED = 4 + USB_8DEV_SET_MASK_FILTER = ( + 5 + ) # Unfortunately unknown parameters and supposedly un-implemented on early firmwares USB_8DEV_GET_STATUS = 6 USB_8DEV_GET_STATISTICS = 7 USB_8DEV_GET_SERIAL = 8 @@ -79,13 +84,16 @@ class Can8DevCommand(Enum): USB_8DEV_RESET_TIMESTAMP = 0xB USB_8DEV_GET_SOFTW_HARDW_VER = 0xC -class Can8DevTxFrame(): + +class Can8DevTxFrame: flags: int id: int dlc: int data: bytes - def __init__(self, can_id: int, dlc: int, data: bytes, is_ext: bool, is_remote:bool): + def __init__( + self, can_id: int, dlc: int, data: bytes, is_ext: bool, is_remote: bool + ): self.can_id = can_id self.dlc = dlc self.data = data @@ -106,14 +114,15 @@ def to_bytes(self): cmd_buf = bytearray() cmd_buf.append(USB_8DEV_DATA_START) cmd_buf.append(self.flags) - id_bytes = self.can_id.to_bytes(4, byteorder='big') + id_bytes = self.can_id.to_bytes(4, byteorder="big") cmd_buf.extend(id_bytes) cmd_buf.append(self.dlc) cmd_buf.extend(self._pad_data(self.data)) cmd_buf.append(USB_8DEV_DATA_END) return bytes(cmd_buf) -class Can8DevRxFrame(): + +class Can8DevRxFrame: data: bytes id: int dlc: int @@ -123,34 +132,33 @@ class Can8DevRxFrame(): is_remote: bool def __init__(self, bytes_in: bytes): - if(len(bytes_in) != 21): + if len(bytes_in) != 21: raise ValueError("Did not receive 21 bytes for 8Dev Data Frame") - if(bytes_in[0] != USB_8DEV_DATA_START): + if bytes_in[0] != USB_8DEV_DATA_START: raise ValueError("Did not receive a valid 8Dev Data Frame") - if(bytes_in[1] == USB_8DEV_TYPE_CAN_FRAME): + if bytes_in[1] == USB_8DEV_TYPE_CAN_FRAME: self.data = bytes_in[8:16] self.dlc = bytes_in[7] self.ext_id = bytes_in[2] & USB_8DEV_EXTID self.is_remote = bytes_in[2] & USB_8DEV_RTR - self.id = int.from_bytes(bytes_in[3:7], byteorder='big') - self.timestamp = int.from_bytes(bytes_in[16:20], byteorder='big') + self.id = int.from_bytes(bytes_in[3:7], byteorder="big") + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") self.is_error = False - elif(bytes_in[1] == USB_8DEV_TYPE_ERROR_FRAME): + elif bytes_in[1] == USB_8DEV_TYPE_ERROR_FRAME: self.is_error = True self.data = bytes_in[7:15] - self.timestamp = int.from_bytes(bytes_in[16:20], byteorder='big') + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") else: raise ValueError("8Dev Data Frame with Unknown Type") - -class Can8DevCommandFrame(): +class Can8DevCommandFrame: command: Can8DevCommand opt1: int opt2: int data: bytes - def __init__(self, command, data = bytes(), opt1 = 0, opt2 = 0): + def __init__(self, command, data=bytes(), opt1=0, opt2=0): self.command = command self.data = data self.opt1 = opt1 @@ -166,7 +174,7 @@ def _pad_data(self, data: bytes): def to_bytes(self): cmd_buf = bytearray() cmd_buf.append(USB_8DEV_CMD_START) - cmd_buf.append(0) # Supposedly could be a channel value, but unknown + cmd_buf.append(0) # Supposedly could be a channel value, but unknown cmd_buf.append(self.command.value) cmd_buf.append(self.opt1) cmd_buf.append(self.opt2) @@ -175,11 +183,17 @@ def to_bytes(self): return bytes(cmd_buf) def from_bytes(byte_input: bytes): - if(len(byte_input) != 16): + if len(byte_input) != 16: raise ValueError("Did not receive 16 bytes for 8Dev Command Frame") - return Can8DevCommandFrame(Can8DevCommand(byte_input[2]), byte_input[5:15], byte_input[3], byte_input[4]) + return Can8DevCommandFrame( + Can8DevCommand(byte_input[2]), + byte_input[5:15], + byte_input[3], + byte_input[4], + ) + -class Can8DevUSBDevice(): +class Can8DevUSBDevice: cmd_rx_ep: usb.core.Endpoint cmd_tx_ep: usb.core.Endpoint data_rx_ep: usb.core.Endpoint @@ -190,13 +204,22 @@ class Can8DevUSBDevice(): _recv_thread: Thread def __init__(self, serial_number=None): - if (serial_number is not None): - dev = usb.core.find(idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID, serial_number=serial_number) + if serial_number is not None: + dev = usb.core.find( + idVendor=USB_8DEV_VENDOR_ID, + idProduct=USB_8DEV_PRODUCT_ID, + serial_number=serial_number, + ) else: - dev = usb.core.find(idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID) + dev = usb.core.find( + idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID + ) if dev is None: - raise ValueError('8Devices CAN interface not found! Serial number provided: %s' % serial_number) + raise ValueError( + "8Devices CAN interface not found! Serial number provided: %s" + % serial_number + ) self.serial_number = dev.serial_number @@ -207,19 +230,31 @@ def __init__(self, serial_number=None): # get an endpoint instance cfg = dev.get_active_configuration() - intf = cfg[(0,0)] - - self.cmd_rx_ep: usb.core.Endpoint = usb.util.find_descriptor(intf, bEndpointAddress=USB_8DEV_ENDP_CMD_RX) - self.cmd_tx_ep: usb.core.Endpoint = usb.util.find_descriptor(intf, bEndpointAddress=USB_8DEV_ENDP_CMD_TX) - self.data_rx_ep: usb.core.Endpoint = usb.util.find_descriptor(intf, bEndpointAddress=USB_8DEV_ENDP_DATA_RX) - self.data_tx_ep: usb.core.Endpoint = usb.util.find_descriptor(intf, bEndpointAddress=USB_8DEV_ENDP_DATA_TX) - - if self.cmd_rx_ep is None or self.cmd_tx_ep is None or self.data_rx_ep is None or self.data_tx_ep is None: - raise ValueError('Could not configure 8Devices CAN endpoints!') - + intf = cfg[(0, 0)] + + self.cmd_rx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_CMD_RX + ) + self.cmd_tx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_CMD_TX + ) + self.data_rx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_DATA_RX + ) + self.data_tx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_DATA_TX + ) + + if ( + self.cmd_rx_ep is None + or self.cmd_tx_ep is None + or self.data_rx_ep is None + or self.data_tx_ep is None + ): + raise ValueError("Could not configure 8Devices CAN endpoints!") self._rx_queue = queue.Queue(MAX_8DEV_RECV_QUEUE) - + def _recv_thread_loop(self): while True: byte_buffer = bytes() @@ -228,26 +263,35 @@ def _recv_thread_loop(self): byte_buffer = self.data_rx_ep.read(512, 0).tobytes() except Exception: pass - for i in range(0,len(byte_buffer),21): + for i in range(0, len(byte_buffer), 21): # We could have read multiple frames in a single bulk xfer - self._rx_queue.put(Can8DevRxFrame(byte_buffer[i:i+21])) - if(self._close): + self._rx_queue.put(Can8DevRxFrame(byte_buffer[i : i + 21])) + if self._close: return - + def _start_recv_thread(self): self._close = False - self._recv_thread = Thread(target = self._recv_thread_loop, daemon=True) + self._recv_thread = Thread(target=self._recv_thread_loop, daemon=True) self._recv_thread.start() - + def _stop_recv_thread(self): self._close = True - def send_command(self, cmd: Can8DevCommand, data: bytes = bytes(), opt1 = 0, opt2= 0): + def send_command(self, cmd: Can8DevCommand, data: bytes = bytes(), opt1=0, opt2=0): frame = Can8DevCommandFrame(cmd, data, opt1, opt2) self.cmd_tx_ep.write(frame.to_bytes()) return Can8DevCommandFrame.from_bytes(self.cmd_rx_ep.read(16)) - def open(self, phase_seg1: int, phase_seg2: int, sjw: int, brp: int, loopback: bool = False, listenonly: bool = False, oneshot: bool = False): + def open( + self, + phase_seg1: int, + phase_seg2: int, + sjw: int, + brp: int, + loopback: bool = False, + listenonly: bool = False, + oneshot: bool = False, + ): self.send_command(Can8DevCommand.USB_8DEV_RESET) open_command = Can8DevCommand.USB_8DEV_OPEN opt1 = USB_8DEV_BAUD_MANUAL @@ -258,8 +302,8 @@ def open(self, phase_seg1: int, phase_seg2: int, sjw: int, brp: int, loopback: b flags |= USB_8DEV_SILENT if oneshot: flags |= USB_8DEV_DISABLE_AUTO_RESTRANS - flags_bytes = flags.to_bytes(4, 'big') - brp_bytes = brp.to_bytes(2, 'big') + flags_bytes = flags.to_bytes(4, "big") + brp_bytes = brp.to_bytes(2, "big") data = bytearray(10) data[0] = phase_seg1 data[1] = phase_seg2 @@ -270,7 +314,7 @@ def open(self, phase_seg1: int, phase_seg2: int, sjw: int, brp: int, loopback: b data[6] = flags_bytes[1] data[7] = flags_bytes[2] data[8] = flags_bytes[3] - if (self.send_command(open_command, data, opt1).opt1 == 0): + if self.send_command(open_command, data, opt1).opt1 == 0: self._start_recv_thread() return True else: @@ -281,23 +325,23 @@ def close(self): close_command = Can8DevCommand.USB_8DEV_CLOSE self.send_command(close_command) - def recv(self, timeout = None): + def recv(self, timeout=None): try: - return self._rx_queue.get(True, timeout = timeout/1000) + return self._rx_queue.get(True, timeout=timeout / 1000) except queue.Empty: return None - def send(self, tx_frame: Can8DevTxFrame, timeout = None): + def send(self, tx_frame: Can8DevTxFrame, timeout=None): self.data_tx_ep.write(tx_frame.to_bytes(), timeout) def get_version(self): cmd_response = self.send_command(Can8DevCommand.USB_8DEV_GET_SOFTW_HARDW_VER) - version = int.from_bytes(cmd_response.data[0:4], byteorder='big') + version = int.from_bytes(cmd_response.data[0:4], byteorder="big") return version def get_firmware_version(self): version = self.get_version() - return "%d.%d" % ((version >> 24) & 0xFF, (version >> 16) & 0xFF) + return "%d.%d" % ((version >> 24) & 0xFF, (version >> 16) & 0xFF) def get_serial_number(self): - return self.serial_number \ No newline at end of file + return self.serial_number diff --git a/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py index 216083969..66eaa47b4 100644 --- a/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py +++ b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py @@ -12,20 +12,28 @@ # Set up logging log = logging.getLogger("can.usb2can_libusb") + def message_convert_tx(msg): """convert message from PythonCAN Message to 8Devices frame""" - return Can8DevTxFrame(can_id = msg.arbitration_id, dlc = msg.dlc, data = msg.data, is_ext = msg.is_extended_id, is_remote = msg.is_remote_frame) + return Can8DevTxFrame( + can_id=msg.arbitration_id, + dlc=msg.dlc, + data=msg.data, + is_ext=msg.is_extended_id, + is_remote=msg.is_remote_frame, + ) + def message_convert_rx(message_rx: Can8DevRxFrame): """convert message from 8Devices frame to PythonCAN Message""" - if(message_rx.is_error): + if message_rx.is_error: return Message( timestamp=message_rx.timestamp / 1000, is_error_frame=message_rx.is_error, - data=message_rx.data + data=message_rx.data, ) - + return Message( timestamp=message_rx.timestamp / 1000, is_remote_frame=message_rx.is_remote, @@ -57,42 +65,38 @@ class Usb2CanLibUsbBus(BusABC): Flags to directly pass to open function of the usb2can abstraction layer. """ - def __init__( - self, - channel=None, - *args, - bitrate=500000, - **kwargs - ): + def __init__(self, channel=None, *args, bitrate=500000, **kwargs): self.can = Can8DevUSBDevice() # convert to kb/s and cap: max rate is 1000 kb/s baudrate = min(int(bitrate // 1000), 1000) - self.channel_info = "USB2CAN LibUSB device {}".format(self.can.get_serial_number()) + self.channel_info = "USB2CAN LibUSB device {}".format( + self.can.get_serial_number() + ) connector = "{}; {}".format("USB2Can_LibUSB", baudrate) - timing = BitTiming(tseg1=6, tseg2=1, sjw=1, bitrate = bitrate, f_clock = USB_8DEV_ABP_CLOCK) + timing = BitTiming( + tseg1=6, tseg2=1, sjw=1, bitrate=bitrate, f_clock=USB_8DEV_ABP_CLOCK + ) self.can.open(timing.tseg1, timing.tseg2, timing.sjw, timing.brp) - super().__init__( - channel=channel, bitrate=bitrate, *args, **kwargs - ) + super().__init__(channel=channel, bitrate=bitrate, *args, **kwargs) def send(self, msg, timeout=None): tx = message_convert_tx(msg) - if(timeout is not None): + if timeout is not None: timeout *= 1000 self.can.send(tx, timeout) def _recv_internal(self, timeout): - if(timeout is not None): + if timeout is not None: timeout *= 1000 messagerx = self.can.recv(timeout) rx = None - if(messagerx is not None): + if messagerx is not None: rx = message_convert_rx(messagerx) return rx, False @@ -100,4 +104,4 @@ def shutdown(self): """ Shuts down connection to the device. """ - self.can.close() \ No newline at end of file + self.can.close() diff --git a/doc/interfaces.rst b/doc/interfaces.rst index 361ea4097..2613d5e06 100644 --- a/doc/interfaces.rst +++ b/doc/interfaces.rst @@ -28,6 +28,7 @@ The available interfaces are: interfaces/socketcan interfaces/systec interfaces/usb2can + interfaces/usb2can_libusb interfaces/vector interfaces/virtual diff --git a/doc/interfaces/usb2can_libusb.rst b/doc/interfaces/usb2can_libusb.rst new file mode 100644 index 000000000..09ddffb26 --- /dev/null +++ b/doc/interfaces/usb2can_libusb.rst @@ -0,0 +1,57 @@ +USB2CAN Interface +================= + +OVERVIEW +-------- + +The `USB2CAN `_ is a cheap CAN interface. Two versions exist, "Korlan" and a previous edition. +Both are based on STM32 chips. + +There is support for this device on Linux through the :doc:`socketcan` interface and for Windows using the +``usb2can`` interface. + +This interface supports any platform with a working "pyusb". + +The device has been tested on OS X at 500kbaud against a device spamming 300+ messages/second, so it should be decently robust. + +INSTALL +_______ + +Install `pyusb` and a working pyusb backend (on most platforms, this is `libusb1`). + +This should be the only required action. + +Interface Layout +---------------- + +`USB2CanLibUsbBus` implements the basic `python-can` Bus for this device. + +`Can8DevUSBDevice` implements an abstract device driver for the hardware, based on `pyusb` and the SocketCAN kernel module for the device. + +`Can8DevUSBDevice` uses a Queue and a read thread to ensure that messages are read into host hardware before they overflow the internal buffers in the device. The `recv` methods simply poll the read queue for available messages. + +Interface Specific Items +------------------------ + +This device is really an oddball. It works well, but documentation is quite sparse. + +Filtering is not implemented because the details of how to use it are not documented in any way. + + +.. warning:: + + Currently message filtering is not implemented. Contributions are most welcome! + + +Bus +--- + +.. autoclass:: can.interfaces.usb2can_libusb.Usb2CanLibUsbBus + + +Internals +--------- + +.. autoclass:: can.interfaces.usb2can_libusb.Can8DevUSBDevice + :members: + :undoc-members: From da68a660811324435becd4788f883273ce124fbe Mon Sep 17 00:00:00 2001 From: bri3d Date: Fri, 12 Feb 2021 15:10:11 -0700 Subject: [PATCH 3/4] refactor to enable testing, test low-level functions to extent possible --- can/interfaces/usb2can_libusb/__init__.py | 1 - .../usb2can_libusb/can_8dev_usb_device.py | 237 ++---------------- .../usb2can_libusb/can_8dev_usb_utils.py | 221 ++++++++++++++++ .../usb2can_libusb/usb2can_libusb_bus.py | 11 +- test/test_usb2can_libusb.py | 52 ++++ 5 files changed, 303 insertions(+), 219 deletions(-) create mode 100644 can/interfaces/usb2can_libusb/can_8dev_usb_utils.py create mode 100644 test/test_usb2can_libusb.py diff --git a/can/interfaces/usb2can_libusb/__init__.py b/can/interfaces/usb2can_libusb/__init__.py index b314b4398..53e84b325 100644 --- a/can/interfaces/usb2can_libusb/__init__.py +++ b/can/interfaces/usb2can_libusb/__init__.py @@ -2,4 +2,3 @@ """ from .usb2can_libusb_bus import Usb2CanLibUsbBus -from .can_8dev_usb_device import Can8DevUSBDevice diff --git a/can/interfaces/usb2can_libusb/can_8dev_usb_device.py b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py index e43139e02..3402aed62 100644 --- a/can/interfaces/usb2can_libusb/can_8dev_usb_device.py +++ b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py @@ -1,196 +1,18 @@ -from enum import Enum +import logging import queue from threading import Thread -import usb.core -import usb.util +from .can_8dev_usb_utils import * -MAX_8DEV_RECV_QUEUE = 128 # Maximum number of slots in the recv queue +logger = logging.getLogger(__name__) -USB_8DEV_VENDOR_ID = ( - 0x0483 -) # Unfortunately this is actually the ST Microelectronics Vendor ID -USB_8DEV_PRODUCT_ID = 0x1234 # Unfortunately this is pretty bogus -USB_8DEV_PRODUCT_STRING = "USB2CAN converter" # So we use this instead. Not great. - -USB_8DEV_ABP_CLOCK = 32000000 - -# USB Bulk Endpoint identifiers - -USB_8DEV_ENDP_DATA_RX = 0x81 -USB_8DEV_ENDP_DATA_TX = 0x2 -USB_8DEV_ENDP_CMD_RX = 0x83 -USB_8DEV_ENDP_CMD_TX = 0x4 - -# Open Device Options - -USB_8DEV_SILENT = 0x01 -USB_8DEV_LOOPBACK = 0x02 -USB_8DEV_DISABLE_AUTO_RESTRANS = 0x04 -USB_8DEV_STATUS_FRAME = 0x08 - -# Command options -USB_8DEV_BAUD_MANUAL = 0x09 -USB_8DEV_CMD_START = 0x11 -USB_8DEV_CMD_END = 0x22 - -USB_8DEV_CMD_SUCCESS = 0 -USB_8DEV_CMD_ERROR = 255 - -USB_8DEV_CMD_TIMEOUT = 1000 - -# Framing definitions -USB_8DEV_DATA_START = 0x55 -USB_8DEV_DATA_END = 0xAA - -USB_8DEV_TYPE_CAN_FRAME = 0 -USB_8DEV_TYPE_ERROR_FRAME = 3 - -USB_8DEV_EXTID = 0x01 -USB_8DEV_RTR = 0x02 -USB_8DEV_ERR_FLAG = 0x04 - -# Status messages -USB_8DEV_STATUSMSG_OK = 0x00 -USB_8DEV_STATUSMSG_OVERRUN = 0x01 # Overrun occured when sending */ -USB_8DEV_STATUSMSG_BUSLIGHT = 0x02 # Error counter has reached 96 */ -USB_8DEV_STATUSMSG_BUSHEAVY = 0x03 # Error count. has reached 128 */ -USB_8DEV_STATUSMSG_BUSOFF = 0x04 # Device is in BUSOFF */ -USB_8DEV_STATUSMSG_STUFF = 0x20 # Stuff Error */ -USB_8DEV_STATUSMSG_FORM = 0x21 # Form Error */ -USB_8DEV_STATUSMSG_ACK = 0x23 # Ack Error */ -USB_8DEV_STATUSMSG_BIT0 = 0x24 # Bit1 Error */ -USB_8DEV_STATUSMSG_BIT1 = 0x25 # Bit0 Error */ -USB_8DEV_STATUSMSG_CRC = 0x27 # CRC Error */ - -USB_8DEV_RP_MASK = 0x7F # Mask for Receive Error Bit */ - -# Available Commands - - -class Can8DevCommand(Enum): - USB_8DEV_RESET = 1 # Reset Device - USB_8DEV_OPEN = 2 # Open Port - USB_8DEV_CLOSE = 3 # Close Port - USB_8DEV_SET_SPEED = 4 - USB_8DEV_SET_MASK_FILTER = ( - 5 - ) # Unfortunately unknown parameters and supposedly un-implemented on early firmwares - USB_8DEV_GET_STATUS = 6 - USB_8DEV_GET_STATISTICS = 7 - USB_8DEV_GET_SERIAL = 8 - USB_8DEV_GET_SOFTW_VER = 9 - USB_8DEV_GET_HARDW_VER = 0xA - USB_8DEV_RESET_TIMESTAMP = 0xB - USB_8DEV_GET_SOFTW_HARDW_VER = 0xC - - -class Can8DevTxFrame: - flags: int - id: int - dlc: int - data: bytes - - def __init__( - self, can_id: int, dlc: int, data: bytes, is_ext: bool, is_remote: bool - ): - self.can_id = can_id - self.dlc = dlc - self.data = data - self.flags = 0 - if is_ext: - self.flags |= USB_8DEV_EXTID - if is_remote: - self.flags |= USB_8DEV_RTR - - def _pad_data(self, data: bytes): - data_bytes = bytearray(8) - for i in range(0, 7): - if i < len(data): - data_bytes[i] = data[i] - return bytes(data_bytes) - - def to_bytes(self): - cmd_buf = bytearray() - cmd_buf.append(USB_8DEV_DATA_START) - cmd_buf.append(self.flags) - id_bytes = self.can_id.to_bytes(4, byteorder="big") - cmd_buf.extend(id_bytes) - cmd_buf.append(self.dlc) - cmd_buf.extend(self._pad_data(self.data)) - cmd_buf.append(USB_8DEV_DATA_END) - return bytes(cmd_buf) - - -class Can8DevRxFrame: - data: bytes - id: int - dlc: int - timestamp: int - ext_id: bool - is_error: bool - is_remote: bool - - def __init__(self, bytes_in: bytes): - if len(bytes_in) != 21: - raise ValueError("Did not receive 21 bytes for 8Dev Data Frame") - if bytes_in[0] != USB_8DEV_DATA_START: - raise ValueError("Did not receive a valid 8Dev Data Frame") - if bytes_in[1] == USB_8DEV_TYPE_CAN_FRAME: - self.data = bytes_in[8:16] - self.dlc = bytes_in[7] - self.ext_id = bytes_in[2] & USB_8DEV_EXTID - self.is_remote = bytes_in[2] & USB_8DEV_RTR - self.id = int.from_bytes(bytes_in[3:7], byteorder="big") - self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") - self.is_error = False - elif bytes_in[1] == USB_8DEV_TYPE_ERROR_FRAME: - self.is_error = True - self.data = bytes_in[7:15] - self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") - else: - raise ValueError("8Dev Data Frame with Unknown Type") - - -class Can8DevCommandFrame: - command: Can8DevCommand - opt1: int - opt2: int - data: bytes - - def __init__(self, command, data=bytes(), opt1=0, opt2=0): - self.command = command - self.data = data - self.opt1 = opt1 - self.opt2 = opt2 - - def _pad_data(self, data: bytes): - data_bytes = bytearray(10) - for i in range(0, 9): - if i < len(data): - data_bytes[i] = data[i] - return bytes(data_bytes) - - def to_bytes(self): - cmd_buf = bytearray() - cmd_buf.append(USB_8DEV_CMD_START) - cmd_buf.append(0) # Supposedly could be a channel value, but unknown - cmd_buf.append(self.command.value) - cmd_buf.append(self.opt1) - cmd_buf.append(self.opt2) - cmd_buf.extend(self._pad_data(self.data)) - cmd_buf.append(USB_8DEV_CMD_END) - return bytes(cmd_buf) - - def from_bytes(byte_input: bytes): - if len(byte_input) != 16: - raise ValueError("Did not receive 16 bytes for 8Dev Command Frame") - return Can8DevCommandFrame( - Can8DevCommand(byte_input[2]), - byte_input[5:15], - byte_input[3], - byte_input[4], - ) +try: + import usb.core + import usb.util +except ImportError: + logger.warning( + "The PyUSB module is not installed. Install it using `python3 -m pip install pyusb`" + ) class Can8DevUSBDevice: @@ -277,9 +99,8 @@ def _start_recv_thread(self): def _stop_recv_thread(self): self._close = True - def send_command(self, cmd: Can8DevCommand, data: bytes = bytes(), opt1=0, opt2=0): - frame = Can8DevCommandFrame(cmd, data, opt1, opt2) - self.cmd_tx_ep.write(frame.to_bytes()) + def send_command(self, cmd: Can8DevCommandFrame): + self.cmd_tx_ep.write(cmd.to_bytes()) return Can8DevCommandFrame.from_bytes(self.cmd_rx_ep.read(16)) def open( @@ -292,29 +113,11 @@ def open( listenonly: bool = False, oneshot: bool = False, ): - self.send_command(Can8DevCommand.USB_8DEV_RESET) - open_command = Can8DevCommand.USB_8DEV_OPEN - opt1 = USB_8DEV_BAUD_MANUAL - flags = 0 - if loopback: - flags |= USB_8DEV_LOOPBACK - if listenonly: - flags |= USB_8DEV_SILENT - if oneshot: - flags |= USB_8DEV_DISABLE_AUTO_RESTRANS - flags_bytes = flags.to_bytes(4, "big") - brp_bytes = brp.to_bytes(2, "big") - data = bytearray(10) - data[0] = phase_seg1 - data[1] = phase_seg2 - data[2] = sjw - data[3] = brp_bytes[0] - data[4] = brp_bytes[1] - data[5] = flags_bytes[0] - data[6] = flags_bytes[1] - data[7] = flags_bytes[2] - data[8] = flags_bytes[3] - if self.send_command(open_command, data, opt1).opt1 == 0: + self.send_command(Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET)) + open_command = can_8dev_open_frame( + phase_seg1, phase_seg2, sjw, brp, loopback, listenonly, oneshot + ) + if self.send_command(open_command).opt1 == 0: self._start_recv_thread() return True else: @@ -323,7 +126,7 @@ def open( def close(self): self._stop_recv_thread() close_command = Can8DevCommand.USB_8DEV_CLOSE - self.send_command(close_command) + self.send_command(Can8DevCommandFrame(close_command)) def recv(self, timeout=None): try: @@ -335,7 +138,9 @@ def send(self, tx_frame: Can8DevTxFrame, timeout=None): self.data_tx_ep.write(tx_frame.to_bytes(), timeout) def get_version(self): - cmd_response = self.send_command(Can8DevCommand.USB_8DEV_GET_SOFTW_HARDW_VER) + cmd_response = self.send_command( + Can8DevCommandFrame(Can8DevCommand.USB_8DEV_GET_SOFTW_HARDW_VER) + ) version = int.from_bytes(cmd_response.data[0:4], byteorder="big") return version diff --git a/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py b/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py new file mode 100644 index 000000000..b6836303d --- /dev/null +++ b/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py @@ -0,0 +1,221 @@ +from enum import Enum + +MAX_8DEV_RECV_QUEUE = 128 # Maximum number of slots in the recv queue + +USB_8DEV_VENDOR_ID = ( + 0x0483 +) # Unfortunately this is actually the ST Microelectronics Vendor ID +USB_8DEV_PRODUCT_ID = 0x1234 # Unfortunately this is pretty bogus +USB_8DEV_PRODUCT_STRING = "USB2CAN converter" # So we use this instead. Not great. + +USB_8DEV_ABP_CLOCK = 32000000 + +# USB Bulk Endpoint identifiers + +USB_8DEV_ENDP_DATA_RX = 0x81 +USB_8DEV_ENDP_DATA_TX = 0x2 +USB_8DEV_ENDP_CMD_RX = 0x83 +USB_8DEV_ENDP_CMD_TX = 0x4 + +# Open Device Options + +USB_8DEV_SILENT = 0x01 +USB_8DEV_LOOPBACK = 0x02 +USB_8DEV_DISABLE_AUTO_RESTRANS = 0x04 +USB_8DEV_STATUS_FRAME = 0x08 + +# Command options +USB_8DEV_BAUD_MANUAL = 0x09 +USB_8DEV_CMD_START = 0x11 +USB_8DEV_CMD_END = 0x22 + +USB_8DEV_CMD_SUCCESS = 0 +USB_8DEV_CMD_ERROR = 255 + +USB_8DEV_CMD_TIMEOUT = 1000 + +# Framing definitions +USB_8DEV_DATA_START = 0x55 +USB_8DEV_DATA_END = 0xAA + +USB_8DEV_TYPE_CAN_FRAME = 0 +USB_8DEV_TYPE_ERROR_FRAME = 3 + +USB_8DEV_EXTID = 0x01 +USB_8DEV_RTR = 0x02 +USB_8DEV_ERR_FLAG = 0x04 + +# Status messages +USB_8DEV_STATUSMSG_OK = 0x00 +USB_8DEV_STATUSMSG_OVERRUN = 0x01 # Overrun occured when sending */ +USB_8DEV_STATUSMSG_BUSLIGHT = 0x02 # Error counter has reached 96 */ +USB_8DEV_STATUSMSG_BUSHEAVY = 0x03 # Error count. has reached 128 */ +USB_8DEV_STATUSMSG_BUSOFF = 0x04 # Device is in BUSOFF */ +USB_8DEV_STATUSMSG_STUFF = 0x20 # Stuff Error */ +USB_8DEV_STATUSMSG_FORM = 0x21 # Form Error */ +USB_8DEV_STATUSMSG_ACK = 0x23 # Ack Error */ +USB_8DEV_STATUSMSG_BIT0 = 0x24 # Bit1 Error */ +USB_8DEV_STATUSMSG_BIT1 = 0x25 # Bit0 Error */ +USB_8DEV_STATUSMSG_CRC = 0x27 # CRC Error */ + +USB_8DEV_RP_MASK = 0x7F # Mask for Receive Error Bit */ + +# Available Commands + + +class Can8DevCommand(Enum): + USB_8DEV_RESET = 1 # Reset Device + USB_8DEV_OPEN = 2 # Open Port + USB_8DEV_CLOSE = 3 # Close Port + USB_8DEV_SET_SPEED = 4 + USB_8DEV_SET_MASK_FILTER = ( + 5 + ) # Unfortunately unknown parameters and supposedly un-implemented on early firmwares + USB_8DEV_GET_STATUS = 6 + USB_8DEV_GET_STATISTICS = 7 + USB_8DEV_GET_SERIAL = 8 + USB_8DEV_GET_SOFTW_VER = 9 + USB_8DEV_GET_HARDW_VER = 0xA + USB_8DEV_RESET_TIMESTAMP = 0xB + USB_8DEV_GET_SOFTW_HARDW_VER = 0xC + + +class Can8DevTxFrame: + flags: int + id: int + dlc: int + data: bytes + + def __init__( + self, can_id: int, dlc: int, data: bytes, is_ext: bool, is_remote: bool + ): + self.can_id = can_id + self.dlc = dlc + self.data = data + self.flags = 0 + if is_ext: + self.flags |= USB_8DEV_EXTID + if is_remote: + self.flags |= USB_8DEV_RTR + + def _pad_data(self, data: bytes): + data_bytes = bytearray(8) + for i in range(0, 7): + if i < len(data): + data_bytes[i] = data[i] + return bytes(data_bytes) + + def to_bytes(self): + cmd_buf = bytearray() + cmd_buf.append(USB_8DEV_DATA_START) + cmd_buf.append(self.flags) + id_bytes = self.can_id.to_bytes(4, byteorder="big") + cmd_buf.extend(id_bytes) + cmd_buf.append(self.dlc) + cmd_buf.extend(self._pad_data(self.data)) + cmd_buf.append(USB_8DEV_DATA_END) + return bytes(cmd_buf) + + +class Can8DevRxFrame: + data: bytes + id: int + dlc: int + timestamp: int + ext_id: bool + is_error: bool + is_remote: bool + + def __init__(self, bytes_in: bytes): + if len(bytes_in) != 21: + raise ValueError("Did not receive 21 bytes for 8Dev Data Frame") + if bytes_in[0] != USB_8DEV_DATA_START: + raise ValueError("Did not receive a valid 8Dev Data Frame") + if bytes_in[1] == USB_8DEV_TYPE_CAN_FRAME: + self.data = bytes_in[8:16] + self.dlc = bytes_in[7] + self.ext_id = bytes_in[2] & USB_8DEV_EXTID + self.is_remote = bytes_in[2] & USB_8DEV_RTR + self.id = int.from_bytes(bytes_in[3:7], byteorder="big") + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") + self.is_error = False + elif bytes_in[1] == USB_8DEV_TYPE_ERROR_FRAME: + self.is_error = True + self.data = bytes_in[7:15] + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") + else: + raise ValueError("8Dev Data Frame with Unknown Type") + + +class Can8DevCommandFrame: + command: Can8DevCommand + opt1: int + opt2: int + data: bytes + + def __init__(self, command, data=bytes(), opt1=0, opt2=0): + self.command = command + self.data = data + self.opt1 = opt1 + self.opt2 = opt2 + + def _pad_data(self, data: bytes): + data_bytes = bytearray(10) + for i in range(0, 9): + if i < len(data): + data_bytes[i] = data[i] + return bytes(data_bytes) + + def to_bytes(self): + cmd_buf = bytearray() + cmd_buf.append(USB_8DEV_CMD_START) + cmd_buf.append(0) # Supposedly could be a channel value, but unknown + cmd_buf.append(self.command.value) + cmd_buf.append(self.opt1) + cmd_buf.append(self.opt2) + cmd_buf.extend(self._pad_data(self.data)) + cmd_buf.append(USB_8DEV_CMD_END) + return bytes(cmd_buf) + + def from_bytes(byte_input: bytes): + if len(byte_input) != 16: + raise ValueError("Did not receive 16 bytes for 8Dev Command Frame") + return Can8DevCommandFrame( + Can8DevCommand(byte_input[2]), + byte_input[5:15], + byte_input[3], + byte_input[4], + ) + + +def can_8dev_open_frame( + phase_seg1: int, + phase_seg2: int, + sjw: int, + brp: int, + loopback: bool = False, + listenonly: bool = False, + oneshot: bool = False, +) -> Can8DevCommandFrame: + open_command = Can8DevCommand.USB_8DEV_OPEN + opt1 = USB_8DEV_BAUD_MANUAL + flags = 0 + if loopback: + flags |= USB_8DEV_LOOPBACK + if listenonly: + flags |= USB_8DEV_SILENT + if oneshot: + flags |= USB_8DEV_DISABLE_AUTO_RESTRANS + flags_bytes = flags.to_bytes(4, "big") + brp_bytes = brp.to_bytes(2, "big") + data = bytearray(10) + data[0] = phase_seg1 + data[1] = phase_seg2 + data[2] = sjw + data[3] = brp_bytes[0] + data[4] = brp_bytes[1] + data[5] = flags_bytes[0] + data[6] = flags_bytes[1] + data[7] = flags_bytes[2] + data[8] = flags_bytes[3] + return Can8DevCommandFrame(open_command, data, opt1) diff --git a/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py index 66eaa47b4..5259709e7 100644 --- a/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py +++ b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py @@ -6,12 +6,19 @@ import logging from ctypes import byref -from can import BusABC, Message, CanError, BitTiming -from .can_8dev_usb_device import * +from can import BusABC, Message, BitTiming +from .can_8dev_usb_utils import * # Set up logging log = logging.getLogger("can.usb2can_libusb") +try: + from .can_8dev_usb_device import * +except NameError: + log.warning( + "The PyUSB module is not installed, but it is required for USB2Can_LibUSB support. Install it using `python3 -m pip install pyusb`" + ) + def message_convert_tx(msg): """convert message from PythonCAN Message to 8Devices frame""" diff --git a/test/test_usb2can_libusb.py b/test/test_usb2can_libusb.py new file mode 100644 index 000000000..bb3eb7570 --- /dev/null +++ b/test/test_usb2can_libusb.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# coding: utf-8 + +""" +Test for USB2Can_LibUSB +""" + +import unittest +from unittest.mock import Mock + +import pytest + +import can +from can import Message +from can.interfaces.usb2can_libusb.usb2can_libusb_bus import message_convert_rx, message_convert_tx, Usb2CanLibUsbBus +from can.interfaces.usb2can_libusb.can_8dev_usb_utils import Can8DevRxFrame, Can8DevCommandFrame, Can8DevCommand, can_8dev_open_frame + +class TestUsb2CanLibUsbBus(unittest.TestCase): + def test_receive_deserialize(self) -> None: + recv_packet = bytes.fromhex("55000000000121084ef1ff1f00007e00008355e0aa") + recv_rx_frame = Can8DevRxFrame(recv_packet) + recv_msg = message_convert_rx(recv_rx_frame) + self.assertEqual(recv_msg.arbitration_id, 0x121) + self.assertEqual(recv_msg.dlc, 8) + self.assertEqual(recv_msg.data, bytes([0x4e, 0xf1, 0xff, 0x1f, 0x00, 0x00, 0x7E, 0x00])) + self.assertEqual(recv_msg.timestamp, 8607.200000) + + def test_transmit_serialize(self) -> None: + send_packet = Message(arbitration_id=0xC0FFEE, is_extended_id=True,data=[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]) + tx_frame = message_convert_tx(send_packet) + tx_bytes = tx_frame.to_bytes() + self.assertEqual(tx_bytes.hex(), '550100c0ffee080102030405060700aa') + + def test_command_serialize(self) -> None: + frame = Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET) + self.assertEqual(frame.to_bytes().hex(), '11000100000000000000000000000022') + deserialized_frame = Can8DevCommandFrame.from_bytes(frame.to_bytes()) + self.assertEqual(deserialized_frame.command, Can8DevCommand.USB_8DEV_RESET) + + def test_open_command(self) -> None: + phase_seg1 = 6 + phase_seg2 = 1 + sjw = 1 + brp = 8 + loopback = True + listenonly = False + oneshot = False + open_command = can_8dev_open_frame(phase_seg1, phase_seg2, sjw, brp, loopback, listenonly, oneshot) + self.assertEqual(open_command.to_bytes().hex(), '11000209000601010008000000020022') + +if __name__ == "__main__": + unittest.main() From c2638b26471ea411e4245e4df87139220a24f2f3 Mon Sep 17 00:00:00 2001 From: bri3d Date: Fri, 12 Feb 2021 15:14:56 -0700 Subject: [PATCH 4/4] lint tests - the docs only run lint against can dir, maybe need a fix there too --- test/test_usb2can_libusb.py | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/test/test_usb2can_libusb.py b/test/test_usb2can_libusb.py index bb3eb7570..695f5dfb6 100644 --- a/test/test_usb2can_libusb.py +++ b/test/test_usb2can_libusb.py @@ -12,8 +12,18 @@ import can from can import Message -from can.interfaces.usb2can_libusb.usb2can_libusb_bus import message_convert_rx, message_convert_tx, Usb2CanLibUsbBus -from can.interfaces.usb2can_libusb.can_8dev_usb_utils import Can8DevRxFrame, Can8DevCommandFrame, Can8DevCommand, can_8dev_open_frame +from can.interfaces.usb2can_libusb.usb2can_libusb_bus import ( + message_convert_rx, + message_convert_tx, + Usb2CanLibUsbBus, +) +from can.interfaces.usb2can_libusb.can_8dev_usb_utils import ( + Can8DevRxFrame, + Can8DevCommandFrame, + Can8DevCommand, + can_8dev_open_frame, +) + class TestUsb2CanLibUsbBus(unittest.TestCase): def test_receive_deserialize(self) -> None: @@ -22,18 +32,24 @@ def test_receive_deserialize(self) -> None: recv_msg = message_convert_rx(recv_rx_frame) self.assertEqual(recv_msg.arbitration_id, 0x121) self.assertEqual(recv_msg.dlc, 8) - self.assertEqual(recv_msg.data, bytes([0x4e, 0xf1, 0xff, 0x1f, 0x00, 0x00, 0x7E, 0x00])) + self.assertEqual( + recv_msg.data, bytes([0x4E, 0xF1, 0xFF, 0x1F, 0x00, 0x00, 0x7E, 0x00]) + ) self.assertEqual(recv_msg.timestamp, 8607.200000) def test_transmit_serialize(self) -> None: - send_packet = Message(arbitration_id=0xC0FFEE, is_extended_id=True,data=[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]) + send_packet = Message( + arbitration_id=0xC0FFEE, + is_extended_id=True, + data=[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08], + ) tx_frame = message_convert_tx(send_packet) tx_bytes = tx_frame.to_bytes() - self.assertEqual(tx_bytes.hex(), '550100c0ffee080102030405060700aa') + self.assertEqual(tx_bytes.hex(), "550100c0ffee080102030405060700aa") def test_command_serialize(self) -> None: frame = Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET) - self.assertEqual(frame.to_bytes().hex(), '11000100000000000000000000000022') + self.assertEqual(frame.to_bytes().hex(), "11000100000000000000000000000022") deserialized_frame = Can8DevCommandFrame.from_bytes(frame.to_bytes()) self.assertEqual(deserialized_frame.command, Can8DevCommand.USB_8DEV_RESET) @@ -45,8 +61,13 @@ def test_open_command(self) -> None: loopback = True listenonly = False oneshot = False - open_command = can_8dev_open_frame(phase_seg1, phase_seg2, sjw, brp, loopback, listenonly, oneshot) - self.assertEqual(open_command.to_bytes().hex(), '11000209000601010008000000020022') + open_command = can_8dev_open_frame( + phase_seg1, phase_seg2, sjw, brp, loopback, listenonly, oneshot + ) + self.assertEqual( + open_command.to_bytes().hex(), "11000209000601010008000000020022" + ) + if __name__ == "__main__": unittest.main()