Skip to content

Commit 4e64b7f

Browse files
authored
Simplify Snapshots (#2475)
We had a complicated mechanism to keep track of which transactions have been concluded, and which Raft proposals corresponded to those transactions. This required us to have a txnMark watermark, which kept track of all applied mutations and their Raft indices, which is quite confusing. Instead, now we just make use of the StartTs stored in proposal.Mutations and the fact that we have a Oracle which already knows the status of transactions. I make use of those givens to remove txnMarks and simplify how we do snapshots. Further more, I now propose snapshot indices so that the entire Raft group is synchronized on the same snapshot index, calculated by the leader. This also makes it clear that only leader does the snapshot calculation, and the corresponding action of aborting old transactions, with no health signal for some time. Also, rename MaxPending to MaxAssigned, which is a better term for what the variable holds, i.e. MaxAssigned timestamp by Zero so far.
1 parent 2e044e1 commit 4e64b7f

File tree

13 files changed

+889
-529
lines changed

13 files changed

+889
-529
lines changed

contrib/scripts/functions.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ function runCluster {
1010
go build . && go install . && md5sum dgraph $GOPATH/bin/dgraph
1111
DATA="/tmp/dg" docker-compose up --force-recreate --remove-orphans --detach
1212
popd
13-
$basedir/contrib/wait-for-it.sh localhost:6080
14-
$basedir/contrib/wait-for-it.sh localhost:9180
13+
$basedir/contrib/wait-for-it.sh -t 60 localhost:6080
14+
$basedir/contrib/wait-for-it.sh -t 60 localhost:9180
1515
sleep 10 # Sleep 10 seconds to get things ready.
1616
}

dgraph/cmd/zero/oracle.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ type Oracle struct {
2828
x.SafeMutex
2929
commits map[uint64]uint64 // startTs -> commitTs
3030
// TODO: Check if we need LRU.
31-
rowCommit map[string]uint64 // fp(key) -> commitTs. Used to detect conflict.
32-
aborts map[uint64]struct{} // key is startTs
33-
maxPending uint64 // max transaction startTs given out by us.
31+
rowCommit map[string]uint64 // fp(key) -> commitTs. Used to detect conflict.
32+
aborts map[uint64]struct{} // key is startTs
33+
maxAssigned uint64 // max transaction assigned by us.
3434

3535
// timestamp at the time of start of server or when it became leader. Used to detect conflicts.
3636
tmax uint64
@@ -131,7 +131,7 @@ func (o *Oracle) currentState() *intern.OracleDelta {
131131
for abort := range o.aborts {
132132
resp.Aborts = append(resp.Aborts, abort)
133133
}
134-
resp.MaxPending = o.maxPending
134+
resp.MaxAssigned = o.maxAssigned
135135
return resp
136136
}
137137

@@ -169,8 +169,8 @@ func (o *Oracle) sendDeltasToSubscribers() {
169169
slurp_loop:
170170
for {
171171
// Consume tctx.
172-
if update.MaxPending > delta.MaxPending {
173-
delta.MaxPending = update.MaxPending
172+
if update.MaxAssigned > delta.MaxAssigned {
173+
delta.MaxAssigned = update.MaxAssigned
174174
}
175175
for _, startTs := range update.Aborts {
176176
delta.Aborts = append(delta.Aborts, startTs)
@@ -244,19 +244,19 @@ func (o *Oracle) storePending(ids *api.AssignedIds) {
244244
// Wait to finish up processing everything before start id.
245245
o.doneUntil.WaitForMark(context.Background(), ids.EndId)
246246
// Now send it out to updates.
247-
o.updates <- &intern.OracleDelta{MaxPending: ids.EndId}
247+
o.updates <- &intern.OracleDelta{MaxAssigned: ids.EndId}
248248
o.Lock()
249249
defer o.Unlock()
250250
max := ids.EndId
251-
if o.maxPending < max {
252-
o.maxPending = max
251+
if o.maxAssigned < max {
252+
o.maxAssigned = max
253253
}
254254
}
255255

256256
func (o *Oracle) MaxPending() uint64 {
257257
o.RLock()
258258
defer o.RUnlock()
259-
return o.maxPending
259+
return o.maxAssigned
260260
}
261261

262262
var errConflict = errors.New("Transaction conflict")
@@ -317,8 +317,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
317317
// map to be keyed by uint64, which would be cheaper. But, unsure about the repurcussions of
318318
// that. It would save some memory. So, worth a try.
319319

320-
var num intern.Num
321-
num.Val = 1
320+
num := intern.Num{Val: 1}
322321
assigned, err := s.lease(ctx, &num, true)
323322
if err != nil {
324323
return err

dgraph/docker-compose.yml

Lines changed: 69 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -20,42 +20,42 @@ services:
2020
# - type: bind
2121
# source: "${DATA}"
2222
# target: /data
23-
zero2:
24-
image: debian:latest
25-
container_name: bank-dg0.2
26-
command: /gobin/dgraph zero --my=zero2:5080 --replicas 3 --peer=zero1:5080 --idx 2
27-
volumes:
28-
- type: bind
29-
source: $GOPATH/bin
30-
target: /gobin
31-
read_only: true
32-
zero3:
33-
image: debian:latest
34-
container_name: bank-dg0.3
35-
command: /gobin/dgraph zero --my=zero3:5080 --replicas 3 --peer=zero1:5080 --idx 3
36-
volumes:
37-
- type: bind
38-
source: $GOPATH/bin
39-
target: /gobin
40-
read_only: true
41-
zero4:
42-
image: debian:latest
43-
container_name: bank-dg0.4
44-
command: /gobin/dgraph zero --my=zero4:5080 --replicas 3 --peer=zero1:5080 --idx 4
45-
volumes:
46-
- type: bind
47-
source: $GOPATH/bin
48-
target: /gobin
49-
read_only: true
50-
zero5:
51-
image: debian:latest
52-
container_name: bank-dg0.5
53-
command: /gobin/dgraph zero --my=zero5:5080 --replicas 3 --peer=zero1:5080 --idx 5
54-
volumes:
55-
- type: bind
56-
source: $GOPATH/bin
57-
target: /gobin
58-
read_only: true
23+
# zero2:
24+
# image: debian:latest
25+
# container_name: bank-dg0.2
26+
# command: /gobin/dgraph zero --my=zero2:5080 --replicas 3 --peer=zero1:5080 --idx 2
27+
# volumes:
28+
# - type: bind
29+
# source: $GOPATH/bin
30+
# target: /gobin
31+
# read_only: true
32+
# zero3:
33+
# image: debian:latest
34+
# container_name: bank-dg0.3
35+
# command: /gobin/dgraph zero --my=zero3:5080 --replicas 3 --peer=zero1:5080 --idx 3
36+
# volumes:
37+
# - type: bind
38+
# source: $GOPATH/bin
39+
# target: /gobin
40+
# read_only: true
41+
# zero4:
42+
# image: debian:latest
43+
# container_name: bank-dg0.4
44+
# command: /gobin/dgraph zero --my=zero4:5080 --replicas 3 --peer=zero1:5080 --idx 4
45+
# volumes:
46+
# - type: bind
47+
# source: $GOPATH/bin
48+
# target: /gobin
49+
# read_only: true
50+
# zero5:
51+
# image: debian:latest
52+
# container_name: bank-dg0.5
53+
# command: /gobin/dgraph zero --my=zero5:5080 --replicas 3 --peer=zero1:5080 --idx 5
54+
# volumes:
55+
# - type: bind
56+
# source: $GOPATH/bin
57+
# target: /gobin
58+
# read_only: true
5959

6060
dg1:
6161
image: debian:latest
@@ -73,6 +73,7 @@ services:
7373
- 8180:8180
7474
- 9180:9180
7575
command: /gobin/dgraph server --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10
76+
7677
dg2:
7778
image: debian:latest
7879
container_name: bank-dg2
@@ -108,37 +109,37 @@ services:
108109
command: /gobin/dgraph server --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10
109110

110111

111-
dg4:
112-
image: debian:latest
113-
container_name: bank-dg4
114-
working_dir: /data/dg4
115-
volumes:
116-
- type: bind
117-
source: $GOPATH/bin
118-
target: /gobin
119-
read_only: true
120-
# - type: bind
121-
# source: "${DATA}"
122-
# target: /data
123-
ports:
124-
- 8184:8184
125-
- 9184:9184
126-
command: /gobin/dgraph server --my=dg4:7184 --lru_mb=1024 --zero=zero1:5080 -o 104 --expose_trace --trace 1.0 --profile_mode block --block_rate 10
112+
# dg4:
113+
# image: debian:latest
114+
# container_name: bank-dg4
115+
# working_dir: /data/dg4
116+
# volumes:
117+
# - type: bind
118+
# source: $GOPATH/bin
119+
# target: /gobin
120+
# read_only: true
121+
# # - type: bind
122+
# # source: "${DATA}"
123+
# # target: /data
124+
# ports:
125+
# - 8184:8184
126+
# - 9184:9184
127+
# command: /gobin/dgraph server --my=dg4:7184 --lru_mb=1024 --zero=zero1:5080 -o 104 --expose_trace --trace 1.0 --profile_mode block --block_rate 10
127128

128-
dg5:
129-
image: debian:latest
130-
container_name: bank-dg5
131-
working_dir: /data/dg5
132-
volumes:
133-
- type: bind
134-
source: $GOPATH/bin
135-
target: /gobin
136-
read_only: true
137-
# - type: bind
138-
# source: "${DATA}"
139-
# target: /data
140-
ports:
141-
- 8185:8185
142-
- 9185:9185
143-
command: /gobin/dgraph server --my=dg5:7185 --lru_mb=1024 --zero=zero1:5080 -o 105 --expose_trace --trace 1.0 --profile_mode block --block_rate 10
129+
# dg5:
130+
# image: debian:latest
131+
# container_name: bank-dg5
132+
# working_dir: /data/dg5
133+
# volumes:
134+
# - type: bind
135+
# source: $GOPATH/bin
136+
# target: /gobin
137+
# read_only: true
138+
# # - type: bind
139+
# # source: "${DATA}"
140+
# # target: /data
141+
# ports:
142+
# - 8185:8185
143+
# - 9185:9185
144+
# command: /gobin/dgraph server --my=dg5:7185 --lru_mb=1024 --zero=zero1:5080 -o 105 --expose_trace --trace 1.0 --profile_mode block --block_rate 10
144145

posting/mvcc.go

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,15 @@ var (
2929
txnMarks *x.WaterMark // Used to find out till what RAFT index we can snapshot entries.
3030
)
3131

32+
type transactions struct {
33+
x.SafeMutex
34+
m map[uint64]*Txn
35+
}
36+
3237
func init() {
3338
txns = new(transactions)
3439
txns.m = make(map[uint64]*Txn)
35-
txnMarks = &x.WaterMark{Name: "Transaction watermark"}
36-
txnMarks.Init()
37-
}
38-
39-
func TxnMarks() *x.WaterMark {
40-
return txnMarks
4140
}
42-
4341
func Txns() *transactions {
4442
return txns
4543
}
@@ -52,6 +50,8 @@ type delta struct {
5250
posting *intern.Posting
5351
checkConflict bool // Check conflict detection.
5452
}
53+
54+
// TODO: This structure can be merged back into Oracle.
5555
type Txn struct {
5656
StartTs uint64
5757

@@ -66,11 +66,7 @@ type Txn struct {
6666
nextKeyIdx int
6767
}
6868

69-
type transactions struct {
70-
x.SafeMutex
71-
m map[uint64]*Txn
72-
}
73-
69+
// TODO: What is this for?
7470
func (t *transactions) MinTs() uint64 {
7571
t.Lock()
7672
var minTs uint64
@@ -94,28 +90,9 @@ func (t *transactions) MinTs() uint64 {
9490
return minTs
9591
}
9692

97-
func (t *transactions) TxnsSinceSnapshot(pending uint64) []uint64 {
98-
lastSnapshotIdx := TxnMarks().DoneUntil()
99-
var timestamps []uint64
100-
t.Lock()
101-
defer t.Unlock()
102-
var oldest float64 = 0.2 * float64(pending)
103-
for _, txn := range t.m {
104-
index := txn.startIdx()
105-
// We abort oldest 20% of the transactions.
106-
if index-lastSnapshotIdx <= uint64(oldest) {
107-
timestamps = append(timestamps, txn.StartTs)
108-
}
109-
}
110-
return timestamps
111-
}
112-
11393
func (t *transactions) Reset() {
11494
t.Lock()
11595
defer t.Unlock()
116-
for _, txn := range t.m {
117-
txn.done()
118-
}
11996
t.m = make(map[uint64]*Txn)
12097
}
12198

@@ -158,21 +135,9 @@ func (t *transactions) Get(startTs uint64) *Txn {
158135
func (t *transactions) Done(startTs uint64) {
159136
t.Lock()
160137
defer t.Unlock()
161-
txn, ok := t.m[startTs]
162-
if !ok {
163-
return
164-
}
165-
txn.done()
166138
delete(t.m, startTs)
167139
}
168140

169-
func (t *Txn) done() {
170-
t.Lock()
171-
defer t.Unlock()
172-
// All indices should have been added by now.
173-
TxnMarks().DoneMany(t.Indices)
174-
}
175-
176141
// LastIndex returns the index of last prewrite proposal associated with
177142
// the transaction.
178143
func (t *Txn) LastIndex() uint64 {

0 commit comments

Comments
 (0)