Skip to content

Commit

Permalink
Merge pull request #99 from waggle-sensor/profile
Browse files Browse the repository at this point in the history
Added plugin-controller sidecar container to plugins
  • Loading branch information
gemblerz authored Jun 30, 2023
2 parents 3f5426d + 7ba66bd commit 76054ca
Show file tree
Hide file tree
Showing 17 changed files with 612 additions and 309 deletions.
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ RUN apt-get update \

ARG TARGETARCH
WORKDIR /tmp
RUN wget https://golang.org/dl/go1.19.1.linux-${TARGETARCH}.tar.gz \
&& rm -rf /usr/local/go && tar -C /usr/local -xzf go1.19.1.linux-${TARGETARCH}.tar.gz \
RUN wget https://golang.org/dl/go1.20.3.linux-${TARGETARCH}.tar.gz \
&& rm -rf /usr/local/go && tar -C /usr/local -xzf go1.20.3.linux-${TARGETARCH}.tar.gz \
&& echo "PATH=\$PATH:/usr/local/go/bin" | tee -a $HOME/.bashrc \
&& rm go1.19.1.linux-${TARGETARCH}.tar.gz
&& rm go1.20.3.linux-${TARGETARCH}.tar.gz

FROM base as builder
WORKDIR $GOPATH/src/github.com/waggle-sensor/edge-scheduler
Expand Down
28 changes: 14 additions & 14 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,28 @@ VERSION?=0.0.0
cli-all-arch: cli-linux-amd64 cli-linux-arm64 cli-darwin-amd64 cli-windows-amd64

cli-linux-arm64:
GOOS=linux GOARCH=arm64 go build -o ./out/runplugin-linux-arm64 ./cmd/runplugin
GOOS=linux GOARCH=arm64 go build -o ./out/pluginctl-linux-arm64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
GOOS=linux GOARCH=arm64 go build -o ./out/sesctl-linux-arm64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./out/runplugin-linux-arm64 ./cmd/runplugin
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./out/pluginctl-linux-arm64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./out/sesctl-linux-arm64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl

cli-linux-amd64:
GOOS=linux GOARCH=amd64 go build -o ./out/runplugin-linux-amd64 ./cmd/runplugin
GOOS=linux GOARCH=amd64 go build -o ./out/pluginctl-linux-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
GOOS=linux GOARCH=amd64 go build -o ./out/sesctl-linux-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./out/runplugin-linux-amd64 ./cmd/runplugin
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./out/pluginctl-linux-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./out/sesctl-linux-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl

cli-darwin-amd64:
GOOS=darwin GOARCH=amd64 go build -o ./out/runplugin-darwin-amd64 ./cmd/runplugin
GOOS=darwin GOARCH=amd64 go build -o ./out/pluginctl-darwin-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
GOOS=darwin GOARCH=amd64 go build -o ./out/sesctl-darwin-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./out/runplugin-darwin-amd64 ./cmd/runplugin
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./out/pluginctl-darwin-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./out/sesctl-darwin-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl

cli-windows-amd64:
GOOS=windows GOARCH=amd64 go build -o ./out/runplugin-windows-amd64 ./cmd/runplugin
GOOS=windows GOARCH=amd64 go build -o ./out/pluginctl-windows-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
GOOS=windows GOARCH=amd64 go build -o ./out/sesctl-windows-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o ./out/runplugin-windows-amd64 ./cmd/runplugin
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o ./out/pluginctl-windows-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o ./out/sesctl-windows-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl

cli:
go build -o ./out/pluginctl -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
go build -o ./out/sesctl -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl
CGO_ENABLED=0 go build -o ./out/pluginctl -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl
CGO_ENABLED=0 go build -o ./out/sesctl -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl

scheduler-all-arch: scheduler-amd64 scheduler-arm64

