Skip to content

Commit e2958c7

Browse files
committed
Update to PR to reduce RAM usage
1 parent bf59725 commit e2958c7

File tree

1 file changed

+149
-132
lines changed

1 file changed

+149
-132
lines changed

can/io/mf4.py

Lines changed: 149 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@
1010
from hashlib import md5
1111
from io import BufferedIOBase, BytesIO
1212
from pathlib import Path
13-
from typing import Any, BinaryIO, Iterator, Optional, Union, cast
13+
from typing import Any, BinaryIO, Generator, Iterable, Optional, Union, cast
1414

15-
import can
1615
from ..message import Message
1716
from ..typechecking import StringPathLike
1817
from ..util import channel2int, len2dlc
@@ -276,129 +275,153 @@ class MF4Reader(BinaryIOMessageReader):
276275

277276
# NOTE: Readout based on the bus logging code from asammdf GUI
278277

279-
def _extract_can_data_frame(self, data: Signal):
280-
num_records = len(data)
281-
names = set(data.samples[0].dtype.names)
282-
required_names = {"CAN_DataFrame.ID", "CAN_DataFrame.DataBytes", "CAN_DataFrame.DataLength"}
283-
if not required_names & names:
284-
raise ValueError("Missing required columns")
285-
286-
column_timestamps = data.timestamps
287-
column_id = data["CAN_DataFrame.ID"].tolist()
288-
column_data = data["CAN_DataFrame.DataBytes"]
289-
column_data_length = data["CAN_DataFrame.DataLength"].tolist()
290-
291-
default_object = can.Message()
278+
class FrameIterator(object):
279+
"""
280+
Iterator helper class for common handling among CAN DataFrames, ErrorFrames and RemoteFrames.
281+
"""
292282

293-
column_channel = data["CAN_DataFrame.BusChannel"].tolist() if "CAN_DataFrame.BusChannel" in names else [default_object.channel for _ in range(num_records)]
294-
column_ide = data["CAN_DataFrame.IDE"].astype(bool).tolist() if "CAN_DataFrame.IDE" in names else [default_object.is_extended_id for _ in range(num_records)]
295-
column_dir = data["CAN_DataFrame.Dir"].astype(bool).tolist() if "CAN_DataFrame.Dir" in names else [default_object.is_rx for _ in range(num_records)]
296-
column_edl = data["CAN_DataFrame.EDL"].astype(bool).tolist() if "CAN_DataFrame.EDL" in names else [default_object.is_fd for _ in range(num_records)]
297-
column_brs = data["CAN_DataFrame.BRS"].astype(bool).tolist() if "CAN_DataFrame.BRS" in names else [default_object.bitrate_switch for _ in range(num_records)]
298-
column_esi = data["CAN_DataFrame.ESI"].astype(bool).tolist() if "CAN_DataFrame.ESI" in names else [default_object.error_state_indicator for _ in range(num_records)]
283+
# Number of records to request for each asammdf call
284+
_chunk_size = 1000
299285

300-
if "CAN_DataFrame.Dir" in names:
301-
for i in range(num_records):
302-
column_dir[i] = not column_dir[i]
286+
def __init__(self, mdf: MDF, group_index: int, start_timestamp: float, name: str):
287+
self._mdf = mdf
288+
self._group_index = group_index
289+
self._start_timestamp = start_timestamp
290+
self._name = name
291+
292+
return
303293

