diff --git a/cmd/internal/run/run.go b/cmd/internal/run/run.go index dad7f56..3d1ce9c 100644 --- a/cmd/internal/run/run.go +++ b/cmd/internal/run/run.go @@ -9,7 +9,6 @@ import ( "github.com/ipfs/kubo/client/rpc" iface "github.com/ipfs/kubo/core/coreiface" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/host" ma "github.com/multiformats/go-multiaddr" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" @@ -77,18 +76,19 @@ func run() cli.ActionFunc { defer wasi.Close(c.Context) return ww.Env{ + IPFS: ipfs, + Host: h, Cmd: system.Cmd{ - Path: c.Args().First(), - Args: c.Args().Tail(), + Args: c.Args().Slice(), Env: c.StringSlice("env"), Stdin: stdin(c), Stdout: c.App.Writer, Stderr: c.App.ErrWriter}, Net: system.Net{ - Proto: ww.Proto, + Proto: system.Proto.Unwrap(), Host: h, }, - FS: system.FS{ + FS: system.Anchor{ Ctx: c.Context, Host: h, IPFS: ipfs, @@ -97,10 +97,6 @@ func run() cli.ActionFunc { } } -type procServer struct { - Host host.Host -} - func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) { var a ma.Multiaddr if s := c.String("ipfs"); s == "local" { diff --git a/cmd/internal/serve/serve.go b/cmd/internal/serve/serve.go index a05e967..389e119 100644 --- a/cmd/internal/serve/serve.go +++ b/cmd/internal/serve/serve.go @@ -4,22 +4,20 @@ import ( "bytes" "context" "io" - "io/fs" "log/slog" "net/http" + "os" "github.com/ipfs/kubo/client/rpc" iface "github.com/ipfs/kubo/core/coreiface" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/event" ma "github.com/multiformats/go-multiaddr" "github.com/tetratelabs/wazero" - "github.com/thejerf/suture/v4" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" "github.com/urfave/cli/v2" ww "github.com/wetware/go" "github.com/wetware/go/proc" "github.com/wetware/go/system" - "github.com/wetware/go/util" ) func Command() *cli.Command { @@ -42,108 +40,70 @@ func Command() *cli.Command { Usage: "enable wasm debug symbols", }, }, - Action: func(c *cli.Context) error { - ipfs, err := newIPFSClient(c) - if err != nil { - return err - } - - h, err := libp2p.New() - if err != nil { - return err - } - defer h.Close() - - s := suture.New("ww", suture.Spec{ - EventHook: util.EventHook, - }) - - // Start a multicast DNS service that searches for local - // peers in the background. - s.Add(ww.MDNS{ - Host: h, - Handler: ww.StorePeer{Peerstore: h.Peerstore()}, - }) - - // Global compilation cache - cache := wazero.NewCompilationCache() - defer cache.Close(c.Context) - - s.Add(ww.Server{ - IPFS: ipfs, + Action: serve(), + } +} + +func serve() cli.ActionFunc { + return func(c *cli.Context) error { + ipfs, err := newIPFSClient(c) + if err != nil { + return err + } + + h, err := libp2p.New() + if err != nil { + return err + } + defer h.Close() + + // Start a multicast DNS service that searches for local + // peers in the background. + d, err := ww.MDNS{ + Host: h, + Handler: ww.StorePeer{Peerstore: h.Peerstore()}, + }.New(c.Context) + if err != nil { + return err + } + defer d.Close() + + r := wazero.NewRuntimeWithConfig(c.Context, wazero.NewRuntimeConfig(). + WithDebugInfoEnabled(c.Bool("debug")). + WithCloseOnContextDone(true)) + + wasi, err := wasi_snapshot_preview1.Instantiate(c.Context, r) + if err != nil { + return err + } + defer wasi.Close(c.Context) + + return ww.Env{ + IPFS: ipfs, + Host: h, + Cmd: system.Cmd{ + Args: c.Args().Slice(), + Env: c.StringSlice("env"), + Stdin: stdin(c), + Stdout: c.App.Writer, + Stderr: c.App.ErrWriter}, + Net: system.Net{ + Proto: system.Proto.Unwrap(), + Host: h, + Handler: system.HandlerFunc(func(ctx context.Context, p *proc.P) error { + slog.InfoContext(ctx, "process started", + "pid", p.String()) + + <-ctx.Done() + return ctx.Err() + }), + }, + FS: system.Anchor{ + Ctx: c.Context, Host: h, - Env: ww.Env{ - Cmd: system.Cmd{ - Path: c.Args().First(), - Args: c.Args().Tail(), - Env: c.StringSlice("env"), - Stdin: stdin(c), - Stdout: c.App.Writer, - Stderr: c.App.ErrWriter, - }, - Net: system.Net{ - Proto: ww.Proto, - Host: h, - Handler: system.HandlerFunc(func(ctx context.Context, p *proc.P) error { - slog.InfoContext(ctx, "process started", - "peer", h.ID(), - "proc", p.String()) - defer slog.WarnContext(ctx, "process stopped", - "peer", h.ID(), - "proc", p.String()) - <-ctx.Done() - return ctx.Err() - }), - }, - FS: system.FS{ - Ctx: c.Context, - Host: h, - IPFS: ipfs, - }, - }, - RuntimeConfig: wazero.NewRuntimeConfig(). - WithCompilationCache(cache). - WithDebugInfoEnabled(c.Bool("debug")). - WithCloseOnContextDone(true), - }) - - sub, err := h.EventBus().Subscribe([]any{ - new(event.EvtLocalAddressesUpdated)}) - if err != nil { - return err - } - defer sub.Close() - - done := s.ServeBackground(c.Context) - for { - var v any - select { - case err := <-done: - return err // exit - case v = <-sub.Out(): - // event received - } - - // Synchronous event handler - switch ev := v.(type) { - case *event.EvtLocalAddressesUpdated: - // TODO(easy): emit to libp2p topic - slog.InfoContext(c.Context, "local addresses updated", - "peer", h.ID(), - "current", ev.Current, - "removed", ev.Removed, - "diffs", ev.Diffs) - - case *event.EvtLocalProtocolsUpdated: - // TODO(easy): emit to libp2p topic - slog.InfoContext(c.Context, "local protocols updated", - "peer", h.ID(), - "added", ev.Added, - "removed", ev.Removed) - } - - } - }, + IPFS: ipfs, + }, + }.Bind(c.Context, r) } } @@ -160,29 +120,19 @@ func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) { func stdin(c *cli.Context) io.Reader { switch r := c.App.Reader.(type) { - case interface{ Len() int }: - if r.Len() <= 0 { - break - } - - return &io.LimitedReader{ - R: c.App.Reader, - N: min(int64(r.Len()), 1<<32-1), // max u32 - } - - case interface{ Stat() (fs.FileInfo, error) }: + case *os.File: info, err := r.Stat() if err != nil { - slog.Error("failed to get file info for stdin", - "reason", err) - break - } else if info.Size() <= 0 { + panic(err) + } + + if info.Size() <= 0 { break } return &io.LimitedReader{ R: c.App.Reader, - N: min(info.Size(), 1<<32-1), // max u32 + N: 1<<32 - 1, // max u32 } } diff --git a/examples/deliver/main.go b/examples/deliver/main.go index db28062..27bb725 100644 --- a/examples/deliver/main.go +++ b/examples/deliver/main.go @@ -10,7 +10,8 @@ import ( ) func main() { - if nargs := len(os.Args); nargs < 1 { + args := os.Args[1:] + if nargs := len(args); nargs < 1 { slog.Error("wrong number of arguments", "want", 1, "got", nargs, @@ -18,11 +19,11 @@ func main() { os.Exit(1) } - f, err := os.Open(os.Args[0]) + f, err := os.Open(args[0]) if err != nil { slog.Error("failed to open file", "reason", err, - "name", os.Args[0]) + "name", args[0]) os.Exit(1) } defer f.Close() diff --git a/examples/deliver/main.wasm b/examples/deliver/main.wasm index 34f0ab3..0e79bf5 100644 Binary files a/examples/deliver/main.wasm and b/examples/deliver/main.wasm differ diff --git a/examples/echo/main.go b/examples/echo/main.go index 8a5d662..c713bdd 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -10,13 +10,25 @@ import ( "os" ) +//export echo +func echo() { + if err := _echo(os.Stdout, os.Stdin); err != nil { + panic(err) + } +} + +func _echo(dst io.Writer, src io.Reader) error { + _, err := io.Copy(dst, src) + return err +} + func main() { stdin := flag.Bool("stdin", false, "read data from stdin") flag.Parse() if *stdin { - if _, err := io.Copy(os.Stdout, os.Stdin); err != nil { - slog.Error("failed echo stdin", + if err := _echo(os.Stdout, os.Stdin); err != nil { + slog.Error("failed to echo stdin", "reason", err) os.Exit(1) } @@ -34,10 +46,3 @@ func main() { } } } - -//export echo -func echo() { - if _, err := io.Copy(os.Stdout, os.Stdin); err != nil { - panic(err) - } -} diff --git a/examples/echo/main.wasm b/examples/echo/main.wasm index bf55504..2dab042 100644 Binary files a/examples/echo/main.wasm and b/examples/echo/main.wasm differ diff --git a/go.mod b/go.mod index 1cc9e9a..e227c2d 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( github.com/tetratelabs/wazero v1.8.1 github.com/thejerf/suture/v4 v4.0.5 github.com/urfave/cli/v2 v2.27.3 - golang.org/x/sync v0.8.0 ) require ( @@ -166,6 +165,7 @@ require ( golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.19.0 // indirect golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect golang.org/x/tools v0.23.0 // indirect diff --git a/proc/event.go b/proc/event.go new file mode 100644 index 0000000..3600b24 --- /dev/null +++ b/proc/event.go @@ -0,0 +1,3 @@ +package proc + +type EvtStarted struct{ *P } diff --git a/proc/handler.go b/proc/handler.go deleted file mode 100644 index c4f888a..0000000 --- a/proc/handler.go +++ /dev/null @@ -1,134 +0,0 @@ -//go:generate capnp compile -I $GOPATH/src/capnproto.org/go/capnp/std -ogo proc.capnp - -package proc - -import ( - "bytes" - "context" - "errors" - "io" - "log/slog" - "path" - "strings" - "time" - - "capnproto.org/go/capnp/v3" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/protocol" - protoutils "github.com/wetware/go/util/proto" - "golang.org/x/sync/semaphore" -) - -const DefaultMessageReadTimeout = time.Second * 30 -const DefaultMaxMessageSize = 1024 << 10 // 1MB. Arbitrary. Can grow. - -type StreamHandler struct { - protoutils.VersionedID - Proc *P - MessageReadTimeout time.Duration - MaxMessageSize uint32 -} - -func (h StreamHandler) String() string { - parent := h.VersionedID.String() - child := h.Proc.String() - return path.Join(parent, child) -} - -func (h StreamHandler) Proto() protocol.ID { - pid := protocol.ID(h.Proc.Mod.Name()) - return protoutils.Join(h.Unwrap(), "proc", pid) -} - -func (h StreamHandler) Match(id protocol.ID) bool { - prefix := string(h.Proto()) - return strings.HasPrefix(string(id), prefix) -} - -func (h StreamHandler) Bind(ctx context.Context) network.StreamHandler { - if h.MessageReadTimeout <= 0 { - h.MessageReadTimeout = DefaultMessageReadTimeout - } - - if h.MaxMessageSize == 0 { - h.MaxMessageSize = DefaultMaxMessageSize - } - - var ( - // Use a weighted semaphor as a mutex because it allows asynchronous - // acquires with ctx. FIFO message-processing is a nice side-benefit. - mu = semaphore.NewWeighted(1) - - rd = io.LimitedReader{N: int64(h.MaxMessageSize)} - buf bytes.Buffer - ) - return func(s network.Stream) { - defer s.Close() - defer buf.Reset() - - ctx, cancel := context.WithTimeout(ctx, h.MessageReadTimeout) - defer cancel() - - if err := mu.Acquire(ctx, 1); err != nil { - slog.DebugContext(ctx, "closing stream", - "reason", err, - "stream", s.ID()) - return - } - defer mu.Release(1) - - d := time.Now().Add(h.MessageReadTimeout) - if err := s.SetReadDeadline(d); err != nil { - slog.ErrorContext(ctx, "failed to set delivery deadline", - "reason", err, - "stream", s.ID()) - return - } - - // Read the message into a local buffer - n, err := io.Copy(&buf, &rd) - if err != nil { - slog.ErrorContext(ctx, "failed to read message", - "reason", err, - "stream", s.ID(), - "n_bytes", n) - return - } else if n > (1<<32 - 1) { // max uint32 - slog.ErrorContext(ctx, "failed to read message", - "reason", errors.New("size overflows u32"), - "stream", s.ID(), - "n_bytes", n) - return - } - - // Unmarshal the method call from the buffer - m, err := capnp.Unmarshal(buf.Bytes()) - if err != nil { - slog.ErrorContext(ctx, "failed to unmarshal capnp message", - "reason", err, - "stream", s.ID()) - return - } - defer m.Release() - - call, err := ReadRootMethodCall(m) - if err != nil { - slog.ErrorContext(ctx, "failed to read root method call", - "reason", err, - "stream", s.ID()) - return - } - - // copy the stream to the process' mailbox - err = h.Proc.Deliver(ctx, call) - if errors.Is(err, context.DeadlineExceeded) { - slog.DebugContext(ctx, "closing stream", - "reason", err, - "stream", s.ID()) - } else if err != nil { - slog.ErrorContext(ctx, "message delivery failed", - "reason", err, - "stream", s.ID()) - } - } -} diff --git a/proc/proc.go b/proc/proc.go index 58d75b7..6bb0406 100644 --- a/proc/proc.go +++ b/proc/proc.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "errors" "io" + "io/fs" "log/slog" "runtime" "strings" @@ -17,9 +18,9 @@ import ( ) type Command struct { - Path string Args, Env []string Stdout, Stderr io.Writer + FS fs.FS } func (cmd Command) Instantiate(ctx context.Context, r wazero.Runtime, cm wazero.CompiledModule) (*P, error) { @@ -35,6 +36,7 @@ func (cmd Command) Instantiate(ctx context.Context, r wazero.Runtime, cm wazero. WithStdout(cmd.Stdout). WithStderr(cmd.Stderr). WithEnv("WW_PID", pid). + WithFS(cmd.FS). WithRandSource(rand.Reader). WithOsyield(runtime.Gosched). WithSysNanosleep(). diff --git a/server.go b/server.go index 436c7bb..a575c92 100644 --- a/server.go +++ b/server.go @@ -4,22 +4,19 @@ import ( "context" "path" - iface "github.com/ipfs/kubo/core/coreiface" - "github.com/libp2p/go-libp2p/core/host" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "github.com/wetware/go/system" ) type Server struct { - IPFS iface.CoreAPI - Host host.Host Env Env RuntimeConfig wazero.RuntimeConfig } func (s Server) String() string { - peer := string(s.Host.ID()) - return path.Join("/p2p", peer, Proto.String()) + peer := string(s.Env.Host.ID()) + return path.Join("/p2p", peer, system.Proto.String()) } func (s Server) Serve(ctx context.Context) error { diff --git a/system/anchor.go b/system/anchor.go new file mode 100644 index 0000000..ca03d36 --- /dev/null +++ b/system/anchor.go @@ -0,0 +1,85 @@ +package system + +import ( + "context" + "io/fs" + "strings" + "time" + + iface "github.com/ipfs/kubo/core/coreiface" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" +) + +var _ fs.FS = (*Anchor)(nil) + +type Anchor struct { + Ctx context.Context + IPFS iface.CoreAPI + Host host.Host +} + +func (root Anchor) Open(name string) (f fs.File, err error) { + switch { + case strings.HasPrefix(name, "/p2p/"): + return HostNode{ + Ctx: root.Ctx, + Host: root.Host, + }.Open(name) + + case strings.HasPrefix(name, "/ipfs/"): + return UnixFS{ + Ctx: root.Ctx, + Unix: root.IPFS.Unixfs(), + }.Open(name) + + default: + return nil, fs.ErrNotExist + } +} + +var _ fs.File = (*StreamNode)(nil) + +type StreamNode struct { + network.Stream +} + +// fs.File methods +//// + +func (s StreamNode) Stat() (fs.FileInfo, error) { + return s, nil +} + +// fs.FileInfo methods +//// + +// base name of the file +func (s StreamNode) Name() string { + return s.Stream.ID() +} + +// length in bytes for regular files; system-dependent for others +func (s StreamNode) Size() int64 { + return 0 +} + +// file mode bits +func (s StreamNode) Mode() fs.FileMode { + return 0 +} + +// modification time +func (s StreamNode) ModTime() time.Time { + return time.Time{} +} + +// abbreviation for Mode().IsDir() +func (s StreamNode) IsDir() bool { + return false +} + +// underlying data source (can return nil) +func (s StreamNode) Sys() any { + return s.Stream +} diff --git a/system/cmd.go b/system/cmd.go index b04fc36..f136df9 100644 --- a/system/cmd.go +++ b/system/cmd.go @@ -3,7 +3,6 @@ package system import "io" type Cmd struct { - Path string Stdin io.Reader Stdout, Stderr io.Writer Args, Env []string diff --git a/system/fs.go b/system/fs.go deleted file mode 100644 index eb78826..0000000 --- a/system/fs.go +++ /dev/null @@ -1,62 +0,0 @@ -package system - -import ( - "context" - "io/fs" - "strings" - - "github.com/ipfs/boxo/files" - "github.com/ipfs/boxo/path" - iface "github.com/ipfs/kubo/core/coreiface" - "github.com/libp2p/go-libp2p/core/host" - "github.com/pkg/errors" -) - -var _ fs.FS = (*FS)(nil) - -type FS struct { - Ctx context.Context - Host host.Host - IPFS iface.CoreAPI -} - -func (sys FS) Open(name string) (fs.File, error) { - switch { - case strings.HasPrefix(name, "/p2p/"): - return nil, &fs.PathError{ - Op: "open", - Path: name, - Err: errors.New("TODO"), // TODO(soon) - } - - case strings.HasPrefix(name, "/ipfs/"): - p, err := path.NewPath(name) - if err != nil { - return nil, err - } - - n, err := sys.OpenUnix(sys.Ctx, p) - if err != nil { - return nil, &fs.PathError{ - Op: "open", - Path: name, - Err: err, - } - } - - return &UnixNode{ - Path: p, - Node: n, - }, nil - } - - return nil, &fs.PathError{ - Op: "open", - Path: name, - Err: fs.ErrNotExist, - } -} - -func (sys FS) OpenUnix(ctx context.Context, p path.Path) (files.Node, error) { - return sys.IPFS.Unixfs().Get(ctx, p) -} diff --git a/system/net.go b/system/net.go index 93eb854..25bfbac 100644 --- a/system/net.go +++ b/system/net.go @@ -2,57 +2,111 @@ package system import ( "context" + "io" + "io/fs" "log/slog" + "strings" + "capnproto.org/go/capnp/v3" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/wetware/go/proc" - protoutils "github.com/wetware/go/util/proto" ) +type Handler interface { + ServeProc(context.Context, *proc.P) error +} + type Net struct { - Proto protoutils.VersionedID + Proto protocol.ID Host host.Host Handler } -func (n Net) Bind(ctx context.Context, p *proc.P) ReleaseFunc { - handler := proc.StreamHandler{ - VersionedID: n.Proto, - Proc: p} - proto := handler.Proto() - pid := handler.String() - peer := n.Host.ID() - - n.Host.SetStreamHandlerMatch( - proto, - handler.Match, - handler.Bind(ctx)) - slog.DebugContext(ctx, "attached process stream handlers", - "peer", peer, - "proto", proto, - "proc", pid) - return func() { - n.Host.RemoveStreamHandler(proto) - slog.DebugContext(ctx, "detached process stream handlers", - "peer", peer, - "proto", proto, - "proc", pid) +func (n Net) Match(id protocol.ID) bool { + proto := strings.TrimPrefix(string(id), string(n.Proto)) + return strings.HasPrefix(proto, "/proc/") +} + +func (n Net) Bind(ctx context.Context, p *proc.P) network.StreamHandler { + log := slog.Default().With( + "peer", n.Host.ID(), + "pid", p.String()) + + return func(s network.Stream) { + defer s.Close() + + // TODO: handle context deadline + + if call, err := ReadCall(s); err != nil { + log.ErrorContext(ctx, "failed to read method call", + "reason", err) + } else if err := p.Deliver(ctx, call); err != nil { + log.ErrorContext(ctx, "failed to deliver method call", + "reason", err) + } } } -func (n Net) ServeProc(ctx context.Context, p *proc.P) (err error) { - if n.Handler != nil { - err = n.Handler.ServeProc(ctx, p) +func (n Net) ServeProc(ctx context.Context, p *proc.P) error { + if n.Handler == nil { + return nil } - return + + return n.Handler.ServeProc(ctx, p) } -type Handler interface { - ServeProc(ctx context.Context, p *proc.P) error +func ReadCall(r io.Reader) (proc.MethodCall, error) { + b, err := io.ReadAll(r) + if err != nil { + return proc.MethodCall{}, err + } + + m, err := capnp.Unmarshal(b) + if err != nil { + return proc.MethodCall{}, err + } + + return proc.ReadRootMethodCall(m) +} + +// HostNode allows +type HostNode struct { + Ctx context.Context + Host host.Host +} + +func (h HostNode) Open(name string) (fs.File, error) { + path, err := NewPath(name) + if err != nil { + return nil, err + } + + return h.Walk(h.Ctx, path) +} + +func (h HostNode) Walk(ctx context.Context, p Path) (fs.File, error) { + /* + Example path: /ww/0.1.0/proc/ + */ + + id, err := p.Peer() + if err != nil { + return nil, err + } + + proto, err := p.Proto() + if err != nil { + return nil, err + } + + s, err := h.Host.NewStream(ctx, id, proto) + return StreamNode{Stream: s}, err } type HandlerFunc func(context.Context, *proc.P) error -func (handle HandlerFunc) ServeProc(ctx context.Context, p *proc.P) error { - return handle(ctx, p) +func (serve HandlerFunc) ServeProc(ctx context.Context, p *proc.P) error { + return serve(ctx, p) } diff --git a/system/path.go b/system/path.go new file mode 100644 index 0000000..a5412fd --- /dev/null +++ b/system/path.go @@ -0,0 +1,120 @@ +package system + +import ( + "fmt" + + "github.com/blang/semver/v4" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/mr-tron/base58/base58" + ma "github.com/multiformats/go-multiaddr" +) + +const ( + // TODO: go back and pick good values for these + P_WW = 9001 + iota + P_PID +) + +var ( + protoWW = ma.Protocol{ + Name: "ww", + Code: P_WW, + VCode: ma.CodeToVarint(P_WW), + Size: ma.LengthPrefixedVarSize, // ww/ + Transcoder: WwTranscoder{}, + } + protoPID = ma.Protocol{ + Name: "pid", + Code: P_PID, + VCode: ma.CodeToVarint(P_PID), + Size: 20, // 160bit PID + Transcoder: PidTranscoder{}, + } +) + +type WwTranscoder struct{} + +// Validates and encodes to bytes a multiaddr that's in the string representation. +func (t WwTranscoder) StringToBytes(s string) ([]byte, error) { + return []byte(s), nil +} + +// Validates and decodes to a string a multiaddr that's in the bytes representation. +func (t WwTranscoder) BytesToString(b []byte) (string, error) { + return string(b), nil +} + +// Validates bytes when parsing a multiaddr that's already in the bytes representation. +func (t WwTranscoder) ValidateBytes(b []byte) error { + return nil +} + +type PidTranscoder struct{} + +// Validates and encodes to bytes a multiaddr that's in the string representation. +func (t PidTranscoder) StringToBytes(s string) ([]byte, error) { + return base58.FastBase58Decoding(s) +} + +// Validates and decodes to a string a multiaddr that's in the bytes representation. +func (t PidTranscoder) BytesToString(b []byte) (string, error) { + return base58.FastBase58Encoding(b), nil +} + +// Validates bytes when parsing a multiaddr that's already in the bytes representation. +func (t PidTranscoder) ValidateBytes(b []byte) error { + if size := len(b); size != 20 { + return fmt.Errorf("expected 20byte PID, got %d", size) + } + + return nil +} + +func init() { + for _, p := range []ma.Protocol{ + protoWW, + protoPID, + } { + if err := ma.AddProtocol(p); err != nil { + panic(err) + } + } +} + +type Path struct { + ma.Multiaddr +} + +func NewPath(name string) (p Path, err error) { + p.Multiaddr, err = ma.NewMultiaddr(name) + return +} + +func (p Path) Version() (semver.Version, error) { + s, err := p.ValueForProtocol(P_WW) + if err != nil { + return semver.Version{}, err + } + + return semver.Parse(s) +} + +func (p Path) Peer() (peer.ID, error) { + id, err := p.ValueForProtocol(ma.P_P2P) + if err != nil { + return "", err + } + + return peer.Decode(id) +} + +func (p Path) Proto() (protocol.ID, error) { + s, err := p.ValueForProtocol(P_PID) + if err != nil { + return "", err + } + + proto := "/proc/" + s + return protocol.ID(proto), nil +} diff --git a/system/system.go b/system/system.go index b5a3171..a5f46e5 100644 --- a/system/system.go +++ b/system/system.go @@ -1,6 +1,16 @@ package system -type ReleaseFunc func() +import ( + "github.com/blang/semver/v4" + protoutils "github.com/wetware/go/util/proto" +) + +const Version = "0.1.0" + +var Proto = protoutils.VersionedID{ + ID: "ww", + Version: semver.MustParse(Version), +} type ExitError interface { error diff --git a/system/ipfs.go b/system/unix.go similarity index 95% rename from system/ipfs.go rename to system/unix.go index 3e12f9b..e50c0b3 100644 --- a/system/ipfs.go +++ b/system/unix.go @@ -15,17 +15,17 @@ import ( "github.com/wetware/go/util" ) -var _ fs.FS = (*IPFS)(nil) +var _ fs.FS = (*UnixFS)(nil) -// An IPFS provides access to a hierarchical file system. +// An UnixFS provides access to a hierarchical file system. // -// The IPFS interface is the minimum implementation required of the file system. +// The UnixFS interface is the minimum implementation required of the file system. // A file system may implement additional interfaces, // such as [ReadFileFS], to provide additional or optimized functionality. // -// [testing/fstest.TestFS] may be used to test implementations of an IPFS for +// [testing/fstest.TestFS] may be used to test implementations of an UnixFS for // correctness. -type IPFS struct { +type UnixFS struct { Ctx context.Context Unix iface.UnixfsAPI } @@ -39,7 +39,7 @@ type IPFS struct { // Open should reject attempts to open names that do not satisfy // fs.ValidPath(name), returning a *fs.PathError with Err set to // fs.ErrInvalid or fs.ErrNotExist. -func (f IPFS) Open(name string) (fs.File, error) { +func (f UnixFS) Open(name string) (fs.File, error) { path, node, err := util.Resolve(f.Ctx, f.Unix, name) if err != nil { return nil, &fs.PathError{ diff --git a/system/ipfs_test.go b/system/unix_test.go similarity index 100% rename from system/ipfs_test.go rename to system/unix_test.go diff --git a/ww.go b/ww.go index 930e5cf..594c558 100644 --- a/ww.go +++ b/ww.go @@ -3,35 +3,32 @@ package ww import ( "context" "errors" + "fmt" "io" + "log/slog" "path/filepath" "capnproto.org/go/capnp/v3" - "github.com/blang/semver/v4" "github.com/ipfs/boxo/files" "github.com/ipfs/boxo/path" + iface "github.com/ipfs/kubo/core/coreiface" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/tetratelabs/wazero" "github.com/wetware/go/proc" "github.com/wetware/go/system" - - protoutils "github.com/wetware/go/util/proto" ) -const Version = "0.1.0" - -var Proto = protoutils.VersionedID{ - ID: "ww", - Version: semver.MustParse(Version), -} - type Env struct { - Cmd system.Cmd - Net system.Net - FS system.FS + IPFS iface.CoreAPI + Host host.Host + Cmd system.Cmd + Net system.Net + FS system.Anchor } func (env Env) Bind(ctx context.Context, r wazero.Runtime) error { - cm, err := env.LoadAndCompile(ctx, r, env.Cmd.Path) // FIXME: panic if len(args)=0 + cm, err := env.LoadAndCompile(ctx, r, env.Cmd.Args[0]) // FIXME: panic if len(args)=0 if err != nil { return err } @@ -43,8 +40,13 @@ func (env Env) Bind(ctx context.Context, r wazero.Runtime) error { } defer p.Close(ctx) - release := env.Net.Bind(ctx, p) - defer release() + // Bind libp2p streams that allow remote peers to send + // messages to p. + env.Host.SetStreamHandlerMatch(env.ProtoFor(p), + env.Net.Match, + env.Net.Bind(ctx, p)) + defer env.Host.RemoveStreamHandler(env.ProtoFor(p)) + slog.DebugContext(ctx, "attached process stream handlers") // Call main() function (alias _start method) m, seg, err := capnp.NewMessage(capnp.SingleSegment(nil)) @@ -67,9 +69,14 @@ func (env Env) Bind(ctx context.Context, r wazero.Runtime) error { } err = p.Deliver(ctx, call) - if e, ok := err.(system.ExitError); ok && e.ExitCode() != 0 { - return err - } else if err != nil { + switch e := err.(type) { + case system.ExitError: + if e.ExitCode() == 0 { + err = nil + } + } + + if err != nil { return err } @@ -78,11 +85,11 @@ func (env Env) Bind(ctx context.Context, r wazero.Runtime) error { func (env Env) Instantiate(ctx context.Context, r wazero.Runtime, cm wazero.CompiledModule) (*proc.P, error) { return proc.Command{ - Path: env.Cmd.Path, Args: env.Cmd.Args, Env: env.Cmd.Env, Stdout: env.Cmd.Stdout, Stderr: env.Cmd.Stderr, + FS: env.FS, }.Instantiate(ctx, r, cm) } @@ -92,7 +99,7 @@ func (env Env) LoadAndCompile(ctx context.Context, r wazero.Runtime, name string return nil, err } - n, err := env.FS.OpenUnix(ctx, p) + n, err := env.OpenUnix(ctx, p) if err != nil { return nil, err } @@ -118,3 +125,13 @@ func (env Env) LoadAndCompile(ctx context.Context, r wazero.Runtime, name string return nil, errors.New("binary not found") } + +func (env Env) OpenUnix(ctx context.Context, p path.Path) (files.Node, error) { + root := env.IPFS.Unixfs() + return root.Get(ctx, p) +} + +func (env Env) ProtoFor(pid fmt.Stringer) protocol.ID { + proto := filepath.Join(system.Proto.String(), "proc", pid.String()) + return protocol.ID(proto) +}