diff --git a/examples/deliver/main.go b/examples/deliver/main.go index 2076866..8fd5b67 100644 --- a/examples/deliver/main.go +++ b/examples/deliver/main.go @@ -1,51 +1,103 @@ -//go:generate tinygo build -o main.wasm -target=wasi -scheduler=none main.go +// //go:generate tinygo build -o main.wasm -target=wasi -scheduler=none main.go package main -import ( - "context" - "io" - "log/slog" - "os" - - "github.com/urfave/cli/v2" -) - -func main() { - ctx := context.TODO() - - app := &cli.App{ - Name: "deliver", - Usage: "read message from stdn and send to `PID`", - ArgsUsage: "", - // Flags: []cli.Flag{}, - Action: deliver, - } - - if err := app.RunContext(ctx, os.Args); err != nil { - slog.ErrorContext(ctx, "application failed", - "reason", err) - os.Exit(1) - } -} - -func deliver(c *cli.Context) error { - name := c.Args().First() - f, err := os.Open(name) - if err != nil { - return err - } - defer f.Close() - - r := io.LimitReader(c.App.Reader, 1<<32-1) // max unit32 - - if n, err := io.Copy(f, r); err != nil { - return err - } else { - slog.DebugContext(c.Context, "delivered message", - "size", n, - "dest", name) - } - - return nil -} +// import ( +// "bytes" +// "context" +// "io" +// "log/slog" +// "os" + +// "capnproto.org/go/capnp/v3" +// "github.com/urfave/cli/v2" +// "github.com/wetware/go/proc" +// ) + +// func main() { +// ctx := context.TODO() + +// app := &cli.App{ +// Name: "deliver", +// Usage: "read message from stdn and send to `PID`", +// ArgsUsage: "", +// Flags: []cli.Flag{ +// &cli.StringFlag{ +// Name: "method", +// Aliases: []string{"m"}, +// Usage: "method name", +// }, +// &cli.Uint64SliceFlag{ +// Name: "push", +// Aliases: []string{"p"}, +// Usage: "push u64 onto stack", +// }, +// }, +// Action: deliver, +// } + +// if err := app.RunContext(ctx, os.Args); err != nil { +// slog.ErrorContext(ctx, "application failed", +// "reason", err) +// os.Exit(1) +// } +// } + +// func deliver(c *cli.Context) error { +// name := c.Args().First() +// f, err := os.Open(name) +// if err != nil { +// return err +// } +// defer f.Close() + +// m, seg := capnp.NewSingleSegmentMessage(nil) +// defer m.Release() + +// call, err := proc.NewRootMethodCall(seg) +// if err != nil { +// return err +// } + +// err = call.SetName(c.String("method")) +// if err != nil { +// return err +// } + +// stack := c.Uint64Slice("stack") +// size := int32(len(stack)) + +// callStack, err := call.NewStack(size) +// if err != nil { +// return err +// } + +// for i, word := range stack { +// callStack.Set(i, word) +// } + +// r := io.LimitReader(c.App.Reader, 1<<32-1) // max u32 +// data, err := io.ReadAll(r) +// if err != nil { +// return err +// } + +// if err = call.SetCallData(data); err != nil { +// return err +// } + +// b, err := m.Marshal() +// if err != nil { +// return err +// } + +// n, err := io.Copy(f, bytes.NewReader(b)) +// if err != nil { +// return err +// } + +// slog.DebugContext(c.Context, "delivered message", +// "size", n, +// "dest", name) +// return nil +// } diff --git a/examples/deliver/main.wasm b/examples/deliver/main.wasm deleted file mode 100644 index f6ef214..0000000 Binary files a/examples/deliver/main.wasm and /dev/null differ diff --git a/proc/proc.go b/proc/proc.go index 7d94308..40fb7ce 100644 --- a/proc/proc.go +++ b/proc/proc.go @@ -24,11 +24,13 @@ type Config struct { } func (cfg Config) Bind(ctx context.Context, p *P) (err error) { - pid := NewPID() - proto := path.Join(string(cfg.Proto), pid.String()) // /ww/0.1.0/ + // /ww/0.1.0/ + proto := path.Join( + string(cfg.Proto), // /ww/ + NewPID().String()) // mc := wazero.NewModuleConfig(). - WithName(pid.String()). + WithName(proto). WithArgs(cfg.Args...). WithStdin(&p.mailbox). WithStdout(cfg.Stdout). diff --git a/util/proto/proto.go b/util/proto/proto.go index 7dec1ee..f950af7 100644 --- a/util/proto/proto.go +++ b/util/proto/proto.go @@ -15,7 +15,7 @@ type VersionedID struct { func (v VersionedID) String() string { proto := string(v.ID) version := v.Version.String() - return path.Join(proto, version) + return path.Join("/", proto, version) } func (v VersionedID) Unwrap() protocol.ID { diff --git a/ww.go b/ww.go index f5326ec..4746fa6 100644 --- a/ww.go +++ b/ww.go @@ -4,9 +4,12 @@ import ( "context" "io" "io/fs" + "log/slog" + "time" "capnproto.org/go/capnp/v3" "github.com/blang/semver/v4" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" @@ -78,15 +81,56 @@ func (env Env) CompileAndServe(ctx context.Context, bytecode []byte) error { } defer p.Close(ctx) - if err := env.Bootstrap(ctx, &p); err != nil { + return env.ServeProc(ctx, &p) +} + +func (env Env) ServeProc(ctx context.Context, p *proc.P) error { + sub, err := env.Host.EventBus().Subscribe([]any{ + new(event.EvtLocalAddressesUpdated), + new(event.EvtLocalProtocolsUpdated)}) + if err != nil { return err } + defer sub.Close() - release := Bind(ctx, env.Host, &p) + // if err := env.Bootstrap(ctx, &p); err != nil { + // return err + // } + + // TODO: client apps shouldn't listen for streams + release := env.Bind(ctx, p) defer release() - <-ctx.Done() - return ctx.Err() + slog.InfoContext(ctx, "event loop started", + "peer", env.Host.ID(), + "proc", p.String()) + defer slog.WarnContext(ctx, "event loop halted", + "peer", env.Host.ID(), + "proc", p.String()) + + for { + var v any + select { + case <-ctx.Done(): + return ctx.Err() + case v = <-sub.Out(): // assign event to v + } + + switch ev := v.(type) { + case *event.EvtLocalAddressesUpdated: + slog.InfoContext(ctx, "local addresses updated", + "peer", env.Host.ID(), + "current", ev.Current, + "removed", ev.Removed, + "diffs", ev.Diffs) + + case *event.EvtLocalProtocolsUpdated: + slog.InfoContext(ctx, "local protocols updated", + "peer", env.Host.ID(), + "added", ev.Added, + "removed", ev.Removed) + } + } } func (env Env) Bootstrap(ctx context.Context, p *proc.P) error { @@ -94,7 +138,7 @@ func (env Env) Bootstrap(ctx context.Context, p *proc.P) error { R: env.Stdin, N: int64(1<<32 - 1), // max u32 }) - if err != nil { + if err != nil || len(b) == 0 { return err } @@ -112,15 +156,22 @@ func (env Env) Bootstrap(ctx context.Context, p *proc.P) error { return p.Deliver(ctx, call) } -type ReleaseFunc func() +func (env Env) Bind(ctx context.Context, p *proc.P) ReleaseFunc { + var timeout time.Duration // default 0 + if dl, ok := ctx.Deadline(); ok { + timeout = time.Until(dl) + } -func Bind(ctx context.Context, h host.Host, p *proc.P) ReleaseFunc { - handler := proc.StreamHandler{Proc: p} + handler := proc.StreamHandler{ + Proc: p, + MessageReadTimeout: timeout} proto := handler.Proto() - h.SetStreamHandlerMatch( + env.Host.SetStreamHandlerMatch( proto, handler.Match, handler.Bind(ctx)) - return func() { h.RemoveStreamHandler(proto) } + return func() { env.Host.RemoveStreamHandler(proto) } } + +type ReleaseFunc func()