Skip to content

Commit 9ae615f

Browse files
authored
Merge pull request #28877 from WillemKauf/metastore_replace_objects_non_contiguous
`ct/l1`: allow non-contiguous, extent aligned `replace_objects()` requests in `metastore`
2 parents aed7aa0 + ed1f72e commit 9ae615f

File tree

2 files changed

+273
-50
lines changed

2 files changed

+273
-50
lines changed

src/v/cloud_topics/level_one/metastore/state_update.cc

Lines changed: 109 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,60 @@ std::optional<extent_range> get_range(
4040
return extent_range{base_it, last_it};
4141
}
4242

43+
// Per-partition list of contiguous offset intervals.
using contiguous_intervals_by_tidp_t = chunked_hash_map<
  model::topic_id_partition,
  chunked_vector<offset_interval_set::interval>>;

// Coalesces the extents of each partition in `sorted_extents_by_tp` into
// maximal runs of contiguous offsets.
//
// Two neighbouring extents belong to the same run when the later one starts
// at exactly `kafka::next_offset()` of the earlier one's last offset. For
// example, the extents `[[0,99], [100,199], [200,299], [300,399]]` collapse
// into the single interval `[0,399]`, while
// `[[0,99], [100,199], [250,299], [300,399]]` yield the two intervals
// `[[0,199], [250,399]]`.
//
// Returns an `stm_update_error` if an extent starts before the offset that
// follows the previous extent's last offset, e.g.
// `[[0,99], [200,249], [239,299]]`.
//
// Every partition present in the input gets an entry in the result; a
// partition without extents maps to an empty list of intervals.
std::expected<contiguous_intervals_by_tidp_t, stm_update_error>
contiguous_intervals_for_extents(
  const new_object::sorted_extents_by_tidp_t& sorted_extents_by_tp) {
    contiguous_intervals_by_tidp_t intervals_by_tidp;
    for (const auto& [tidp, extents] : sorted_extents_by_tp) {
        auto& merged = intervals_by_tidp[tidp];

        auto it = extents.begin();
        const auto end = extents.end();
        if (it == end) {
            continue;
        }

        // The first extent seeds the run that is currently being built.
        auto run_base = it->base_offset;
        auto run_last = it->last_offset;
        for (++it; it != end; ++it) {
            const auto next_in_run = kafka::next_offset(run_last);
            const bool extends_run = it->base_offset == next_in_run;
            const bool leaves_gap = it->base_offset > next_in_run;
            if (!extends_run && !leaves_gap) {
                // The extent starts inside (or before) the current run.
                return std::unexpected(stm_update_error(
                  fmt::format(
                    "Input object breaks partition {} offset ordering: "
                    "previous: {}, next: {}",
                    tidp,
                    run_last,
                    it->base_offset)));
            }
            if (leaves_gap) {
                // Close the finished run; this extent opens the next one.
                merged.push_back(
                  {.base_offset = run_base, .last_offset = run_last});
                run_base = it->base_offset;
            }
            // Either way the run now ends where this extent ends.
            run_last = it->last_offset;
        }
        // Emit the run that was still open when the extents ran out.
        merged.push_back({.base_offset = run_base, .last_offset = run_last});
    }

    return intervals_by_tidp;
}
96+
4397
} // namespace
4498

