Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: backend workflow validation #2013

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions keep-ui/app/workflows/models.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export type Workflow = {
providers: Provider[];
triggers: Trigger[];
disabled:boolean,
is_valid: boolean,
last_execution_time: string;
last_execution_status: string;
last_updated: string;
Expand Down
14 changes: 10 additions & 4 deletions keep-ui/app/workflows/workflow-tile.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import SlidingPanel from "react-sliding-side-panel";
import { useFetchProviders } from "app/providers/page.client";
import { Provider as FullProvider } from "app/providers/providers";
import "./workflow-tile.css";
import { CheckCircleIcon, XCircleIcon } from "@heroicons/react/24/outline";
import { CheckCircleIcon, XCircleIcon, InformationCircleIcon } from "@heroicons/react/24/outline";
import AlertTriggerModal from "./workflow-run-with-alert-modal";
import { formatDistanceToNowStrict } from "date-fns";
import TimeAgo, { Formatter, Suffix, Unit } from "react-timeago";
Expand Down Expand Up @@ -551,7 +551,7 @@ function WorkflowTile({ workflow }: { workflow: Workflow }) {
</div>
)}
<Card
className="relative flex flex-col justify-between bg-white rounded shadow p-2 h-full hover:border-orange-400 hover:border-2 overflow-hidden"
className={`relative flex flex-col justify-between bg-white rounded shadow p-2 h-full ${workflow.is_valid ? 'hover:border-orange-400' : 'hover:border-red-400'} hover:border-2 overflow-hidden`}
onClick={(e) => {
e.stopPropagation();
e.preventDefault();
Expand Down Expand Up @@ -581,14 +581,20 @@ function WorkflowTile({ workflow }: { workflow: Workflow }) {
<div className="m-2 flex flex-col justify-around item-start flex-wrap">
<WorkflowGraph workflow={workflow} />
<div className="container flex flex-col space-between">
<div className="h-24 cursor-default">
<div className="h-20 cursor-default">
<h2 className="truncate leading-6 font-bold text-base md:text-lg lg:text-xl">
{workflow?.name || "Unkown"}
{workflow?.name || "Unknown"}
</h2>
<p className="text-gray-500 line-clamp-2">
{workflow?.description || "no description"}
</p>

</div>
{!workflow.is_valid ?
<div className={"text-red-900 flex items-center -ml-1 h-9"}>
<Icon icon={InformationCircleIcon} color={"red"} size={"sm"}/> Invalid workflow Configuration
</div> : <div className={"h-9"}></div>
}
<div className="flex flex-row justify-between items-center gap-1 flex-wrap text-sm">
{!!workflow?.interval && (
<Button
Expand Down
2 changes: 1 addition & 1 deletion keep-ui/utils/hooks/useWorkflowInitialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ const useWorkflowInitialization = (
const initializeWorkflow = async () => {
setIsLoading(true);
let parsedWorkflow = definition?.value;
const name = parsedWorkflow?.properties?.name || parsedWorkflow?.properties?.id;
const name = parsedWorkflow?.properties?.name || '';

const sequences = [
{
Expand Down
16 changes: 12 additions & 4 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def get_workflows_that_should_run():
.filter(Workflow.is_deleted == False)
.filter(Workflow.is_disabled == False)
.filter(Workflow.interval != None)
.filter(Workflow.is_valid == True)
.filter(Workflow.interval > 0)
.all()
)
Expand Down Expand Up @@ -312,22 +313,24 @@ def add_or_update_workflow(
interval,
workflow_raw,
is_disabled,
is_valid,
provisioned=False,
provisioned_file=None,
updated_by=None,
) -> Workflow:
with Session(engine, expire_on_commit=False) as session:
# TODO: we need to better understanad if that's the right behavior we want
existing_workflow = (
existing_workflow: Workflow = (
session.query(Workflow)
.filter_by(name=name)
.filter_by(id=id)
.filter_by(tenant_id=tenant_id)
.first()
)

if existing_workflow:
# tb: no need to override the id field here because it has foreign key constraints.
existing_workflow.tenant_id = tenant_id
existing_workflow.name = name
existing_workflow.description = description
existing_workflow.updated_by = (
updated_by or existing_workflow.updated_by
Expand All @@ -337,6 +340,7 @@ def add_or_update_workflow(
existing_workflow.revision += 1 # Increment the revision
existing_workflow.last_updated = datetime.now() # Update last_updated
existing_workflow.is_deleted = False
existing_workflow.is_valid = is_valid
existing_workflow.is_disabled = is_disabled
existing_workflow.provisioned = provisioned
existing_workflow.provisioned_file = provisioned_file
Expand All @@ -352,6 +356,7 @@ def add_or_update_workflow(
updated_by=updated_by, # Set updated_by to the provided value
interval=interval,
is_disabled=is_disabled,
is_valid=is_valid,
workflow_raw=workflow_raw,
provisioned=provisioned,
provisioned_file=provisioned_file,
Expand Down Expand Up @@ -546,7 +551,7 @@ def get_workflow(tenant_id: str, workflow_id: str) -> Workflow:
workflow = session.exec(
select(Workflow)
.where(Workflow.tenant_id == tenant_id)
.where(Workflow.name == workflow_id)
.where(Workflow.id == workflow_id)
.where(Workflow.is_deleted == False)
).first()
if not workflow:
Expand Down Expand Up @@ -2806,7 +2811,9 @@ def get_incident_unique_fingerprint_count(tenant_id: str, incident_id: str) -> i
).scalar()


def get_last_alerts_for_incidents(incident_ids: List[str | UUID]) -> Dict[str, List[Alert]]:
def get_last_alerts_for_incidents(
incident_ids: List[str | UUID],
) -> Dict[str, List[Alert]]:
with Session(engine) as session:
query = (
session.query(
Expand All @@ -2828,6 +2835,7 @@ def get_last_alerts_for_incidents(incident_ids: List[str | UUID]) -> Dict[str, L

return incidents_alerts


def remove_alerts_to_incident_by_incident_id(
tenant_id: str, incident_id: str | UUID, alert_ids: List[UUID]
) -> Optional[int]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""is_valid flag to workflow

Revision ID: b4caf6779ec1
Revises: 493f217af6b6
Create Date: 2024-09-26 19:43:45.169045

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "b4caf6779ec1"
down_revision = "01ebe17218c0"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###

with op.batch_alter_table("workflow", schema=None) as batch_op:
# Add the column with default=True for new records
batch_op.add_column(
sa.Column(
"is_valid", sa.Boolean(), nullable=False, server_default=sa.true()
)
)

# Update existing rows to set 'is_valid' to True
op.execute("UPDATE workflow SET is_valid = TRUE")

# Remove the server_default as it's not needed after the column is populated for old records
with op.batch_alter_table("workflow", schema=None) as batch_op:
batch_op.alter_column("is_valid", server_default=None)


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("workflow", schema=None) as batch_op:
batch_op.drop_column("is_valid")
1 change: 1 addition & 0 deletions keep/api/models/db/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Workflow(SQLModel, table=True):
workflow_raw: str = Field(sa_column=Column(TEXT))
is_deleted: bool = Field(default=False)
is_disabled: bool = Field(default=False)
is_valid: bool = Field(default=True)
revision: int = Field(default=1, nullable=False)
last_updated: datetime = Field(default_factory=datetime.utcnow)
provisioned: bool = Field(default=False)
Expand Down
3 changes: 2 additions & 1 deletion keep/api/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class WorkflowDTO(BaseModel):
workflow_raw: str
revision: int = 1
last_updated: datetime = None
invalid: bool = False # whether the workflow is invalid or not (for UI purposes)
is_valid: bool
last_executions: List[dict] = None
last_execution_started: datetime = None
provisioned: bool = False
Expand Down Expand Up @@ -118,4 +118,5 @@ class WorkflowExecutionDTO(BaseModel):
class WorkflowCreateOrUpdateDTO(BaseModel):
workflow_id: str
status: Literal["created", "updated"]
is_valid: bool
revision: int = 1
78 changes: 64 additions & 14 deletions keep/api/routes/workflows.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
import logging
import os
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

import validators
import yaml
Expand Down Expand Up @@ -129,6 +129,7 @@ def get_workflows(
interval=workflow.interval,
providers=providers_dto,
triggers=triggers,
is_valid=workflow.is_valid,
workflow_raw=workflow.workflow_raw,
revision=workflow.revision,
last_updated=workflow.last_updated,
Expand Down Expand Up @@ -237,7 +238,9 @@ async def run_workflow_from_definition(
) -> dict:
tenant_id = authenticated_entity.tenant_id
created_by = authenticated_entity.email
workflow = await __get_workflow_raw_data(request, file)
workflow, is_valid = await __get_workflow_raw_data(
request, file, tenant_id=tenant_id
)
workflowstore = WorkflowStore()
workflowmanager = WorkflowManager.get_instance()
try:
Expand Down Expand Up @@ -273,7 +276,9 @@ async def run_workflow_from_definition(
return workflow_execution


async def __get_workflow_raw_data(request: Request, file: UploadFile) -> dict:
async def __get_workflow_raw_data(
request: Request | None, file: UploadFile, tenant_id
) -> Tuple[dict, bool]:
try:
# we support both File upload (from frontend) or raw yaml (e.g. curl)
if file:
Expand All @@ -291,7 +296,20 @@ async def __get_workflow_raw_data(request: Request, file: UploadFile) -> dict:
except yaml.YAMLError:
logger.exception("Invalid YAML format")
raise HTTPException(status_code=400, detail="Invalid YAML format")
return workflow_data

try:
parser = Parser()
parser.parse(tenant_id=tenant_id, parsed_workflow_yaml=workflow_data)
workflow_data["id"] = parser._get_workflow_id(tenant_id, workflow_data)
is_valid = True
except Exception as e:
is_valid = False
logger.error(
f"Error while validating workflow yaml {workflow_data['id']}",
extra={"exception": e},
)

return workflow_data, is_valid


@router.post(
Expand All @@ -305,12 +323,19 @@ async def create_workflow(
) -> WorkflowCreateOrUpdateDTO:
tenant_id = authenticated_entity.tenant_id
created_by = authenticated_entity.email
workflow = await __get_workflow_raw_data(request=None, file=file)
workflow, is_valid = await __get_workflow_raw_data(
request=None, file=file, tenant_id=tenant_id
)

workflowstore = WorkflowStore()

# Create the workflow
try:
workflow = workflowstore.create_workflow(
tenant_id=tenant_id, created_by=created_by, workflow=workflow
tenant_id=tenant_id,
created_by=created_by,
workflow=workflow,
is_valid=is_valid,
)
except Exception:
logger.exception(
Expand All @@ -323,11 +348,17 @@ async def create_workflow(
)
if workflow.revision == 1:
return WorkflowCreateOrUpdateDTO(
workflow_id=workflow.id, status="created", revision=workflow.revision
workflow_id=workflow.id,
status="created",
revision=workflow.revision,
is_valid=is_valid,
)
else:
return WorkflowCreateOrUpdateDTO(
workflow_id=workflow.id, status="updated", revision=workflow.revision
workflow_id=workflow.id,
status="updated",
revision=workflow.revision,
is_valid=is_valid,
)


Expand All @@ -344,12 +375,19 @@ async def create_workflow_from_body(
) -> WorkflowCreateOrUpdateDTO:
tenant_id = authenticated_entity.tenant_id
created_by = authenticated_entity.email
workflow = await __get_workflow_raw_data(request, None)
workflow, is_valid = await __get_workflow_raw_data(
request, None, tenant_id=tenant_id
)
print("I AM HERE: ", workflow)
workflowstore = WorkflowStore()

# Create the workflow
try:
workflow = workflowstore.create_workflow(
tenant_id=tenant_id, created_by=created_by, workflow=workflow
tenant_id=tenant_id,
created_by=created_by,
workflow=workflow,
is_valid=is_valid,
)
except Exception:
logger.exception(
Expand All @@ -362,11 +400,17 @@ async def create_workflow_from_body(
)
if workflow.revision == 1:
return WorkflowCreateOrUpdateDTO(
workflow_id=workflow.id, status="created", revision=workflow.revision
workflow_id=workflow.id,
status="created",
revision=workflow.revision,
is_valid=is_valid,
)
else:
return WorkflowCreateOrUpdateDTO(
workflow_id=workflow.id, status="updated", revision=workflow.revision
workflow_id=workflow.id,
status="updated",
revision=workflow.revision,
is_valid=is_valid,
)


Expand Down Expand Up @@ -448,7 +492,9 @@ async def update_workflow_by_id(
if workflow_from_db.provisioned:
raise HTTPException(403, detail="Cannot update a provisioned workflow")

workflow = await __get_workflow_raw_data(request, None)
workflow, is_valid = await __get_workflow_raw_data(
request, None, tenant_id=tenant_id
)
parser = Parser()
workflow_interval = parser.parse_interval(workflow)
# In case the workflow name changed to empty string, keep the old name
Expand All @@ -459,12 +505,15 @@ async def update_workflow_by_id(
workflow_from_db.description = workflow.get("description")
workflow_from_db.interval = workflow_interval
workflow_from_db.workflow_raw = yaml.dump(workflow)
workflow_from_db.is_valid = is_valid
workflow_from_db.last_updated = datetime.datetime.now()
session.add(workflow_from_db)
session.commit()
session.refresh(workflow_from_db)
logger.info(f"Updated workflow {workflow_id}", extra={"tenant_id": tenant_id})
return WorkflowCreateOrUpdateDTO(workflow_id=workflow_id, status="updated")
return WorkflowCreateOrUpdateDTO(
workflow_id=workflow_id, status="updated", is_valid=is_valid
)


@router.get("/{workflow_id}/raw", description="Get workflow executions by ID")
Expand Down Expand Up @@ -589,6 +638,7 @@ def get_workflow_by_id(
workflow_raw=workflow.workflow_raw,
last_updated=workflow.last_updated,
disabled=workflow.is_disabled,
is_valid=workflow.is_valid,
)
return WorkflowExecutionsPaginatedResultsDto(
limit=limit,
Expand Down
Loading
Loading