Skip to content

Commit

Permalink
Improved --dry-run for job submission; Added jobID prefix to Pod name
Browse files Browse the repository at this point in the history
  • Loading branch information
gemblerz committed Nov 20, 2023
1 parent 2e2d15b commit d6b49e3
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 26 deletions.
6 changes: 3 additions & 3 deletions data/jobs/a_policy_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ plugins:
- -c
- "echo hello world; sleep 5; echo bye"
nodes:
W023:
W097:
scienceRules:
- "myfirstapp: True"
- "mysecondapp: True"
- "schedule(myfirstapp): True"
- "schedule(mysecondapp): True"
#- "myfirstapp: cronjob('myfirstapp', '0 * * * *')"
successcriteria:
- WallClock(1d)
10 changes: 7 additions & 3 deletions pkg/cloudscheduler/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func (api *APIServer) handlerSubmitJobs(w http.ResponseWriter, r *http.Request)
return
}
}
// TODO: when --dry-run is given, we should not commit any change to the existing science goal of the job
errorList := api.cloudScheduler.ValidateJobAndCreateScienceGoalForExistingJob(queries.Get("id"), user, flagDryRun)
if len(errorList) > 0 {
response := datatype.NewAPIMessageBuilder().AddError(fmt.Sprintf("%v", errorList)).Build()
Expand Down Expand Up @@ -372,12 +373,15 @@ func (api *APIServer) handlerSubmitJobs(w http.ResponseWriter, r *http.Request)
return
} else {
newJob.ScienceGoal = sg
jobID := api.cloudScheduler.GoalManager.AddJob(newJob)
response := datatype.NewAPIMessageBuilder().AddEntity("job_id", jobID)
response := datatype.NewAPIMessageBuilder().AddEntity("job_name", newJob.Name)
if flagDryRun {
response = response.AddEntity("dryrun", true)
} else {
response = response.AddEntity("state", datatype.JobSubmitted)
jobID := api.cloudScheduler.GoalManager.AddJob(newJob)
newJob.UpdateJobID(jobID)
api.cloudScheduler.GoalManager.UpdateJob(newJob, true)
response = response.AddEntity("job_id", jobID).
AddEntity("state", datatype.JobSubmitted)
}
respondJSON(w, http.StatusOK, response.Build().ToJson())
return
Expand Down
4 changes: 3 additions & 1 deletion pkg/cloudscheduler/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func (jv *JobValidator) GetPluginManifest(pluginImage string, updateDBIfNotExist
// Plugin name must follow RFC 1123.
// Reference: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-subdomain-names
func (jv *JobValidator) IsPluginNameValid(name string) bool {
if len(name) > 256 {
// the maximum length allowed is 256, but the scheduler may append several characters
// to indicate the job ID when it names a plugin, so user plugin names are limited to 200
if len(name) > 200 {
return false
}
var validNamePattern = regexp.MustCompile("^[a-z0-9-]+$")
Expand Down
7 changes: 7 additions & 0 deletions pkg/datatype/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ func (j *Job) DropNode(nodeName string) {
}
}

// UpdateJobID sets the job's JobID to id. When the job has a science goal
// attached, the goal's JobID is set to the same value.
func (j *Job) UpdateJobID(id string) {
	j.JobID = id
	// Nothing more to update when no science goal is attached.
	if j.ScienceGoal == nil {
		return
	}
	j.ScienceGoal.JobID = id
}

// ConvertToTemplate returns a new job object that contains the job description. The new job object does
// not contain user-specific information such as job ID, owner, email, etc. This is mostly useful
// when someone creates a new job from an existing job.
Expand Down
1 change: 1 addition & 0 deletions pkg/datatype/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Plugin struct {
Name string `json:"name" yaml:"name"`
PluginSpec *PluginSpec `json:"plugin_spec" yaml:"pluginSpec,omitempty"`
GoalID string `json:"goal_id,omitempty" yaml:"goalID,omitempty"`
JobID string `json:"job_id,omitempty" yaml:"jobID,omitempty"`
}

func (p *Plugin) GetPluginImage() (string, error) {
Expand Down
43 changes: 24 additions & 19 deletions pkg/nodescheduler/nodescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (ns *NodeScheduler) Run() {
pr := datatype.NewPluginRuntimeWithScienceRule(_p, *r)
// TODO: we enable plugin-controller always. we will want to control this later.
pr.SetPluginController(true)
// we name the Kubernetes Pod of the plugin using << pluginName-jobID >>
// to distinguish plugins with the same name that belong to different jobs
pr.Plugin.JobID = sg.JobID
if _pr := ns.waitingQueue.Pop(pr); _pr != nil {
ns.readyQueue.Push(pr)
triggerScheduling = true
Expand Down Expand Up @@ -269,27 +272,29 @@ func (ns *NodeScheduler) registerGoal(goal *datatype.ScienceGoal) {

func (ns *NodeScheduler) cleanUpGoal(goal *datatype.ScienceGoal) {
ns.Knowledgebase.DropRules(goal.Name)
for _, p := range goal.GetMySubGoal(ns.NodeID).GetPlugins() {
_p := *p
pr := &datatype.PluginRuntime{
Plugin: _p,
}
if a := ns.waitingQueue.Pop(pr); a != nil {
logger.Debug.Printf("plugin %s is removed from the waiting queue", p.Name)
}
if a := ns.readyQueue.Pop(pr); a != nil {
logger.Debug.Printf("plugin %s is removed from the ready queue", p.Name)
}
if a := ns.scheduledPlugins.Pop(pr); a != nil {
if pod, err := ns.ResourceManager.GetPod(a.Plugin.Name); err != nil {
logger.Error.Printf("Failed to get pod of the plugin %q", a.Plugin.Name)
} else {
e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddPluginMeta(&a.Plugin).AddPodMeta(pod).AddReason("Cleaning up the plugin due to deletion of the goal").Build()
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all")
if mySubGoal := goal.GetMySubGoal(ns.NodeID); mySubGoal != nil {
for _, p := range goal.GetMySubGoal(ns.NodeID).GetPlugins() {
_p := *p
pr := &datatype.PluginRuntime{
Plugin: _p,
}
ns.ResourceManager.RemovePlugin(&a.Plugin)
logger.Debug.Printf("plugin %s is removed from running", p.Name)
if a := ns.waitingQueue.Pop(pr); a != nil {
logger.Debug.Printf("plugin %s is removed from the waiting queue", p.Name)
}
if a := ns.readyQueue.Pop(pr); a != nil {
logger.Debug.Printf("plugin %s is removed from the ready queue", p.Name)
}
if a := ns.scheduledPlugins.Pop(pr); a != nil {
if pod, err := ns.ResourceManager.GetPod(a.Plugin.Name); err != nil {
logger.Error.Printf("Failed to get pod of the plugin %q", a.Plugin.Name)
} else {
e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddPluginMeta(&a.Plugin).AddPodMeta(pod).AddReason("Cleaning up the plugin due to deletion of the goal").Build()
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all")
}
ns.ResourceManager.RemovePlugin(&a.Plugin)
logger.Debug.Printf("plugin %s is removed from running", p.Name)

}
}
}
ns.GoalManager.DropGoal(goal.ID)
Expand Down
4 changes: 4 additions & 0 deletions pkg/nodescheduler/resourcemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,10 @@ func (rm *ResourceManager) CleanUp() error {
func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) {
logger.Debug.Printf("Running plugin %q...", pr.Plugin.Name)
pod, err := rm.CreatePodTemplate(pr)
// we append the job ID to the Pod name to distinguish plugins with the same name from different jobs
if pr.Plugin.JobID != "" {
pod.SetName(fmt.Sprintf("%s-%s", pod.GetName(), pr.Plugin.JobID))
}
if err != nil {
logger.Error.Printf("Failed to create Kubernetes Pod for %q: %q", pr.Plugin.Name, err.Error())
rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(&pr.Plugin).Build())
Expand Down

0 comments on commit d6b49e3

Please sign in to comment.