4599
void new_object::collect_extents_by_tidp(sorted_extents_by_tidp_t* ret) const {
@@ -320,44 +374,35 @@ replace_objects_update::can_apply(const state& state) {
320374
o.collect_extents_by_tidp(&new_extents_by_tp);
321375
}
322376

323-
for (const auto& [tidp, new_prt_extents] : new_extents_by_tp) {
324-
auto req_base = new_prt_extents.begin()->base_offset;
325-
auto req_last = std::prev(new_prt_extents.end())->last_offset;
377+
auto contiguous_intervals_by_tp = contiguous_intervals_for_extents(
378+
new_extents_by_tp);
379+
if (!contiguous_intervals_by_tp.has_value()) {
380+
return std::unexpected(contiguous_intervals_by_tp.error());
381+
}
326382

327-
auto p_state = state.partition_state(tidp);
328-
if (!p_state) {
329-
return std::unexpected(stm_update_error(
330-
fmt::format("Partition {} not tracked by state", tidp)));
331-
}
383+
for (const auto& [tidp, intervals] : contiguous_intervals_by_tp.value()) {
384+
for (const auto& interval : intervals) {
385+
auto req_base = interval.base_offset;
386+
auto req_last = interval.last_offset;
332387

333-
// Check that the new range's offset aligns with existing extents.
334-
const auto& prt = p_state->get();
335-
auto iters = get_range(prt.extents, req_base, req_last);
336-
if (!iters.has_value()) {
337-
return std::unexpected(stm_update_error(
338-
fmt::format(
339-
"Partition {} doesn't contain extents that span exactly [{}, "
340-
"{}]",
341-
tidp,
342-
req_base,
343-
req_last)));
344-
}
388+
auto p_state = state.partition_state(tidp);
389+
if (!p_state) {
390+
return std::unexpected(stm_update_error(
391+
fmt::format("Partition {} not tracked by state", tidp)));
392+
}
345393

346-
// Check that the new range of extents is contiguous, which in turn
347-
// ensures the resulting total set of extents will be contiguous.
348-
const auto& [base_it, last_it] = *iters;
349-
auto expected_next = base_it->base_offset;
350-
for (const auto& new_extent : new_prt_extents) {
351-
if (new_extent.base_offset != expected_next) {
394+
// Check that the new range's offset aligns with existing extents.
395+
const auto& prt = p_state->get();
396+
auto iters = get_range(prt.extents, req_base, req_last);
397+
if (!iters.has_value()) {
352398
return std::unexpected(stm_update_error(
353399
fmt::format(
354-
"Input object breaks partition {} offset ordering: "
355-
"expected next: {}, actual: {}",
400+
"Partition {} doesn't contain extents that span exactly "
401+
"[{}, {}]",
356402
tidp,
357-
expected_next,
358-
new_extent.base_offset)));
403+
req_base,
404+
req_last)));
359405
}
360-
expected_next = kafka::next_offset(new_extent.last_offset);
361406
}
362407
}
363408
if (compaction_updates.empty()) {
@@ -491,27 +536,43 @@ replace_objects_update::apply(state& state) {
491536
.object_size = o.object_size,
492537
});
493538
}
494-
for (const auto& [tidp, new_extents] : new_extents_by_tp) {
539+
540+
auto contiguous_intervals_by_tp
541+
= contiguous_intervals_for_extents(new_extents_by_tp).value();
542+
543+
for (const auto& [tidp, intervals] : contiguous_intervals_by_tp) {
495544
auto& p_state
496545
= state.topic_to_state[tidp.topic_id].pid_to_state[tidp.partition];
497-
auto requested_base = new_extents.begin()->base_offset;
498-
auto requested_last = new_extents.rbegin()->last_offset;
499-
auto iters = get_range(p_state.extents, requested_base, requested_last);
500-
auto [base_it, last_it] = *iters;
501-
auto end_it = std::next(last_it);
502-
for (auto iter = base_it; iter != end_it; ++iter) {
503-
auto& old_extent = *iter;
504-
state.objects[old_extent.oid].removed_data_size += old_extent.len;
505-
vlog(
506-
cd_log.debug,
507-
"Removing extent of {} in {} [{}, {}]",
508-
tidp,
509-
old_extent.oid,
510-
old_extent.base_offset,
511-
old_extent.last_offset);
546+
for (const auto& interval : intervals) {
547+
auto requested_base = interval.base_offset;
548+
auto requested_last = interval.last_offset;
549+
auto iters = get_range(
550+
p_state.extents, requested_base, requested_last);
551+
auto [base_it, last_it] = *iters;
552+
auto end_it = std::next(last_it);
553+
for (auto iter = base_it; iter != end_it; ++iter) {
554+
auto& old_extent = *iter;
555+
state.objects[old_extent.oid].removed_data_size
556+
+= old_extent.len;
557+
vlog(
558+
cd_log.debug,
559+
"Removing extent of {} in {} [{}, {}]",
560+
tidp,
561+
old_extent.oid,
562+
old_extent.base_offset,
563+
old_extent.last_offset);
564+
}
565+
566+
p_state.extents.erase(base_it, end_it);
512567
}
568+
}
569+
570+
for (const auto& [tidp, new_extents] : new_extents_by_tp) {
571+
auto& p_state
572+
= state.topic_to_state[tidp.topic_id].pid_to_state[tidp.partition];
573+
// NOTE: we don't need to update the start or next offsets since we've
574+
// validated that the new extents replace exact ranges.
513575

514-
p_state.extents.erase(base_it, end_it);
515576
for (const auto& e : new_extents) {
516577
p_state.extents.emplace(e);
517578
vlog(
@@ -522,8 +583,6 @@ replace_objects_update::apply(state& state) {
522583
e.base_offset,
523584
e.last_offset);
524585
}
525-
// NOTE: we don't need to update the start or next offsets since we've
526-
// validated that the new extents replace exact ranges.
527586

528587
for (const auto& extent : new_extents) {
529588
state.objects[extent.oid].total_data_size += extent.len;

src/v/cloud_topics/level_one/metastore/tests/state_update_test.cc

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const object_id oid1 = l1::create_object_id();
2626
const object_id oid2 = l1::create_object_id();
2727
const object_id oid3 = l1::create_object_id();
2828
const object_id oid4 = l1::create_object_id();
29+
const object_id oid5 = l1::create_object_id();
30+
const object_id oid6 = l1::create_object_id();
2931
const std::string_view tidp_a = "deadbeef-aaaa-0000-0000-000000000000/0";
3032
const std::string_view tidp_b = "deadbeef-bbbb-0000-0000-000000000000/0";
3133
const std::string_view tidp_c = "deadbeef-cccc-0000-0000-000000000000/0";
@@ -498,6 +500,168 @@ TEST(StateUpdateTest, TestEmptyReplace) {
498500
testing::StrEq("No objects requested"));
499501
}
500502

503+
// Replacing two non-adjacent extents in one request succeeds as long as each
// replaced range lines up with existing extents.
TEST(StateUpdateTest, TestReplaceValidNonContiguous) {
    // Seed the partition with three back-to-back extents, one per object.
    state s;
    auto seed = add_objects_builder()
                  .add(new_obj_builder(oid1, 100, 1100)
                         .add(tidp_a, 0_o, 99_o, 1999_t, 0, 99)
                         .build())
                  .add(new_obj_builder(oid2, 100, 1100)
                         .add(tidp_a, 100_o, 199_o, 1999_t, 0, 99)
                         .build())
                  .add(new_obj_builder(oid3, 100, 1100)
                         .add(tidp_a, 200_o, 299_o, 1999_t, 0, 99)
                         .build())
                  .add_term_start(tidp_a, 0_tm, 0_o)
                  .build();
    ASSERT_TRUE(seed.apply(s).has_value());

    // Swap out the extents of oid1 and oid3 and leave oid2 untouched. The
    // request as a whole has a gap, but each piece aligns with an existing
    // extent, so the update is valid.
    auto swap_outer = replace_objects_builder()
                        .add(new_obj_builder(oid4, 100, 1100)
                               .add(tidp_a, 0_o, 99_o, 1999_t, 0, 99)
                               .build())
                        .add(new_obj_builder(oid5, 100, 1100)
                               .add(tidp_a, 200_o, 299_o, 1999_t, 0, 99)
                               .build())
                        .build();
    ASSERT_TRUE(swap_outer.apply(s).has_value());

    // One-for-one replacement: the number of extents is unchanged.
    const auto tp = model::topic_id_partition::from(tidp_a);
    const auto& prt = s.partition_state(tp)->get();
    ASSERT_EQ(prt.extents.size(), 3);
}
539+
540+
// A non-contiguous replace may also split an existing extent into several new
// ones, provided the new extents together cover the old range exactly.
TEST(StateUpdateTest, TestReplaceValidNonContiguousSplitExtent) {
    // Seed the partition with three back-to-back extents, one per object.
    state s;
    auto seed = add_objects_builder()
                  .add(new_obj_builder(oid1, 100, 1100)
                         .add(tidp_a, 0_o, 99_o, 1999_t, 0, 99)
                         .build())
                  .add(new_obj_builder(oid2, 100, 1100)
                         .add(tidp_a, 100_o, 199_o, 1999_t, 0, 99)
                         .build())
                  .add(new_obj_builder(oid3, 100, 1100)
                         .add(tidp_a, 200_o, 299_o, 1999_t, 0, 99)
                         .build())
                  .add_term_start(tidp_a, 0_tm, 0_o)
                  .build();
    ASSERT_TRUE(seed.apply(s).has_value());

    // Replace the extents of oid1 and oid3 and keep oid2. The range formerly
    // held by oid3 is covered by two new extents (oid5 and oid6), so the
    // partition should end up with four extents in total.
    auto split_replace = replace_objects_builder()
                           .add(new_obj_builder(oid4, 100, 1100)
                                  .add(tidp_a, 0_o, 99_o, 1999_t, 0, 99)
                                  .build())
                           .add(new_obj_builder(oid5, 100, 1100)
                                  .add(tidp_a, 200_o, 249_o, 1999_t, 0, 99)
                                  .build())
                           .add(new_obj_builder(oid6, 100, 1100)
                                  .add(tidp_a, 250_o, 299_o, 1999_t, 0, 99)
                                  .build())
                           .build();
    ASSERT_TRUE(split_replace.apply(s).has_value());

    const auto tp = model::topic_id_partition::from(tidp_a);
    const auto& prt = s.partition_state(tp)->get();
    ASSERT_EQ(prt.extents.size(), 4);
}
579+
580+
// A replace request whose extents overlap each other must be rejected with an
// offset-ordering error and must leave the partition's extents untouched.
TEST(StateUpdateTest, TestReplaceInvalidNonContiguousBadOffsets) {
    // Seed the partition with three back-to-back extents, one per object.
    auto add = add_objects_builder()
                 .add(new_obj_builder(oid1, 100, 1100)
                        .add(tidp_a, 0_o, 99_o, 1999_t, 0, 99)
                        .build())
                 .add(new_obj_builder(oid2, 100, 1100)
                        .add(tidp_a, 100_o, 199_o, 1999_t, 0, 99)
                        .build())
                 .add(new_obj_builder(oid3, 100, 1100)
                        .add(tidp_a, 200_o, 299_o, 1999_t, 0, 99)
                        .build())
                 .add_term_start(tidp_a, 0_tm, 0_o)
                 .build();
    state s;
    auto add_res = add.apply(s);
    ASSERT_TRUE(add_res.has_value());

    // Attempt to replace oid1 and oid3 while leaving oid2 in place with an
    // invalid non-contiguous update: the extent of oid6 starts at 239, which
    // is before the end (249) of the preceding extent of oid5.
    auto replace = replace_objects_builder()
                     .add(new_obj_builder(oid4, 100, 1100)
                            .add(tidp_a, 0_o, 99_o, 1999_t, 0, 99)
                            .build())
                     .add(new_obj_builder(oid5, 100, 1100)
                            .add(tidp_a, 200_o, 249_o, 1999_t, 0, 99)
                            .build())
                     .add(new_obj_builder(oid6, 100, 1100)
                            .add(tidp_a, 239_o, 299_o, 1999_t, 0, 99)
                            .build())
                     .build();

    auto replace_res = replace.apply(s);
    ASSERT_FALSE(replace_res.has_value());
    EXPECT_THAT(
      std::string(replace_res.error()()),
      testing::ContainsRegex("breaks partition .+ offset ordering"));

    // The rejected update must not have modified the existing extents.
    auto& p = s.partition_state(model::topic_id_partition::from(tidp_a))->get();
    ASSERT_EQ(p.extents.size(), 3);
}
621+
622+
// A replace request whose extents are well ordered but do not cover an
// existing extent range exactly must be rejected and must leave the
// partition's extents untouched.
TEST(StateUpdateTest, TestReplaceInvalidNonContiguousDoesNotSpan) {
    // Seed the partition with three back-to-back extents, one per object.
    auto add = add_objects_builder()
                 .add(new_obj_builder(oid1, 100, 1100)
                        .add(tidp_a, 0_o, 99_o, 1999_t, 0, 99)
                        .build())
                 .add(new_obj_builder(oid2, 100, 1100)
                        .add(tidp_a, 100_o, 199_o, 1999_t, 0, 99)
                        .build())
                 .add(new_obj_builder(oid3, 100, 1100)
                        .add(tidp_a, 200_o, 299_o, 1999_t, 0, 99)
                        .build())
                 .add_term_start(tidp_a, 0_tm, 0_o)
                 .build();
    state s;
    auto add_res = add.apply(s);
    ASSERT_TRUE(add_res.has_value());

    // Attempt to replace oid1 and oid3 while leaving oid2 in place with an
    // invalid non-contiguous update that doesn't exactly span existing
    // extents: the new extents end at 298, whereas the existing extent of
    // oid3 ends at 299.
    auto replace = replace_objects_builder()
                     .add(new_obj_builder(oid4, 100, 1100)
                            .add(tidp_a, 0_o, 99_o, 1999_t, 0, 99)
                            .build())
                     .add(new_obj_builder(oid5, 100, 1100)
                            .add(tidp_a, 200_o, 249_o, 1999_t, 0, 99)
                            .build())
                     .add(new_obj_builder(oid6, 100, 1100)
                            .add(tidp_a, 250_o, 298_o, 1999_t, 0, 99)
                            .build())
                     .build();

    auto replace_res = replace.apply(s);
    ASSERT_FALSE(replace_res.has_value());
    EXPECT_THAT(
      std::string(replace_res.error()()),
      testing::ContainsRegex(
        "Partition .+ doesn't contain extents that span exactly"));

    // The rejected update must not have modified the existing extents.
    auto& p = s.partition_state(model::topic_id_partition::from(tidp_a))->get();
    ASSERT_EQ(p.extents.size(), 3);
}
664+
501665
TEST(StateUpdateTest, TestReplaceWithCompaction) {
502666
using testing::ElementsAre;
503667
using range = struct compaction_state_update::cleaned_range;

0 commit comments

Comments
 (0)