From 1815f7deca8c79268b052da5b5678b7bf0780c08 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Tue, 11 Apr 2023 13:32:53 -0500 Subject: [PATCH 01/12] Stashed changes --- Dockerfile | 6 ++-- go.mod | 2 +- pkg/datatype/plugin.go | 12 +++++++- pkg/nodescheduler/resourcemanager.go | 45 ++++++++++++++++++---------- 4 files changed, 44 insertions(+), 21 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1b79261..9ceb13a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,10 +15,10 @@ RUN apt-get update \ ARG TARGETARCH WORKDIR /tmp -RUN wget https://golang.org/dl/go1.19.1.linux-${TARGETARCH}.tar.gz \ - && rm -rf /usr/local/go && tar -C /usr/local -xzf go1.19.1.linux-${TARGETARCH}.tar.gz \ +RUN wget https://golang.org/dl/go1.20.3.linux-${TARGETARCH}.tar.gz \ + && rm -rf /usr/local/go && tar -C /usr/local -xzf go1.20.3.linux-${TARGETARCH}.tar.gz \ && echo "PATH=\$PATH:/usr/local/go/bin" | tee -a $HOME/.bashrc \ - && rm go1.19.1.linux-${TARGETARCH}.tar.gz + && rm go1.20.3.linux-${TARGETARCH}.tar.gz FROM base as builder WORKDIR $GOPATH/src/github.com/waggle-sensor/edge-scheduler diff --git a/go.mod b/go.mod index 72a6d47..d9a09a1 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/waggle-sensor/edge-scheduler -go 1.19 +go 1.20 require ( github.com/boltdb/bolt v1.3.1 diff --git a/pkg/datatype/plugin.go b/pkg/datatype/plugin.go index 862eaa5..b30ea8e 100644 --- a/pkg/datatype/plugin.go +++ b/pkg/datatype/plugin.go @@ -10,7 +10,6 @@ import ( type Plugin struct { Name string `json:"name" yaml:"name"` PluginSpec *PluginSpec `json:"plugin_spec" yaml:"pluginSpec,omitempty"` - DataShims []*DataShim `json:"datathims,omitempty" yaml:"datashims,omitempty"` GoalID string `json:"goal_id,omitempty" yaml:"goalID,omitempty"` } @@ -96,6 +95,17 @@ type PluginCredential struct { Password string `yaml:"password,omitempty"` } +type PluginRuntime struct { + Plugin *Plugin + Duration int + NeedProfile bool + Resource Resource +} + +func NewPluginRuntimeWithScienceRule(p *Plugin, runtimeArgs *ScienceRule) *PluginRuntime { + return &PluginRuntime{} +} + // type Plugin struct { // Name string `yaml:"name"` // Image string `yaml:"image"` diff --git a/pkg/nodescheduler/resourcemanager.go b/pkg/nodescheduler/resourcemanager.go index 29b4ffd..47d6f87 100644 --- a/pkg/nodescheduler/resourcemanager.go +++ b/pkg/nodescheduler/resourcemanager.go @@ -921,30 +921,30 @@ func (rm *ResourceManager) ClenUp() error { return nil } -func (rm *ResourceManager) LaunchAndWatchPlugin(plugin *datatype.Plugin) { - logger.Debug.Printf("Running plugin %q...", plugin.Name) - job, err := rm.CreateJob(plugin) +func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { + logger.Debug.Printf("Running plugin %q...", pr.Plugin.Name) + job, err := rm.CreateJob(pr.Plugin) if err != nil { - logger.Error.Printf("Failed to create Kubernetes Job for %q: %q", plugin.Name, err.Error()) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(plugin).Build()) + logger.Error.Printf("Failed to create Kubernetes Job for %q: %q", pr.Plugin.Name, err.Error()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(pr.Plugin).Build()) return } err = rm.RunPlugin(job) defer rm.TerminateJob(job.Name) if err != nil { logger.Error.Printf("Failed to run %q: %q", job.Name, err.Error()) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(plugin).Build()) + 
rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(pr.Plugin).Build()) return } logger.Info.Printf("Plugin %q is scheduled", job.Name) - plugin.PluginSpec.Job = job.Name + pr.Plugin.PluginSpec.Job = job.Name // NOTE: The for loop helps to re-connect to Kubernetes watcher when the connection // gets closed while the plugin is running for { watcher, err := rm.WatchJob(job.Name, rm.Namespace, 1) if err != nil { logger.Error.Printf("Failed to watch %q. Abort the execution", job.Name) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddK3SJobMeta(job).AddPluginMeta(plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddK3SJobMeta(job).AddPluginMeta(pr.Plugin).Build()) return } chanEvent := watcher.ResultChan() @@ -955,14 +955,23 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(plugin *datatype.Plugin) { case watch.Added: job := event.Object.(*batchv1.Job) pod, _ := rm.GetPod(job.Name) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusLaunched).AddK3SJobMeta(job).AddPodMeta(pod).AddPluginMeta(plugin).Build()) + var gpuMetricHostIP string + if pr.NeedProfile { + // we need to stop the timer eventually + defer resourceCollectorTicker.Stop() + } else { + // if we don't need to profile plugin we just stop it + resourceCollectorTicker.Stop() + } + gpuMetricHostIP = pod.Status.HostIP + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusLaunched).AddK3SJobMeta(job).AddPodMeta(pod).AddPluginMeta(pr.Plugin).Build()) case watch.Modified: job := event.Object.(*batchv1.Job) if len(job.Status.Conditions) > 0 { logger.Debug.Printf("Plugin %s status %s: %s", job.Name, event.Type, job.Status.Conditions[0].Type) switch job.Status.Conditions[0].Type { case batchv1.JobComplete: - eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusComplete).AddK3SJobMeta(job).AddPluginMeta(plugin) + eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusComplete).AddK3SJobMeta(job).AddPluginMeta(pr.Plugin) if pod, err := rm.GetPod(job.Name); err != nil { logger.Debug.Printf("Failed to get pod for job %q: %s", job.Name, err.Error()) } else { @@ -974,7 +983,7 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(plugin *datatype.Plugin) { logger.Error.Printf("Plugin %q has failed.", job.Name) eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusFailed). AddK3SJobMeta(job). - AddPluginMeta(plugin) + AddPluginMeta(pr.Plugin) if pod, err := rm.GetPod(job.Name); err != nil { logger.Error.Printf("Failed to get pod for job %q: %s", job.Name, err.Error()) } else { @@ -1000,11 +1009,11 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(plugin *datatype.Plugin) { } case watch.Deleted: logger.Debug.Printf("Plugin got deleted. Returning resource and notify") - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Plugin deleted").AddK3SJobMeta(job).AddPluginMeta(plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Plugin deleted").AddK3SJobMeta(job).AddPluginMeta(pr.Plugin).Build()) return case watch.Error: logger.Debug.Printf("Error on watcher. 
Returning resource and notify") - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Error on watcher").AddK3SJobMeta(job).AddPluginMeta(plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Error on watcher").AddK3SJobMeta(job).AddPluginMeta(pr.Plugin).Build()) return default: logger.Error.Printf("Watcher of plugin %q received unknown event %q", job.Name, event.Type) @@ -1031,8 +1040,12 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(plugin *datatype.Plugin) { } -func (rm *ResourceManager) GatherResourceUse() { - // rm.Clientset.metricsv1 +func (rm *ResourceManager) PublishPluginProfileToRMQ(gpuMetricHostIP string) { + // Resource collection every 5 seconds + resourceCollectorTicker := time.NewTicker(5 * time.Second) + gpuMetric := interfacing.NewHTTPRequest(fmt.Sprintf("http://%s:9101", gpuMetricHostIP)) + gpuMetric.RequestGet("metrics") + podMetrics, err := rm.MetricsClient.MetricsV1beta1().PodMetricses(rm.Namespace).List(context.TODO(), metav1.ListOptions{}) } // RunGabageCollector cleans up completed/failed jobs that exceed @@ -1144,7 +1157,7 @@ func (rm *ResourceManager) Run() { // msg := fmt.Sprintf("Node Name: %s \n CPU usage: %s \n Memory usage: %s", nodeMetric.Name, cpuQuantity, memQuantity) // logger.Debug.Println(msg) // } - // podMetrics, err := rm.MetricsClient.MetricsV1beta1().PodMetricses(rm.Namespace).List(context.TODO(), metav1.ListOptions{}) + // podMetrics, err := rm.MetricsClient.MetricsV1beta1().PodMetricses(rm.Namespace).List(context.TODO(), metav1.ListOptions{}) // for _, podMetric := range podMetrics.Items { // podContainers := podMetric.Containers // for _, container := range podContainers { From 6e106f2b2115266a6e9833102c13c123e447e6d0 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Sat, 13 May 2023 21:55:47 -0500 Subject: [PATCH 02/12] Staged code --- cmd/pluginctl/cmd/deploy.go | 2 +- pkg/datatype/plugin.go | 13 ++- pkg/datatype/queue.go | 26 +++--- pkg/nodescheduler/nodescheduler.go | 54 ++++++++----- pkg/nodescheduler/policy/default.go | 4 +- pkg/nodescheduler/policy/gpuaware.go | 20 ++--- pkg/nodescheduler/policy/gpuaware_test.go | 52 +++++++----- pkg/nodescheduler/policy/roundrobin.go | 2 +- pkg/nodescheduler/resourcemanager.go | 98 ++++++++++++++++------- pkg/pluginctl/pluginctl.go | 7 ++ 10 files changed, 178 insertions(+), 100 deletions(-) diff --git a/cmd/pluginctl/cmd/deploy.go b/cmd/pluginctl/cmd/deploy.go index ec3efab..158094d 100644 --- a/cmd/pluginctl/cmd/deploy.go +++ b/cmd/pluginctl/cmd/deploy.go @@ -18,7 +18,7 @@ func init() { flags.StringSliceVarP(&deployment.EnvVarString, "env", "e", []string{}, "Set environment variables") flags.StringVarP(&deployment.EnvFromFile, "env-from", "", "", "Set environment variables from file") flags.BoolVar(&deployment.DevelopMode, "develop", false, "Enable the following development time features: access to wan network") - flags.StringVar(&deployment.Type, "type", "job", "Type of the plugin. It is one of ['job', 'deployment', 'daemonset]. Default is 'job'.") + flags.StringVar(&deployment.Type, "type", "pod", "Type of the plugin. It is one of ['pod', 'job', 'deployment', 'daemonset]. 
Default is 'pod'.") flags.StringVar(&deployment.ResourceString, "resource", "", "Specify resource requirement for running the plugin") rootCmd.AddCommand(cmdDeploy) } diff --git a/pkg/datatype/plugin.go b/pkg/datatype/plugin.go index b30ea8e..df976c8 100644 --- a/pkg/datatype/plugin.go +++ b/pkg/datatype/plugin.go @@ -96,14 +96,21 @@ type PluginCredential struct { } type PluginRuntime struct { - Plugin *Plugin + Plugin Plugin Duration int NeedProfile bool Resource Resource } -func NewPluginRuntimeWithScienceRule(p *Plugin, runtimeArgs *ScienceRule) *PluginRuntime { - return &PluginRuntime{} +func NewPluginRuntimeWithScienceRule(p Plugin, runtimeArgs ScienceRule) *PluginRuntime { + pr := &PluginRuntime{ + Plugin: p, + } + // TODO: any runtime parameters of the plugin should be parsed and added to the runtime + // if v, found := runtimeArgs.ActionParameters["duration"]; found { + // pr.Duration + // } + return pr } // type Plugin struct { diff --git a/pkg/datatype/queue.go b/pkg/datatype/queue.go index 843e018..59add87 100644 --- a/pkg/datatype/queue.go +++ b/pkg/datatype/queue.go @@ -4,7 +4,7 @@ import "sync" type Queue struct { mu sync.Mutex - entities []*Plugin + entities []*PluginRuntime index int } @@ -18,7 +18,7 @@ func (q *Queue) More() bool { return q.index < len(q.entities) } -func (q *Queue) Next() *Plugin { +func (q *Queue) Next() *PluginRuntime { q.mu.Lock() if q.index > len(q.entities) { return nil @@ -32,8 +32,8 @@ func (q *Queue) Next() *Plugin { func (q *Queue) GetPluginNames() (list []string) { q.ResetIter() for q.More() { - plugin := q.Next() - list = append(list, plugin.Name) + pr := q.Next() + list = append(list, pr.Plugin.Name) } return } @@ -42,8 +42,8 @@ func (q *Queue) GetGoalIDs() (list map[string]bool) { list = make(map[string]bool) q.ResetIter() for q.More() { - plugin := q.Next() - list[plugin.GoalID] = true + pr := q.Next() + list[pr.Plugin.GoalID] = true } return } @@ -52,20 +52,20 @@ func (q *Queue) Length() int { return len(q.entities) } -func (q *Queue) Push(p *Plugin) { +func (q *Queue) Push(p *PluginRuntime) { q.mu.Lock() q.entities = append(q.entities, p) q.index += 1 q.mu.Unlock() } -func (q *Queue) Pop(p *Plugin) *Plugin { +func (q *Queue) Pop(pr *PluginRuntime) *PluginRuntime { q.mu.Lock() - var found *Plugin - for i, _p := range q.entities { - if _p.Name == p.Name { + var found *PluginRuntime + for i, _pr := range q.entities { + if _pr.Plugin.Name == pr.Plugin.Name { q.entities = append(q.entities[:i], q.entities[i+1:]...) 
- found = _p + found = _pr q.index -= 1 break } @@ -74,7 +74,7 @@ func (q *Queue) Pop(p *Plugin) *Plugin { return found } -func (q *Queue) PopFirst() *Plugin { +func (q *Queue) PopFirst() *PluginRuntime { if q.Length() > 0 { return q.Pop(q.entities[0]) } else { diff --git a/pkg/nodescheduler/nodescheduler.go b/pkg/nodescheduler/nodescheduler.go index dc9885f..791e1ae 100644 --- a/pkg/nodescheduler/nodescheduler.go +++ b/pkg/nodescheduler/nodescheduler.go @@ -112,8 +112,11 @@ func (ns *NodeScheduler) Run() { if plugin == nil { logger.Error.Printf("failed to promote plugin: plugin name %q does not exist in goal %q", pluginName, goalID) } else { - if p := ns.waitingQueue.Pop(plugin); p != nil { - ns.readyQueue.Push(p) + // make a hard copy of the plugin + _p := *plugin + pr := datatype.NewPluginRuntimeWithScienceRule(_p, *r) + if _pr := ns.waitingQueue.Pop(pr); _pr != nil { + ns.readyQueue.Push(pr) triggerScheduling = true logger.Debug.Printf("Plugin %s is promoted by rules", plugin.Name) } @@ -161,7 +164,7 @@ func (ns *NodeScheduler) Run() { logger.Debug.Printf("Reason for (re)scheduling %q", event.Type) logger.Debug.Printf("Plugins in ready queue: %+v", ns.readyQueue.GetPluginNames()) // Select the best task - plugins, err := ns.SchedulingPolicy.SelectBestPlugins( + pluginsToRun, err := ns.SchedulingPolicy.SelectBestPlugins( &ns.readyQueue, &ns.scheduledPlugins, datatype.Resource{ @@ -173,13 +176,13 @@ func (ns *NodeScheduler) Run() { if err != nil { logger.Error.Printf("Failed to get the best task to run %q", err.Error()) } else { - for _, plugin := range plugins { - e := datatype.NewEventBuilder(datatype.EventPluginStatusScheduled).AddReason("Fit to resource").AddPluginMeta(plugin).Build() + for _, _pr := range pluginsToRun { + e := datatype.NewEventBuilder(datatype.EventPluginStatusScheduled).AddReason("Fit to resource").AddPluginMeta(&_pr.Plugin).Build() logger.Debug.Printf("%s: %q (%q)", e.ToString(), e.GetPluginName(), e.GetReason()) go ns.LogToBeehive.SendWaggleMessage(e.ToWaggleMessage(), "all") - ns.readyQueue.Pop(plugin) - ns.scheduledPlugins.Push(plugin) - go ns.ResourceManager.LaunchAndWatchPlugin(plugin) + pr := ns.readyQueue.Pop(_pr) + ns.scheduledPlugins.Push(pr) + go ns.ResourceManager.LaunchAndWatchPlugin(pr) } } case event := <-ns.chanFromResourceManager: @@ -208,10 +211,13 @@ func (ns *NodeScheduler) Run() { logger.Error.Printf("Could not get goal to update plugin status: %q", err.Error()) } else { pluginName := event.GetPluginName() - plugin := scienceGoal.GetMySubGoal(ns.NodeID).GetPlugin(pluginName) - if plugin != nil { - ns.scheduledPlugins.Pop(plugin) - ns.waitingQueue.Push(plugin) + if plugin := scienceGoal.GetMySubGoal(ns.NodeID).GetPlugin(pluginName); plugin != nil { + p := *plugin + _pr := &datatype.PluginRuntime{ + Plugin: p, + } + pr := ns.scheduledPlugins.Pop(_pr) + ns.waitingQueue.Push(pr) go ns.LogToBeehive.SendWaggleMessage(event.ToWaggleMessage(), "all") } } @@ -243,7 +249,11 @@ func (ns *NodeScheduler) registerGoal(goal *datatype.ScienceGoal) { logger.Error.Printf("Failed to add science rules of goal %q: %s", goal.ID, err.Error()) } for _, p := range mySubGoal.GetPlugins() { - ns.waitingQueue.Push(p) + // copy plugin object + _p := *p + ns.waitingQueue.Push(&datatype.PluginRuntime{ + Plugin: _p, + }) logger.Debug.Printf("plugin %s is added to the watiting queue", p.Name) } } @@ -252,20 +262,24 @@ func (ns *NodeScheduler) registerGoal(goal *datatype.ScienceGoal) { func (ns *NodeScheduler) cleanUpGoal(goal *datatype.ScienceGoal) { 
ns.Knowledgebase.DropRules(goal.Name) for _, p := range goal.GetMySubGoal(ns.NodeID).GetPlugins() { - if a := ns.waitingQueue.Pop(p); a != nil { + _p := *p + pr := &datatype.PluginRuntime{ + Plugin: _p, + } + if a := ns.waitingQueue.Pop(pr); a != nil { logger.Debug.Printf("plugin %s is removed from the waiting queue", p.Name) } - if a := ns.readyQueue.Pop(p); a != nil { + if a := ns.readyQueue.Pop(pr); a != nil { logger.Debug.Printf("plugin %s is removed from the ready queue", p.Name) } - if a := ns.scheduledPlugins.Pop(p); a != nil { - if pod, err := ns.ResourceManager.GetPod(a.Name); err != nil { - logger.Error.Printf("Failed to get pod of the plugin %q", a.Name) + if a := ns.scheduledPlugins.Pop(pr); a != nil { + if pod, err := ns.ResourceManager.GetPod(a.Plugin.Name); err != nil { + logger.Error.Printf("Failed to get pod of the plugin %q", a.Plugin.Name) } else { - e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddPluginMeta(a).AddPodMeta(pod).AddReason("Cleaning up the plugin due to deletion of the goal").Build() + e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddPluginMeta(&a.Plugin).AddPodMeta(pod).AddReason("Cleaning up the plugin due to deletion of the goal").Build() go ns.LogToBeehive.SendWaggleMessage(e.ToWaggleMessage(), "all") } - ns.ResourceManager.RemovePlugin(a) + ns.ResourceManager.RemovePlugin(&a.Plugin) logger.Debug.Printf("plugin %s is removed from running", p.Name) } diff --git a/pkg/nodescheduler/policy/default.go b/pkg/nodescheduler/policy/default.go index 3b5448d..038e7b9 100644 --- a/pkg/nodescheduler/policy/default.go +++ b/pkg/nodescheduler/policy/default.go @@ -6,7 +6,7 @@ import ( ) type SchedulingPolicy interface { - SelectBestPlugins(*datatype.Queue, *datatype.Queue, datatype.Resource) ([]*datatype.Plugin, error) + SelectBestPlugins(*datatype.Queue, *datatype.Queue, datatype.Resource) ([]*datatype.PluginRuntime, error) } func GetSchedulingPolicyByName(policyName string) SchedulingPolicy { @@ -35,7 +35,7 @@ func NewSimpleSchedulingPolicy() *SimpleSchedulingPolicy { // SelectBestPlugins returns the best plugin to run at the time // For SimpleSchedulingPolicy, it returns all "ready" plugins -func (ss *SimpleSchedulingPolicy) SelectBestPlugins(readyQueue *datatype.Queue, scheduledPlugins *datatype.Queue, availableResource datatype.Resource) (pluginsToRun []*datatype.Plugin, err error) { +func (ss *SimpleSchedulingPolicy) SelectBestPlugins(readyQueue *datatype.Queue, scheduledPlugins *datatype.Queue, availableResource datatype.Resource) (pluginsToRun []*datatype.PluginRuntime, err error) { readyQueue.ResetIter() for readyQueue.More() { pluginsToRun = append(pluginsToRun, readyQueue.Next()) diff --git a/pkg/nodescheduler/policy/gpuaware.go b/pkg/nodescheduler/policy/gpuaware.go index 5a7a8af..62f0ee1 100644 --- a/pkg/nodescheduler/policy/gpuaware.go +++ b/pkg/nodescheduler/policy/gpuaware.go @@ -15,31 +15,31 @@ func NewGPUAwareSchedulingPolicy() *GPUAwareSchedulingPolicy { // SelectBestPlugins returns the best plugin to run at the time // For non-GPU-demand plugins, it returns all the plugins. 
// For GPU-demand plugins it returns the oldest one if no GPU-demand plugins in the scheduled plugin list -func (rs *GPUAwareSchedulingPolicy) SelectBestPlugins(readyQueue *datatype.Queue, scheduledPlugins *datatype.Queue, availableResource datatype.Resource) (pluginsToRun []*datatype.Plugin, err error) { +func (rs *GPUAwareSchedulingPolicy) SelectBestPlugins(readyQueue *datatype.Queue, scheduledPlugins *datatype.Queue, availableResource datatype.Resource) (pluginsToRun []*datatype.PluginRuntime, err error) { GPUPluginExists := false // Flag if GPU-demand plugin already exists in scheduled plugin list scheduledPlugins.ResetIter() for scheduledPlugins.More() { - p := scheduledPlugins.Next() - if p.PluginSpec.IsGPURequired() { + pr := scheduledPlugins.Next() + if pr.Plugin.PluginSpec.IsGPURequired() { GPUPluginExists = true - logger.Debug.Printf("GPU-demand plugin %q exists in scheduled plugin list.", p.Name) + logger.Debug.Printf("GPU-demand plugin %q exists in scheduled plugin list.", pr.Plugin.Name) break } } readyQueue.ResetIter() for readyQueue.More() { - p := readyQueue.Next() - if p.PluginSpec.IsGPURequired() { + pr := readyQueue.Next() + if pr.Plugin.PluginSpec.IsGPURequired() { if GPUPluginExists == false { - pluginsToRun = append(pluginsToRun, p) - logger.Debug.Printf("GPU-demand plugin %q is added to scheduled plugin list.", p.Name) + pluginsToRun = append(pluginsToRun, pr) + logger.Debug.Printf("GPU-demand plugin %q is added to scheduled plugin list.", pr.Plugin.Name) GPUPluginExists = true } else { - logger.Debug.Printf("GPU-demand plugin %q needs to wait because other GPU-demand plugin is scheduled or being run.", p.Name) + logger.Debug.Printf("GPU-demand plugin %q needs to wait because other GPU-demand plugin is scheduled or being run.", pr.Plugin.Name) } } else { - pluginsToRun = append(pluginsToRun, p) + pluginsToRun = append(pluginsToRun, pr) } } return diff --git a/pkg/nodescheduler/policy/gpuaware_test.go b/pkg/nodescheduler/policy/gpuaware_test.go index f76a4e7..908ebcc 100644 --- a/pkg/nodescheduler/policy/gpuaware_test.go +++ b/pkg/nodescheduler/policy/gpuaware_test.go @@ -11,33 +11,41 @@ func TestGPUAwarePolicy(t *testing.T) { readyQueue datatype.Queue scheduledPlugins datatype.Queue ) - readyQueue.Push(&datatype.Plugin{ - Name: "nongpu-plugin-a", - PluginSpec: &datatype.PluginSpec{ - Image: "plugin-a:latest", + readyQueue.Push(&datatype.PluginRuntime{ + Plugin: datatype.Plugin{ + Name: "nongpu-plugin-a", + PluginSpec: &datatype.PluginSpec{ + Image: "plugin-a:latest", + }, }, }) - readyQueue.Push(&datatype.Plugin{ - Name: "gpu-plugin-b", - PluginSpec: &datatype.PluginSpec{ - Image: "plugin-b:latest", - Selector: map[string]string{ - "resource.gpu": "true", + readyQueue.Push(&datatype.PluginRuntime{ + Plugin: datatype.Plugin{ + Name: "gpu-plugin-b", + PluginSpec: &datatype.PluginSpec{ + Image: "plugin-b:latest", + Selector: map[string]string{ + "resource.gpu": "true", + }, }, }, }) - readyQueue.Push(&datatype.Plugin{ - Name: "nongpu-plugin-c", - PluginSpec: &datatype.PluginSpec{ - Image: "plugin-c:latest", + readyQueue.Push(&datatype.PluginRuntime{ + Plugin: datatype.Plugin{ + Name: "nongpu-plugin-c", + PluginSpec: &datatype.PluginSpec{ + Image: "plugin-c:latest", + }, }, }) - readyQueue.Push(&datatype.Plugin{ - Name: "gpu-plugin-d", - PluginSpec: &datatype.PluginSpec{ - Image: "plugin-d:latest", - Selector: map[string]string{ - "resource.gpu": "true", + readyQueue.Push(&datatype.PluginRuntime{ + Plugin: datatype.Plugin{ + Name: "gpu-plugin-d", + PluginSpec: 
&datatype.PluginSpec{ + Image: "plugin-d:latest", + Selector: map[string]string{ + "resource.gpu": "true", + }, }, }, }) @@ -55,8 +63,8 @@ func TestGPUAwarePolicy(t *testing.T) { } if len(pluginsToSchedule) != 3 { t.Errorf("all 3 plugins are expected to be scheduled, but %d plugins were scheduled", len(pluginsToSchedule)) - for _, p := range pluginsToSchedule { - t.Log(p.Name) + for _, pr := range pluginsToSchedule { + t.Log(pr.Plugin.Name) } } } diff --git a/pkg/nodescheduler/policy/roundrobin.go b/pkg/nodescheduler/policy/roundrobin.go index 98e7367..b699c39 100644 --- a/pkg/nodescheduler/policy/roundrobin.go +++ b/pkg/nodescheduler/policy/roundrobin.go @@ -13,7 +13,7 @@ func NewRoundRobinSchedulingPolicy() *RoundRobinSchedulingPolicy { // SelectBestPlugins returns the best plugin to run at the time // It returns the oldest plugin amongst "ready" plugins -func (rs *RoundRobinSchedulingPolicy) SelectBestPlugins(readyQueue *datatype.Queue, scheduledPlugins *datatype.Queue, availableResource datatype.Resource) (pluginsToRun []*datatype.Plugin, err error) { +func (rs *RoundRobinSchedulingPolicy) SelectBestPlugins(readyQueue *datatype.Queue, scheduledPlugins *datatype.Queue, availableResource datatype.Resource) (pluginsToRun []*datatype.PluginRuntime, err error) { if scheduledPlugins.Length() > 0 { return } diff --git a/pkg/nodescheduler/resourcemanager.go b/pkg/nodescheduler/resourcemanager.go index 47d6f87..fdbc59c 100644 --- a/pkg/nodescheduler/resourcemanager.go +++ b/pkg/nodescheduler/resourcemanager.go @@ -456,6 +456,12 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi }, }, }, + // { + // Name: "local-dev", + // VolumeSource: apiv1.VolumeSource{ + // EmptyDir: &apiv1.EmptyDirVolumeSource{}, + // }, + // }, } volumeMounts := []apiv1.VolumeMount{ @@ -473,6 +479,10 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi MountPath: "/etc/asound.conf", SubPath: "asound.conf", }, + // { + // Name: "local-dev", + // MountPath: "/dev", + // }, } // provide privileged plugins access to host devices @@ -543,6 +553,11 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi }, } + sidecar := apiv1.Container{ + Name: "plugin-controller", + Image: "10.31.81.1:5000/", + } + resources, err := resourceListForConfig(plugin.PluginSpec) if err != nil { return v1.PodTemplateSpec{}, err @@ -558,6 +573,7 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi Resources: resources, VolumeMounts: volumeMounts, }, + sidecar, } if plugin.PluginSpec.Entrypoint != "" { @@ -580,6 +596,27 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi }, nil } +// CreateKubernetesPod creates a Pod for the plugin +func (rm *ResourceManager) CreatePod(plugin *datatype.Plugin) (*apiv1.Pod, error) { + name, err := pluginNameForSpecDeployment(plugin) + if err != nil { + return nil, err + } + template, err := rm.createPodTemplateSpecForPlugin(plugin) + if err != nil { + return nil, err + } + template.Spec.RestartPolicy = apiv1.RestartPolicyNever + return &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: rm.Namespace, + Labels: template.Labels, + }, + Spec: template.Spec, + }, nil +} + // CreateK3SJob creates and returns a Kubernetes job object of the pllugin func (rm *ResourceManager) CreateJob(plugin *datatype.Plugin) (*batchv1.Job, error) { name, err := pluginNameForSpecDeployment(plugin) @@ -681,6 +718,19 @@ func (rm *ResourceManager) 
CreateDataConfigMap(configName string, datashims []*d return err } +func (rm *ResourceManager) UpdatePod(pod *apiv1.Pod) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + pods := rm.Clientset.CoreV1().Pods(rm.Namespace) + if _, err := pods.Get(ctx, pod.Name, metav1.GetOptions{}); err == nil { + _, err := pods.Update(ctx, pod, metav1.UpdateOptions{}) + return err + } else { + _, err := pods.Create(ctx, pod, metav1.CreateOptions{}) + return err + } +} + func (rm *ResourceManager) UpdateDeployment(deployment *appsv1.Deployment) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -923,17 +973,17 @@ func (rm *ResourceManager) ClenUp() error { func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { logger.Debug.Printf("Running plugin %q...", pr.Plugin.Name) - job, err := rm.CreateJob(pr.Plugin) + job, err := rm.CreateJob(&pr.Plugin) if err != nil { logger.Error.Printf("Failed to create Kubernetes Job for %q: %q", pr.Plugin.Name, err.Error()) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(pr.Plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(&pr.Plugin).Build()) return } err = rm.RunPlugin(job) defer rm.TerminateJob(job.Name) if err != nil { logger.Error.Printf("Failed to run %q: %q", job.Name, err.Error()) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(pr.Plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(&pr.Plugin).Build()) return } logger.Info.Printf("Plugin %q is scheduled", job.Name) @@ -944,7 +994,7 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { watcher, err := rm.WatchJob(job.Name, rm.Namespace, 1) if err != nil { logger.Error.Printf("Failed to watch %q. 
Abort the execution", job.Name) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddK3SJobMeta(job).AddPluginMeta(pr.Plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) return } chanEvent := watcher.ResultChan() @@ -955,23 +1005,23 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { case watch.Added: job := event.Object.(*batchv1.Job) pod, _ := rm.GetPod(job.Name) - var gpuMetricHostIP string - if pr.NeedProfile { - // we need to stop the timer eventually - defer resourceCollectorTicker.Stop() - } else { - // if we don't need to profile plugin we just stop it - resourceCollectorTicker.Stop() - } - gpuMetricHostIP = pod.Status.HostIP - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusLaunched).AddK3SJobMeta(job).AddPodMeta(pod).AddPluginMeta(pr.Plugin).Build()) + // var gpuMetricHostIP string + // if pr.NeedProfile { + // // we need to stop the timer eventually + // defer resourceCollectorTicker.Stop() + // } else { + // // if we don't need to profile plugin we just stop it + // resourceCollectorTicker.Stop() + // } + // gpuMetricHostIP = pod.Status.HostIP + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusLaunched).AddK3SJobMeta(job).AddPodMeta(pod).AddPluginMeta(&pr.Plugin).Build()) case watch.Modified: job := event.Object.(*batchv1.Job) if len(job.Status.Conditions) > 0 { logger.Debug.Printf("Plugin %s status %s: %s", job.Name, event.Type, job.Status.Conditions[0].Type) switch job.Status.Conditions[0].Type { case batchv1.JobComplete: - eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusComplete).AddK3SJobMeta(job).AddPluginMeta(pr.Plugin) + eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusComplete).AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin) if pod, err := rm.GetPod(job.Name); err != nil { logger.Debug.Printf("Failed to get pod for job %q: %s", job.Name, err.Error()) } else { @@ -983,7 +1033,7 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { logger.Error.Printf("Plugin %q has failed.", job.Name) eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusFailed). AddK3SJobMeta(job). - AddPluginMeta(pr.Plugin) + AddPluginMeta(&pr.Plugin) if pod, err := rm.GetPod(job.Name); err != nil { logger.Error.Printf("Failed to get pod for job %q: %s", job.Name, err.Error()) } else { @@ -1009,11 +1059,11 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { } case watch.Deleted: logger.Debug.Printf("Plugin got deleted. Returning resource and notify") - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Plugin deleted").AddK3SJobMeta(job).AddPluginMeta(pr.Plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Plugin deleted").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) return case watch.Error: logger.Debug.Printf("Error on watcher. 
Returning resource and notify") - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Error on watcher").AddK3SJobMeta(job).AddPluginMeta(pr.Plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Error on watcher").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) return default: logger.Error.Printf("Watcher of plugin %q received unknown event %q", job.Name, event.Type) @@ -1026,12 +1076,12 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { // To get out from this loop, we check if the pod is running, if not, we should terminate the plugin if pod, err := rm.GetPod(job.Name); err != nil { logger.Error.Printf("failed to get status of pod for job %q: %s", job.Name, err.Error()) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("pod no longer exist").AddK3SJobMeta(job).AddPluginMeta(plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("pod no longer exist").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) return } else { if pod.Status.Phase != apiv1.PodRunning { logger.Error.Printf("pod %q is not running for job %q. Closing plugin", pod.Name, job.Name) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("pod no longer running").AddK3SJobMeta(job).AddPluginMeta(plugin).AddPodMeta(pod).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("pod no longer running").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).AddPodMeta(pod).Build()) return } } @@ -1040,14 +1090,6 @@ func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { } -func (rm *ResourceManager) PublishPluginProfileToRMQ(gpuMetricHostIP string) { - // Resource collection every 5 seconds - resourceCollectorTicker := time.NewTicker(5 * time.Second) - gpuMetric := interfacing.NewHTTPRequest(fmt.Sprintf("http://%s:9101", gpuMetricHostIP)) - gpuMetric.RequestGet("metrics") - podMetrics, err := rm.MetricsClient.MetricsV1beta1().PodMetricses(rm.Namespace).List(context.TODO(), metav1.ListOptions{}) -} - // RunGabageCollector cleans up completed/failed jobs that exceed // their lifespan specified in `ttlSecondsAfterFinished` // diff --git a/pkg/pluginctl/pluginctl.go b/pkg/pluginctl/pluginctl.go index 2bce526..cc179b1 100644 --- a/pkg/pluginctl/pluginctl.go +++ b/pkg/pluginctl/pluginctl.go @@ -180,6 +180,13 @@ func (p *PluginCtl) Deploy(dep *Deployment) (string, error) { }, } switch dep.Type { + case "pod": + pod, err := p.ResourceManager.CreatePod(&plugin) + if err != nil { + return "", err + } + err = p.ResourceManager.UpdatePod(pod) + return pod.Name, err case "job": job, err := p.ResourceManager.CreateJob(&plugin) if err != nil { From 7f3cecc56261a141a62b85413b0e8a3d63fd5dd2 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Wed, 31 May 2023 17:47:19 -0500 Subject: [PATCH 03/12] Created Event types for app performance --- pkg/datatype/event.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/datatype/event.go b/pkg/datatype/event.go index 7e9c511..0807b58 100644 --- a/pkg/datatype/event.go +++ b/pkg/datatype/event.go @@ -148,7 +148,7 @@ func NewEventBuilderFromWaggleMessage(m *WaggleMessage) (*EventBuilder, error) { func (e *Event) ToWaggleMessage() *WaggleMessage { // TODO: beehive-influxdb does not handle bytes so body is always string. // This should be lifted once it accepts bytes. 
- encodedBody, err := e.encodeMetaToJson() + encodedBody, err := e.EncodeMetaToJson() if err != nil { logger.Debug.Printf("Failed to convert to Waggle message: %q", err.Error()) return nil @@ -161,7 +161,7 @@ func (e *Event) ToWaggleMessage() *WaggleMessage { ) } -func (e *Event) encodeMetaToJson() ([]byte, error) { +func (e *Event) EncodeMetaToJson() ([]byte, error) { return json.Marshal(e.Meta) } @@ -186,6 +186,10 @@ const ( EventPluginLastExecution EventType = "sys.scheduler.plugin.lastexecution" EventPluginStatusFailed EventType = "sys.scheduler.status.plugin.failed" EventFailure EventType = "sys.scheduler.failure" + + EventPluginPerfCPU EventType = "sys.plugin.perf.cpu" + EventPluginPerfMem EventType = "sys.plugin.perf.mem" + EventPluginPerfGPU EventType = "sys.plugin.perf.gpu" ) // type EventErrorCode string From 87571099622c78b26bec3b0562527c1474d0eaa2 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Thu, 8 Jun 2023 22:51:28 -0500 Subject: [PATCH 04/12] Added sidecar for performance gathering --- Makefile | 28 ++++++------ cmd/pluginctl/cmd/deploy.go | 2 +- pkg/nodescheduler/resourcemanager.go | 68 +++++++++++++++++++++++----- 3 files changed, 72 insertions(+), 26 deletions(-) diff --git a/Makefile b/Makefile index 7277f1b..bada9a4 100644 --- a/Makefile +++ b/Makefile @@ -2,28 +2,28 @@ VERSION?=0.0.0 cli-all-arch: cli-linux-amd64 cli-linux-arm64 cli-darwin-amd64 cli-windows-amd64 cli-linux-arm64: - GOOS=linux GOARCH=arm64 go build -o ./out/runplugin-linux-arm64 ./cmd/runplugin - GOOS=linux GOARCH=arm64 go build -o ./out/pluginctl-linux-arm64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl - GOOS=linux GOARCH=arm64 go build -o ./out/sesctl-linux-arm64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./out/runplugin-linux-arm64 ./cmd/runplugin + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./out/pluginctl-linux-arm64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./out/sesctl-linux-arm64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl cli-linux-amd64: - GOOS=linux GOARCH=amd64 go build -o ./out/runplugin-linux-amd64 ./cmd/runplugin - GOOS=linux GOARCH=amd64 go build -o ./out/pluginctl-linux-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl - GOOS=linux GOARCH=amd64 go build -o ./out/sesctl-linux-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./out/runplugin-linux-amd64 ./cmd/runplugin + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./out/pluginctl-linux-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./out/sesctl-linux-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl cli-darwin-amd64: - GOOS=darwin GOARCH=amd64 go build -o ./out/runplugin-darwin-amd64 ./cmd/runplugin - GOOS=darwin GOARCH=amd64 go build -o ./out/pluginctl-darwin-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl - GOOS=darwin GOARCH=amd64 go build -o ./out/sesctl-darwin-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl + CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./out/runplugin-darwin-amd64 ./cmd/runplugin + CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./out/pluginctl-darwin-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl + CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./out/sesctl-darwin-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl cli-windows-amd64: - GOOS=windows GOARCH=amd64 go build -o 
./out/runplugin-windows-amd64 ./cmd/runplugin - GOOS=windows GOARCH=amd64 go build -o ./out/pluginctl-windows-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl - GOOS=windows GOARCH=amd64 go build -o ./out/sesctl-windows-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl + CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o ./out/runplugin-windows-amd64 ./cmd/runplugin + CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o ./out/pluginctl-windows-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl + CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o ./out/sesctl-windows-amd64 -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl cli: - go build -o ./out/pluginctl -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl - go build -o ./out/sesctl -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl + CGO_ENABLED=0 go build -o ./out/pluginctl -ldflags "-X main.Version=${VERSION}" ./cmd/pluginctl + CGO_ENABLED=0 go build -o ./out/sesctl -ldflags "-X main.Version=${VERSION}" ./cmd/sesctl scheduler-all-arch: scheduler-amd64 scheduler-arm64 diff --git a/cmd/pluginctl/cmd/deploy.go b/cmd/pluginctl/cmd/deploy.go index 158094d..e56dae3 100644 --- a/cmd/pluginctl/cmd/deploy.go +++ b/cmd/pluginctl/cmd/deploy.go @@ -18,7 +18,7 @@ func init() { flags.StringSliceVarP(&deployment.EnvVarString, "env", "e", []string{}, "Set environment variables") flags.StringVarP(&deployment.EnvFromFile, "env-from", "", "", "Set environment variables from file") flags.BoolVar(&deployment.DevelopMode, "develop", false, "Enable the following development time features: access to wan network") - flags.StringVar(&deployment.Type, "type", "pod", "Type of the plugin. It is one of ['pod', 'job', 'deployment', 'daemonset]. Default is 'pod'.") + flags.StringVar(&deployment.Type, "type", "job", "Type of the plugin. It is one of ['pod', 'job', 'deployment', 'daemonset]. 
Default is 'pod'.") flags.StringVar(&deployment.ResourceString, "resource", "", "Specify resource requirement for running the plugin") rootCmd.AddCommand(cmdDeploy) } diff --git a/pkg/nodescheduler/resourcemanager.go b/pkg/nodescheduler/resourcemanager.go index fdbc59c..a022271 100644 --- a/pkg/nodescheduler/resourcemanager.go +++ b/pkg/nodescheduler/resourcemanager.go @@ -456,12 +456,12 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi }, }, }, - // { - // Name: "local-dev", - // VolumeSource: apiv1.VolumeSource{ - // EmptyDir: &apiv1.EmptyDirVolumeSource{}, - // }, - // }, + { + Name: "local-share", + VolumeSource: apiv1.VolumeSource{ + EmptyDir: &apiv1.EmptyDirVolumeSource{}, + }, + }, } volumeMounts := []apiv1.VolumeMount{ @@ -479,10 +479,10 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi MountPath: "/etc/asound.conf", SubPath: "asound.conf", }, - // { - // Name: "local-dev", - // MountPath: "/dev", - // }, + { + Name: "local-share", + MountPath: "/waggle", + }, } // provide privileged plugins access to host devices @@ -529,6 +529,8 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi "set", "--nodename", "$(HOST)", + "--host", + "wes-app-meta-cache.default.svc.cluster.local", "app-meta.$(WAGGLE_APP_ID)", string(appMetaData), }, @@ -555,7 +557,33 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi sidecar := apiv1.Container{ Name: "plugin-controller", - Image: "10.31.81.1:5000/", + Image: "waggle/plugin-controller:0.0.2", + Args: []string{ + "--enable-cpu-performance", + "--app-cgroup-dir", + "/app/sys/fs/cgroup", + "--enable-gpu-performance", + }, + Env: []apiv1.EnvVar{ + { + Name: "GPU_METRIC_HOST", + Value: "wes-jetson-exporter.default.svc.cluster.local", + }, + }, + VolumeMounts: []apiv1.VolumeMount{ + { + Name: "local-share", + MountPath: "/app/", + ReadOnly: true, + }, + }, + Resources: apiv1.ResourceRequirements{ + Limits: apiv1.ResourceList{}, + Requests: apiv1.ResourceList{ + apiv1.ResourceCPU: resource.MustParse("50m"), + apiv1.ResourceMemory: resource.MustParse("20Mi"), + }, + }, } resources, err := resourceListForConfig(plugin.PluginSpec) @@ -572,6 +600,19 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi Env: envs, Resources: resources, VolumeMounts: volumeMounts, + // make a symbolic link of the app container's cgroup to + // access to it from the sidecar container + Lifecycle: &apiv1.Lifecycle{ + PostStart: &apiv1.LifecycleHandler{ + Exec: &v1.ExecAction{ + Command: []string{ + "sh", + "-c", + "mkdir -p /waggle/sys/fs; ln -sf /sys/fs/cgroup /waggle/sys/fs", + }, + }, + }, + }, }, sidecar, } @@ -589,6 +630,9 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi PriorityClassName: "wes-app-priority", NodeSelector: nodeSelectorForConfig(plugin.PluginSpec), // TODO: The priority class will be revisited when using resource metrics to schedule plugins + // NOTE(Yongho): ShareProcessNamespace allows the sidecar to access app container's filesystem. 
+ // This can be used later as needed + // ShareProcessNamespace: booltoPtr(true), InitContainers: initContainers, Containers: containers, Volumes: volumes, @@ -1525,6 +1569,8 @@ func int32Ptr(i int32) *int32 { return &i } func int64Ptr(i int64) *int64 { return &i } +func booltoPtr(b bool) *bool { return &b } + // GetK3SClient returns an instance of clientset talking to a K3S cluster func GetK3SClient(incluster bool, pathToConfig string) (*kubernetes.Clientset, error) { if incluster { From 40f2b25a1ac6071010d385b24cf20c99ce684ef0 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Wed, 21 Jun 2023 00:33:26 -0500 Subject: [PATCH 05/12] Added plugin-controller in both scheduler and pluginctl --- cmd/pluginctl/cmd/deploy.go | 1 + cmd/runplugin/main.go | 4 +- pkg/datatype/plugin.go | 12 +- pkg/nodescheduler/nodescheduler.go | 2 + pkg/nodescheduler/resourcemanager.go | 171 ++++++++++++++------------- pkg/pluginctl/pluginctl.go | 61 +++++----- 6 files changed, 134 insertions(+), 117 deletions(-) diff --git a/cmd/pluginctl/cmd/deploy.go b/cmd/pluginctl/cmd/deploy.go index e56dae3..95a62c7 100644 --- a/cmd/pluginctl/cmd/deploy.go +++ b/cmd/pluginctl/cmd/deploy.go @@ -20,6 +20,7 @@ func init() { flags.BoolVar(&deployment.DevelopMode, "develop", false, "Enable the following development time features: access to wan network") flags.StringVar(&deployment.Type, "type", "job", "Type of the plugin. It is one of ['pod', 'job', 'deployment', 'daemonset]. Default is 'pod'.") flags.StringVar(&deployment.ResourceString, "resource", "", "Specify resource requirement for running the plugin") + flags.BoolVar(&deployment.EnablePluginController, "enable-plugin-controller", false, "Enable plugin controller supporting the plugin") rootCmd.AddCommand(cmdDeploy) } diff --git a/cmd/runplugin/main.go b/cmd/runplugin/main.go index e87d113..a606c5d 100644 --- a/cmd/runplugin/main.go +++ b/cmd/runplugin/main.go @@ -147,7 +147,9 @@ func runPlugin(resourceManager *nodescheduler.ResourceManager, plugin *datatype. log.Printf("plugin name is %s", plugin.Name) - deployment, err := resourceManager.CreateDeployment(plugin) + deployment, err := resourceManager.CreateDeployment(&datatype.PluginRuntime{ + Plugin: *plugin, + }) if err != nil { return fmt.Errorf("resourceManager.CreateDeployment: %s", err.Error()) } diff --git a/pkg/datatype/plugin.go b/pkg/datatype/plugin.go index df976c8..0605e05 100644 --- a/pkg/datatype/plugin.go +++ b/pkg/datatype/plugin.go @@ -96,10 +96,10 @@ type PluginCredential struct { } type PluginRuntime struct { - Plugin Plugin - Duration int - NeedProfile bool - Resource Resource + Plugin Plugin + Duration int + EnablePluginController bool + Resource Resource } func NewPluginRuntimeWithScienceRule(p Plugin, runtimeArgs ScienceRule) *PluginRuntime { @@ -113,6 +113,10 @@ func NewPluginRuntimeWithScienceRule(p Plugin, runtimeArgs ScienceRule) *PluginR return pr } +func (pr *PluginRuntime) SetPluginController(flag bool) { + pr.EnablePluginController = flag +} + // type Plugin struct { // Name string `yaml:"name"` // Image string `yaml:"image"` diff --git a/pkg/nodescheduler/nodescheduler.go b/pkg/nodescheduler/nodescheduler.go index 791e1ae..bd24c01 100644 --- a/pkg/nodescheduler/nodescheduler.go +++ b/pkg/nodescheduler/nodescheduler.go @@ -115,6 +115,8 @@ func (ns *NodeScheduler) Run() { // make a hard copy of the plugin _p := *plugin pr := datatype.NewPluginRuntimeWithScienceRule(_p, *r) + // TODO: we enable plugin-controller always. we will want to control this. 
+ pr.SetPluginController(true) if _pr := ns.waitingQueue.Pop(pr); _pr != nil { ns.readyQueue.Push(pr) triggerScheduling = true diff --git a/pkg/nodescheduler/resourcemanager.go b/pkg/nodescheduler/resourcemanager.go index a022271..79c2211 100644 --- a/pkg/nodescheduler/resourcemanager.go +++ b/pkg/nodescheduler/resourcemanager.go @@ -350,7 +350,7 @@ func (rm *ResourceManager) WatchJobs(namespace string) (watch.Interface, error) return watcher, err } -func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugin) (v1.PodTemplateSpec, error) { +func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRuntime) (v1.PodTemplateSpec, error) { envs := []apiv1.EnvVar{ { Name: "PULSE_SERVER", @@ -414,14 +414,14 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi }, } - for k, v := range plugin.PluginSpec.Env { + for k, v := range pr.Plugin.PluginSpec.Env { envs = append(envs, apiv1.EnvVar{ Name: k, Value: v, }) } - tag, err := plugin.PluginSpec.GetImageTag() + tag, err := pr.Plugin.PluginSpec.GetImageTag() if err != nil { return v1.PodTemplateSpec{}, err } @@ -431,7 +431,7 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi Name: "uploads", VolumeSource: apiv1.VolumeSource{ HostPath: &apiv1.HostPathVolumeSource{ - Path: path.Join("/media/plugin-data/uploads", plugin.PluginSpec.Job, plugin.Name, tag), + Path: path.Join("/media/plugin-data/uploads", pr.Plugin.PluginSpec.Job, pr.Plugin.Name, tag), Type: &hostPathDirectoryOrCreate, }, }, @@ -479,14 +479,23 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi MountPath: "/etc/asound.conf", SubPath: "asound.conf", }, - { + } + + if pr.EnablePluginController { + volumes = append(volumes, apiv1.Volume{ + Name: "local-share", + VolumeSource: apiv1.VolumeSource{ + EmptyDir: &apiv1.EmptyDirVolumeSource{}, + }, + }) + volumeMounts = append(volumeMounts, apiv1.VolumeMount{ Name: "local-share", MountPath: "/waggle", - }, + }) } // provide privileged plugins access to host devices - if plugin.PluginSpec.Privileged { + if pr.Plugin.PluginSpec.Privileged { volumes = append(volumes, apiv1.Volume{ Name: "dev", VolumeSource: apiv1.VolumeSource{ @@ -509,9 +518,9 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi Plugin string `json:"plugin"` }{ Host: "$(HOST)", - Task: plugin.Name, - Job: plugin.PluginSpec.Job, - Plugin: plugin.PluginSpec.Image, + Task: pr.Plugin.Name, + Job: pr.Plugin.PluginSpec.Job, + Plugin: pr.Plugin.PluginSpec.Image, } appMetaData, err := json.Marshal(appMeta) @@ -555,98 +564,94 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(plugin *datatype.Plugi }, } - sidecar := apiv1.Container{ - Name: "plugin-controller", - Image: "waggle/plugin-controller:0.0.2", - Args: []string{ - "--enable-cpu-performance", - "--app-cgroup-dir", - "/app/sys/fs/cgroup", - "--enable-gpu-performance", - }, - Env: []apiv1.EnvVar{ - { - Name: "GPU_METRIC_HOST", - Value: "wes-jetson-exporter.default.svc.cluster.local", - }, - }, - VolumeMounts: []apiv1.VolumeMount{ - { - Name: "local-share", - MountPath: "/app/", - ReadOnly: true, - }, - }, - Resources: apiv1.ResourceRequirements{ - Limits: apiv1.ResourceList{}, - Requests: apiv1.ResourceList{ - apiv1.ResourceCPU: resource.MustParse("50m"), - apiv1.ResourceMemory: resource.MustParse("20Mi"), - }, - }, - } - - resources, err := resourceListForConfig(plugin.PluginSpec) + resources, err := resourceListForConfig(pr.Plugin.PluginSpec) if err != nil { 
return v1.PodTemplateSpec{}, err } containers := []apiv1.Container{ { - SecurityContext: securityContextForConfig(plugin.PluginSpec), - Name: plugin.Name, - Image: plugin.PluginSpec.Image, - Args: plugin.PluginSpec.Args, + SecurityContext: securityContextForConfig(pr.Plugin.PluginSpec), + Name: pr.Plugin.Name, + Image: pr.Plugin.PluginSpec.Image, + Args: pr.Plugin.PluginSpec.Args, Env: envs, Resources: resources, VolumeMounts: volumeMounts, - // make a symbolic link of the app container's cgroup to - // access to it from the sidecar container - Lifecycle: &apiv1.Lifecycle{ - PostStart: &apiv1.LifecycleHandler{ - Exec: &v1.ExecAction{ - Command: []string{ - "sh", - "-c", - "mkdir -p /waggle/sys/fs; ln -sf /sys/fs/cgroup /waggle/sys/fs", - }, - }, - }, - }, }, - sidecar, } - if plugin.PluginSpec.Entrypoint != "" { - containers[0].Command = []string{plugin.PluginSpec.Entrypoint} + if pr.Plugin.PluginSpec.Entrypoint != "" { + containers[0].Command = []string{pr.Plugin.PluginSpec.Entrypoint} + } + + // add plugin-controller sidecar container + if pr.EnablePluginController { + logger.Info.Printf("plugin-controller sidecar is added to %s", pr.Plugin.Name) + pluginControllerArgs := []string{ + "--enable-cpu-performance", + "--plugin-process-name", + containers[0].Command[0], + } + if _, found := pr.Plugin.PluginSpec.Selector["resource.gpu"]; found { + logger.Debug.Printf("%s's plugin-controller will collect GPU performance", pr.Plugin.Name) + pluginControllerArgs = append(pluginControllerArgs, "--enable-gpu-performance") + } + // adding plugin-controller to the pod + containers = append(containers, apiv1.Container{ + Name: "plugin-controller", + Image: "waggle/plugin-controller:0.1.0", + // Image: "10.31.81.1:5000/local/plugin-controller", + Args: pluginControllerArgs, + Env: []apiv1.EnvVar{ + { + Name: "GPU_METRIC_HOST", + Value: "wes-jetson-exporter.default.svc.cluster.local", + }, + }, + VolumeMounts: []apiv1.VolumeMount{ + { + Name: "local-share", + MountPath: "/app/", + ReadOnly: true, + }, + }, + Resources: apiv1.ResourceRequirements{ + Limits: apiv1.ResourceList{}, + Requests: apiv1.ResourceList{ + apiv1.ResourceCPU: resource.MustParse("50m"), + apiv1.ResourceMemory: resource.MustParse("20Mi"), + }, + }, + }) } return v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: rm.labelsForPlugin(plugin), + Labels: rm.labelsForPlugin(&pr.Plugin), }, Spec: apiv1.PodSpec{ ServiceAccountName: "wes-plugin-account", PriorityClassName: "wes-app-priority", - NodeSelector: nodeSelectorForConfig(plugin.PluginSpec), + NodeSelector: nodeSelectorForConfig(pr.Plugin.PluginSpec), // TODO: The priority class will be revisited when using resource metrics to schedule plugins - // NOTE(Yongho): ShareProcessNamespace allows the sidecar to access app container's filesystem. - // This can be used later as needed - // ShareProcessNamespace: booltoPtr(true), - InitContainers: initContainers, - Containers: containers, - Volumes: volumes, + // NOTE: ShareProcessNamespace allows containers in a pod to share the process namespace. 
+ // containers in that pod can see other's processes + ShareProcessNamespace: booltoPtr(true), + InitContainers: initContainers, + Containers: containers, + Volumes: volumes, }, }, nil } // CreateKubernetesPod creates a Pod for the plugin -func (rm *ResourceManager) CreatePod(plugin *datatype.Plugin) (*apiv1.Pod, error) { - name, err := pluginNameForSpecDeployment(plugin) +func (rm *ResourceManager) CreatePod(pr *datatype.PluginRuntime) (*apiv1.Pod, error) { + name, err := pluginNameForSpecDeployment(&pr.Plugin) if err != nil { return nil, err } - template, err := rm.createPodTemplateSpecForPlugin(plugin) + template, err := rm.createPodTemplateSpecForPlugin(pr) if err != nil { return nil, err } @@ -662,12 +667,12 @@ func (rm *ResourceManager) CreatePod(plugin *datatype.Plugin) (*apiv1.Pod, error } // CreateK3SJob creates and returns a Kubernetes job object of the pllugin -func (rm *ResourceManager) CreateJob(plugin *datatype.Plugin) (*batchv1.Job, error) { - name, err := pluginNameForSpecDeployment(plugin) +func (rm *ResourceManager) CreateJob(pr *datatype.PluginRuntime) (*batchv1.Job, error) { + name, err := pluginNameForSpecDeployment(&pr.Plugin) if err != nil { return nil, err } - template, err := rm.createPodTemplateSpecForPlugin(plugin) + template, err := rm.createPodTemplateSpecForPlugin(pr) if err != nil { return nil, err } @@ -688,12 +693,12 @@ func (rm *ResourceManager) CreateJob(plugin *datatype.Plugin) (*batchv1.Job, err // CreateDeployment creates and returns a Kubernetes deployment object of the plugin // It also embeds a K3S configmap for plugin if needed -func (rm *ResourceManager) CreateDeployment(plugin *datatype.Plugin) (*appsv1.Deployment, error) { - name, err := pluginNameForSpecDeployment(plugin) +func (rm *ResourceManager) CreateDeployment(pr *datatype.PluginRuntime) (*appsv1.Deployment, error) { + name, err := pluginNameForSpecDeployment(&pr.Plugin) if err != nil { return nil, err } - template, err := rm.createPodTemplateSpecForPlugin(plugin) + template, err := rm.createPodTemplateSpecForPlugin(pr) if err != nil { return nil, err } @@ -712,12 +717,12 @@ func (rm *ResourceManager) CreateDeployment(plugin *datatype.Plugin) (*appsv1.De }, nil } -func (rm *ResourceManager) CreateDaemonSet(plugin *datatype.Plugin) (*appsv1.DaemonSet, error) { - name, err := pluginNameForSpecDeployment(plugin) +func (rm *ResourceManager) CreateDaemonSet(pr *datatype.PluginRuntime) (*appsv1.DaemonSet, error) { + name, err := pluginNameForSpecDeployment(&pr.Plugin) if err != nil { return nil, err } - template, err := rm.createPodTemplateSpecForPlugin(plugin) + template, err := rm.createPodTemplateSpecForPlugin(pr) if err != nil { return nil, err } @@ -1017,7 +1022,7 @@ func (rm *ResourceManager) ClenUp() error { func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { logger.Debug.Printf("Running plugin %q...", pr.Plugin.Name) - job, err := rm.CreateJob(&pr.Plugin) + job, err := rm.CreateJob(pr) if err != nil { logger.Error.Printf("Failed to create Kubernetes Job for %q: %q", pr.Plugin.Name, err.Error()) rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(&pr.Plugin).Build()) diff --git a/pkg/pluginctl/pluginctl.go b/pkg/pluginctl/pluginctl.go index cc179b1..48592d7 100644 --- a/pkg/pluginctl/pluginctl.go +++ b/pkg/pluginctl/pluginctl.go @@ -38,18 +38,19 @@ type PluginCtl struct { // Deployment holds the config pluginctl uses to deploy plugins type Deployment struct { - Name string - SelectorString string - 
Node string - Entrypoint string - Privileged bool - PluginImage string - PluginArgs []string - EnvVarString []string - EnvFromFile string - DevelopMode bool - Type string - ResourceString string + Name string + SelectorString string + Node string + Entrypoint string + Privileged bool + PluginImage string + PluginArgs []string + EnvVarString []string + EnvFromFile string + DevelopMode bool + Type string + ResourceString string + EnablePluginController bool } func NewPluginCtl(kubeconfig string) (*PluginCtl, error) { @@ -164,45 +165,47 @@ func (p *PluginCtl) Deploy(dep *Deployment) (string, error) { if err != nil { return "", fmt.Errorf("Failed to parse env %q", err.Error()) } - plugin := datatype.Plugin{ - Name: dep.Name, - PluginSpec: &datatype.PluginSpec{ - Privileged: dep.Privileged, - Node: dep.Node, - Image: dep.PluginImage, - Args: dep.PluginArgs, - Job: pluginctlJob, - Selector: selector, - Entrypoint: dep.Entrypoint, - Env: envs, - DevelopMode: dep.DevelopMode, - Resource: resource, + pluginRuntime := datatype.PluginRuntime{ + Plugin: datatype.Plugin{ + Name: dep.Name, + PluginSpec: &datatype.PluginSpec{ + Privileged: dep.Privileged, + Node: dep.Node, + Image: dep.PluginImage, + Args: dep.PluginArgs, + Job: pluginctlJob, + Selector: selector, + Entrypoint: dep.Entrypoint, + Env: envs, + DevelopMode: dep.DevelopMode, + Resource: resource, + }, }, } switch dep.Type { case "pod": - pod, err := p.ResourceManager.CreatePod(&plugin) + pod, err := p.ResourceManager.CreatePod(&pluginRuntime) if err != nil { return "", err } err = p.ResourceManager.UpdatePod(pod) return pod.Name, err case "job": - job, err := p.ResourceManager.CreateJob(&plugin) + job, err := p.ResourceManager.CreateJob(&pluginRuntime) if err != nil { return "", err } err = p.ResourceManager.RunPlugin(job) return job.Name, err case "deployment": - deployment, err := p.ResourceManager.CreateDeployment(&plugin) + deployment, err := p.ResourceManager.CreateDeployment(&pluginRuntime) if err != nil { return "", err } err = p.ResourceManager.UpdateDeployment(deployment) return deployment.Name, err case "daemonset": - daemonSet, err := p.ResourceManager.CreateDaemonSet(&plugin) + daemonSet, err := p.ResourceManager.CreateDaemonSet(&pluginRuntime) if err != nil { return "", err } From 3448e50b957272f6a42694a957fb34e158948356 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Fri, 23 Jun 2023 15:52:51 -0500 Subject: [PATCH 06/12] Event now allows to send a single value --- pkg/datatype/event.go | 39 +++++++++++++--------- pkg/nodescheduler/nodescheduler.go | 6 ++-- pkg/nodescheduler/resourcemanager.go | 48 ++++++++++++++++++++-------- pkg/pluginctl/pluginctl.go | 1 + 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/pkg/datatype/event.go b/pkg/datatype/event.go index 0807b58..c6987ee 100644 --- a/pkg/datatype/event.go +++ b/pkg/datatype/event.go @@ -17,7 +17,7 @@ type EventBuilder struct { type Event struct { Type EventType Timestamp int64 - Meta map[string]string + Meta map[string]interface{} } func NewEventBuilder(eventType EventType) *EventBuilder { @@ -25,11 +25,16 @@ func NewEventBuilder(eventType EventType) *EventBuilder { e: Event{ Type: eventType, Timestamp: time.Now().UnixNano(), - Meta: map[string]string{}, + Meta: map[string]interface{}{}, }, } } +func (eb *EventBuilder) AddValue(v interface{}) *EventBuilder { + eb.e.Meta["value"] = v + return eb +} + func (eb *EventBuilder) AddReason(reason string) *EventBuilder { eb.e.Meta["reason"] = reason return eb @@ -41,7 +46,7 @@ func (eb *EventBuilder) AddJob(j *Job) 
*EventBuilder { } func (e *Event) GetJobID() string { - return e.get("job_id") + return e.get("job_id").(string) } func (eb *EventBuilder) AddGoal(goal *ScienceGoal) *EventBuilder { @@ -94,7 +99,7 @@ func (eb *EventBuilder) Build() Event { return eb.e } -func (e *Event) get(name string) string { +func (e *Event) get(name string) interface{} { if value, ok := e.Meta[name]; ok { return value } else { @@ -103,22 +108,22 @@ func (e *Event) get(name string) string { } func (e *Event) GetGoalName() string { - return e.get("goal_name") + return e.get("goal_name").(string) } func (e *Event) GetGoalID() string { - return e.get("goal_id") + return e.get("goal_id").(string) } func (e *Event) GetPluginName() string { - return e.get("plugin_name") + return e.get("plugin_name").(string) } func (e *Event) GetReason() string { - return e.get("reason") + return e.get("reason").(string) } -func (e *Event) GetEntry(k string) string { +func (e *Event) GetEntry(k string) interface{} { return e.get(k) } @@ -129,7 +134,7 @@ func (e *Event) ToString() string { func NewEventBuilderFromWaggleMessage(m *WaggleMessage) (*EventBuilder, error) { builder := NewEventBuilder(EventType(m.Name)) builder.e.Timestamp = m.Timestamp - var body map[string]string + var body map[string]interface{} err := json.Unmarshal([]byte(m.Value.(string)), &body) if err != nil { return nil, err @@ -148,14 +153,18 @@ func NewEventBuilderFromWaggleMessage(m *WaggleMessage) (*EventBuilder, error) { func (e *Event) ToWaggleMessage() *WaggleMessage { // TODO: beehive-influxdb does not handle bytes so body is always string. // This should be lifted once it accepts bytes. - encodedBody, err := e.EncodeMetaToJson() - if err != nil { - logger.Debug.Printf("Failed to convert to Waggle message: %q", err.Error()) - return nil + body := e.get("value") + if body == "" { + encodedBody, err := e.EncodeMetaToJson() + if err != nil { + logger.Debug.Printf("Failed to convert to Waggle message: %q", err.Error()) + return nil + } + body = string(encodedBody) } return NewMessage( string(e.Type), - string(encodedBody), + body, e.Timestamp, map[string]string{}, ) diff --git a/pkg/nodescheduler/nodescheduler.go b/pkg/nodescheduler/nodescheduler.go index bd24c01..098fed0 100644 --- a/pkg/nodescheduler/nodescheduler.go +++ b/pkg/nodescheduler/nodescheduler.go @@ -79,7 +79,7 @@ func (ns *NodeScheduler) Run() { select { case event := <-ns.chanFromCloudScheduler: logger.Debug.Printf("%s", event.ToString()) - goals := event.GetEntry("goals") + goals := event.GetEntry("goals").(string) err := ns.ResourceManager.CreateConfigMap( configMapNameForGoals, map[string]string{"goals": goals}, @@ -115,7 +115,7 @@ func (ns *NodeScheduler) Run() { // make a hard copy of the plugin _p := *plugin pr := datatype.NewPluginRuntimeWithScienceRule(_p, *r) - // TODO: we enable plugin-controller always. we will want to control this. + // TODO: we enable plugin-controller always. we will want to control this later. pr.SetPluginController(true) if _pr := ns.waitingQueue.Pop(pr); _pr != nil { ns.readyQueue.Push(pr) @@ -228,7 +228,7 @@ func (ns *NodeScheduler) Run() { case datatype.EventGoalStatusReceivedBulk: // A goal set is received. We add or update the goals. 
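// A minimal, hypothetical usage sketch of the reworked Event API shown in the
// event.go hunks above: Meta now holds interface{} values, AddValue() stores a
// single payload under Meta["value"], and ToWaggleMessage() forwards that
// payload as the message body instead of JSON-encoding the whole Meta map.
// The float payload here is illustrative only and is not part of the patch.
package main

import (
	"fmt"

	"github.com/waggle-sensor/edge-scheduler/pkg/datatype"
)

func main() {
	// Build an event carrying a typed (non-string) value.
	e := datatype.NewEventBuilder(datatype.EventPluginStatusLaunched).
		AddValue(3.14).
		Build()

	// Readers type-assert on access, mirroring GetGoalID() and GetReason() in the diff.
	if v, ok := e.GetEntry("value").(float64); ok {
		fmt.Println("value:", v)
	}

	// Because Meta["value"] is set, the Waggle message carries 3.14 directly as its body.
	fmt.Printf("%+v\n", e.ToWaggleMessage())
}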
logger.Debug.Printf("A bulk goal is received") - data := event.GetEntry("goals") + data := event.GetEntry("goals").(string) var goals []datatype.ScienceGoal err := json.Unmarshal([]byte(data), &goals) if err != nil { diff --git a/pkg/nodescheduler/resourcemanager.go b/pkg/nodescheduler/resourcemanager.go index 79c2211..670ab92 100644 --- a/pkg/nodescheduler/resourcemanager.go +++ b/pkg/nodescheduler/resourcemanager.go @@ -456,12 +456,6 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRun }, }, }, - { - Name: "local-share", - VolumeSource: apiv1.VolumeSource{ - EmptyDir: &apiv1.EmptyDirVolumeSource{}, - }, - }, } volumeMounts := []apiv1.VolumeMount{ @@ -590,24 +584,52 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRun logger.Info.Printf("plugin-controller sidecar is added to %s", pr.Plugin.Name) pluginControllerArgs := []string{ "--enable-cpu-performance", - "--plugin-process-name", - containers[0].Command[0], + "--enable-metrics-publishing", + } + if len(containers[0].Command) >= 1 { + pluginProcessName := containers[0].Command[0] + logger.Info.Printf("user specified plugin process (%s). it will be passed to the plugin-controller", pluginProcessName) + pluginControllerArgs = append(pluginControllerArgs, "--plugin-process-name", pluginProcessName) } if _, found := pr.Plugin.PluginSpec.Selector["resource.gpu"]; found { - logger.Debug.Printf("%s's plugin-controller will collect GPU performance", pr.Plugin.Name) + logger.Info.Printf("%s's plugin-controller will collect GPU performance", pr.Plugin.Name) pluginControllerArgs = append(pluginControllerArgs, "--enable-gpu-performance") } // adding plugin-controller to the pod containers = append(containers, apiv1.Container{ - Name: "plugin-controller", - Image: "waggle/plugin-controller:0.1.0", - // Image: "10.31.81.1:5000/local/plugin-controller", - Args: pluginControllerArgs, + Name: "plugin-controller", + // Image: "waggle/plugin-controller:0.1.1", + Image: "10.31.81.1:5000/local/plugin-controller", + Args: pluginControllerArgs, Env: []apiv1.EnvVar{ { Name: "GPU_METRIC_HOST", Value: "wes-jetson-exporter.default.svc.cluster.local", }, + { + Name: "WAGGLE_PLUGIN_HOST", + Value: "wes-rabbitmq.default.svc.cluster.local", + }, + { + Name: "WAGGLE_PLUGIN_PORT", + Value: "5672", + }, + { + Name: "WAGGLE_PLUGIN_USERNAME", + Value: "plugin", + }, + { + Name: "WAGGLE_PLUGIN_PASSWORD", + Value: "plugin", + }, + { + Name: "WAGGLE_APP_ID", + ValueFrom: &apiv1.EnvVarSource{ + FieldRef: &apiv1.ObjectFieldSelector{ + FieldPath: "metadata.uid", + }, + }, + }, }, VolumeMounts: []apiv1.VolumeMount{ { diff --git a/pkg/pluginctl/pluginctl.go b/pkg/pluginctl/pluginctl.go index 48592d7..2e7683f 100644 --- a/pkg/pluginctl/pluginctl.go +++ b/pkg/pluginctl/pluginctl.go @@ -181,6 +181,7 @@ func (p *PluginCtl) Deploy(dep *Deployment) (string, error) { Resource: resource, }, }, + EnablePluginController: dep.EnablePluginController, } switch dep.Type { case "pod": From ada132c05f511c3bbb8ca3bdd2611890648b697b Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Mon, 26 Jun 2023 11:27:04 -0500 Subject: [PATCH 07/12] Fixed event_test.go --- pkg/datatype/event.go | 2 +- pkg/datatype/event_test.go | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/datatype/event.go b/pkg/datatype/event.go index c6987ee..9debd9b 100644 --- a/pkg/datatype/event.go +++ b/pkg/datatype/event.go @@ -55,7 +55,7 @@ func (eb *EventBuilder) AddGoal(goal *ScienceGoal) *EventBuilder { return eb } -func (eb 
*EventBuilder) AddEntry(k string, v string) *EventBuilder { +func (eb *EventBuilder) AddEntry(k string, v interface{}) *EventBuilder { eb.e.Meta[k] = v return eb } diff --git a/pkg/datatype/event_test.go b/pkg/datatype/event_test.go index da7ba12..c5db4c8 100644 --- a/pkg/datatype/event_test.go +++ b/pkg/datatype/event_test.go @@ -8,19 +8,27 @@ import ( func TestEventWaggleConversion(t *testing.T) { tests := map[string]struct { Type string - Payload map[string]string + Payload map[string]interface{} }{ "simple": { Type: string(EventPluginStatusLaunched), - Payload: map[string]string{ - "test": "great", + Payload: map[string]interface{}{ + "test": "great", + "float": 3.14, }, }, } for _, test := range tests { e := NewEventBuilder(EventType(test.Type)) for k, v := range test.Payload { - e.AddEntry(k, v) + switch v.(type) { + case string: + e.AddEntry(k, v.(string)) + case int: + t.Errorf("integer type is not supported. use float instead.") + case float64: + e.AddEntry(k, v.(float64)) + } } msg := e.Build() waggleMsg := msg.ToWaggleMessage() From 849d6e27650efa2215b1be75310897e6f2bcb3b3 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Mon, 26 Jun 2023 14:32:08 -0500 Subject: [PATCH 08/12] Added PreStart hook to let pc know the plugin has started --- pkg/nodescheduler/resourcemanager.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/nodescheduler/resourcemanager.go b/pkg/nodescheduler/resourcemanager.go index 670ab92..b039eae 100644 --- a/pkg/nodescheduler/resourcemanager.go +++ b/pkg/nodescheduler/resourcemanager.go @@ -572,6 +572,14 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRun Env: envs, Resources: resources, VolumeMounts: volumeMounts, + // let plugin-controller know that it has started + Lifecycle: &apiv1.Lifecycle{ + PostStart: &apiv1.LifecycleHandler{ + Exec: &apiv1.ExecAction{ + Command: []string{"touch", "/waggle/started"}, + }, + }, + }, }, } @@ -598,7 +606,7 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRun // adding plugin-controller to the pod containers = append(containers, apiv1.Container{ Name: "plugin-controller", - // Image: "waggle/plugin-controller:0.1.1", + // Image: "waggle/plugin-controller:0.2.0", Image: "10.31.81.1:5000/local/plugin-controller", Args: pluginControllerArgs, Env: []apiv1.EnvVar{ From 5bb820f91d55d973be750511f612fb9101ec6b5d Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Tue, 27 Jun 2023 16:48:46 -0500 Subject: [PATCH 09/12] scheduler uses Pod --- pkg/datatype/event.go | 3 + pkg/nodescheduler/resourcemanager.go | 274 ++++++++++++++++----------- pkg/pluginctl/pluginctl.go | 4 +- 3 files changed, 167 insertions(+), 114 deletions(-) diff --git a/pkg/datatype/event.go b/pkg/datatype/event.go index 9debd9b..46e0d01 100644 --- a/pkg/datatype/event.go +++ b/pkg/datatype/event.go @@ -92,6 +92,9 @@ func (eb *EventBuilder) AddPodMeta(pod *apiv1.Pod) *EventBuilder { eb.e.Meta["k3s_pod_name"] = pod.Name eb.e.Meta["k3s_pod_status"] = string(pod.Status.Phase) eb.e.Meta["k3s_pod_node_name"] = pod.Spec.NodeName + if v, found := pod.Labels["sagecontinuum.org/plugin-instance"]; found { + eb.e.Meta["k3s_pod_instance"] = v + } return eb } diff --git a/pkg/nodescheduler/resourcemanager.go b/pkg/nodescheduler/resourcemanager.go index b039eae..b866f54 100644 --- a/pkg/nodescheduler/resourcemanager.go +++ b/pkg/nodescheduler/resourcemanager.go @@ -2,12 +2,12 @@ package nodescheduler import ( "context" - "crypto/rand" "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "io" + 
"math/rand" "os" "path" "path/filepath" @@ -41,6 +41,7 @@ const ( namespace = "ses" rancherKubeconfigPath = "/etc/rancher/k3s/k3s.yaml" configMapNameForGoals = "waggle-plugin-scheduler-goals" + letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" ) var ( @@ -99,14 +100,23 @@ func generatePassword() string { return hex.EncodeToString(b) } +func generateRandomString(n int) string { + s := make([]byte, n) + for i := range s { + s[i] = letters[rand.Intn(len(letters))] + } + return string(s) +} + func (rm *ResourceManager) labelsForPlugin(plugin *datatype.Plugin) map[string]string { labels := map[string]string{ - "app": plugin.Name, - "app.kubernetes.io/name": plugin.Name, - "app.kubernetes.io/managed-by": rm.runner, - "app.kubernetes.io/created-by": rm.runner, - "sagecontinuum.org/plugin-job": plugin.PluginSpec.Job, - "sagecontinuum.org/plugin-task": plugin.Name, + "app": plugin.Name, + "app.kubernetes.io/name": plugin.Name, + "app.kubernetes.io/managed-by": rm.runner, + "app.kubernetes.io/created-by": rm.runner, + "sagecontinuum.org/plugin-job": plugin.PluginSpec.Job, + "sagecontinuum.org/plugin-task": plugin.Name, + "sagecontinuum.org/plugin-instance": plugin.Name + "-" + generateRandomString(6), } // in develop mode, we omit the role labels to opt out of network traffic filtering @@ -350,6 +360,30 @@ func (rm *ResourceManager) WatchJobs(namespace string) (watch.Interface, error) return watcher, err } +func (rm *ResourceManager) WatchPod(name string, namespace string, retry int) (watcher watch.Interface, err error) { + if namespace == "" { + namespace = rm.Namespace + } + for i := 0; i <= retry; i++ { + pod, err := rm.Clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + logger.Debug.Printf("Failed to get pod %q", err.Error()) + time.Sleep(3 * time.Second) + continue + } + // var selector *metav1.LabelSelector + // err = metav1.Convert_Map_string_To_string_To_v1_LabelSelector(&configMap.Labels, selector, nil) + watcher, err = rm.Clientset.CoreV1().Pods(namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace})) + if err != nil { + logger.Debug.Printf("Failed to get watcher for %q: %q", pod.Name, err.Error()) + time.Sleep(3 * time.Second) + continue + } + break + } + return +} + func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRuntime) (v1.PodTemplateSpec, error) { envs := []apiv1.EnvVar{ { @@ -572,14 +606,6 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRun Env: envs, Resources: resources, VolumeMounts: volumeMounts, - // let plugin-controller know that it has started - Lifecycle: &apiv1.Lifecycle{ - PostStart: &apiv1.LifecycleHandler{ - Exec: &apiv1.ExecAction{ - Command: []string{"touch", "/waggle/started"}, - }, - }, - }, }, } @@ -654,6 +680,17 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRun }, }, }) + // let plugin-controller know that it has started. + // note that this hook will probably fail if the plugin container + // runs too fast. 
for example, bash echo "hi" makes the plugin container + // exits before Kubernetes attempts to inject this hook + containers[0].Lifecycle = &apiv1.Lifecycle{ + PostStart: &apiv1.LifecycleHandler{ + Exec: &apiv1.ExecAction{ + Command: []string{"touch", "/waggle/started"}, + }, + }, + } } return v1.PodTemplateSpec{ @@ -676,7 +713,7 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRun } // CreateKubernetesPod creates a Pod for the plugin -func (rm *ResourceManager) CreatePod(pr *datatype.PluginRuntime) (*apiv1.Pod, error) { +func (rm *ResourceManager) CreatePodTemplate(pr *datatype.PluginRuntime) (*apiv1.Pod, error) { name, err := pluginNameForSpecDeployment(&pr.Plugin) if err != nil { return nil, err @@ -810,6 +847,13 @@ func (rm *ResourceManager) UpdatePod(pod *apiv1.Pod) error { } } +func (rm *ResourceManager) CreatePod(pod *apiv1.Pod) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _, err := rm.Clientset.CoreV1().Pods(rm.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + return err +} + func (rm *ResourceManager) UpdateDeployment(deployment *appsv1.Deployment) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -916,6 +960,18 @@ func (rm *ResourceManager) TerminateJob(jobName string) error { }) } +// TerminatePod terminates the Kubernetes Pod object. We set the graceperiod to 1 second to terminate +// any Job or Pod in case they do not respond to the termination signal +func (rm *ResourceManager) TerminatePod(podName string) error { + // This option allows us to quickly spin up the same plugin + // The Foreground option waits until the pod deletes, which takes time + deleteDependencies := metav1.DeletePropagationBackground + return rm.Clientset.CoreV1().Pods(rm.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{ + GracePeriodSeconds: int64Ptr(0), + PropagationPolicy: &deleteDependencies, + }) +} + func (rm *ResourceManager) GetPluginStatus(jobName string) (apiv1.PodPhase, error) { // TODO: Later we use pod name as we run plugins in one-shot? ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) @@ -966,29 +1022,14 @@ func (rm *ResourceManager) GetPodName(jobName string) (string, error) { return pod.Name, nil } -func (rm *ResourceManager) GetPluginLogHandler(jobName string, follow bool) (io.ReadCloser, error) { - pod, err := rm.GetPod(jobName) - if err != nil { - return nil, err - } - switch pod.Status.Phase { - case apiv1.PodPending: - return nil, fmt.Errorf("The plugin is in pending state") - case apiv1.PodRunning: - fallthrough - case apiv1.PodSucceeded: - fallthrough - case apiv1.PodFailed: - req := rm.Clientset.CoreV1().Pods(rm.Namespace).GetLogs(pod.Name, &apiv1.PodLogOptions{Follow: follow}) - return req.Stream(context.TODO()) - } - return nil, fmt.Errorf("The plugin (pod) is in %q state", string(pod.Status.Phase)) - // podWatcher, err = rm.Clientset.CoreV1().Pods(rm.Namespace).Watch(ctx, metav1.ListOptions{LabelSelector: selector.String()}) +func (rm *ResourceManager) GetPodLogHandler(podName string, follow bool) (io.ReadCloser, error) { + req := rm.Clientset.CoreV1().Pods(rm.Namespace).GetLogs(podName, &apiv1.PodLogOptions{Follow: follow}) + return req.Stream(context.TODO()) } // GetLastPluginLog fills up given buffer with the last plugin log. 
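// Hypothetical sketch of the sidecar side of the PostStart handshake added
// above: the plugin's PostStart hook touches /waggle/started, assuming the
// plugin and the plugin-controller sidecar share a volume mounted at /waggle.
// How the real plugin-controller reacts is outside this patch; this only
// illustrates the marker-file idea, including the "plugin exits too fast"
// caveat noted in the comment above.
package main

import (
	"log"
	"os"
	"time"
)

// waitForPluginStart polls for the marker file until it appears or the timeout expires.
func waitForPluginStart(marker string, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if _, err := os.Stat(marker); err == nil {
			return true // plugin container has started
		}
		time.Sleep(500 * time.Millisecond)
	}
	return false
}

func main() {
	if waitForPluginStart("/waggle/started", 30*time.Second) {
		log.Println("plugin started; begin collecting performance metrics")
	} else {
		log.Println("marker never appeared; plugin may have exited before the hook ran")
	}
}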
-func (rm *ResourceManager) GetLastPluginLog(jobName string, lastLog []byte) (int, error) { - logReader, err := rm.GetPluginLogHandler(jobName, false) +func (rm *ResourceManager) GetPodLog(podName string, lastLog []byte) (int, error) { + logReader, err := rm.GetPodLogHandler(podName, false) if err != nil { return 0, nil } @@ -1052,119 +1093,128 @@ func (rm *ResourceManager) ClenUp() error { func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) { logger.Debug.Printf("Running plugin %q...", pr.Plugin.Name) - job, err := rm.CreateJob(pr) + pod, err := rm.CreatePodTemplate(pr) if err != nil { - logger.Error.Printf("Failed to create Kubernetes Job for %q: %q", pr.Plugin.Name, err.Error()) + logger.Error.Printf("Failed to create Kubernetes Pod for %q: %q", pr.Plugin.Name, err.Error()) rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(&pr.Plugin).Build()) return } - err = rm.RunPlugin(job) - defer rm.TerminateJob(job.Name) + err = rm.CreatePod(pod) + defer rm.TerminatePod(pod.Name) if err != nil { - logger.Error.Printf("Failed to run %q: %q", job.Name, err.Error()) + logger.Error.Printf("Failed to run %q: %q", pod.Name, err.Error()) rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(&pr.Plugin).Build()) return } - logger.Info.Printf("Plugin %q is scheduled", job.Name) - pr.Plugin.PluginSpec.Job = job.Name + logger.Info.Printf("Plugin %q is scheduled", pod.Name) + pr.Plugin.PluginSpec.Job = pod.Name // NOTE: The for loop helps to re-connect to Kubernetes watcher when the connection // gets closed while the plugin is running for { - watcher, err := rm.WatchJob(job.Name, rm.Namespace, 1) + watcher, err := rm.WatchPod(pod.Name, rm.Namespace, 1) if err != nil { - logger.Error.Printf("Failed to watch %q. Abort the execution", job.Name) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) + logger.Error.Printf("Failed to watch %q. Abort the execution", pod.Name) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed). + AddReason(err.Error()). + AddPodMeta(pod). + AddPluginMeta(&pr.Plugin). + Build()) return } chanEvent := watcher.ResultChan() defer watcher.Stop() for event := range chanEvent { - logger.Debug.Printf("Plugin %s received an event %s", job.Name, event.Type) + logger.Debug.Printf("Plugin %s received an event %s", pod.Name, event.Type) + _pod := event.Object.(*v1.Pod) switch event.Type { case watch.Added: - job := event.Object.(*batchv1.Job) - pod, _ := rm.GetPod(job.Name) - // var gpuMetricHostIP string - // if pr.NeedProfile { - // // we need to stop the timer eventually - // defer resourceCollectorTicker.Stop() - // } else { - // // if we don't need to profile plugin we just stop it - // resourceCollectorTicker.Stop() - // } - // gpuMetricHostIP = pod.Status.HostIP - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusLaunched).AddK3SJobMeta(job).AddPodMeta(pod).AddPluginMeta(&pr.Plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusLaunched). + AddPodMeta(_pod). + AddPluginMeta(&pr.Plugin). 
+ Build()) case watch.Modified: - job := event.Object.(*batchv1.Job) - if len(job.Status.Conditions) > 0 { - logger.Debug.Printf("Plugin %s status %s: %s", job.Name, event.Type, job.Status.Conditions[0].Type) - switch job.Status.Conditions[0].Type { - case batchv1.JobComplete: - eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusComplete).AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin) - if pod, err := rm.GetPod(job.Name); err != nil { - logger.Debug.Printf("Failed to get pod for job %q: %s", job.Name, err.Error()) - } else { - eventBuilder = eventBuilder.AddPodMeta(pod) - } - rm.Notifier.Notify(eventBuilder.Build()) - return - case batchv1.JobFailed: - logger.Error.Printf("Plugin %q has failed.", job.Name) - eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusFailed). - AddK3SJobMeta(job). - AddPluginMeta(&pr.Plugin) - if pod, err := rm.GetPod(job.Name); err != nil { - logger.Error.Printf("Failed to get pod for job %q: %s", job.Name, err.Error()) - } else { - eventBuilder = eventBuilder.AddPodMeta(pod) - if state := pod.Status.ContainerStatuses[0].State.Terminated; state != nil { - eventBuilder = eventBuilder.AddReason(state.Reason). - AddEntry("return_code", fmt.Sprintf("%d", state.ExitCode)) - } - lastLog := make([]byte, 1024) - if lengthRead, err := rm.GetLastPluginLog(job.Name, lastLog); err == nil { - eventBuilder = eventBuilder.AddEntry("error_log", string(lastLog[:lengthRead])) - logger.Debug.Printf("Logs of the plugin %q: %s", job.Name, string(lastLog[:lengthRead])) - } else { - eventBuilder = eventBuilder.AddEntry("error_log", err.Error()) - logger.Debug.Printf("Failed to get plugin log: %s", err.Error()) - } + logger.Debug.Printf("%s: %s", _pod.Name, _pod.Status.Phase) + for _, c := range _pod.Status.ContainerStatuses { + logger.Debug.Printf("%s: (%s) %s", _pod.Name, c.Name, &c.State) + } + switch _pod.Status.Phase { + case v1.PodSucceeded: + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusComplete). + AddPodMeta(_pod). + AddPluginMeta(&pr.Plugin). + Build()) + return + case v1.PodFailed: + // even if the pod failed, if the plugin container succeeded + // we consider the pod succeeded. + pluginContainerStatus := _pod.Status.ContainerStatuses[0] + if pluginContainerStatus.State.Terminated.ExitCode == 0 { + // only for debugging purpose + if pcStatus := _pod.Status.ContainerStatuses[1].State.Terminated; pcStatus != nil { + logger.Debug.Printf("%s's plugin controller failed: %d (%s) %s", _pod.Name, pcStatus.ExitCode, pcStatus.Reason, pcStatus.Message) } - rm.Notifier.Notify(eventBuilder.Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusComplete). + AddPodMeta(_pod). + AddPluginMeta(&pr.Plugin). + Build()) return } - } else { - logger.Debug.Printf("Plugin %s status %s: %s", job.Name, event.Type, "UNKNOWN") + logger.Error.Printf("Plugin %q has failed.", _pod.Name) + eventBuilder := datatype.NewEventBuilder(datatype.EventPluginStatusFailed). + AddPodMeta(_pod). + AddPluginMeta(&pr.Plugin) + if state := _pod.Status.ContainerStatuses[0].State.Terminated; state != nil { + eventBuilder = eventBuilder.AddReason(state.Reason). 
+ AddEntry("return_code", fmt.Sprintf("%d", state.ExitCode)) + } + lastLog := make([]byte, 1024) + if lengthRead, err := rm.GetPodLog(_pod.Name, lastLog); err == nil { + eventBuilder = eventBuilder.AddEntry("error_log", string(lastLog[:lengthRead])) + logger.Debug.Printf("Logs of the plugin %q: %s", _pod.Name, string(lastLog[:lengthRead])) + } else { + eventBuilder = eventBuilder.AddEntry("error_log", err.Error()) + logger.Debug.Printf("Failed to get plugin log: %s", err.Error()) + } + rm.Notifier.Notify(eventBuilder.Build()) + return } case watch.Deleted: logger.Debug.Printf("Plugin got deleted. Returning resource and notify") - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Plugin deleted").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed). + AddReason("Plugin deleted"). + AddPodMeta(_pod). + AddPluginMeta(&pr.Plugin). + Build()) return case watch.Error: logger.Debug.Printf("Error on watcher. Returning resource and notify") - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("Error on watcher").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) + rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed). + AddReason("Error on watcher"). + AddPodMeta(_pod). + AddPluginMeta(&pr.Plugin). + Build()) return default: - logger.Error.Printf("Watcher of plugin %q received unknown event %q", job.Name, event.Type) + logger.Error.Printf("Watcher of plugin %q received unknown event %q", _pod.Name, event.Type) } } watcher.Stop() - logger.Error.Printf("Watcher of the plugin %s is unexpectedly closed. ", job.Name) + logger.Error.Printf("Watcher of the plugin %s is unexpectedly closed. ", pod.Name) // when a pod becomes unhealthy (e.g., a host device of the pod disconnected) the watcher // gets closed, but the job remains valid in the cluster and never runs the pod again. // To get out from this loop, we check if the pod is running, if not, we should terminate the plugin - if pod, err := rm.GetPod(job.Name); err != nil { - logger.Error.Printf("failed to get status of pod for job %q: %s", job.Name, err.Error()) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("pod no longer exist").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) - return - } else { - if pod.Status.Phase != apiv1.PodRunning { - logger.Error.Printf("pod %q is not running for job %q. Closing plugin", pod.Name, job.Name) - rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("pod no longer running").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).AddPodMeta(pod).Build()) - return - } - } - logger.Info.Printf("attemping to re-connect for job %q", job.Name) + // if pod, err := rm.GetPod(job.Name); err != nil { + // logger.Error.Printf("failed to get status of pod for job %q: %s", job.Name, err.Error()) + // rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("pod no longer exist").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).Build()) + // return + // } else { + // if pod.Status.Phase != apiv1.PodRunning { + // logger.Error.Printf("pod %q is not running for job %q. 
Closing plugin", pod.Name, job.Name) + // rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason("pod no longer running").AddK3SJobMeta(job).AddPluginMeta(&pr.Plugin).AddPodMeta(pod).Build()) + // return + // } + // } + logger.Info.Printf("attemping to re-connect for pod %q", pod.Name) } } diff --git a/pkg/pluginctl/pluginctl.go b/pkg/pluginctl/pluginctl.go index 2e7683f..b4d05aa 100644 --- a/pkg/pluginctl/pluginctl.go +++ b/pkg/pluginctl/pluginctl.go @@ -185,7 +185,7 @@ func (p *PluginCtl) Deploy(dep *Deployment) (string, error) { } switch dep.Type { case "pod": - pod, err := p.ResourceManager.CreatePod(&pluginRuntime) + pod, err := p.ResourceManager.CreatePodTemplate(&pluginRuntime) if err != nil { return "", err } @@ -301,7 +301,7 @@ func (p *PluginCtl) RunAsync(dep *Deployment, chEvent chan<- datatype.Event, out } func (p *PluginCtl) PrintLog(pluginName string, follow bool) (func(), chan os.Signal, error) { - podLog, err := p.ResourceManager.GetPluginLogHandler(pluginName, follow) + podLog, err := p.ResourceManager.GetPodLogHandler(pluginName, follow) if err != nil { return nil, nil, err } From e4fdec3e8f413e0e439dfaf6f38f1b74c991394c Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Fri, 30 Jun 2023 14:26:15 -0500 Subject: [PATCH 10/12] Updated RMQ.go to support async message publishing --- pkg/interfacing/rmq.go | 81 +++++++++++++++++++++--------- pkg/nodescheduler/nodescheduler.go | 26 ++++++---- 2 files changed, 71 insertions(+), 36 deletions(-) diff --git a/pkg/interfacing/rmq.go b/pkg/interfacing/rmq.go index 033f7d4..4407e47 100644 --- a/pkg/interfacing/rmq.go +++ b/pkg/interfacing/rmq.go @@ -13,6 +13,22 @@ import ( "gopkg.in/cenkalti/backoff.v1" ) +type RabbitMQMessageWrapper struct { + DestName string + RoutingKey string + Body []byte + ContentType string +} + +func NewRabbitMQMessageWrapper(to string, scope string, body []byte, contentType string) *RabbitMQMessageWrapper { + return &RabbitMQMessageWrapper{ + DestName: to, + RoutingKey: scope, + Body: body, + ContentType: contentType, + } +} + type RabbitMQHandler struct { RabbitmqURI string rabbitmqUsername string @@ -21,6 +37,7 @@ type RabbitMQHandler struct { rabbitmqConn *amqp.Connection rabbitmqChan *amqp.Channel appID string + chanToPublish chan RabbitMQMessageWrapper } func NewRabbitMQHandler(rabbitmqURI string, rabbitmqUsername string, rabbitmqPassword string, cacertPath string, appID string) *RabbitMQHandler { @@ -30,6 +47,7 @@ func NewRabbitMQHandler(rabbitmqURI string, rabbitmqUsername string, rabbitmqPas rabbitmqPassword: rabbitmqPassword, cacertPath: cacertPath, appID: appID, + chanToPublish: make(chan RabbitMQMessageWrapper, 100), } } @@ -124,8 +142,7 @@ func (rh *RabbitMQHandler) DeclareQueueAndConnectToExchange(exchangeName string, return &q, err } -func (rh *RabbitMQHandler) SendYAML(routingKey string, message []byte) error { - exchange := "scheduler" +func (rh *RabbitMQHandler) publish(m RabbitMQMessageWrapper) error { if rh.rabbitmqConn == nil || rh.rabbitmqConn.IsClosed() { err := rh.Connect() if err != nil { @@ -133,43 +150,46 @@ func (rh *RabbitMQHandler) SendYAML(routingKey string, message []byte) error { } } err := rh.rabbitmqChan.Publish( - exchange, - routingKey, + m.DestName, + m.RoutingKey, false, false, amqp.Publishing{ - ContentType: "application/yaml", - Body: message, + Body: m.Body, + DeliveryMode: 2, + UserId: rh.rabbitmqUsername, + AppId: rh.appID, + ContentType: m.ContentType, }, ) return err } -// SendWaggleMessage delivers a Waggle message to Waggle data 
pipeline +// SendWaggleMessageOnNode delivers a Waggle message to Waggle data pipeline inside a node // // The message is sent to the "to-validator" exchange -func (rh *RabbitMQHandler) SendWaggleMessage(message *datatype.WaggleMessage, scope string) error { +func (rh *RabbitMQHandler) SendWaggleMessageOnNode(message *datatype.WaggleMessage, scope string) error { logger.Debug.Println(string(datatype.Dump(message))) - exchange := "to-validator" - if rh.rabbitmqConn == nil || rh.rabbitmqConn.IsClosed() { - err := rh.Connect() - if err != nil { - return err - } + return rh.publish(*NewRabbitMQMessageWrapper( + "to-validator", + scope, + datatype.Dump(message), + "", + )) +} + +// SendWaggleMessageOnNodeAsync caches message internally. The background routine will push messages asynchronously. +func (rh *RabbitMQHandler) SendWaggleMessageOnNodeAsync(message *datatype.WaggleMessage, scope string) error { + if len(rh.chanToPublish) == cap(rh.chanToPublish) { + return fmt.Errorf("maximum capacity (%d) reached. this message will not be cached", cap(rh.chanToPublish)) } - err := rh.rabbitmqChan.Publish( - exchange, + rh.chanToPublish <- *NewRabbitMQMessageWrapper( + "to-validator", scope, - false, - false, - amqp.Publishing{ - Body: datatype.Dump(message), - DeliveryMode: 2, - UserId: rh.rabbitmqUsername, - AppId: rh.appID, - }, + datatype.Dump(message), + "", ) - return err + return nil } func (rh *RabbitMQHandler) GetReceiver(queueName string) (<-chan amqp.Delivery, error) { @@ -234,3 +254,14 @@ func (rh *RabbitMQHandler) SubscribeEvents(exchange string, queueName string, to }() return nil } + +func (rh *RabbitMQHandler) StartLoop() { + go func() { + for m := range rh.chanToPublish { + logger.Debug.Printf("%v", m) + if err := rh.publish(m); err != nil { + logger.Error.Printf("failed to send message to %s: %s", m.DestName, err.Error()) + } + } + }() +} diff --git a/pkg/nodescheduler/nodescheduler.go b/pkg/nodescheduler/nodescheduler.go index 098fed0..4d304c2 100644 --- a/pkg/nodescheduler/nodescheduler.go +++ b/pkg/nodescheduler/nodescheduler.go @@ -59,7 +59,7 @@ func (ns *NodeScheduler) Configure() (err error) { return } if ns.Config.GoalStreamURL != "" { - logger.Info.Printf("Subscribing goal downstream from %s", ns.Config.GoalStreamURL) + logger.Info.Printf("subscribing goal downstream from %s", ns.Config.GoalStreamURL) u, err := url.Parse(ns.Config.GoalStreamURL) if err != nil { return err @@ -67,6 +67,10 @@ func (ns *NodeScheduler) Configure() (err error) { s := interfacing.NewHTTPRequest(u.Scheme + "://" + u.Host) s.Subscribe(u.Path, ns.chanFromCloudScheduler, true) } + if ns.LogToBeehive != nil { + logger.Info.Println("starting THE RMQ handler loop for message publishing") + ns.LogToBeehive.StartLoop() + } return } @@ -138,7 +142,7 @@ func (ns *NodeScheduler) Run() { } else { to = "all" } - go ns.LogToBeehive.SendWaggleMessage(message, to) + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(message, to) case datatype.ScienceRuleActionSet: stateName := r.ActionObject var value interface{} @@ -159,7 +163,7 @@ func (ns *NodeScheduler) Run() { } if triggerScheduling { response := datatype.NewEventBuilder(datatype.EventPluginStatusPromoted).AddReason("kb triggered").Build() - go ns.LogToBeehive.SendWaggleMessage(response.ToWaggleMessage(), "node") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(response.ToWaggleMessage(), "node") ns.chanNeedScheduling <- response } case event := <-ns.chanNeedScheduling: @@ -181,7 +185,7 @@ func (ns *NodeScheduler) Run() { for _, _pr := range pluginsToRun { e := 
datatype.NewEventBuilder(datatype.EventPluginStatusScheduled).AddReason("Fit to resource").AddPluginMeta(&_pr.Plugin).Build() logger.Debug.Printf("%s: %q (%q)", e.ToString(), e.GetPluginName(), e.GetReason()) - go ns.LogToBeehive.SendWaggleMessage(e.ToWaggleMessage(), "all") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all") pr := ns.readyQueue.Pop(_pr) ns.scheduledPlugins.Push(pr) go ns.ResourceManager.LaunchAndWatchPlugin(pr) @@ -191,7 +195,7 @@ func (ns *NodeScheduler) Run() { logger.Debug.Printf("%s", event.ToString()) switch event.Type { case datatype.EventPluginStatusLaunched: - go ns.LogToBeehive.SendWaggleMessage(event.ToWaggleMessage(), "all") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(event.ToWaggleMessage(), "all") case datatype.EventPluginStatusComplete: // publish plugin completion message locally so that // rule checker knows when the last execution was @@ -205,7 +209,7 @@ func (ns *NodeScheduler) Run() { event.Timestamp, map[string]string{}, ) - go ns.LogToBeehive.SendWaggleMessage(message, "node") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(message, "node") fallthrough case datatype.EventPluginStatusFailed: scienceGoal, err := ns.GoalManager.GetScienceGoalByID(event.GetGoalID()) @@ -220,7 +224,7 @@ func (ns *NodeScheduler) Run() { } pr := ns.scheduledPlugins.Pop(_pr) ns.waitingQueue.Push(pr) - go ns.LogToBeehive.SendWaggleMessage(event.ToWaggleMessage(), "all") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(event.ToWaggleMessage(), "all") } } // We trigger the scheduling logic for plugins that need to run @@ -279,7 +283,7 @@ func (ns *NodeScheduler) cleanUpGoal(goal *datatype.ScienceGoal) { logger.Error.Printf("Failed to get pod of the plugin %q", a.Plugin.Name) } else { e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddPluginMeta(&a.Plugin).AddPodMeta(pod).AddReason("Cleaning up the plugin due to deletion of the goal").Build() - go ns.LogToBeehive.SendWaggleMessage(e.ToWaggleMessage(), "all") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all") } ns.ResourceManager.RemovePlugin(&a.Plugin) logger.Debug.Printf("plugin %s is removed from running", p.Name) @@ -312,13 +316,13 @@ func (ns *NodeScheduler) handleBulkGoals(goals []datatype.ScienceGoal) { ns.cleanUpGoal(existingGoal) ns.registerGoal(&goal) e := datatype.NewEventBuilder(datatype.EventGoalStatusUpdated).AddGoal(&goal).Build() - go ns.LogToBeehive.SendWaggleMessage(e.ToWaggleMessage(), "all") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all") } } else { logger.Info.Printf("Adding the new goal %s %q", goal.Name, goal.ID) ns.registerGoal(&goal) e := datatype.NewEventBuilder(datatype.EventGoalStatusReceived).AddGoal(&goal).Build() - go ns.LogToBeehive.SendWaggleMessage(e.ToWaggleMessage(), "all") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all") } } // Remove any existing goal that is not included in the new goal set @@ -326,7 +330,7 @@ func (ns *NodeScheduler) handleBulkGoals(goals []datatype.ScienceGoal) { if _, exist := goalsToKeep[goal.ID]; !exist { ns.cleanUpGoal(&goal) event := datatype.NewEventBuilder(datatype.EventGoalStatusRemoved).AddGoal(&goal).Build() - go ns.LogToBeehive.SendWaggleMessage(event.ToWaggleMessage(), "all") + ns.LogToBeehive.SendWaggleMessageOnNodeAsync(event.ToWaggleMessage(), "all") } } } From 6062a46732e0a8901be7acccee6d68ee5925b518 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Fri, 30 Jun 2023 14:34:46 -0500 Subject: [PATCH 11/12] Made RMQ logs more useful --- 
pkg/interfacing/rmq.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/interfacing/rmq.go b/pkg/interfacing/rmq.go index 4407e47..bf07fb1 100644 --- a/pkg/interfacing/rmq.go +++ b/pkg/interfacing/rmq.go @@ -258,7 +258,7 @@ func (rh *RabbitMQHandler) SubscribeEvents(exchange string, queueName string, to func (rh *RabbitMQHandler) StartLoop() { go func() { for m := range rh.chanToPublish { - logger.Debug.Printf("%v", m) + logger.Debug.Printf("to %s with routing key %s: %s", m.DestName, m.RoutingKey, m.Body) if err := rh.publish(m); err != nil { logger.Error.Printf("failed to send message to %s: %s", m.DestName, err.Error()) } From 7ba66bd9e34475ce37e77840d20360f1cbc9eeb0 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Fri, 30 Jun 2023 14:50:22 -0500 Subject: [PATCH 12/12] Updated the sidecar container image --- pkg/nodescheduler/resourcemanager.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/nodescheduler/resourcemanager.go b/pkg/nodescheduler/resourcemanager.go index b866f54..851be9a 100644 --- a/pkg/nodescheduler/resourcemanager.go +++ b/pkg/nodescheduler/resourcemanager.go @@ -631,9 +631,8 @@ func (rm *ResourceManager) createPodTemplateSpecForPlugin(pr *datatype.PluginRun } // adding plugin-controller to the pod containers = append(containers, apiv1.Container{ - Name: "plugin-controller", - // Image: "waggle/plugin-controller:0.2.0", - Image: "10.31.81.1:5000/local/plugin-controller", + Name: "plugin-controller", + Image: "waggle/plugin-controller:0.2.0", Args: pluginControllerArgs, Env: []apiv1.EnvVar{ {