Skip to content

Commit

Permalink
Merge pull request #111 from waggle-sensor/develop
Browse files Browse the repository at this point in the history
Bug fixes
  • Loading branch information
gemblerz authored Nov 21, 2023
2 parents 2bf6583 + d6b49e3 commit 4dc5f6d
Show file tree
Hide file tree
Showing 12 changed files with 2,495 additions and 698 deletions.
6 changes: 3 additions & 3 deletions data/jobs/a_policy_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ plugins:
- -c
- "echo hello world; sleep 5; echo bye"
nodes:
W023:
W097:
scienceRules:
- "myfirstapp: True"
- "mysecondapp: True"
- "schedule(myfirstapp): True"
- "schedule(mysecondapp): True"
#- "myfirstapp: cronjob('myfirstapp', '0 * * * *')"
successcriteria:
- WallClock(1d)
19 changes: 12 additions & 7 deletions pkg/cloudscheduler/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ func (api *APIServer) handlerSubmitJobs(w http.ResponseWriter, r *http.Request)
return
}
}
errorList := api.cloudScheduler.ValidateJobAndCreateScienceGoal(queries.Get("id"), user, flagDryRun)
// TODO: we should not commit to change on the existing goal of job when --dry-run is given
errorList := api.cloudScheduler.ValidateJobAndCreateScienceGoalForExistingJob(queries.Get("id"), user, flagDryRun)
if len(errorList) > 0 {
response := datatype.NewAPIMessageBuilder().AddError(fmt.Sprintf("%v", errorList)).Build()
respondJSON(w, http.StatusBadRequest, response.ToJson())
Expand Down Expand Up @@ -362,21 +363,25 @@ func (api *APIServer) handlerSubmitJobs(w http.ResponseWriter, r *http.Request)
return
}
newJob.User = user.GetUserName()
jobID := api.cloudScheduler.GoalManager.AddJob(newJob)
errorList := api.cloudScheduler.ValidateJobAndCreateScienceGoal(jobID, user, flagDryRun)
sg, errorList := api.cloudScheduler.ValidateJobAndCreateScienceGoal(newJob, user)
if len(errorList) > 0 {
response := datatype.NewAPIMessageBuilder().
AddEntity("job_id", jobID).
AddEntity("message", "job is added, but failed to be validated. Please edit the job and try again.").
AddEntity("job_name", newJob.Name).
AddEntity("message", "validation failed. Please revise the job and try again.").
AddError(fmt.Sprintf("%v", errorList)).Build()
respondJSON(w, http.StatusBadRequest, response.ToJson())
return
} else {
response := datatype.NewAPIMessageBuilder().AddEntity("job_id", jobID)
newJob.ScienceGoal = sg
response := datatype.NewAPIMessageBuilder().AddEntity("job_name", newJob.Name)
if flagDryRun {
response = response.AddEntity("dryrun", true)
} else {
response = response.AddEntity("state", datatype.JobSubmitted)
jobID := api.cloudScheduler.GoalManager.AddJob(newJob)
newJob.UpdateJobID(jobID)
api.cloudScheduler.GoalManager.UpdateJob(newJob, true)
response = response.AddEntity("job_id", jobID).
AddEntity("state", datatype.JobSubmitted)
}
respondJSON(w, http.StatusOK, response.Build().ToJson())
return
Expand Down
46 changes: 34 additions & 12 deletions pkg/cloudscheduler/cloudscheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,22 @@ func (cs *CloudScheduler) Configure() error {
return nil
}

func (cs *CloudScheduler) ValidateJobAndCreateScienceGoal(jobID string, user *User, dryrun bool) (errorList []error) {
job, err := cs.GoalManager.GetJob(jobID)
if err != nil {
return []error{err}
}
func (cs *CloudScheduler) ValidateJobAndCreateScienceGoal(job *datatype.Job, user *User) (scienceGoal *datatype.ScienceGoal, errorList []error) {
scienceGoalBuilder := datatype.NewScienceGoalBuilder(job.Name, job.JobID)
logger.Info.Printf("Validating %s...", job.Name)
// Step 1: Resolve node tags
job.AddNodes(cs.Validator.GetNodeNamesByTags(job.NodeTags))
// TODO: Jobs may be submitted without nodes in the future
// For example, Chicago nodes without having any node in Chicago yet
if len(job.Nodes) < 1 {
return []error{fmt.Errorf("Node is not selected")}
errorList = append(errorList, fmt.Errorf("Node is not selected"))
return
}
// Check if email is set for notification
if len(job.NotificationOn) > 0 {
if job.Email == "" {
return []error{fmt.Errorf("No email is set for notification")}
errorList = append(errorList, fmt.Errorf("No email is set for notification"))
return
}
// Check if given notification types are valid
for _, s := range job.NotificationOn {
Expand Down Expand Up @@ -121,7 +119,17 @@ func (cs *CloudScheduler) ValidateJobAndCreateScienceGoal(jobID string, user *Us
errorList = append(errorList, fmt.Errorf("%s does not exist", nodeName))
continue
}
// pluginNameForDuplication checks if plugin names are duplicate
pluginNameForDuplication := map[string]bool{}
for _, plugin := range job.Plugins {
if !cs.Validator.IsPluginNameValid(plugin.Name) {
errorList = append(errorList, fmt.Errorf("plugin name %q must consist of up to 256 alphanumeric characters with '-' or '.' in the middle, RFC1123", plugin.Name))
continue
}
if _, found := pluginNameForDuplication[plugin.Name]; found {
errorList = append(errorList, fmt.Errorf("the plugin name %q is duplicated. plugin names must be unique", plugin.Name))
continue
}
pluginImage, err := plugin.GetPluginImage()
if err != nil {
errorList = append(errorList, fmt.Errorf("%s does not specify plugin image", plugin.Name))
Expand Down Expand Up @@ -162,7 +170,6 @@ func (cs *CloudScheduler) ValidateJobAndCreateScienceGoal(jobID string, user *Us
continue
}
logger.Info.Printf("%s passed Check 3", plugin.Name)

// Check 4: the required resource is available in node devices
// for _, c := range supportedComputes {
// supported, _ := c.GetUnsupportedPluginProfiles(pluginManifest)
Expand All @@ -179,6 +186,7 @@ func (cs *CloudScheduler) ValidateJobAndCreateScienceGoal(jobID string, user *Us
// }
// }
approvedPlugins = append(approvedPlugins, plugin)
pluginNameForDuplication[plugin.Name] = true
}
// Check 4: conditions of job are valid

Expand All @@ -196,21 +204,35 @@ func (cs *CloudScheduler) ValidateJobAndCreateScienceGoal(jobID string, user *Us
scienceGoalBuilder = scienceGoalBuilder.AddSubGoal(nodeName, approvedPlugins, rules)
}
if len(errorList) > 0 {
logger.Info.Printf("Validation failed for Job ID %q: %v", jobID, errorList)
return errorList
logger.Info.Printf("Validation failed for Job %q: %v", job.Name, errorList)
} else {
scienceGoal = scienceGoalBuilder.Build()
logger.Info.Printf("A new goal %q is generated for Job %q", scienceGoal.ID, job.Name)
}
return
}

func (cs *CloudScheduler) ValidateJobAndCreateScienceGoalForExistingJob(jobID string, user *User, dryrun bool) (errorList []error) {
job, err := cs.GoalManager.GetJob(jobID)
if err != nil {
return []error{err}
}
sg, errorList := cs.ValidateJobAndCreateScienceGoal(job, user)
if len(errorList) > 0 {
return
}
logger.Info.Printf("Updating science goal for JOB ID %q", jobID)
if job.ScienceGoal != nil {
logger.Info.Printf("job ID %q has an existing goal %q. dropping it first...", jobID, job.ScienceGoal.ID)
cs.GoalManager.RemoveScienceGoal(job.ScienceGoal.ID)
}
job.ScienceGoal = scienceGoalBuilder.Build()
job.ScienceGoal = sg
if dryrun {
cs.GoalManager.UpdateJob(job, false)
} else {
cs.GoalManager.UpdateJob(job, true)
}
return nil
return
}

func (cs *CloudScheduler) updateNodes(nodes []string) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/cloudscheduler/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ func (jv *JobValidator) GetPluginManifest(pluginImage string, updateDBIfNotExist
}
}

// IsPluginNameValid checks if given plugin name is valid.
// Plugin name must follow RFC 1123.
// Reference: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-subdomain-names
func (jv *JobValidator) IsPluginNameValid(name string) bool {
// the maximum length allowed is 256, but the scheduler may use several characters
// to indicate job ID when it names a plugin, thus reduce length of user plugins to 200
if len(name) > 200 {
return false
}
var validNamePattern = regexp.MustCompile("^[a-z0-9-]+$")
return validNamePattern.MatchString(name)
}

// LoadDatabase loads node and plugin manifests
// this function should be called only at initialization
func (jv *JobValidator) LoadDatabase() error {
Expand Down
7 changes: 7 additions & 0 deletions pkg/datatype/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ func (j *Job) DropNode(nodeName string) {
}
}

func (j *Job) UpdateJobID(id string) {
j.JobID = id
if j.ScienceGoal != nil {
j.ScienceGoal.JobID = id
}
}

// ConvertToTemplate returns a new job object that contains the job description. The new job object does
// not contain user-specific information such as job ID, owner, email, etc. This is mostly useful
// when someone creates a new job from an existing job.
Expand Down
1 change: 1 addition & 0 deletions pkg/datatype/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Plugin struct {
Name string `json:"name" yaml:"name"`
PluginSpec *PluginSpec `json:"plugin_spec" yaml:"pluginSpec,omitempty"`
GoalID string `json:"goal_id,omitempty" yaml:"goalID,omitempty"`
JobID string `json:"job_id,omitempty" yaml:"jobID,omitempty"`
}

func (p *Plugin) GetPluginImage() (string, error) {
Expand Down
43 changes: 24 additions & 19 deletions pkg/nodescheduler/nodescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (ns *NodeScheduler) Run() {
pr := datatype.NewPluginRuntimeWithScienceRule(_p, *r)
// TODO: we enable plugin-controller always. we will want to control this later.
pr.SetPluginController(true)
// we name Kubenetes Pod of the plugin using << pluginName-jobID >>
// to distinguish the same plugin names from different jobs
pr.Plugin.JobID = sg.JobID
if _pr := ns.waitingQueue.Pop(pr); _pr != nil {
ns.readyQueue.Push(pr)
triggerScheduling = true
Expand Down Expand Up @@ -269,27 +272,29 @@ func (ns *NodeScheduler) registerGoal(goal *datatype.ScienceGoal) {

func (ns *NodeScheduler) cleanUpGoal(goal *datatype.ScienceGoal) {
ns.Knowledgebase.DropRules(goal.Name)
for _, p := range goal.GetMySubGoal(ns.NodeID).GetPlugins() {
_p := *p
pr := &datatype.PluginRuntime{
Plugin: _p,
}
if a := ns.waitingQueue.Pop(pr); a != nil {
logger.Debug.Printf("plugin %s is removed from the waiting queue", p.Name)
}
if a := ns.readyQueue.Pop(pr); a != nil {
logger.Debug.Printf("plugin %s is removed from the ready queue", p.Name)
}
if a := ns.scheduledPlugins.Pop(pr); a != nil {
if pod, err := ns.ResourceManager.GetPod(a.Plugin.Name); err != nil {
logger.Error.Printf("Failed to get pod of the plugin %q", a.Plugin.Name)
} else {
e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddPluginMeta(&a.Plugin).AddPodMeta(pod).AddReason("Cleaning up the plugin due to deletion of the goal").Build()
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all")
if mySubGoal := goal.GetMySubGoal(ns.NodeID); mySubGoal != nil {
for _, p := range goal.GetMySubGoal(ns.NodeID).GetPlugins() {
_p := *p
pr := &datatype.PluginRuntime{
Plugin: _p,
}
ns.ResourceManager.RemovePlugin(&a.Plugin)
logger.Debug.Printf("plugin %s is removed from running", p.Name)
if a := ns.waitingQueue.Pop(pr); a != nil {
logger.Debug.Printf("plugin %s is removed from the waiting queue", p.Name)
}
if a := ns.readyQueue.Pop(pr); a != nil {
logger.Debug.Printf("plugin %s is removed from the ready queue", p.Name)
}
if a := ns.scheduledPlugins.Pop(pr); a != nil {
if pod, err := ns.ResourceManager.GetPod(a.Plugin.Name); err != nil {
logger.Error.Printf("Failed to get pod of the plugin %q", a.Plugin.Name)
} else {
e := datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddPluginMeta(&a.Plugin).AddPodMeta(pod).AddReason("Cleaning up the plugin due to deletion of the goal").Build()
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(e.ToWaggleMessage(), "all")
}
ns.ResourceManager.RemovePlugin(&a.Plugin)
logger.Debug.Printf("plugin %s is removed from running", p.Name)

}
}
}
ns.GoalManager.DropGoal(goal.ID)
Expand Down
21 changes: 14 additions & 7 deletions pkg/nodescheduler/resourcemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,12 @@ func generateRandomString(n int) string {

func (rm *ResourceManager) labelsForPlugin(plugin *datatype.Plugin) map[string]string {
labels := map[string]string{
"app": plugin.Name,
"app.kubernetes.io/name": plugin.Name,
"app.kubernetes.io/managed-by": rm.runner,
"app.kubernetes.io/created-by": rm.runner,
"sagecontinuum.org/plugin-job": plugin.PluginSpec.Job,
"sagecontinuum.org/plugin-task": plugin.Name,
"sagecontinuum.org/plugin-instance": plugin.Name + "-" + generateRandomString(6),
"app": plugin.Name,
"app.kubernetes.io/name": plugin.Name,
"app.kubernetes.io/managed-by": rm.runner,
"app.kubernetes.io/created-by": rm.runner,
"sagecontinuum.org/plugin-job": plugin.PluginSpec.Job,
"sagecontinuum.org/plugin-task": plugin.Name,
}

// in develop mode, we omit the role labels to opt out of network traffic filtering
Expand Down Expand Up @@ -731,6 +730,10 @@ func (rm *ResourceManager) CreatePodTemplate(pr *datatype.PluginRuntime) (*apiv1
if err != nil {
return nil, err
}
// add instance label to distinguish between Pods of the same plugin
// reference on the fact that Pods are not designed to be updated
// https://github.com/kubernetes/kubernetes/issues/24913#issuecomment-694817890
template.Labels["sagecontinuum.org/plugin-instance"] = pr.Plugin.Name + "-" + generateRandomString(6)
template.Spec.RestartPolicy = apiv1.RestartPolicyNever
return &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1148,6 +1151,10 @@ func (rm *ResourceManager) CleanUp() error {
func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) {
logger.Debug.Printf("Running plugin %q...", pr.Plugin.Name)
pod, err := rm.CreatePodTemplate(pr)
// we override the plugin name to distinguish the same plugin name from different jobs
if pr.Plugin.JobID != "" {
pod.SetName(fmt.Sprintf("%s-%s", pod.GetName(), pr.Plugin.JobID))
}
if err != nil {
logger.Error.Printf("Failed to create Kubernetes Pod for %q: %q", pr.Plugin.Name, err.Error())
rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(&pr.Plugin).Build())
Expand Down
Loading

0 comments on commit 4dc5f6d

Please sign in to comment.