
Conversation

@WillemKauf (Contributor) commented Dec 4, 2025

More stuff plucked from dev...WillemKauf:redpanda:ct_final_squashed.

This PR:

  • Reworks the sink and source objects into a much better state, with clearer logic around rolling, pushing updates to the committer, and finalizing compaction jobs.
  • Implements the compaction_committer as a stateful object, allowing compaction jobs to be initialized, to have newly compacted L1 objects pushed to them, and to be finalized once the entire log has been compacted.

When a compaction job is initialized in the committer, a background fiber is kicked off that awaits newly pushed L1 objects and uploads them in the background. When the job is finalized, all inflight upload futures are awaited and a metastore update is made, either through compact_objects() if all uploads succeeded, or through replace_objects() if some uploads failed.
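At a high level, the intended lifecycle looks roughly like this (begin_compaction_job() appears in this PR; push_compacted_object(), finalize_compaction_job(), and build_next_l1_object() are illustrative names only, not the actual API):

```cpp
ss::future<> run_compaction(
  compaction_committer& committer, model::topic_id_partition tidp) {
    // Starts the background fiber that uploads L1 objects as they are pushed.
    auto job_id = co_await committer.begin_compaction_job(tidp);

    // Hypothetical loop: hand each newly built L1 object to the committer,
    // which uploads it in the background.
    while (auto obj = co_await build_next_l1_object()) {
        co_await committer.push_compacted_object(job_id, std::move(*obj));
    }

    // Awaits all inflight uploads and commits to the metastore, via
    // compact_objects() if every upload succeeded or replace_objects()
    // otherwise.
    co_await committer.finalize_compaction_job(job_id);
}
```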

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.3.x
  • v25.2.x
  • v25.1.x
  • v24.3.x

Release Notes

  • none

Copilot AI left a comment


Pull request overview

This PR implements a stateful compaction_committer and significantly refactors the sink and source compaction components for L1 cloud topics. The key changes enable background uploads of L1 objects during compaction and proper finalization of compaction jobs, with fallback handling when uploads fail.

Key Changes

  • Introduces compaction_job_id tracking for compaction jobs with lifecycle management (begin, add objects, finalize)
  • Implements background upload loop that processes L1 objects as they're produced during compaction
  • Adds stateful tracking of dirty ranges, tombstone ranges, and processed extents in both source and sink
  • Provides fallback from compact_objects() to replace_objects() when uploads fail

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.

Summary per file:

  • src/v/storage/tests/batch_generators.h: Updates test helper to support decompressing batches for validation
  • src/v/kafka/server/tests/produce_consume_utils.h: Exposes batch_from_kvs helper function
  • src/v/kafka/server/tests/produce_consume_utils.cc: Extracts batch creation into reusable function
  • src/v/cloud_topics/level_one/compaction/worker.cc: Updates worker to construct source/sink with new parameters including dirty ranges and min lag
  • src/v/cloud_topics/level_one/compaction/tests/reducer_test.cc: Adds comprehensive tests for compaction with tombstones and failure scenarios
  • src/v/cloud_topics/level_one/compaction/tests/BUILD: Updates test dependencies for new test scenarios
  • src/v/cloud_topics/level_one/compaction/source.h: Refactors source to track dirty ranges and extents by reference
  • src/v/cloud_topics/level_one/compaction/source.cc: Implements map building per-extent with lag-based filtering
  • src/v/cloud_topics/level_one/compaction/sink.h: Adds state tracking for rolling L1 objects and extent processing
  • src/v/cloud_topics/level_one/compaction/sink.cc: Implements rolling logic and integration with stateful committer
  • src/v/cloud_topics/level_one/compaction/meta.h: Introduces file_and_md_info and compaction_job_id types
  • src/v/cloud_topics/level_one/compaction/committing_policy.h: Adds should_finalize() method to policy interface
  • src/v/cloud_topics/level_one/compaction/committing_policy.cc: Implements finalization policy
  • src/v/cloud_topics/level_one/compaction/committer.h: Redesigns committer as stateful with job tracking
  • src/v/cloud_topics/level_one/compaction/committer.cc: Implements background upload loop and finalization with retry logic
  • src/v/cloud_topics/level_one/compaction/BUILD: Merges source/sink into single build target with updated dependencies

auto key = iobuf_to_string(rec.release_key());
std::optional<ss::sstring> val;
if (rec.has_value()) {
    auto val = iobuf_to_string(rec.release_value());
Copilot AI Dec 4, 2025

Variable val is declared but never used, shadowing the outer scope variable. The value retrieved from the record is not assigned to the outer scope val variable. Remove the type specifier to assign to the existing variable.

Suggested change:
- auto val = iobuf_to_string(rec.release_value());
+ val = iobuf_to_string(rec.release_value());
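For reference, the snippet above with the suggestion applied would read as follows (closing brace assumed):

```cpp
auto key = iobuf_to_string(rec.release_key());
std::optional<ss::sstring> val;
if (rec.has_value()) {
    // Assign to the outer `val` instead of declaring a shadowing local.
    val = iobuf_to_string(rec.release_value());
}
```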

@WillemKauf (Contributor Author):

Nice catch mr robot


ss::future<> compaction_committer::cancel_active_jobs() {
    static constexpr size_t max_concurrent_removal = 1024;
    for (auto& [job, state] : _compaction_jobs) {
Copilot AI Dec 4, 2025

[nitpick] The variable name job is misleading as it represents the job ID (key), not the job state. Consider renaming to job_id for clarity.

Suggested change:
- for (auto& [job, state] : _compaction_jobs) {
+ for (auto& [job_id, state] : _compaction_jobs) {

@WillemKauf (Contributor Author):

also valid

@WillemKauf (Contributor Author):
Force push to make changes suggested by our robot overlords

std::move(output_batches));
}

class ReducerTestFixture : public l1::l1_reader_fixture {
@WillemKauf (Contributor Author):

This test is coming back in a future commit, don't worry!

@vbotbuildovich (Collaborator) commented Dec 4, 2025

CI test results

test results on build#77300:

  • NodesDecommissioningTest.test_decommissioning_rebalancing_node {"shutdown_decommissioned": true} (integration): FLAKY, passed 18/21. Upstream reliability is 94.13%, current run reliability is 85.71%, drift is 8.42 with the allowed drift set to 50; the test should PASS.
    Job: https://buildkite.com/redpanda/redpanda/builds/77300#019ae6fc-2f58-47e3-9baa-c6be088c5ca5
    Test history: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_decommissioning_rebalancing_node
  • SimpleEndToEndTest.test_relaxed_acks {"write_caching": false} (integration): FLAKY, passed 19/21. Upstream reliability is 98.78%, current run reliability is 90.48%, drift is 8.30 with the allowed drift set to 50; the test should PASS.
    Job: https://buildkite.com/redpanda/redpanda/builds/77300#019ae6fc-2f5e-4e55-8639-8013f1434052
    Test history: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=SimpleEndToEndTest&test_method=test_relaxed_acks

test results on build#77313:

  • WriteCachingFailureInjectionE2ETest.test_crash_all {"use_transactions": false} (integration): FLAKY, passed 16/21. Upstream reliability is 89.26%, current run reliability is 76.19%, drift is 13.07 with the allowed drift set to 50; the test should PASS.
    Job: https://buildkite.com/redpanda/redpanda/builds/77313#019ae7c7-801d-42db-bf60-815581943635
    Test history: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=WriteCachingFailureInjectionE2ETest&test_method=test_crash_all

@andrwng (Contributor) left a comment

Haven't gotten to the compaction committer yet, but will look more later/tomorrow

bool map_is_full{false};
struct return_t {
    bool map_is_full;
    std::optional<model::offset> max_indexed_offset;
@andrwng (Contributor):

nit: since we're still in the cloud topics namespace, can we return optional<kafka::offset> instead and do whatever casting we need closer to where the kafka<->model switch happens? Or does that complicate other things?

@WillemKauf (Contributor Author):

It feels simpler to leave things in map_building_reducer as model::offset, and then cast back when dealing with its return values. Some of the code in maybe_index_record_in_map() is a bit uglier when doing the casting locally.
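For illustration, the "cast at the boundary" approach could look roughly like this (the call site is a hypothetical sketch; the return_t fields and model::offset_cast usage are taken from snippets elsewhere in this PR):

```cpp
// Hypothetical call site: the reducer keeps model::offset internally, and the
// caller converts to kafka::offset only where the kafka<->model boundary lives.
auto res = reducer.end_of_stream();
std::optional<kafka::offset> max_indexed;
if (res.max_indexed_offset.has_value()) {
    // model::offset_cast is used the same way elsewhere in this PR.
    max_indexed = model::offset_cast(res.max_indexed_offset.value());
}
```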

Comment on lines 185 to 186
auto dirty_range_intervals = intervals_for_dirty_range(
dirty_range, _extents);
@andrwng (Contributor):

Let's call this something else. It is very easy to confuse this with _dirty_range_intervals. How about extent_aligned_ranges or something

@WillemKauf (Contributor Author):

Done

Comment on lines 60 to 61
// Iterator used during `map_building_iteration()` which points into the
// above vector `_dirty_range_intervals`.
@andrwng (Contributor):

nit: probably worth a comment explaining why we're iterating backwards

@WillemKauf (Contributor Author):

Added

Comment on lines 64 to 67
// The extents of the CTP, as returned from the `metastore`.
metastore::extent_metadata_vec _extents;
metastore::extent_metadata_vec::const_iterator _extents_it;
metastore::extent_metadata_vec::const_iterator _extents_end_it;
@andrwng (Contributor):

Do you expect that this will always be all of the partition's extents? If so, do you have longer term ideas for making that not the case? This seems like something we might want to change sooner rather than later, and I'm wondering if it's a big lift (and either way, whether it's easier to make that lift now or later)

@WillemKauf (Contributor Author):

> Do you expect that this will always be all of the partition's extents?

Currently, that is the assumption in the code, yes.

> If so, do you have longer term ideas for making that not the case?

If you are referring to the potential issue of not being able to return all the extents in the metastore in one RPC response, and instead needing to paginate extent_metadata, I think this will be an easy enough fix with the right abstraction here (something similar to an iterator or a stream, but one that may occasionally block on an RPC call to fetch more extent_metadatas from the metastore).

I think this is probably a good thing to tackle sooner rather than later.
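For illustration, such an abstraction could look roughly like this (every name here is hypothetical, including metastore_client, list_extents(), and the page shape; nothing below is part of this PR):

```cpp
// Hypothetical: a pull-based stream of extent_metadata that pages results in
// from the metastore over RPC instead of materializing all extents up front.
class extent_metadata_stream {
public:
    extent_metadata_stream(metastore_client& client, size_t page_size)
      : _client(client)
      , _page_size(page_size) {}

    // Returns the next extent, issuing another RPC when the local page is
    // exhausted; std::nullopt once the metastore has no more extents.
    ss::future<std::optional<metastore::extent_metadata>> next() {
        if (_page.empty() && _more) {
            auto resp = co_await _client.list_extents(_next_token, _page_size);
            _page = std::move(resp.extents);
            _next_token = resp.next_token;
            _more = _next_token.has_value();
        }
        if (_page.empty()) {
            co_return std::nullopt;
        }
        auto e = std::move(_page.front());
        _page.pop_front();
        co_return e;
    }

private:
    metastore_client& _client;
    size_t _page_size;
    std::deque<metastore::extent_metadata> _page;
    std::optional<ss::sstring> _next_token;
    bool _more{true};
};
```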

Comment on lines 188 to 190
for (auto it = dirty_range_intervals.crbegin();
     it != dirty_range_intervals.crend();
     ++it) {
@andrwng (Contributor):

nit: I think it's worth commenting somewhere that this alignment to extents isn't required for correctness, but serves the purposes of:

  • giving us natural chunks to iterate backwards on, and
  • giving us naturally fine-grained intervals to indicate the presence of tombstones

I think an alternate approach would have been to iterate backwards with some fixed chunk size and it would have been equally correct. That might not be a bad idea either, if we end up not being able to depend on so much extent information for scalability reasons

On the flip side, I think it's also worth commenting in the forwards pass that the extent alignment is required as a part of the contract with the metastore

@WillemKauf (Contributor Author):

Added

auto prev_offset = std::max(
  kafka::prev_offset(model::offset_cast(b.base_offset())),
  kafka::offset{0});
set_last_processed_offset(prev_offset);
@andrwng (Contributor):

Why are we setting the prev offset up front, rather than setting the last offset on exit? A comment would be nice if setting the last offset is not correct

@WillemKauf (Contributor Author):

We have to set the _last_processed_offset before calling into maybe_roll() in order to get the offset bounds of the built L1 object correct.

I.e., imagine that batches [0], [2], [3], [5], [10] survive compaction (the batches between them have been removed), and the currently active L1 object needs a roll when batch [10] is passed to sink::operator(). Technically that L1 object spans [0] -> [9], but we are otherwise unaware of this because the removed batches have not been supplied to the sink.

We could also call set_last_processed_offset() after adding the batch, but it doesn't help us for the reason stated above (you'll notice set_last_processed_offset() is usually called before every call to roll() or maybe_roll().)
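As a rough illustration of that ordering (shape only, not the literal implementation; add_batch() is a hypothetical stand-in for appending to the active L1 object):

```cpp
ss::future<ss::stop_iteration> sink::operator()(model::record_batch b) {
    // Record the offset just before this batch first, so that if the active
    // L1 object rolls here its last offset covers the gap (e.g. offset 9 when
    // batch [10] arrives), even though the removed batches never reached us.
    auto prev_offset = std::max(
      kafka::prev_offset(model::offset_cast(b.base_offset())),
      kafka::offset{0});
    set_last_processed_offset(prev_offset);

    // May close out the current L1 object using _last_processed_offset as its
    // upper bound, then start a new one.
    co_await maybe_roll();

    co_await add_batch(std::move(b)); // hypothetical append to the new object
    co_return ss::stop_iteration::no;
}
```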

Comment on lines +48 to +44
// current L1 object composing the range [0,10] would be rolled, and a new
// L1 object would be started for the range [21,30].
@andrwng (Contributor):

See my other question about whether we should stop after seeing an ineligible extent. It certainly seems simpler (and conceptually easier to reason about) if we ensure the forwards pass is done in order

@WillemKauf (Contributor Author):

Please see my other response on Kafka semantics and compaction. I don't think it makes sense to potentially block all of compaction due to one segment or extent with an ineligible timestamp.

@WillemKauf (Contributor Author):
Force push to:

  • Add requested comments
  • Fix order of arguments in vlog statement
  • Rename dirty_range_intervals -> extent_aligned_ranges

Comment on lines 151 to 152
// 1. Push currently built L1 object & metadata to the committer.
if (_active_staging_file) {
@andrwng (Contributor):

I think we are fundamentally keeping track of two things:

  • the currently pending object
  • the total set of input extents that we have processed

We track these to achieve a few goals:

  • trim our final resulting metadata (cleaned ranges, removed tombstone ranges) to be a subset of the processed input extents
  • make sure the objects that we're building are contiguous when the inputs are contiguous
  • make sure we start new objects when inputs are not contiguous

If these seem correct, I think it's worth documenting these in a comment somewhere, maybe above the members or in the header above the class declaration. They're pretty nuanced, and without this knowledge it's hard to understand why the code is doing what it is.

@andrwng (Contributor):

With those goals in mind, I'm thinking if we can simplify the amount of state tracked in the sink. WDYT about adding a new struct to encapsulate the in-progress output object?

struct compacted_object {
  staging_file active_staging_file;
  object_builder builder;
  kafka::offset base_offset;
};

std::unique_ptr<compacted_object> inflight_object;
std::optional<kafka::offset> iteration_base_offset; // set only if we're in the middle of iterating
offset_interval_set iterated_extents; // instead of processed extents?

I think _last_processed_offset is mostly there to be able to preserve that the output objects are contiguous when we're iterating. I think that can instead be accomplished by adding an argument to roll, like roll(kafka::offset last_offset), where in the operator() we pass it prev(batch.base_offset).

I think the dance around keeping track of the next extents is confusing, especially given the nuance with the first object. I'm thinking it may make sense to split up process_next_extent_offset_bounds() into prepare_iteration(kafka::offset base_offset) and finish_iteration(kafka::offset last_offset), where:

  • prepare sets iteration_base_offset and initializes the inflight_object if it doesn't exist, and if called with a base offset that doesn't align with the last processed offset, it calls roll(_processed_calls.back().last_offset)
  • the finish calls _processed_calls.insert(extent_base_offset, last_offset), and is called after the consume call finishes

TLDR: I think it isn't too unnatural to have iteration bounds be a part of the sink's interface, and having an in-flight object also feels natural. I could be way off base on the details, so maybe it breaks down somewhere. WDYT?

@WillemKauf (Contributor Author):

Done. We now have prepare_iteration and finish_iteration calls around the processing of batches for the sink.
This also helps with reducing the amount of book-kept state within sink - _extent_base_offset and _extent_last_offset are gone, and roll() now takes two arguments: roll(object_last_offset, next_base_offset). These are two separate offsets because there is no guarantee that they are contiguous.
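Roughly, the resulting call pattern around each extent-aligned range looks like this (a sketch of the shape only; consume_range() is a hypothetical stand-in for the actual reader plumbing, and exact signatures may differ):

```cpp
for (const auto& range : extent_aligned_ranges) {
    // May call roll(object_last_offset, next_base_offset) if this range is
    // not contiguous with what the in-flight L1 object already covers.
    co_await sink.prepare_iteration(range.base_offset);

    // Feed the batches of this range through sink::operator().
    co_await consume_range(range, sink);

    // Records [range.base_offset, range.last_offset] as processed input.
    co_await sink.finish_iteration(range.last_offset);
}
```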

Comment on lines 35 to 75
// Trims the removable tombstone ranges down to only those intervals that are
// fully covered by the extents actually processed in this compaction job, so
// that the metastore update never claims tombstones were removed from ranges
// we did not read.
offset_interval_set get_removed_tombstone_ranges(
  const offset_interval_set& removable_tombstone_ranges,
  const offset_interval_set& processed_extents) {
    offset_interval_set removed_tombstone_ranges;
    auto stream = removable_tombstone_ranges.make_stream();
    while (stream.has_next()) {
        auto i = stream.next();
        if (processed_extents.covers(i.base_offset, i.last_offset)) {
            removed_tombstone_ranges.insert(i.base_offset, i.last_offset);
        }
    }
    return removed_tombstone_ranges;
}

// Similarly trims the candidate cleaned ranges to the subset covered by the
// processed extents, so that only ranges we have actually compacted end up
// reported as cleaned.
chunked_vector<metastore::compaction_update::cleaned_range>
get_new_cleaned_ranges(
  const chunked_vector<metastore::compaction_update::cleaned_range>&
    maybe_cleaned_ranges,
  const offset_interval_set& processed_extents) {
    chunked_vector<metastore::compaction_update::cleaned_range>
      new_cleaned_ranges;
    new_cleaned_ranges.reserve(maybe_cleaned_ranges.size());
    for (const auto& cleaned_range : maybe_cleaned_ranges) {
        if (processed_extents.covers(
              cleaned_range.base_offset, cleaned_range.last_offset)) {
            new_cleaned_ranges.push_back(cleaned_range);
        }
    }

    new_cleaned_ranges.shrink_to_fit();
    return new_cleaned_ranges;
}
@andrwng (Contributor):

nit: please add comments explaining why these are needed

@WillemKauf (Contributor Author):

Added!

ssx::when_all_succeed<res_t>(std::move(inflight_uploads)));
vassert(
!res.failed(),
"expected that these futures wouldn't throw, just propagate an error.");
@andrwng (Contributor):

Can't they throw if the job mutex is broken? What prevents that?

finalize_op op = finalize_op::compact_objects;
if (fut.failed()) {
    auto e = fut.get_exception();
    auto lvl = ssx::is_shutdown_exception(e) ? ss::log_level::debug
@andrwng (Contributor):

At what point do we stop if we're shutting down? I'm kind of surprised to see us continue on for shutdown errors

}

ss::future<compaction_job_id>
compaction_committer::begin_compaction_job(model::topic_id_partition tidp) {
@andrwng (Contributor):

Maybe vassert that _compaction_jobs is empty? (and also comment where to look to verify how that is a strict guarantee)
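For reference, the suggested guard might look something like this (a sketch only; the message text is illustrative):

```cpp
ss::future<compaction_job_id>
compaction_committer::begin_compaction_job(model::topic_id_partition tidp) {
    // Only one compaction job is expected to be in progress per shard at a
    // time; make that a hard invariant rather than an implicit assumption.
    vassert(
      _compaction_jobs.empty(),
      "expected no active compaction jobs when beginning a new one, have {}",
      _compaction_jobs.size());
    // ... existing job-creation logic continues unchanged ...
}
```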

Don't consider any parts of partially truncated extents in the calculation.
Unfortunately these two objects will have a dependency on each other
that is hard to break.

Compile them into one `cc_library`. This really isn't a big deal since
they will be used together nearly always anyways.
* Changes to internal state
* Changes to `map_building_iteration()`:
  * `map_building_iteration` now chunks reads of dirty ranges into reads
    over the `extent`s that comprise the dirty range.
  * `map_building_reducer` and book-keeping of `new_cleaned_ranges` improved.
    The procedure for determining `new_cleaned_ranges` and whether they have tombstones
    is now done entirely in the `source`. To assist, the return type of
    `map_building_reducer::end_of_stream()` is now a `struct` containing
    a boolean indicating if the map is full (i.e. if the extent was fully indexed
    before breaking out of the index loop), the `max_indexed_offset`, and
    a boolean indicating whether the range has tombstones or not.
* Changes to `deduplication_iteration()`:
  * `min.compaction.lag.ms` is now supported, due to use of the helper function
    `should_compact_extent()` (a sketch of this check follows below).
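A hedged sketch of the lag check named above (the real should_compact_extent() in this PR may differ; the extent's max_timestamp field is an assumption):

```cpp
bool should_compact_extent(
  const metastore::extent_metadata& extent,
  std::chrono::milliseconds min_compaction_lag,
  model::timestamp now) {
    // Only extents whose newest record is older than min.compaction.lag.ms
    // are eligible for compaction; newer extents are skipped rather than
    // blocking the rest of the pass.
    auto cutoff = model::timestamp(now.value() - min_compaction_lag.count());
    return extent.max_timestamp <= cutoff;
}
```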
Reworking of the internal state of `sink` and setting up the `initialize()`
function call with the necessary shared state from the `source`.
@WillemKauf force-pushed the ct_next_3 branch 2 times, most recently from c366e23 to f49a380 on December 6, 2025 01:53
@WillemKauf marked this pull request as draft on December 6, 2025 01:53
@WillemKauf (Contributor Author):
Converting to draft mode, not because this PR isn't ready for review, but because I think it will go through a few more iterations and there's no point in wasting money on CI runs.

Implements rolling, setting extent bounds, and determining removed tombstone
ranges and dirty ranges made clean during finalization.
A more descriptive `struct` name for what will ultimately be a type
used quite frequently in the `compaction_committer`.
A UUID type for compaction jobs, to be used for stateful management
of compaction job states in the `compaction_committer`.
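Such an ID type typically looks something like this (a sketch assuming redpanda's usual named_type pattern; the actual definition in meta.h may differ):

```cpp
// Hypothetical definition: a strongly-typed UUID so job IDs can't be mixed up
// with other identifiers at compile time.
using compaction_job_id = named_type<uuid_t, struct compaction_job_id_tag>;
```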
Provides interfaces for:

1. Initializing a compaction job, which spins off a background
   fiber which will take new updates and push them to cloud storage
   while building up an `object_metadata_builder` update for the `metastore`.
2. Pushing updates to the `compaction_committer` for a given `compaction_job_id`.
3. Finalizing a `compaction_job_id`, which will force the background fiber
   to finish its uploads, build a `metastore::compaction_map_t` update, and
   finalize a commit to the `metastore`.

A background fiber is kicked off for every `compaction_job_id`, but it is
currently expected that only one compaction job should be in progress per
shard at a time.
Hooks the `sink` up to the committer interfaces in order to:

1. Initialize a compaction job
2. Push L1 objects as they are rolled to the compaction job
3. Finalize a compaction job
This will be useful in more contexts as a standalone function.
In case we are validating compressed batches (to avoid an assertion
failure).
A lot of code had to change, so this test was temporarily removed and
re-added with the updated interfaces.