@@ -22,7 +22,7 @@ DEF SCHAR_MAX = 127
22
22
MODULE_IMPLEMENTATION = " cython"
23
23
24
24
IntervalEvent = typing.Tuple[int , bool , AnyInterval]
25
- IntervalEventWithCount = typing.Tuple[int , bool , AnyInterval, int ]
25
+ IntervalEventOnTrack = typing.Tuple[int , bool , AnyInterval, int ]
26
26
27
27
def intervals_to_events (
28
28
intervals: list[AnyInterval],
@@ -295,10 +295,11 @@ def default_is_same_result(interval_constructor: IntervalConstructor):
295
295
== interval_constructor(0, 0, active_intervals2))
296
296
return is_same_result
297
297
298
- def find_rectangles(all_intervals: list[list[AnyInterval]],
299
- interval_constructor: IntervalConstructor,
300
- is_same_result: SameResult | None = None) \
301
- -> list[AnyInterval]:
298
+ def find_rectangles(
299
+ all_intervals: list[list[AnyInterval]],
300
+ interval_constructor: IntervalConstructor,
301
+ is_same_result: SameResult | None = None,
302
+ ) -> list[AnyInterval]:
302
303
"""
303
304
Low- level engine for interval construction.
304
305
@@ -342,41 +343,76 @@ def find_rectangles(all_intervals: list[list[AnyInterval]],
342
343
(time, event, interval, j)
343
344
for j, intervals in enumerate(all_intervals)
344
345
for interval in intervals
345
- for (time,event) in [(interval.lower, True), (interval.upper, False)]
346
+ for (time, event) in [(interval.lower, True), (interval.upper, False)]
346
347
]
347
-
348
348
event_count = len(events)
349
349
350
350
if event_count == 0:
351
351
return []
352
352
353
353
def compare_events(
354
- event1: IntervalEventWithCount , event2: IntervalEventWithCount
354
+ event1: IntervalEventOnTrack , event2: IntervalEventOnTrack
355
355
) -> int:
356
356
"""
357
357
Sorting comparator to ensure we process events in the correct order:
358
358
- earlier time first
359
359
- if same time and same track, close events before open events
360
360
(so we don' t incorrectly treat a consecutive interval on the same track
361
361
as overlapping).
362
+
363
+ Index of event1 and event2:
364
+ - [0 ]: time of event
365
+ - [1 ]: opening (True ) or closing (False )
366
+ - [2 ]: the interval to which the event belongs
367
+ - [3 ]: track index
362
368
"""
363
- if event1[0] < event2[0]: # event1 is earlier
369
+ if event1[0] < event2[0]: # event1 is earlier
364
370
return -1
365
- elif event2[0] < event1[0]: # event2 is earlier
371
+ elif event2[0] < event1[0]: # event2 is earlier
366
372
return 1
367
- elif event1[3] == event2[3]: # at the same time and on same track,
368
- if event1[2] is event2[2]: # same interval
369
- return -1 if (event1[1] is True) else 1 # sort open events before open events
370
- else: # different intervals
371
- return -1 if (event1[1] is False) else 1 # sort close events before open events
372
- else: # at the same time, but different tracks => any order is fine
373
+ elif event1[3] == event2[3]: # at the same time and on same track,
374
+ if event1[2] == event2[2]: # same interval (we don't check for "is" because they might be different objects, but still represent the same interval)
375
+ return (
376
+ -1 if (event1[1] is True) else 1
377
+ ) # sort open events before open events
378
+ else: # different intervals
379
+ return (
380
+ -1 if (event1[1] is False) else 1
381
+ ) # sort close events before open events
382
+ else: # at the same time, but different tracks => any order is fine
373
383
return 1
374
384
375
385
# Sort events chronologically according to compare_events
376
386
events.sort(key=cmp_to_key(compare_events))
377
387
378
388
active_intervals: list[GeneralizedInterval] = [None] * track_count
379
389
390
+ def finalize_interval(
391
+ interval_start_time: int,
392
+ current_time: int,
393
+ interval_start_state: List[GeneralizedInterval],
394
+ ) -> None:
395
+ """
396
+ Appends a new time slice (interval_start_time - > current_time) to ' result_intervals' ,
397
+ ensuring we don' t create duplicate adjacency boundaries if the previous slice ends
398
+ exactly where the new one starts.
399
+ """
400
+ if len(result_intervals) > 0:
401
+ previous_result = result_intervals[-1]
402
+ if previous_result[1] == interval_start_time:
403
+ # Adjust the previous slice so it doesn't overlap or duplicate
404
+ result_intervals[-1] = (
405
+ previous_result[0],
406
+ previous_result[1] - 1,
407
+ previous_result[2],
408
+ )
409
+
410
+ # Now finalize the current slice
411
+ result_intervals.append(
412
+ (interval_start_time, current_time, interval_start_state)
413
+ )
414
+
415
+
380
416
def process_events_for_point_in_time(
381
417
index: int, point_time: int
382
418
) -> Tuple[int, int, int] | None:
@@ -405,32 +441,41 @@ def find_rectangles(all_intervals: list[list[AnyInterval]],
405
441
# [START_TIME1, 10:59:59] [11:00:00, END_TIME2]
406
442
# have no gap between them and can be considered a single
407
443
# continuous interval [START_TIME1, END_TIME2].
408
- if (point_time == time) or (open_ and (point_time == time - 1)):
444
+
445
+ point_interval_closing = any_open and not open_ and interval.lower == interval.upper == point_time
446
+
447
+ if ((point_time == time) and not point_interval_closing) or (open_ and (point_time == time - 1)):
409
448
if time > high_time:
410
449
high_time = time
411
450
any_open |= open_
412
451
else:
413
452
# As soon as we find an event that’s clearly beyond the cluster at point_time,
414
453
# we break and return
415
- return i, time, high_time if any_open else high_time + 1
454
+ return (
455
+ i,
456
+ time,
457
+ high_time if any_open else high_time + 1,
458
+ )
416
459
417
460
# Opening => set this track’s active interval to the new interval
418
461
# Closing => set it to None
419
462
active_intervals[track] = interval if open_ else None
463
+
464
+ # If we exit the loop fully, we used all events
420
465
return None
421
466
422
- result_intervals = []
423
467
# Step through event "clusters" with a common point in time and
424
468
# emit result intervals with unchanged interval "payload".
425
469
index: int | None = 0
426
- time: int | None = events[0 ][0]
470
+ time: int | None = events[index ][0]
427
471
interval_start_time: int = time
428
472
result_intervals: list[tuple[int, int, List[GeneralizedInterval]]] = []
429
473
430
474
if time is None:
431
475
# No events at all
432
476
return []
433
477
478
+ # process the event at index 0 at the first timepoint
434
479
res = process_events_for_point_in_time(index, time)
435
480
436
481
if res is None:
@@ -440,31 +485,6 @@ def find_rectangles(all_intervals: list[list[AnyInterval]],
440
485
441
486
interval_start_state = active_intervals.copy()
442
487
443
- def finalize_interval(
444
- interval_start_time: int,
445
- current_time: int,
446
- interval_start_state: List[GeneralizedInterval],
447
- ) -> None:
448
- """
449
- Appends a new time slice (interval_start_time - > current_time) to ' result_intervals' ,
450
- ensuring we don' t create duplicate adjacency boundaries if the previous slice ends
451
- exactly where the new one starts.
452
- """
453
- if len(result_intervals) > 0:
454
- previous_result = result_intervals[-1]
455
- if previous_result[1] == interval_start_time:
456
- # Adjust the previous slice so it doesn't overlap or duplicate
457
- result_intervals[-1] = (
458
- previous_result[0],
459
- previous_result[1] - 1,
460
- previous_result[2],
461
- )
462
-
463
- # Now finalize the current slice
464
- result_intervals.append(
465
- (interval_start_time, current_time, interval_start_state)
466
- )
467
-
468
488
# The main loop: step through event clusters
469
489
while True:
470
490
res = process_events_for_point_in_time(index, time)
0 commit comments