-
Notifications
You must be signed in to change notification settings - Fork 127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added plugins feature that allows separate binaries as processes #58
base: parser
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,8 @@ | ||
// +build ignore | ||
|
||
package goflow | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"io/ioutil" | ||
"reflect" | ||
"strings" | ||
|
@@ -20,6 +19,7 @@ type graphDescription struct { | |
Sync bool `json:",omitempty"` | ||
PoolSize int64 `json:",omitempty"` | ||
} `json:",omitempty"` | ||
Parameters map[string]interface{} `json:",omitempty"` | ||
} | ||
Connections []struct { | ||
Data interface{} `json:",omitempty"` | ||
|
@@ -43,98 +43,95 @@ type graphDescription struct { | |
|
||
// ParseJSON converts a JSON network definition string into | ||
// a flow.Graph object that can be run or used in other networks | ||
func ParseJSON(js []byte) *Graph { | ||
func ParseJSON(js []byte, factory *Factory) *Graph { | ||
// Parse JSON into Go struct | ||
var descr graphDescription | ||
err := json.Unmarshal(js, &descr) | ||
if err != nil { | ||
fmt.Println("Error parsing JSON", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please clean up all debug prints before merging, and make use of |
||
return nil | ||
} | ||
// fmt.Printf("%+v\n", descr) | ||
|
||
constructor := func() interface{} { | ||
// Create a new Graph | ||
net := new(Graph) | ||
net.InitGraphState() | ||
|
||
// Add processes to the network | ||
for procName, procValue := range descr.Processes { | ||
net.AddNew(procValue.Component, procName) | ||
// Process mode detection | ||
if procValue.Metadata.PoolSize > 0 { | ||
proc := net.Get(procName).(*Component) | ||
proc.Mode = ComponentModePool | ||
proc.PoolSize = uint8(procValue.Metadata.PoolSize) | ||
} else if procValue.Metadata.Sync { | ||
proc := net.Get(procName).(*Component) | ||
proc.Mode = ComponentModeSync | ||
} | ||
} | ||
// Create a new Graph | ||
//net := new(Graph) | ||
//net.InitGraphState() | ||
net := NewGraph() | ||
|
||
// Add connections | ||
for _, conn := range descr.Connections { | ||
// Check if it is an IIP or actual connection | ||
if conn.Data == nil { | ||
// Add a connection | ||
net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Metadata.Buffer) | ||
} else { | ||
// Add an IIP | ||
net.AddIIP(conn.Data, conn.Tgt.Process, conn.Tgt.Port) | ||
} | ||
// Add processes to the network | ||
for procName, procValue := range descr.Processes { | ||
net.AddNew(procName, procValue.Component, factory) | ||
proc, ok := net.Get(procName).(PlugIn) | ||
if ok { | ||
proc.SetParams(procValue.Parameters) | ||
} | ||
// Process mode detection | ||
// if procValue.Metadata.PoolSize > 0 { | ||
// proc := net.Get(procName).(*Component) | ||
// proc.Mode = ComponentModePool | ||
// proc.PoolSize = uint8(procValue.Metadata.PoolSize) | ||
// } else if procValue.Metadata.Sync { | ||
// proc := net.Get(procName).(*Component) | ||
// proc.Mode = ComponentModeSync | ||
// } | ||
} | ||
|
||
// Add port exports | ||
for _, export := range descr.Exports { | ||
// Split private into proc.port | ||
procName := export.Private[:strings.Index(export.Private, ".")] | ||
procPort := export.Private[strings.Index(export.Private, ".")+1:] | ||
// Try to detect port direction using reflection | ||
procType := reflect.TypeOf(net.Get(procName)).Elem() | ||
field, fieldFound := procType.FieldByName(procPort) | ||
if !fieldFound { | ||
panic("Private port '" + export.Private + "' not found") | ||
} | ||
if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.RecvDir) != 0 { | ||
// It's an inport | ||
net.MapInPort(export.Public, procName, procPort) | ||
} else if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.SendDir) != 0 { | ||
// It's an outport | ||
net.MapOutPort(export.Public, procName, procPort) | ||
} else { | ||
// It's not a proper port | ||
panic("Private port '" + export.Private + "' is not a valid channel") | ||
} | ||
// TODO add support for subgraphs | ||
// Add connections | ||
for _, conn := range descr.Connections { | ||
// Check if it is an IIP or actual connection | ||
if conn.Data == nil { | ||
// Add a connection | ||
net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Metadata.Buffer) | ||
} else { | ||
// Add an IIP | ||
net.AddIIP(conn.Tgt.Process, conn.Tgt.Port, conn.Data) | ||
} | ||
|
||
return net | ||
} | ||
|
||
// Register a component to be reused | ||
if descr.Properties.Name != "" { | ||
Register(descr.Properties.Name, constructor) | ||
// Add port exports | ||
for _, export := range descr.Exports { | ||
// Split private into proc.port | ||
procName := export.Private[:strings.Index(export.Private, ".")] | ||
procPort := export.Private[strings.Index(export.Private, ".")+1:] | ||
// Try to detect port direction using reflection | ||
procType := reflect.TypeOf(net.Get(procName)).Elem() | ||
field, fieldFound := procType.FieldByName(procPort) | ||
if !fieldFound { | ||
panic("Private port '" + export.Private + "' not found") | ||
} | ||
if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.RecvDir) != 0 { | ||
// It's an inport | ||
net.MapInPort(export.Public, procName, procPort) | ||
} else if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.SendDir) != 0 { | ||
// It's an outport | ||
net.MapOutPort(export.Public, procName, procPort) | ||
} else { | ||
// It's not a proper port | ||
panic("Private port '" + export.Private + "' is not a valid channel") | ||
} | ||
// TODO add support for subgraphs | ||
} | ||
|
||
return constructor().(*Graph) | ||
return net | ||
} | ||
|
||
// LoadJSON loads a JSON graph definition file into | ||
// a flow.Graph object that can be run or used in other networks | ||
func LoadJSON(filename string) *Graph { | ||
func LoadJSON(filename string, factory *Factory) *Graph { | ||
js, err := ioutil.ReadFile(filename) | ||
if err != nil { | ||
return nil | ||
} | ||
return ParseJSON(js) | ||
return ParseJSON(js, factory) | ||
} | ||
|
||
// RegisterJSON registers an external JSON graph definition as a component | ||
// that can be instantiated at run-time using component Factory. | ||
// It returns true on success or false if component name is already taken. | ||
func RegisterJSON(componentName, filePath string) bool { | ||
var constructor ComponentConstructor | ||
constructor = func() interface{} { | ||
return LoadJSON(filePath) | ||
} | ||
return Register(componentName, constructor) | ||
} | ||
// func RegisterJSON(componentName, filePath string) bool { | ||
// var constructor ComponentConstructor | ||
// constructor = func() interface{} { | ||
// return LoadJSON(filePath) | ||
// } | ||
// return Register(componentName, constructor) | ||
// } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#!/bin/bash | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this file should live in |
||
rm -f $GOBIN/*.so | ||
go install -buildmode=plugin test_plugins/Adder_goplug.go | ||
go install -buildmode=plugin test_plugins/Plug1_goplug.go | ||
go install -buildmode=plugin test_plugins/NGen_goplug.go |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package goflow | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"path" | ||
"plugin" | ||
"strings" | ||
) | ||
|
||
const plugInSuffix = `_goplug.so` | ||
|
||
//PlugIn something | ||
type PlugIn interface { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are Also, I have a strong feeling that |
||
Component | ||
SetParams(params map[string]interface{}) error | ||
GetParams() map[string]interface{} | ||
GetParam(param string) interface{} | ||
} | ||
|
||
//PlugInS something | ||
type PlugInS struct { | ||
params map[string]interface{} | ||
persist bool | ||
} | ||
|
||
func processForever(c Component) { | ||
for { | ||
c.Process() | ||
} | ||
} | ||
|
||
func process(c Component) { | ||
c.Process() | ||
} | ||
|
||
func (s *PlugInS) Process() { | ||
if s.persist { | ||
processForever(s) | ||
} else { | ||
process(s) | ||
} | ||
} | ||
|
||
func (s *PlugInS) SetParams(params map[string]interface{}) error { | ||
s.params = params | ||
value, ok := params["persist"].(bool) | ||
if ok { | ||
s.persist = value | ||
} else { | ||
s.persist = false | ||
} | ||
return nil | ||
} | ||
|
||
func (s *PlugInS) GetParams() map[string]interface{} { | ||
return s.params | ||
} | ||
|
||
func (s *PlugInS) GetParam(param string) interface{} { | ||
return s.params[param] | ||
} | ||
|
||
// LoadComponents goes through all paths, opens all plugins in those paths | ||
// and loads them into factory | ||
// Plugins are denoted by *_goplug.so, The filename must begin with a capitolized letter | ||
func LoadComponents(paths []string, factory *Factory) ([]string, error) { | ||
var loaded []string | ||
|
||
for _, apath := range paths { | ||
//fmt.Println("Loading plugins at", apath) | ||
files, err := ioutil.ReadDir(apath) | ||
if err != nil { | ||
fmt.Printf("Path %s not found, error=%s.", apath, err.Error()) | ||
continue | ||
} | ||
for _, file := range files { | ||
if strings.HasSuffix(file.Name(), plugInSuffix) { | ||
plugpath := path.Join(apath, file.Name()) | ||
plug, err := plugin.Open(plugpath) | ||
if err != nil { | ||
fmt.Printf("Can't open plugin %s, error=%s.", plugpath, err.Error()) | ||
continue | ||
} | ||
//get name from name - _goplug.so and register with contructor | ||
name := strings.TrimSuffix(file.Name(), plugInSuffix) | ||
symbol, err := plug.Lookup(name) | ||
if err != nil { | ||
fmt.Printf("Can't find symbol %s in plugin %s, error=%s.", name, plugpath, err.Error()) | ||
continue | ||
} | ||
constructor := symbol.(func() (interface{}, error)) | ||
anerr := factory.Register(name, constructor) | ||
if anerr != nil { | ||
fmt.Printf("Failed to register plugin %s, error=%s.", plugpath, err.Error()) | ||
continue | ||
} | ||
loaded = append(loaded, name) | ||
} | ||
} | ||
} | ||
return loaded, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name is ambiguous. I'd suggest calling it
GetProcess()
instead.