Expand Down
3 changes: 2 additions & 1 deletion cmd/pluginctl/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ func init() {
flags.StringSliceVarP(&deployment.EnvVarString, "env", "e", []string{}, "Set environment variables")
flags.StringVarP(&deployment.EnvFromFile, "env-from", "", "", "Set environment variables from file")
flags.BoolVar(&deployment.DevelopMode, "develop", false, "Enable the following development time features: access to wan network")
flags.StringVar(&deployment.Type, "type", "job", "Type of the plugin. It is one of ['job', 'deployment', 'daemonset]. Default is 'job'.")
flags.StringVar(&deployment.Type, "type", "job", "Type of the plugin. It is one of ['pod', 'job', 'deployment', 'daemonset]. Default is 'pod'.")
flags.StringVar(&deployment.ResourceString, "resource", "", "Specify resource requirement for running the plugin")
flags.BoolVar(&deployment.EnablePluginController, "enable-plugin-controller", false, "Enable plugin controller supporting the plugin")
rootCmd.AddCommand(cmdDeploy)
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/runplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func runPlugin(resourceManager *nodescheduler.ResourceManager, plugin *datatype.

log.Printf("plugin name is %s", plugin.Name)

deployment, err := resourceManager.CreateDeployment(plugin)
deployment, err := resourceManager.CreateDeployment(&datatype.PluginRuntime{
Plugin: *plugin,
})
if err != nil {
return fmt.Errorf("resourceManager.CreateDeployment: %s", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/waggle-sensor/edge-scheduler

go 1.19
go 1.20

require (
github.com/boltdb/bolt v1.3.1
Expand Down
50 changes: 33 additions & 17 deletions pkg/datatype/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@ type EventBuilder struct {
type Event struct {
Type EventType
Timestamp int64
Meta map[string]string
Meta map[string]interface{}
}

func NewEventBuilder(eventType EventType) *EventBuilder {
return &EventBuilder{
e: Event{
Type: eventType,
Timestamp: time.Now().UnixNano(),
Meta: map[string]string{},
Meta: map[string]interface{}{},
},
}
}

func (eb *EventBuilder) AddValue(v interface{}) *EventBuilder {
eb.e.Meta["value"] = v
return eb
}

func (eb *EventBuilder) AddReason(reason string) *EventBuilder {
eb.e.Meta["reason"] = reason
return eb
Expand All @@ -41,7 +46,7 @@ func (eb *EventBuilder) AddJob(j *Job) *EventBuilder {
}

func (e *Event) GetJobID() string {
return e.get("job_id")
return e.get("job_id").(string)
}

func (eb *EventBuilder) AddGoal(goal *ScienceGoal) *EventBuilder {
Expand All @@ -50,7 +55,7 @@ func (eb *EventBuilder) AddGoal(goal *ScienceGoal) *EventBuilder {
return eb
}

func (eb *EventBuilder) AddEntry(k string, v string) *EventBuilder {
func (eb *EventBuilder) AddEntry(k string, v interface{}) *EventBuilder {
eb.e.Meta[k] = v
return eb
}
Expand Down Expand Up @@ -87,14 +92,17 @@ func (eb *EventBuilder) AddPodMeta(pod *apiv1.Pod) *EventBuilder {
eb.e.Meta["k3s_pod_name"] = pod.Name
eb.e.Meta["k3s_pod_status"] = string(pod.Status.Phase)
eb.e.Meta["k3s_pod_node_name"] = pod.Spec.NodeName
if v, found := pod.Labels["sagecontinuum.org/plugin-instance"]; found {
eb.e.Meta["k3s_pod_instance"] = v
}
return eb
}

func (eb *EventBuilder) Build() Event {
return eb.e
}

func (e *Event) get(name string) string {
func (e *Event) get(name string) interface{} {
if value, ok := e.Meta[name]; ok {
return value
} else {
Expand All @@ -103,22 +111,22 @@ func (e *Event) get(name string) string {
}

func (e *Event) GetGoalName() string {
return e.get("goal_name")
return e.get("goal_name").(string)
}

func (e *Event) GetGoalID() string {
return e.get("goal_id")
return e.get("goal_id").(string)
}

func (e *Event) GetPluginName() string {
return e.get("plugin_name")
return e.get("plugin_name").(string)
}

func (e *Event) GetReason() string {
return e.get("reason")
return e.get("reason").(string)
}

func (e *Event) GetEntry(k string) string {
func (e *Event) GetEntry(k string) interface{} {
return e.get(k)
}

Expand All @@ -129,7 +137,7 @@ func (e *Event) ToString() string {
func NewEventBuilderFromWaggleMessage(m *WaggleMessage) (*EventBuilder, error) {
builder := NewEventBuilder(EventType(m.Name))
builder.e.Timestamp = m.Timestamp
var body map[string]string
var body map[string]interface{}
err := json.Unmarshal([]byte(m.Value.(string)), &body)
if err != nil {
return nil, err
Expand All @@ -148,20 +156,24 @@ func NewEventBuilderFromWaggleMessage(m *WaggleMessage) (*EventBuilder, error) {
func (e *Event) ToWaggleMessage() *WaggleMessage {
// TODO: beehive-influxdb does not handle bytes so body is always string.
// This should be lifted once it accepts bytes.
encodedBody, err := e.encodeMetaToJson()
if err != nil {
logger.Debug.Printf("Failed to convert to Waggle message: %q", err.Error())
return nil
body := e.get("value")
if body == "" {
encodedBody, err := e.EncodeMetaToJson()
if err != nil {
logger.Debug.Printf("Failed to convert to Waggle message: %q", err.Error())
return nil
}
body = string(encodedBody)
}
return NewMessage(
string(e.Type),
string(encodedBody),
body,
e.Timestamp,
map[string]string{},
)
}

func (e *Event) encodeMetaToJson() ([]byte, error) {
func (e *Event) EncodeMetaToJson() ([]byte, error) {
return json.Marshal(e.Meta)
}

Expand All @@ -186,6 +198,10 @@ const (
EventPluginLastExecution EventType = "sys.scheduler.plugin.lastexecution"
EventPluginStatusFailed EventType = "sys.scheduler.status.plugin.failed"
EventFailure EventType = "sys.scheduler.failure"

EventPluginPerfCPU EventType = "sys.plugin.perf.cpu"
EventPluginPerfMem EventType = "sys.plugin.perf.mem"
EventPluginPerfGPU EventType = "sys.plugin.perf.gpu"
)

// type EventErrorCode string
Expand Down
16 changes: 12 additions & 4 deletions pkg/datatype/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,27 @@ import (
func TestEventWaggleConversion(t *testing.T) {
tests := map[string]struct {
Type string
Payload map[string]string
Payload map[string]interface{}
}{
"simple": {
Type: string(EventPluginStatusLaunched),
Payload: map[string]string{
"test": "great",
Payload: map[string]interface{}{
"test": "great",
"float": 3.14,
},
},
}
for _, test := range tests {
e := NewEventBuilder(EventType(test.Type))
for k, v := range test.Payload {
e.AddEntry(k, v)
switch v.(type) {
case string:
e.AddEntry(k, v.(string))
case int:
t.Errorf("integer type is not supported. use float instead.")
case float64:
e.AddEntry(k, v.(float64))
}
}
msg := e.Build()
waggleMsg := msg.ToWaggleMessage()
Expand Down
23 changes: 22 additions & 1 deletion pkg/datatype/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
type Plugin struct {
Name string `json:"name" yaml:"name"`
PluginSpec *PluginSpec `json:"plugin_spec" yaml:"pluginSpec,omitempty"`
DataShims []*DataShim `json:"datathims,omitempty" yaml:"datashims,omitempty"`
GoalID string `json:"goal_id,omitempty" yaml:"goalID,omitempty"`
}

Expand Down Expand Up @@ -96,6 +95,28 @@ type PluginCredential struct {
Password string `yaml:"password,omitempty"`
}

type PluginRuntime struct {
Plugin Plugin
Duration int
EnablePluginController bool
Resource Resource
}

func NewPluginRuntimeWithScienceRule(p Plugin, runtimeArgs ScienceRule) *PluginRuntime {
pr := &PluginRuntime{
Plugin: p,
}
// TODO: any runtime parameters of the plugin should be parsed and added to the runtime
// if v, found := runtimeArgs.ActionParameters["duration"]; found {
// pr.Duration
// }
return pr
}

func (pr *PluginRuntime) SetPluginController(flag bool) {
pr.EnablePluginController = flag
}

// type Plugin struct {
// Name string `yaml:"name"`
// Image string `yaml:"image"`
Expand Down
26 changes: 13 additions & 13 deletions pkg/datatype/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "sync"

type Queue struct {
mu sync.Mutex
entities []*Plugin
entities []*PluginRuntime
index int
}

Expand All @@ -18,7 +18,7 @@ func (q *Queue) More() bool {
return q.index < len(q.entities)
}

func (q *Queue) Next() *Plugin {
func (q *Queue) Next() *PluginRuntime {
q.mu.Lock()
if q.index > len(q.entities) {
return nil
Expand All @@ -32,8 +32,8 @@ func (q *Queue) Next() *Plugin {
func (q *Queue) GetPluginNames() (list []string) {
q.ResetIter()
for q.More() {
plugin := q.Next()
list = append(list, plugin.Name)
pr := q.Next()
list = append(list, pr.Plugin.Name)
}
return
}
Expand All @@ -42,8 +42,8 @@ func (q *Queue) GetGoalIDs() (list map[string]bool) {
list = make(map[string]bool)
q.ResetIter()
for q.More() {
plugin := q.Next()
list[plugin.GoalID] = true
pr := q.Next()
list[pr.Plugin.GoalID] = true
}
return
}
Expand All @@ -52,20 +52,20 @@ func (q *Queue) Length() int {
return len(q.entities)
}

func (q *Queue) Push(p *Plugin) {
func (q *Queue) Push(p *PluginRuntime) {
q.mu.Lock()
q.entities = append(q.entities, p)
q.index += 1
q.mu.Unlock()
}

func (q *Queue) Pop(p *Plugin) *Plugin {
func (q *Queue) Pop(pr *PluginRuntime) *PluginRuntime {
q.mu.Lock()
var found *Plugin
for i, _p := range q.entities {
if _p.Name == p.Name {
var found *PluginRuntime
for i, _pr := range q.entities {
if _pr.Plugin.Name == pr.Plugin.Name {
q.entities = append(q.entities[:i], q.entities[i+1:]...)
found = _p
found = _pr
q.index -= 1
break
}
Expand All @@ -74,7 +74,7 @@ func (q *Queue) Pop(p *Plugin) *Plugin {
return found
}

func (q *Queue) PopFirst() *Plugin {
func (q *Queue) PopFirst() *PluginRuntime {
if q.Length() > 0 {
return q.Pop(q.entities[0])
} else {
Expand Down
Loading

0 comments on commit 76054ca

Please sign in to comment.