Skip to content

Commit

Permalink
Add support for detached processes.
Browse files Browse the repository at this point in the history
  • Loading branch information
lthibault committed Nov 15, 2024
1 parent 17eb004 commit 1d8193b
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 240 deletions.
152 changes: 93 additions & 59 deletions cmd/internal/run/run.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package run

import (
"bytes"
"context"
"io"
"log/slog"
"net/http"

"github.com/ipfs/boxo/path"
"github.com/ipfs/kubo/client/rpc"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
ma "github.com/multiformats/go-multiaddr"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
"github.com/urfave/cli/v2"
ww "github.com/wetware/go"
"github.com/wetware/go/proc"
"github.com/wetware/go/system"
)

Expand Down Expand Up @@ -43,12 +47,10 @@ func Command() *cli.Command {
EnvVars: []string{"WW_DEBUG"},
Usage: "enable WASM debug values",
},
&cli.BoolFlag{
Name: "interactive",
Aliases: []string{"i"},
EnvVars: []string{"WW_INTERACTIVE"},
Usage: "bind to process stdio",
},
// &cli.BoolFlag{
// Name: "listen",
// Usage: "serve network calls after main() exits",
// },
},
Action: run(),
}
Expand All @@ -68,83 +70,115 @@ func run() cli.ActionFunc {
defer h.Close()

// Create an mDNS service to discover peers on the local network
d := mdns.NewMdnsService(h, "ww.local", peerStorer{Peerstore: h.Peerstore()})
peerHook := system.StorePeer{Peerstore: h.Peerstore()}
d := mdns.NewMdnsService(h, "ww.local", peerHook)
if err := d.Start(); err != nil {
return err
}
defer d.Close()

// Set up WASM runtime and host modules
r := wazero.NewRuntimeWithConfig(c.Context, wazero.NewRuntimeConfig().
// WithCompilationCache().
WithDebugInfoEnabled(c.Bool("debug")).
WithCloseOnContextDone(true))
defer r.Close(c.Context)

root, err := path.NewPath(c.Args().First())
wasi, err := wasi_snapshot_preview1.Instantiate(c.Context, r)
if err != nil {
return err
}

unixFS := system.IPFS{
Ctx: c.Context,
Unix: ipfs.Unixfs(),
}
defer wasi.Close(c.Context)

return ww.Env{
Args: c.Args().Slice(),
Vars: c.StringSlice("env"),
Stdin: maybeStdin(c),
Stdout: maybeStdout(c),
Stderr: maybeStderr(c),
Host: h,
Runtime: r,
Root: root.String(),
FS: unixFS,
}.Serve(c.Context)
IO: system.IO{
Args: c.Args().Slice(),
Env: c.StringSlice("env"),
Stdin: stdin(c),
Stdout: c.App.Writer,
Stderr: c.App.ErrWriter,
},
Net: system.Net{
Host: h,
Handler: handler(c, h),
},
FS: system.IPFS{
Ctx: c.Context,
Unix: ipfs.Unixfs(),
},
}.Bind(c.Context, r)
}
}

