Skip to content

Commit

Permalink
Add git serve command.
Browse files Browse the repository at this point in the history
  • Loading branch information
lthibault committed Nov 18, 2024
1 parent 4378290 commit 4255881
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 246 deletions.
104 changes: 19 additions & 85 deletions cmd/internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@ package run

import (
"bytes"
"context"
"io"
"log/slog"
"net/http"
"os"

"github.com/ipfs/kubo/client/rpc"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
ma "github.com/multiformats/go-multiaddr"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
"github.com/urfave/cli/v2"
ww "github.com/wetware/go"
"github.com/wetware/go/proc"
"github.com/wetware/go/system"
)

Expand All @@ -33,19 +28,14 @@ func Command() *cli.Command {
Usage: "multi`addr` of IPFS node, or \"local\"",
Value: "local",
},
&cli.StringFlag{
Name: "dial",
EnvVars: []string{"WW_DIAL"},
// Usage: "",
// Value: "", // TODO: default to /ipfs/<CID> pointing to shell
},
&cli.StringSliceFlag{
Name: "env",
EnvVars: []string{"WW_ENV"},
},
&cli.BoolFlag{
Name: "stdin",
Usage: "bind stdin to wasm guest",
Name: "wasm-debug",
EnvVars: []string{"WW_WASM_DEBUG"},
Usage: "enable wasm debug symbols",
},
},
Action: run(),
Expand All @@ -65,20 +55,20 @@ func run() cli.ActionFunc {
}
defer h.Close()

// Create an mDNS service to discover peers on the local network
peerHook := system.StorePeer{Peerstore: h.Peerstore()}
d := mdns.NewMdnsService(h, "ww.local", peerHook)
if err := d.Start(); err != nil {
// Start a multicast DNS service that searches for local
// peers in the background.
d, err := ww.MDNS{
Host: h,
Handler: ww.StorePeer{Peerstore: h.Peerstore()},
}.New(c.Context)
if err != nil {
return err
}
defer d.Close()

// Set up WASM runtime and host modules
r := wazero.NewRuntimeWithConfig(c.Context, wazero.NewRuntimeConfig().
// WithCompilationCache().
WithDebugInfoEnabled(c.Bool("debug")).
WithCloseOnContextDone(true))
defer r.Close(c.Context)

wasi, err := wasi_snapshot_preview1.Instantiate(c.Context, r)
if err != nil {
Expand All @@ -91,71 +81,23 @@ func run() cli.ActionFunc {
Args: c.Args().Slice(),
Env: c.StringSlice("env"),
Stdin: stdin(c),
Stdout: stdout(c),
Stderr: stderr(c),
},
Stdout: c.App.Writer,
Stderr: c.App.ErrWriter},
Net: system.Net{
Host: h,
Handler: handler(c, h),
Proto: ww.Proto,
Host: h,
},
FS: system.IPFS{
FS: system.FS{
Ctx: c.Context,
Unix: ipfs.Unixfs(),
Host: h,
IPFS: ipfs,
},
}.Bind(c.Context, r)
}
}

