Skip to content

Commit 0f6d732

Browse files
committed
Add run command
1 parent d3f1673 commit 0f6d732

File tree

6 files changed

+181
-12
lines changed

6 files changed

+181
-12
lines changed

cmd/llmflow/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ go 1.20
66

77
require (
88
github.com/RussellLuo/kun v0.4.5
9-
github.com/RussellLuo/orchestrator v0.0.0-20240406064758-314114a91fb8
9+
github.com/RussellLuo/orchestrator v0.0.0-20240406114838-81cdc4801382
1010
github.com/RussellLuo/validating/v3 v3.0.0-beta.1
1111
github.com/alecthomas/kong v0.9.0
12+
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
1213
github.com/go-aie/llmflow v0.0.0-00010101000000-000000000000
1314
github.com/go-chi/chi v4.1.2+incompatible
1415
github.com/go-chi/cors v1.2.1

cmd/llmflow/go.sum

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ github.com/RussellLuo/kun v0.4.5 h1:006wh9AhIqf/IDijB0TXju8FrlWI0UAaObRPvKZKXFM=
1616
github.com/RussellLuo/kun v0.4.5/go.mod h1:ITHYogvZMuRxT3CWEZQ7d+B6KU0wZJgVnjY74RfoUjE=
1717
github.com/RussellLuo/mapstructure v1.3.4-0.20230915005935-0ccb59b15cf2 h1:iwHtW2gVIDynzVcv+6NqhnkvhqhufmEOpbgSsxSdN68=
1818
github.com/RussellLuo/mapstructure v1.3.4-0.20230915005935-0ccb59b15cf2/go.mod h1:Wdvd26olfMFybEGBobXgoTlLLXbWmdp25yEfL3KnrMc=
19-
github.com/RussellLuo/orchestrator v0.0.0-20240406064758-314114a91fb8 h1:b55oPGHu68SFqgIHX1bXwyhEQMY/GAAUrWfVzXTE9G0=
20-
github.com/RussellLuo/orchestrator v0.0.0-20240406064758-314114a91fb8/go.mod h1:SRSHtEoCoCmDKWE6eO6nv1UJnXDmfSUytU7/diDbJcY=
19+
github.com/RussellLuo/orchestrator v0.0.0-20240406114838-81cdc4801382 h1:Ek6C+oVzWTlmMpTKftWm+Tf9mhtUH0srTN2wqUAVRQk=
20+
github.com/RussellLuo/orchestrator v0.0.0-20240406114838-81cdc4801382/go.mod h1:SRSHtEoCoCmDKWE6eO6nv1UJnXDmfSUytU7/diDbJcY=
2121
github.com/RussellLuo/structool v0.0.0-20230915010420-41b17257d885 h1:zJHKc/aA6OtUwJEMKpKYyOzZvl7DBBT72sCzOUk7H6M=
2222
github.com/RussellLuo/structool v0.0.0-20230915010420-41b17257d885/go.mod h1:WcYzut1H8YR15z6U3TUijLb+QAhTO2esip1854Aiyes=
2323
github.com/RussellLuo/structs v1.2.0 h1:rLR+opKsDCfDUwHCYG2mvFi1xUvYcWSt9kwns4e8OkU=
@@ -37,6 +37,10 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
3737
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
3838
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
3939
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
40+
github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE=
41+
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8=
42+
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
43+
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8=
4044
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
4145
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
4246
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=

cmd/llmflow/llmflow.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,9 +315,11 @@ func (lf *LLMFlow) StopActor(ctx context.Context, id string, input map[string]an
315315
return nil, nil
316316
}
317317

318-
func startServer(addr string) error {
318+
func startServer(addr string, quiet bool, stopC <-chan int) error {
319319
r := chi.NewRouter()
320-
r.Use(middleware.Logger)
320+
if !quiet {
321+
r.Use(middleware.Logger)
322+
}
321323
r.Use(cors.Handler(cors.Options{
322324
AllowedOrigins: []string{"*"},
323325
AllowedMethods: []string{"OPTIONS", "PUT", "POST", "DELETE"},
@@ -347,7 +349,9 @@ func startServer(addr string) error {
347349

348350
errs := make(chan error, 2)
349351
go func() {
350-
log.Printf("LLMFlow listening on %s\n", addr)
352+
if !quiet {
353+
log.Printf("LLMFlow listening on %s\n", addr)
354+
}
351355
errs <- http.ListenAndServe(addr, r)
352356
}()
353357
go func() {
@@ -356,6 +360,12 @@ func startServer(addr string) error {
356360
errs <- fmt.Errorf("%s", <-c)
357361
}()
358362

359-
log.Printf("LLMFlow terminated (err: %v)", <-errs)
363+
select {
364+
case err := <-errs:
365+
if !quiet {
366+
log.Printf("LLMFlow terminated (err: %v)", err)
367+
}
368+
case <-stopC:
369+
}
360370
return nil
361371
}

cmd/llmflow/main.go

Lines changed: 156 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,179 @@
11
package main
22

33
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"os"
10+
"path/filepath"
11+
"strings"
12+
13+
"github.com/RussellLuo/orchestrator"
14+
"github.com/RussellLuo/orchestrator/builtin"
415
"github.com/alecthomas/kong"
16+
"github.com/chzyer/readline"
17+
"sigs.k8s.io/yaml"
518
)
619

720
type ServeCmd struct {
821
Addr string `help:"The TCP network address to listen on." default:":8888"`
922
}
1023

1124
func (cmd *ServeCmd) Run() error {
12-
return startServer(cmd.Addr)
25+
return startServer(cmd.Addr, false, make(chan int))
1326
}
1427

15-
type RunCmd struct{
28+
type RunCmd struct {
1629
Filename string `help:"Filename or URL to a file that represents a flow." arg:""`
1730
}
1831

1932
func (cmd *RunCmd) Run() error {
33+
stopC := make(chan int)
34+
go func() {
35+
_ = startServer(":8888", true, stopC)
36+
}()
37+
38+
var f flow
39+
switch ext := filepath.Ext(cmd.Filename); ext {
40+
case ".yaml":
41+
if err := cmd.readFlow("application/yaml", cmd.Filename, &f); err != nil {
42+
return err
43+
}
44+
45+
case ".json":
46+
if err := cmd.readFlow("application/json", cmd.Filename, &f); err != nil {
47+
return err
48+
}
49+
50+
default:
51+
return fmt.Errorf("unsupported format: %q", ext)
52+
}
53+
54+
if err := cmd.addFlow(f); err != nil {
55+
return err
56+
}
57+
58+
rl, err := readline.NewEx(&readline.Config{
59+
Prompt: ">>> ",
60+
InterruptPrompt: "^C",
61+
EOFPrompt: "exit",
62+
})
63+
if err != nil {
64+
return err
65+
}
66+
defer rl.Close()
67+
//rl.CaptureExitSignal()
68+
69+
Next:
70+
for {
71+
var query string
72+
query, err = rl.Readline()
73+
if err != nil {
74+
if err == io.EOF {
75+
return nil
76+
}
77+
return err
78+
}
79+
80+
// TODO: Support messages.
81+
data := map[string]string{"query": query}
82+
body, err := json.Marshal(data)
83+
if err != nil {
84+
return err
85+
}
86+
87+
url := fmt.Sprintf("http://localhost:8888/api/flows/%s:run", f.Name)
88+
resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
89+
if err != nil {
90+
return err
91+
}
92+
defer resp.Body.Close()
93+
94+
reader := builtin.NewEventStreamReader(resp.Body, 1<<16)
95+
for {
96+
event, err := reader.ReadEvent()
97+
if err != nil {
98+
if err == io.EOF {
99+
// Reach the end of the response payload.
100+
continue Next
101+
}
102+
return err
103+
}
104+
// Show the event data.
105+
if len(event.Data) > 0 {
106+
// TODO: Support other output format?
107+
var eventData struct {
108+
Content string `json:"content"`
109+
}
110+
if err := json.Unmarshal(event.Data, &eventData); err != nil {
111+
return err
112+
}
113+
fmt.Printf(eventData.Content)
114+
}
115+
}
116+
}
117+
20118
return nil
21119
}
22120

121+
func (cmd *RunCmd) readFlow(mimetype string, filename string, f *flow) error {
122+
var data []byte
123+
if strings.HasPrefix(filename, "http://") || strings.HasPrefix(filename, "https://") {
124+
resp, err := http.Get(filename)
125+
if err != nil {
126+
return err
127+
}
128+
defer resp.Body.Close()
129+
content, err := io.ReadAll(resp.Body)
130+
if err != nil {
131+
return err
132+
}
133+
data = content
134+
} else {
135+
content, err := os.ReadFile(filename)
136+
if err != nil {
137+
return err
138+
}
139+
data = content
140+
}
141+
142+
switch mimetype {
143+
case "application/yaml":
144+
return yaml.Unmarshal(data, f)
145+
case "application/json":
146+
return json.Unmarshal(data, f)
147+
}
148+
return nil
149+
}
150+
151+
func (cmd *RunCmd) addFlow(f flow) error {
152+
data, err := json.Marshal(f)
153+
if err != nil {
154+
return err
155+
}
156+
157+
req, err := http.NewRequest("PUT", "http://localhost:8888/api/flows/"+f.Name, bytes.NewBuffer(data))
158+
if err != nil {
159+
return err
160+
}
161+
req.Header.Set("Content-Type", "application/json")
162+
163+
resp, err := http.DefaultClient.Do(req)
164+
if err != nil {
165+
return err
166+
}
167+
defer resp.Body.Close()
168+
169+
return nil
170+
}
171+
172+
type flow struct {
173+
orchestrator.TaskHeader
174+
Input map[string]any `json:"input"`
175+
}
176+
23177
var cli struct {
24178
Serve ServeCmd `cmd:"" help:"Start LLMFlow."`
25179
Run RunCmd `cmd:"" help:"Run a flow."`

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/go-aie/llmflow
33
go 1.20
44

55
require (
6-
github.com/RussellLuo/orchestrator v0.0.0-20240406064758-314114a91fb8
6+
github.com/RussellLuo/orchestrator v0.0.0-20240406114838-81cdc4801382
77
github.com/go-aie/xslices v0.0.0-20230221025134-e24f453f38b6
88
github.com/go-openapi/jsonpointer v0.20.0
99
github.com/milvus-io/milvus-sdk-go/v2 v2.3.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ github.com/PaesslerAG/jsonpath v0.1.1 h1:c1/AToHQMVsduPAa4Vh6xp2U0evy4t8SWp8imEs
1212
github.com/PaesslerAG/jsonpath v0.1.1/go.mod h1:lVboNxFGal/VwW6d9JzIy56bUsYAP6tH/x80vjnCseY=
1313
github.com/RussellLuo/mapstructure v1.3.4-0.20230915005935-0ccb59b15cf2 h1:iwHtW2gVIDynzVcv+6NqhnkvhqhufmEOpbgSsxSdN68=
1414
github.com/RussellLuo/mapstructure v1.3.4-0.20230915005935-0ccb59b15cf2/go.mod h1:Wdvd26olfMFybEGBobXgoTlLLXbWmdp25yEfL3KnrMc=
15-
github.com/RussellLuo/orchestrator v0.0.0-20240406064758-314114a91fb8 h1:b55oPGHu68SFqgIHX1bXwyhEQMY/GAAUrWfVzXTE9G0=
16-
github.com/RussellLuo/orchestrator v0.0.0-20240406064758-314114a91fb8/go.mod h1:SRSHtEoCoCmDKWE6eO6nv1UJnXDmfSUytU7/diDbJcY=
15+
github.com/RussellLuo/orchestrator v0.0.0-20240406114838-81cdc4801382 h1:Ek6C+oVzWTlmMpTKftWm+Tf9mhtUH0srTN2wqUAVRQk=
16+
github.com/RussellLuo/orchestrator v0.0.0-20240406114838-81cdc4801382/go.mod h1:SRSHtEoCoCmDKWE6eO6nv1UJnXDmfSUytU7/diDbJcY=
1717
github.com/RussellLuo/structool v0.0.0-20230915010420-41b17257d885 h1:zJHKc/aA6OtUwJEMKpKYyOzZvl7DBBT72sCzOUk7H6M=
1818
github.com/RussellLuo/structool v0.0.0-20230915010420-41b17257d885/go.mod h1:WcYzut1H8YR15z6U3TUijLb+QAhTO2esip1854Aiyes=
1919
github.com/RussellLuo/structs v1.2.0 h1:rLR+opKsDCfDUwHCYG2mvFi1xUvYcWSt9kwns4e8OkU=

0 commit comments

Comments
 (0)