diff --git a/can/io/asc.py b/can/io/asc.py index 07243cd5b..ceffaf5cb 100644 --- a/can/io/asc.py +++ b/can/io/asc.py @@ -7,7 +7,6 @@ """ import logging import re -import time from datetime import datetime from typing import Any, Dict, Final, Generator, List, Optional, TextIO, Union @@ -340,8 +339,7 @@ class ASCWriter(TextIOMessageWriter): "{bit_timing_conf_ext_data:>8}", ] ) - FORMAT_START_OF_FILE_DATE = "%a %b %d %I:%M:%S.%f %p %Y" - FORMAT_DATE = "%a %b %d %I:%M:%S.{} %p %Y" + FORMAT_DATE = "%a %b %d %H:%M:%S.{} %Y" FORMAT_EVENT = "{timestamp: 9.6f} {message}\n" def __init__( @@ -367,12 +365,8 @@ def __init__( self.channel = channel # write start of file header - now = datetime.now().strftime(self.FORMAT_START_OF_FILE_DATE) - # Note: CANoe requires that the microsecond field only have 3 digits - idx = now.index(".") # Find the index in the string of the decimal - # Keep decimal and first three ms digits (4), remove remaining digits - now = now.replace(now[idx + 4 : now[idx:].index(" ") + idx], "") - self.file.write(f"date {now}\n") + start_time = self._format_header_datetime(datetime.now()) + self.file.write(f"date {start_time}\n") self.file.write("base hex timestamps absolute\n") self.file.write("internal events logged\n") @@ -381,6 +375,15 @@ def __init__( self.last_timestamp = 0.0 self.started = 0.0 + def _format_header_datetime(self, dt: datetime) -> str: + # Note: CANoe requires that the microsecond field only have 3 digits + # Since Python strftime only supports microsecond formatters, we must + # manually include the millisecond portion before passing the format + # to strftime + msec = dt.microsecond // 1000 % 1000 + format_w_msec = self.FORMAT_DATE.format(msec) + return dt.strftime(format_w_msec) + def stop(self) -> None: # This is guaranteed to not be None since we raise ValueError in __init__ if not self.file.closed: @@ -400,12 +403,11 @@ def log_event(self, message: str, timestamp: Optional[float] = None) -> None: # this is the case for the very first message: if not self.header_written: - self.last_timestamp = timestamp or 0.0 - self.started = self.last_timestamp - mlsec = repr(self.last_timestamp).split(".")[1][:3] - formatted_date = time.strftime( - self.FORMAT_DATE.format(mlsec), time.localtime(self.last_timestamp) - ) + self.started = self.last_timestamp = timestamp or 0.0 + + start_time = datetime.fromtimestamp(self.last_timestamp) + formatted_date = self._format_header_datetime(start_time) + self.file.write(f"Begin Triggerblock {formatted_date}\n") self.header_written = True self.log_event("Start of measurement") # caution: this is a recursive call! diff --git a/test/data/single_frame_us_locale.asc b/test/data/single_frame_us_locale.asc new file mode 100644 index 000000000..f6bfcc3db --- /dev/null +++ b/test/data/single_frame_us_locale.asc @@ -0,0 +1,7 @@ +date Sat Sep 30 15:06:13.191 2017 +base hex timestamps absolute +internal events logged +Begin Triggerblock Sat Sep 30 15:06:13.191 2017 + 0.000000 Start of measurement + 0.000000 1 123x Rx d 1 68 +End TriggerBlock diff --git a/test/logformats_test.py b/test/logformats_test.py index 8694fefdc..a7e75d7c8 100644 --- a/test/logformats_test.py +++ b/test/logformats_test.py @@ -11,19 +11,22 @@ TODO: correctly set preserves_channel and adds_default_channel """ +import locale import logging import os import tempfile import unittest from abc import ABCMeta, abstractmethod +from contextlib import contextmanager from datetime import datetime from itertools import zip_longest +from pathlib import Path +from unittest.mock import patch from parameterized import parameterized import can from can.io import blf - from .data.example_data import ( TEST_COMMENTS, TEST_MESSAGES_BASE, @@ -42,6 +45,14 @@ asammdf = None +@contextmanager +def override_locale(category: int, locale_str: str) -> None: + prev_locale = locale.getlocale(category) + locale.setlocale(category, locale_str) + yield + locale.setlocale(category, prev_locale) + + class ReaderWriterExtensionTest(unittest.TestCase): def _get_suffix_case_variants(self, suffix): return [ @@ -403,12 +414,16 @@ def _setup_instance(self): adds_default_channel=0, ) + def _get_logfile_location(self, filename: str) -> Path: + my_dir = Path(__file__).parent + return my_dir / "data" / filename + def _read_log_file(self, filename, **kwargs): - logfile = os.path.join(os.path.dirname(__file__), "data", filename) + logfile = self._get_logfile_location(filename) with can.ASCReader(logfile, **kwargs) as reader: return list(reader) - def test_absolute_time(self): + def test_read_absolute_time(self): time_from_file = "Sat Sep 30 10:06:13.191 PM 2017" start_time = datetime.strptime( time_from_file, self.FORMAT_START_OF_FILE_DATE @@ -436,7 +451,7 @@ def test_absolute_time(self): actual = self._read_log_file("test_CanMessage.asc", relative_timestamp=False) self.assertMessagesEqual(actual, expected_messages) - def test_can_message(self): + def test_read_can_message(self): expected_messages = [ can.Message( timestamp=2.5010, @@ -459,7 +474,7 @@ def test_can_message(self): actual = self._read_log_file("test_CanMessage.asc") self.assertMessagesEqual(actual, expected_messages) - def test_can_remote_message(self): + def test_read_can_remote_message(self): expected_messages = [ can.Message( timestamp=2.510001, @@ -488,7 +503,7 @@ def test_can_remote_message(self): actual = self._read_log_file("test_CanRemoteMessage.asc") self.assertMessagesEqual(actual, expected_messages) - def test_can_fd_remote_message(self): + def test_read_can_fd_remote_message(self): expected_messages = [ can.Message( timestamp=30.300981, @@ -504,7 +519,7 @@ def test_can_fd_remote_message(self): actual = self._read_log_file("test_CanFdRemoteMessage.asc") self.assertMessagesEqual(actual, expected_messages) - def test_can_fd_message(self): + def test_read_can_fd_message(self): expected_messages = [ can.Message( timestamp=30.005021, @@ -541,7 +556,7 @@ def test_can_fd_message(self): actual = self._read_log_file("test_CanFdMessage.asc") self.assertMessagesEqual(actual, expected_messages) - def test_can_fd_message_64(self): + def test_read_can_fd_message_64(self): expected_messages = [ can.Message( timestamp=30.506898, @@ -566,7 +581,7 @@ def test_can_fd_message_64(self): actual = self._read_log_file("test_CanFdMessage64.asc") self.assertMessagesEqual(actual, expected_messages) - def test_can_and_canfd_error_frames(self): + def test_read_can_and_canfd_error_frames(self): expected_messages = [ can.Message(timestamp=2.501000, channel=0, is_error_frame=True), can.Message(timestamp=3.501000, channel=0, is_error_frame=True), @@ -582,16 +597,16 @@ def test_can_and_canfd_error_frames(self): actual = self._read_log_file("test_CanErrorFrames.asc") self.assertMessagesEqual(actual, expected_messages) - def test_ignore_comments(self): + def test_read_ignore_comments(self): _msg_list = self._read_log_file("logfile.asc") - def test_no_triggerblock(self): + def test_read_no_triggerblock(self): _msg_list = self._read_log_file("issue_1256.asc") - def test_can_dlc_greater_than_8(self): + def test_read_can_dlc_greater_than_8(self): _msg_list = self._read_log_file("issue_1299.asc") - def test_error_frame_channel(self): + def test_read_error_frame_channel(self): # gh-issue 1578 err_frame = can.Message(is_error_frame=True, channel=4) @@ -611,6 +626,31 @@ def test_error_frame_channel(self): finally: os.unlink(temp_file.name) + def test_write_millisecond_handling(self): + now = datetime( + year=2017, month=9, day=30, hour=15, minute=6, second=13, microsecond=191456 + ) + + # We temporarily set the locale to C to ensure test reproducibility + with override_locale(category=locale.LC_TIME, locale_str="C"): + # We mock datetime.now during ASCWriter __init__ for reproducibility + # Unfortunately, now() is a readonly attribute, so we mock datetime + with patch("can.io.asc.datetime") as mock_datetime: + mock_datetime.now.return_value = now + writer = can.ASCWriter(self.test_file_name) + + msg = can.Message( + timestamp=now.timestamp(), arbitration_id=0x123, data=b"h" + ) + writer.on_message_received(msg) + + writer.stop() + + actual_file = Path(self.test_file_name) + expected_file = self._get_logfile_location("single_frame_us_locale.asc") + + self.assertEqual(expected_file.read_text(), actual_file.read_text()) + class TestBlfFileFormat(ReaderWriterTest): """Tests can.BLFWriter and can.BLFReader.