func handler(c *cli.Context, h host.Host) system.HandlerFunc {
return func(ctx context.Context, p *proc.P) error {
sub, err := h.EventBus().Subscribe([]any{
new(event.EvtLocalAddressesUpdated),
new(event.EvtLocalProtocolsUpdated)})
if err != nil {
return err
}
defer sub.Close()

// asynchronous event loop
slog.InfoContext(ctx, "wetware started",
"peer", h.ID(),
"path", c.Args().First(),
"proc", p.String(),
"proto", ww.Proto.String())
defer slog.WarnContext(ctx, "wetware stopped",
"peer", h.ID(),
"path", c.Args().First(),
"proc", p.String(),
"proto", ww.Proto.String())

for {
var v any
select {
case <-ctx.Done():
return ctx.Err()

case v = <-sub.Out():
// current event is assigned to v

switch ev := v.(type) {
case *event.EvtLocalAddressesUpdated:
// TODO(easy): emit to libp2p topic
slog.InfoContext(ctx, "local addresses updated",
"peer", h.ID(),
"current", ev.Current,
"removed", ev.Removed,
"diffs", ev.Diffs)

case *event.EvtLocalProtocolsUpdated:
// TODO(easy): emit to libp2p topic
slog.InfoContext(ctx, "local protocols updated",
"peer", h.ID(),
"added", ev.Added,
"removed", ev.Removed)
}
}
}
}
type procServer struct {
Host host.Host
}

func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) {
Expand Down Expand Up @@ -189,11 +131,3 @@ func stdin(c *cli.Context) io.Reader {

return &bytes.Reader{} // empty buffer
}

func stdout(c *cli.Context) io.Writer {
return c.App.Writer
}

func stderr(c *cli.Context) io.Writer {
return c.App.ErrWriter
}
189 changes: 189 additions & 0 deletions cmd/internal/serve/serve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package serve

import (
"bytes"
"context"
"io"
"io/fs"
"log/slog"
"net/http"

"github.com/ipfs/kubo/client/rpc"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
ma "github.com/multiformats/go-multiaddr"
"github.com/tetratelabs/wazero"
"github.com/thejerf/suture/v4"
"github.com/urfave/cli/v2"
ww "github.com/wetware/go"
"github.com/wetware/go/proc"
"github.com/wetware/go/system"
"github.com/wetware/go/util"
)

func Command() *cli.Command {
return &cli.Command{
Name: "serve",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "ipfs",
EnvVars: []string{"WW_IPFS"},
Usage: "multi`addr` of IPFS node, or \"local\"",
Value: "local",
},
&cli.StringSliceFlag{
Name: "env",
EnvVars: []string{"WW_ENV"},
},
&cli.BoolFlag{
Name: "wasm-debug",
EnvVars: []string{"WW_WASM_DEBUG"},
Usage: "enable wasm debug symbols",
},
},
Action: func(c *cli.Context) error {
ipfs, err := newIPFSClient(c)
if err != nil {
return err
}

h, err := libp2p.New()
if err != nil {
return err
}
defer h.Close()

s := suture.New("ww", suture.Spec{
EventHook: util.EventHook,
})

// Start a multicast DNS service that searches for local
// peers in the background.
s.Add(ww.MDNS{
Host: h,
Handler: ww.StorePeer{Peerstore: h.Peerstore()},
})

// Global compilation cache
cache := wazero.NewCompilationCache()
defer cache.Close(c.Context)

s.Add(ww.Server{
IPFS: ipfs,
Host: h,
Env: ww.Env{
IO: system.IO{
Args: c.Args().Slice(),
Env: c.StringSlice("env"),
Stdin: stdin(c),
Stdout: c.App.Writer,
Stderr: c.App.ErrWriter,
},
Net: system.Net{
Proto: ww.Proto,
Host: h,
Handler: system.HandlerFunc(func(ctx context.Context, p *proc.P) error {
slog.InfoContext(ctx, "process started",
"peer", h.ID(),
"proc", p.String())
defer slog.WarnContext(ctx, "process stopped",
"peer", h.ID(),
"proc", p.String())
<-ctx.Done()
return ctx.Err()
}),
},
FS: system.FS{
Ctx: c.Context,
Host: h,
IPFS: ipfs,
},
},
RuntimeConfig: wazero.NewRuntimeConfig().
WithCompilationCache(cache).
WithDebugInfoEnabled(c.Bool("debug")).
WithCloseOnContextDone(true),
})

sub, err := h.EventBus().Subscribe([]any{
new(event.EvtLocalAddressesUpdated)})
if err != nil {
return err
}
defer sub.Close()

done := s.ServeBackground(c.Context)
for {
var v any
select {
case err := <-done:
return err // exit
case v = <-sub.Out():
// event received
}

// Synchronous event handler
switch ev := v.(type) {
case *event.EvtLocalAddressesUpdated:
// TODO(easy): emit to libp2p topic
slog.InfoContext(c.Context, "local addresses updated",
"peer", h.ID(),
"current", ev.Current,
"removed", ev.Removed,
"diffs", ev.Diffs)

case *event.EvtLocalProtocolsUpdated:
// TODO(easy): emit to libp2p topic
slog.InfoContext(c.Context, "local protocols updated",
"peer", h.ID(),
"added", ev.Added,
"removed", ev.Removed)
}

}
},
}
}

func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) {
var a ma.Multiaddr
if s := c.String("ipfs"); s == "local" {
ipfs, err = rpc.NewLocalApi()
} else if a, err = ma.NewMultiaddr(s); err == nil {
ipfs, err = rpc.NewApiWithClient(a, http.DefaultClient)
}

return
}

func stdin(c *cli.Context) io.Reader {
switch r := c.App.Reader.(type) {
case interface{ Len() int }:
if r.Len() <= 0 {
break
}

return &io.LimitedReader{
R: c.App.Reader,
N: min(int64(r.Len()), 1<<32-1), // max u32
}

case interface{ Stat() (fs.FileInfo, error) }:
info, err := r.Stat()
if err != nil {
slog.Error("failed to get file info for stdin",
"reason", err)
break
} else if info.Size() <= 0 {
break
}

return &io.LimitedReader{
R: c.App.Reader,
N: min(info.Size(), 1<<32-1), // max u32
}
}

return &bytes.Reader{} // empty buffer
}
2 changes: 2 additions & 0 deletions cmd/ww/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/wetware/go/cmd/internal/call"
"github.com/wetware/go/cmd/internal/export"
"github.com/wetware/go/cmd/internal/run"
"github.com/wetware/go/cmd/internal/serve"
)

func main() {
Expand Down Expand Up @@ -41,6 +42,7 @@ func main() {
},
Commands: []*cli.Command{
run.Command(),
serve.Command(),
export.Command(),
call.Command(),
},
Expand Down
Loading

0 comments on commit 4255881

Please sign in to comment.