Skip to content

Commit e79ff67

Browse files
committed
lsm: add support for read-your-own-writes of uncommitted data
Now that we have unified the memtable and write batch into a single structure, we can provide iteration and `get` semantics over the pending write batch to support use cases where you want to explicitly flush pending operations or provide some kind of transactional support with read-your-own-writes guarantees. For prior art, see the following in RocksDB: https://github.com/facebook/rocksdb/wiki/Write-Batch-With-Index#write-batch-with-index
1 parent e368227 commit e79ff67

File tree

7 files changed

+185
-21
lines changed

7 files changed

+185
-21
lines changed

src/v/lsm/db/impl.cc

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,22 @@ ss::future<lookup_result> impl::get(internal::key_view key) {
186186
co_return result;
187187
}
188188

189-
ss::future<std::unique_ptr<internal::iterator>> impl::create_iterator() {
190-
auto iter = co_await create_internal_iterator();
189+
ss::future<std::unique_ptr<internal::iterator>>
190+
impl::create_iterator(ss::optimized_optional<ss::lw_shared_ptr<memtable>> wb) {
191+
auto iter = co_await create_internal_iterator(wb->get());
192+
internal::sequence_number iter_seqno = max_applied_seqno();
193+
if (wb) {
194+
auto wb_seqno = (*wb)->last_seqno().value_or(iter_seqno);
195+
vassert(
196+
wb_seqno > iter_seqno,
197+
"write memtable seqno must be greater than the current "
198+
"max_applied_seqno: {} > {}",
199+
wb_seqno,
200+
iter_seqno);
201+
iter_seqno = wb_seqno;
202+
}
191203
co_return create_db_iterator(
192-
std::move(iter),
193-
max_applied_seqno(),
194-
_opts,
195-
[this](internal::key_view key) {
204+
std::move(iter), iter_seqno, _opts, [this](internal::key_view key) {
196205
return _versions->current()->record_read_sample(key).then(
197206
[this](bool compaction_needed) {
198207
if (compaction_needed) {
@@ -203,8 +212,11 @@ ss::future<std::unique_ptr<internal::iterator>> impl::create_iterator() {
203212
}
204213

205214
ss::future<std::unique_ptr<internal::iterator>>
206-
impl::create_internal_iterator() {
215+
impl::create_internal_iterator(ss::optimized_optional<memtable*> wb) {
207216
chunked_vector<std::unique_ptr<internal::iterator>> list;
217+
if (wb) {
218+
list.push_back((*wb)->create_iterator());
219+
}
208220
list.push_back(_mem->create_iterator());
209221
if (_imm) {
210222
list.push_back((*_imm)->create_iterator());

src/v/lsm/db/impl.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ class impl {
5757
// Get a key from the database
5858
ss::future<lookup_result> get(internal::key_view);
5959

60-
// Create an iterator over the database. Note that this iterator
61-
// results in ALL entries from the database, a deduplicating iterator
62-
// needs to be added on top to give a traditional iterator view.
63-
ss::future<std::unique_ptr<internal::iterator>> create_iterator();
60+
// Create an iterator over the database.
61+
//
62+
// If a non-null memtable is passed in, then a frozen state of the memtable
63+
// is applied on top of the existing database.
64+
ss::future<std::unique_ptr<internal::iterator>>
65+
create_iterator(ss::optimized_optional<ss::lw_shared_ptr<memtable>>);
6466

6567
// Flush any pending state in memtables to disk.
6668
ss::future<> flush();
@@ -72,7 +74,14 @@ class impl {
7274
ss::future<> close();
7375

7476
private:
75-
ss::future<std::unique_ptr<internal::iterator>> create_internal_iterator();
77+
// Create an iterator over the database. Note that this iterator
78+
// results in ALL entries from the database, a deduplicating iterator
79+
// needs to be added on top to give a traditional iterator view.
80+
//
81+
// If a non-null memtable is passed in, then a frozen state of the memtable
82+
// is applied on top of the existing database.
83+
ss::future<std::unique_ptr<internal::iterator>>
84+
create_internal_iterator(ss::optimized_optional<memtable*>);
7685

7786
ss::future<> recover();
7887

src/v/lsm/db/memtable.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ class memtable : public ss::enable_lw_shared_from_this<memtable> {
6868

6969
// Create an iterator for this memtable.
7070
//
71-
// This iterator is safe to use in face of concurrent updates
71+
// This iterator is safe to use in face of concurrent updates, but users can
72+
// see newer values that are added since the iterator is created.
7273
std::unique_ptr<internal::iterator> create_iterator();
7374

7475
// The approximate amount of memory used for this memtable.

src/v/lsm/db/tests/db_bench.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ class benchmark {
475475
stats.bytes_read = 0;
476476

477477
// Perform sequential iteration
478-
auto iter = co_await _db->create_iterator();
478+
auto iter = co_await _db->create_iterator(std::nullopt);
479479
co_await iter->seek_to_first();
480480

481481
size_t count = 0;
@@ -667,7 +667,7 @@ class benchmark {
667667

668668
// Build a set of all keys from iterator
669669
std::map<ss::sstring, iobuf> db_keys;
670-
auto iter = co_await _db->create_iterator();
670+
auto iter = co_await _db->create_iterator(std::nullopt);
671671
co_await iter->seek_to_first();
672672

673673
while (iter->valid()) {

src/v/lsm/db/tests/impl_test.cc

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class ImplTest : public testing::Test {
118118
}
119119

120120
testing::AssertionResult matches_shadow() {
121-
auto iter = _db->create_iterator().get();
121+
auto iter = _db->create_iterator(std::nullopt).get();
122122
auto it = _shadow.begin();
123123
std::vector<std::string> errors;
124124
for (iter->seek_to_first().get(); iter->valid(); iter->next().get()) {
@@ -220,4 +220,83 @@ TEST_F(ImplTest, Randomized) {
220220
}
221221
}
222222

223+
TEST_F(ImplTest, ReadYourOwnWrites) {
224+
// Write some initial data to the database
225+
auto batch1 = ss::make_lw_shared<lsm::db::memtable>();
226+
auto seqno = _db->max_applied_seqno();
227+
auto key1 = lsm::internal::key::encode({
228+
.key = "key1",
229+
.seqno = ++seqno,
230+
});
231+
auto value1 = iobuf::from("value1");
232+
batch1->put(key1, value1.share());
233+
234+
auto key2 = lsm::internal::key::encode({
235+
.key = "key2",
236+
.seqno = ++seqno,
237+
});
238+
auto value2 = iobuf::from("value2");
239+
batch1->put(key2, value2.share());
240+
241+
_db->apply(batch1).get();
242+
243+
// Create a pending write batch with new keys (not yet applied)
244+
auto pending_batch = ss::make_lw_shared<lsm::db::memtable>();
245+
seqno = _db->max_applied_seqno();
246+
247+
auto key3 = lsm::internal::key::encode({
248+
.key = "key3",
249+
.seqno = ++seqno,
250+
});
251+
auto value3 = iobuf::from("value3");
252+
pending_batch->put(key3, value3.share());
253+
254+
// Verify new key is visible through iterator with write batch
255+
auto iter = _db->create_iterator(pending_batch).get();
256+
std::map<ss::sstring, iobuf> seen;
257+
for (iter->seek_to_first().get(); iter->valid(); iter->next().get()) {
258+
seen.insert_or_assign(
259+
ss::sstring(iter->key().user_key()), iter->value());
260+
}
261+
EXPECT_EQ(seen.size(), 3);
262+
EXPECT_EQ(seen["key1"], value1);
263+
EXPECT_EQ(seen["key2"], value2);
264+
EXPECT_EQ(seen["key3"], value3);
265+
266+
// Create another pending write batch that updates an existing key
267+
auto update_batch = ss::make_lw_shared<lsm::db::memtable>();
268+
seqno = _db->max_applied_seqno();
269+
270+
auto key2_updated = lsm::internal::key::encode({
271+
.key = "key2",
272+
.seqno = ++seqno,
273+
});
274+
auto value2_updated = iobuf::from("value2_updated");
275+
update_batch->put(key2_updated, value2_updated.share());
276+
277+
// Verify updated value is visible through iterator with write batch
278+
auto iter2 = _db->create_iterator(update_batch).get();
279+
seen.clear();
280+
for (iter2->seek_to_first().get(); iter2->valid(); iter2->next().get()) {
281+
seen.insert_or_assign(
282+
ss::sstring(iter2->key().user_key()), iter2->value());
283+
}
284+
EXPECT_EQ(seen.size(), 2);
285+
EXPECT_EQ(seen["key1"], value1);
286+
EXPECT_EQ(seen["key2"], value2_updated);
287+
288+
// Apply the update batch and verify all data is now committed
289+
_db->apply(update_batch).get();
290+
291+
auto iter3 = _db->create_iterator(std::nullopt).get();
292+
seen.clear();
293+
for (iter3->seek_to_first().get(); iter3->valid(); iter3->next().get()) {
294+
seen.insert_or_assign(
295+
ss::sstring(iter3->key().user_key()), iter3->value());
296+
}
297+
EXPECT_EQ(seen.size(), 2);
298+
EXPECT_EQ(seen["key1"], value1);
299+
EXPECT_EQ(seen["key2"], value2_updated);
300+
}
301+
223302
} // namespace

src/v/lsm/lsm.cc

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,15 @@ ss::future<std::optional<iobuf>> database::get(std::string_view target) {
105105
}
106106

107107
ss::future<iterator> database::create_iterator() {
108-
auto iter = co_await _impl->create_iterator();
108+
auto iter = co_await _impl->create_iterator(std::nullopt);
109109
co_return iterator(std::move(iter));
110110
}
111111

112-
write_batch::write_batch()
113-
: _batch(ss::make_lw_shared<db::memtable>()) {}
112+
write_batch database::create_write_batch() { return write_batch{_impl.get()}; }
113+
114+
write_batch::write_batch(db::impl* db)
115+
: _batch(ss::make_lw_shared<db::memtable>())
116+
, _db(db) {}
114117
write_batch::write_batch(write_batch&&) noexcept = default;
115118
write_batch& write_batch::operator=(write_batch&&) noexcept = default;
116119
write_batch::~write_batch() noexcept = default;
@@ -133,4 +136,22 @@ void write_batch::remove(std::string_view key, model::offset offset) {
133136
_batch->remove(std::move(k));
134137
}
135138

139+
ss::future<std::optional<iobuf>> write_batch::get(std::string_view target) {
140+
auto key = internal::key::encode({
141+
.key = target,
142+
.seqno = internal::sequence_number::max(),
143+
.type = internal::value_type::value,
144+
});
145+
auto result = _batch->get(key);
146+
if (result.is_missing()) {
147+
result = co_await _db->get(key);
148+
}
149+
co_return std::move(result).take_value();
150+
}
151+
152+
ss::future<iterator> write_batch::create_iterator() {
153+
auto iter = co_await _db->create_iterator(_batch);
154+
co_return iterator(std::move(iter));
155+
}
156+
136157
} // namespace lsm

src/v/lsm/lsm.h

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,32 @@ class database {
5959

6060
// The maximum offset that has been persisted to durable storage.
6161
model::offset max_persisted_offset() const;
62+
6263
// The maximum offset that has been applied to the database (persisted or
6364
// not).
6465
model::offset max_applied_offset() const;
6566

6667
// Apply a batch of data atomically to the database.
68+
//
69+
// Any gets/iteration on the write batch must be finished before calling
70+
// this method.
6771
ss::future<> apply(write_batch);
6872

6973
// Lookup a value in the database
7074
ss::future<std::optional<iobuf>> get(std::string_view key);
7175

7276
// Create an iterator over the database.
77+
//
78+
// This iterator gives snapshot isolation, so writes applied after
79+
// this future completes will not be seen by the iterator.
7380
ss::future<iterator> create_iterator();
7481

82+
// Create a write batch that can be applied to the database.
83+
//
84+
// This write batch can only be used with this database and its lifetime is
85+
// tied to this database as well.
86+
write_batch create_write_batch();
87+
7588
private:
7689
std::unique_ptr<db::impl> _impl;
7790
};
@@ -85,9 +98,11 @@ class iterator {
8598
iterator& operator=(const iterator&) = delete;
8699
iterator& operator=(iterator&&) noexcept;
87100
~iterator() noexcept;
101+
88102
// An iterator is either positioned at a key/value pair, or
89103
// not valid. This method returns true iff the iterator is valid.
90104
bool valid() const;
105+
91106
// Position at the first key in the source. The iterator is valid()
92107
// after this call iff the source is not empty.
93108
ss::future<> seek_to_first();
@@ -124,10 +139,14 @@ class iterator {
124139
std::unique_ptr<internal::iterator> _impl;
125140
};
126141

127-
// A batch of data that can be applied to the database.
142+
// A batch of data that can be applied atomically to the database.
143+
//
144+
// In addition to being a staging ground for write operations,
145+
// the write batch also supports reading the staged data as a view over the
146+
// existing database so that you can Read-Your-Own-Writes before data is
147+
// committed to the LSM tree.
128148
class write_batch {
129149
public:
130-
write_batch();
131150
write_batch(const write_batch&) = delete;
132151
write_batch(write_batch&&) noexcept;
133152
write_batch& operator=(const write_batch&) = delete;
@@ -144,9 +163,32 @@ class write_batch {
144163
// REQUIRES: offsets must be monotonically increasing as added to the batch.
145164
void remove(std::string_view key, model::offset);
146165

166+
// Lookup a value in the database as if the current write batch was applied.
167+
//
168+
// The returned future must finish resolving before the write_batch is
169+
// applied to the database.
170+
ss::future<std::optional<iobuf>> get(std::string_view key);
171+
172+
// Create an iterator over the database as if the current write batch was
173+
// applied.
174+
//
175+
// This iterator gives snapshot isolation, so writes applied after
176+
// this future completes will not be seen by the iterator as long as
177+
// these writes do not have the same offset.
178+
//
179+
// The resulting iterator is safe to use concurrently with additional writes
180+
// being applied to the batch.
181+
//
182+
// The returned iterator must not be used after the write_batch is applied
183+
// to the database.
184+
ss::future<iterator> create_iterator();
185+
147186
private:
187+
explicit write_batch(db::impl*);
188+
148189
friend class database;
149190
ss::lw_shared_ptr<db::memtable> _batch;
191+
db::impl* _db;
150192
};
151193

152194
} // namespace lsm

0 commit comments

Comments
 (0)