Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit cec7f86

Browse files
authored
Merge pull request #284 from OpenBazaar/TS_shutdown_gateway
REFACTOR: Make API Gateway closable.
2 parents 7034447 + 7bb5c28 commit cec7f86

File tree

2 files changed

+87
-105
lines changed

2 files changed

+87
-105
lines changed

api/gateway.go

Lines changed: 41 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,35 @@
11
package api
22

33
import (
4-
manet "gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
5-
"gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
64
"net"
75
"net/http"
86
"time"
97

108
"github.com/OpenBazaar/openbazaar-go/core"
119
"github.com/OpenBazaar/openbazaar-go/repo"
12-
"github.com/ipfs/go-ipfs/commands"
1310
"github.com/ipfs/go-ipfs/core/corehttp"
1411
"github.com/op/go-logging"
1512
)
1613

1714
var log = logging.MustGetLogger("api")
1815

19-
func makeHandler(n *core.OpenBazaarNode, ctx commands.Context, authCookie http.Cookie, l net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) (http.Handler, error) {
16+
// Gateway represents an HTTP API gateway
17+
type Gateway struct {
18+
listener net.Listener
19+
handler http.Handler
20+
config repo.APIConfig
21+
shutdownCh chan struct{}
22+
}
23+
24+
// NewGateway instantiates a new `Gateway`
25+
func NewGateway(n *core.OpenBazaarNode, authCookie http.Cookie, l net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) (*Gateway, error) {
2026
topMux := http.NewServeMux()
2127

2228
restAPI, err := newJsonAPIHandler(n, authCookie, config)
2329
if err != nil {
2430
return nil, err
2531
}
26-
wsAPI, err := newWSAPIHandler(n, ctx, config.Authenticated, authCookie, config.Username, config.Password)
32+
wsAPI, err := newWSAPIHandler(n, n.Context, config.Authenticated, authCookie, config.Username, config.Password)
2733
if err != nil {
2834
return nil, err
2935
}
@@ -35,66 +41,47 @@ func makeHandler(n *core.OpenBazaarNode, ctx commands.Context, authCookie http.C
3541

3642
mux := topMux
3743
for _, option := range options {
38-
var err error
3944
mux, err = option(n.IpfsNode, l, mux)
4045
if err != nil {
4146
return nil, err
4247
}
4348
}
44-
return topMux, nil
45-
}
4649

47-
func Serve(cb chan<- bool, node *core.OpenBazaarNode, ctx commands.Context, authCookie http.Cookie, lis net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) error {
48-
handler, err := makeHandler(node, ctx, authCookie, lis, config, options...)
49-
cb <- true
50-
if err != nil {
51-
return err
52-
}
50+
return &Gateway{
51+
listener: l,
52+
handler: topMux,
53+
config: config,
54+
shutdownCh: make(chan struct{}),
55+
}, nil
56+
}
5357

54-
addr, err := manet.FromNetAddr(lis.Addr())
55-
if err != nil {
56-
return err
57-
}
58+
// Close shutsdown the Gateway listener
59+
func (g *Gateway) Close() error {
60+
log.Infof("server at %s terminating...", g.listener.Addr())
5861

59-
// If the server exits beforehand
60-
var serverError error
61-
serverExited := make(chan struct{})
62+
// Print shutdown message every few seconds if we're taking too long
63+
go func() {
64+
select {
65+
case <-g.shutdownCh:
66+
return
67+
case <-time.After(5 * time.Second):
68+
log.Infof("waiting for server at %s to terminate...", g.listener.Addr())
6269

63-
node.IpfsNode.Process().Go(func(p goprocess.Process) {
64-
if config.SSL {
65-
serverError = http.ListenAndServeTLS(lis.Addr().String(), config.SSLCert, config.SSLKey, handler)
66-
} else {
67-
serverError = http.Serve(lis, handler)
6870
}
69-
close(serverExited)
70-
})
71-
72-
// Wait for server to exit
73-
select {
74-
case <-serverExited:
71+
}()
7572

76-
// If node being closed before server exits, close server
77-
case <-node.IpfsNode.Process().Closing():
78-
log.Infof("server at %s terminating...", addr)
79-
if config.SSL {
80-
close(serverExited)
81-
} else {
82-
lis.Close()
83-
}
73+
// Shutdown the listener
74+
close(g.shutdownCh)
75+
return g.listener.Close()
76+
}
8477

85-
outer:
86-
for {
87-
// Wait until server exits
88-
select {
89-
case <-serverExited:
90-
// If the server exited as we are closing, we really do not care about errors
91-
serverError = nil
92-
break outer
93-
case <-time.After(5 * time.Second):
94-
log.Infof("waiting for server at %s to terminate...", addr)
95-
}
96-
}
78+
// Serve begins listening on the configured address
79+
func (g *Gateway) Serve() error {
80+
var err error
81+
if g.config.SSL {
82+
err = http.ListenAndServeTLS(g.listener.Addr().String(), g.config.SSLCert, g.config.SSLKey, g.handler)
83+
} else {
84+
err = http.Serve(g.listener, g.handler)
9785
}
98-
log.Infof("server at %s terminated", addr)
99-
return serverError
86+
return err
10087
}

openbazaard.go

Lines changed: 46 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ var decryptDatabase DecryptDatabase
111111

112112
var parser = flags.NewParser(nil, flags.Default)
113113

114+
var ErrNoGateways = errors.New("No gateway addresses configured")
115+
114116
func main() {
115117
c := make(chan os.Signal, 1)
116118
signal.Notify(c, os.Interrupt)
@@ -505,46 +507,40 @@ func (x *Start) Execute(args []string) error {
505507
UserAgent: USERAGENT,
506508
}
507509

508-
var gwErrc <-chan error
509-
var cb <-chan bool
510-
if len(cfg.Addresses.Gateway) > 0 {
511-
if (apiConfig.SSL && apiConfig.SSLCert == "") || (apiConfig.SSL && apiConfig.SSLKey == "") {
512-
return errors.New("SSL cert and key files must be set when SSL is enabled")
513-
}
514-
err, cb, gwErrc = serveHTTPGateway(core.Node, authCookie, *apiConfig)
515-
if err != nil {
516-
log.Error(err)
517-
return err
518-
}
510+
if len(cfg.Addresses.Gateway) <= 0 {
511+
return ErrNoGateways
512+
}
513+
if (apiConfig.SSL && apiConfig.SSLCert == "") || (apiConfig.SSL && apiConfig.SSLKey == "") {
514+
return errors.New("SSL cert and key files must be set when SSL is enabled")
519515
}
520516

521-
/* Wait for gateway to start before starting the network service.
522-
This way the websocket channel we pass into the service gets created first.
523-
FIXME: There has to be a better way */
524-
for b := range cb {
525-
if b == true {
526-
core.Node.Service = service.New(core.Node, ctx, sqliteDB)
527-
MR := ret.NewMessageRetriever(sqliteDB, ctx, nd, core.Node.Service, 16, core.Node.SendOfflineAck)
528-
go MR.Run()
529-
core.Node.MessageRetriever = MR
530-
PR := rep.NewPointerRepublisher(nd, sqliteDB)
531-
go PR.Run()
532-
core.Node.PointerRepublisher = PR
533-
if !x.DisableWallet {
534-
MR.Wait()
535-
TL := lis.NewTransactionListener(core.Node.Datastore, core.Node.Broadcast, core.Node.Wallet.Params())
536-
wallet.AddTransactionListener(TL.OnTransactionReceived)
537-
log.Info("Starting bitcoin wallet...")
538-
go wallet.Start()
539-
}
540-
core.Node.UpdateFollow()
541-
core.Node.SeedNode()
542-
}
543-
break
517+
gateway, err := newHTTPGateway(core.Node, authCookie, *apiConfig)
518+
if err != nil {
519+
log.Error(err)
520+
return err
544521
}
545522

546-
for err := range gwErrc {
547-
fmt.Println(err)
523+
core.Node.Service = service.New(core.Node, ctx, sqliteDB)
524+
MR := ret.NewMessageRetriever(sqliteDB, ctx, nd, core.Node.Service, 16, core.Node.SendOfflineAck)
525+
go MR.Run()
526+
core.Node.MessageRetriever = MR
527+
PR := rep.NewPointerRepublisher(nd, sqliteDB)
528+
go PR.Run()
529+
core.Node.PointerRepublisher = PR
530+
if !x.DisableWallet {
531+
MR.Wait()
532+
TL := lis.NewTransactionListener(core.Node.Datastore, core.Node.Broadcast, core.Node.Wallet.Params())
533+
wallet.AddTransactionListener(TL.OnTransactionReceived)
534+
log.Info("Starting bitcoin wallet...")
535+
go wallet.Start()
536+
}
537+
core.Node.UpdateFollow()
538+
core.Node.SeedNode()
539+
540+
// Start gateway
541+
err = gateway.Serve()
542+
if err != nil {
543+
log.Error(err)
548544
}
549545

550546
return nil
@@ -596,37 +592,40 @@ func (d *DummyListener) Close() error {
596592
}
597593

598594
// Collects options, creates listener, prints status message and starts serving requests
599-
func serveHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config repo.APIConfig) (error, <-chan bool, <-chan error) {
595+
func newHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config repo.APIConfig) (*api.Gateway, error) {
596+
// Get API configuration
600597
cfg, err := node.Context.GetConfig()
601598
if err != nil {
602-
return err, nil, nil
599+
return nil, err
603600
}
604601

602+
// Create a network listener
605603
gatewayMaddr, err := ma.NewMultiaddr(cfg.Addresses.Gateway)
606604
if err != nil {
607-
return fmt.Errorf("serveHTTPGateway: invalid gateway address: %q (err: %s)", cfg.Addresses.Gateway, err), nil, nil
605+
return nil, fmt.Errorf("newHTTPGateway: invalid gateway address: %q (err: %s)", cfg.Addresses.Gateway, err)
608606
}
609607
var gwLis manet.Listener
610608
if config.SSL {
611609
netAddr, err := manet.ToNetAddr(gatewayMaddr)
612610
if err != nil {
613-
return err, nil, nil
611+
return nil, err
614612
}
615613
gwLis, err = manet.WrapNetListener(&DummyListener{netAddr})
616614
if err != nil {
617-
return err, nil, nil
615+
return nil, err
618616
}
619617
} else {
620618
gwLis, err = manet.Listen(gatewayMaddr)
621619
if err != nil {
622-
return fmt.Errorf("serveHTTPGateway: manet.Listen(%s) failed: %s", gatewayMaddr, err), nil, nil
620+
return nil, fmt.Errorf("newHTTPGateway: manet.Listen(%s) failed: %s", gatewayMaddr, err)
623621
}
624622
}
623+
625624
// We might have listened to /tcp/0 - let's see what we are listing on
626625
gatewayMaddr = gwLis.Multiaddr()
627-
628626
log.Infof("Gateway/API server listening on %s\n", gatewayMaddr)
629627

628+
// Setup an options slice
630629
var opts = []corehttp.ServeOption{
631630
corehttp.MetricsCollectionOption("gateway"),
632631
corehttp.CommandsROOption(node.Context),
@@ -640,15 +639,11 @@ func serveHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config
640639
}
641640

642641
if err != nil {
643-
return fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err), nil, nil
642+
return nil, fmt.Errorf("newHTTPGateway: ConstructNode() failed: %s", err)
644643
}
645-
errc := make(chan error)
646-
cb := make(chan bool)
647-
go func() {
648-
errc <- api.Serve(cb, node, node.Context, authCookie, gwLis.NetListener(), config, opts...)
649-
close(errc)
650-
}()
651-
return nil, cb, errc
644+
645+
// Create and return an API gateway
646+
return api.NewGateway(node, authCookie, gwLis.NetListener(), config, opts...)
652647
}
653648

654649
/* Returns the directory to store repo data in.

0 commit comments

Comments
 (0)