304-
# Transform to python-can Messages
305-
for i in range(num_records):
306-
self._samples.append(
307-
Message(
308-
timestamp=float(column_timestamps[i]) + self._start_timestamp,
309-
arbitration_id=column_id[i] & 0x1FFFFFFF,
310-
is_extended_id=column_ide[i],
311-
channel=column_channel[i],
312-
is_rx=column_dir[i],
313-
is_fd=column_edl[i],
314-
bitrate_switch=column_brs[i],
315-
error_state_indicator=column_esi[i],
316-
data=column_data[i][:column_data_length[i]].tobytes(),
317-
)
294+
def _get_data(self, current_offset: int) -> asammdf.Signal:
295+
# NOTE: asammdf suggests using select instead of get. Select seem to miss converting some channels which
296+
# get does convert as expected.
297+
data_raw = self._mdf.get(
298+
self._name,
299+
self._group_index,
300+
record_offset=current_offset,
301+
record_count=self._chunk_size,
302+
raw=False
318303
)
304+
305+
return data_raw
319306

320-
return
307+
pass
321308

322-
def _extract_can_error_frame(self, data: Signal):
323-
num_records = len(data)
324-
names = set(data.samples[0].dtype.names)
325-
column_timestamps = data.timestamps
326-
default_object = can.Message()
327-
328-
column_id = data["CAN_ErrorFrame.ID"].tolist() if "CAN_ErrorFrame.ID" in names else [default_object.arbitration_id for _ in range(num_records)]
329-
column_data = data["CAN_ErrorFrame.DataBytes"] if "CAN_ErrorFrame.DataBytes" in names else [default_object.data for _ in range(num_records)]
330-
column_data_length = data["CAN_ErrorFrame.DataLength"].tolist() if "CAN_ErrorFrame.DataLength" in names else [default_object.dlc for _ in range(num_records)]
331-
column_channel = data["CAN_ErrorFrame.BusChannel"].tolist() if "CAN_ErrorFrame.BusChannel" in names else [default_object.channel for _ in range(num_records)]
332-
column_ide = data["CAN_ErrorFrame.IDE"].astype(bool).tolist() if "CAN_ErrorFrame.IDE" in names else [default_object.is_extended_id for _ in range(num_records)]
333-
column_dir = data["CAN_ErrorFrame.Dir"].astype(bool).tolist() if "CAN_ErrorFrame.Dir" in names else [default_object.is_rx for _ in range(num_records)]
334-
column_rtr = data["CAN_ErrorFrame.RTR"].astype(bool).tolist() if "CAN_ErrorFrame.RTR" in names else [default_object.is_remote_frame for _ in range(num_records)]
335-
column_edl = data["CAN_ErrorFrame.EDL"].astype(bool).tolist() if "CAN_ErrorFrame.EDL" in names else [default_object.is_fd for _ in range(num_records)]
336-
column_brs = data["CAN_ErrorFrame.BRS"].astype(bool).tolist() if "CAN_ErrorFrame.BRS" in names else [default_object.bitrate_switch for _ in range(num_records)]
337-
column_esi = data["CAN_ErrorFrame.ESI"].astype(bool).tolist() if "CAN_ErrorFrame.ESI" in names else [default_object.error_state_indicator for _ in range(num_records)]
338-
339-
if "CAN_ErrorFrame.Dir" in names:
340-
for i in range(num_records):
341-
column_dir[i] = not column_dir[i]
309+
class CANDataFrameIterator(FrameIterator):
342310

343-
# Transform to python-can Messages
344-
for i in range(num_records):
345-
message = Message(
346-
timestamp=float(column_timestamps[i]) + self._start_timestamp,
347-
arbitration_id=column_id[i] & 0x1FFFFFFF,
348-
is_extended_id=column_ide[i],
349-
is_error_frame=True,
350-
is_remote_frame=column_rtr[i],
351-
channel=column_channel[i],
352-
is_rx=column_dir[i],
353-
is_fd=column_edl[i],
354-
bitrate_switch=column_brs[i],
355-
error_state_indicator=column_esi[i],
356-
dlc=column_data_length[i]
357-
)
311+
def __init__(self, mdf: MDF, group_index: int, start_timestamp: float):
312+
super().__init__(mdf, group_index, start_timestamp, "CAN_DataFrame")
358313

359-
if column_data[i] is not None:
360-
message.data = column_data[i][:column_data_length[i]].tobytes()
314+
return
315+
316+
def __iter__(self) -> Generator[Message, None, None]:
317+
for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size):
318+
data = self._get_data(current_offset)
319+
names = data.samples[0].dtype.names
320+
321+
for i in range(len(data)):
322+
data_length = int(data["CAN_DataFrame.DataLength"][i])
323+
324+
kv = {
325+
"timestamp": float(data.timestamps[i]) + self._start_timestamp,
326+
"arbitration_id": int(data["CAN_DataFrame.ID"][i]) & 0x1FFFFFFF,
327+
"data": data["CAN_DataFrame.DataBytes"][i][:data_length].tobytes(),
328+
}
329+
330+
if "CAN_DataFrame.BusChannel" in names:
331+
kv["channel"] = int(data["CAN_DataFrame.BusChannel"][i])
332+
if "CAN_DataFrame.Dir" in names:
333+
kv["is_rx"] = int(data["CAN_DataFrame.Dir"][i]) == 0
334+
if "CAN_DataFrame.IDE" in names:
335+
kv["is_extended_id"] = bool(data["CAN_DataFrame.IDE"][i])
336+
if "CAN_DataFrame.EDL" in names:
337+
kv["is_fd"] = bool(data["CAN_DataFrame.EDL"][i])
338+
if "CAN_DataFrame.BRS" in names:
339+
kv["bitrate_switch"] = bool(data["CAN_DataFrame.BRS"][i])
340+
if "CAN_DataFrame.ESI" in names:
341+
kv["error_state_indicator"] = bool(data["CAN_DataFrame.ESI"][i])
342+
343+
yield Message(**kv)
361344

