Skip to content

Commit f23f4c0

Browse files
committed
go: store/datas/pull: pull_chunk_tracker.go: Optimize memory use when backing up to an AWS remote.
PullChunkTracker is responsible for making the HasMany calls against the destination and batching up absent hashes into HashSets which will be delivered to GetManyCompressed and eventually written into table files which are uploaded. This code is used for both pull and push, when the destination is the "local" database or when the destination is the remote database respectively. It is used when the remote is doltremoteapi, thus every HasMany call is an RPC, and also when the remote is something like file:// or aws://, thus the table file indexes for the remote are in memory and HasMany calls are very quick. Different operational characteristics of the various dependencies mean that sometimes a Pull is prone to build up large sets of pending HasMany hashes, whereas other times it is prone to build up large sets of absent hashes which are waiting for the fetcher thread(s) to take them. Previously, PullChunkTracker was structured to accumulate HasMany responses and wait to batch them into appropriately-sized batches for GetManyCompressed until the fetcher threads asked for them. This meant that if HasMany batches were very small, because HasMany was very fast, we would accumulate a large number of very small HashSets. These HashSets would take up large amounts of memory. Accumulating the batches as the HasMany responses come in is more memory efficient and should be no slower - we will always accumulate the full batches, and in basically the same order. Tested by pushing a large database to an AWS remote and memory profiling the result.
1 parent a4087f5 commit f23f4c0

File tree

2 files changed

+95
-30
lines changed

2 files changed

+95
-30
lines changed

go/store/datas/pull/pull_chunk_fetcher_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,38 @@ var getManyerErr = fmt.Errorf("always return an error")
183183
// GetManyCompressed is a stub that ignores ctx, hashes and found and
// unconditionally returns getManyerErr; the found callback is never invoked.
func (errorGetManyer) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, nbs.ToChunker)) error {
184184
return getManyerErr
185185
}
186+
187+
func TestPop(t *testing.T) {
188+
var backing [16]*int
189+
for i := range 16 {
190+
backing[i] = new(int)
191+
*backing[i] = i
192+
}
193+
s := backing[:]
194+
for i := range 16 {
195+
assert.Len(t, s, 16-i)
196+
var p *int
197+
p, s = pop(s)
198+
assert.Len(t, s, 16-i-1)
199+
assert.Equal(t, i, *p)
200+
assert.Nil(t, backing[16-i-1], "i is %d", i)
201+
}
202+
}
203+
204+
func TestAppendAbsent(t *testing.T) {
205+
var absent []hash.HashSet
206+
var hashes [16]hash.Hash
207+
for i := range 16 {
208+
hashes[i][0] = byte(i)
209+
}
210+
// Initial set is the full batch.
211+
absent = appendAbsent(absent, hash.NewHashSet(hashes[:]...), 4)
212+
assert.Len(t, absent, 1)
213+
assert.Len(t, absent[0], 16)
214+
// Next set get batched up.
215+
absent = appendAbsent(absent, hash.NewHashSet(hashes[:]...), 4)
216+
assert.Len(t, absent, 5)
217+
for i := range 4 {
218+
assert.Len(t, absent[i+1], 4)
219+
}
220+
}

