Skip to content

Commit

Permalink
Export runtime package. (#71)
Browse files Browse the repository at this point in the history
* Export internal/runtime as pkg/runtime.

* Merge internal/util/runtime into pkg/runtime.

* Refactor public runtime API.

* Add pkg/runtime/doc.go

* Rename runtime.Env.Flag to runtime.Env.Flags.

* Add unit test for Env.Options().
  • Loading branch information
lthibault authored Dec 12, 2022
1 parent c467e3c commit 529bdc9
Show file tree
Hide file tree
Showing 15 changed files with 393 additions and 256 deletions.
8 changes: 3 additions & 5 deletions internal/cmd/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import (

"github.com/lthibault/log"
"github.com/urfave/cli/v2"
"github.com/wetware/ww/internal/runtime"
runtimeutil "github.com/wetware/ww/internal/util/runtime"
"github.com/wetware/ww/pkg/client"
"github.com/wetware/ww/pkg/runtime"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -67,10 +66,9 @@ func Command() *cli.Command {
func setup() cli.BeforeFunc {
return func(c *cli.Context) (err error) {
app = fx.New(
runtime.Prelude(runtimeutil.New(c)),
runtime.NewClient(c.Context, c),
fx.StartTimeout(c.Duration("timeout")),
fx.Populate(&logger, &dialer),
runtime.Client())
fx.Populate(&logger, &dialer))

ctx, cancel := context.WithTimeout(
c.Context,
Expand Down
8 changes: 3 additions & 5 deletions internal/cmd/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (
"github.com/urfave/cli/v2"
"go.uber.org/fx"

"github.com/wetware/ww/internal/runtime"
runtimeutil "github.com/wetware/ww/internal/util/runtime"
"github.com/wetware/ww/pkg/client"
"github.com/wetware/ww/pkg/runtime"
)

var (
Expand Down Expand Up @@ -67,10 +66,9 @@ func Command() *cli.Command {
func setup() cli.BeforeFunc {
return func(c *cli.Context) (err error) {
app = fx.New(
runtime.Prelude(runtimeutil.New(c)),
runtime.NewClient(c.Context, c),
fx.StartTimeout(c.Duration("timeout")),
fx.Populate(&logger, &dialer),
runtime.Client())
fx.Populate(&logger, &dialer))

ctx, cancel := context.WithTimeout(
c.Context,
Expand Down
8 changes: 3 additions & 5 deletions internal/cmd/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (
"github.com/urfave/cli/v2"
"go.uber.org/fx"

"github.com/wetware/ww/internal/runtime"
runtimeutil "github.com/wetware/ww/internal/util/runtime"
"github.com/wetware/ww/pkg/runtime"
"github.com/wetware/ww/pkg/server"
)

Expand Down Expand Up @@ -70,9 +69,8 @@ func Command() *cli.Command {
func setup() cli.BeforeFunc {
return func(c *cli.Context) error {
app = fx.New(
runtime.Prelude(runtimeutil.New(c)),
fx.Populate(&logger, &node),
runtime.Server())
runtime.NewServer(c.Context, c),
fx.Populate(&logger, &node))

return start(c.Context, app)
}
Expand Down
26 changes: 0 additions & 26 deletions internal/runtime/client.go

This file was deleted.

42 changes: 0 additions & 42 deletions internal/runtime/server.go

This file was deleted.

53 changes: 0 additions & 53 deletions internal/util/runtime/runtime.go

This file was deleted.

68 changes: 43 additions & 25 deletions internal/runtime/discover.go → pkg/runtime/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
casm "github.com/wetware/casm/pkg"
Expand All @@ -23,6 +24,26 @@ import (
"go.uber.org/fx"
)

type bootConfig struct {
fx.In

Log log.Logger
Metrics metrics.Client
Vat casm.Vat
Flag Flags
}

func (bc bootConfig) host() host.Host {
return bc.Vat.Host
}

func (bc bootConfig) metrics() bootMetrics {
return bootMetrics{
Log: bc.Log,
Metrics: bc.Metrics,
}
}

func (c Config) ClientBootstrap() fx.Option {
return fx.Provide(c.newClientDisc)
}
Expand All @@ -31,14 +52,14 @@ func (c Config) ServerBootstrap() fx.Option {
return fx.Provide(c.newServerDisc)
}

func (c Config) newServerDisc(env Env, lx fx.Lifecycle, vat casm.Vat) (d discovery.Discovery, err error) {
if env.IsSet("addr") {
d, err = boot.NewStaticAddrStrings(env.StringSlice("addr")...)
func (c Config) newServerDisc(config bootConfig, lx fx.Lifecycle) (d discovery.Discovery, err error) {
if config.Flag.IsSet("addr") {
d, err = boot.NewStaticAddrStrings(config.Flag.StringSlice("addr")...)
return
}

d, err = bootutil.ListenString(vat.Host, env.String("discover"),
socket.WithLogger(env.Log()),
d, err = bootutil.ListenString(config.host(), config.Flag.String("discover"),
socket.WithLogger(config.Log),
socket.WithRateLimiter(socket.NewPacketLimiter(256, 16)))
if c, ok := d.(io.Closer); ok {
lx.Append(closer(c))
Expand All @@ -47,22 +68,22 @@ func (c Config) newServerDisc(env Env, lx fx.Lifecycle, vat casm.Vat) (d discove
return
}

func (c Config) newClientDisc(env Env, lx fx.Lifecycle, vat casm.Vat) (d discovery.Discoverer, err error) {
if env.IsSet("addr") {
d, err = boot.NewStaticAddrStrings(env.StringSlice("addr")...)
func (c Config) newClientDisc(config bootConfig, lx fx.Lifecycle) (d discovery.Discoverer, err error) {
if config.Flag.IsSet("addr") {
d, err = boot.NewStaticAddrStrings(config.Flag.StringSlice("addr")...)
return
}

d, err = bootutil.DialString(vat.Host, env.String("discover"),
socket.WithLogger(env.Log()),
d, err = bootutil.DialString(config.host(), config.Flag.String("discover"),
socket.WithLogger(config.Log),
socket.WithRateLimiter(socket.NewPacketLimiter(256, 16)))
if c, ok := d.(io.Closer); ok {
lx.Append(closer(c))
}

return &logMetricDisc{
disc: d,
metrics: bootMetrics{env},
metrics: config.metrics(),
}, err
}

Expand All @@ -78,8 +99,7 @@ func (c Config) withPubSubDiscovery(d discovery.Discovery, config psBootConfig)
type psBootConfig struct {
fx.In

Env Env
Vat casm.Vat
Boot bootConfig
DHT *dual.DHT
Datastore ds.Batching
Lifecycle fx.Lifecycle
Expand All @@ -91,9 +111,9 @@ func (config psBootConfig) maybePeX(d discovery.Discovery, opt []pex.Option) (di
return d, nil
}

px, err := pex.New(config.Vat.Host, append([]pex.Option{
px, err := pex.New(config.Boot.host(), append([]pex.Option{
// default options for PeX
pex.WithLogger(config.Env.Log()),
pex.WithLogger(config.Boot.Log),
pex.WithDatastore(config.Datastore),
pex.WithDiscovery(d),
}, opt...)...)
Expand All @@ -110,15 +130,15 @@ func (config psBootConfig) Wrap(d discovery.Discovery) *boot.Namespace {
//
// 1. the bootstrap service, iff namespace matches cluster topic; else
// 2. the DHT-backed discovery service.
bootTopic := "floodsub:" + config.Env.String("ns")
bootTopic := "floodsub:" + config.Boot.Flag.String("ns")
match := func(ns string) bool {
return ns == bootTopic
}

target := logMetricDisc{
disc: d,
advt: d,
metrics: bootMetrics{config.Env},
metrics: config.Boot.metrics(),
}

return &boot.Namespace{
Expand Down Expand Up @@ -159,18 +179,16 @@ func (b logMetricDisc) Advertise(ctx context.Context, ns string, opt ...discover
}

type bootMetrics struct {
env interface {
Log() log.Logger
Metrics() metrics.Client
}
Log log.Logger
Metrics metrics.Client
}

func (m bootMetrics) OnFindPeers(ns string) {
m.env.Log().Debug("bootstrapping namespace")
m.env.Metrics().Incr(fmt.Sprintf("boot.%s.find_peers", ns))
m.Log.Debug("bootstrapping namespace")
m.Metrics.Incr(fmt.Sprintf("boot.%s.find_peers", ns))
}

func (m bootMetrics) OnAdvertise(ns string) {
m.env.Log().Debug("advertising namespace")
m.env.Metrics().Decr(fmt.Sprintf("boot.%s.find_peers", ns))
m.Log.Debug("advertising namespace")
m.Metrics.Decr(fmt.Sprintf("boot.%s.find_peers", ns))
}
35 changes: 35 additions & 0 deletions pkg/runtime/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Package runtime provides a high-level API for constructing Wetware
clients and servers using dependency injection.
The runtime package is intended to be used with go.uber.org/fx. As
such, it provides constructors for the fx.Option type, which can be
consumed by fx.New to create a client or server node. Refer to the
Fx documentation for information about how to consume fx.Options.
The runtime package exports three basic types:
1. runtime.Env is a configuration struct containing top-level types
required by the runtime. These types are effectful, interacting
with the host environment: loggers, metrics, contexts, configs,
etc. The Options() method exports these types to Fx.
2. runtime.Config contains options for the libp2p, CASM and Wetware
constructors required to build a node. These are passed to their
constructors lazily, via the fx.Options returned by the Client()
and Server() methods. The Options returned by either Client() or
Server() MUST be passed to fx.New() along with those returned by
runtime.Env.Options().
3. runtime.Option allows callers to pass options to libp2p, CASM or
Wetware. These are initially staged in runtime.Config, whereupon
they are passed into the fx.Options produced by the Client() and
Server() methods.
The runtime API also exports two high-level constructors for building
client and server nodes: NewClient() and NewServer(). Callers SHOULD
prefer these over manual invocation of Env.Options(), Config.Client()
and Config().Server(), as they apply sensible defaults and are easier
to use overall. The lower-level API is provided for advanced users.
*/
package runtime
Loading

0 comments on commit 529bdc9

Please sign in to comment.