362-
self._samples.append(message)
345+
return None
363346

364-
return
347+
pass
365348

366-
def _extract_can_remote_frame(self, data: Signal):
367-
num_records = len(data)
368-
names = set(data.samples[0].dtype.names)
369-
required_names = {"CAN_RemoteFrame.ID", "CAN_RemoteFrame.DLC"}
370-
if not required_names & names:
371-
raise ValueError("Missing required columns")
349+
class CANErrorFrameIterator(FrameIterator):
372350

373-
column_timestamps = data.timestamps
374-
column_id = data["CAN_RemoteFrame.ID"].tolist()
375-
column_dlc = data["CAN_RemoteFrame.DataLength"].tolist()
351+
def __init__(self, mdf: MDF, group_index: int, start_timestamp: float):
352+
super().__init__(mdf, group_index, start_timestamp, "CAN_ErrorFrame")
353+
354+
return
376355

377-
default_object = can.Message()
356+
def __iter__(self) -> Generator[Message, None, None]:
357+
for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size):
358+
data = self._get_data(current_offset)
359+
names = data.samples[0].dtype.names
360+
361+
for i in range(len(data)):
362+
kv = {
363+
"timestamp": float(data.timestamps[i]) + self._start_timestamp,
364+
"is_error_frame": True,
365+
}
366+
367+
if "CAN_ErrorFrame.BusChannel" in names:
368+
kv["channel"] = int(data["CAN_ErrorFrame.BusChannel"][i])
369+
if "CAN_ErrorFrame.Dir" in names:
370+
kv["is_rx"] = int(data["CAN_ErrorFrame.Dir"][i]) == 0
371+
if "CAN_ErrorFrame.ID" in names:
372+
kv["arbitration_id"] = int(data["CAN_ErrorFrame.ID"][i]) & 0x1FFFFFFF
373+
if "CAN_ErrorFrame.IDE" in names:
374+
kv["is_extended_id"] = bool(data["CAN_ErrorFrame.IDE"][i])
375+
if "CAN_ErrorFrame.EDL" in names:
376+
kv["is_fd"] = bool(data["CAN_ErrorFrame.EDL"][i])
377+
if "CAN_ErrorFrame.BRS" in names:
378+
kv["bitrate_switch"] = bool(data["CAN_ErrorFrame.BRS"][i])
379+
if "CAN_ErrorFrame.ESI" in names:
380+
kv["error_state_indicator"] = bool(data["CAN_ErrorFrame.ESI"][i])
381+
if "CAN_ErrorFrame.RTR" in names:
382+
kv["is_remote_frame"] = bool(data["CAN_ErrorFrame.RTR"][i])
383+
if "CAN_ErrorFrame.DataLength" in names and "CAN_ErrorFrame.DataBytes" in names:
384+
data_length = int(data["CAN_ErrorFrame.DataLength"][i])
385+
kv["data"] = data["CAN_ErrorFrame.DataBytes"][i][:data_length].tobytes()
386+
387+
yield Message(**kv)
388+
389+
return None
378390

379-
column_channel = data["CAN_RemoteFrame.BusChannel"].tolist() if "CAN_RemoteFrame.BusChannel" in names else [default_object.channel for _ in range(num_records)]
380-
column_ide = data["CAN_RemoteFrame.IDE"].astype(bool).tolist() if "CAN_RemoteFrame.IDE" in names else [default_object.is_extended_id for _ in range(num_records)]
381-
column_dir = data["CAN_RemoteFrame.Dir"].astype(bool).tolist() if "CAN_RemoteFrame.Dir" in names else [default_object.is_rx for _ in range(num_records)]
391+
pass
392+
393+
class CANRemoteFrameIterator(FrameIterator):
382394

383-
if "CAN_RemoteFrame.Dir" in names:
384-
for i in range(num_records):
385-
column_dir[i] = not column_dir[i]
395+
def __init__(self, mdf: MDF, group_index: int, start_timestamp: float):
396+
super().__init__(mdf, group_index, start_timestamp, "CAN_RemoteFrame")
397+
398+
return
386399

