Skip to content

Commit f13fdca

Browse files
authored
Merge pull request hyperledger#2 from Param-S/attestationservice
New Block Attestation service
2 parents e0d1ec6 + 91f851e commit f13fdca

File tree

6 files changed

+729
-0
lines changed

6 files changed

+729
-0
lines changed

common/deliver/deliver.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,3 +398,21 @@ func (h *Handler) validateChannelHeader(ctx context.Context, chdr *cb.ChannelHea
398398
func noExpiration(_ []byte) time.Time {
399399
return time.Time{}
400400
}
401+
402+
func (h *Handler) HandleAttestation(ctx context.Context, srv *Server, env *cb.Envelope) error {
403+
status, err := h.deliverBlocks(ctx, srv, env)
404+
if err != nil {
405+
return err
406+
}
407+
408+
err = srv.SendStatusResponse(status)
409+
if status != cb.Status_SUCCESS {
410+
return err
411+
}
412+
if err != nil {
413+
addr := util.ExtractRemoteAddress(ctx)
414+
logger.Warningf("Error sending to %s: %s", addr, err)
415+
return err
416+
}
417+
return nil
418+
}

common/deliver/deliver_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,16 @@ var _ = Describe("Deliver", func() {
651651
Expect(resp).To(Equal(cb.Status_SUCCESS))
652652
})
653653

654+
It("HandleAttestation sends requested block", func() {
655+
err := handler.HandleAttestation(context.Background(), server, envelope)
656+
Expect(err).NotTo(HaveOccurred())
657+
Expect(fakeResponseSender.SendBlockResponseCallCount()).To(Equal(1))
658+
b, _, _, _ := fakeResponseSender.SendBlockResponseArgsForCall(0)
659+
Expect(b).To(Equal(&cb.Block{
660+
Header: &cb.BlockHeader{Number: 100},
661+
}))
662+
})
663+
654664
Context("when sending the success status fails", func() {
655665
BeforeEach(func() {
656666
fakeResponseSender.SendStatusResponseReturns(errors.New("send-success-fails"))
@@ -660,6 +670,11 @@ var _ = Describe("Deliver", func() {
660670
err := handler.Handle(context.Background(), server)
661671
Expect(err).To(MatchError("send-success-fails"))
662672
})
673+
674+
It("HandleAttestation returns the error", func() {
675+
err := handler.HandleAttestation(context.Background(), server, envelope)
676+
Expect(err).To(MatchError("send-success-fails"))
677+
})
663678
})
664679

665680
Context("when receive fails", func() {
@@ -703,6 +718,15 @@ var _ = Describe("Deliver", func() {
703718
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
704719
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
705720
})
721+
722+
It("sends a bad envelope to HandleAttestation", func() {
723+
err := handler.HandleAttestation(context.Background(), server, envelope)
724+
Expect(err).NotTo(HaveOccurred())
725+
726+
Expect(fakeResponseSender.SendStatusResponseCallCount()).To(Equal(1))
727+
resp := fakeResponseSender.SendStatusResponseArgsForCall(0)
728+
Expect(resp).To(Equal(cb.Status_BAD_REQUEST))
729+
})
706730
})
707731

708732
Context("when unmarshalling the channel header fails", func() {
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package server
8+
9+
import (
10+
"runtime/debug"
11+
"time"
12+
13+
cb "github.com/hyperledger/fabric-protos-go/common"
14+
ab "github.com/hyperledger/fabric-protos-go/orderer"
15+
"github.com/hyperledger/fabric/common/deliver"
16+
"github.com/hyperledger/fabric/common/metrics"
17+
"github.com/hyperledger/fabric/common/policies"
18+
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
19+
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
20+
"github.com/hyperledger/fabric/orderer/common/multichannel"
21+
"github.com/hyperledger/fabric/protoutil"
22+
"github.com/pkg/errors"
23+
)
24+
25+
type attestationserver struct {
26+
dh *deliver.Handler
27+
debug *localconfig.Debug
28+
*multichannel.Registrar
29+
}
30+
31+
// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
32+
func NewAttestationService(
33+
r *multichannel.Registrar,
34+
metricsProvider metrics.Provider,
35+
debug *localconfig.Debug,
36+
timeWindow time.Duration,
37+
mutualTLS bool,
38+
expirationCheckDisabled bool,
39+
) ab.BlockAttestationsServer {
40+
s := &attestationserver{
41+
dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS, deliver.NewMetrics(metricsProvider), expirationCheckDisabled),
42+
debug: debug,
43+
Registrar: r,
44+
}
45+
return s
46+
}
47+
48+
func (s *attestationserver) BlockAttestations(env *cb.Envelope, strm ab.BlockAttestations_BlockAttestationsServer) error {
49+
logger.Debugf("Starting new handler for block attestation")
50+
defer func() {
51+
if r := recover(); r != nil {
52+
logger.Criticalf("block attestation client triggered panic: %s\n%s", r, debug.Stack())
53+
}
54+
logger.Debugf("Closing attestation server stream")
55+
}()
56+
57+
policyChecker := func(env *cb.Envelope, channelID string) error {
58+
chain := s.GetChain(channelID)
59+
if chain == nil {
60+
return errors.Errorf("channel %s not found", channelID)
61+
}
62+
// In maintenance mode, we typically require the signature of /Channel/Orderer/Readers.
63+
// This will block Deliver requests from peers (which normally satisfy /Channel/Readers).
64+
sf := msgprocessor.NewSigFilter(policies.ChannelReaders, policies.ChannelOrdererReaders, chain)
65+
return sf.Apply(env)
66+
}
67+
attestationServer := &deliver.Server{
68+
PolicyChecker: deliver.PolicyCheckerFunc(policyChecker),
69+
ResponseSender: &attestationSender{
70+
BlockAttestations_BlockAttestationsServer: strm,
71+
},
72+
}
73+
return s.dh.HandleAttestation(strm.Context(), attestationServer, env)
74+
}
75+
76+
type attestationSender struct {
77+
ab.BlockAttestations_BlockAttestationsServer
78+
}
79+
80+
func (rs *attestationSender) SendStatusResponse(status cb.Status) error {
81+
reply := &ab.BlockAttestationResponse{
82+
Type: &ab.BlockAttestationResponse_Status{Status: status},
83+
}
84+
return rs.Send(reply)
85+
}
86+
87+
func (rs *attestationSender) SendBlockResponse(
88+
block *cb.Block,
89+
channelID string,
90+
chain deliver.Chain,
91+
signedData *protoutil.SignedData,
92+
) error {
93+
blockAttestation := ab.BlockAttestation{Header: block.Header, Metadata: block.Metadata}
94+
response := &ab.BlockAttestationResponse{
95+
Type: &ab.BlockAttestationResponse_BlockAttestation{BlockAttestation: &blockAttestation},
96+
}
97+
return rs.Send(response)
98+
}
99+
100+
func (rs *attestationSender) DataType() string {
101+
return "block_attestation"
102+
}

0 commit comments

Comments
 (0)