Skip to content

Commit

Permalink
Merge pull request #114 from waggle-sensor/develop
Browse files Browse the repository at this point in the history
Fixed the plugin name when cleaning up the plugin triggered by dropping the job
  • Loading branch information
gemblerz authored Jan 17, 2024
2 parents 5be0313 + 38d217c commit 097317f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
33 changes: 26 additions & 7 deletions pkg/nodescheduler/nodescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nodescheduler

import (
"encoding/json"
"fmt"
"net/url"
"sync"
"time"
Expand Down Expand Up @@ -167,7 +168,9 @@ func (ns *NodeScheduler) Run() {
}
}
if triggerScheduling {
response := datatype.NewEventBuilder(datatype.EventPluginStatusPromoted).AddReason("kb triggered").Build()
response := datatype.NewEventBuilder(datatype.EventPluginStatusPromoted).
AddReason("kb triggered").
Build()
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(response.ToWaggleMessage(), "node")
ns.chanNeedScheduling <- response
}
Expand All @@ -188,7 +191,10 @@ func (ns *NodeScheduler) Run() {
logger.Error.Printf("Failed to get the best task to run %q", err.Error())
} else {
for _, _pr := range pluginsToRun {
e := datatype.NewEventBuilder(datatype.EventPluginStatusScheduled).AddReason("Fit to resource").AddPluginMeta(&_pr.Plugin).Build()
e := datatype.NewEventBuilder(datatype.EventPluginStatusScheduled).
AddReason("Fit to resource").
AddPluginMeta(&_pr.Plugin).
Build()
logger.Debug.Printf("%s: %q (%q)", e.ToString(), e.GetPluginName(), e.GetReason())
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all")
pr := ns.readyQueue.Pop(_pr)
Expand All @@ -200,6 +206,8 @@ func (ns *NodeScheduler) Run() {
logger.Debug.Printf("%s", event.ToString())
switch event.Type {
case datatype.EventPluginStatusLaunched:
pluginName := event.GetPluginName()
logger.Info.Printf("Plugin %q is launched", pluginName)
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(event.ToWaggleMessage(), "all")
case datatype.EventPluginStatusComplete:
// publish plugin completion message locally so that
Expand All @@ -208,6 +216,7 @@ func (ns *NodeScheduler) Run() {
// it if the checker is called before the delivery. We will need to make sure
// the message is delivered before triggering rule checking.
pluginName := event.GetPluginName()
logger.Info.Printf("Plugin %q is successfully completed", pluginName)
message := datatype.NewMessage(
string(datatype.EventPluginLastExecution),
pluginName,
Expand Down Expand Up @@ -285,15 +294,25 @@ func (ns *NodeScheduler) cleanUpGoal(goal *datatype.ScienceGoal) {
logger.Debug.Printf("plugin %s is removed from the ready queue", p.Name)
}
if a := ns.scheduledPlugins.Pop(pr); a != nil {
if pod, err := ns.ResourceManager.GetPod(a.Plugin.Name); err != nil {
// Pods have their job ID in the name
var podName string
if a.Plugin.JobID != "" {
podName = fmt.Sprintf("%s-%s", a.Plugin.Name, a.Plugin.JobID)
} else {
podName = a.Plugin.Name
}
if pod, err := ns.ResourceManager.GetPod(podName); err != nil {
logger.Error.Printf("Failed to get pod of the plugin %q", a.Plugin.Name)
} else {
e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddPluginMeta(&a.Plugin).AddPodMeta(pod).AddReason("Cleaning up the plugin due to deletion of the goal").Build()
e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).
AddPluginMeta(&a.Plugin).
AddPodMeta(pod).
AddReason("Cleaning up the plugin due to deletion of the goal").
Build()
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all")
ns.ResourceManager.TerminatePod(podName)
logger.Info.Printf("plugin %s is removed from running", p.Name)
}
ns.ResourceManager.RemovePlugin(&a.Plugin)
logger.Debug.Printf("plugin %s is removed from running", p.Name)

}
}
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/nodescheduler/resourcemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,10 +1387,6 @@ func (rm *ResourceManager) RunGabageCollector() error {
return nil
}

func (rm *ResourceManager) RemovePlugin(p *datatype.Plugin) {
rm.TerminatePod(p.Name)
}

func (rm *ResourceManager) Configure() (err error) {
err = rm.CreateNamespace("ses")
if err != nil {
Expand Down

0 comments on commit 097317f

Please sign in to comment.