func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) {
var a ma.Multiaddr
if s := c.String("ipfs"); s == "local" {
ipfs, err = rpc.NewLocalApi()
} else if a, err = ma.NewMultiaddr(s); err == nil {
ipfs, err = rpc.NewApiWithClient(a, http.DefaultClient)
func stdin(c *cli.Context) io.Reader {
if c.IsSet("stdin") && c.Bool("stdin") {
return c.App.Reader
}

return
}

type peerStorer struct {
peerstore.Peerstore
}

func (s peerStorer) HandlePeerFound(info peer.AddrInfo) {
for _, addr := range info.Addrs {
s.AddAddr(info.ID, addr, peerstore.AddressTTL) // assume a dynamic environment
}
return emptyReader
}

func maybeStdin(c *cli.Context) io.Reader {
if c.Bool("interactive") {
return c.App.Reader // TODO: wrap with libreadline
}

return nil
}
var emptyReader io.Reader = &bytes.Reader{}

func maybeStdout(c *cli.Context) io.Writer {
if c.Bool("interactive") {
return c.App.Writer
func handler(c *cli.Context, h host.Host) system.HandlerFunc {
return func(ctx context.Context, p *proc.P) error {
sub, err := h.EventBus().Subscribe([]any{
new(event.EvtLocalAddressesUpdated),
new(event.EvtLocalProtocolsUpdated)})
if err != nil {
return err
}
defer sub.Close()

// asynchronous event loop
slog.InfoContext(ctx, "wetware started",
"peer", h.ID(),
"path", c.Args().First(),
"proc", p.String(),
"proto", ww.Proto.String())
defer slog.WarnContext(ctx, "wetware stopped",
"peer", h.ID(),
"path", c.Args().First(),
"proc", p.String(),
"proto", ww.Proto.String())

for {
var v any
select {
case <-ctx.Done():
return ctx.Err()

case v = <-sub.Out():
// current event is assigned to v

switch ev := v.(type) {
case *event.EvtLocalAddressesUpdated:
// TODO(easy): emit to libp2p topic
slog.InfoContext(ctx, "local addresses updated",
"peer", h.ID(),
"current", ev.Current,
"removed", ev.Removed,
"diffs", ev.Diffs)

case *event.EvtLocalProtocolsUpdated:
// TODO(easy): emit to libp2p topic
slog.InfoContext(ctx, "local protocols updated",
"peer", h.ID(),
"added", ev.Added,
"removed", ev.Removed)
}
}
}
}

return io.Discard // TODO: handle stdout
}

func maybeStderr(c *cli.Context) io.Writer {
if c.Bool("interactive") {
return c.App.ErrWriter
func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) {
var a ma.Multiaddr
if s := c.String("ipfs"); s == "local" {
ipfs, err = rpc.NewLocalApi()
} else if a, err = ma.NewMultiaddr(s); err == nil {
ipfs, err = rpc.NewApiWithClient(a, http.DefaultClient)
}

return io.Discard // TODO: handle stderr
return
}
45 changes: 41 additions & 4 deletions cmd/ww/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ func main() {
Copyright: "2020 The Wetware Project",
Before: setup,
// DefaultCommand: "shell",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "json",
EnvVars: []string{"WW_JSON"},
Usage: "output json logs",
},
&cli.StringFlag{
Name: "loglvl",
EnvVars: []string{"WW_LOGLVL"},
Value: "info",
Usage: "logging level: debug, info, warn, error",
},
},
Commands: []*cli.Command{
run.Command(),
export.Command(),
Expand All @@ -39,10 +52,34 @@ func main() {
}

func setup(c *cli.Context) error {
slog.SetDefault(slog.New(tint.NewHandler(c.App.ErrWriter, &tint.Options{
Level: slog.LevelDebug,
slog.SetDefault(slog.New(logger(c)))
return nil
}

func logger(c *cli.Context) slog.Handler {
// For robots?
if c.Bool("json") {
return slog.NewJSONHandler(c.App.ErrWriter, &slog.HandlerOptions{
Level: loglvl(c),
})
}

// For people
return tint.NewHandler(c.App.ErrWriter, &tint.Options{
Level: loglvl(c),
TimeFormat: time.Kitchen,
})))
})
}

return nil
func loglvl(c *cli.Context) slog.Leveler {
switch c.String("loglvl") {
case "debug":
return slog.LevelDebug
case "warn":
return slog.LevelWarn
case "error":
return slog.LevelError
}

return slog.LevelInfo
}
17 changes: 16 additions & 1 deletion examples/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"flag"
"io"
"os"
)
Expand All @@ -14,4 +15,18 @@ func echo() {
}
}

func main() {}
func main() {
stdin := flag.Bool("stdin", false, "read from standard input")
serve := flag.Bool("serve", false, "handle async method calls")
flag.Parse()

if *stdin {
echo()
}

if *serve {
// Signal to caller that this module is ready to handle
// incoming method calls.
os.Exit(0x00ff0000)
}
}
Binary file modified examples/echo/main.wasm
Binary file not shown.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/ipfs/kubo v0.31.0
github.com/libp2p/go-libp2p v0.36.5
github.com/lmittmann/tint v1.0.4
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.13.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -100,7 +101,6 @@ require (
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions proc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func (h StreamHandler) String() string {
}

func (h StreamHandler) Proto() protocol.ID {
name := h.Proc.Mod.Name() // process ID
return h.VersionedID.WithChild(name).Unwrap() // append
pid := protocol.ID(h.Proc.Mod.Name())
return protoutils.Join(h.Unwrap(), "proc", pid)
}

func (h StreamHandler) Match(id protocol.ID) bool {
Expand Down
19 changes: 17 additions & 2 deletions proc/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package proc

import (
"crypto/rand"
"encoding/hex"
"io"

"github.com/libp2p/go-libp2p/core/protocol"
"github.com/mr-tron/base58/base58"
)

type PID [20]byte // 160bit opaque identifier
Expand All @@ -26,6 +28,19 @@ func ReadPID(r io.Reader) (pid PID, err error) {
return
}

func ParsePID(s string) (pid PID, err error) {
var buf []byte
if buf, err = base58.FastBase58Decoding(s); err == nil {
copy(pid[:], buf)
}
return
}

func (pid PID) String() string {
return hex.EncodeToString(pid[:])
return base58.FastBase58Encoding(pid[:])
}

func (pid PID) Proto() protocol.ID {
proto := pid.String()
return protocol.ID(proto)
}
Loading

0 comments on commit 1d8193b

Please sign in to comment.