387-
# Transform to python-can Messages
388-
for i in range(num_records):
389-
self._samples.append(
390-
Message(
391-
timestamp=float(column_timestamps[i]) + self._start_timestamp,
392-
arbitration_id=column_id[i] & 0x1FFFFFFF,
393-
is_extended_id=column_ide[i],
394-
is_remote_frame=True,
395-
channel=column_channel[i],
396-
is_rx=column_dir[i],
397-
dlc=int(column_dlc[i]),
398-
)
399-
)
400+
def __iter__(self) -> Generator[Message, None, None]:
401+
for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size):
402+
data = self._get_data(current_offset)
403+
names = data.samples[0].dtype.names
404+
405+
for i in range(len(data)):
406+
kv = {
407+
"timestamp": float(data.timestamps[i]) + self._start_timestamp,
408+
"arbitration_id": int(data["CAN_RemoteFrame.ID"][i]) & 0x1FFFFFFF,
409+
"dlc": int(data["CAN_RemoteFrame.DLC"][i]),
410+
"is_remote_frame": True,
411+
}
412+
413+
if "CAN_RemoteFrame.BusChannel" in names:
414+
kv["channel"] = int(data["CAN_RemoteFrame.BusChannel"][i])
415+
if "CAN_RemoteFrame.Dir" in names:
416+
kv["is_rx"] = int(data["CAN_RemoteFrame.Dir"][i]) == 0
417+
if "CAN_RemoteFrame.IDE" in names:
418+
kv["is_extended_id"] = bool(data["CAN_RemoteFrame.IDE"][i])
419+
420+
yield Message(**kv)
421+
422+
return None
400423

401-
return
424+
pass
402425

403426
def __init__(
404427
self,
@@ -418,17 +441,21 @@ def __init__(
418441

419442
super().__init__(file, mode="rb")
420443

421-
m: MDF4
444+
self._mdf: MDF
422445
if isinstance(file, BufferedIOBase):
423-
m = MDF(BytesIO(file.read()))
446+
self._mdf = MDF(BytesIO(file.read()))
424447
else:
425-
m = MDF(file)
448+
self._mdf = MDF(file)
449+
450+
self._start_timestamp = self._mdf.header.start_time.timestamp()
426451

427-
self._start_timestamp = m.header.start_time.timestamp()
428-
self._samples = []
452+
def __iter__(self) -> Iterable[Message]:
453+
import heapq
429454

430-
# Extract all data to a common list
431-
for i, group in enumerate(m.groups):
455+
# To handle messages split over multiple channel groups, create a single iterator per channel group and merge
456+
# these iterators into a single iterator using heapq.
457+
iterators = []
458+
for group_index, group in enumerate(self._mdf.groups):
432459
channel_group: ChannelGroup = group.channel_group
433460

434461
if not channel_group.flags & FLAG_CG_BUS_EVENT:
@@ -439,7 +466,6 @@ def __init__(
439466
# No data, skip
440467
continue
441468

442-
# Get a handle to the acquisition source
443469
acquisition_source: Optional[Source] = channel_group.acq_source
444470

445471
if acquisition_source is None:
@@ -453,28 +479,19 @@ def __init__(
453479

454480
if acquisition_source.bus_type == Source.BUS_TYPE_CAN:
455481
if "CAN_DataFrame" in channel_names:
456-
# Ensure all required fields are present
457-
data = m.get("CAN_DataFrame", group=i, raw=False)
458-
self._extract_can_data_frame(data)
482+
iterators.append(self.CANDataFrameIterator(self._mdf, group_index, self._start_timestamp))
459483
elif "CAN_ErrorFrame" in channel_names:
460-
data = m.get("CAN_ErrorFrame", group=i, raw=False)
461-
self._extract_can_error_frame(data)
484+
iterators.append(self.CANErrorFrameIterator(self._mdf, group_index, self._start_timestamp))
462485
elif "CAN_RemoteFrame" in channel_names:
463-
data = m.get("CAN_RemoteFrame", group=i, raw=False)
464-
self._extract_can_remote_frame(data)
486+
iterators.append(self.CANRemoteFrameIterator(self._mdf, group_index, self._start_timestamp))
465487
else:
466488
# Unknown bus type, skip
467489
continue
468-
469-
pass
470-
471-
# Ensure the samples are sorted according to timestamp
472-
self._samples.sort(key=lambda x: x.timestamp)
473490

474-
m.close()
475-
476-
def __iter__(self) -> Iterator[Message]:
477-
return iter(self._samples)
491+
# Create merged iterator over all the groups, using the timestamps as comparison key
492+
return heapq.merge(*iterators, key=lambda x: x.timestamp)
478493

479494
def stop(self) -> None:
495+
self._mdf.close()
496+
self._mdf = None
480497
super().stop()

0 commit comments

Comments
 (0)