Skip to content

Commit a0a9189

Browse files
committed
FAB-14699 Kafka2Raft renames & skeletons
Rename structs & vars in migration packge, in response to review comments. Add skeleton function signatures in preparation for for abort & recovery. Update comments. No functional change. Change-Id: I1d36b1011b16821ce45b78bbd96d9a180d95c292 Signed-off-by: Yoav Tock <[email protected]>
1 parent c97b9bf commit a0a9189

File tree

9 files changed

+207
-150
lines changed

9 files changed

+207
-150
lines changed

orderer/common/multichannel/registrar.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func (r *Registrar) ConsensusMigrationCommit() error {
308308
}
309309

310310
// ConsensusMigrationAbort checks pre-conditions and aborts the consensus-type migration.
311-
func (r *Registrar) ConsensusMigrationAbort() (err error) {
311+
func (r *Registrar) ConsensusMigrationAbort() error {
312312
//TODO implement the consensus-type migration abort path
313313
return fmt.Errorf("Not implemented yet")
314314
}

orderer/common/multichannel/util_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (mc *mockConsenter) HandleChain(support consensus.ConsenterSupport, metadat
3030
support: support,
3131
metadata: metadata,
3232
done: make(chan struct{}),
33-
migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()),
33+
migrationStatus: migration.NewManager(support.IsSystemChannel(), support.ChainID()),
3434
}, nil
3535
}
3636

orderer/consensus/etcdraft/chain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func NewChain(
267267
},
268268
logger: lg,
269269
opts: opts,
270-
migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration
270+
migrationStatus: migration.NewManager(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration
271271
}
272272

273273
// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217

orderer/consensus/inactive/inactive_chain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ func (c *Chain) Halt() {
4545
}
4646

4747
func (c *Chain) MigrationStatus() migration.Status {
48-
return &migration.StatusImpl{}
48+
return migration.NewManager(false, "inactive")
4949
}

orderer/consensus/kafka/chain.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func newChain(
7979
haltChan: make(chan struct{}),
8080
startChan: make(chan struct{}),
8181
doneReprocessingMsgInFlight: doneReprocessingMsgInFlight,
82-
migrationStatusStepper: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()),
82+
migrationManager: migration.NewManager(support.IsSystemChannel(), support.ChainID()),
8383
}, nil
8484
}
8585

@@ -128,12 +128,12 @@ type chainImpl struct {
128128

129129
// provides access to the consensus-type migration status of the chain,
130130
// and allows stepping through the state machine.
131-
migrationStatusStepper migration.StatusStepper
131+
migrationManager migration.Manager
132132
}
133133

134134
// MigrationStatus provides access to the consensus-type migration status of the chain.
135135
func (chain *chainImpl) MigrationStatus() migration.Status {
136-
return chain.migrationStatusStepper
136+
return chain.migrationManager
137137
}
138138

139139
// Errored returns a channel which will close when a partition consumer error
@@ -226,8 +226,9 @@ func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
226226
}
227227

228228
func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
229-
// During consensus-type migration: stop all normal txs on the system-channel and standard-channels.
230-
if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
229+
// During consensus-type migration: stop all normal txs on the system-channel and standard-channels. This
230+
// happens in the broadcast-phase, and will prevent new transactions from entering Kafka.
231+
if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() {
231232
return fmt.Errorf("[channel: %s] cannot enqueue, consensus-type migration pending", chain.ChainID())
232233
}
233234

