Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added plugins feature that allows separate binaries as processes #58

Open
wants to merge 3 commits into
base: parser
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (f *Factory) Create(componentName string) (interface{}, error) {
return nil, fmt.Errorf("Factory error: component '%s' does not exist", componentName)
}

//Size returns number of objects in factory
func (f *Factory) Size() int {
return len(f.registry)
}

// // UpdateComponentInfo extracts run-time information about a
// // component and its ports. It is called when an FBP protocol client
// // requests component information.
Expand Down
5 changes: 5 additions & 0 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (n *Graph) Add(name string, c interface{}) error {
return nil
}

// Get a component by name
func (n *Graph) Get(name string) interface{} {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is ambiguous. I'd suggest calling it GetProcess() instead.

return n.procs[name]
}

// AddGraph adds a new blank graph instance to a network. That instance can
// be modified then at run-time.
func (n *Graph) AddGraph(name string) error {
Expand Down
133 changes: 65 additions & 68 deletions loader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// +build ignore

package goflow

import (
"encoding/json"
"fmt"
"io/ioutil"
"reflect"
"strings"
Expand All @@ -20,6 +19,7 @@ type graphDescription struct {
Sync bool `json:",omitempty"`
PoolSize int64 `json:",omitempty"`
} `json:",omitempty"`
Parameters map[string]interface{} `json:",omitempty"`
}
Connections []struct {
Data interface{} `json:",omitempty"`
Expand All @@ -43,98 +43,95 @@ type graphDescription struct {

// ParseJSON converts a JSON network definition string into
// a flow.Graph object that can be run or used in other networks
func ParseJSON(js []byte) *Graph {
func ParseJSON(js []byte, factory *Factory) *Graph {
// Parse JSON into Go struct
var descr graphDescription
err := json.Unmarshal(js, &descr)
if err != nil {
fmt.Println("Error parsing JSON", err)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clean up all debug prints before merging, and make use of error to return errors to caller.

return nil
}
// fmt.Printf("%+v\n", descr)

constructor := func() interface{} {
// Create a new Graph
net := new(Graph)
net.InitGraphState()

// Add processes to the network
for procName, procValue := range descr.Processes {
net.AddNew(procValue.Component, procName)
// Process mode detection
if procValue.Metadata.PoolSize > 0 {
proc := net.Get(procName).(*Component)
proc.Mode = ComponentModePool
proc.PoolSize = uint8(procValue.Metadata.PoolSize)
} else if procValue.Metadata.Sync {
proc := net.Get(procName).(*Component)
proc.Mode = ComponentModeSync
}
}
// Create a new Graph
//net := new(Graph)
//net.InitGraphState()
net := NewGraph()

// Add connections
for _, conn := range descr.Connections {
// Check if it is an IIP or actual connection
if conn.Data == nil {
// Add a connection
net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Metadata.Buffer)
} else {
// Add an IIP
net.AddIIP(conn.Data, conn.Tgt.Process, conn.Tgt.Port)
}
// Add processes to the network
for procName, procValue := range descr.Processes {
net.AddNew(procName, procValue.Component, factory)
proc, ok := net.Get(procName).(PlugIn)
if ok {
proc.SetParams(procValue.Parameters)
}
// Process mode detection
// if procValue.Metadata.PoolSize > 0 {
// proc := net.Get(procName).(*Component)
// proc.Mode = ComponentModePool
// proc.PoolSize = uint8(procValue.Metadata.PoolSize)
// } else if procValue.Metadata.Sync {
// proc := net.Get(procName).(*Component)
// proc.Mode = ComponentModeSync
// }
}

// Add port exports
for _, export := range descr.Exports {
// Split private into proc.port
procName := export.Private[:strings.Index(export.Private, ".")]
procPort := export.Private[strings.Index(export.Private, ".")+1:]
// Try to detect port direction using reflection
procType := reflect.TypeOf(net.Get(procName)).Elem()
field, fieldFound := procType.FieldByName(procPort)
if !fieldFound {
panic("Private port '" + export.Private + "' not found")
}
if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.RecvDir) != 0 {
// It's an inport
net.MapInPort(export.Public, procName, procPort)
} else if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.SendDir) != 0 {
// It's an outport
net.MapOutPort(export.Public, procName, procPort)
} else {
// It's not a proper port
panic("Private port '" + export.Private + "' is not a valid channel")
}
// TODO add support for subgraphs
// Add connections
for _, conn := range descr.Connections {
// Check if it is an IIP or actual connection
if conn.Data == nil {
// Add a connection
net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Metadata.Buffer)
} else {
// Add an IIP
net.AddIIP(conn.Tgt.Process, conn.Tgt.Port, conn.Data)
}

return net
}

// Register a component to be reused
if descr.Properties.Name != "" {
Register(descr.Properties.Name, constructor)
// Add port exports
for _, export := range descr.Exports {
// Split private into proc.port
procName := export.Private[:strings.Index(export.Private, ".")]
procPort := export.Private[strings.Index(export.Private, ".")+1:]
// Try to detect port direction using reflection
procType := reflect.TypeOf(net.Get(procName)).Elem()
field, fieldFound := procType.FieldByName(procPort)
if !fieldFound {
panic("Private port '" + export.Private + "' not found")
}
if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.RecvDir) != 0 {
// It's an inport
net.MapInPort(export.Public, procName, procPort)
} else if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.SendDir) != 0 {
// It's an outport
net.MapOutPort(export.Public, procName, procPort)
} else {
// It's not a proper port
panic("Private port '" + export.Private + "' is not a valid channel")
}
// TODO add support for subgraphs
}

return constructor().(*Graph)
return net
}

// LoadJSON loads a JSON graph definition file into
// a flow.Graph object that can be run or used in other networks
func LoadJSON(filename string) *Graph {
func LoadJSON(filename string, factory *Factory) *Graph {
js, err := ioutil.ReadFile(filename)
if err != nil {
return nil
}
return ParseJSON(js)
return ParseJSON(js, factory)
}

// RegisterJSON registers an external JSON graph definition as a component
// that can be instantiated at run-time using component Factory.
// It returns true on success or false if component name is already taken.
func RegisterJSON(componentName, filePath string) bool {
var constructor ComponentConstructor
constructor = func() interface{} {
return LoadJSON(filePath)
}
return Register(componentName, constructor)
}
// func RegisterJSON(componentName, filePath string) bool {
// var constructor ComponentConstructor
// constructor = func() interface{} {
// return LoadJSON(filePath)
// }
// return Register(componentName, constructor)
// }
5 changes: 5 additions & 0 deletions make_plugs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this file should live in test_plugins/.

rm -f $GOBIN/*.so
go install -buildmode=plugin test_plugins/Adder_goplug.go
go install -buildmode=plugin test_plugins/Plug1_goplug.go
go install -buildmode=plugin test_plugins/NGen_goplug.go
103 changes: 103 additions & 0 deletions plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package goflow

import (
"fmt"
"io/ioutil"
"path"
"plugin"
"strings"
)

const plugInSuffix = `_goplug.so`

//PlugIn something
type PlugIn interface {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are params mandatory for all plugins? Can't we just load any components as plugins? Forcing all plugin components adhere to this interface is quite restrictive.

Also, I have a strong feeling that Parameters is reinventing IIPs. In FBP, IIPs (initial IPs) are used to send pre-defined input to components. See graph_iip_test.go for example.

Component
SetParams(params map[string]interface{}) error
GetParams() map[string]interface{}
GetParam(param string) interface{}
}

//PlugInS something
type PlugInS struct {
params map[string]interface{}
persist bool
}

func processForever(c Component) {
for {
c.Process()
}
}

func process(c Component) {
c.Process()
}

func (s *PlugInS) Process() {
if s.persist {
processForever(s)
} else {
process(s)
}
}

func (s *PlugInS) SetParams(params map[string]interface{}) error {
s.params = params
value, ok := params["persist"].(bool)
if ok {
s.persist = value
} else {
s.persist = false
}
return nil
}

func (s *PlugInS) GetParams() map[string]interface{} {
return s.params
}

func (s *PlugInS) GetParam(param string) interface{} {
return s.params[param]
}

// LoadComponents goes through all paths, opens all plugins in those paths
// and loads them into factory
// Plugins are denoted by *_goplug.so, The filename must begin with a capitolized letter
func LoadComponents(paths []string, factory *Factory) ([]string, error) {
var loaded []string

for _, apath := range paths {
//fmt.Println("Loading plugins at", apath)
files, err := ioutil.ReadDir(apath)
if err != nil {
fmt.Printf("Path %s not found, error=%s.", apath, err.Error())
continue
}
for _, file := range files {
if strings.HasSuffix(file.Name(), plugInSuffix) {
plugpath := path.Join(apath, file.Name())
plug, err := plugin.Open(plugpath)
if err != nil {
fmt.Printf("Can't open plugin %s, error=%s.", plugpath, err.Error())
continue
}
//get name from name - _goplug.so and register with contructor
name := strings.TrimSuffix(file.Name(), plugInSuffix)
symbol, err := plug.Lookup(name)
if err != nil {
fmt.Printf("Can't find symbol %s in plugin %s, error=%s.", name, plugpath, err.Error())
continue
}
constructor := symbol.(func() (interface{}, error))
anerr := factory.Register(name, constructor)
if anerr != nil {
fmt.Printf("Failed to register plugin %s, error=%s.", plugpath, err.Error())
continue
}
loaded = append(loaded, name)
}
}
}
return loaded, nil
}
Loading