Skip to content

Commit 7d0abd5

Browse files
authored
Merge pull request #28856 from rockwotj/lsm_followup_memtable
lsm: support read-your-own-uncommitted-writes
2 parents 9ae615f + e79ff67 commit 7d0abd5

File tree

15 files changed

+276
-183
lines changed

15 files changed

+276
-183
lines changed

src/v/lsm/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ redpanda_cc_library(
99
"lsm.h",
1010
],
1111
implementation_deps = [
12-
"//src/v/lsm/core/internal:batch",
1312
"//src/v/lsm/core/internal:files",
1413
"//src/v/lsm/core/internal:iterator",
1514
"//src/v/lsm/core/internal:keys",
1615
"//src/v/lsm/db:impl",
16+
"//src/v/lsm/db:memtable",
1717
],
1818
visibility = ["//visibility:public"],
1919
deps = [

src/v/lsm/core/internal/BUILD

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,3 @@ redpanda_cc_library(
9494
"@seastar",
9595
],
9696
)
97-
98-
redpanda_cc_library(
99-
name = "batch",
100-
hdrs = ["batch.h"],
101-
deps = [
102-
":keys",
103-
"//src/v/base",
104-
"//src/v/bytes:iobuf",
105-
"@abseil-cpp//absl/container:btree",
106-
"@seastar",
107-
],
108-
)

src/v/lsm/core/internal/batch.h

Lines changed: 0 additions & 81 deletions
This file was deleted.

