From 2221a744f236e00c413e4eafb1bd038f7454566d Mon Sep 17 00:00:00 2001 From: wurui Date: Tue, 5 Nov 2024 17:18:41 +0800 Subject: [PATCH] feat(cli): remove deno serverless & support run .ts serverless in nodejs (#935) # Description 1. Support users use `TypeScript` to write a llm sfn, For an example, refer to `example/10-ai/llm-sfn-get-weather-ts`. 2. Add `yomo run app.ts` command to run the ts sfn. 3. Remove the default SFN name 'app'. If a name is not provided, the cli will prompt the user to add it. 4. Remove deno serverless support --- cli/cli.go | 1 + cli/run.go | 29 +- cli/serverless/deno/mod/mod.ts | 116 ----- cli/serverless/deno/runtime.go | 198 -------- cli/serverless/deno/serverless.go | 42 -- cli/serverless/golang/serverless.go | 3 +- cli/serverless/golang/templates/main.tmpl | 2 +- cli/serverless/golang/templates/main_rx.tmpl | 2 +- cli/serverless/nodejs/runtime.go | 144 ++++++ cli/serverless/nodejs/serverless.go | 59 +++ .../nodejs/templates/wrapper_ts.tmpl | 62 +++ cli/serverless/wasm/serverless.go | 7 + example/8-deno/.env | 1 - example/8-deno/README.md | 34 -- example/8-deno/Taskfile.yaml | 45 -- example/8-deno/app.ts | 18 - example/8-nodejs/.gitignore | 4 + example/8-nodejs/README.md | 50 ++ example/8-nodejs/app.ts | 25 + example/8-nodejs/package.json | 19 + example/8-nodejs/pnpm-lock.yaml | 438 ++++++++++++++++++ pkg/wrapper/wrapper.go | 206 ++++++++ 22 files changed, 1027 insertions(+), 478 deletions(-) delete mode 100644 cli/serverless/deno/mod/mod.ts delete mode 100644 cli/serverless/deno/runtime.go delete mode 100644 cli/serverless/deno/serverless.go create mode 100644 cli/serverless/nodejs/runtime.go create mode 100644 cli/serverless/nodejs/serverless.go create mode 100644 cli/serverless/nodejs/templates/wrapper_ts.tmpl delete mode 100644 example/8-deno/.env delete mode 100644 example/8-deno/README.md delete mode 100644 example/8-deno/Taskfile.yaml delete mode 100644 example/8-deno/app.ts create mode 100644 example/8-nodejs/.gitignore create mode 100644 example/8-nodejs/README.md create mode 100644 example/8-nodejs/app.ts create mode 100644 example/8-nodejs/package.json create mode 100644 example/8-nodejs/pnpm-lock.yaml create mode 100644 pkg/wrapper/wrapper.go diff --git a/cli/cli.go b/cli/cli.go index dab3ee284..a5092a166 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -18,6 +18,7 @@ import ( // defaultSFNFile is the default serverless file name const ( defaultSFNSourceFile = "app.go" + defaultSFNSourceTSFile = "app.ts" defaultSFNTestSourceFile = "app_test.go" defaultSFNCompliedFile = "sfn.yomo" defaultSFNWASIFile = "sfn.wasm" diff --git a/cli/run.go b/cli/run.go index 032862787..0a886f9c6 100644 --- a/cli/run.go +++ b/cli/run.go @@ -17,16 +17,15 @@ package cli import ( "os" - "path/filepath" "github.com/spf13/cobra" "github.com/yomorun/yomo/pkg/log" // serverless registrations "github.com/yomorun/yomo/cli/serverless" - _ "github.com/yomorun/yomo/cli/serverless/deno" _ "github.com/yomorun/yomo/cli/serverless/exec" _ "github.com/yomorun/yomo/cli/serverless/golang" + _ "github.com/yomorun/yomo/cli/serverless/nodejs" _ "github.com/yomorun/yomo/cli/serverless/wasm" "github.com/yomorun/yomo/cli/viper" ) @@ -37,7 +36,7 @@ var runCmd = &cobra.Command{ Short: "Run a YoMo Stream Function", Long: "Run a YoMo Stream Function", Run: func(cmd *cobra.Command, args []string) { - if err := parseFileArg(args, &opts, defaultSFNCompliedFile, defaultSFNWASIFile, defaultSFNSourceFile); err != nil { + if err := parseFileArg(args, &opts, defaultSFNCompliedFile, defaultSFNWASIFile, defaultSFNSourceFile, defaultSFNSourceTSFile); err != nil { log.FailureStatusEvent(os.Stdout, err.Error()) return } @@ -68,24 +67,12 @@ var runCmd = &cobra.Command{ ) return } - // build if it's go file - if ext := filepath.Ext(opts.Filename); ext == ".go" { - log.PendingStatusEvent(os.Stdout, "Building YoMo Stream Function instance...") - if err := s.Build(true); err != nil { - log.FailureStatusEvent(os.Stdout, err.Error()) - os.Exit(127) - } - log.SuccessStatusEvent(os.Stdout, "YoMo Stream Function build successful!") - } - // run - // wasi - if ext := filepath.Ext(opts.Filename); ext == ".wasm" { - wasmRuntime := opts.Runtime - if wasmRuntime == "" { - wasmRuntime = "wazero" - } - log.InfoStatusEvent(os.Stdout, "WASM runtime: %s", wasmRuntime) + + if err := s.Build(true); err != nil { + log.FailureStatusEvent(os.Stdout, err.Error()) + os.Exit(127) } + log.InfoStatusEvent( os.Stdout, "Starting YoMo Stream Function instance, connecting to zipper: %v", @@ -103,7 +90,7 @@ func init() { rootCmd.AddCommand(runCmd) runCmd.Flags().StringVarP(&opts.ZipperAddr, "zipper", "z", "localhost:9000", "YoMo-Zipper endpoint addr") - runCmd.Flags().StringVarP(&opts.Name, "name", "n", "app", "yomo stream function name.") + runCmd.Flags().StringVarP(&opts.Name, "name", "n", "", "yomo stream function name.") runCmd.Flags().StringVarP(&opts.ModFile, "modfile", "m", "", "custom go.mod") runCmd.Flags().StringVarP(&opts.Credential, "credential", "d", "", "client credential payload, eg: `token:dBbBiRE7`") runCmd.Flags().StringVarP(&opts.Runtime, "runtime", "r", "", "serverless runtime type") diff --git a/cli/serverless/deno/mod/mod.ts b/cli/serverless/deno/mod/mod.ts deleted file mode 100644 index 17805706a..000000000 --- a/cli/serverless/deno/mod/mod.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { Reader, Writer } from "https://deno.land/std/types.d.ts"; -import { - readVarnum, - varnumBytes, - VarnumOptions, -} from "https://deno.land/std/encoding/binary.ts"; -import { loadSync } from "https://deno.land/std/dotenv/mod.ts"; - -export class Context { - tag: number; - input: Uint8Array; - private writer: Writer; - - constructor(tag: number, input: Uint8Array, writer: Writer) { - this.tag = tag; - this.input = input; - this.writer = writer; - } - - async write(tag: number, data: Uint8Array) { - await this.writer.write(numberToBytes(tag)); - await writeData(this.writer, data); - } -} - -const VARNUM_OPTIONS: VarnumOptions = { - "dataType": "uint32", - "endian": "little", -}; - -function numberToBytes(val: number): Uint8Array { - return varnumBytes(val, VARNUM_OPTIONS); -} - -async function readNumber(reader: Reader): Promise { - try { - return await readVarnum(reader, VARNUM_OPTIONS); - } catch (e) { - if (e instanceof Deno.errors.UnexpectedEof) { - return null; - } - throw e; - } -} - -async function readData(reader: Reader): Promise { - const length = await readNumber(reader); - if (length == null) { - return null; - } - const buf = new Uint8Array(length); - const n = await reader.read(buf); - if (n == null || n !== length) { - return null; - } - return buf; -} - -async function writeData(writer: Writer, data: Uint8Array) { - await writer.write(numberToBytes(data.length)); - await writer.write(data); -} - -export async function run( - observed: [number], - handler: (ctx: Context) => Promise, -) { - let sock = "./sfn.sock"; - let env = null; - if (Deno.args.length > 0) { - sock = Deno.args[0]; - if (Deno.args.length > 1) { - env = Deno.args[1]; - } - } - - if (env != null) { - loadSync({ - envPath: env, - defaultsPath: "", - examplePath: "", - export: true, - allowEmptyValues: true, - }); - } - - const conn: Deno.UnixConn = await Deno.connect({ - path: sock, - transport: "unix", - }); - - await conn.write(numberToBytes(observed.length)); - for (const tag of observed) { - await conn.write(numberToBytes(tag)); - } - - for (;;) { - const tag = await readNumber(conn); - if (tag == null) { - break; - } - - const data = await readData(conn); - if (data == null) { - break; - } - - const ctx = new Context(tag, data, conn); - await handler(ctx); - - await conn.write(numberToBytes(0)); // tag - await conn.write(numberToBytes(0)); // length - } - - conn.close(); -} diff --git a/cli/serverless/deno/runtime.go b/cli/serverless/deno/runtime.go deleted file mode 100644 index 738b6e6a5..000000000 --- a/cli/serverless/deno/runtime.go +++ /dev/null @@ -1,198 +0,0 @@ -// Package deno provides a js/ts serverless runtime -package deno - -import ( - "encoding/binary" - "errors" - "io" - "log" - "net" - "os" - "os/exec" - "time" - - "github.com/yomorun/yomo" - "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/file" - "github.com/yomorun/yomo/serverless" -) - -func listen(path string) (*net.UnixListener, error) { - err := file.Remove(path) - if err != nil { - return nil, err - } - - addr, err := net.ResolveUnixAddr("unix", path) - if err != nil { - return nil, err - } - return net.ListenUnix("unix", addr) -} - -func accept(listener *net.UnixListener) ([]frame.Tag, *net.UnixConn, error) { - defer listener.Close() - - listener.SetUnlinkOnClose(true) - listener.SetDeadline(time.Now().Add(3 * time.Second)) - - conn, err := listener.AcceptUnix() - if err != nil { - return nil, nil, err - } - - conn.SetReadDeadline(time.Now().Add(3 * time.Second)) - var length uint32 - err = binary.Read(conn, binary.LittleEndian, &length) - if err != nil { - conn.Close() - return nil, nil, err - } - - observedBytes := make([]byte, length*4) - _, err = io.ReadFull(conn, observedBytes) - if err != nil { - conn.Close() - return nil, nil, err - } - conn.SetReadDeadline(time.Time{}) - - observed := make([]frame.Tag, length) - for i := 0; i < int(length); i++ { - observed[i] = frame.Tag(binary.LittleEndian.Uint32(observedBytes[i*4 : i*4+4])) - } - - return observed, conn, nil -} - -func runDeno(jsPath string, socketPath string, errCh chan<- error) { - cmd := exec.Command( - "deno", - "run", - "--unstable", - "--allow-read=.,"+socketPath, - "--allow-write=.,"+socketPath, - "--allow-env", - "--allow-net", - jsPath, - socketPath, - ) - cmd.Stdin = os.Stdin - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - err := cmd.Run() - if err != nil { - errCh <- err - } -} - -func startSfn(name string, zipperAddr string, credential string, observed []frame.Tag, conn net.Conn, errCh chan<- error) (yomo.StreamFunction, error) { - sfn := yomo.NewStreamFunction( - name, - zipperAddr, - yomo.WithSfnCredential(credential), - ) - - // init - sfn.Init(func() error { - return nil - }) - - sfn.SetObserveDataTags(observed...) - - sfn.SetHandler( - func(ctx serverless.Context) { - tag := ctx.Tag() - err := binary.Write(conn, binary.LittleEndian, tag) - if err != nil { - errCh <- err - return - } - - data := ctx.Data() - err = binary.Write(conn, binary.LittleEndian, uint32(len(data))) - if err != nil { - errCh <- err - return - } - - _, err = conn.Write(data) - if err != nil { - errCh <- err - return - } - - var length uint32 - for { - err := binary.Read(conn, binary.LittleEndian, &tag) - if err != nil { - errCh <- err - return - } - - err = binary.Read(conn, binary.LittleEndian, &length) - if err != nil { - errCh <- err - return - } - - if tag == 0 && length == 0 { - break - } - - data := make([]byte, length) - _, err = io.ReadFull(conn, data) - if err != nil { - errCh <- err - return - } - - ctx.Write(tag, data) - } - }, - ) - - sfn.SetErrorHandler( - func(err error) { - log.Printf("[deno][%s] error handler: %T %v\n", zipperAddr, err, err) - }, - ) - - err := sfn.Connect() - if err != nil { - return nil, err - } - - return sfn, nil -} - -func run(name string, zipperAddr string, credential string, jsPath string, socketPath string) error { - if _, err := exec.LookPath("deno"); err != nil { - return errors.New("[deno] command was not found. For details, visit https://deno.land") - } - - errCh := make(chan error) - - listener, err := listen(socketPath) - if err != nil { - return err - } - - go runDeno(jsPath, socketPath, errCh) - - observed, conn, err := accept(listener) - if err != nil { - return err - } - defer conn.Close() - - sfn, err := startSfn(name, zipperAddr, credential, observed, conn, errCh) - if err != nil { - return err - } - defer sfn.Close() - - err = <-errCh - return err -} diff --git a/cli/serverless/deno/serverless.go b/cli/serverless/deno/serverless.go deleted file mode 100644 index 6f055f30a..000000000 --- a/cli/serverless/deno/serverless.go +++ /dev/null @@ -1,42 +0,0 @@ -// Package deno provides a js/ts serverless runtime -package deno - -import ( - "github.com/yomorun/yomo/cli/serverless" -) - -// denoServerless will start deno program to run serverless functions. -type denoServerless struct { - name string - fileName string - zipperAddr string - credential string -} - -// Init initializes the serverless -func (s *denoServerless) Init(opts *serverless.Options) error { - s.name = opts.Name - s.fileName = opts.Filename - s.zipperAddr = opts.ZipperAddr - s.credential = opts.Credential - return nil -} - -// Build is an empty implementation -func (s *denoServerless) Build(clean bool) error { - return nil -} - -// Run the wasm serverless function -func (s *denoServerless) Run(verbose bool) error { - return run(s.name, s.zipperAddr, s.credential, s.fileName, "./"+s.name+".sock") -} - -// Executable shows whether the program needs to be built -func (s *denoServerless) Executable() bool { - return true -} - -func init() { - serverless.Register(&denoServerless{}, ".js", ".ts") -} diff --git a/cli/serverless/golang/serverless.go b/cli/serverless/golang/serverless.go index 0f69bffe2..f0e833b17 100644 --- a/cli/serverless/golang/serverless.go +++ b/cli/serverless/golang/serverless.go @@ -136,6 +136,7 @@ func (s *GolangServerless) Init(opts *serverless.Options) error { // Build compiles the serverless to executable func (s *GolangServerless) Build(clean bool) error { + log.PendingStatusEvent(os.Stdout, "Building YoMo Stream Function instance...") // check if the file exists appPath := s.source if _, err := os.Stat(appPath); os.IsNotExist(err) { @@ -183,7 +184,6 @@ func (s *GolangServerless) Build(clean bool) error { err = fmt.Errorf("Build: go mod tidy err %s", out) return err } - // build // wasi dir, _ := filepath.Split(s.opts.Filename) filename := "sfn.yomo" @@ -212,6 +212,7 @@ func (s *GolangServerless) Build(clean bool) error { if clean { file.Remove(s.tempDir) } + log.SuccessStatusEvent(os.Stdout, "YoMo Stream Function build successful!") return nil } diff --git a/cli/serverless/golang/templates/main.tmpl b/cli/serverless/golang/templates/main.tmpl index f20ff6fbe..9e77d10de 100644 --- a/cli/serverless/golang/templates/main.tmpl +++ b/cli/serverless/golang/templates/main.tmpl @@ -68,7 +68,7 @@ func run(_ *cobra.Command, _ []string) { func init() { rootCmd.Flags().StringVarP(&zipper, "zipper", "z", "localhost:9000", "YoMo-Zipper endpoint addr") - rootCmd.Flags().StringVarP(&name, "name", "n", "app", "yomo stream function name") + rootCmd.Flags().StringVarP(&name, "name", "n", "", "yomo stream function name") rootCmd.Flags().StringVarP(&credential, "credential", "d", "", "client credential payload, eg: `token:dBbBiRE7`") viper.SetEnvPrefix("YOMO_SFN") viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) diff --git a/cli/serverless/golang/templates/main_rx.tmpl b/cli/serverless/golang/templates/main_rx.tmpl index a8245391e..196f24bd1 100644 --- a/cli/serverless/golang/templates/main_rx.tmpl +++ b/cli/serverless/golang/templates/main_rx.tmpl @@ -72,7 +72,7 @@ func run(_ *cobra.Command, _ []string) { func init() { rootCmd.Flags().StringVarP(&zipper, "zipper", "z", "localhost:9000", "YoMo-Zipper endpoint addr") - rootCmd.Flags().StringVarP(&name, "name", "n", "app", "yomo stream function name") + rootCmd.Flags().StringVarP(&name, "name", "n", "", "yomo stream function name") rootCmd.Flags().StringVarP(&credential, "credential", "d", "", "client credential payload, eg: `token:dBbBiRE7`") viper.SetEnvPrefix("YOMO_SFN") viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) diff --git a/cli/serverless/nodejs/runtime.go b/cli/serverless/nodejs/runtime.go new file mode 100644 index 000000000..7140d6b43 --- /dev/null +++ b/cli/serverless/nodejs/runtime.go @@ -0,0 +1,144 @@ +package nodejs + +import ( + "errors" + "fmt" + "html/template" + "os" + "os/exec" + "path/filepath" + + _ "embed" +) + +//go:embed templates/wrapper_ts.tmpl +var WrapperTSTmpl string + +var ( + wrapperTS = ".wrapper.ts" + wrapperJS = ".wrapper.js" +) + +// NodejsWrapper is the nodejs implementation of wrapper. +type NodejsWrapper struct { + functionName string + workDir string // eg. src/ + entryTSFile string // eg. src/app.ts + entryJSFile string // eg. src/app.js + fileName string // eg. src/app + + // command path + nodePath string + npmPath string +} + +// NewWrapper returns a new NodejsWrapper +func NewWrapper(functionName, entryTSFile string) (*NodejsWrapper, error) { + // check command + nodePath, err := exec.LookPath("node") + if err != nil { + return nil, errors.New("[node] command was not found. to run the sfn in ts, you need to install node. For details, visit https://nodejs.org") + } + npmPath, err := exec.LookPath("pnpm") + if err != nil { + npmPath, _ = exec.LookPath("npm") + } + + ext := filepath.Ext(entryTSFile) + if ext != ".ts" { + return nil, fmt.Errorf("only support typescript, got: %s", entryTSFile) + } + workdir := filepath.Dir(entryTSFile) + + entryJSFile := entryTSFile[:len(entryTSFile)-len(ext)] + ".js" + + fileName := entryTSFile[:len(entryTSFile)-len(filepath.Ext(entryTSFile))] + + w := &NodejsWrapper{ + functionName: functionName, + workDir: workdir, + entryTSFile: entryTSFile, + entryJSFile: entryJSFile, + fileName: fileName, + nodePath: nodePath, + npmPath: npmPath, + } + + return w, nil +} + +// WorkDir returns the working directory of the serverless function to build and run. +func (w *NodejsWrapper) WorkDir() string { + return w.workDir +} + +// Build defines how to build the serverless function. +func (w *NodejsWrapper) Build(env []string) error { + // 1. generate .wrapper.ts file + dstPath := filepath.Join(w.workDir, wrapperTS) + _ = os.Remove(dstPath) + + if err := w.genWrapperTS(w.functionName, dstPath); err != nil { + return err + } + + // 2. install dependencies + cmd := exec.Command(w.npmPath, "install") + cmd.Dir = w.workDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = env + if err := cmd.Run(); err != nil { + return err + } + + // 3. compile .wrapper.ts file to .wrapper.js + cmd2 := exec.Command("tsc", wrapperTS) + cmd2.Dir = w.workDir + cmd2.Stdout = os.Stdout + cmd2.Stderr = os.Stderr + cmd2.Env = env + if err := cmd2.Run(); err != nil { + return err + } + + return nil +} + +// Run runs the serverless function +func (w *NodejsWrapper) Run(env []string) error { + cmd := exec.Command(w.nodePath, wrapperJS) + cmd.Dir = w.workDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = env + + return cmd.Run() +} + +func (w *NodejsWrapper) genWrapperTS(functionName, dstPath string) error { + data := struct { + WorkDir string + FunctionName string + FileName string + FilePath string + }{ + WorkDir: w.workDir, + FunctionName: functionName, + FileName: w.fileName, + FilePath: w.entryTSFile, + } + + dst, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE, 0755) + if err != nil { + return err + } + defer dst.Close() + + t := template.Must(template.New("wrapper").Parse(WrapperTSTmpl)) + if err := t.Execute(dst, data); err != nil { + return err + } + + return nil +} diff --git a/cli/serverless/nodejs/serverless.go b/cli/serverless/nodejs/serverless.go new file mode 100644 index 000000000..64149a64e --- /dev/null +++ b/cli/serverless/nodejs/serverless.go @@ -0,0 +1,59 @@ +// Package nodejs provides a ts serverless runtime +package nodejs + +import ( + "os" + + "github.com/yomorun/yomo/cli/serverless" + "github.com/yomorun/yomo/pkg/wrapper" +) + +// nodejsServerless will start js program to run serverless functions. +type nodejsServerless struct { + name string + zipperAddr string + credential string + wrapper *NodejsWrapper +} + +// Init initializes the nodejs serverless +func (s *nodejsServerless) Init(opts *serverless.Options) error { + wrapper, err := NewWrapper(opts.Name, opts.Filename) + if err != nil { + return err + } + + s.name = opts.Name + s.zipperAddr = opts.ZipperAddr + s.credential = opts.Credential + s.wrapper = wrapper + + return nil +} + +// Build calls wrapper.Build +func (s *nodejsServerless) Build(_ bool) error { + return s.wrapper.Build(os.Environ()) +} + +// Run the wrapper.Run +func (s *nodejsServerless) Run(verbose bool) error { + err := serverless.LoadEnvFile(s.wrapper.workDir) + if err != nil { + return err + } + env := os.Environ() + if verbose { + env = append(env, "YOMO_LOG_LEVEL=debug") + } + return wrapper.Run(s.name, s.zipperAddr, s.credential, s.wrapper, env) +} + +// Executable shows whether the program needs to be built +func (s *nodejsServerless) Executable() bool { + return true +} + +func init() { + serverless.Register(&nodejsServerless{}, ".ts") +} diff --git a/cli/serverless/nodejs/templates/wrapper_ts.tmpl b/cli/serverless/nodejs/templates/wrapper_ts.tmpl new file mode 100644 index 000000000..2e52883b8 --- /dev/null +++ b/cli/serverless/nodejs/templates/wrapper_ts.tmpl @@ -0,0 +1,62 @@ +import { join } from 'path' +import { + _genTools, + _readSFNData, + _writeSFNData, + _writeSFNHeader, + _FunctionCall, + _createConnection, + withTimeout, +} from '@yomo/sfn' +import { description, tag, handler } from '{{ .FileName }}' + +const WORK_DIR = '{{ .WorkDir }}' +const FUNCTION_NAME = '{{ .FunctionName }}' +const SFN_FILE_PATH = '{{ .FilePath }}' +const SOCK_PATH = join(WORK_DIR, 'sfn.sock'); +const REDUCE_TAG = 0xe001; + +const TIMEOUT_DURATION = 1000 * 60 + +function run() { + if ( + !description || + !handler || + tag === undefined || + tag === null + ) { + throw Error('description, tags, handler signature must be exported!') + } + const tools = _genTools(FUNCTION_NAME, description, SFN_FILE_PATH) + const header = JSON.stringify({ + tags: [tag], + function_definition: JSON.stringify(tools, null, 2) + }) + const { conn } = _createConnection(SOCK_PATH, { + onConnect: () => { + _writeSFNHeader(conn, header) + }, + onData: async (buf: Buffer) => { + const { tag, data } = _readSFNData(buf) + const fc = new _FunctionCall(data) + const args = fc.readLLMArguments() + let result: any + if (typeof handler === 'function') { + try { + result = await withTimeout(handler(args), TIMEOUT_DURATION) + } catch (error) { + if (error instanceof Error && error.message === 'timeout') { + fc.writeLLMResult('timeout in this function calling, you should ignore this.') + _writeSFNData(conn, REDUCE_TAG, JSON.stringify(fc.data)) + } + return + } + } + if (!result) return + fc.writeLLMResult(JSON.stringify(result)) + _writeSFNData(conn, REDUCE_TAG, JSON.stringify(fc.data)) + } + }) +} + +run() \ No newline at end of file diff --git a/cli/serverless/wasm/serverless.go b/cli/serverless/wasm/serverless.go index 395fc8d6e..812af501e 100644 --- a/cli/serverless/wasm/serverless.go +++ b/cli/serverless/wasm/serverless.go @@ -14,6 +14,7 @@ import ( // wasmServerless will run serverless functions from the given compiled WebAssembly files. type wasmServerless struct { + runtimeName string runtime Runtime name string zipperAddr string @@ -35,6 +36,7 @@ func (s *wasmServerless) Init(opts *cli.Options) error { return err } + s.runtimeName = opts.Runtime s.runtime = runtime s.name = opts.Name s.zipperAddr = opts.ZipperAddr @@ -47,6 +49,11 @@ func (s *wasmServerless) Init(opts *cli.Options) error { // Build is an empty implementation func (s *wasmServerless) Build(clean bool) error { + wasmRuntime := s.runtimeName + if wasmRuntime == "" { + wasmRuntime = "wazero" + } + pkglog.InfoStatusEvent(os.Stdout, "WASM runtime: %s", wasmRuntime) return nil } diff --git a/example/8-deno/.env b/example/8-deno/.env deleted file mode 100644 index 51b318d13..000000000 --- a/example/8-deno/.env +++ /dev/null @@ -1 +0,0 @@ -YOMO_SFN_NAME=upper diff --git a/example/8-deno/README.md b/example/8-deno/README.md deleted file mode 100644 index 7bec5e358..000000000 --- a/example/8-deno/README.md +++ /dev/null @@ -1,34 +0,0 @@ -# Implementing YoMo Stream Function using Deno - -Nowadays, JavaScript (or TypeScript) is one of the most popular programming -languages. It's easy to learn, yet still powerful, thus suitable for serverless -mode. YoMo has integrated the Deno runtime for developers to implement JS/TS -serverless functions. - -## Install Deno runtime - -https://deno.land/#installation - -## Run the demo example - -- Write your own serverless function by reference to [app.ts](app.ts) - -- Start YoMo zipper - - ```sh - yomo serve -c ../uppercase/config.yaml - ``` - -- Start the JS/TS serverless function - - ```sh - yomo run app.ts - ``` - -- Start Source & Sink - - ```sh - cd ../uppercase/source - - go run main.go - ``` diff --git a/example/8-deno/Taskfile.yaml b/example/8-deno/Taskfile.yaml deleted file mode 100644 index 39355dff5..000000000 --- a/example/8-deno/Taskfile.yaml +++ /dev/null @@ -1,45 +0,0 @@ -# https://taskfile.dev - -version: "3" - -output: "prefixed" - -tasks: - run: - desc: run - deps: [zipper, source, sfn] - cmds: - - echo 'deno example run' - silent: true - - clean: - desc: clean - cmds: - - rm -rf ./bin - - source: - desc: run source - deps: [source-build] - cmds: - - "../uppercase/bin/source{{exeExt}}" - env: - YOMO_LOG_LEVEL: error - - source-build: - desc: build source - cmds: - - "go build -o ../uppercase/bin/source{{exeExt}} ../uppercase/source/main.go" - internal: true - - sfn: - desc: run sfn - cmds: - - "yomo run app.ts" - - zipper: - desc: run zipper - cmds: - - "yomo serve -c ../config.yaml" - env: - YOMO_LOG_LEVEL: error - YOMO_ADDR: "localhost:9000" diff --git a/example/8-deno/app.ts b/example/8-deno/app.ts deleted file mode 100644 index ca88c9514..000000000 --- a/example/8-deno/app.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Context, run } from "../../cli/serverless/deno/mod/mod.ts"; -// In your own applications change the above line to: -// import { Context, run } from "https://deno.land/x/yomo/mod.ts"; - -const enc = new TextEncoder(); -const dec = new TextDecoder(); - -async function handler(ctx: Context) { - console.log( - "deno sfn received %d bytes with tag[%d]", - ctx.input.length, - ctx.tag, - ); - const output = dec.decode(ctx.input).toUpperCase(); - await ctx.write(0x34, enc.encode(output)); -} - -await run([0x33], handler); diff --git a/example/8-nodejs/.gitignore b/example/8-nodejs/.gitignore new file mode 100644 index 000000000..ac134725d --- /dev/null +++ b/example/8-nodejs/.gitignore @@ -0,0 +1,4 @@ +node_modules +.wrapper.ts +.wrapper.js +app.js \ No newline at end of file diff --git a/example/8-nodejs/README.md b/example/8-nodejs/README.md new file mode 100644 index 000000000..d3893501c --- /dev/null +++ b/example/8-nodejs/README.md @@ -0,0 +1,50 @@ +# Implementing YoMo Stream Function using Deno + +Nowadays, TypeScript is one of the most popular programming +languages. It's easy to learn, yet still powerful, thus suitable for serverless +mode. YoMo has integrated the Nodejs runtime for developers to implement TS +serverless functions. + +## Install Nodejs runtime + +https://nodejs.org/en/download/package-manager + +## Run the demo example + +- Write your own serverless function by reference to [app.ts](app.ts) + +- Start YoMo zipper + + ```sh + yomo serve -c ../10-ai/zipper.yaml + ``` + +- Start sfn + + ```sh + yomo run app.ts + ``` + +- Invoke the LLM Function + +```sh +$ curl -i http://127.0.0.1:8000/v1/chat/completions -H "Content-Type: application/json" -d '{ + "messages": [ + { + "role": "system", + "content": "You are a test assistant." + }, + { + "role": "user", + "content": "How is the weather in Korea and UK today?" + } + ], + "stream": false +}' +HTTP/1.1 200 OK +Content-Type: application/json +Date: Mon, 04 Nov 2024 05:10:24 GMT +Content-Length: 824 + +{"id":"chatcmpl-APje3REU3DvpZ9yANpLIpLf6uyWNk","object":"chat.completion","created":1730697023,"model":"gpt-4o-2024-08-06","choices":[{"index":0,"message":{"role":"assistant","content":"Today, the weather in Seoul, Korea is quite warm with a temperature of 33°C. In London, UK, it's cooler with a temperature of 19°C."},"finish_reason":"stop","content_filter_results":{"hate":{"filtered":false},"self_harm":{"filtered":false},"sexual":{"filtered":false},"violence":{"filtered":false},"jailbreak":{"filtered":false,"detected":false},"profanity":{"filtered":false,"detected":false}}}],"usage":{"prompt_tokens":174,"completion_tokens":81,"total_tokens":187,"prompt_tokens_details":{"audio_tokens":0,"cached_tokens":0},"completion_tokens_details":{"audio_tokens":0,"reasoning_tokens":0}},"system_fingerprint":"fp_159d8341cc"} +``` \ No newline at end of file diff --git a/example/8-nodejs/app.ts b/example/8-nodejs/app.ts new file mode 100644 index 000000000..043f9a8f6 --- /dev/null +++ b/example/8-nodejs/app.ts @@ -0,0 +1,25 @@ +export const description = 'Get the current weather for `city_name`' + +export const tag = 0x33 + +// For jsonschema in TypeScript, see: https://github.com/YousefED/typescript-json-schema +export type Argument = { + /** + * The name of the city to be queried + */ + city_name: string; +} + +async function getWeather(city_name: string) { + console.log(`get weather for ${city_name}`) + await sleep(3000) + return { city_name: city_name, temperature: Math.floor(Math.random() * 41) } +} +export async function handler(args: Argument) { + const result = await getWeather(args.city_name) + return result +} + +function sleep(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} \ No newline at end of file diff --git a/example/8-nodejs/package.json b/example/8-nodejs/package.json new file mode 100644 index 000000000..fdba913cb --- /dev/null +++ b/example/8-nodejs/package.json @@ -0,0 +1,19 @@ +{ + "name": "llm-sfn-get-weather-ts", + "version": "1.0.0", + "description": "", + "main": ".wrapper.js", + "scripts": { + "test": "ts-node .wrapper.ts" + }, + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "@yomo/sfn": "1.0.5" + }, + "devDependencies": { + "@types/node": "^22.8.2", + "ts-node": "^10.9.2" + } +} \ No newline at end of file diff --git a/example/8-nodejs/pnpm-lock.yaml b/example/8-nodejs/pnpm-lock.yaml new file mode 100644 index 000000000..6b7825bef --- /dev/null +++ b/example/8-nodejs/pnpm-lock.yaml @@ -0,0 +1,438 @@ +lockfileVersion: '9.0' + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + +importers: + + .: + dependencies: + '@yomo/sfn': + specifier: 1.0.5 + version: 1.0.5 + devDependencies: + '@types/node': + specifier: ^22.8.2 + version: 22.8.2 + ts-node: + specifier: ^10.9.2 + version: 10.9.2(@types/node@22.8.2)(typescript@5.5.4) + +packages: + + '@cspotcode/source-map-support@0.8.1': + resolution: {integrity: sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==} + engines: {node: '>=12'} + + '@jridgewell/resolve-uri@3.1.2': + resolution: {integrity: sha512-bRISgCIjP20/tbWSPWMEi54QVPRZExkuD9lJL+UIxUKtwVJA8wW1Trb1jMs1RFXo1CBTNZ/5hpC9QvmKWdopKw==} + engines: {node: '>=6.0.0'} + + '@jridgewell/sourcemap-codec@1.5.0': + resolution: {integrity: sha512-gv3ZRaISU3fjPAgNsriBRqGWQL6quFx04YMPW/zD8XMLsU32mhCCbfbO6KZFLjvYpCZ8zyDEgqsgf+PwPaM7GQ==} + + '@jridgewell/trace-mapping@0.3.9': + resolution: {integrity: sha512-3Belt6tdc8bPgAtbcmdtNJlirVoTmEb5e2gC94PnkwEW9jI6CAHUeoG85tjWP5WquqfavoMtMwiG4P926ZKKuQ==} + + '@tsconfig/node10@1.0.11': + resolution: {integrity: sha512-DcRjDCujK/kCk/cUe8Xz8ZSpm8mS3mNNpta+jGCA6USEDfktlNvm1+IuZ9eTcDbNk41BHwpHHeW+N1lKCz4zOw==} + + '@tsconfig/node12@1.0.11': + resolution: {integrity: sha512-cqefuRsh12pWyGsIoBKJA9luFu3mRxCA+ORZvA4ktLSzIuCUtWVxGIuXigEwO5/ywWFMZ2QEGKWvkZG1zDMTag==} + + '@tsconfig/node14@1.0.3': + resolution: {integrity: sha512-ysT8mhdixWK6Hw3i1V2AeRqZ5WfXg1G43mqoYlM2nc6388Fq5jcXyr5mRsqViLx/GJYdoL0bfXD8nmF+Zn/Iow==} + + '@tsconfig/node16@1.0.4': + resolution: {integrity: sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==} + + '@types/json-schema@7.0.15': + resolution: {integrity: sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA==} + + '@types/node@18.19.62': + resolution: {integrity: sha512-UOGhw+yZV/icyM0qohQVh3ktpY40Sp7tdTW7HxG3pTd7AiMrlFlAJNUrGK9t5mdW0+ViQcFV74zCSIx9ZJpncA==} + + '@types/node@22.8.2': + resolution: {integrity: sha512-NzaRNFV+FZkvK/KLCsNdTvID0SThyrs5SHB6tsD/lajr22FGC73N2QeDPM2wHtVde8mgcXuSsHQkH5cX1pbPLw==} + + '@yomo/sfn@1.0.5': + resolution: {integrity: sha512-SvHQlLFKM7En88V4Cql7yJS63OmuFjU4im4sRFYTQP+H8nxL0TKu1wZQ3icxW3BW5Iy1F4I73XN51W0/P/Z8vg==} + + acorn-walk@8.3.4: + resolution: {integrity: sha512-ueEepnujpqee2o5aIYnvHU6C0A42MNdsIDeqy5BydrkuC5R1ZuUFnm27EeFJGoEHJQgn3uleRvmTXaJgfXbt4g==} + engines: {node: '>=0.4.0'} + + acorn@8.14.0: + resolution: {integrity: sha512-cl669nCJTZBsL97OF4kUQm5g5hC2uihk0NxY3WENAC0TYdILVkAyHymAntgxGkl7K+t0cXIrH5siy5S4XkFycA==} + engines: {node: '>=0.4.0'} + hasBin: true + + ansi-regex@5.0.1: + resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==} + engines: {node: '>=8'} + + ansi-styles@4.3.0: + resolution: {integrity: sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==} + engines: {node: '>=8'} + + arg@4.1.3: + resolution: {integrity: sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==} + + balanced-match@1.0.2: + resolution: {integrity: sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==} + + brace-expansion@1.1.11: + resolution: {integrity: sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==} + + cliui@8.0.1: + resolution: {integrity: sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==} + engines: {node: '>=12'} + + color-convert@2.0.1: + resolution: {integrity: sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==} + engines: {node: '>=7.0.0'} + + color-name@1.1.4: + resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==} + + concat-map@0.0.1: + resolution: {integrity: sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==} + + create-require@1.1.1: + resolution: {integrity: sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==} + + diff@4.0.2: + resolution: {integrity: sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==} + engines: {node: '>=0.3.1'} + + emoji-regex@8.0.0: + resolution: {integrity: sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==} + + escalade@3.2.0: + resolution: {integrity: sha512-WUj2qlxaQtO4g6Pq5c29GTcWGDyd8itL8zTlipgECz3JesAiiOKotd8JU6otB3PACgG6xkJUyVhboMS+bje/jA==} + engines: {node: '>=6'} + + fs.realpath@1.0.0: + resolution: {integrity: sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==} + + get-caller-file@2.0.5: + resolution: {integrity: sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg==} + engines: {node: 6.* || 8.* || >= 10.*} + + glob@7.2.3: + resolution: {integrity: sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==} + deprecated: Glob versions prior to v9 are no longer supported + + inflight@1.0.6: + resolution: {integrity: sha512-k92I/b08q4wvFscXCLvqfsHCrjrF7yiXsQuIVvVE7N82W3+aqpzuUdBbfhWcy/FZR3/4IgflMgKLOsvPDrGCJA==} + deprecated: This module is not supported, and leaks memory. Do not use it. Check out lru-cache if you want a good and tested way to coalesce async requests by a key value, which is much more comprehensive and powerful. + + inherits@2.0.4: + resolution: {integrity: sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==} + + is-fullwidth-code-point@3.0.0: + resolution: {integrity: sha512-zymm5+u+sCsSWyD9qNaejV3DFvhCKclKdizYaJUuHA83RLjb7nSuGnddCHGv0hk+KY7BMAlsWeK4Ueg6EV6XQg==} + engines: {node: '>=8'} + + make-error@1.3.6: + resolution: {integrity: sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==} + + minimatch@3.1.2: + resolution: {integrity: sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==} + + once@1.4.0: + resolution: {integrity: sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==} + + path-equal@1.2.5: + resolution: {integrity: sha512-i73IctDr3F2W+bsOWDyyVm/lqsXO47aY9nsFZUjTT/aljSbkxHxxCoyZ9UUrM8jK0JVod+An+rl48RCsvWM+9g==} + + path-is-absolute@1.0.1: + resolution: {integrity: sha512-AVbw3UJ2e9bq64vSaS9Am0fje1Pa8pbGqTTsmXfaIiMpnr5DlDhfJOuLj9Sf95ZPVDAUerDfEk88MPmPe7UCQg==} + engines: {node: '>=0.10.0'} + + require-directory@2.1.1: + resolution: {integrity: sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q==} + engines: {node: '>=0.10.0'} + + safe-stable-stringify@2.5.0: + resolution: {integrity: sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA==} + engines: {node: '>=10'} + + string-width@4.2.3: + resolution: {integrity: sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==} + engines: {node: '>=8'} + + strip-ansi@6.0.1: + resolution: {integrity: sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==} + engines: {node: '>=8'} + + ts-node@10.9.2: + resolution: {integrity: sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==} + hasBin: true + peerDependencies: + '@swc/core': '>=1.2.50' + '@swc/wasm': '>=1.2.50' + '@types/node': '*' + typescript: '>=2.7' + peerDependenciesMeta: + '@swc/core': + optional: true + '@swc/wasm': + optional: true + + typescript-json-schema@0.65.1: + resolution: {integrity: sha512-tuGH7ff2jPaUYi6as3lHyHcKpSmXIqN7/mu50x3HlYn0EHzLpmt3nplZ7EuhUkO0eqDRc9GqWNkfjgBPIS9kxg==} + hasBin: true + + typescript@5.5.4: + resolution: {integrity: sha512-Mtq29sKDAEYP7aljRgtPOpTvOfbwRWlS6dPRzwjdE+C0R4brX/GUyhHSecbHMFLNBLcJIPt9nl9yG5TZ1weH+Q==} + engines: {node: '>=14.17'} + hasBin: true + + undici-types@5.26.5: + resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==} + + undici-types@6.19.8: + resolution: {integrity: sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==} + + v8-compile-cache-lib@3.0.1: + resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} + + wrap-ansi@7.0.0: + resolution: {integrity: sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==} + engines: {node: '>=10'} + + wrappy@1.0.2: + resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + + y18n@5.0.8: + resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==} + engines: {node: '>=10'} + + yargs-parser@21.1.1: + resolution: {integrity: sha512-tVpsJW7DdjecAiFpbIB1e3qxIQsE6NoPc5/eTdrbbIC4h0LVsWhnoa3g+m2HclBIujHzsxZ4VJVA+GUuc2/LBw==} + engines: {node: '>=12'} + + yargs@17.7.2: + resolution: {integrity: sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==} + engines: {node: '>=12'} + + yn@3.1.1: + resolution: {integrity: sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==} + engines: {node: '>=6'} + +snapshots: + + '@cspotcode/source-map-support@0.8.1': + dependencies: + '@jridgewell/trace-mapping': 0.3.9 + + '@jridgewell/resolve-uri@3.1.2': {} + + '@jridgewell/sourcemap-codec@1.5.0': {} + + '@jridgewell/trace-mapping@0.3.9': + dependencies: + '@jridgewell/resolve-uri': 3.1.2 + '@jridgewell/sourcemap-codec': 1.5.0 + + '@tsconfig/node10@1.0.11': {} + + '@tsconfig/node12@1.0.11': {} + + '@tsconfig/node14@1.0.3': {} + + '@tsconfig/node16@1.0.4': {} + + '@types/json-schema@7.0.15': {} + + '@types/node@18.19.62': + dependencies: + undici-types: 5.26.5 + + '@types/node@22.8.2': + dependencies: + undici-types: 6.19.8 + + '@yomo/sfn@1.0.5': + dependencies: + typescript-json-schema: 0.65.1 + transitivePeerDependencies: + - '@swc/core' + - '@swc/wasm' + + acorn-walk@8.3.4: + dependencies: + acorn: 8.14.0 + + acorn@8.14.0: {} + + ansi-regex@5.0.1: {} + + ansi-styles@4.3.0: + dependencies: + color-convert: 2.0.1 + + arg@4.1.3: {} + + balanced-match@1.0.2: {} + + brace-expansion@1.1.11: + dependencies: + balanced-match: 1.0.2 + concat-map: 0.0.1 + + cliui@8.0.1: + dependencies: + string-width: 4.2.3 + strip-ansi: 6.0.1 + wrap-ansi: 7.0.0 + + color-convert@2.0.1: + dependencies: + color-name: 1.1.4 + + color-name@1.1.4: {} + + concat-map@0.0.1: {} + + create-require@1.1.1: {} + + diff@4.0.2: {} + + emoji-regex@8.0.0: {} + + escalade@3.2.0: {} + + fs.realpath@1.0.0: {} + + get-caller-file@2.0.5: {} + + glob@7.2.3: + dependencies: + fs.realpath: 1.0.0 + inflight: 1.0.6 + inherits: 2.0.4 + minimatch: 3.1.2 + once: 1.4.0 + path-is-absolute: 1.0.1 + + inflight@1.0.6: + dependencies: + once: 1.4.0 + wrappy: 1.0.2 + + inherits@2.0.4: {} + + is-fullwidth-code-point@3.0.0: {} + + make-error@1.3.6: {} + + minimatch@3.1.2: + dependencies: + brace-expansion: 1.1.11 + + once@1.4.0: + dependencies: + wrappy: 1.0.2 + + path-equal@1.2.5: {} + + path-is-absolute@1.0.1: {} + + require-directory@2.1.1: {} + + safe-stable-stringify@2.5.0: {} + + string-width@4.2.3: + dependencies: + emoji-regex: 8.0.0 + is-fullwidth-code-point: 3.0.0 + strip-ansi: 6.0.1 + + strip-ansi@6.0.1: + dependencies: + ansi-regex: 5.0.1 + + ts-node@10.9.2(@types/node@18.19.62)(typescript@5.5.4): + dependencies: + '@cspotcode/source-map-support': 0.8.1 + '@tsconfig/node10': 1.0.11 + '@tsconfig/node12': 1.0.11 + '@tsconfig/node14': 1.0.3 + '@tsconfig/node16': 1.0.4 + '@types/node': 18.19.62 + acorn: 8.14.0 + acorn-walk: 8.3.4 + arg: 4.1.3 + create-require: 1.1.1 + diff: 4.0.2 + make-error: 1.3.6 + typescript: 5.5.4 + v8-compile-cache-lib: 3.0.1 + yn: 3.1.1 + + ts-node@10.9.2(@types/node@22.8.2)(typescript@5.5.4): + dependencies: + '@cspotcode/source-map-support': 0.8.1 + '@tsconfig/node10': 1.0.11 + '@tsconfig/node12': 1.0.11 + '@tsconfig/node14': 1.0.3 + '@tsconfig/node16': 1.0.4 + '@types/node': 22.8.2 + acorn: 8.14.0 + acorn-walk: 8.3.4 + arg: 4.1.3 + create-require: 1.1.1 + diff: 4.0.2 + make-error: 1.3.6 + typescript: 5.5.4 + v8-compile-cache-lib: 3.0.1 + yn: 3.1.1 + + typescript-json-schema@0.65.1: + dependencies: + '@types/json-schema': 7.0.15 + '@types/node': 18.19.62 + glob: 7.2.3 + path-equal: 1.2.5 + safe-stable-stringify: 2.5.0 + ts-node: 10.9.2(@types/node@18.19.62)(typescript@5.5.4) + typescript: 5.5.4 + yargs: 17.7.2 + transitivePeerDependencies: + - '@swc/core' + - '@swc/wasm' + + typescript@5.5.4: {} + + undici-types@5.26.5: {} + + undici-types@6.19.8: {} + + v8-compile-cache-lib@3.0.1: {} + + wrap-ansi@7.0.0: + dependencies: + ansi-styles: 4.3.0 + string-width: 4.2.3 + strip-ansi: 6.0.1 + + wrappy@1.0.2: {} + + y18n@5.0.8: {} + + yargs-parser@21.1.1: {} + + yargs@17.7.2: + dependencies: + cliui: 8.0.1 + escalade: 3.2.0 + get-caller-file: 2.0.5 + require-directory: 2.1.1 + string-width: 4.2.3 + y18n: 5.0.8 + yargs-parser: 21.1.1 + + yn@3.1.1: {} diff --git a/pkg/wrapper/wrapper.go b/pkg/wrapper/wrapper.go new file mode 100644 index 000000000..20e9dd89c --- /dev/null +++ b/pkg/wrapper/wrapper.go @@ -0,0 +1,206 @@ +package wrapper + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "io" + "net" + "os" + "path/filepath" + "sync" + + "github.com/yomorun/yomo" + "github.com/yomorun/yomo/serverless" +) + +// SFNWrapper defines serverless function wrapper, you can implement this interface to wrap any +// runtime as a serverless function to run on Yomo. +type SFNWrapper interface { + // WorkDir returns the working directory of the serverless function to build and run. + WorkDir() string + // Build defines how to build the serverless function. + Build(env []string) error + // Run defines how to run the serverless function. + Run(env []string) error +} + +// BuildAndRun builds and runs the serverless function. +func BuildAndRun(name, zipperAddr, credential string, wrapper SFNWrapper, env []string) error { + if err := wrapper.Build(env); err != nil { + return err + } + return Run(name, zipperAddr, credential, wrapper, env) +} + +// Run runs the serverless function. +func Run(name, zipperAddr, credential string, wrapper SFNWrapper, env []string) error { + sockPath := filepath.Join(wrapper.WorkDir(), "sfn.sock") + _ = os.Remove(sockPath) + + addr, err := net.ResolveUnixAddr("unix", sockPath) + if err != nil { + return err + } + + listener, err := net.ListenUnix("unix", addr) + if err != nil { + return err + } + defer listener.Close() + + errch := make(chan error) + + go func() { + if err := wrapper.Run(env); err != nil { + errch <- err + } + }() + + go func() { + conn, err := listener.Accept() + if err != nil { + errch <- err + return + } + + headerBytes, err := readHeader(conn) + if err != nil { + errch <- err + return + } + + header := &header{} + err = json.Unmarshal(headerBytes, header) + if err != nil { + errch <- err + return + } + + fd := &functionDefinition{} + err = json.Unmarshal([]byte(header.FunctionDefinition), fd) + if err != nil || fd.Name == "" { + errch <- errors.New("invalid jsonschema, please check your jsonschema file") + return + } + + err = serveSFN(name, zipperAddr, credential, header.FunctionDefinition, header.Tags, conn) + errch <- err + }() + + return <-errch +} + +func serveSFN(name, zipperAddr, credential, functionDefinition string, tags []uint32, conn io.ReadWriter) error { + sfn := yomo.NewStreamFunction( + name, + zipperAddr, + yomo.WithSfnReConnect(), + yomo.WithSfnCredential(credential), + yomo.WithAIFunctionJsonDefinition(functionDefinition), + ) + + var once sync.Once + + sfn.SetObserveDataTags(tags...) + sfn.SetHandler(func(ctx serverless.Context) { + var ( + tag = ctx.Tag() + data = ctx.Data() + ) + + WriteTagData(conn, tag, data) + + once.Do(func() { + go func() { + for { + tag, data, err := ReadTagData(conn) + if err == io.EOF { + return + } + _ = ctx.Write(tag, data) + } + }() + }) + }) + + if err := sfn.Connect(); err != nil { + return err + } + + defer sfn.Close() + + sfn.Wait() + + return errors.New("sfn exited") +} + +// ReadTagData reads tag and data from the socket stream. +func ReadTagData(r io.Reader) (uint32, []byte, error) { + var tag uint32 + if err := binary.Read(r, binary.LittleEndian, &tag); err != nil { + return 0, nil, err + } + + lengthBytes := make([]byte, 4) + if err := binary.Read(r, binary.LittleEndian, &lengthBytes); err != nil { + return 0, nil, err + } + + data := make([]byte, binary.LittleEndian.Uint32(lengthBytes)) + if _, err := io.ReadFull(r, data); err != nil { + return 0, nil, err + } + + return tag, data, nil +} + +// WriteTagData writes tag and data to the socket stream. +func WriteTagData(w io.Writer, tag uint32, data []byte) error { + buf := bytes.NewBuffer(nil) + err := binary.Write(buf, binary.LittleEndian, tag) + if err != nil { + return err + } + + err = binary.Write(buf, binary.LittleEndian, uint32(len(data))) + if err != nil { + return err + } + + _, err = buf.Write(data) + if err != nil { + return err + } + + _, err = w.Write(buf.Bytes()) + if err != nil { + return err + } + + return nil +} + +type header struct { + Tags []uint32 `json:"tags"` + FunctionDefinition string `json:"function_definition"` +} + +func readHeader(conn io.Reader) ([]byte, error) { + len := make([]byte, 4) + if err := binary.Read(conn, binary.LittleEndian, &len); err != nil { + return nil, err + } + + title := make([]byte, binary.LittleEndian.Uint32(len)) + if _, err := io.ReadFull(conn, title); err != nil { + return nil, err + } + + return title, nil +} + +type functionDefinition struct { + Name string `json:"name,omitempty"` +}