@@ -546,9 +546,13 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
546
546
547
547
func (n * node ) proposeSnapshot (discardN int ) error {
548
548
snap , err := n .calculateSnapshot (discardN )
549
- if err != nil || snap == nil {
549
+ if err != nil {
550
+ glog .Warningf ("Got error while calculating snapshot: %v" , err )
550
551
return err
551
552
}
553
+ if snap == nil {
554
+ return nil
555
+ }
552
556
proposal := & pb.Proposal {
553
557
Snapshot : snap ,
554
558
}
@@ -959,7 +963,7 @@ func (n *node) abortOldTransactions() {
959
963
// At i7, min pending start ts = S3, therefore snapshotIdx = i5 - 1 = i4.
960
964
// At i7, max commit ts = C1, therefore readTs = C1.
961
965
func (n * node ) calculateSnapshot (discardN int ) (* pb.Snapshot , error ) {
962
- _ , span := otrace .StartSpan (n .ctx , "Propose .Snapshot" )
966
+ _ , span := otrace .StartSpan (n .ctx , "Calculate .Snapshot" )
963
967
defer span .End ()
964
968
965
969
if atomic .LoadInt32 (& n .streaming ) > 0 {
@@ -974,6 +978,18 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
974
978
}
975
979
span .Annotatef (nil , "First index: %d" , first )
976
980
981
+ rsnap , err := n .Store .Snapshot ()
982
+ if err != nil {
983
+ return nil , err
984
+ }
985
+ var snap pb.Snapshot
986
+ if len (rsnap .Data ) > 0 {
987
+ if err := snap .Unmarshal (rsnap .Data ); err != nil {
988
+ return nil , err
989
+ }
990
+ }
991
+ span .Annotatef (nil , "Last snapshot: %+v" , snap )
992
+
977
993
last := n .Applied .DoneUntil ()
978
994
if int (last - first ) < discardN {
979
995
span .Annotate (nil , "Skipping due to insufficient entries" )
@@ -999,7 +1015,8 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
999
1015
// snapshotIdx. In any case, we continue picking up txn updates, to generate
1000
1016
// a maxCommitTs, which would become the readTs for the snapshot.
1001
1017
minPendingStart := posting .Oracle ().MinPendingStartTs ()
1002
- var maxCommitTs , snapshotIdx , maxCommitIdx uint64
1018
+ maxCommitTs := snap .ReadTs
1019
+ var snapshotIdx uint64
1003
1020
for _ , entry := range entries {
1004
1021
if entry .Type != raftpb .EntryNormal {
1005
1022
continue
@@ -1019,7 +1036,6 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
1019
1036
for _ , txn := range proposal .Delta .GetTxns () {
1020
1037
maxCommitTs = x .Max (maxCommitTs , txn .CommitTs )
1021
1038
}
1022
- maxCommitIdx = entry .Index
1023
1039
}
1024
1040
}
1025
1041
if maxCommitTs == 0 {
@@ -1029,8 +1045,10 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
1029
1045
if snapshotIdx <= 0 {
1030
1046
// It is possible that there are no pending transactions. In that case,
1031
1047
// snapshotIdx would be zero.
1032
- span .Annotatef (nil , "Using maxCommitIdx as snapshotIdx: %d" , maxCommitIdx )
1033
- snapshotIdx = maxCommitIdx
1048
+ if len (entries ) > 0 {
1049
+ snapshotIdx = entries [len (entries )- 1 ].Index
1050
+ }
1051
+ span .Annotatef (nil , "snapshotIdx is zero. Using last entry's index: %d" , snapshotIdx )
1034
1052
}
1035
1053
1036
1054
numDiscarding := snapshotIdx - first + 1
@@ -1045,13 +1063,13 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
1045
1063
return nil , nil
1046
1064
}
1047
1065
1048
- snap := & pb.Snapshot {
1066
+ result := & pb.Snapshot {
1049
1067
Context : n .RaftContext ,
1050
1068
Index : snapshotIdx ,
1051
1069
ReadTs : maxCommitTs ,
1052
1070
}
1053
- span .Annotatef (nil , "Got snapshot: %+v" , snap )
1054
- return snap , nil
1071
+ span .Annotatef (nil , "Got snapshot: %+v" , result )
1072
+ return result , nil
1055
1073
}
1056
1074
1057
1075
func (n * node ) joinPeers () error {
0 commit comments