Skip to content

Commit 00384c4

Browse files
committed
Close over triggering retention eval
1 parent ab810d6 commit 00384c4

File tree

2 files changed

+109
-83
lines changed

2 files changed

+109
-83
lines changed

src/osiris_log.erl

Lines changed: 107 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -460,8 +460,11 @@
460460
%% Default manifest implementation which lists the configured dir for index
461461
%% files.
462462
-record(manifest,
463-
{dir :: file:filename_all(),
464-
index_fd :: file:io_device() | undefined}).
463+
{name :: osiris:name(),
464+
directory :: file:filename_all(),
465+
index_fd :: file:io_device() | undefined,
466+
retention :: [retention_spec()],
467+
retention_eval_fun :: fun()}).
465468

466469
-opaque state() :: #?MODULE{}.
467470
-type manifest() :: #manifest{}.
@@ -492,20 +495,10 @@ init(Config) ->
492495
-spec init(config(), writer | acceptor) -> state().
493496
init(#{dir := Dir,
494497
name := Name,
495-
epoch := Epoch} = Config,
498+
epoch := Epoch} = Config0,
496499
WriterType) ->
497-
%% scan directory for segments if in write mode
498-
MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
499-
?DEFAULT_MAX_SEGMENT_SIZE_B),
500-
MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
501-
?DEFAULT_MAX_SEGMENT_SIZE_C),
502-
Retention = maps:get(retention, Config, []),
503-
FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE),
504500
?INFO("Stream: ~ts will use ~ts for osiris log data directory",
505501
[Name, Dir]),
506-
?DEBUG_(Name, "max_segment_size_bytes: ~b,
507-
max_segment_size_chunks ~b, retention ~w, filter size ~b",
508-
[MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]),
509502
ok = filelib:ensure_dir(Dir),
510503
case file:make_dir(Dir) of
511504
ok ->
@@ -516,26 +509,40 @@ init(#{dir := Dir,
516509
throw(Err)
517510
end,
518511

519-
Cnt = make_counter(Config),
512+
ManifestMod = application:get_env(osiris, log_manifest, ?MODULE),
513+
{Info, Config, Manifest0} = case Config0 of
514+
#{acceptor_manifest := {I, M}} ->
515+
{I, Config0, M};
516+
_ ->
517+
Config1 = with_defaults(Config0),
518+
ManifestMod:writer_manifest(Config1)
519+
end,
520+
521+
MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
522+
?DEFAULT_MAX_SEGMENT_SIZE_C),
523+
#{max_segment_size_bytes := MaxSizeBytes,
524+
retention := Retention,
525+
filter_size := FilterSize,
526+
shared := Shared,
527+
counter := Cnt,
528+
counter_id := CounterId,
529+
tracking_config := TrackingConfig} = Config,
530+
?DEBUG_(Name, "max_segment_size_bytes: ~b,
531+
max_segment_size_chunks ~b, retention ~w, filter size ~b",
532+
[MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]),
520533
%% initialise offset counter to -1 as 0 is the first offset in the log and
521534
%% it hasn't necessarily been written yet, for an empty log the first offset
522535
%% is initialised to 0 however and will be updated after each retention run.
523536
counters:put(Cnt, ?C_OFFSET, -1),
524537
counters:put(Cnt, ?C_SEGMENTS, 0),
525-
Shared = case Config of
526-
#{shared := S} ->
527-
S;
528-
_ ->
529-
osiris_log_shared:new()
530-
end,
531538
Cfg = #cfg{directory = Dir,
532539
name = Name,
533540
max_segment_size_bytes = MaxSizeBytes,
534541
max_segment_size_chunks = MaxSizeChunks,
535-
tracking_config = maps:get(tracking_config, Config, #{}),
542+
tracking_config = TrackingConfig,
536543
retention = Retention,
537544
counter = Cnt,
538-
counter_id = counter_id(Config),
545+
counter_id = CounterId,
539546
shared = Shared,
540547
filter_size = FilterSize},
541548
DefaultNextOffset = case Config of
@@ -546,14 +553,6 @@ init(#{dir := Dir,
546553
0
547554
end,
548555

549-
ManifestMod = application:get_env(osiris, log_manifest, ?MODULE),
550-
{Info, Manifest0} = case Config of
551-
#{acceptor_manifest := {I, M}} ->
552-
acceptor = WriterType,
553-
{I, M};
554-
_ ->
555-
ManifestMod:writer_manifest(Config)
556-
end,
557556
case Info of
558557
#{num_segments := 0} ->
559558
osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
@@ -638,16 +637,11 @@ init(#{dir := Dir,
638637
fd = SegFd}
639638
end.
640639

641-
writer_manifest(#{dir := Dir} = Config) ->
642-
ok = filelib:ensure_dir(Dir),
643-
case file:make_dir(Dir) of
644-
ok ->
645-
ok;
646-
{error, eexist} ->
647-
ok;
648-
Err ->
649-
throw(Err)
650-
end,
640+
writer_manifest(#{dir := Dir,
641+
name := Name,
642+
retention := RetentionSpec,
643+
counter := Cnt,
644+
shared := Shared} = Config) ->
651645
ok = maybe_fix_corrupted_files(Config),
652646
Info = case first_and_last_seginfos(Config) of
653647
none ->
@@ -681,8 +675,24 @@ writer_manifest(#{dir := Dir} = Config) ->
681675
active_segment => SegInfo,
682676
segment_offsets => SegmentOffsets}
683677
end,
678+
%% updates first offset and first timestamp
679+
%% after retention has been evaluated
680+
EvalFun = fun ({{FstOff, _}, FstTs, NumSegLeft})
681+
when is_integer(FstOff),
682+
is_integer(FstTs) ->
683+
osiris_log_shared:set_first_chunk_id(Shared, FstOff),
684+
counters:put(Cnt, ?C_FIRST_OFFSET, FstOff),
685+
counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs),
686+
counters:put(Cnt, ?C_SEGMENTS, NumSegLeft);
687+
(_) ->
688+
ok
689+
end,
690+
Manifest = #manifest{name = Name,
691+
directory = Dir,
692+
retention = RetentionSpec,
693+
retention_eval_fun = EvalFun},
684694
%% The segment_opened event will create the index fd.
685-
{Info, #manifest{dir = Dir}}.
695+
{Info, Config, Manifest}.
686696

687697
maybe_fix_corrupted_files([]) ->
688698
ok;
@@ -910,16 +920,16 @@ evaluate_tracking_snapshot(#?MODULE{mode = #write{type = writer}} = State0, Trk0
910920
{State0, Trk0}
911921
end.
912922

913-
% -spec
914923
-spec init_acceptor(range(), list(), config()) ->
915924
state().
916-
init_acceptor(Range, EpochOffsets0, Conf) ->
925+
init_acceptor(Range, EpochOffsets0, Conf0) ->
917926
EpochOffsets =
918927
lists:reverse(
919928
lists:sort(EpochOffsets0)),
920929
ManifestMod = application:get_env(osiris, log_manifest, ?MODULE),
921-
{Info, Manifest} = ManifestMod:acceptor_manifest(Range, EpochOffsets,
922-
Conf),
930+
Conf1 = with_defaults(Conf0),
931+
{Info, Conf, Manifest} = ManifestMod:acceptor_manifest(Range, EpochOffsets,
932+
Conf1),
923933
InitOffset = case Range of
924934
empty -> 0;
925935
{O, _} -> O
@@ -2215,12 +2225,14 @@ format_status(#?MODULE{cfg = #cfg{directory = Dir,
22152225

22162226
-spec update_retention([retention_spec()], state()) -> state().
22172227
update_retention(Retention,
2218-
#?MODULE{cfg = #cfg{name = Name,
2219-
retention = Retention0} = Cfg} = State0)
2228+
#?MODULE{mode = #write{manifest = {ManifestMod, Manifest0}} =
2229+
Write0,
2230+
cfg = #cfg{name = Name}} = State0)
22202231
when is_list(Retention) ->
2221-
?DEBUG_(Name, " from: ~w to ~w", [Retention0, Retention]),
2222-
State = State0#?MODULE{cfg = Cfg#cfg{retention = Retention}},
2223-
trigger_retention_eval(State).
2232+
?DEBUG_(Name, " updating retention to ~w", [Retention]),
2233+
Manifest = ManifestMod:handle_event({retention_updated, Retention},
2234+
Manifest0),
2235+
State0#?MODULE{mode = Write0#write{manifest = {ManifestMod, Manifest}}}.
22242236

22252237
-spec evaluate_retention(file:filename_all(), [retention_spec()]) ->
22262238
{range(), FirstTimestamp :: osiris:timestamp(),
@@ -2518,13 +2530,12 @@ write_chunk(Chunk,
25182530
State) ->
25192531
case max_segment_size_reached(State) of
25202532
true ->
2521-
trigger_retention_eval(
2522-
write_chunk(Chunk,
2523-
ChType,
2524-
Timestamp,
2525-
Epoch,
2526-
NumRecords,
2527-
open_new_segment(State)));
2533+
write_chunk(Chunk,
2534+
ChType,
2535+
Timestamp,
2536+
Epoch,
2537+
NumRecords,
2538+
open_new_segment(State));
25282539
false ->
25292540
NextOffset = Next + NumRecords,
25302541
Size = iolist_size(Chunk),
@@ -2854,6 +2865,24 @@ validate_crc(ChunkId, Crc, IOData) ->
28542865
exit({crc_validation_failure, {chunk_id, ChunkId}})
28552866
end.
28562867

2868+
2869+
-spec with_defaults(config()) -> config().
2870+
with_defaults(Config0) ->
2871+
Shared = case Config0 of
2872+
#{shared := S} ->
2873+
S;
2874+
_ ->
2875+
osiris_log_shared:new()
2876+
end,
2877+
maps:merge(#{max_segment_size_bytes => ?DEFAULT_MAX_SEGMENT_SIZE_B,
2878+
retention => [],
2879+
filter_size => ?DEFAULT_FILTER_SIZE,
2880+
shared => Shared,
2881+
counter => make_counter(Config0),
2882+
counter_id => counter_id(Config0),
2883+
tracking_config => #{}},
2884+
Config0).
2885+
28572886
-spec make_counter(osiris_log:config()) ->
28582887
counters:counters_ref().
28592888
make_counter(#{counter := Counter}) ->
@@ -3093,28 +3122,6 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir,
30933122
{end_of_stream, State}
30943123
end.
30953124

3096-
trigger_retention_eval(#?MODULE{cfg =
3097-
#cfg{name = Name,
3098-
directory = Dir,
3099-
retention = RetentionSpec,
3100-
counter = Cnt,
3101-
shared = Shared}} = State) ->
3102-
3103-
%% updates first offset and first timestamp
3104-
%% after retention has been evaluated
3105-
EvalFun = fun ({{FstOff, _}, FstTs, NumSegLeft})
3106-
when is_integer(FstOff),
3107-
is_integer(FstTs) ->
3108-
osiris_log_shared:set_first_chunk_id(Shared, FstOff),
3109-
counters:put(Cnt, ?C_FIRST_OFFSET, FstOff),
3110-
counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs),
3111-
counters:put(Cnt, ?C_SEGMENTS, NumSegLeft);
3112-
(_) ->
3113-
ok
3114-
end,
3115-
ok = osiris_retention:eval(Name, Dir, RetentionSpec, EvalFun),
3116-
State.
3117-
31183125
next_location(undefined) ->
31193126
{0, ?LOG_HEADER_SIZE};
31203127
next_location(#chunk_info{id = Id,
@@ -3346,13 +3353,22 @@ list_dir(Dir) ->
33463353
[list_to_binary(F) || F <- Files]
33473354
end.
33483355

3349-
handle_event({segment_opened, _OldSegment, NewSegment},
3350-
#manifest{dir = Dir,
3356+
handle_event({segment_opened, OldSegment, NewSegment},
3357+
#manifest{directory = Dir,
33513358
index_fd = Fd0} = Manifest) ->
33523359
_ = close_fd(Fd0),
33533360
IdxFilename = unicode:characters_to_list(
33543361
string:replace(NewSegment, ".segment", ".index",
33553362
trailing)),
3363+
3364+
case OldSegment of
3365+
undefined ->
3366+
%% Skip retention evaluation when opening a stream.
3367+
ok;
3368+
_ ->
3369+
ok = trigger_retention_eval(Manifest)
3370+
end,
3371+
33563372
{ok, Fd} =
33573373
file:open(
33583374
filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE),
@@ -3379,8 +3395,18 @@ handle_event({chunk_written, #chunk_info{id = Offset,
33793395
Epoch:64/unsigned,
33803396
SegmentFilePos:32/unsigned,
33813397
ChType:8/unsigned>>),
3398+
Manifest;
3399+
handle_event({retention_updated, Retention}, Manifest0) ->
3400+
Manifest = Manifest0#manifest{retention = Retention},
3401+
trigger_retention_eval(Manifest),
33823402
Manifest.
33833403

3404+
trigger_retention_eval(#manifest{name = Name,
3405+
directory = Dir,
3406+
retention = RetentionSpec,
3407+
retention_eval_fun = EvalFun}) ->
3408+
ok = osiris_retention:eval(Name, Dir, RetentionSpec, EvalFun).
3409+
33843410
close_manifest(#manifest{index_fd = Fd}) ->
33853411
_ = close_fd(Fd),
33863412
ok.

src/osiris_log_manifest.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
-callback acceptor_manifest(osiris_log:range(),
3434
EpochOffsets :: [{osiris:offset(), osiris:epoch()}],
3535
osiris_log:config()) ->
36-
{log_info(), state()}.
36+
{log_info(), osiris_log:config(), state()}.
3737

3838
-callback writer_manifest(osiris_log:config()) ->
39-
{log_info(), state()}.
39+
{log_info(), osiris_log:config(), state()}.
4040

4141
-callback find_data_reader_position(osiris:tail_info(), osiris_log:config()) ->
4242
{ok, osiris:offset(), Pos :: non_neg_integer(),

0 commit comments

Comments
 (0)