Skip to content

Commit 0504983

Browse files
committed
FAB-13669 consensus migration: kafka2raft green path #4
This is the fourth of four (4/4) sub-tasks that focus on the "green" path of consensus-type migration from Kafka to Raft. By "green" we mean that there are no failures or aborts along the way. The 4 sub-tasks are staged in a way that minimizes dependencies between them. In this sub-task we introduce changes to the etcd/raft-based OSNs such that they can restart from a ledger that was started as Kafka, migrated, and restarted. This change concludes all the changes needed to implement the green path on the "Raft" side. See the respective JIRA item for further details. Change-Id: I5b408e1cfcb8cf42c39bed4df6c5496792175ef0 Signed-off-by: Yoav Tock <[email protected]>
1 parent 4950edd commit 0504983

File tree

6 files changed

+159
-8
lines changed

6 files changed

+159
-8
lines changed

orderer/common/multichannel/chainsupport.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
1616
"github.com/hyperledger/fabric/orderer/consensus"
1717
cb "github.com/hyperledger/fabric/protos/common"
18+
"github.com/hyperledger/fabric/protos/orderer"
1819
"github.com/hyperledger/fabric/protos/utils"
1920
"github.com/pkg/errors"
2021
)
@@ -68,6 +69,14 @@ func newChainSupport(
6869
// Set up the block writer
6970
cs.BlockWriter = newBlockWriter(lastBlock, registrar, cs)
7071

72+
// TODO Identify recovery after crash in the middle of consensus-type migration
73+
if cs.detectMigration(lastBlock) {
74+
// We do this because the last block after migration (COMMIT/CONTEXT) carries Kafka metadata.
75+
// This prevents the code down the line from unmarshaling it as Raft, and panicking.
76+
metadata.Value = nil
77+
logger.Debugf("[channel: %s] Consensus-type migration: restart on to Raft, resetting Kafka block metadata", cs.ChainID())
78+
}
79+
7180
// Set up the consenter
7281
consenterType := ledgerResources.SharedConfig().ConsensusType()
7382
consenter, ok := consenters[consenterType]
@@ -85,6 +94,53 @@ func newChainSupport(
8594
return cs
8695
}
8796

97+
// detectMigration identifies restart after consensus-type migration was committed (green path).
98+
// Restart after migration is detected by:
99+
// 1. The Kafka2RaftMigration capability in on
100+
// 2. The last block carries a config-tx
101+
// 3. In the config-tx, you have:
102+
// - (system-channel && state=COMMIT), OR
103+
// - (standard-channel && state=CONTEXT)
104+
// This assumes that migration was successful (green path). When migration ends successfully,
105+
// every channel will have a config block as the last block. On the system channel, containing state=COMMIT;
106+
// on standard channels, containing state=CONTEXT.
107+
func (cs *ChainSupport) detectMigration(lastBlock *cb.Block) bool {
108+
isMigration := false
109+
110+
if !cs.ledgerResources.SharedConfig().Capabilities().Kafka2RaftMigration() {
111+
return isMigration
112+
}
113+
114+
lastConfigIndex, err := utils.GetLastConfigIndexFromBlock(lastBlock)
115+
if err != nil {
116+
logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
117+
}
118+
119+
logger.Debugf("[channel: %s], sysChan=%v, lastConfigIndex=%d, H=%d, mig-state: %s",
120+
cs.ChainID(), cs.systemChannel, lastConfigIndex, cs.ledgerResources.Height(),
121+
cs.ledgerResources.SharedConfig().ConsensusMigrationState())
122+
123+
if lastConfigIndex == lastBlock.Header.Number { //The last block was a config-tx
124+
state := cs.ledgerResources.SharedConfig().ConsensusMigrationState()
125+
if cs.systemChannel {
126+
if state == orderer.ConsensusType_MIG_STATE_COMMIT {
127+
isMigration = true
128+
}
129+
} else {
130+
if state == orderer.ConsensusType_MIG_STATE_CONTEXT {
131+
isMigration = true
132+
}
133+
}
134+
135+
if isMigration {
136+
logger.Infof("[channel: %s], Restarting after consensus-type migration. New consensus-type is: %s",
137+
cs.ChainID(), cs.ledgerResources.SharedConfig().ConsensusType())
138+
}
139+
}
140+
141+
return isMigration
142+
}
143+
88144
// Block returns a block with the following number,
89145
// or nil if such a block doesn't exist.
90146
func (cs *ChainSupport) Block(number uint64) *cb.Block {

orderer/consensus/etcdraft/chain.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,14 +289,57 @@ func (c *Chain) Start() {
289289
return
290290
}
291291

292-
c.node.start(c.fresh, c.support.Height() > 1)
292+
isJoin := c.support.Height() > 1
293+
isMigration := false
294+
if isJoin {
295+
isMigration = c.detectMigration()
296+
}
297+
c.node.start(c.fresh, isJoin, isMigration)
293298
close(c.startC)
294299
close(c.errorC)
295300

296301
go c.gc()
297302
go c.serveRequest()
298303
}
299304

305+
// detectMigration detects if the orderer restarts right after consensus-type migration,
306+
// in which the Height>1 but previous blocks were created by Kafka.
307+
// If this is the case, Raft should be started like it is joining a new channel.
308+
func (c *Chain) detectMigration() bool {
309+
startOfChain := false
310+
if c.support.SharedConfig().Capabilities().Kafka2RaftMigration() {
311+
lastBlock := c.support.Block(c.support.Height() - 1)
312+
lastConfigIndex, err := utils.GetLastConfigIndexFromBlock(lastBlock)
313+
if err != nil {
314+
c.logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
315+
}
316+
317+
c.logger.Debugf("[channel: %s], detecting if consensus-type migration, sysChan=%v, lastConfigIndex=%d, H=%d, mig-state: %s",
318+
c.support.ChainID(), c.support.IsSystemChannel(), lastConfigIndex, c.support.Height(), c.support.SharedConfig().ConsensusMigrationState().String())
319+
320+
if lastConfigIndex != c.support.Height()-1 { // The last block is not a config-tx
321+
return startOfChain
322+
}
323+
324+
// The last block was a config-tx
325+
if c.support.IsSystemChannel() {
326+
if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_COMMIT {
327+
startOfChain = true
328+
}
329+
} else {
330+
if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_CONTEXT {
331+
startOfChain = true
332+
}
333+
}
334+
335+
if startOfChain {
336+
c.logger.Infof("[channel: %s], Restarting after consensus-type migration. Type: %s, just starting the channel.",
337+
c.support.ChainID(), c.support.SharedConfig().ConsensusType())
338+
}
339+
}
340+
return startOfChain
341+
}
342+
300343
// Order submits normal type transactions for ordering.
301344
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
302345
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
@@ -1038,6 +1081,15 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
10381081
return nil
10391082
}
10401083

