diff --git a/.ast-grep/rules/python-rule-add-trailing-comma.yml b/.ast-grep/rules/python-rule-add-trailing-comma.yml new file mode 100644 index 00000000..149e3218 --- /dev/null +++ b/.ast-grep/rules/python-rule-add-trailing-comma.yml @@ -0,0 +1,29 @@ +# .ast-grep/python-rule-add-trailing-comma.yml +id: python-rule-add-trailing-comma +rule: + all: + - matches: python-rule-argument-last-without-comma + - not: + pattern: + $LAST_ARG, +language: python +message: "Consider adding a trailing comma." +description: Rule for improving future maintainability by pre-emptivly adding trailing commas. +severity: hint +fix: + $LAST_ARG, +examples: + - name: Argument list with missing trailing comma + code: | + # trivial example should match + warnings.warn( + "This should match. because there is no trailing comma.", + stacklevel=2 + ) + - name: Resulting argument list with trailing comma + code: | + # trivial example should no-longer match + warnings.warn( + "This should match. because there is no trailing comma.", + stacklevel=2, + ) diff --git a/.ast-grep/utils/python/structure/argument-last-without-comma.yml b/.ast-grep/utils/python/structure/argument-last-without-comma.yml new file mode 100644 index 00000000..d9797ad7 --- /dev/null +++ b/.ast-grep/utils/python/structure/argument-last-without-comma.yml @@ -0,0 +1,30 @@ +# .ast-grep/utils/python/structure/argument-last-without-comma.yml +# Argument Structure utils +id: python-rule-argument-last-without-comma +rule: + any: + - kind: identifier + - kind: integer + - kind: float + - kind: string + - kind: keyword_argument + matches: python-rule-keyword-argument-inside-list-with-comma + nthChild: + position: 1 + reverse: true + inside: + all: + - kind: argument_list + matches: python-rule-argument-list-with-comma + precedes: + pattern: ) + pattern: + $LAST_ARG + all: + - not: + pattern: + $LAST_ARG, + - not: + pattern: + $$$, $LAST_ARG) +language: python diff --git a/.ast-grep/utils/python/structure/argument-list-with-comma.yml b/.ast-grep/utils/python/structure/argument-list-with-comma.yml new file mode 100644 index 00000000..3361a64d --- /dev/null +++ b/.ast-grep/utils/python/structure/argument-list-with-comma.yml @@ -0,0 +1,13 @@ +# .ast-grep/utils/python/structure/argument-list-with-comma.yml +# Argument Structure utils +id: python-rule-argument-list-with-comma +rule: + kind: argument_list + pattern: | + $_FULL_ARG_LIST + has: + all: + - regex: "[,]" + - regex: "\ + " +language: python diff --git a/.ast-grep/utils/python/structure/keyword-argument-inside-list-with-comma.yml b/.ast-grep/utils/python/structure/keyword-argument-inside-list-with-comma.yml new file mode 100644 index 00000000..1e7521cd --- /dev/null +++ b/.ast-grep/utils/python/structure/keyword-argument-inside-list-with-comma.yml @@ -0,0 +1,17 @@ +# .ast-grep/utils/python/structure/keyword-argument-inside-list-with-comma.yml +# Argument Structure utils +id: python-rule-keyword-argument-inside-list-with-comma +rule: + kind: keyword_argument + inside: + all: + - kind: argument_list + matches: python-rule-argument-list-with-comma + - regex: | + [^(]*[^,] + nthChild: + position: 1 + reverse: true + pattern: + $_KW_ARG +language: python diff --git a/.coveragerc b/.coveragerc index 2a75abb1..190fa3bf 100644 --- a/.coveragerc +++ b/.coveragerc @@ -34,6 +34,7 @@ exclude_lines = if __name__ in u"__main__": if __name__ .. .__main__.: if __sys_path__ not in sys.path: + if __debug__ os.abort() exit @@ -52,6 +53,7 @@ partial_branches = if __name__ in u'__main__': if __name__ in u"__main__": if __name__ in '__main__': + if __debug__ if __sys_path__ not in sys.path: # don't complain about sys.modules sys.modules diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 61a88f93..b480633d 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -44,7 +44,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@1b549b9259bda1cb5ddde3b41741a82a2d15a841 # v3.28.13 + uses: github/codeql-action/init@45775bd8235c68ba998cffa5171334d58593da47 # v3.28.15 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -55,7 +55,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@1b549b9259bda1cb5ddde3b41741a82a2d15a841 # v3.28.13 + uses: github/codeql-action/autobuild@45775bd8235c68ba998cffa5171334d58593da47 # v3.28.15 # â„šī¸ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -69,4 +69,4 @@ jobs: # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@1b549b9259bda1cb5ddde3b41741a82a2d15a841 # v3.28.13 + uses: github/codeql-action/analyze@45775bd8235c68ba998cffa5171334d58593da47 # v3.28.15 diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 504820ad..011bef82 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -30,7 +30,7 @@ jobs: with: persist-credentials: false - name: "Run analysis" - uses: ossf/scorecard-action@15bc21f12b2fbbb36fa93d5f2e786f6b18ffe048 # v2.4.1 + uses: ossf/scorecard-action@405dced65a1dcc6bc335115002f0690b1f1e49bb # v2.4.1 with: results_file: results.sarif results_format: sarif @@ -53,6 +53,6 @@ jobs: retention-days: 5 # Upload the results to GitHub's code scanning dashboard. - name: "Upload to code-scanning" - uses: github/codeql-action/upload-sarif@1b549b9259bda1cb5ddde3b41741a82a2d15a841 # v3.28.13 + uses: github/codeql-action/upload-sarif@45775bd8235c68ba998cffa5171334d58593da47 # v3.28.15 with: sarif_file: results.sarif diff --git a/Makefile b/Makefile index f47b4345..11451d2f 100644 --- a/Makefile +++ b/Makefile @@ -212,9 +212,17 @@ legacy-purge: clean uninstall $(QUIET)$(RMDIR) ./dist/ 2>$(ERROR_LOG_PATH) || : $(QUIET)$(RMDIR) ./.eggs/ 2>$(ERROR_LOG_PATH) || : -purge: legacy-purge +purge-test-reports:: $(QUIET)$(RM) ./test-reports/*.xml 2>$(ERROR_LOG_PATH) || : $(QUIET)$(RMDIR) ./test-reports/ 2>$(ERROR_LOG_PATH) || : + +purge-coverage-artifacts: legacy-purge + $(QUIET)$(RM) ./coverage_* 2>$(ERROR_LOG_PATH) || : + $(QUIET)$(RM) ./.coverage.* 2>$(ERROR_LOG_PATH) || : + $(QUIET)$(RM) ./coverage_doctests.xml 2>$(ERROR_LOG_PATH) || : + +purge: purge-coverage-artifacts purge-test-reports + $(QUIET)$(WAIT) $(QUIET)$(ECHO) "$@: Done." test: just-test @@ -223,6 +231,12 @@ test: just-test $(QUIET)$(COVERAGE) report -m --include=* 2>$(ERROR_LOG_PATH) || : ; $(QUIET)$(ECHO) "$@: Done." +test-mod: test-mat + $(QUIET)$(DO_FAIL) ; + $(QUIET)$(COVERAGE) combine 2>$(ERROR_LOG_PATH) || : ; + $(QUIET)$(COVERAGE) report -m --include=* 2>$(ERROR_LOG_PATH) || : ; + $(QUIET)$(ECHO) "$@: Done." + test-tox: build $(QUIET)tox -v -- || tail -n 500 .tox/py*/log/py*.log 2>$(ERROR_LOG_PATH) $(QUIET)$(COVERAGE) combine 2>$(ERROR_LOG_PATH) || : ; @@ -253,18 +267,20 @@ just-test: cleanup MANIFEST.in ## Run all minimum acceptance tests $(QUIET)$(WAIT) ; $(QUIET)$(DO_FAIL) ; -test-mat: test-mat-build test-mat-bootstrap test-mat-basic test-mat-say test-mat-hear test-mat-usage test-mat-doctests ## Run all minimum acceptance tests +test-mat: cleanup MANIFEST.in test-mat-build test-mat-bootstrap test-mat-basic test-mat-say test-mat-hear test-mat-usage test-mat-doctests ## Run all minimum acceptance tests $(QUIET)$(WAIT) ; $(QUIET)$(DO_FAIL) ; -test-mat-doctests: MANIFEST.in ## Run doctests MAT category (doctests are special) +test-mat-doctests: test-reports MANIFEST.in ## Run doctests MAT category (doctests are special) $(QUIET)if [ -n "$$TESTS_USE_PYTEST" ]; then \ $(ECHO) "SKIP: The target '$@' is not compatable with pytests;"; \ $(ECHO) "Try 'make test-mat-doctests' instead."; \ else \ $(COVERAGE) run -p --source=multicast -m tests.run_selective --group mat --category doctests || DO_FAIL="exit 2" ; \ - $(COVERAGE) combine 2>$(ERROR_LOG_PATH) || : ; \ - $(COVERAGE) xml --include=* 2>$(ERROR_LOG_PATH) || : ; \ + $(QUIET)$(WAIT) ; \ + $(COVERAGE) combine --keep --data-file=coverage_doctests ./.coverage.* 2>$(ERROR_LOG_PATH) || : ; \ + $(COVERAGE) report -m --include=* --data-file=coverage_doctests 2>$(ERROR_LOG_PATH) || : ; \ + $(COVERAGE) xml -o test-reports/coverage_doctests.xml --include=* --data-file=coverage_doctests 2>$(ERROR_LOG_PATH) || : ; \ fi $(QUIET)$(WAIT) ; $(QUIET)$(DO_FAIL) ; diff --git a/docs/conf.py b/docs/conf.py index 66928521..3d48f2f1 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -123,7 +123,7 @@ # The short X.Y version. version = "v2.0" # The full version, including alpha/beta/rc tags. -release = "v2.0.5" +release = "v2.0.6" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/multicast/__init__.py b/multicast/__init__.py index f87cfca6..daf31842 100644 --- a/multicast/__init__.py +++ b/multicast/__init__.py @@ -22,6 +22,7 @@ import socket import struct import abc +import logging # skipcq __all__ = [ @@ -52,6 +53,7 @@ """EXIT_CODES""", """EXCEPTION_EXIT_CODES""", """_BLANK""", + """_MCAST_DEFAULT_BUFFER_SIZE""", # added in version 2.0.6 """_MCAST_DEFAULT_PORT""", """_MCAST_DEFAULT_GROUP""", """_MCAST_DEFAULT_TTL""", @@ -114,7 +116,7 @@ global __version__ # skipcq: PYL-W0604 -__version__ = "2.0.5" +__version__ = "2.0.6" """The version of this program. Minimal Acceptance Testing: @@ -156,6 +158,7 @@ Attributes: __version__ (str): The version of this package. + _MCAST_DEFAULT_BUFFER_SIZE (int): Default buffer size for multicast communication (1316). _MCAST_DEFAULT_PORT (int): Default port for multicast communication (59259). _MCAST_DEFAULT_GROUP (str): Default multicast group address ('224.0.0.1'). _MCAST_DEFAULT_TTL (int): Default TTL for multicast packets (1). @@ -215,6 +218,57 @@ >>> +""" + +global _MCAST_DEFAULT_BUFFER_SIZE # skipcq: PYL-W0604 + +_MCAST_DEFAULT_BUFFER_SIZE = 1316 +""" + Arbitrary buffer size to use by default, though any value below 65507 should work. + + Minimal Testing: + + First set up test fixtures by importing multicast. + + >>> import multicast + >>> + + Testcase 0: Multicast should have a default buffer size. + A: Test that the _MCAST_DEFAULT_BUFFER_SIZE attribute is initialized. + B: Test that the _MCAST_DEFAULT_BUFFER_SIZE attribute is an int. + + >>> multicast._MCAST_DEFAULT_BUFFER_SIZE is not None + True + >>> type(multicast._MCAST_DEFAULT_BUFFER_SIZE) is type(1) + True + >>> + >>> multicast._MCAST_DEFAULT_BUFFER_SIZE > int(1) + True + >>> + + Testcase 1: Multicast should validate buffer size constraints. + A: Test that the _MCAST_DEFAULT_BUFFER_SIZE attribute is initialized. + B: Test that the _MCAST_DEFAULT_BUFFER_SIZE attribute is an int. + C: Test that the _MCAST_DEFAULT_BUFFER_SIZE attribute is RFC-791 & RFC-768 compliant. + D: Test that the _MCAST_DEFAULT_BUFFER_SIZE attribute is a smaller than fragment thresholds + for typical ethernet MTUs by default. + + >>> multicast._MCAST_DEFAULT_BUFFER_SIZE is not None + True + >>> type(multicast._MCAST_DEFAULT_BUFFER_SIZE) is type(1) + True + >>> + >>> multicast._MCAST_DEFAULT_BUFFER_SIZE >= int(56) + True + >>> + >>> multicast._MCAST_DEFAULT_BUFFER_SIZE <= int(65527) + True + >>> + >>> multicast._MCAST_DEFAULT_BUFFER_SIZE <= int(1500) + True + >>> + + """ global _MCAST_DEFAULT_PORT # skipcq: PYL-W0604 @@ -351,6 +405,13 @@ """ +if logging.__name__ is not None: # pragma: no branch + logging.getLogger(__module__).addHandler(logging.NullHandler()) + logging.getLogger(__module__).debug( + "Loading %s", # lazy formatting to avoid PYL-W1203 + __module__, + ) + if sys.__name__ is None: raise ModuleNotFoundError( "FAIL: we could not import sys. We're like in the matrix! ABORT." @@ -365,7 +426,13 @@ if socket.__name__ is None: raise ModuleNotFoundError("FAIL: we could not import socket. ABORT.") from None else: # pragma: no branch - socket.setdefaulttimeout(int(_MCAST_DEFAULT_TTL)) + _tmp_mcast_value = int(_MCAST_DEFAULT_TTL) + logging.getLogger(__module__).debug( + "Setting default socket timeout to %d", # lazy formatting to avoid PYL-W1203 + _tmp_mcast_value, + ) + socket.setdefaulttimeout(_tmp_mcast_value) + del _tmp_mcast_value # skipcq - cleanup any bootstrap/setup leaks early if struct.__name__ is None: raise ModuleNotFoundError("FAIL: we could not import struct. ABORT.") from None @@ -407,15 +474,23 @@ if _config is None: raise ImportError("FAIL: we could not import environment. ABORT.") from None else: + logging.getLogger(__module__).debug("Configuring overrides and defaults.") _MCAST_DEFAULT_PORT = _config["port"] _MCAST_DEFAULT_GROUP = _config["group"] _MCAST_DEFAULT_TTL = _config["ttl"] + _MCAST_DEFAULT_BUFFER_SIZE = _config["buffer_size"] global _MCAST_DEFAULT_BIND_IP # skipcq: PYL-W0604 _MCAST_DEFAULT_BIND_IP = _config["bind_addr"] global _MCAST_DEFAULT_GROUPS # skipcq: PYL-W0604 _MCAST_DEFAULT_GROUPS = _config["groups"] - global _MCAST_DEFAULT_BUFFER # skipcq: PYL-W0604 - _MCAST_DEFAULT_BUFFER = _config["buffer_size"] + if __debug__: # pragma: no branch + logging.getLogger(__module__).info("Overrides and defaults are configured.") + logging.getLogger(__module__).debug("Defaults:") + for key, value in _config.items(): + logging.getLogger(__module__).debug( + "\t%s=%s", # lazy formatting to avoid PYL-W1203 + key, value, + ) del _config # skipcq - cleanup any bootstrap/setup leaks early @@ -491,6 +566,10 @@ def buildArgs(cls, calling_parser_group): """ if calling_parser_group is None: # pragma: no branch + logging.getLogger(__module__).debug( + "Building %s arguments.", # lazy formatting to avoid PYL-W1203 + __name__, + ) calling_parser_group = argparse.ArgumentParser( prog=str(cls.__name__ if cls.__proc__ is None else cls.__proc__), description=cls.__prologue__, diff --git a/multicast/env.py b/multicast/env.py index 027570cb..67bd00af 100644 --- a/multicast/env.py +++ b/multicast/env.py @@ -88,6 +88,7 @@ try: import os import warnings + from . import logging # skipcq: PLY-C0414 from . import socket # skipcq: PYL-C0414 import ipaddress except Exception as err: @@ -98,6 +99,57 @@ raise baton from err +module_logger = logging.getLogger(__module__) +module_logger.debug( + "Loading %s", # lazy formatting to avoid PYL-W1203 + __module__, +) +module_logger.debug( + "Initializing %s environment.", # lazy formatting to avoid PYL-W1203 + __package__, +) + + +def validate_buffer_size(size: int) -> bool: + """ + Validate if the buffer size is a positive integer. + + Arguments: + size (int) -- The buffer size to validate. + + Returns: + bool: True if the buffer size is valid ( > 0), False otherwise. + + Raises: + ValueError: If the size cannot be converted to an integer. + + Minimum Acceptance Testing: + >>> validate_buffer_size(1316) # Default value + True + >>> validate_buffer_size(1) # Minimum valid value + True + >>> validate_buffer_size(0) # Zero is invalid + False + >>> validate_buffer_size(-1) # Negative is invalid + False + >>> validate_buffer_size(65507) # Maximum UDP payload size 65,535 -8 -20 (RFC-791 & RFC-768) + True + >>> validate_buffer_size("1316") # String that can be converted + True + >>> try: + ... validate_buffer_size('invalid') + ... except ValueError: + ... print('ValueError raised') + ValueError raised + + """ + try: + size_num = int(size) + return 0 < size_num <= 65507 + except (ValueError, TypeError) as err: + raise ValueError(f"Invalid buffer size value: {size}. Must be a positive integer.") from err + + def validate_port(port: int) -> bool: """ Validate if the port number is within the dynamic/private port range. @@ -202,6 +254,129 @@ def validate_ttl(ttl: int) -> bool: ) from err +def load_buffer_size() -> int: + """ + Load and validate the multicast buffer size from environment variable. + + This function attempts to load the buffer size from the MULTICAST_BUFFER_SIZE + environment variable. If the value is valid, it returns the buffer size. + Invalid values trigger warnings and fall back to the default. + + MTU Considerations for Buffer Size: + When setting a buffer size, consider the MTU of the underlying network: + - Ethernet: 1500 bytes MTU → 1472 bytes max payload (1500 - 28 bytes overhead) + - PPP: 296 bytes MTU → 268 bytes max payload + - Wi-Fi (802.11): 2304 bytes MTU → 2276 bytes max payload + - Frame Relay: 128 bytes MTU → 100 bytes max payload + + The overhead consists of: + - UDP header: 8 bytes + - IP header: 20 bytes (without options) + + Setting buffer sizes larger than the network's max payload may cause IP + fragmentation, which can lead to performance issues and increased complexity. + + Returns: + int: The validated buffer size, or the default value if not set/invalid. + + Environment: + MULTICAST_BUFFER_SIZE -- The buffer size in bytes. + + Raises: + ImportError: If the multicast module cannot be imported. + + Minimum Acceptance Testing: + + Testcase 0: Setup test fixtures. + >>> import os + >>> from multicast import _MCAST_DEFAULT_BUFFER_SIZE + >>> original_buffer = _MCAST_DEFAULT_BUFFER_SIZE + + Testcase 1: Test with valid environment variable + >>> os.environ["MULTICAST_BUFFER_SIZE"] = "2048" + >>> buffer_size = load_buffer_size() + >>> buffer_size + 2048 + >>> # The function updates the global in the module's namespace, but this doesn't affect + >>> # the imported value in the test namespace + >>> _MCAST_DEFAULT_BUFFER_SIZE != 2048 # Global in test namespace is not updated + True + + Testcase 2: Test with invalid (negative) environment variable + >>> os.environ["MULTICAST_BUFFER_SIZE"] = "-100" + >>> import warnings + >>> with warnings.catch_warnings(record=True) as w: + ... warnings.simplefilter("always") + ... buffer_size = load_buffer_size() + ... len(w) == 1 # One warning was issued + True + >>> buffer_size == 1316 # Falls back to default + True + + Testcase 3: Test with invalid (zero) environment variable + >>> os.environ["MULTICAST_BUFFER_SIZE"] = "0" + >>> with warnings.catch_warnings(record=True) as w: + ... warnings.simplefilter("always") + ... buffer_size = load_buffer_size() + ... len(w) == 1 # One warning was issued + True + >>> buffer_size == 1316 # Falls back to default + True + + Testcase 4: Test with invalid (non-integer) environment variable + >>> os.environ["MULTICAST_BUFFER_SIZE"] = 'not_an_integer' + >>> with warnings.catch_warnings(record=True) as w: + ... warnings.simplefilter("always") + ... buffer_size = load_buffer_size() + ... len(w) == 1 # One warning was issued + True + >>> buffer_size == 1316 # Falls back to default + True + + Testcase 5: Test with no environment variable + >>> if "MULTICAST_BUFFER_SIZE" in os.environ: os.environ.pop("MULTICAST_BUFFER_SIZE") + 'not_an_integer' + >>> buffer_size = load_buffer_size() + >>> buffer_size == 1316 # Uses default + True + + # Cleanup + >>> globals()['_MCAST_DEFAULT_BUFFER_SIZE'] = original_buffer + + """ + # Import globals that we'll potentially update + from multicast import _MCAST_DEFAULT_BUFFER_SIZE + module_logger.debug("Looking for MULTICAST_BUFFER_SIZE in environment.") + try: + buffer_size = int(os.getenv( + "MULTICAST_BUFFER_SIZE", + _MCAST_DEFAULT_BUFFER_SIZE # skipcq: PYL-W1508 + )) + module_logger.debug("Done.") + except ValueError: + warnings.warn( + f"Invalid MULTICAST_BUFFER_SIZE value, using default {_MCAST_DEFAULT_BUFFER_SIZE}", + stacklevel=2, + ) + buffer_size = _MCAST_DEFAULT_BUFFER_SIZE # skipcq: PYL-W1508 + # Validate and potentially update buffer-size + module_logger.debug("Validating MULTICAST_BUFFER_SIZE.") + if validate_buffer_size(buffer_size): + globals()["_MCAST_DEFAULT_BUFFER_SIZE"] = buffer_size + module_logger.debug("Valid.") + else: + warnings.warn( + f"Invalid MULTICAST_BUFFER_SIZE {buffer_size}, using default {_MCAST_DEFAULT_BUFFER_SIZE}", + stacklevel=2, + ) + buffer_size = _MCAST_DEFAULT_BUFFER_SIZE + module_logger.debug( + "Loaded %s as internal multicast buffer size.", # lazy formatting to avoid PYL-W1203 + str(buffer_size), + ) + return buffer_size + + def load_port() -> int: """ Load and validate the multicast port from environment variable. @@ -280,22 +455,30 @@ def load_port() -> int: """ # Import globals that we'll potentially update from multicast import _MCAST_DEFAULT_PORT + module_logger.debug("Looking for MULTICAST_PORT in environment.") try: port = int(os.getenv("MULTICAST_PORT", _MCAST_DEFAULT_PORT)) + module_logger.debug("Done.") except ValueError: warnings.warn( - f"Invalid MULTICAST_PORT value, using default {_MCAST_DEFAULT_PORT}", stacklevel=2 + f"Invalid MULTICAST_PORT value, using default {_MCAST_DEFAULT_PORT}", stacklevel=2, ) port = _MCAST_DEFAULT_PORT # Validate and potentially update port + module_logger.debug("Validating MULTICAST_PORT.") if validate_port(port): globals()["_MCAST_DEFAULT_PORT"] = port + module_logger.debug("Valid.") else: warnings.warn( f"Port {port} is outside valid range (49152-65535), using default {_MCAST_DEFAULT_PORT}", - stacklevel=2 + stacklevel=2, ) port = _MCAST_DEFAULT_PORT + module_logger.debug( + "Loaded %s as default multicast port.", # lazy formatting to avoid PYL-W1203 + str(port), + ) return port @@ -400,15 +583,22 @@ def load_group() -> ipaddress.IPv4Address: """ # Import globals that we'll potentially update from multicast import _MCAST_DEFAULT_GROUP + module_logger.debug("Looking for any MULTICAST_GROUP in environment.") group = os.getenv("MULTICAST_GROUP", _MCAST_DEFAULT_GROUP) # Validate and potentially update group + module_logger.debug("Validating either MULTICAST_GROUP or default.") if validate_multicast_address(group): globals()["_MCAST_DEFAULT_GROUP"] = group + module_logger.debug("Valid.") else: warnings.warn( - f"Invalid multicast group {group}, using default {_MCAST_DEFAULT_GROUP}", stacklevel=2 + f"Invalid multicast group {group}, using default {_MCAST_DEFAULT_GROUP}", stacklevel=2, ) group = _MCAST_DEFAULT_GROUP + module_logger.debug( + "Loaded %s as default multicast group.", # lazy formatting to avoid PYL-W1203 + group, + ) return ipaddress.IPv4Address(group) @@ -429,13 +619,15 @@ def load_TTL() -> int: ImportError: If the multicast module cannot be imported. Minimum Acceptance Testing: + + Testcase 0: Setup >>> import os >>> import socket >>> from multicast import _MCAST_DEFAULT_TTL >>> original_ttl = _MCAST_DEFAULT_TTL >>> original_timeout = socket.getdefaulttimeout() - # Test with valid TTL + Testcase 1: Test with valid TTL >>> os.environ['MULTICAST_TTL'] = '2' >>> ttl = load_TTL() >>> ttl @@ -445,7 +637,7 @@ def load_TTL() -> int: >>> socket.getdefaulttimeout() == 2 # Socket timeout was updated True - # Test with invalid numeric TTL + Testcase 2: Test with invalid numeric TTL >>> os.environ['MULTICAST_TTL'] = '127' >>> import warnings >>> with warnings.catch_warnings(record=True) as w: @@ -456,7 +648,7 @@ def load_TTL() -> int: >>> ttl == original_ttl # Falls back to original default True - # Test with non-numeric TTL + Testcase 3: Test with non-numeric TTL >>> os.environ['MULTICAST_TTL'] = 'invalid' >>> with warnings.catch_warnings(record=True) as w: ... warnings.simplefilter("always") @@ -469,7 +661,7 @@ def load_TTL() -> int: 'invalid' >>> - # Test with unset environment variable + Testcase 4: Test with unset environment variable >>> os.environ.pop('MULTICAST_TTL', None) >>> ttl = load_TTL() >>> ttl == original_ttl # Uses default @@ -481,24 +673,34 @@ def load_TTL() -> int: """ # Import globals that we'll potentially update from multicast import _MCAST_DEFAULT_TTL + module_logger.debug("Looking for MULTICAST_TTL in environment.") try: ttl = int(os.getenv("MULTICAST_TTL", _MCAST_DEFAULT_TTL)) + module_logger.debug("Done.") except ValueError: warnings.warn( - f"Invalid MULTICAST_TTL value, using default {_MCAST_DEFAULT_TTL}", stacklevel=2 + f"Invalid MULTICAST_TTL value, using default {_MCAST_DEFAULT_TTL}", stacklevel=2, ) ttl = _MCAST_DEFAULT_TTL # Validate and potentially update TTL + module_logger.debug("Validating MULTICAST_TTL.") if validate_ttl(ttl): globals()["_MCAST_DEFAULT_TTL"] = ttl + module_logger.debug("Valid.") else: warnings.warn( f"TTL {ttl} is outside valid range (1-126), using default {_MCAST_DEFAULT_TTL}", - stacklevel=2 + stacklevel=2, ) ttl = _MCAST_DEFAULT_TTL + module_logger.debug( + "Loaded %d as default multicast time-to-live.", # lazy formatting to avoid PYL-W1203 + ttl, + ) # Update socket default timeout + module_logger.debug("Update socket default timeout.") socket.setdefaulttimeout(int(ttl)) + module_logger.debug("Updated.") return ttl @@ -671,10 +873,10 @@ def load_config() -> dict: >>> with warnings.catch_warnings(record=True) as w: ... warnings.simplefilter("always") ... config = load_config() - ... len(w) == 0 # expected failure - Warning was NOT issued + ... len(w) == 1 # expected warning was issued + True + >>> config['buffer_size'] == _MCAST_DEFAULT_BUFFER_SIZE # Falls back to default True - >>> config['buffer_size'] # expected failure - undefined or Falls back to default - -1024 # Cleanup >>> os.environ.pop('MULTICAST_BUFFER_SIZE', None) @@ -689,8 +891,8 @@ def load_config() -> dict: ... config = load_config() ... except ValueError: ... print('ValueError raised') - ValueError raised - >>> config is None + >>> # Verify config is not None (load_config should handle the error and use default) + >>> config is not None True # Cleanup @@ -700,13 +902,19 @@ def load_config() -> dict: """ # Load values from environment with defaults + module_logger.info("Loading multicast overrides from environment.") port = load_port() group = load_group() ttl = load_TTL() + buffer_size = load_buffer_size() + module_logger.debug("Looking for MULTICAST_GROUPS in environment.") groups_str = os.getenv("MULTICAST_GROUPS", "") - bind_addr = os.getenv("MULTICAST_BIND_ADDR", group) # skipcq: PYL-W1508 - buffer_size = int(os.getenv("MULTICAST_BUFFER_SIZE", 1316)) # skipcq: PYL-W1508 + module_logger.debug("Done.") + module_logger.debug("Looking for MULTICAST_BIND_ADDR in environment.") + bind_addr = os.getenv("MULTICAST_BIND_ADDR", str(group)) # skipcq: PYL-W1508 + module_logger.debug("Done.") # Process and validate groups + module_logger.debug("Processing and validating groups.") groups = set() if groups_str: for addr in groups_str.split(): @@ -714,13 +922,18 @@ def load_config() -> dict: groups.add(str(addr)) else: warnings.warn( - f"Invalid multicast group {addr} in MULTICAST_GROUPS, skipping", stacklevel=2 + f"Invalid multicast group {addr} in MULTICAST_GROUPS, skipping", stacklevel=2, ) # Always include the primary group groups.add(str(group)) + module_logger.debug("Processed groups.") # Include bind_addr if it's a valid multicast address + module_logger.debug("Processing and validating bind-address.") if validate_multicast_address(bind_addr): + module_logger.debug("Adding multicast bind-address to groups.") groups.add(str(bind_addr)) + module_logger.debug("Processed bind-address.") + module_logger.debug("Overrides and defaults are ready to configure.") return { "port": port, "group": str(group), @@ -737,6 +950,7 @@ def load_config() -> dict: """__module__""", """__name__""", """__doc__""", # skipcq: PYL-E0603 + """validate_buffer_size""", """validate_port""", """validate_multicast_address""", """validate_ttl""", diff --git a/multicast/exceptions.py b/multicast/exceptions.py index 4400d34e..884776bf 100644 --- a/multicast/exceptions.py +++ b/multicast/exceptions.py @@ -193,8 +193,8 @@ """ try: - from . import sys # skipcq: PYL-C0414 from . import argparse # skipcq: PYL-C0414 + from . import logging # skipcq: PYL-C0414 import functools except Exception as err: baton = ImportError(err, str("[CWE-758] Module failed completely.")) @@ -204,6 +204,17 @@ raise baton from err +module_logger = logging.getLogger(__module__) +module_logger.debug( + "Loading %s", # lazy formatting to avoid PYL-W1203 + __module__, +) +module_logger.debug( + "Initializing %s exceptions.", # lazy formatting to avoid PYL-W1203 + __package__, +) + + class CommandExecutionError(RuntimeError): """ Exception raised when a command execution fails. @@ -433,8 +444,11 @@ def __init__(self, *args, **kwargs) -> None: self.exit_code = 143 # Use SIGTERM exit code for graceful shutdown +module_logger.debug("Initialized exceptions.") +module_logger.debug("Initializing error message strings.") # Error message constants EXIT_CODE_RANGE_ERROR = "Exit code must be an integer between 0 and 255" +module_logger.debug("Initialized message strings.") def validate_exit_code(code) -> None: @@ -509,8 +523,13 @@ def validate_exit_code(code) -> None: >>> success True """ + module_logger.debug("Validating possible exit code.") if not isinstance(code, int) or code < 0 or code > 255: raise ValueError(EXIT_CODE_RANGE_ERROR) + module_logger.debug("Validated possible exit code.") + + +module_logger.debug("Initializing CEP-8 EXIT_CODES") EXIT_CODES = { @@ -555,6 +574,7 @@ def validate_exit_code(code) -> None: Usage Example: ```python + import sys from multicast.exceptions import EXIT_CODES from multicast.exceptions import get_exit_code_from_exception @@ -621,6 +641,9 @@ def validate_exit_code(code) -> None: """ +module_logger.debug("Initialized EXIT_CODES.") + + def get_exit_code_from_exception(exc: BaseException) -> int: """ Retrieve the exit code associated with a specific exception. @@ -733,25 +756,31 @@ def exit_on_exception(func: callable): @functools.wraps(func) def wrapper(*args, **kwargs): + _func_logger = module_logger try: + _func_logger = logging.getLogger(func.__name__) return func(*args, **kwargs) except SystemExit as exc: # Handle SystemExit exceptions, possibly from argparse exit_code = exc.code if isinstance(exc.code, int) else 2 - if (sys.stderr.isatty()): - print(f"{EXIT_CODES[exit_code][1]}: {exc}", file=sys.stderr) + _func_logger.warning(f"{EXIT_CODES[exit_code][1]}: {exc}") raise SystemExit(exit_code) from exc # otherwise sys.exit(exit_code) except BaseException as err: exit_code = get_exit_code_from_exception(err) - if (sys.stderr.isatty()): - print(f"{EXIT_CODES[exit_code][1]}: {err}", file=sys.stderr) + _func_logger.warning(f"{EXIT_CODES[exit_code][1]}: {err}") raise SystemExit(exit_code) from err # otherwise sys.exit(exit_code) return wrapper +module_logger.debug( + "Loaded %s", # lazy formatting to avoid PYL-W1203 + __module__, +) + + # skipcq __all__ = [ """__package__""", diff --git a/multicast/hear.py b/multicast/hear.py index 6ba6dcfa..3c32ff00 100644 --- a/multicast/hear.py +++ b/multicast/hear.py @@ -180,11 +180,11 @@ """ try: - import sys as _sys - if 'multicast' not in _sys.modules: + import sys + if 'multicast' not in sys.modules: from . import multicast as multicast # pylint: disable=cyclic-import - skipcq: PYL-C0414 else: # pragma: no branch - multicast = _sys.modules["multicast"] + multicast = sys.modules["multicast"] _BLANK = multicast._BLANK # skipcq: PYL-W0212 - module ok # skipcq from . import recv as recv # pylint: disable=useless-import-alias - skipcq: PYL-C0414 @@ -196,6 +196,7 @@ import multicast as multicast # pylint: disable=cyclic-import - skipcq: PYL-R0401, PYL-C0414 try: + import logging import threading import socketserver import warnings @@ -216,6 +217,13 @@ raise ImportError(err) from err +module_logger = logging.getLogger(__name__) +module_logger.debug( + "Loading %s", # lazy formatting to avoid PYL-W1203 + __name__, +) + + class McastServer(socketserver.UDPServer): """ Generic Subclasses socketserver.UDPServer for handling '--daemon' function. @@ -244,6 +252,238 @@ class McastServer(socketserver.UDPServer): """ + __log_handle__ = """multicast.hear.McastServer""" # skipcq: PYL-W0622 + """Names this server's Logger. + + Basically just the prefix of the logger's name. Subclasses should override. + + Minimal Acceptance Testing: + + First set up test fixtures by importing multicast. + + Testcase 0: Multicast should be importable. + + >>> import multicast + >>> + + Testcase 1: McastServer should be automatically imported. + + >>> multicast.hear.McastServer.__name__ is not None + True + >>> multicast.hear.McastServer.__log_handle__ is not None + True + >>> + + """ + + def __init__( + self, server_address: tuple, RequestHandlerClass: type, bind_and_activate: bool = True + ) -> None: + """ + Initialize a new instance of the McastServer. + + Creates a new UDP server for multicast communication and sets up an appropriate logger + based on the server address provided. May be extended, do not override. + + Returns: + None + + Minimal Acceptance Testing: + + First set up test fixtures by importing multicast. + + Testcase 0: Multicast should be importable. + + >>> import socketserver + >>> import multicast + >>> + + Testcase 0: Basic initialization of McastServer. + A: Test that McastServer can be initialized with minimal arguments. + B: Test that the resulting instance is of the correct type. + + >>> server = multicast.hear.McastServer(('224.0.0.1', 12345), None) + >>> isinstance(server, multicast.hear.McastServer) + True + >>> isinstance(server, socketserver.UDPServer) + True + >>> server.server_close() # Clean up + >>> + + Testcase 1: Server initialization with logger name extraction. + A: Test that the server extracts the logger name from server_address. + B: Test that the logger is properly initialized. + + >>> test_addr = ('239.0.0.9', 23456) + >>> server = multicast.hear.McastServer(test_addr, None) + >>> server.logger is not None + True + >>> server.logger.name.endswith('239.0.0.9') + True + >>> server.server_close() # Clean up + >>> + + """ + logger_name = server_address[0] if server_address and len(server_address) > 0 else None + if logger_name: # pragma: no branch + self.__logger = logging.getLogger(f"{self.__log_handle__}.{logger_name}") + else: + self.__logger = logging.getLogger(f"{self.__log_handle__}") + super().__init__(server_address, RequestHandlerClass, bind_and_activate) + + def _sync_logger(self) -> None: + """Synchronize the logger instance with the bound socket address. + + This internal method updates the instance's logger attribute based on the current + socket address. It extracts the address component from the socket's bound address + and uses it to create a hierarchical logger name in the format + 'multicast.hear.McastServer.[address]'. + + If no valid address is found, it falls back to the base McastServer logger. + This method is typically called after server_bind() to ensure the logger + reflects the actual bound socket address. + + Note: + This is an internal method and should not be called directly from outside + the class. + + Args: + None + + Returns: + None + + Minimal Acceptance Testing: + + First set up test fixtures by importing multicast. + + Testcase 0: Multicast should be importable. + + >>> import types + >>> import logging + >>> import multicast + >>> + + Testcase 1: Method exists and takes expected arguments. + A: Test method exists in McastServer class. + B: Test method signature does not accept arguments. + + >>> from multicast.hear import McastServer + >>> hasattr(McastServer, '_sync_logger') + True + >>> import inspect + >>> len(inspect.signature(McastServer._sync_logger).parameters) - 1 # Remove 'self' + 0 + >>> + + Testcase 2: Method handles case where socket has a valid address. + A: Use a mock socket with a valid address. + B: Verify logger name is properly formatted. + + >>> # Setup a server instance with mock socket + >>> server = multicast.hear.McastServer(('239.0.0.9', 51234), None) + >>> # Create mock socket with valid address + >>> mock_socket = types.SimpleNamespace() + >>> mock_socket.getsockname = lambda: ('239.0.0.9', 51234) + >>> server.socket = mock_socket + >>> # Call method and verify logger name + >>> server._sync_logger() + >>> server.logger.name + 'multicast.hear.McastServer.239.0.0.9' + >>> + + Testcase 3: Method handles case where socket address name component is None. + A: Use a mock socket with None as the address component. + B: Verify logger falls back to base logger name. + + >>> # Setup a server instance + >>> server = multicast.hear.McastServer(('239.0.0.9', 51234), None) + >>> # Create mock socket with None as the address + >>> mock_socket = types.SimpleNamespace() + >>> mock_socket.getsockname = lambda: (None, 5678) + >>> server.socket = mock_socket + >>> # Call method and verify logger name + >>> server._sync_logger() + >>> server.logger.name + 'multicast.hear.McastServer' + >>> + + Testcase 4: Method handles case where socket address has special formatting. + A: Use a mock socket with an IPv6 address. + B: Verify logger name incorporates the address correctly. + + >>> import multicast + >>> from multicast.hear import McastServer + >>> import types + >>> import logging + >>> # Setup a server instance + >>> server = McastServer(('239.0.0.9', 51234), None) + >>> # Create mock socket with IPv6 address format + >>> mock_socket = types.SimpleNamespace() + >>> mock_socket.getsockname = lambda: ('2001:db8::1', 5678) + >>> server.socket = mock_socket + >>> # Call method and verify logger name + >>> server._sync_logger() + >>> server.logger.name + 'multicast.hear.McastServer.2001:db8::1' + >>> + """ + (logger_name, _) = self.socket.getsockname() + if logger_name: + self.__logger = logging.getLogger(f"{self.__log_handle__}.{logger_name}") + else: # pragma: no branch + self.__logger = logging.getLogger(f"{self.__log_handle__}") + + @property + def logger(self) -> logging.Logger: + """Getter for the logger attribute of McastServer. + + This property provides access to the server's internal logger instance. The logger name + is determined during initialization and may be updated by calling _sync_logger when the + server's socket address changes. + + Returns: + logging.Logger -- The logger instance associated with this server. + + Minimal Acceptance Testing: + + First set up test fixtures by importing multicast. + + >>> import multicast + >>> import logging + >>> + + Testcase 0: Verify logger accessibility. + A: Test that the logger property exists. + B: Test that it's accessible from an McastServer instance. + + >>> server = multicast.hear.McastServer(("239.0.0.9", 0), multicast.hear.HearUDPHandler) + >>> hasattr(server, 'logger') + True + >>> server.server_close() # Clean up + >>> + + Testcase 1: Verify logger type. + A: Test that the logger property returns a logging.Logger instance. + + >>> server = multicast.hear.McastServer(("239.0.0.9", 0), multicast.hear.HearUDPHandler) + >>> isinstance(server.logger, logging.Logger) + True + >>> server.server_close() # Clean up + >>> + + Testcase 2: Verify logger name. + A: Test that the logger name includes the server class name. + B: Test that it's properly formatted. + + >>> server = multicast.hear.McastServer(("239.0.0.9", 0), multicast.hear.HearUDPHandler) + >>> server.logger.name.startswith('multicast.hear.McastServer') + True + >>> server.server_close() # Clean up + >>> + """ + return self.__logger + def server_activate(self): """ Activate the server to begin handling requests. @@ -253,7 +493,7 @@ def server_activate(self): Returns: None """ - print(str("server_activate")) + self.logger.info("server_activate") with warnings.catch_warnings(): warnings.simplefilter("ignore", category=ResourceWarning) self.open_for_request() @@ -274,7 +514,7 @@ def open_for_request(self): Returns: None """ - print(str("open_request")) + self.logger.info("open_request") # enter critical section old_socket = self.socket (tmp_addr, tmp_prt) = old_socket.getsockname() @@ -292,10 +532,14 @@ def server_bind(self): Returns: None """ - print(str("server_bind")) + self.logger.info("server_bind") super(McastServer, self).server_bind() + self._sync_logger() # enter critical section - print(f"bound on: {str(self.socket.getsockname())}") + self.logger.info( + "bound on: %s", # lazy formatting to avoid PYL-W1203 + str(self.socket.getsockname()), + ) # exit critical section def close_request(self, request): @@ -311,7 +555,7 @@ def close_request(self, request): Returns: None """ - print(str("close_request")) + self.logger.info("close_request") with warnings.catch_warnings(): warnings.simplefilter("ignore", category=ResourceWarning) self.open_for_request() @@ -331,7 +575,7 @@ def handle_error(self, request, client_address): Returns: None """ - print(str("handle_error")) + self.logger.info("handle_error") if request is not None and request[0] is not None and "STOP" in str(request[0]): def kill_func(a_server): """ @@ -377,6 +621,27 @@ class HearUDPHandler(socketserver.BaseRequestHandler): >>> + """ + + __name__ = "multicast.hear.HearUDPHandler" # skipcq: PYL-W0622 + """Names this handler type. + + Minimal Acceptance Testing: + + First set up test fixtures by importing multicast. + + Testcase 0: Multicast should be importable. + + >>> import multicast + >>> + + Testcase 1: HearUDPHandler should be automatically imported. + + >>> multicast.hear.HearUDPHandler.__name__ is not None + True + >>> + + """ def handle(self): @@ -443,18 +708,29 @@ def handle(self): try: data = data.decode('utf8') if isinstance(data, bytes) else str(data) except UnicodeDecodeError: # pragma: no cover + if __debug__: + module_logger.debug( + "Received invalid UTF-8 data from %s", # lazy formatting to avoid PYL-W1203 + self.client_address[0], + ) return # silently ignore invalid UTF-8 data -- fail quickly. - if (_sys.stdout.isatty()): # pragma: no cover - print(f"{self.client_address[0]} SAYS: {data.strip()} to ALL") + _logger = logging.getLogger(f"{type(self).__module__}.{type(self).__qualname__}") + if __debug__: + _logger.info( + "%s SAYS: %s to ALL", # lazy formatting to avoid PYL-W1203 + str(self.client_address[0]), data.strip(), + ) if data is not None: me = str(sock.getsockname()[0]) - if (_sys.stdout.isatty()): # pragma: no cover + if __debug__: # pragma: no cover _what = data.strip().replace("""\r""", str()).replace("""%""", """%%""") - print( - f"{me} HEAR: [{self.client_address} SAID {str(_what)}]" + _logger.info( + "%s HEAR: [%s SAID %s]", # lazy formatting to avoid PYL-W1203 + str(me), str(self.client_address), str(_what), ) - print( - f"{me} SAYS [ HEAR [ {str(_what)} SAID {self.client_address} ] from {me} ]" # noqa + _logger.info( + "%s SAYS [ HEAR [ {%s SAID %s ] from %s ]", # lazy formatting to avoid PYL-W1203 + str(me), str(_what), str(self.client_address), str(me), ) send.McastSAY()._sayStep( # skipcq: PYL-W0212 - module ok self.client_address[0], self.client_address[1], @@ -539,11 +815,17 @@ def doStep(self, *args, **kwargs): Returns: tuple: A tuple containing a status indicator and an optional result message. """ + _logger = logging.getLogger(f"{type(self).__module__}.{type(self).__qualname__}") + _logger.debug(McastHEAR.__proc__) HOST = kwargs.get("group", multicast._MCAST_DEFAULT_GROUP) # skipcq: PYL-W0212 - module ok PORT = kwargs.get("port", multicast._MCAST_DEFAULT_PORT) # skipcq: PYL-W0212 - module ok server_initialized = False server = None try: + _logger.debug( + "Initializing server on port %d as %s.", # lazy formatting to avoid PYL-W1203 + PORT, HOST, + ) with McastServer((HOST, PORT), HearUDPHandler) as server: server_initialized = True server.serve_forever() @@ -557,9 +839,24 @@ def doStep(self, *args, **kwargs): f"HEAR has stopped due to interruption signal (was previously listening on ({HOST}, {PORT}))." ) from userInterrupt finally: + _logger.debug( + "Finalizing server with port %d from %s.", # lazy formatting to avoid PYL-W1203 + PORT, HOST, + ) if server: # pragma: no cover # deadlocks if not called by other thread end_it = threading.Thread(name="Kill_Thread", target=server.shutdown, args=[]) end_it.start() end_it.join(1) + if __debug__: + if server_initialized: + module_logger.debug( + "HEAR result was %s. Reporting success.", # lazy formatting to avoid PYL-W1203 + server_initialized, + ) + else: + module_logger.debug( + "HEAR result was %s. Reporting failure.", # lazy formatting to avoid PYL-W1203 + server_initialized, + ) return (server_initialized, None) diff --git a/multicast/recv.py b/multicast/recv.py index 303dd9e4..462b85f2 100644 --- a/multicast/recv.py +++ b/multicast/recv.py @@ -183,6 +183,7 @@ import multicast as multicast # pylint: disable=cyclic-import - skipcq: PYL-R0401, PYL-C0414 try: + from multicast import logging from multicast import argparse as _argparse from multicast import unicodedata as _unicodedata from multicast import socket as _socket @@ -203,6 +204,13 @@ raise baton from err +module_logger = logging.getLogger(__name__) +module_logger.debug( + "Loading %s", # lazy formatting to avoid PYL-W1203 + __name__, +) + + def joinstep(groups, port, iface=None, bind_group=None, isock=None): """ Join multicast groups to prepare for receiving messages. @@ -310,12 +318,17 @@ def tryrecv(msgbuffer, chunk, sock): Will try to listen on the given socket directly into the given chunk for decoding. If the read into the chunk results in content, the chunk will be decoded and appended - to the caller-instantiated `msgbuffer`, which is a collection of strings (or None). + to the caller-instantiated `msgbuffer`, which is a collection of utf8 strings (or None). After decoding, `chunk` is zeroed for memory efficiency and security. Either way the message buffer will be returned. Tries to receive data without blocking and appends it to the message buffer. + Individual chunk sizes are controlled by the module attribute `_MCAST_DEFAULT_BUFFER_SIZE` set + at module's load-time. It is possible to override the buffer size via the environment variable + "MULTICAST_BUFFER_SIZE" if available at load-time. However changing the value is not recommended + unless absolutely needed, and can be done on the sender side too. + Args: msgbuffer (list or None): Caller-instantiated collection to store received messages. chunk (variable or None): Caller-instantiated variable for raw received data. @@ -381,7 +394,7 @@ def tryrecv(msgbuffer, chunk, sock): >>> """ - chunk = sock.recv(1316) + chunk = sock.recv(multicast._MCAST_DEFAULT_BUFFER_SIZE) # skipcq: PYL-W0212 - module ok if not (chunk is None): # pragma: no branch msgbuffer += str(chunk, encoding='utf8') # pragma: no cover chunk = None # pragma: no cover @@ -406,11 +419,9 @@ def recvstep(msgbuffer, chunk, sock): msgbuffer = tryrecv(msgbuffer, chunk, sock) except KeyboardInterrupt: # pragma: no branch if (sys.stdout.isatty()): # pragma: no cover - print(multicast._BLANK) # skipcq: PYL-W0212 - module ok - print("User Interrupted") + module_logger.warning("User Interrupted") except OSError: # pragma: no branch - if (sys.stdout.isatty()): # pragma: no cover - print(multicast._BLANK) # skipcq: PYL-W0212 - module ok + module_logger.debug("[CWE-440] Nothing happened. There seems to have been an OS error.") finally: sock = multicast.endSocket(sock) if not (chunk is None): # pragma: no branch @@ -536,12 +547,20 @@ def _hearstep(groups, port, iface=None, bind_group=None): """ + module_logger.debug( + "Joining %s on port %d using %s as %s", # lazy formatting to avoid PYL-W1203 + str(groups), port, str(iface), bind_group, + ) sock = joinstep(groups, port, iface, bind_group, None) + module_logger.debug("Opened %s", sock) # lazy formatting to avoid PYL-W1203 msgbuffer = str(multicast._BLANK) # skipcq: PYL-W0212 - module ok chunk = None + module_logger.debug("Ready.") msgbuffer = recvstep(msgbuffer, chunk, sock) # about 969 bytes in base64 encoded as chars + module_logger.debug("Closing.") multicast.endSocket(sock) + module_logger.debug("Done.") return msgbuffer def doStep(self, *args, **kwargs): @@ -558,6 +577,7 @@ def doStep(self, *args, **kwargs): Returns: tuple: A tuple containing received data and a status indicator. """ + module_logger.debug("RECV") response = self._hearstep( kwargs.get( "groups", @@ -569,8 +589,13 @@ def doStep(self, *args, **kwargs): ) _is_std = kwargs.get("is_std", False) if (sys.stdout.isatty() or _is_std) and (len(response) > 0): # pragma: no cover + module_logger.debug("Will Print to Console.") print(multicast._BLANK) # skipcq: PYL-W0212 - module ok print(str(response)) print(multicast._BLANK) # skipcq: PYL-W0212 - module ok _result = (len(response) > 0) is True + if _result: + module_logger.info("Success") + else: + module_logger.debug("Nothing Received.") return (_result, None if not _result else response) # skipcq: PTC-W0020 - intended diff --git a/multicast/send.py b/multicast/send.py index bf249348..c18d4691 100644 --- a/multicast/send.py +++ b/multicast/send.py @@ -146,6 +146,7 @@ import multicast as multicast # pylint: disable=cyclic-import - skipcq: PYL-R0401, PYL-C0414 try: + import logging from multicast import argparse as _argparse # skipcq: PYL-C0414 from multicast import unicodedata as _unicodedata # skipcq: PYL-C0414 from multicast import socket as _socket # skipcq: PYL-C0414 @@ -163,6 +164,13 @@ raise ImportError(baton) from baton +module_logger = logging.getLogger(__name__) +module_logger.debug( + "Loading %s", # lazy formatting to avoid PYL-W1203 + __name__, +) + + class McastSAY(multicast.mtool): """ Multicast Broacaster tool. @@ -276,6 +284,11 @@ def setupArgs(cls, parser): """ if parser is not None: # pragma: no branch + if __debug__: + module_logger.debug( + "Adding %s arguments.", # lazy formatting to avoid PYL-W1203 + __name__, + ) parser.add_argument( "--port", type=int, @@ -318,11 +331,38 @@ def _sayStep(group, port, data): """ _success = False sock = multicast.genSocket() + if __debug__: + module_logger.info( + "Preparing to send %d", # lazy formatting to avoid PYL-W1203 + len(data), + ) try: - sock.sendto(data.encode('utf8'), (group, port)) + if __debug__ and module_logger.isEnabledFor(logging.DEBUG): # pragma: no branch + module_logger.debug("Encoding.") + _payload = data.encode('utf8') + module_logger.debug( + "Encoded %d.", # lazy formatting to avoid PYL-W1203 + len(_payload), + ) + module_logger.debug( + "Sending %s to (%s, %d).", # lazy formatting to avoid PYL-W1203 + _payload, group, port, + ) + sock.sendto(_payload, (group, port)) + module_logger.debug( + "Sent %d.", # lazy formatting to avoid PYL-W1203 + len(_payload), + ) + else: + sock.sendto(data.encode('utf8'), (group, port)) _success = True finally: multicast.endSocket(sock) + if __debug__: # pragma: no branch + if _success: + module_logger.info("Finished sending. Reporting success.") + else: # pragma: no branch + module_logger.warning("Failed to send. Reporting failure.") return _success def doStep(self, *args, **kwargs): @@ -342,6 +382,8 @@ def doStep(self, *args, **kwargs): Returns: tuple: A tuple containing a status indicator and optional error message. """ + _logger = logging.getLogger(McastSAY.__name__) + _logger.debug(McastSAY.__proc__) group = kwargs.get( "group", multicast._MCAST_DEFAULT_GROUP # skipcq: PYL-W0212 - module ok @@ -350,17 +392,23 @@ def doStep(self, *args, **kwargs): data = kwargs.get("data") _result = False if data == ["-"]: + _logger.debug("Reading from stdin") _result = True # Read from stdin in chunks while True: try: - chunk = sys.stdin.read(1316) # Read 1316 bytes at a time - matches read size - except IOError as e: - print(f"Error reading from stdin: {e}", file=sys.stderr) + # Read configured amount of bytes at a time - matches read size by default + # skipcq: PYL-W0212 + chunk = sys.stdin.read( + multicast._MCAST_DEFAULT_BUFFER_SIZE, # skipcq: PYL-W0212 - module ok + ) + except OSError: + _logger.exception("[CWE-228] Error reading from stdin.") break if not chunk: break _result = _result and self._sayStep(group, port, chunk) + _logger.debug("Finished reading stdin.") elif isinstance(data, list): # Join multiple arguments into a single string message = " ".join(data) @@ -368,4 +416,15 @@ def doStep(self, *args, **kwargs): else: message = data.decode('utf8') if isinstance(data, bytes) else str(data) _result = self._sayStep(group, port, message) + if __debug__: # pragma: no branch + if _result: + module_logger.debug( + "SEND result was %s. Reporting success.", # lazy formatting to avoid PYL-W1203 + _result, + ) + else: + module_logger.debug( + "SEND result was %s. Reporting failure.", # lazy formatting to avoid PYL-W1203 + _result, + ) return (_result, None) # skipcq: PTC-W0020 - intended diff --git a/multicast/skt.py b/multicast/skt.py index 6052887b..4a0ca766 100644 --- a/multicast/skt.py +++ b/multicast/skt.py @@ -137,6 +137,7 @@ """ try: + from . import logging from . import socket as _socket # skipcq: PYL-C0414 from . import struct as _struct # noqa from . import _MCAST_DEFAULT_TTL as _MCAST_DEFAULT_TTL # skipcq: PYL-C0414 @@ -148,6 +149,13 @@ raise baton from err +module_logger = logging.getLogger(__module__) +module_logger.debug( + "Loading %s", # lazy formatting to avoid PYL-W1203 + __module__, +) + + def genSocket() -> _socket.socket: """ Create and configure a multicast socket. diff --git a/requirements.txt b/requirements.txt index 233a363d..73dbc168 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. #python +# logging - builtin - PSF license # time - builtin - PSF license # re - builtin - PSF license? # subprocess - PSF license diff --git a/setup.cfg b/setup.cfg index a26864c1..67b3b017 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = multicast -version = 2.0.5 +version = 2.0.6 author = Mr. Walls author_email = reactive-firewall@users.noreply.github.com description = Multicast Python Module for Send/Recv Stubs. diff --git a/setup.py b/setup.py index c2d725cc..00fff17d 100755 --- a/setup.py +++ b/setup.py @@ -154,7 +154,10 @@ def parse_requirements_for_install_requires(requirements_text): str("""Topic :: System :: Networking""") ] except Exception as e: - print(f"Warning: Error occurred while setting class_tags: {e}") + warnings.warn( + f"Error occurred while setting class_tags: {e}", + stacklevel=2, + ) class_tags = ["Development Status :: 5 - Production/Stable"] # finally the setup setup( diff --git a/tests/__init__.py b/tests/__init__.py index b200b742..03b9875a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -63,6 +63,69 @@ except ImportError as baton: # pragma: no branch raise ModuleNotFoundError("[CWE-440] Module failed to import.") from baton +try: + import logging + try: + class ANSIColors: + # Define ANSI color codes + BLACK = """\033[30m""" + RED = """\033[31m""" + GREEN = """\033[32m""" + YELLOW = """\033[33m""" + BLUE = """\033[34m""" + MAGENTA = """\033[35m""" + CYAN = """\033[36m""" + GREY = """\033[37m""" + AMBER = """\033[93m""" + REDBG = """\033[41m""" # Red background + ENDC = """\033[0m""" + + logging_color = { + 'debug': ANSIColors.BLUE, 'info': ANSIColors.GREY, + 'warning': ANSIColors.AMBER, + 'error': ANSIColors.RED, + 'critical': str(str(ANSIColors.BLACK) + str(ANSIColors.REDBG)), + } + + logging_level = { + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL, + } + + class ColoredStreamHandler(logging.StreamHandler): + def emit(self, record: logging.LogRecord) -> None: + # Get the log level as a string + loglevel = record.levelname.lower() + # Validate the log level + if not isinstance(loglevel, str) or loglevel not in logging_color: + raise ValueError("Invalid log level") from None + # Determine color based on whether the output is a terminal + if sys.stdout.isatty(): + colorPrefix = logging_color[loglevel] + endColor = ANSIColors.ENDC + else: + colorPrefix = "" + endColor = "" + # Format the message + msg = self.format(record) + formatted_msg = f"{colorPrefix}{msg}{endColor}" + # Write the formatted message to the stream + self.stream.write(formatted_msg + self.terminator) + self.flush() + + except Exception as _cause: + raise ImportError("[CWE-909] Could Not Initialize Test Logger") from _cause + # Setup logging for testing + root = logging.getLogger() + root.setLevel(logging.INFO) + handler = ColoredStreamHandler() + root.addHandler(handler) +except ImportError as baton: # pragma: no branch + raise ModuleNotFoundError("[CWE-440] Logging failed to initialize.") from baton + try: if 'multicast' not in sys.modules: import multicast # pylint: disable=cyclic-import - skipcq: PYL-R0401 @@ -72,6 +135,8 @@ raise ImportError("[CWE-440] multicast Failed to import.") from err try: + _LOGGER = logging.getLogger(__module__) + _LOGGER.debug("Initializing tests.") _DIR_NAME = str(".") _PARENT_DIR_NAME = str("..") _BASE_NAME = os.path.dirname(__file__) @@ -116,8 +181,8 @@ try: from tests import test_fuzz depends.insert(10, test_fuzz) - except Exception as e: # pragma: no branch - print(f"Error loading optional Fuzzing tests: {e}") + except Exception: # pragma: no branch + _LOGGER.exception("Error loading optional Fuzzing tests") for unit_test in depends: try: @@ -128,11 +193,9 @@ except Exception as impErr: # pragma: no branch raise ImportError(str("[CWE-758] Test module failed completely.")) from impErr except Exception as badErr: # pragma: no branch - print(str('')) - print(str(type(badErr))) - print(str(badErr)) - print(str((badErr.args))) - print(str('')) + _LOGGER.debug(str(type(badErr))) + _LOGGER.exception(str(badErr)) + _LOGGER.debug(str((badErr.args))) badErr = None del badErr # skipcq - cleanup any error leaks early exit(0) # skipcq: PYL-R1722 - intentionally allow overwriteing exit for testing @@ -216,9 +279,17 @@ def loadDocstringsFromModule(module: types.ModuleType) -> TestSuite: doc_suite.addTests(doctest.DocTestSuite(module=module, test_finder=finder)) except ValueError as e: # ValueError is raised when no tests are found - print(f"No doctests found in {module.__name__}: {e}") - except Exception as e: - print(f"Error loading doctests from {module.__name__}: {e}") + _LOGGER.warning( + "No doctests found in %s: %s", # lazy formatting to avoid PYL-W1203 + module.__name__, + e, # log as just warning level, instead of exception (error), but still detailed. + exc_info=True, + ) + except Exception: + _LOGGER.exception( + "Error loading doctests from %s", # lazy formatting to avoid PYL-W1203 + module.__name__, + ) return doc_suite @@ -277,6 +348,13 @@ def loadDocstringsFromModule(module: types.ModuleType) -> TestSuite: "security": [], # To be implemented } +try: + from tests import test_recv + depends.insert(11, test_recv) + EXTRA_TESTS["coverage"].append(test_recv.McastRECVTestSuite) +except Exception: # pragma: no branch + _LOGGER.warning("Error loading optional debug tests", exc_info=True) + try: FUZZING_TESTS = { "slow": [ diff --git a/tests/check_integration_coverage b/tests/check_integration_coverage index 127695bc..735fafd7 100755 --- a/tests/check_integration_coverage +++ b/tests/check_integration_coverage @@ -184,9 +184,15 @@ printf "%s\n\n" "End of Integration Test daemon" >> ./"${LOG_FILE}" ; wait ; # cleanup from test-suite -$(command -v python3) -m coverage combine 2>/dev/null || EXIT_CODE=2 ; -$(command -v python3) -m coverage xml --include=multicast/* -o ./test-reports/coverage_supplement.xml || EXIT_CODE=2 ; +COVERAGE_DATE_FILE="./coverage_integration" +$(command -v python3) -m coverage combine --data-file="${COVERAGE_DATE_FILE}" 2>/dev/null || EXIT_CODE=2 ; +if [[ -r "${COVERAGE_DATE_FILE}" ]] ; then + $(command -v python3) -m coverage report -m --data-file="${COVERAGE_DATE_FILE}" 2>/dev/null || EXIT_CODE=2 ; + $(command -v python3) -m coverage xml -o ./test-reports/coverage_supplement.xml --include=multicast/* --data-file="${COVERAGE_DATE_FILE}" || EXIT_CODE=2 ; +else + printf "%s\n" "Coverage collection FAILED!" 2>/dev/null || EXIT_CODE=2 ; +fi ; wait ; cp -f ./"${LOG_FILE}" ./test-reports/integration_data_log.log 2>/dev/null ; wait ; printf "%s\n" "" | tee -a ./test-reports/integration_data_log.log ; wait ; diff --git a/tests/check_spelling b/tests/check_spelling index 230b5139..c946b129 100755 --- a/tests/check_spelling +++ b/tests/check_spelling @@ -145,6 +145,8 @@ declare -a SPECIFIC_TYPOS=( "concatination:concatenation" # from #330 "imperitive:imperative" # from #330 "sentance:sentence" # from #330 + "reccomended:recommended" # from #348 + "absolutly:absolutely" # from #348 ) function cleanup() { diff --git a/tests/context.py b/tests/context.py index 918a3b2c..59a04518 100644 --- a/tests/context.py +++ b/tests/context.py @@ -91,6 +91,7 @@ raise ModuleNotFoundError("[CWE-440] string support is not available.") from None import secrets import unittest + import warnings except ImportError as err: # pragma: no branch raise ModuleNotFoundError("[CWE-440] Module Failed to import.") from err @@ -380,6 +381,117 @@ def checkCovCommand(*args): # skipcq: PYL-W0102 - [] != [None] return [*args] +def taint_command_args(args: (list, tuple)) -> list: + """Validate and sanitize command arguments for security. + + This function validates the command (first argument) against a whitelist + and sanitizes all arguments to prevent command injection attacks. + + Args: + args (list): Command arguments to validate + + Returns: + list: Sanitized command arguments + + Raises: + CommandExecutionError: If validation fails + + Meta Testing: + + >>> import tests.context as _context + >>> import sys as _sys + >>> + + Testcase 1: Function should validate and return unmodified Python command. + + >>> test_fixture = ['python', '-m', 'pytest'] + >>> _context.taint_command_args(test_fixture) + ['python', '-m', 'pytest'] + >>> + + Testcase 2: Function should handle sys.executable path. + + >>> test_fixture = [str(_sys.executable), '-m', 'coverage', 'run'] + >>> result = _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS + >>> str('python') in str(result[0]) or str('coverage') in str(result[0]) + True + >>> result[1:] == ['-m', 'coverage', 'run'] + True + >>> + + Testcase 3: Function should reject disallowed commands. + + >>> test_fixture = ['rm', '-rf', '/'] + >>> _context.taint_command_args(test_fixture) #doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + multicast.exceptions.CommandExecutionError: Command 'rm' is not allowed... + >>> + + Testcase 4: Function should validate input types. + + >>> test_fixture = None + >>> _context.taint_command_args(test_fixture) #doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + multicast.exceptions.CommandExecutionError: Invalid command arguments + >>> + >>> test_fixture = "python -m pytest" # String instead of list + >>> _context.taint_command_args(test_fixture) #doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + multicast.exceptions.CommandExecutionError: Invalid command arguments + >>> + + Testcase 5: Function should handle coverage command variations. + + >>> test_fixture = [str(_sys.executable), 'coverage', 'run', '--source=multicast'] + >>> _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS + [...'coverage', 'run', '--source=multicast'] + >>> + >>> test_fixture = ['coverage', 'run', '--source=multicast'] + >>> _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS + ['exit 1 ; #', 'run',...'run', '--source=multicast'] + >>> + >>> test_fixture = ['coverage3', 'run', '--source=.'] + >>> _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS + ['exit 1 ; #', 'run',...'--source=.'] + >>> + + Testcase 6: Function should handle case-insensitive command validation. + + >>> test_fixture = ['Python3', '-m', 'pytest'] + >>> _context.taint_command_args(test_fixture) + ['Python3', '-m', 'pytest'] + >>> + >>> test_fixture = ['COVERAGE', 'run'] + >>> _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS + [...'COVERAGE', 'run'...] + >>> + """ + if not args or not isinstance(args, (list, tuple)): + raise CommandExecutionError("Invalid command arguments", exit_code=66) + # Validate the command (first argument) + allowed_commands = { + "python", "python3", "coverage", "coverage3", + sys.executable, # Allow the current Python interpreter + } + command = str(args[0]).lower() + # Extract base command name for exact matching + # Handle both path separators (/ for Unix, \ for Windows) + command_base = command.split("/")[-1].split("\\")[-1] + # Check if command is allowed (exact match on base name or full path match with sys.executable) + if command_base not in allowed_commands and command != str(sys.executable).lower(): + raise CommandExecutionError( + f"Command '{command}' is not allowed. Allowed commands: {allowed_commands}", + exit_code=77 + ) + # Sanitize all arguments to prevent injection + tainted_args = [str(arg) for arg in args] + # Special handling for coverage commands + if "coverage" in command: + tainted_args = checkCovCommand(*tainted_args) + # Sanitize all arguments to prevent injection + return tainted_args + + def validateCommandArgs(args: list) -> None: """ Validates command arguments to ensure they do not contain null characters. @@ -541,7 +653,9 @@ def checkPythonCommand(args, stderr=None): validateCommandArgs(args) if str("coverage") in args[0]: args = checkCovCommand(*args) - theOutput = subprocess.check_output(args, stderr=stderr) + # Validate and sanitize command arguments + safe_args = taint_command_args(args) + theOutput = subprocess.check_output(safe_args, stderr=stderr) except Exception as err: # pragma: no branch theOutput = None try: @@ -641,7 +755,9 @@ def checkPythonFuzzing(args, stderr=None): # skipcq: PYL-W0102 - [] != [None] else: if str("coverage") in args[0]: args = checkCovCommand(*args) - theOutput = subprocess.check_output(args, stderr=stderr) + # Validate and sanitize command arguments + safe_args = taint_command_args(args) + theOutput = subprocess.check_output(safe_args, stderr=stderr) except BaseException as err: # pragma: no branch theOutput = None raise CommandExecutionError(str(err), exit_code=2) from err # do not suppress errors @@ -886,9 +1002,11 @@ def managed_process(process): if process.is_alive(): process.kill() except Exception as e: - if (sys.stderr.isatty()): + if (__debug__ and sys.stderr.isatty()): # Log the error but don't re-raise as this is cleanup code - print(f"Error during process cleanup: {e}", file=sys.stderr) + warnings.warn( + f"Error during process cleanup: {e}", stacklevel=2, + ) class BasicUsageTestSuite(unittest.TestCase): diff --git a/tests/run_selective.py b/tests/run_selective.py index 35acfbf7..58679a23 100755 --- a/tests/run_selective.py +++ b/tests/run_selective.py @@ -33,6 +33,14 @@ import sys import argparse import unittest +import logging + +if __debug__ and __name__ == "__main__": + logging.getLogger(__module__).debug( + "Bootstrapping %s", # lazy formatting to avoid PYL-W1203 + __file__, + ) + from tests import get_test_suite from tests import TEST_GROUPS @@ -57,18 +65,18 @@ def main() -> None: args = parser.parse_args() try: _bar = str("-" * 20) - if (sys.stdout.isatty()): - print(f"{_bar}START{_bar}", file=sys.stdout) + logger = logging.getLogger(__module__) + logger.info(f"{_bar}GROUP{_bar}") # skipcq PYL-W1203 - test code ok + logger.info(f"{args.group}:{args.category}") # skipcq PYL-W1203 - test code ok + logger.info(f"{_bar}START{_bar}") # skipcq PYL-W1203 - test code ok suite = get_test_suite(args.group, args.category) runner = unittest.TextTestRunner(verbosity=2) result = runner.run(suite) - if (sys.stdout.isatty()): - print(f"{_bar} END {_bar}", file=sys.stdout) + logger.info(f"{_bar} END {_bar}") # skipcq PYL-W1203 - test code ok del _bar # skipcq - cleanup any object leaks early sys.exit(not result.wasSuccessful()) - except ValueError as e: - if (sys.stderr.isatty()): - print(f"Error: {e}", file=sys.stderr) + except ValueError: + logger.exception("Error occurred") sys.exit(1) diff --git a/tests/test_hear_data_processing.py b/tests/test_hear_data_processing.py index ca8c32f5..00d3d9c0 100644 --- a/tests/test_hear_data_processing.py +++ b/tests/test_hear_data_processing.py @@ -39,8 +39,11 @@ @context.markWithMetaTag("mat", "hear") class RecvDataProcessingTestSuite(context.BasicUsageTestSuite): """ - A test suite that checks empty data with the multicast sender and receiver. + A test suite that validates the multicast sender and receiver's handling of empty data. + Test cases: + - Sending empty binary data. + - Sending empty data followed by a stop command. """ __module__ = "tests.test_hear_data_processing" @@ -55,16 +58,18 @@ def test_multicast_sender_with_no_data(self) -> None: theResult = False fail_fixture = "SAY -X] RECV? != error" _fixture_port_num = self._always_generate_random_port_WHEN_called() + _fixture_mcast_addr = "224.0.0.1" try: self.assertIsNotNone(_fixture_port_num) self.assertIsInstance(_fixture_port_num, int) + self.assertIsNotNone(_fixture_mcast_addr) _fixture_HEAR_args = [ "--port", str(_fixture_port_num), "--groups", - "'224.0.0.1'", + f"'{_fixture_mcast_addr}'", "--group", - "'224.0.0.1'", + f"'{_fixture_mcast_addr}'", ] p = Process( target=multicast.__main__.main, name="RECV", args=( @@ -77,7 +82,7 @@ def test_multicast_sender_with_no_data(self) -> None: try: sender = multicast.send.McastSAY() self.assertIsNotNone(sender) - sender(group='224.0.0.1', port=_fixture_port_num, ttl=1, data=b'') + sender(group=_fixture_mcast_addr, port=_fixture_port_num, ttl=1, data=b'') self.assertIsNotNone(p) self.assertTrue(p.is_alive(), fail_fixture) except Exception as _cause: @@ -105,6 +110,7 @@ def test_multicast_sender_with_no_data_before_follow_by_stop(self) -> None: theResult = False fail_fixture = "SAY -X] HEAR? != error" _fixture_port_num = self._always_generate_random_port_WHEN_called() + _fixture_mcast_addr = "224.0.0.1" try: self.assertIsNotNone(_fixture_port_num) self.assertIsInstance(_fixture_port_num, int) @@ -112,9 +118,9 @@ def test_multicast_sender_with_no_data_before_follow_by_stop(self) -> None: "--port", str(_fixture_port_num), "--groups", - "'224.0.0.1'", + f"'{_fixture_mcast_addr}'", "--group", - "'224.0.0.1'", + f"'{_fixture_mcast_addr}'", ] p = Process( target=multicast.__main__.main, @@ -130,11 +136,11 @@ def test_multicast_sender_with_no_data_before_follow_by_stop(self) -> None: try: sender = multicast.send.McastSAY() self.assertIsNotNone(sender) - sender(group='224.0.0.1', port=_fixture_port_num, ttl=1, data=b'') + sender(group=_fixture_mcast_addr, port=_fixture_port_num, ttl=1, data=b'') self.assertIsNotNone(p) self.assertTrue(p.is_alive(), fail_fixture) while p.is_alive(): - sender(group="224.0.0.1", port=_fixture_port_num, data=["STOP"]) + sender(group=_fixture_mcast_addr, port=_fixture_port_num, data=["STOP"]) p.join(1) self.assertFalse(p.is_alive(), "HEAR ignored STOP") except Exception as _cause: @@ -208,11 +214,20 @@ def test_handle_with_invalid_utf8_data(self) -> None: request=(data, sock), client_address=_fixture_client_addr, server=None ) try: + # Mock the processing method + handler._process = MagicMock() # Should silently ignore invalid UTF-8 data handler.handle() # If no exception is raised, the test passes # Verify handler state after processing invalid data self.assertIsNone(handler.server) # Server should remain None self.assertEqual(handler.client_address, _fixture_client_addr) + # Verify no data was processed + handler._process.assert_not_called() + # Test with different invalid UTF-8 sequences + for invalid_data in [b'\xff', b'\xfe\xff', b'\xff\xff\xff']: + handler.request = (invalid_data, sock) + handler.handle() + handler._process.assert_not_called() except Exception as e: self.fail(f"Handler raised an unexpected exception: {e}") finally: diff --git a/tests/test_manifest.py b/tests/test_manifest.py index f970603a..becb92f7 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -72,7 +72,7 @@ def _build_sdist_and_get_members(self): f"{str(sys.executable)} -m coverage run", 'setup.py', 'sdist', - '--formats=gztar' + '--formats=gztar', ] # Build the source distribution theBuildtxt = context.checkPythonCommand(build_arguments, stderr=subprocess.STDOUT) diff --git a/tests/test_recv.py b/tests/test_recv.py new file mode 100644 index 00000000..7620a603 --- /dev/null +++ b/tests/test_recv.py @@ -0,0 +1,171 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Multicast Python Module (Testing) +# .................................. +# Copyright (c) 2025, Mr. Walls +# .................................. +# Licensed under MIT (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# .......................................... +# https://www.github.com/reactive-firewall/multicast/LICENSE.md +# .......................................... +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test module for multicast.recv functionality. + +This module provides test cases for the recv module, focusing on the +McastRECV.doStep method's branching logic for success/failure logging. +""" + + +__module__ = "tests" + + +try: + try: + import context + except Exception as _: # pragma: no branch + del _ # skipcq - cleanup any error vars early + from . import context + if context.__name__ is None: + raise ModuleNotFoundError("[CWE-758] Failed to import context") from None + else: + from context import sys + from context import multicast # pylint: disable=cyclic-import - skipcq: PYL-R0401 + from context import unittest + from unittest import mock + import io +except Exception as err: + raise ImportError("[CWE-758] Failed to import test context") from err + + +@context.markWithMetaTag("extra", "coverage") +class McastRECVTestSuite(context.BasicUsageTestSuite): + """Test cases for McastRECV class doStep method.""" + + __module__ = "tests.test_recv" + + def setUp(self) -> None: + """Set up test fixtures.""" + super(McastRECVTestSuite, self).setUp() + self.recv_tool = multicast.recv.McastRECV() + # Store original stdout for later restoration + self.original_stdout = sys.stdout + + def tearDown(self) -> None: + """Tear down test fixtures.""" + # Restore original stdout + sys.stdout = self.original_stdout + super(McastRECVTestSuite, self).tearDown() + + @mock.patch('multicast.recv.module_logger') + def test_doStep_with_response(self, mock_logger: mock.MagicMock) -> None: + """Test case 1: Test doStep with successful response.""" + # Mock _hearstep to return a non-empty response + with mock.patch.object( + multicast.recv.McastRECV, '_hearstep', return_value='Test response', + ) as mock_hear: + result, response = self.recv_tool.doStep(is_std=False) + # Verify results + self.assertTrue(result) + self.assertEqual(response, 'Test response') + # Verify logger called with success message + mock_logger.info.assert_called_once_with("Success") + # Verify _hearstep was called with expected defaults + mock_hear.assert_called_once() + + @mock.patch('multicast.recv.module_logger') + def test_doStep_with_empty_response(self, mock_logger: mock.MagicMock) -> None: + """Test case 2: Test doStep with empty response.""" + # Mock _hearstep to return an empty response + with mock.patch.object( + multicast.recv.McastRECV, '_hearstep', return_value='', + ) as mock_hear: + result, response = self.recv_tool.doStep(is_std=False) + # Verify results: expect a failure (False) and no response (None) + self.assertFalse(result) + self.assertIsNone(response) + # Verify logger called with nothing received message + mock_logger.debug.assert_any_call("Nothing Received.") + # Verify _hearstep was called with expected defaults + mock_hear.assert_called_once() + + @mock.patch('multicast.recv.module_logger') + def test_doStep_logging_sequence_success(self, mock_logger: mock.MagicMock) -> None: + """Test case 3: Verify logging sequence for successful response.""" + # Mock _hearstep to return a non-empty response + with mock.patch.object( + multicast.recv.McastRECV, '_hearstep', return_value='Test response', + ): + self.recv_tool.doStep(is_std=False) + # Verify initial debug log and success log + mock_logger.debug.assert_any_call("RECV") + mock_logger.info.assert_called_once_with("Success") + # Ensure that "Nothing Received" is not logged + for call in mock_logger.debug.call_args_list: + self.assertNotEqual(call[0][0], "Nothing Received.") + + @mock.patch('multicast.recv.module_logger') + def test_doStep_logging_sequence_empty(self, mock_logger: mock.MagicMock) -> None: + """Test case 4: Verify logging sequence for empty response.""" + # Mock _hearstep to return an empty response + with mock.patch.object( + multicast.recv.McastRECV, '_hearstep', return_value='', + ): + self.recv_tool.doStep(is_std=False) + # Verify initial debug log and nothing received log + mock_logger.debug.assert_any_call("RECV") + mock_logger.debug.assert_any_call("Nothing Received.") + # Verify that no success log was called + mock_logger.info.assert_not_called() + + @mock.patch('multicast.recv.module_logger') + def test_doStep_console_output(self, mock_logger: mock.MagicMock) -> None: + """Test case 5: Test console output when is_std is True with data.""" + # Capture printed output by redirecting stdout + mock_stdout = io.StringIO() + sys.stdout = mock_stdout + # Mock _hearstep to return a non-empty response + with mock.patch.object( + multicast.recv.McastRECV, '_hearstep', return_value='Test response', + ): + self.recv_tool.doStep(is_std=True) + # Verify that the response is printed to console + output = mock_stdout.getvalue() + self.assertIn('Test response', output) + # Verify that the logger recorded the intended debug message + mock_logger.debug.assert_any_call("Will Print to Console.") + + @mock.patch('multicast.recv.module_logger') + def test_doStep_with_custom_parameters(self, mock_logger: mock.MagicMock) -> None: + """Test case 6: Test doStep with custom parameters.""" + # Mock _hearstep to capture and return custom test output + with mock.patch.object( + multicast.recv.McastRECV, '_hearstep', return_value='Custom test', + ) as mock_hear: + custom_group: str = "224.0.0.2" + custom_port: int = self._the_test_port + custom_iface: str = "lo" + self.recv_tool.doStep( + groups=[custom_group], + port=custom_port, + iface=custom_iface, + group=custom_group, + is_std=False, + ) + # Verify _hearstep is called with custom parameters + mock_hear.assert_called_once_with( + [custom_group], custom_port, custom_iface, custom_group, + ) + # Verify that a success log is recorded for the custom parameters + mock_logger.info.assert_called_once_with("Success") + + +if __name__ == '__main__': + unittest.main()