@@ -67,12 +67,18 @@ void KeyValueMerger::flush() {
67
67
68
68
IndexValuesWithVerPB index_value_pb;
69
69
for (const auto & index_value_with_ver : _index_value_vers) {
70
+ if (_merge_base_level && index_value_with_ver.second == IndexValue (NullIndexValue)) {
71
+ // deleted
72
+ continue ;
73
+ }
70
74
auto * value = index_value_pb.add_values ();
71
75
value->set_version (index_value_with_ver.first );
72
76
value->set_rssid (index_value_with_ver.second .get_rssid ());
73
77
value->set_rowid (index_value_with_ver.second .get_rowid ());
74
78
}
75
- _builder->Add (Slice (_key), Slice (index_value_pb.SerializeAsString ()));
79
+ if (index_value_pb.values_size () > 0 ) {
80
+ _builder->Add (Slice (_key), Slice (index_value_pb.SerializeAsString ()));
81
+ }
76
82
_index_value_vers.clear ();
77
83
}
78
84
@@ -130,7 +136,7 @@ Status LakePersistentIndex::minor_compact() {
130
136
PersistentIndexSstablePB sstable_pb;
131
137
sstable_pb.set_filename (filename);
132
138
sstable_pb.set_filesize (filesize);
133
- sstable_pb.set_version (_immutable_memtable->max_version ());
139
+ sstable_pb.set_max_rss_rowid (_immutable_memtable->max_rss_rowid ());
134
140
auto * block_cache = _tablet_mgr->update_mgr ()->block_cache ();
135
141
if (block_cache == nullptr ) {
136
142
return Status::InternalError (" Block cache is null." );
@@ -260,10 +266,54 @@ Status LakePersistentIndex::replace(size_t n, const Slice* keys, const IndexValu
260
266
return Status::OK ();
261
267
}
262
268
269
+ void LakePersistentIndex::pick_sstables_for_merge (const PersistentIndexSstableMetaPB& sstable_meta,
270
+ std::vector<PersistentIndexSstablePB>* sstables,
271
+ bool * merge_base_level) {
272
+ // There are two levels in persistent index:
273
+ // 1) base level. It contains only one sst file.
274
+ // 2) cumulative level. Sst files that except base level.
275
+ // And there are two kinds of merge:
276
+ // 1) base merge. Merge all sst files.
277
+ // 2) cumulative merge. Only merge cumulative sst files.
278
+ //
279
+ // And we use this strategy to decide whether to use base merge or cumulative merge:
280
+ // 1. When total size of cumulative level sst files reach 1/10 of base level, use base merge.
281
+ // 2. Otherwise, use cumulative merge.
282
+ DCHECK (sstable_meta.sstables_size () > 0 );
283
+ int64_t base_level_bytes = 0 ;
284
+ int64_t cumulative_level_bytes = 0 ;
285
+ std::vector<PersistentIndexSstablePB> cumulative_sstables;
286
+ for (int i = 0 ; i < sstable_meta.sstables_size (); i++) {
287
+ if (i == 0 ) {
288
+ base_level_bytes = sstable_meta.sstables (i).filesize ();
289
+ } else {
290
+ cumulative_level_bytes += sstable_meta.sstables (i).filesize ();
291
+ cumulative_sstables.push_back (sstable_meta.sstables (i));
292
+ }
293
+ }
294
+
295
+ if ((double )base_level_bytes * config::lake_pk_index_cumulative_base_compaction_ratio >
296
+ (double )cumulative_level_bytes) {
297
+ // cumulative merge
298
+ sstables->swap (cumulative_sstables);
299
+ *merge_base_level = false ;
300
+ } else {
301
+ // base merge
302
+ sstables->push_back (sstable_meta.sstables (0 ));
303
+ sstables->insert (sstables->end (), cumulative_sstables.begin (), cumulative_sstables.end ());
304
+ *merge_base_level = true ;
305
+ }
306
+ // Limit max sstable count that can do merge, to avoid cost too much memory.
307
+ const int32_t max_limit = config::lake_pk_index_sst_max_compaction_versions;
308
+ if (sstables->size () > max_limit) {
309
+ sstables->resize (max_limit);
310
+ }
311
+ }
312
+
263
313
Status LakePersistentIndex::prepare_merging_iterator (
264
314
TabletManager* tablet_mgr, const TabletMetadata& metadata, TxnLogPB* txn_log,
265
315
std::vector<std::shared_ptr<PersistentIndexSstable>>* merging_sstables,
266
- std::unique_ptr<sstable::Iterator>* merging_iter_ptr) {
316
+ std::unique_ptr<sstable::Iterator>* merging_iter_ptr, bool * merge_base_level ) {
267
317
sstable::ReadOptions read_options;
268
318
// No need to cache input sst's blocks.
269
319
read_options.fill_cache = false ;
@@ -274,11 +324,16 @@ Status LakePersistentIndex::prepare_merging_iterator(
274
324
}
275
325
});
276
326
277
- auto max_compaction_bytes = config::lake_pk_index_sst_max_compaction_bytes;
278
327
iters.reserve (metadata.sstable_meta ().sstables ().size ());
279
- size_t total_filesize = 0 ;
280
328
std::stringstream ss_debug;
281
- for (const auto & sstable_pb : metadata.sstable_meta ().sstables ()) {
329
+ std::vector<PersistentIndexSstablePB> sstables_to_merge;
330
+ // Pick sstable for merge, decide to use base merge or cumulative merge.
331
+ pick_sstables_for_merge (metadata.sstable_meta (), &sstables_to_merge, merge_base_level);
332
+ if (sstables_to_merge.size () <= 1 ) {
333
+ // no need to do merge
334
+ return Status::OK ();
335
+ }
336
+ for (const auto & sstable_pb : sstables_to_merge) {
282
337
// build sstable from meta, instead of reuse `_sstables`, to keep it thread safe
283
338
ASSIGN_OR_RETURN (auto rf,
284
339
fs::new_random_access_file (tablet_mgr->sst_location (metadata.id (), sstable_pb.filename ())));
@@ -287,14 +342,9 @@ Status LakePersistentIndex::prepare_merging_iterator(
287
342
merging_sstables->push_back (merging_sstable);
288
343
sstable::Iterator* iter = merging_sstable->new_iterator (read_options);
289
344
iters.emplace_back (iter);
290
- total_filesize += sstable_pb.filesize ();
291
345
// add input sstable.
292
346
txn_log->mutable_op_compaction ()->add_input_sstables ()->CopyFrom (merging_sstable->sstable_pb ());
293
347
ss_debug << sstable_pb.filename () << " | " ;
294
- if (total_filesize >= max_compaction_bytes &&
295
- merging_sstables->size () >= config::lake_pk_index_sst_min_compaction_versions) {
296
- break ;
297
- }
298
348
}
299
349
sstable::Options options;
300
350
(*merging_iter_ptr).reset (sstable::NewMergingIterator (options.comparator , &iters[0 ], iters.size ()));
@@ -304,9 +354,9 @@ Status LakePersistentIndex::prepare_merging_iterator(
304
354
return Status::OK ();
305
355
}
306
356
307
- Status LakePersistentIndex::merge_sstables (std::unique_ptr<sstable::Iterator> iter_ptr,
308
- sstable::TableBuilder* builder ) {
309
- auto merger = std::make_unique<KeyValueMerger>(iter_ptr->key ().to_string (), builder);
357
+ Status LakePersistentIndex::merge_sstables (std::unique_ptr<sstable::Iterator> iter_ptr, sstable::TableBuilder* builder,
358
+ bool base_level_merge ) {
359
+ auto merger = std::make_unique<KeyValueMerger>(iter_ptr->key ().to_string (), builder, base_level_merge );
310
360
while (iter_ptr->Valid ()) {
311
361
RETURN_IF_ERROR (merger->merge (iter_ptr->key ().to_string (), iter_ptr->value ().to_string ()));
312
362
iter_ptr->Next ();
@@ -324,8 +374,14 @@ Status LakePersistentIndex::major_compact(TabletManager* tablet_mgr, const Table
324
374
325
375
std::vector<std::shared_ptr<PersistentIndexSstable>> sstable_vec;
326
376
std::unique_ptr<sstable::Iterator> merging_iter_ptr;
377
+ bool merge_base_level = false ;
327
378
// build merge iterator
328
- RETURN_IF_ERROR (prepare_merging_iterator (tablet_mgr, metadata, txn_log, &sstable_vec, &merging_iter_ptr));
379
+ RETURN_IF_ERROR (prepare_merging_iterator (tablet_mgr, metadata, txn_log, &sstable_vec, &merging_iter_ptr,
380
+ &merge_base_level));
381
+ if (merging_iter_ptr == nullptr ) {
382
+ // no need to do merge
383
+ return Status::OK ();
384
+ }
329
385
if (!merging_iter_ptr->Valid ()) {
330
386
return merging_iter_ptr->status ();
331
387
}
@@ -338,7 +394,7 @@ Status LakePersistentIndex::major_compact(TabletManager* tablet_mgr, const Table
338
394
filter_policy.reset (const_cast <sstable::FilterPolicy*>(sstable::NewBloomFilterPolicy (10 )));
339
395
options.filter_policy = filter_policy.get ();
340
396
sstable::TableBuilder builder (options, wf.get ());
341
- RETURN_IF_ERROR (merge_sstables (std::move (merging_iter_ptr), &builder));
397
+ RETURN_IF_ERROR (merge_sstables (std::move (merging_iter_ptr), &builder, merge_base_level ));
342
398
RETURN_IF_ERROR (wf->close ());
343
399
344
400
// record output sstable pb
@@ -354,7 +410,8 @@ Status LakePersistentIndex::apply_opcompaction(const TxnLogPB_OpCompaction& op_c
354
410
355
411
PersistentIndexSstablePB sstable_pb;
356
412
sstable_pb.CopyFrom (op_compaction.output_sstable ());
357
- sstable_pb.set_version (op_compaction.input_sstables (op_compaction.input_sstables ().size () - 1 ).version ());
413
+ sstable_pb.set_max_rss_rowid (
414
+ op_compaction.input_sstables (op_compaction.input_sstables ().size () - 1 ).max_rss_rowid ());
358
415
auto sstable = std::make_unique<PersistentIndexSstable>();
359
416
ASSIGN_OR_RETURN (auto rf, fs::new_random_access_file (_tablet_mgr->sst_location (_tablet_id, sstable_pb.filename ())));
360
417
auto * block_cache = _tablet_mgr->update_mgr ()->block_cache ();
@@ -367,24 +424,31 @@ Status LakePersistentIndex::apply_opcompaction(const TxnLogPB_OpCompaction& op_c
367
424
for (const auto & input_sstable : op_compaction.input_sstables ()) {
368
425
filenames.insert (input_sstable.filename ());
369
426
}
427
+ // Erase merged sstable from sstable list
370
428
_sstables.erase (std::remove_if (_sstables.begin (), _sstables.end (),
371
429
[&](const std::unique_ptr<PersistentIndexSstable>& sstable) {
372
430
return filenames.contains (sstable->sstable_pb ().filename ());
373
431
}),
374
432
_sstables.end ());
375
- _sstables.insert (_sstables.begin (), std::move (sstable));
433
+ // Insert sstable to sstable list by `max_rss_rowid` order.
434
+ auto lower_it = std::lower_bound (
435
+ _sstables.begin (), _sstables.end (), sstable,
436
+ [](const std::unique_ptr<PersistentIndexSstable>& a, const std::unique_ptr<PersistentIndexSstable>& b) {
437
+ return a->sstable_pb ().max_rss_rowid () < b->sstable_pb ().max_rss_rowid ();
438
+ });
439
+ _sstables.insert (lower_it, std::move (sstable));
376
440
return Status::OK ();
377
441
}
378
442
379
443
Status LakePersistentIndex::commit (MetaFileBuilder* builder) {
380
444
PersistentIndexSstableMetaPB sstable_meta;
381
- int64_t last_version = 0 ;
445
+ int64_t last_max_rss_rowid = 0 ;
382
446
for (auto & sstable : _sstables) {
383
- int64_t sstable_version = sstable->sstable_pb ().version ();
384
- if (last_version > sstable_version ) {
385
- return Status::InternalError (" Versions of sstables are not ordered" );
447
+ int64_t max_rss_rowid = sstable->sstable_pb ().max_rss_rowid ();
448
+ if (last_max_rss_rowid > max_rss_rowid ) {
449
+ return Status::InternalError (" sstables are not ordered" );
386
450
}
387
- last_version = sstable_version ;
451
+ last_max_rss_rowid = max_rss_rowid ;
388
452
auto * sstable_pb = sstable_meta.add_sstables ();
389
453
sstable_pb->CopyFrom (sstable->sstable_pb ());
390
454
}
@@ -406,13 +470,9 @@ Status LakePersistentIndex::load_from_lake_tablet(TabletManager* tablet_mgr, con
406
470
_key_size = PrimaryKeyEncoder::get_encoded_fixed_size (pkey_schema);
407
471
408
472
const auto & sstables = metadata->sstable_meta ().sstables ();
409
- int64_t max_sstable_version = sstables.empty () ? 0 : sstables.rbegin ()->version ();
410
- if (max_sstable_version > base_version) {
411
- return Status::OK ();
412
- }
413
- TRACE_COUNTER_INCREMENT (" max_sstable_version" , max_sstable_version);
414
- TRACE_COUNTER_INCREMENT (" new_version" , metadata->version ());
415
-
473
+ // Rebuild persistent index from `rebuild_rss_rowid_point`
474
+ const uint64_t rebuild_rss_rowid_point = sstables.empty () ? 0 : sstables.rbegin ()->max_rss_rowid ();
475
+ const uint32_t rebuild_rss_id = rebuild_rss_rowid_point >> 32 ;
416
476
OlapReaderStatistics stats;
417
477
std::unique_ptr<Column> pk_column;
418
478
if (pk_columns.size () > 1 ) {
@@ -428,18 +488,13 @@ Status LakePersistentIndex::load_from_lake_tablet(TabletManager* tablet_mgr, con
428
488
auto rowsets = Rowset::get_rowsets (tablet_mgr, metadata);
429
489
// Rowset whose version is between max_sstable_version and base_version should be recovered.
430
490
for (auto & rowset : rowsets) {
431
- TRACE_COUNTER_INCREMENT (" total_rowsets" , 1 );
432
- TRACE_COUNTER_INCREMENT (" total_segments" , rowset->num_segments ());
433
- TRACE_COUNTER_INCREMENT (" total_datasize_bytes" , rowset->data_size ());
491
+ TRACE_COUNTER_INCREMENT (" total_segment_cnt" , rowset->num_segments ());
434
492
TRACE_COUNTER_INCREMENT (" total_num_rows" , rowset->num_rows ());
435
- // If it is upgraded from old version of sr, the rowset version will be not set.
436
- // The generated rowset version will be treated as base_version.
437
- int64_t rowset_version = rowset->version () != 0 ? rowset->version () : base_version;
438
- // The data whose version is max_sstable_version in memtable may be not flushed to sstable.
439
- // So rowset whose version is max_sstable_version should also be recovered.
440
- if (rowset_version < max_sstable_version) {
493
+ if (rowset->id () + rowset->num_segments () <= rebuild_rss_id) {
494
+ // All segments under this rowset are not need to rebuild
441
495
continue ;
442
496
}
497
+ const int64_t rowset_version = rowset->version () != 0 ? rowset->version () : base_version;
443
498
auto res = rowset->get_each_segment_iterator_with_delvec (pkey_schema, base_version, builder, &stats);
444
499
if (!res.ok ()) {
445
500
return res.status ();
@@ -451,6 +506,13 @@ Status LakePersistentIndex::load_from_lake_tablet(TabletManager* tablet_mgr, con
451
506
if (itr == nullptr ) {
452
507
continue ;
453
508
}
509
+ if (rowset->id () + i < rebuild_rss_id) {
510
+ // lower than rebuild point, skip
511
+ // Notice: segment id that equal `rebuild_rss_id` can't be skip because
512
+ // there are maybe some rows need to rebuild.
513
+ continue ;
514
+ }
515
+ TRACE_COUNTER_INCREMENT (" rebuild_index_segment_cnt" , 1 );
454
516
while (true ) {
455
517
chunk->reset ();
456
518
rowids.clear ();
@@ -476,6 +538,11 @@ Status LakePersistentIndex::load_from_lake_tablet(TabletManager* tablet_mgr, con
476
538
for (uint32_t i = 0 ; i < pkc->size (); i++) {
477
539
values.emplace_back (base + rowids[i]);
478
540
}
541
+ if (values.back ().get_value () <= rebuild_rss_rowid_point) {
542
+ // lower AND equal than rebuild point, skip
543
+ continue ;
544
+ }
545
+ TRACE_COUNTER_INCREMENT (" rebuild_index_num_rows" , pkc->size ());
479
546
Status st;
480
547
if (pkc->is_binary ()) {
481
548
RETURN_IF_ERROR (insert (pkc->size (), reinterpret_cast <const Slice*>(pkc->raw_data ()),
@@ -495,10 +562,6 @@ Status LakePersistentIndex::load_from_lake_tablet(TabletManager* tablet_mgr, con
495
562
}
496
563
itr->close ();
497
564
}
498
- TRACE_COUNTER_INCREMENT (" loaded_rowsets" , 1 );
499
- TRACE_COUNTER_INCREMENT (" loaded_segments" , rowset->num_segments ());
500
- TRACE_COUNTER_INCREMENT (" loaded_datasize_bytes" , rowset->data_size ());
501
- TRACE_COUNTER_INCREMENT (" loaded_num_rows" , rowset->num_rows ());
502
565
}
503
566
return Status::OK ();
504
567
}
0 commit comments