1084+
// Detect if it is a restart right after consensus-type migration. If yes, return early in order to avoid using
1085+
// the block metadata as etcdraft.RaftMetadata (see below). Right after migration the block metadata will carry
1086+
// Kafka metadata. The etcdraft.RaftMetadata should be extracted from the ConsensusType.Metadata, instead.
1087+
if c.detectMigration() {
1088+
c.logger.Infof("[channel: %s], Restarting after consensus-type migration. Type: %s, just starting the chain.",
1089+
c.support.ChainID(), c.support.SharedConfig().ConsensusType())
1090+
return nil
1091+
}
1092+
10411093
// extract membership mapping from configuration block metadata
10421094
// and compare with Raft configuration
10431095
metadata, err := utils.GetMetadataFromBlock(lastBlock, common.BlockMetadataIndex_ORDERER)

orderer/consensus/etcdraft/chain_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2697,7 +2697,12 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
26972697

26982698
support := &consensusmocks.FakeConsenterSupport{}
26992699
support.ChainIDReturns(channel)
2700-
support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})
2700+
support.SharedConfigReturns(&mockconfig.Orderer{
2701+
BatchTimeoutVal: timeout,
2702+
CapabilitiesVal: &mockconfig.OrdererCapabilities{
2703+
Kafka2RaftMigVal: false,
2704+
},
2705+
})
27012706

27022707
cutter := mockblockcutter.NewReceiver()
27032708
close(cutter.Block)

orderer/consensus/etcdraft/consenter.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package etcdraft
88