go/store/datas/pull/pull_chunk_tracker.go

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (t *PullChunkTracker) reqRespThread(ctx context.Context, initial hash.HashS
131131

132132
eg, ctx := errgroup.WithContext(ctx)
133133

134-
for i := 0; i < hasManyThreadCount; i++ {
134+
for range hasManyThreadCount {
135135
eg.Go(func() error {
136136
return hasManyThread(ctx, t.cfg.HasManyer, hasManyReqCh, hasManyRespCh, doneCh)
137137
})
@@ -169,61 +169,57 @@ func (t *PullChunkTracker) reqRespThread(ctx context.Context, initial hash.HashS
169169

170170
select {
171171
case h, ok := <-t.uncheckedCh:
172+
// uncheckedCh closing means the PullChunkTracker was closed.
172173
if !ok {
173174
return nil
174175
}
176+
// |h| is a hash we need to check against HasMany on the destination.
177+
// It could become a hash we fetch from source and add to the destination
178+
// or it could already be present.
175179
if len(unchecked) == 0 || len(unchecked[len(unchecked)-1]) >= t.cfg.BatchSize {
176180
outstanding += 1
177181
unchecked = append(unchecked, make(hash.HashSet))
178182
}
179183
unchecked[len(unchecked)-1].Insert(h)
180184
case resp := <-hasManyRespCh:
185+
// A hasMany response came back.
181186
outstanding -= 1
182187
if resp.err != nil {
183188
err = errors.Join(err, resp.err)
184-
} else if len(resp.hs) > 0 {
185-
absent = append(absent, resp.hs)
186189
}
190+
// Add all the resp.hs hashes, those which are not already present
191+
// in dest, to our batches of absent hashes we will return through
192+
// GetChunksToFetch.
193+
absent = appendAbsent(absent, resp.hs, t.cfg.BatchSize)
187194
case thisHasManyReqCh <- hasManyReq:
188-
copy(unchecked[:], unchecked[1:])
189-
if len(unchecked) > 1 {
190-
unchecked[len(unchecked)-1] = nil
191-
}
192-
unchecked = unchecked[:len(unchecked)-1]
195+
// We delivered a hasMany request to a hasManyThread.
196+
// Remove it here. We do not need to update |outstanding|, since
197+
// it was updated when we created the new batch in |unchecked|.
198+
_, unchecked = pop(unchecked)
193199
case <-t.processedCh:
200+
// TickProcessed helps us keep track of chunks which we returned from GetChunksToFetch
201+
// and which are still being processed by WalkAddrs. This only gets called after all
202+
// unchecked hashes in the chunks have been delivered to us.
194203
unprocessed -= 1
195204
case req := <-thisReqCh:
205+
// A request for GetChunksToFetch.
196206
if err != nil {
207+
// Deliver an error we experienced. HasMany can error, and this is where
208+
// a client sees it, for example.
197209
req.err = err
198210
close(req.ready)
199211
err = nil
200212
} else if len(absent) == 0 {
213+
// We have no more chunks to deliver. If len(absent) == 0 but we had
214+
// unprocessed stuff, we would have had a |nil| |thisReqCh|. The fact that
215+
// we accepted the request means we are ready to tell the client that there
216+
// are no more chunks to fetch.
201217
req.ok = false
202218
close(req.ready)
203219
} else {
220+
// |absent[0]| is as full a batch as we have.
204221
req.ok = true
205-
req.hs = absent[0]
206-
var i int
207-
for i = 1; i < len(absent); i++ {
208-
l := len(absent[i])
209-
if len(req.hs)+l < t.cfg.BatchSize {
210-
req.hs.InsertAll(absent[i])
211-
} else {
212-
for h := range absent[i] {
213-
if len(req.hs) >= t.cfg.BatchSize {
214-
break
215-
}
216-
req.hs.Insert(h)
217-
absent[i].Remove(h)
218-
}
219-
break
220-
}
221-
}
222-
copy(absent[:], absent[i:])
223-
for j := len(absent) - i; j < len(absent); j++ {
224-
absent[j] = nil
225-
}
226-
absent = absent[:len(absent)-i]
222+
req.hs, absent = pop(absent)
227223
unprocessed += len(req.hs)
228224
close(req.ready)
229225
}
@@ -236,6 +232,40 @@ func (t *PullChunkTracker) reqRespThread(ctx context.Context, initial hash.HashS
236232
return eg.Wait()
237233
}
238234

235+
// pop removes the first element of |s| and returns it together with the
// shortened slice. The remaining elements are shifted down by one in place,
// and the slot vacated at the end is overwritten with the zero value of T so
// that the backing array does not keep the shifted-out value reachable.
//
// |s| must be non-empty; indexing an empty slice here panics.
func pop[T any](s []T) (T, []T) {
	head := s[0]
	last := len(s) - 1
	// Shift everything after the head one position towards the front.
	copy(s, s[1:])
	// Clear the now-duplicated final slot before it falls outside the returned slice.
	var zero T
	s[last] = zero
	return head, s[:last]
}
246+
247+
// appendAbsent adds all elements in |toadd| to HashSets at the end of |absent|. It creates new HashSets
248+
// at the end of |absent| if inserting into an existing set would cause its size to exceed |sz|.
249+
//
250+
// As a special case, |appendAbsent| just returns []hash.HashSet{toadd} if |absent| is already empty.
251+
func appendAbsent(absent []hash.HashSet, toadd hash.HashSet, sz int) []hash.HashSet {
252+
if len(toadd) == 0 {
253+
return absent
254+
}
255+
// Don't bother splitting up toadd to |sz| if it is the first batch.
256+
if len(absent) == 0 {
257+
absent = append(absent, toadd)
258+
return absent
259+
}
260+
for h := range toadd {
261+
if len(absent[len(absent)-1]) >= sz {
262+
absent = append(absent, make(hash.HashSet))
263+
}
264+
absent[len(absent)-1].Insert(h)
265+
}
266+
return absent
267+
}
268+
239269
// Run by a PullChunkTracker, calls HasMany on a batch of addresses and delivers the results.
240270
func hasManyThread(ctx context.Context, hasManyer HasManyer, reqCh <-chan trackerHasManyReq, respCh chan<- trackerHasManyResp, doneCh <-chan struct{}) error {
241271
for {

0 commit comments

Comments
 (0)