src/v/lsm/db/BUILD

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ redpanda_cc_library(
7070
"//src/v/base",
7171
"//src/v/bytes:iobuf",
7272
"//src/v/lsm/core:exceptions",
73-
"//src/v/lsm/core/internal:batch",
7473
"//src/v/lsm/core/internal:files",
7574
"//src/v/lsm/core/internal:iterator",
7675
"//src/v/lsm/core/internal:keys",
@@ -94,7 +93,6 @@ redpanda_cc_library(
9493
"//src/v/base",
9594
"//src/v/bytes:iobuf",
9695
"//src/v/lsm/core:lookup_result",
97-
"//src/v/lsm/core/internal:batch",
9896
"//src/v/lsm/core/internal:iterator",
9997
"//src/v/lsm/core/internal:keys",
10098
"@abseil-cpp//absl/container:btree",

src/v/lsm/db/impl.cc

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ ss::future<std::unique_ptr<impl>> impl::open(
103103
co_return db;
104104
}
105105

106-
ss::future<> impl::apply(internal::write_batch batch) {
107-
if (batch.empty()) {
106+
ss::future<> impl::apply(ss::lw_shared_ptr<memtable> batch) {
107+
if (batch->empty()) {
108108
co_return;
109109
}
110110
co_await make_room_for_write();
111-
_mem->apply(std::move(batch));
111+
_mem->merge(std::move(batch));
112112
}
113113

114114
ss::future<> impl::make_room_for_write() {
@@ -186,13 +186,22 @@ ss::future<lookup_result> impl::get(internal::key_view key) {
186186
co_return result;
187187
}
188188

189-
ss::future<std::unique_ptr<internal::iterator>> impl::create_iterator() {
190-
auto iter = co_await create_internal_iterator();
189+
ss::future<std::unique_ptr<internal::iterator>>
190+
impl::create_iterator(ss::optimized_optional<ss::lw_shared_ptr<memtable>> wb) {
191+
auto iter = co_await create_internal_iterator(wb->get());
192+
internal::sequence_number iter_seqno = max_applied_seqno();
193+
if (wb) {
194+
auto wb_seqno = (*wb)->last_seqno().value_or(iter_seqno);
195+
vassert(
196+
wb_seqno > iter_seqno,
197+
"write memtable seqno must be greater than the current "
198+
"max_applied_seqno: {} > {}",
199+
wb_seqno,
200+
iter_seqno);
201+
iter_seqno = wb_seqno;
202+
}
191203
co_return create_db_iterator(
192-
std::move(iter),
193-
max_applied_seqno(),
194-
_opts,
195-
[this](internal::key_view key) {
204+
std::move(iter), iter_seqno, _opts, [this](internal::key_view key) {
196205
return _versions->current()->record_read_sample(key).then(
197206
[this](bool compaction_needed) {
198207
if (compaction_needed) {
@@ -203,8 +212,11 @@ ss::future<std::unique_ptr<internal::iterator>> impl::create_iterator() {
203212
}
204213

205214
ss::future<std::unique_ptr<internal::iterator>>
206-
impl::create_internal_iterator() {
215+
impl::create_internal_iterator(ss::optimized_optional<memtable*> wb) {
207216
chunked_vector<std::unique_ptr<internal::iterator>> list;
217+
if (wb) {
218+
list.push_back((*wb)->create_iterator());
219+
}
208220
list.push_back(_mem->create_iterator());
209221
if (_imm) {
210222
list.push_back((*_imm)->create_iterator());

src/v/lsm/db/impl.h

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
#pragma once
1313

1414
#include "base/seastarx.h"
15-
#include "bytes/iobuf.h"
16-
#include "lsm/core/internal/batch.h"
1715
#include "lsm/core/internal/iterator.h"
1816
#include "lsm/core/internal/keys.h"
1917
#include "lsm/core/internal/options.h"
@@ -54,15 +52,17 @@ class impl {
5452
open(ss::lw_shared_ptr<internal::options>, io::persistence);
5553

5654
// Apply a batch of writes to the database atomically.
57-
ss::future<> apply(internal::write_batch);
55+
ss::future<> apply(ss::lw_shared_ptr<memtable>);
5856

5957
// Get a key from the database
6058
ss::future<lookup_result> get(internal::key_view);
6159

62-
// Create an iterator over the database. Note that this iterator
63-
// results in ALL entries from the database, a deduplicating iterator
64-
// needs to be added on top to give a traditional iterator view.
65-
ss::future<std::unique_ptr<internal::iterator>> create_iterator();
60+
// Create an interator over the database.
61+
//
62+
// If a non-null memtable is passed in, then a frozen state of the memtable
63+
// is applied ontop of the existing database.
64+
ss::future<std::unique_ptr<internal::iterator>>
65+
create_iterator(ss::optimized_optional<ss::lw_shared_ptr<memtable>>);
6666

6767
// Flush any pending state in memtables to disk.
6868
ss::future<> flush();
@@ -74,7 +74,14 @@ class impl {
7474
ss::future<> close();
7575

7676
private:
77-
ss::future<std::unique_ptr<internal::iterator>> create_internal_iterator();
77+
// Create an iterator over the database. Note that this iterator
78+
// results in ALL entries from the database, a deduplicating iterator
79+
// needs to be added on top to give a traditional iterator view.
80+
//
81+
// If a non-null memtable is passed in, then a frozen state of the memtable
82+
// is applied ontop of the existing database.
83+
ss::future<std::unique_ptr<internal::iterator>>
84+
create_internal_iterator(ss::optimized_optional<memtable*>);
7885

7986
ss::future<> recover();
8087

src/v/lsm/db/memtable.cc

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
namespace lsm::db {
2525

26+
using internal::operator""_seqno;
27+
2628
class memtable::iterator : public internal::iterator {
2729
public:
2830
// The dummy iterator is only a place holder for the linked list in the
@@ -140,19 +142,50 @@ class memtable::iterator : public internal::iterator {
140142
_it;
141143
};
142144

143-
void memtable::apply(internal::write_batch batch) {
144-
if (_last_seqno) {
145-
dassert(
146-
_last_seqno < batch.last_seqno(),
147-
"expected new batch seqno to be greater than what is applied: {} < "
148-
"{}",
149-
_last_seqno.value(),
150-
batch.last_seqno());
151-
}
145+
void memtable::put(internal::key key, iobuf value) {
146+
internal::key_view::parts parts = internal::key_view{key}.decode();
147+
vassert(
148+
parts.type == internal::value_type::value,
149+
"when add a put to a memtable, keys much be of value type",
150+
key);
151+
vassert(
152+
parts.seqno >= _last_seqno,
153+
"seqno should only go up: {} >= {}",
154+
parts.seqno);
155+
invalidate_iterators();
156+
_memory_usage += key.memory_usage() + value.memory_usage();
157+
_last_seqno = parts.seqno;
158+
_table.emplace(std::move(key), std::move(value));
159+
}
160+
161+
void memtable::remove(internal::key key) {
162+
internal::key_view::parts parts = internal::key_view{key}.decode();
163+
vassert(
164+
parts.type == internal::value_type::tombstone,
165+
"when add a tombstone to a memtable, keys much be of tombstone type",
166+
key);
167+
vassert(
168+
parts.seqno >= _last_seqno,
169+
"seqno should only go up: {} >= {}",
170+
parts.seqno);
171+
invalidate_iterators();
172+
iobuf value;
173+
_memory_usage += key.memory_usage() + value.memory_usage();
174+
_last_seqno = parts.seqno;
175+
_table.emplace(std::move(key), std::move(value));
176+
}
177+
178+
void memtable::merge(ss::lw_shared_ptr<memtable> other) {
179+
vassert(
180+
_last_seqno < other->last_seqno(),
181+
"expected new batch seqno to be greater than what is applied: {} < "
182+
"{}",
183+
_last_seqno.value_or(0_seqno),
184+
other->last_seqno().value_or(0_seqno));
152185
invalidate_iterators();
153-
_memory_usage += batch.memory_usage();
154-
_last_seqno = batch.last_seqno();
155-
_table.merge(std::move(batch.entries()));
186+
_memory_usage += other->approximate_memory_usage();
187+
_last_seqno = other->last_seqno();
188+
_table.merge(std::move(other->_table));
156189
}
157190

158191
lookup_result memtable::get(internal::key_view key) {

src/v/lsm/db/memtable.h

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
#include "absl/container/btree_map.h"
1515
#include "base/seastarx.h"
1616
#include "bytes/iobuf.h"
17-
#include "lsm/core/internal/batch.h"
1817
#include "lsm/core/internal/iterator.h"
1918
#include "lsm/core/internal/keys.h"
2019
#include "lsm/core/lookup_result.h"
@@ -43,11 +42,24 @@ class memtable : public ss::enable_lw_shared_from_this<memtable> {
4342
memtable& operator=(memtable&&) = delete;
4443
~memtable();
4544

46-
// Add batch of key-value pairs to the memtable.
45+
// Add a key-value pair to the database. Overwrites any previous value.
4746
//
48-
// REQUIRES: the sequence numbers in the write batch are >= to the last
49-
// applied sequence number in this memtable.
50-
void apply(internal::write_batch);
47+
// REQUIRES: key.value_type is value
48+
void put(internal::key key, iobuf value);
49+
50+
// Remove the value for a given key.
51+
//
52+
// REQUIRES: key.value_type is tombstone
53+
void remove(internal::key key);
54+
55+
// Merge the supplied memtable into this memtable.
56+
// The resulting memtable should not be used anymore after passed into
57+
// this function as the data is *taken* from the memtable and applied to
58+
// this memtable.
59+
//
60+
// REQUIRES: the sequence numbers in the supplied memtable are >= to the
61+
// last applied sequence number in this memtable.
62+
void merge(ss::lw_shared_ptr<memtable>);
5163

5264
// Get the value for a given key.
5365
//
@@ -56,7 +68,8 @@ class memtable : public ss::enable_lw_shared_from_this<memtable> {
5668

5769
// Create an iterator for this memtable.
5870
//
59-
// This iterator is safe to use in face of concurrent updates
71+
// This iterator is safe to use in face of concurrent updates, but users can
72+
// see newer values that are added since the iterator is created.
6073
std::unique_ptr<internal::iterator> create_iterator();
6174

6275
// The approximate amount of memory used for this memtable.

src/v/lsm/db/tests/BUILD

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ redpanda_cc_binary(
1010
"//src/v/base",
1111
"//src/v/bytes:iobuf",
1212
"//src/v/lsm/core:compression",
13-
"//src/v/lsm/core/internal:batch",
1413
"//src/v/lsm/core/internal:iterator",
1514
"//src/v/lsm/core/internal:keys",
1615
"//src/v/lsm/core/internal:options",
1716
"//src/v/lsm/db:impl",
17+
"//src/v/lsm/db:memtable",
1818
"//src/v/lsm/io:disk_persistence",
1919
"//src/v/lsm/io:memory_persistence",
2020
"//src/v/random:generators",
@@ -35,7 +35,6 @@ redpanda_cc_gtest(
3535
"//src/v/base",
3636
"//src/v/bytes:iobuf",
3737
"//src/v/lsm/core:lookup_result",
38-
"//src/v/lsm/core/internal:batch",
3938
"//src/v/lsm/core/internal:keys",
4039
"//src/v/lsm/core/internal/tests:iterator_test_harness",
4140
"//src/v/lsm/db:memtable",
@@ -131,7 +130,6 @@ redpanda_cc_gtest(
131130
deps = [
132131
"//src/v/base",
133132
"//src/v/bytes:iobuf",
134-
"//src/v/lsm/core/internal:batch",
135133
"//src/v/lsm/core/internal:iterator",
136134
"//src/v/lsm/core/internal:keys",
137135
"//src/v/lsm/core/internal:options",
@@ -152,10 +150,10 @@ redpanda_cc_gtest(
152150
deps = [
153151
"//src/v/base",
154152
"//src/v/bytes:iobuf",
155-
"//src/v/lsm/core/internal:batch",
156153
"//src/v/lsm/core/internal:keys",
157154
"//src/v/lsm/core/internal:options",
158155
"//src/v/lsm/db:impl",
156+
"//src/v/lsm/db:memtable",
159157
"//src/v/lsm/io:memory_persistence",
160158
"//src/v/lsm/sst:builder",
161159
"//src/v/random:generators",

0 commit comments

Comments
 (0)