99
import (
1010
"bytes"
11+
"encoding/hex"
1112
"path"
1213
"reflect"
1314
"time"
@@ -118,11 +119,27 @@ func (c *Consenter) detectSelfID(consenters map[uint64]*etcdraft.Consenter) (uin
118119

119120
// HandleChain returns a new Chain instance or an error upon failure
120121
func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *common.Metadata) (consensus.Chain, error) {
122+
123+
if support.SharedConfig().Capabilities().Kafka2RaftMigration() {
124+
c.Logger.Debugf("SharedConfig.ConsensusType fields: Type=%s, ConsensusMigrationState=%s, ConsensusMigrationContext=%d, ConsensusMetadata length=%d",
125+
support.SharedConfig().ConsensusType(), support.SharedConfig().ConsensusMigrationState(),
126+
support.SharedConfig().ConsensusMigrationContext(), len(support.SharedConfig().ConsensusMetadata()))
127+
if support.SharedConfig().ConsensusMigrationState() != orderer.ConsensusType_MIG_STATE_NONE {
128+
c.Logger.Debugf("SharedConfig.ConsensusType: ConsensusMetadata dump:\n%s", hex.Dump(support.SharedConfig().ConsensusMetadata()))
129+
}
130+
}
131+
121132
m := &etcdraft.Metadata{}
122133
if err := proto.Unmarshal(support.SharedConfig().ConsensusMetadata(), m); err != nil {
123134
return nil, errors.Wrap(err, "failed to unmarshal consensus metadata")
124135
}
125136

137+
if support.SharedConfig().Capabilities().Kafka2RaftMigration() &&
138+
support.SharedConfig().ConsensusMigrationState() != orderer.ConsensusType_MIG_STATE_NONE {
139+
c.Logger.Debugf("SharedConfig().ConsensusMetadata(): %s", m.String())
140+
c.Logger.Debugf("block metadata.Value dump: \n%s", hex.Dump(metadata.Value))
141+
}
142+
126143
if m.Options == nil {
127144
return nil, errors.New("etcdraft options have not been provided")
128145
}

orderer/consensus/etcdraft/consenter_test.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,12 @@ var _ = Describe("Consenter", func() {
152152
},
153153
}
154154
metadata := utils.MarshalOrPanic(m)
155-
support.SharedConfigReturns(&mockconfig.Orderer{ConsensusMetadataVal: metadata})
155+
support.SharedConfigReturns(&mockconfig.Orderer{
156+
ConsensusMetadataVal: metadata,
157+
CapabilitiesVal: &mockconfig.OrdererCapabilities{
158+
Kafka2RaftMigVal: false,
159+
},
160+
})
156161

157162
consenter := newConsenter(chainGetter)
158163
consenter.EtcdRaftConfig.WALDir = walDir
@@ -181,7 +186,12 @@ var _ = Describe("Consenter", func() {
181186
}
182187
metadata := utils.MarshalOrPanic(m)
183188
support := &consensusmocks.FakeConsenterSupport{}
184-
support.SharedConfigReturns(&mockconfig.Orderer{ConsensusMetadataVal: metadata})
189+
support.SharedConfigReturns(&mockconfig.Orderer{
190+
ConsensusMetadataVal: metadata,
191+
CapabilitiesVal: &mockconfig.OrdererCapabilities{
192+
Kafka2RaftMigVal: false,
193+
},
194+
})
185195
support.ChainIDReturns("foo")
186196

187197
consenter := newConsenter(chainGetter)
@@ -200,7 +210,12 @@ var _ = Describe("Consenter", func() {
200210
},
201211
}
202212
metadata := utils.MarshalOrPanic(m)
203-
support.SharedConfigReturns(&mockconfig.Orderer{ConsensusMetadataVal: metadata})
213+
support.SharedConfigReturns(&mockconfig.Orderer{
214+
ConsensusMetadataVal: metadata,
215+
CapabilitiesVal: &mockconfig.OrdererCapabilities{
216+
Kafka2RaftMigVal: false,
217+
},
218+
})
204219

205220
consenter := newConsenter(chainGetter)
206221

orderer/consensus/etcdraft/node.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,19 @@ type node struct {
4141
raft.Node
4242
}
4343

44-
func (n *node) start(fresh, join bool) {
44+
func (n *node) start(fresh, join, migration bool) {
4545
raftPeers := RaftPeers(n.metadata.Consenters)
46+
n.logger.Debugf("Starting raft node: #peers: %v", len(raftPeers))
4647

4748
if fresh {
4849
if join {
49-
raftPeers = nil
50-
n.logger.Info("Starting raft node to join an existing channel")
50+
if !migration {
51+
raftPeers = nil
52+
n.logger.Info("Starting raft node to join an existing channel")
53+
54+
} else {
55+
n.logger.Info("Starting raft node to join an existing channel, after consensus-type migration")
56+
}
5157
} else {
5258
n.logger.Info("Starting raft node as part of a new channel")
5359
}

0 commit comments

Comments
 (0)