Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: playbook runners #441

Merged
merged 4 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/flanksource/duty/types"
"github.com/flanksource/kommons"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/samber/lo"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"gorm.io/gorm"
Expand Down Expand Up @@ -96,6 +97,22 @@ func (k Context) User() *models.Person {
return v.(*models.Person)
}

// WithAgent sets the current session's agent in the context
func (k Context) WithAgent(agent models.Agent) Context {
k.GetSpan().SetAttributes(attribute.String("agent-id", agent.ID.String()))
return Context{
Context: k.WithValue("agent", agent),
}
}

func (k Context) Agent() *models.Agent {
v := k.Value("agent")
if v == nil {
return nil
}
return lo.ToPtr(v.(models.Agent))
}

func (k Context) WithTrace() Context {
k.Context = k.Context.WithTrace()
return k
Expand Down
1 change: 1 addition & 0 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
const (
ResourceTypeCheckStatuses = "check_statuses"
ResourceTypeComponent = "components"
ResourceTypePlaybook = "playbook"
ResourceTypeUpstream = "upstream"
)

Expand Down
34 changes: 28 additions & 6 deletions models/playbooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
type PlaybookRunStatus string

const (
PlaybookRunStatusPending PlaybookRunStatus = "pending"
PlaybookRunStatusScheduled PlaybookRunStatus = "scheduled"
PlaybookRunStatusRunning PlaybookRunStatus = "running"
PlaybookRunStatusCancelled PlaybookRunStatus = "cancelled"
PlaybookRunStatusFailed PlaybookRunStatus = "failed"
PlaybookRunStatusCompleted PlaybookRunStatus = "completed"
PlaybookRunStatusFailed PlaybookRunStatus = "failed"
PlaybookRunStatusPending PlaybookRunStatus = "pending" // pending approval
PlaybookRunStatusRunning PlaybookRunStatus = "running"
PlaybookRunStatusScheduled PlaybookRunStatus = "scheduled"
PlaybookRunStatusSleeping PlaybookRunStatus = "sleeping"
PlaybookRunStatusWaiting PlaybookRunStatus = "waiting" // waiting for a consumer
)

// PlaybookRunStatus are statuses for a playbook run and its actions.
Expand All @@ -27,10 +28,17 @@ const (
PlaybookActionStatusCompleted PlaybookActionStatus = "completed"
PlaybookActionStatusFailed PlaybookActionStatus = "failed"
PlaybookActionStatusRunning PlaybookActionStatus = "running"
PlaybookActionStatusScheduled PlaybookActionStatus = "scheduled"
PlaybookActionStatusSkipped PlaybookActionStatus = "skipped"
PlaybookActionStatusSleeping PlaybookActionStatus = "sleeping"
)

var PlaybookActionFinalStates = []PlaybookActionStatus{
PlaybookActionStatusFailed,
PlaybookActionStatusCompleted,
PlaybookActionStatusSkipped,
}

var (
PlaybookRunStatusExecutingGroup = []PlaybookRunStatus{
PlaybookRunStatusRunning,
Expand Down Expand Up @@ -61,7 +69,7 @@ type PlaybookRun struct {
PlaybookID uuid.UUID `json:"playbook_id"`
Status PlaybookRunStatus `json:"status,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty" time_format:"postgres_timestamp" gorm:"<-:false"`
StartTime time.Time `json:"start_time,omitempty" time_format:"postgres_timestamp"`
StartTime *time.Time `json:"start_time,omitempty" time_format:"postgres_timestamp"`
ScheduledTime time.Time `json:"scheduled_time,omitempty" time_format:"postgres_timestamp" gorm:"default:NOW(), NOT NULL"`
EndTime *time.Time `json:"end_time,omitempty" time_format:"postgres_timestamp"`
CreatedBy *uuid.UUID `json:"created_by,omitempty"`
Expand All @@ -85,7 +93,9 @@ type PlaybookRunAction struct {
StartTime time.Time `json:"start_time,omitempty" time_format:"postgres_timestamp" gorm:"default:NOW(), NOT NULL"`
EndTime *time.Time `json:"end_time,omitempty" time_format:"postgres_timestamp"`
Result types.JSONMap `json:"result,omitempty" gorm:"default:null"`
Error string `json:"error,omitempty" gorm:"default:null"`
Error *string `json:"error,omitempty" gorm:"default:null"`
IsPushed bool `json:"is_pushed"`
AgentID *uuid.UUID `json:"agent_id,omitempty"`
}

func (p PlaybookRunAction) AsMap(removeFields ...string) map[string]any {
Expand All @@ -103,3 +113,15 @@ type PlaybookApproval struct {
func (p PlaybookApproval) AsMap(removeFields ...string) map[string]any {
return asMap(p, removeFields...)
}

type PlaybookActionAgentData struct {
ActionID uuid.UUID `json:"action_id"`
RunID uuid.UUID `json:"run_id"`
PlaybookID uuid.UUID `json:"playbook_id"`
Spec types.JSON `json:"spec"`
Env types.JSON `json:"env,omitempty"`
}

func (t *PlaybookActionAgentData) TableName() string {
return "playbook_action_agent_data"
}
63 changes: 58 additions & 5 deletions schema/playbooks.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,41 @@ table "playbook_runs" {
}
}

table "playbook_action_agent_data" {
schema = schema.public
comment = "saves the necessary details for the agent to run a playbook action (eg: template env vars). Only applicable to agent runners."
column "action_id" {
null = false
type = uuid
}
column "playbook_id" {
comment = "saves the linked upstream playbook id"
null = false
type = uuid
}
column "run_id" {
comment = "saves the linked upstream playbook run id"
null = false
type = uuid
}
column "spec" {
comment = "Action spec provided by upstream"
null = false
type = jsonb
}
column "env" {
comment = "templateEnv for the action provided by the upstream"
null = true
type = jsonb
}
foreign_key "playbook_action_template_env_agent_action_id_fkey" {
columns = [column.action_id]
ref_columns = [table.playbook_run_actions.column.id]
on_update = NO_ACTION
on_delete = CASCADE
}
}

table "playbook_run_actions" {
schema = schema.public
column "id" {
Expand All @@ -238,13 +273,13 @@ table "playbook_run_actions" {
default = "running"
}
column "playbook_run_id" {
null = false
type = uuid
null = true
type = uuid
comment = "a run id is mandatory except for an agent"
}
column "start_time" {
null = true
type = timestamptz
default = sql("now()")
null = true
type = timestamptz
}
column "scheduled_time" {
null = false
Expand All @@ -259,6 +294,17 @@ table "playbook_run_actions" {
null = true
type = jsonb
}
column "is_pushed" {
null = false
default = false
type = bool
}
column "agent_id" {
null = true
default = null
type = uuid
comment = "id of the agent that ran this action"
}
column "error" {
null = true
type = text
Expand All @@ -272,4 +318,11 @@ table "playbook_run_actions" {
on_update = NO_ACTION
on_delete = CASCADE
}
check "playbook_action_not_null_run_id" {
expr = <<EOF
(playbook_run_id IS NULL AND agent_id IS NOT NULL) OR
(playbook_run_id IS NOT NULL)
EOF
comment = "a run id is mandatory except for an agent"
}
}
16 changes: 16 additions & 0 deletions upstream/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,21 @@ func InsertUpstreamMsg(ctx context.Context, req *PushData) error {
}
}

for i := range req.PlaybookActions {
updates := map[string]any{
"result": req.PlaybookActions[i].Result,
"end_time": req.PlaybookActions[i].EndTime,
"status": req.PlaybookActions[i].Status,
"error": req.PlaybookActions[i].Error,
}
if err := db.Model(&models.PlaybookRunAction{}).Where("id = ?", req.PlaybookActions[i].ID).Updates(updates).Error; err != nil {
return fmt.Errorf("error updating playbook action [%s]: %w", req.PlaybookActions[i].ID, err)
}

if err := db.Exec("UPDATE playbook_runs SET status = ? WHERE id = (SELECT playbook_run_id FROM playbook_run_actions WHERE id = ?)", models.PlaybookRunStatusScheduled, req.PlaybookActions[i].ID).Error; err != nil {
return fmt.Errorf("error updating playbook run [%s] status to %s : %w", req.PlaybookActions[i].PlaybookRunID, models.PlaybookRunStatusScheduled, err)
}
}

return nil
}
3 changes: 2 additions & 1 deletion upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type PushData struct {
ComponentRelationships []models.ComponentRelationship `json:"component_relationships,omitempty"`
ConfigComponentRelationships []models.ConfigComponentRelationship `json:"config_component_relationships,omitempty"`
Topologies []models.Topology `json:"topologies,omitempty"`
PlaybookActions []models.PlaybookRunAction `json:"playbook_actions,omitempty"`
}

func (p *PushData) String() string {
Expand Down Expand Up @@ -148,7 +149,7 @@ func (p *PushData) Attributes() map[string]any {
func (t *PushData) Count() int {
return len(t.Canaries) + len(t.Checks) + len(t.Components) + len(t.ConfigScrapers) +
len(t.ConfigAnalysis) + len(t.ConfigChanges) + len(t.ConfigItems) + len(t.CheckStatuses) +
len(t.ConfigRelationships) + len(t.ComponentRelationships) + len(t.ConfigComponentRelationships) + len(t.Topologies)
len(t.ConfigRelationships) + len(t.ComponentRelationships) + len(t.ConfigComponentRelationships) + len(t.Topologies) + len(t.PlaybookActions)
}

// ReplaceTopologyID replaces the topology_id for all the components
Expand Down
44 changes: 44 additions & 0 deletions views/018_playbooks.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
-- Notify playbook action created or status updated
CREATE OR REPLACE FUNCTION notify_playbook_action_update() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
NOTIFY playbook_action_updates;
ELSEIF TG_OP = 'UPDATE' THEN
IF OLD.status != NEW.status AND NEW.status = 'scheduled' THEN
NOTIFY playbook_action_updates;
END IF;
END IF;

RETURN NULL;
END
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER playbook_action_updates
AFTER INSERT OR UPDATE ON playbook_run_actions
FOR EACH ROW
EXECUTE PROCEDURE notify_playbook_action_update();

-- Notify playbook run created or status updated
CREATE OR REPLACE FUNCTION notify_playbook_run_update() RETURNS TRIGGER AS $$
BEGIN
Expand Down Expand Up @@ -33,3 +53,27 @@ CREATE OR REPLACE TRIGGER playbook_updated_trigger
AFTER UPDATE ON playbooks
FOR EACH ROW
EXECUTE PROCEDURE notify_playbook_update();

-- List of all the playbooks that can be run by an agent
DROP VIEW IF EXISTS playbooks_for_agent;

CREATE OR REPLACE VIEW
playbooks_for_agent AS
WITH interim AS (
SELECT
id,
jsonb_array_elements_text(spec -> 'runsOn') AS agent_name
FROM
playbooks
WHERE
spec ->> 'runsOn' IS NOT NULL
)
SELECT
interim.agent_name,
agents.person_id,
agents.id as agent_id,
jsonb_agg(interim.id) AS playbook_ids
FROM
interim
INNER JOIN agents ON interim.agent_name :: TEXT = agents.name
GROUP BY agent_name, agents.person_id, agent_id
Loading