Skip to content

Commit

Permalink
Add logging, event loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
lthibault committed Nov 14, 2024
1 parent a34c5ea commit a46d363
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 62 deletions.
148 changes: 100 additions & 48 deletions examples/deliver/main.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,103 @@
//go:generate tinygo build -o main.wasm -target=wasi -scheduler=none main.go
// //go:generate tinygo build -o main.wasm -target=wasi -scheduler=none main.go

package main

import (
"context"
"io"
"log/slog"
"os"

"github.com/urfave/cli/v2"
)

func main() {
ctx := context.TODO()

app := &cli.App{
Name: "deliver",
Usage: "read message from stdn and send to `PID`",
ArgsUsage: "<PID>",
// Flags: []cli.Flag{},
Action: deliver,
}

if err := app.RunContext(ctx, os.Args); err != nil {
slog.ErrorContext(ctx, "application failed",
"reason", err)
os.Exit(1)
}
}

func deliver(c *cli.Context) error {
name := c.Args().First()
f, err := os.Open(name)
if err != nil {
return err
}
defer f.Close()

r := io.LimitReader(c.App.Reader, 1<<32-1) // max unit32

if n, err := io.Copy(f, r); err != nil {
return err
} else {
slog.DebugContext(c.Context, "delivered message",
"size", n,
"dest", name)
}

return nil
}
// import (
// "bytes"
// "context"
// "io"
// "log/slog"
// "os"

// "capnproto.org/go/capnp/v3"
// "github.com/urfave/cli/v2"
// "github.com/wetware/go/proc"
// )

// func main() {
// ctx := context.TODO()

// app := &cli.App{
// Name: "deliver",
// Usage: "read message from stdn and send to `PID`",
// ArgsUsage: "<PID>",
// Flags: []cli.Flag{
// &cli.StringFlag{
// Name: "method",
// Aliases: []string{"m"},
// Usage: "method name",
// },
// &cli.Uint64SliceFlag{
// Name: "push",
// Aliases: []string{"p"},
// Usage: "push u64 onto stack",
// },
// },
// Action: deliver,
// }

// if err := app.RunContext(ctx, os.Args); err != nil {
// slog.ErrorContext(ctx, "application failed",
// "reason", err)
// os.Exit(1)
// }
// }

// func deliver(c *cli.Context) error {
// name := c.Args().First()
// f, err := os.Open(name)
// if err != nil {
// return err
// }
// defer f.Close()

// m, seg := capnp.NewSingleSegmentMessage(nil)
// defer m.Release()

// call, err := proc.NewRootMethodCall(seg)
// if err != nil {
// return err
// }

// err = call.SetName(c.String("method"))
// if err != nil {
// return err
// }

// stack := c.Uint64Slice("stack")
// size := int32(len(stack))

// callStack, err := call.NewStack(size)
// if err != nil {
// return err
// }

// for i, word := range stack {
// callStack.Set(i, word)
// }

// r := io.LimitReader(c.App.Reader, 1<<32-1) // max u32
// data, err := io.ReadAll(r)
// if err != nil {
// return err
// }

// if err = call.SetCallData(data); err != nil {
// return err
// }

// b, err := m.Marshal()
// if err != nil {
// return err
// }

// n, err := io.Copy(f, bytes.NewReader(b))
// if err != nil {
// return err
// }

// slog.DebugContext(c.Context, "delivered message",
// "size", n,
// "dest", name)
// return nil
// }
Binary file removed examples/deliver/main.wasm
Binary file not shown.
8 changes: 5 additions & 3 deletions proc/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ type Config struct {
}

func (cfg Config) Bind(ctx context.Context, p *P) (err error) {
pid := NewPID()
proto := path.Join(string(cfg.Proto), pid.String()) // /ww/0.1.0/<pid>
// /ww/0.1.0/<pid>
proto := path.Join(
string(cfg.Proto), // /ww/<version>
NewPID().String()) // <pid>

mc := wazero.NewModuleConfig().
WithName(pid.String()).
WithName(proto).
WithArgs(cfg.Args...).
WithStdin(&p.mailbox).
WithStdout(cfg.Stdout).
Expand Down
2 changes: 1 addition & 1 deletion util/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type VersionedID struct {
func (v VersionedID) String() string {
proto := string(v.ID)
version := v.Version.String()
return path.Join(proto, version)
return path.Join("/", proto, version)
}

func (v VersionedID) Unwrap() protocol.ID {
Expand Down
71 changes: 61 additions & 10 deletions ww.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
"io"
"io/fs"
"log/slog"
"time"

"capnproto.org/go/capnp/v3"
"github.com/blang/semver/v4"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
Expand Down Expand Up @@ -78,23 +81,64 @@ func (env Env) CompileAndServe(ctx context.Context, bytecode []byte) error {
}
defer p.Close(ctx)

if err := env.Bootstrap(ctx, &p); err != nil {
return env.ServeProc(ctx, &p)
}

func (env Env) ServeProc(ctx context.Context, p *proc.P) error {
sub, err := env.Host.EventBus().Subscribe([]any{
new(event.EvtLocalAddressesUpdated),
new(event.EvtLocalProtocolsUpdated)})
if err != nil {
return err
}
defer sub.Close()

release := Bind(ctx, env.Host, &p)
// if err := env.Bootstrap(ctx, &p); err != nil {
// return err
// }

// TODO: client apps shouldn't listen for streams
release := env.Bind(ctx, p)
defer release()

<-ctx.Done()
return ctx.Err()
slog.InfoContext(ctx, "event loop started",
"peer", env.Host.ID(),
"proc", p.String())
defer slog.WarnContext(ctx, "event loop halted",
"peer", env.Host.ID(),
"proc", p.String())

for {
var v any
select {
case <-ctx.Done():
return ctx.Err()
case v = <-sub.Out(): // assign event to v
}

switch ev := v.(type) {
case *event.EvtLocalAddressesUpdated:
slog.InfoContext(ctx, "local addresses updated",
"peer", env.Host.ID(),
"current", ev.Current,
"removed", ev.Removed,
"diffs", ev.Diffs)

case *event.EvtLocalProtocolsUpdated:
slog.InfoContext(ctx, "local protocols updated",
"peer", env.Host.ID(),
"added", ev.Added,
"removed", ev.Removed)
}
}
}

func (env Env) Bootstrap(ctx context.Context, p *proc.P) error {
b, err := io.ReadAll(&io.LimitedReader{
R: env.Stdin,
N: int64(1<<32 - 1), // max u32
})
if err != nil {
if err != nil || len(b) == 0 {
return err
}

Expand All @@ -112,15 +156,22 @@ func (env Env) Bootstrap(ctx context.Context, p *proc.P) error {
return p.Deliver(ctx, call)
}

type ReleaseFunc func()
func (env Env) Bind(ctx context.Context, p *proc.P) ReleaseFunc {
var timeout time.Duration // default 0
if dl, ok := ctx.Deadline(); ok {
timeout = time.Until(dl)
}

func Bind(ctx context.Context, h host.Host, p *proc.P) ReleaseFunc {
handler := proc.StreamHandler{Proc: p}
handler := proc.StreamHandler{
Proc: p,
MessageReadTimeout: timeout}
proto := handler.Proto()

h.SetStreamHandlerMatch(
env.Host.SetStreamHandlerMatch(
proto,
handler.Match,
handler.Bind(ctx))
return func() { h.RemoveStreamHandler(proto) }
return func() { env.Host.RemoveStreamHandler(proto) }
}

type ReleaseFunc func()

0 comments on commit a46d363

Please sign in to comment.