From 85910bc5de8e00f2b108c4ceebb96354c822819c Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Mon, 24 Jun 2024 18:19:53 +0200 Subject: [PATCH 1/3] refactor filter parsing, add tests --- can/logger.py | 75 ++++++++++++++++++++++++++++++--------------- can/typechecking.py | 15 +++++++++ can/viewer.py | 25 ++++++--------- test/test_logger.py | 35 +++++++++++++++++++-- test/test_viewer.py | 75 ++++++++++++++++++++++----------------------- 5 files changed, 144 insertions(+), 81 deletions(-) diff --git a/can/logger.py b/can/logger.py index 7d1ae6f66..35c3db20b 100644 --- a/can/logger.py +++ b/can/logger.py @@ -3,17 +3,26 @@ import re import sys from datetime import datetime -from typing import TYPE_CHECKING, Any, Dict, List, Sequence, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + Optional, + Sequence, + Tuple, + Union, +) import can +from can import Bus, BusState, Logger, SizedRotatingLogger +from can.typechecking import TAdditionalCliArgs from can.util import cast_from_string -from . import Bus, BusState, Logger, SizedRotatingLogger -from .typechecking import CanFilter, CanFilters - if TYPE_CHECKING: from can.io import BaseRotatingLogger from can.io.generic import MessageWriter + from can.typechecking import CanFilter def _create_base_argument_parser(parser: argparse.ArgumentParser) -> None: @@ -60,10 +69,7 @@ def _create_base_argument_parser(parser: argparse.ArgumentParser) -> None: def _append_filter_argument( - parser: Union[ - argparse.ArgumentParser, - argparse._ArgumentGroup, - ], + parser: Union[argparse.ArgumentParser, argparse._ArgumentGroup], *args: str, **kwargs: Any, ) -> None: @@ -78,16 +84,17 @@ def _append_filter_argument( "\n ~ (matches when & mask !=" " can_id & mask)" "\nFx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:" - "\n python -m can.viewer -f 100:7FC 200:7F0" + "\n python -m can.viewer --filter 100:7FC 200:7F0" "\nNote that the ID and mask are always interpreted as hex values", metavar="{:,~}", nargs=argparse.ONE_OR_MORE, - default="", + action=_CanFilterAction, + dest="can_filters", **kwargs, ) -def _create_bus(parsed_args: Any, **kwargs: Any) -> can.BusABC: +def _create_bus(parsed_args: argparse.Namespace, **kwargs: Any) -> can.BusABC: logging_level_names = ["critical", "error", "warning", "info", "debug", "subdebug"] can.set_logging_level(logging_level_names[min(5, parsed_args.verbosity)]) @@ -100,16 +107,27 @@ def _create_bus(parsed_args: Any, **kwargs: Any) -> can.BusABC: config["fd"] = True if parsed_args.data_bitrate: config["data_bitrate"] = parsed_args.data_bitrate + if getattr(parsed_args, "can_filters", None): + config["can_filters"] = parsed_args.can_filters return Bus(parsed_args.channel, **config) -def _parse_filters(parsed_args: Any) -> CanFilters: - can_filters: List[CanFilter] = [] +class _CanFilterAction(argparse.Action): + def __call__( + self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = None, + ) -> None: + if not isinstance(values, list): + raise argparse.ArgumentError(None, "Invalid filter argument") + + print(f"Adding filter(s): {values}") + can_filters: List[CanFilter] = [] - if parsed_args.filter: - print(f"Adding filter(s): {parsed_args.filter}") - for filt in parsed_args.filter: + for filt in values: if ":" in filt: parts = filt.split(":") can_id = int(parts[0], base=16) @@ -122,12 +140,10 @@ def _parse_filters(parsed_args: Any) -> CanFilters: raise argparse.ArgumentError(None, "Invalid filter argument") can_filters.append({"can_id": can_id, "can_mask": can_mask}) - return can_filters + setattr(namespace, self.dest, can_filters) -def _parse_additional_config( - unknown_args: Sequence[str], -) -> Dict[str, Union[str, int, float, bool]]: +def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs: for arg in unknown_args: if not re.match(r"^--[a-zA-Z\-]*?=\S*?$", arg): raise ValueError(f"Parsing argument {arg} failed") @@ -142,12 +158,18 @@ def _split_arg(_arg: str) -> Tuple[str, str]: return args -def main() -> None: +def _parse_logger_args( + args: List[str], +) -> Tuple[argparse.Namespace, TAdditionalCliArgs]: + """Parse command line arguments for logger script.""" + parser = argparse.ArgumentParser( description="Log CAN traffic, printing messages to stdout or to a " "given file.", ) + # Generate the standard arguments: + # Channel, bitrate, data_bitrate, interface, app_name, CAN-FD support _create_base_argument_parser(parser) parser.add_argument( @@ -200,13 +222,18 @@ def main() -> None: ) # print help message when no arguments were given - if len(sys.argv) < 2: + if not args: parser.print_help(sys.stderr) raise SystemExit(errno.EINVAL) - results, unknown_args = parser.parse_known_args() + results, unknown_args = parser.parse_known_args(args) additional_config = _parse_additional_config([*results.extra_args, *unknown_args]) - bus = _create_bus(results, can_filters=_parse_filters(results), **additional_config) + return results, additional_config + + +def main() -> None: + results, additional_config = _parse_logger_args(sys.argv[1:]) + bus = _create_bus(results, **additional_config) if results.active: bus.state = BusState.ACTIVE diff --git a/can/typechecking.py b/can/typechecking.py index 89f978a1a..284dd8aba 100644 --- a/can/typechecking.py +++ b/can/typechecking.py @@ -2,8 +2,16 @@ """ import gzip +import struct +import sys import typing +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + + if typing.TYPE_CHECKING: import os @@ -40,6 +48,13 @@ class CanFilterExtended(typing.TypedDict): BusConfig = typing.NewType("BusConfig", typing.Dict[str, typing.Any]) +# Used by CLI scripts +TAdditionalCliArgs: TypeAlias = typing.Dict[str, typing.Union[str, int, float, bool]] +TDataStructs: TypeAlias = typing.Dict[ + typing.Union[int, typing.Tuple[int, ...]], + typing.Union[struct.Struct, typing.Tuple, None], +] + class AutoDetectedConfig(typing.TypedDict): interface: str diff --git a/can/viewer.py b/can/viewer.py index 07752327d..45c313b07 100644 --- a/can/viewer.py +++ b/can/viewer.py @@ -27,17 +27,16 @@ import struct import sys import time -from typing import Dict, List, Tuple, Union +from typing import Dict, List, Tuple from can import __version__ - -from .logger import ( +from can.logger import ( _append_filter_argument, _create_base_argument_parser, _create_bus, _parse_additional_config, - _parse_filters, ) +from can.typechecking import TAdditionalCliArgs, TDataStructs logger = logging.getLogger("can.viewer") @@ -391,7 +390,9 @@ def _fill_text(self, text, width, indent): return super()._fill_text(text, width, indent) -def parse_args(args: List[str]) -> Tuple: +def _parse_viewer_args( + args: List[str], +) -> Tuple[argparse.Namespace, TDataStructs, TAdditionalCliArgs]: # Parse command line arguments parser = argparse.ArgumentParser( "python -m can.viewer", @@ -489,8 +490,6 @@ def parse_args(args: List[str]) -> Tuple: parsed_args, unknown_args = parser.parse_known_args(args) - can_filters = _parse_filters(parsed_args) - # Dictionary used to convert between Python values and C structs represented as Python strings. # If the value is 'None' then the message does not contain any data package. # @@ -511,9 +510,7 @@ def parse_args(args: List[str]) -> Tuple: # similarly the values # are divided by the value in order to convert from real units to raw integer values. - data_structs: Dict[ - Union[int, Tuple[int, ...]], Union[struct.Struct, Tuple, None] - ] = {} + data_structs: TDataStructs = {} if parsed_args.decode: if os.path.isfile(parsed_args.decode[0]): with open(parsed_args.decode[0], encoding="utf-8") as f: @@ -544,16 +541,12 @@ def parse_args(args: List[str]) -> Tuple: additional_config = _parse_additional_config( [*parsed_args.extra_args, *unknown_args] ) - return parsed_args, can_filters, data_structs, additional_config + return parsed_args, data_structs, additional_config def main() -> None: - parsed_args, can_filters, data_structs, additional_config = parse_args(sys.argv[1:]) - - if can_filters: - additional_config.update({"can_filters": can_filters}) + parsed_args, data_structs, additional_config = _parse_viewer_args(sys.argv[1:]) bus = _create_bus(parsed_args, **additional_config) - curses.wrapper(CanViewer, bus, data_structs) # type: ignore[attr-defined,unused-ignore] diff --git a/test/test_logger.py b/test/test_logger.py index 083e4d19c..32fc987b4 100644 --- a/test/test_logger.py +++ b/test/test_logger.py @@ -16,8 +16,6 @@ import can import can.logger -from .config import * - class TestLoggerScriptModule(unittest.TestCase): def setUp(self) -> None: @@ -108,6 +106,39 @@ def test_log_virtual_sizedlogger(self): self.assertSuccessfullCleanup() self.mock_logger_sized.assert_called_once() + def test_parse_logger_args(self): + args = self.baseargs + [ + "--bitrate", + "250000", + "--fd", + "--data_bitrate", + "2000000", + "--receive-own-messages=True", + ] + results, additional_config = can.logger._parse_logger_args(args[1:]) + assert results.interface == "virtual" + assert results.bitrate == 250_000 + assert results.fd is True + assert results.data_bitrate == 2_000_000 + assert additional_config["receive_own_messages"] is True + + def test_parse_can_filters(self): + expected_can_filters = [{"can_id": 0x100, "can_mask": 0x7FC}] + results, additional_config = can.logger._parse_logger_args( + ["--filter", "100:7FC", "--bitrate", "250000"] + ) + assert results.can_filters == expected_can_filters + + def test_parse_can_filters_list(self): + expected_can_filters = [ + {"can_id": 0x100, "can_mask": 0x7FC}, + {"can_id": 0x200, "can_mask": 0x7F0}, + ] + results, additional_config = can.logger._parse_logger_args( + ["--filter", "100:7FC", "200:7F0", "--bitrate", "250000"] + ) + assert results.can_filters == expected_can_filters + def test_parse_additional_config(self): unknown_args = [ "--app-name=CANalyzer", diff --git a/test/test_viewer.py b/test/test_viewer.py index ecc594915..3bd32b25a 100644 --- a/test/test_viewer.py +++ b/test/test_viewer.py @@ -36,7 +36,7 @@ import pytest import can -from can.viewer import CanViewer, parse_args +from can.viewer import CanViewer, _parse_viewer_args # Allow the curses module to be missing (e.g. on PyPy on Windows) try: @@ -397,19 +397,19 @@ def test_pack_unpack(self): ) def test_parse_args(self): - parsed_args, _, _, _ = parse_args(["-b", "250000"]) + parsed_args, _, _ = _parse_viewer_args(["-b", "250000"]) self.assertEqual(parsed_args.bitrate, 250000) - parsed_args, _, _, _ = parse_args(["--bitrate", "500000"]) + parsed_args, _, _ = _parse_viewer_args(["--bitrate", "500000"]) self.assertEqual(parsed_args.bitrate, 500000) - parsed_args, _, _, _ = parse_args(["-c", "can0"]) + parsed_args, _, _ = _parse_viewer_args(["-c", "can0"]) self.assertEqual(parsed_args.channel, "can0") - parsed_args, _, _, _ = parse_args(["--channel", "PCAN_USBBUS1"]) + parsed_args, _, _ = _parse_viewer_args(["--channel", "PCAN_USBBUS1"]) self.assertEqual(parsed_args.channel, "PCAN_USBBUS1") - parsed_args, _, data_structs, _ = parse_args(["-d", "100: Date: Tue, 25 Jun 2024 11:27:17 +0200 Subject: [PATCH 2/3] change chapter title to "Command Line Tools" --- doc/scripts.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/scripts.rst b/doc/scripts.rst index e3a59a409..520b19177 100644 --- a/doc/scripts.rst +++ b/doc/scripts.rst @@ -1,5 +1,5 @@ -Scripts -======= +Command Line Tools +================== The following modules are callable from ``python-can``. From bc4aba3e023853f880748f176ca1d8c009354650 Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Tue, 25 Jun 2024 11:33:26 +0200 Subject: [PATCH 3/3] upper case --- doc/other-tools.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/other-tools.rst b/doc/other-tools.rst index eab3c4f43..db06812ca 100644 --- a/doc/other-tools.rst +++ b/doc/other-tools.rst @@ -1,4 +1,4 @@ -Other CAN bus tools +Other CAN Bus Tools =================== In order to keep the project maintainable, the scope of the package is limited to providing common