forked from CognitionFoundry/gohfc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
orderer.go
155 lines (141 loc) · 4.32 KB
/
orderer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
Copyright: Cognition Foundry. All Rights Reserved.
License: Apache License Version 2.0
*/
package gohfc
import (
"google.golang.org/grpc"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"context"
"fmt"
"google.golang.org/grpc/credentials"
"github.com/golang/protobuf/proto"
"time"
)
// Orderer expose API's to communicate with orderers.
type Orderer struct {
Name string
Uri string
Opts []grpc.DialOption
caPath string
con *grpc.ClientConn
}
const timeout = 5
// Broadcast Broadcast envelope to orderer for execution.
func (o *Orderer) Broadcast(envelope *common.Envelope) (*orderer.BroadcastResponse, error) {
if o.con ==nil{
c, err := grpc.Dial(o.Uri, o.Opts...)
if err != nil {
return nil, fmt.Errorf("cannot connect to orderer: %s err is: %v", o.Name, err)
}
o.con=c
}
bcc, err := orderer.NewAtomicBroadcastClient(o.con).Broadcast(context.Background())
if err != nil {
return nil, err
}
defer bcc.CloseSend()
bcc.Send(envelope)
response, err:= bcc.Recv()
if err != nil {
return nil, err
}
if response.Status != common.Status_SUCCESS {
return nil,fmt.Errorf("unexpected status: %v", response.Status)
}
return response,err
}
// Deliver delivers envelope to orderer. Please note that new connection will be created on every call of Deliver.
func (o *Orderer) Deliver(envelope *common.Envelope) (*common.Block, error) {
connection, err := grpc.Dial(o.Uri, o.Opts...)
if err != nil {
return nil, fmt.Errorf("cannot connect to orderer: %s err is: %v", o.Name, err)
}
defer connection.Close()
dk, err := orderer.NewAtomicBroadcastClient(connection).Deliver(context.Background())
if err != nil {
return nil, err
}
if err := dk.Send(envelope); err != nil {
return nil, err
}
var block *common.Block
timer := time.NewTimer(time.Second * time.Duration(timeout))
defer timer.Stop()
for {
select {
case <-timer.C:
return nil, ErrOrdererTimeout
default:
response, err := dk.Recv()
if err != nil {
return nil, err
}
switch t := response.Type.(type) {
case *orderer.DeliverResponse_Status:
if t.Status == common.Status_SUCCESS {
return block, nil
}
if t.Status != common.Status_SUCCESS {
return nil, fmt.Errorf("orderer response with status: %v", t.Status)
}
case *orderer.DeliverResponse_Block:
block = response.GetBlock()
default:
return nil, fmt.Errorf("unknown response type from orderer: %s", t)
}
}
}
}
func (o *Orderer) getGenesisBlock(identity *Identity, crypto CryptoSuite, channel *Channel) (*common.Block, error) {
seekInfo := &orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: 0}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: 0}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}
seekInfoBytes, err := proto.Marshal(seekInfo)
if err != nil {
return nil, err
}
creator, err := marshalProtoIdentity(identity, channel)
if err != nil {
return nil, err
}
txId, err := newTransactionId(creator)
if err != nil {
return nil, err
}
headerBytes, err := channelHeader(common.HeaderType_DELIVER_SEEK_INFO, txId, channel, 0, nil)
signatureHeaderBytes, err := signatureHeader(creator, txId)
if err != nil {
return nil, err
}
header := header(signatureHeaderBytes, headerBytes)
payloadBytes, err := payload(header, seekInfoBytes)
if err != nil {
return nil, err
}
payloadSignedBytes, err := crypto.Sign(payloadBytes, identity.PrivateKey)
if err != nil {
return nil, err
}
env := &common.Envelope{Payload: payloadBytes, Signature: payloadSignedBytes}
return o.Deliver(env)
}
// NewOrdererFromConfig create new Orderer from config
func NewOrdererFromConfig(conf OrdererConfig) (*Orderer, error) {
o := Orderer{Uri: conf.Host, caPath: conf.TlsPath}
if conf.Insecure {
o.Opts = []grpc.DialOption{grpc.WithInsecure()}
} else if o.caPath != "" {
creds, err := credentials.NewClientTLSFromFile(o.caPath, "")
if err != nil {
return nil, fmt.Errorf("cannot read orderer %s credentials err is: %v", o.Name, err)
}
o.Opts = append(o.Opts, grpc.WithTransportCredentials(creds))
}
o.Opts = append(o.Opts, grpc.WithBlock())
o.Opts = append(o.Opts, grpc.WithTimeout(3*time.Second))
return &o, nil
}