Skip to content

Commit d5b6a85

Browse files
committed
Improve catalog file names
1 parent c33822e commit d5b6a85

File tree

2 files changed

+50
-11
lines changed

2 files changed

+50
-11
lines changed

nautilus_trader/persistence/catalog/parquet.py

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@
4242
from nautilus_trader.core.correctness import PyCondition
4343
from nautilus_trader.core.data import Data
4444
from nautilus_trader.core.datetime import dt_to_unix_nanos
45+
from nautilus_trader.core.datetime import maybe_dt_to_unix_nanos
4546
from nautilus_trader.core.datetime import time_object_to_dt
47+
from nautilus_trader.core.datetime import unix_nanos_to_iso8601
4648
from nautilus_trader.core.inspect import is_nautilus_class
4749
from nautilus_trader.core.message import Event
4850
from nautilus_trader.core.nautilus_pyo3 import DataBackendSession
@@ -313,7 +315,8 @@ def _write_chunk(
313315

314316
start = start if start else data[0].ts_init
315317
end = end if end else data[-1].ts_init
316-
parquet_file = f"{directory}/{start}-{end}.parquet"
318+
filename = _timestamps_to_filename(start, end)
319+
parquet_file = f"{directory}/{filename}"
317320
pq.write_table(
318321
table,
319322
where=parquet_file,
@@ -390,13 +393,19 @@ def extend_file_name(
390393

391394
for interval in intervals:
392395
if interval[0] == end + 1:
393-
old_path = os.path.join(directory, f"{interval[0]}-{interval[1]}.parquet")
394-
new_path = os.path.join(directory, f"{start}-{interval[1]}.parquet")
396+
old_path = os.path.join(
397+
directory,
398+
_timestamps_to_filename(interval[0], interval[1]),
399+
)
400+
new_path = os.path.join(directory, _timestamps_to_filename(start, interval[1]))
395401
self.fs.rename(old_path, new_path)
396402
break
397403
elif interval[1] == start - 1:
398-
old_path = os.path.join(directory, f"{interval[0]}-{interval[1]}.parquet")
399-
new_path = os.path.join(directory, f"{interval[0]}-{end}.parquet")
404+
old_path = os.path.join(
405+
directory,
406+
_timestamps_to_filename(interval[0], interval[1]),
407+
)
408+
new_path = os.path.join(directory, _timestamps_to_filename(interval[0], end))
400409
self.fs.rename(old_path, new_path)
401410
break
402411

@@ -482,7 +491,7 @@ def _reset_file_names(self, directory: str) -> None:
482491
if first_ts == -1:
483492
continue
484493

485-
new_filename = f"{first_ts}-{last_ts}.parquet"
494+
new_filename = _timestamps_to_filename(first_ts, last_ts)
486495
new_path = os.path.join(os.path.dirname(file), new_filename)
487496
self.fs.rename(file, new_path)
488497

@@ -609,7 +618,10 @@ def _consolidate_directory(
609618
if ensure_contiguous_files:
610619
assert _are_intervals_contiguous(intervals)
611620

612-
new_file_name = os.path.join(directory, f"{intervals[0][0]}-{intervals[-1][1]}.parquet")
621+
new_file_name = os.path.join(
622+
directory,
623+
_timestamps_to_filename(intervals[0][0], intervals[-1][1]),
624+
)
613625
files_to_consolidate.sort()
614626
self._combine_parquet_files(files_to_consolidate, new_file_name)
615627

@@ -1378,6 +1390,18 @@ def convert_stream_to_data(
13781390
used_catalog.write_data(all_data)
13791391

13801392

1393+
def _timestamps_to_filename(timestamp_1: int, timestamp_2: int) -> str:
    """
    Build the parquet file name covering the interval between two timestamps.

    Each timestamp is rendered with `unix_nanos_to_iso8601` and then made
    filename-safe with `_iso_timestamp_to_file_timestamp`; the two results are
    joined with an underscore and given a ``.parquet`` suffix.

    Parameters
    ----------
    timestamp_1 : int
        The first (start) timestamp; callers in this module pass UNIX nanoseconds.
    timestamp_2 : int
        The second (end) timestamp; callers in this module pass UNIX nanoseconds.

    Returns
    -------
    str
        A name of the form ``"<start>_<end>.parquet"``.

    """
    bounds = (timestamp_1, timestamp_2)
    stem = "_".join(
        _iso_timestamp_to_file_timestamp(unix_nanos_to_iso8601(nanos)) for nanos in bounds
    )

    return stem + ".parquet"
1398+
1399+
1400+
def _iso_timestamp_to_file_timestamp(iso_timestamp: str) -> str:
1401+
# Assumes format YYYY-MM-DDTHH:MM:SS.nanosecondsZ, "2023-10-26T07:30:50.123456789Z" becomes "2023-10-26T07-30-50-123456789Z"
1402+
return iso_timestamp.replace(":", "-").replace(".", "-")
1403+
1404+
13811405
def _query_intersects_filename(
13821406
filename: str,
13831407
start: pd.Timestamp | None,
@@ -1395,17 +1419,31 @@ def _query_intersects_filename(
13951419

13961420
def _parse_filename_timestamps(filename: str) -> tuple[int, int] | None:
    """
    Extract the start and end timestamps encoded in a catalog file name.

    The base name (directory and extension removed) is expected to look like
    ``"<start>_<end>"``, where both parts use the filename-safe timestamp
    layout handled by `_file_timestamp_to_iso_timestamp`.

    Parameters
    ----------
    filename : str
        The file name or path to inspect.

    Returns
    -------
    tuple[int, int] or None
        ``(first_ts, last_ts)`` as produced by `maybe_dt_to_unix_nanos`, or
        ``None`` when the name does not follow the expected layout.

    """
    base_filename = os.path.splitext(os.path.basename(filename))[0]

    # Split on the first underscore: start timestamp before it, end timestamp after it
    first_part, separator, last_part = base_filename.partition("_")

    if not separator:
        return None

    try:
        first_ts = maybe_dt_to_unix_nanos(_file_timestamp_to_iso_timestamp(first_part))
        last_ts = maybe_dt_to_unix_nanos(_file_timestamp_to_iso_timestamp(last_part))
    except ValueError:
        # Unrelated files that merely contain an underscore (e.g. "part_0.parquet")
        # are not catalog interval files; report "no timestamps" instead of crashing.
        return None

    # Compare against None explicitly: 0 (the UNIX epoch) is a valid timestamp
    # and must not be mistaken for a missing value.
    if first_ts is None or last_ts is None:
        return None

    return (first_ts, last_ts)
14071434

14081435

1436+
def _file_timestamp_to_iso_timestamp(file_timestamp: str) -> str:
1437+
# Assumes format YYYY-MM-DDTHH-MM-SS-nanosecondsZ, "2023-10-26T07-30-50-123456789Z" becomes "2023-10-26T07:30:50.123456789Z"
1438+
date_part, time_part = file_timestamp.split("T")
1439+
time_part = time_part[:-1]
1440+
last_hyphen_idx = time_part.rfind("-")
1441+
time_with_dot_for_nanos = time_part[:last_hyphen_idx] + "." + time_part[last_hyphen_idx + 1 :]
1442+
final_time_part = time_with_dot_for_nanos.replace("-", ":")
1443+
1444+
return f"{date_part}T{final_time_part}Z"
1445+
1446+
14091447
def _min_max_from_parquet_metadata(file_path: str, column_name: str) -> tuple[int, int]:
14101448
parquet_file = pq.ParquetFile(file_path)
14111449
metadata = parquet_file.metadata

tests/unit_tests/data/test_engine.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
from nautilus_trader.model.instruments.base import Instrument
7878
from nautilus_trader.model.objects import Price
7979
from nautilus_trader.model.objects import Quantity
80+
from nautilus_trader.persistence.catalog.parquet import _timestamps_to_filename
8081
from nautilus_trader.portfolio.portfolio import Portfolio
8182
from nautilus_trader.test_kit.mocks.data import MockMarketDataClient
8283
from nautilus_trader.test_kit.mocks.data import setup_catalog
@@ -2521,7 +2522,7 @@ def test_request_bars_when_catalog_and_client_registered(self):
25212522
"data",
25222523
"bar",
25232524
str(bar_type),
2524-
"1711238400000000000-1711324800000000000.parquet",
2525+
_timestamps_to_filename(1711238400000000000, 1711324800000000000),
25252526
)
25262527
other_name = os.path.join(catalog.path, "data", "bar", str(bar_type), "other.parquet")
25272528
os.rename(parquet_file, other_name)

0 commit comments

Comments
 (0)