-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add workloads to the plan #571
base: master
Are you sure you want to change the base?
Changes from all commits
c5147ab
3bcacec
df6760e
1549f34
40ce30e
5c8915f
f4d0bb9
4dfa7ee
2dce94f
4694cd7
988a31f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ import ( | |
"errors" | ||
"fmt" | ||
"io" | ||
"maps" | ||
"os" | ||
"os/exec" | ||
"os/user" | ||
|
@@ -95,6 +96,7 @@ type serviceData struct { | |
manager *ServiceManager | ||
state serviceState | ||
config *plan.Service | ||
workload *Workload | ||
logs *servicelog.RingBuffer | ||
started chan error | ||
stopped chan error | ||
|
@@ -115,13 +117,21 @@ func (m *ServiceManager) doStart(task *state.Task, tomb *tomb.Tomb) error { | |
} | ||
|
||
currentPlan := m.getPlan() | ||
config, ok := currentPlan.Services[request.Name] | ||
if !ok { | ||
config, configFound := currentPlan.Services[request.Name] | ||
if !configFound { | ||
return fmt.Errorf("cannot find service %q in plan", request.Name) | ||
} | ||
|
||
var workload *Workload | ||
if config.Workload != "" { | ||
ws := currentPlan.Sections[WorkloadsField].(*WorkloadsSection) | ||
if workload = ws.Entries[config.Workload]; workload == nil { | ||
return fmt.Errorf("cannot find workload %q for service %q in plan", config.Workload, request.Name) | ||
} | ||
} | ||
|
||
// Create the service object (or reuse the existing one by name). | ||
service, taskLog := m.serviceForStart(config) | ||
service, taskLog := m.serviceForStart(config, workload) | ||
if taskLog != "" { | ||
addTaskLog(task, taskLog) | ||
} | ||
|
@@ -167,7 +177,7 @@ func (m *ServiceManager) doStart(task *state.Task, tomb *tomb.Tomb) error { | |
// and is running. | ||
// | ||
// It also returns a message to add to the task's log, or empty string if none. | ||
func (m *ServiceManager) serviceForStart(config *plan.Service) (service *serviceData, taskLog string) { | ||
func (m *ServiceManager) serviceForStart(config *plan.Service, workload *Workload) (service *serviceData, taskLog string) { | ||
m.servicesLock.Lock() | ||
defer m.servicesLock.Unlock() | ||
|
||
|
@@ -177,17 +187,23 @@ func (m *ServiceManager) serviceForStart(config *plan.Service) (service *service | |
service = &serviceData{ | ||
manager: m, | ||
state: stateInitial, | ||
config: config.Copy(), | ||
logs: servicelog.NewRingBuffer(maxLogBytes), | ||
started: make(chan error, 1), | ||
stopped: make(chan error, 2), // enough for killTimeElapsed to send, and exit if it happens after | ||
} | ||
service.config = config.Copy() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder why we were doing a copy here? As the plan isn't mutated. What do you think about simplifying and not doing a copy on either? CC: @flotter as I think he's thought about this some already. |
||
if workload != nil { | ||
service.workload = workload.copy() | ||
} | ||
m.services[config.Name] = service | ||
return service, "" | ||
} | ||
|
||
// Ensure config is up-to-date from the plan whenever the user starts a service. | ||
service.config = config.Copy() | ||
if workload != nil { | ||
service.workload = workload.copy() | ||
} | ||
|
||
switch service.state { | ||
case stateInitial, stateStarting, stateRunning: | ||
|
@@ -338,17 +354,31 @@ func (s *serviceData) startInternal() error { | |
s.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} | ||
|
||
// Copy environment to avoid updating original. | ||
environment := make(map[string]string) | ||
for k, v := range s.config.Environment { | ||
environment[k] = v | ||
var environment map[string]string | ||
if s.workload != nil && len(s.workload.Environment) > 0 { | ||
environment = maps.Clone(s.workload.Environment) | ||
} else if len(s.config.Environment) > 0 { | ||
environment = maps.Clone(s.config.Environment) | ||
} else { | ||
environment = make(map[string]string) | ||
} | ||
|
||
s.cmd.Dir = s.config.WorkingDir | ||
|
||
// Start as another user if specified in plan. | ||
uid, gid, err := osutil.NormalizeUidGid(s.config.UserID, s.config.GroupID, s.config.User, s.config.Group) | ||
if err != nil { | ||
return err | ||
var uid, gid *int | ||
if s.config.UserID != nil || s.config.GroupID != nil || s.config.User != "" || s.config.Group != "" { | ||
// User/group config from the service takes precedence if any of them are set | ||
uid, gid, err = osutil.NormalizeUidGid(s.config.UserID, s.config.GroupID, s.config.User, s.config.Group) | ||
if err != nil { | ||
return err | ||
} | ||
} else if s.workload != nil { | ||
// Take user/group config from workload | ||
uid, gid, err = osutil.NormalizeUidGid(s.workload.UserID, s.workload.GroupID, s.workload.User, s.workload.Group) | ||
anpep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return err | ||
} | ||
} | ||
if uid != nil && gid != nil { | ||
isCurrent, err := osutil.IsCurrent(*uid, *gid) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -264,7 +264,17 @@ func (m *ServiceManager) Replan() ([][]string, [][]string, error) { | |
if config.Equal(s.config) { | ||
continue | ||
} | ||
s.config = config.Copy() // update service config from plan | ||
// Update service config and workload from plan | ||
s.config = config.Copy() | ||
if s.config.Workload != "" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If workload changed from "foo" to "" with the previous update, I think you do want to update There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also if the plan.Service is the same, and the workload changed, replan will not restart the service. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was going to comment the same thing as Fred: don't we want to also restart the service if the workload details have changed (its user/group), not just the service.workload name? |
||
ws, ok := currentPlan.Sections[WorkloadsField].(*WorkloadsSection) | ||
if !ok { | ||
return nil, nil, fmt.Errorf("internal error: invalid section type %T", ws) | ||
} | ||
s.workload = ws.Entries[s.config.Workload].copy() | ||
} else { | ||
s.workload = nil | ||
} | ||
} | ||
needsRestart[name] = true | ||
stop = append(stop, name) | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,217 @@ | ||||||||||||
// Copyright (c) 2025 Canonical Ltd | ||||||||||||
// | ||||||||||||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||
// you may not use this file except in compliance with the License. | ||||||||||||
// You may obtain a copy of the License at | ||||||||||||
// | ||||||||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||
// | ||||||||||||
// Unless required by applicable law or agreed to in writing, software | ||||||||||||
// distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||
// See the License for the specific language governing permissions and | ||||||||||||
// limitations under the License. | ||||||||||||
|
||||||||||||
package servstate | ||||||||||||
|
||||||||||||
import ( | ||||||||||||
"bytes" | ||||||||||||
"errors" | ||||||||||||
"fmt" | ||||||||||||
"maps" | ||||||||||||
|
||||||||||||
"gopkg.in/yaml.v3" | ||||||||||||
|
||||||||||||
"github.com/canonical/pebble/internals/osutil" | ||||||||||||
"github.com/canonical/pebble/internals/plan" | ||||||||||||
) | ||||||||||||
|
||||||||||||
var _ plan.SectionExtension = (*WorkloadsSectionExtension)(nil) | ||||||||||||
|
||||||||||||
type WorkloadsSectionExtension struct{} | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would argue this shouldn't be in Naming suggestions:
|
||||||||||||
|
||||||||||||
func (ext *WorkloadsSectionExtension) CombineSections(sections ...plan.Section) (plan.Section, error) { | ||||||||||||
ws := &WorkloadsSection{} | ||||||||||||
for _, section := range sections { | ||||||||||||
layer, ok := section.(*WorkloadsSection) | ||||||||||||
if !ok { | ||||||||||||
return nil, fmt.Errorf("internal error: invalid section type %T", layer) | ||||||||||||
} | ||||||||||||
if err := ws.combine(layer); err != nil { | ||||||||||||
return nil, err | ||||||||||||
} | ||||||||||||
} | ||||||||||||
return ws, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (ext *WorkloadsSectionExtension) ParseSection(data yaml.Node) (plan.Section, error) { | ||||||||||||
ws := &WorkloadsSection{} | ||||||||||||
// The following issue prevents us from using the yaml.Node decoder | ||||||||||||
// with KnownFields = true behavior. Once one of the proposals get | ||||||||||||
// merged, we can remove the intermediate Marshal step. | ||||||||||||
anpep marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
// https://github.com/go-yaml/yaml/issues/460 | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it worth factoring the following yaml.Marshal + KnownFields + Decode into a helper function and using it from the couple of places we need it, just to avoid duplication (of the comment and bug link too)? Could be |
||||||||||||
if len(data.Content) != 0 { | ||||||||||||
yml, err := yaml.Marshal(data) | ||||||||||||
if err != nil { | ||||||||||||
return nil, fmt.Errorf(`internal error: cannot marshal "workloads" section: %w`, err) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've seen people quote with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I did not understand correctly, but Lines 732 to 736 in e3b81e1
|
||||||||||||
} | ||||||||||||
dec := yaml.NewDecoder(bytes.NewReader(yml)) | ||||||||||||
dec.KnownFields(true) | ||||||||||||
if err = dec.Decode(ws); err != nil { | ||||||||||||
return nil, &plan.FormatError{ | ||||||||||||
Message: fmt.Sprintf(`cannot parse the "workloads" section: %v`, err), | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
for name, workload := range ws.Entries { | ||||||||||||
if workload != nil { | ||||||||||||
workload.Name = name | ||||||||||||
} | ||||||||||||
} | ||||||||||||
return ws, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (ext *WorkloadsSectionExtension) ValidatePlan(p *plan.Plan) error { | ||||||||||||
ws, ok := p.Sections[WorkloadsField].(*WorkloadsSection) | ||||||||||||
if !ok { | ||||||||||||
return fmt.Errorf("internal error: invalid section type %T", ws) | ||||||||||||
} | ||||||||||||
for name, service := range p.Services { | ||||||||||||
if service.Workload == "" { | ||||||||||||
continue | ||||||||||||
} | ||||||||||||
if _, ok := ws.Entries[service.Workload]; !ok { | ||||||||||||
return &plan.FormatError{ | ||||||||||||
Message: fmt.Sprintf(`plan service %q workload not defined: %q`, name, service.Workload), | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
for name, workload := range ws.Entries { | ||||||||||||
if _, _, err := osutil.NormalizeUidGid(workload.UserID, workload.GroupID, workload.User, workload.Group); err != nil { | ||||||||||||
return &plan.FormatError{ | ||||||||||||
Message: fmt.Sprintf(`plan workload %q %v`, err, name), | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
anpep marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
return nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
const WorkloadsField = "workloads" | ||||||||||||
|
||||||||||||
var _ plan.Section = (*WorkloadsSection)(nil) | ||||||||||||
|
||||||||||||
type WorkloadsSection struct { | ||||||||||||
Entries map[string]*Workload `yaml:",inline"` | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (ws *WorkloadsSection) IsZero() bool { | ||||||||||||
anpep marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
return len(ws.Entries) == 0 | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (ws *WorkloadsSection) Validate() error { | ||||||||||||
for name, workload := range ws.Entries { | ||||||||||||
if workload == nil { | ||||||||||||
return &plan.FormatError{ | ||||||||||||
Message: fmt.Sprintf("workload %q cannot have a null value", name), | ||||||||||||
} | ||||||||||||
} | ||||||||||||
if err := workload.validate(); err != nil { | ||||||||||||
return &plan.FormatError{ | ||||||||||||
Message: fmt.Sprintf("workload %q %v", name, err), | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
return nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (ws *WorkloadsSection) combine(other *WorkloadsSection) error { | ||||||||||||
for name, workload := range other.Entries { | ||||||||||||
if ws.Entries == nil { | ||||||||||||
ws.Entries = make(map[string]*Workload, len(other.Entries)) | ||||||||||||
} | ||||||||||||
switch workload.Override { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if there's same way we can factor this logic out into a helper function (likely in the |
||||||||||||
case plan.MergeOverride: | ||||||||||||
if current, ok := ws.Entries[name]; ok { | ||||||||||||
copied := current.copy() | ||||||||||||
copied.merge(workload) | ||||||||||||
ws.Entries[name] = copied | ||||||||||||
break | ||||||||||||
} | ||||||||||||
fallthrough | ||||||||||||
case plan.ReplaceOverride: | ||||||||||||
ws.Entries[name] = workload.copy() | ||||||||||||
case plan.UnknownOverride: | ||||||||||||
return &plan.FormatError{ | ||||||||||||
Message: fmt.Sprintf(`workload %q must define an "override" policy`, name), | ||||||||||||
} | ||||||||||||
default: | ||||||||||||
return &plan.FormatError{ | ||||||||||||
Message: fmt.Sprintf(`workload %q has an invalid "override" policy: %q`, name, workload.Override), | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
return nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
type Workload struct { | ||||||||||||
// Basic details | ||||||||||||
Name string `yaml:"-"` | ||||||||||||
Override plan.Override `yaml:"override,omitempty"` | ||||||||||||
|
||||||||||||
// Options for command execution | ||||||||||||
Environment map[string]string `yaml:"environment,omitempty"` | ||||||||||||
UserID *int `yaml:"user-id,omitempty"` | ||||||||||||
User string `yaml:"user,omitempty"` | ||||||||||||
GroupID *int `yaml:"group-id,omitempty"` | ||||||||||||
Group string `yaml:"group,omitempty"` | ||||||||||||
anpep marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
} | ||||||||||||
|
||||||||||||
func (w *Workload) validate() error { | ||||||||||||
if w.Name == "" { | ||||||||||||
return errors.New("cannot have an empty name") | ||||||||||||
} | ||||||||||||
// Value of Override is checked in the (*WorkloadSection).combine() method | ||||||||||||
anpep marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
return nil | ||||||||||||
benhoyt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
} | ||||||||||||
|
||||||||||||
func (w *Workload) copy() *Workload { | ||||||||||||
copied := *w | ||||||||||||
copied.Environment = maps.Clone(w.Environment) | ||||||||||||
copied.UserID = copyPtr(w.UserID) | ||||||||||||
copied.GroupID = copyPtr(w.GroupID) | ||||||||||||
return &copied | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (w *Workload) merge(other *Workload) { | ||||||||||||
if len(other.Environment) > 0 { | ||||||||||||
w.Environment = makeMapIfNil(w.Environment) | ||||||||||||
maps.Copy(w.Environment, other.Environment) | ||||||||||||
} | ||||||||||||
if other.UserID != nil { | ||||||||||||
anpep marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
w.UserID = copyPtr(other.UserID) | ||||||||||||
} | ||||||||||||
if other.User != "" { | ||||||||||||
w.User = other.User | ||||||||||||
} | ||||||||||||
if other.GroupID != nil { | ||||||||||||
w.GroupID = copyPtr(other.GroupID) | ||||||||||||
} | ||||||||||||
if other.Group != "" { | ||||||||||||
w.Group = other.Group | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
func copyPtr[T any](p *T) *T { | ||||||||||||
if p == nil { | ||||||||||||
return nil | ||||||||||||
} | ||||||||||||
copied := *p | ||||||||||||
return &copied | ||||||||||||
} | ||||||||||||
|
||||||||||||
func makeMapIfNil[K comparable, V any](m map[K]V) map[K]V { | ||||||||||||
if m == nil { | ||||||||||||
m = make(map[K]V) | ||||||||||||
} | ||||||||||||
return m | ||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't that be done in
plan.Validate
, that is, thep.Validate()
call above? Or isn't that done automatically by theValidatePlan
method on the extension?