@@ -248,7 +249,7 @@ func (chain *chainImpl) Configure(config *cb.Envelope, configSeq uint64) error {
248249

249250
func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, originalOffset int64) error {
250251
// During consensus-type migration, stop channel creation
251-
if chain.ConsenterSupport.IsSystemChannel() && chain.migrationStatusStepper.IsPending() {
252+
if chain.ConsenterSupport.IsSystemChannel() && chain.migrationManager.IsPending() {
252253
ordererTx, err := isOrdererTx(config)
253254
if err != nil {
254255
err = errors.Wrap(err, "cannot determine if config-tx is of type ORDERER_TX, on system channel")
@@ -272,7 +273,7 @@ func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, origina
272273
return nil
273274
}
274275

275-
// enqueue accepts a message and returns true on acceptance, or false otheriwse.
276+
// enqueue accepts a message and returns true on acceptance, or false otherwise.
276277
func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
277278
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
278279
select {
@@ -855,8 +856,8 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
855856
offset = chain.lastOriginalOffsetProcessed
856857
}
857858

858-
// During consensus-type migration, drop normal messages that managed to sneak in past Order, possibly from other orderers
859-
if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
859+
// During consensus-type migration, drop normal messages on the channel.
860+
if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() {
860861
logger.Warningf("[channel: %s] Normal message is dropped, consensus-type migration pending", chain.ChainID())
861862
return nil
862863
}
@@ -935,7 +936,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
935936
if doCommit {
936937
commitConfigMsg(env, offset)
937938
} else {
938-
logger.Infof("[channel: %s] Dropping config message with offset %d, because of consensus-type migration step", chain.ChainID(), receivedOffset)
939+
logger.Warningf("[channel: %s] Dropping config message with offset %d, because of consensus-type migration step", chain.ChainID(), receivedOffset)
939940
}
940941

941942
default:
@@ -968,10 +969,10 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock
968969
switch chdr.Type {
969970

970971
case int32(cb.HeaderType_ORDERER_TRANSACTION):
971-
if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
972+
if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() {
972973
commitBlock = false
973974
logger.Debugf("[channel: %s] Consensus-type migration: Dropping ORDERER_TRANSACTION because consensus-type migration pending; Status: %s",
974-
chain.ChainID(), chain.migrationStatusStepper)
975+
chain.ChainID(), chain.migrationManager)
975976
} else {
976977
commitBlock = true
977978
}
@@ -999,8 +1000,8 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock
9991000
logger.Infof("[channel: %s] Consensus-type migration: Processing config tx: type: %s, state: %s, context: %d",
10001001
chain.ChainID(), nextConsensusType, nextMigState.String(), nextMigContext)
10011002

1002-
commitMigration := false // Prevent shadowing of commitBlock
1003-
commitBlock, commitMigration = chain.migrationStatusStepper.Step( // Evaluate the migration state machine
1003+
commitMigration := false // Prevent shadowing of commitBlock
1004+
commitBlock, commitMigration = chain.migrationManager.Step( // Evaluate the migration state machine
10041005
chain.ChainID(), nextConsensusType, nextMigState, nextMigContext, chain.lastCutBlockNumber, chain.consenter.migrationController())
10051006
logger.Debugf("[channel: %s] Consensus-type migration: commitBlock=%v, commitMigration=%v", chain.ChainID(), commitBlock, commitMigration)
10061007

@@ -1017,15 +1018,15 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock
10171018
block := chain.CreateNextBlock([]*cb.Envelope{configTx})
10181019
replacer := file.NewReplacer(chain.consenter.bootstrapFile())
10191020
if err = replacer.ReplaceGenesisBlockFile(block); err != nil {
1020-
_, context := chain.migrationStatusStepper.StateContext()
1021-
chain.migrationStatusStepper.SetStateContext(ab.ConsensusType_MIG_STATE_START, context) //Undo the commit
1021+
_, context := chain.migrationManager.StateContext()
1022+
chain.migrationManager.SetStateContext(ab.ConsensusType_MIG_STATE_START, context) //Undo the commit
10221023
logger.Warningf("[channel: %s] Consensus-type migration: Reject Config tx on system channel, cannot replace bootstrap file; Status: %s",
1023-
chain.ChainID(), chain.migrationStatusStepper.String())
1024+
chain.ChainID(), chain.migrationManager.String())
10241025
return false, err
10251026
}
10261027

10271028
logger.Infof("[channel: %s] Consensus-type migration: committed; Replaced bootstrap file: %s; Status: %s",
1028-
chain.ChainID(), chain.consenter.bootstrapFile(), chain.migrationStatusStepper.String())
1029+
chain.ChainID(), chain.consenter.bootstrapFile(), chain.migrationManager.String())
10291030
}
10301031

10311032
default:

orderer/consensus/kafka/chain_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2581,7 +2581,7 @@ func TestResubmission(t *testing.T) {
25812581
errorChan: errorChan,
25822582
haltChan: haltChan,
25832583
doneProcessingMessagesToBlocks: make(chan struct{}),
2584-
migrationStatusStepper: migration.NewStatusStepper(mockSupport.IsSystemChannel(), mockSupport.ChainID()),
2584+
migrationManager: migration.NewManager(mockSupport.IsSystemChannel(), mockSupport.ChainID()),
25852585
}
25862586

25872587
var counts []uint64
@@ -2771,7 +2771,7 @@ func TestResubmission(t *testing.T) {
27712771
haltChan: haltChan,
27722772
doneProcessingMessagesToBlocks: make(chan struct{}),
27732773
doneReprocessingMsgInFlight: doneReprocessing,
2774-
migrationStatusStepper: migration.NewStatusStepper(mockSupport.IsSystemChannel(), mockSupport.ChainID()),
2774+
migrationManager: migration.NewManager(mockSupport.IsSystemChannel(), mockSupport.ChainID()),
27752775
}
27762776

27772777
var counts []uint64

0 commit comments

Comments
 (0)