diff --git a/apps/webapp/app/components/code/JSONEditor.tsx b/apps/webapp/app/components/code/JSONEditor.tsx index f40e1ef413..b4c3f7a6ed 100644 --- a/apps/webapp/app/components/code/JSONEditor.tsx +++ b/apps/webapp/app/components/code/JSONEditor.tsx @@ -125,13 +125,13 @@ export function JSONEditor(opts: JSONEditorProps) { } }, [defaultValue, view]); - const clear = useCallback(() => { + const clear = () => { if (view === undefined) return; view.dispatch({ changes: { from: 0, to: view.state.doc.length, insert: undefined }, }); onChange?.(""); - }, [view]); + }; const copy = useCallback(() => { if (view === undefined) return; diff --git a/apps/webapp/app/components/runs/v3/ReplayRunDialog.tsx b/apps/webapp/app/components/runs/v3/ReplayRunDialog.tsx index 207217504d..e2e68a9038 100644 --- a/apps/webapp/app/components/runs/v3/ReplayRunDialog.tsx +++ b/apps/webapp/app/components/runs/v3/ReplayRunDialog.tsx @@ -1,18 +1,39 @@ +import { conform, useForm } from "@conform-to/react"; +import { parse } from "@conform-to/zod"; import { DialogClose } from "@radix-ui/react-dialog"; -import { Form, useNavigation, useSubmit } from "@remix-run/react"; -import { useCallback, useEffect, useRef } from "react"; +import { Form, useActionData, useNavigation, useParams, useSubmit } from "@remix-run/react"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { type UseDataFunctionReturn, useTypedFetcher } from "remix-typedjson"; +import { TaskIcon } from "~/assets/icons/TaskIcon"; import { JSONEditor } from "~/components/code/JSONEditor"; import { EnvironmentCombo } from "~/components/environments/EnvironmentLabel"; import { Button } from "~/components/primitives/Buttons"; import { DialogContent, DialogHeader } from "~/components/primitives/Dialog"; -import { Header3 } from "~/components/primitives/Headers"; +import { DurationPicker } from "~/components/primitives/DurationPicker"; +import { Fieldset } from "~/components/primitives/Fieldset"; +import { FormError } from "~/components/primitives/FormError"; +import { Hint } from "~/components/primitives/Hint"; +import { Input } from "~/components/primitives/Input"; import { InputGroup } from "~/components/primitives/InputGroup"; import { Label } from "~/components/primitives/Label"; import { Paragraph } from "~/components/primitives/Paragraph"; +import { type loader as queuesLoader } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues"; +import { + ResizableHandle, + ResizablePanel, + ResizablePanelGroup, +} from "~/components/primitives/Resizable"; import { Select, SelectItem } from "~/components/primitives/Select"; import { Spinner, SpinnerWhite } from "~/components/primitives/Spinner"; +import { TabButton, TabContainer } from "~/components/primitives/Tabs"; +import { TextLink } from "~/components/primitives/TextLink"; import { type loader } from "~/routes/resources.taskruns.$runParam.replay"; +import { docsPath } from "~/utils/pathBuilder"; +import { ReplayRunData } from "~/v3/replayTask"; +import { RectangleStackIcon } from "@heroicons/react/20/solid"; +import { Badge } from "~/components/primitives/Badge"; +import { RunTagInput } from "./RunTagInput"; +import { MachinePresetName } from "@trigger.dev/core/v3"; type ReplayRunDialogProps = { runFriendlyId: string; @@ -21,154 +42,513 @@ type ReplayRunDialogProps = { export function ReplayRunDialog({ runFriendlyId, failedRedirect }: ReplayRunDialogProps) { return ( - + ); } function ReplayContent({ runFriendlyId, failedRedirect }: ReplayRunDialogProps) { - const fetcher = useTypedFetcher(); - const isLoading = fetcher.state === "loading"; + const replayDataFetcher = useTypedFetcher(); + const isLoading = replayDataFetcher.state === "loading"; + const queueFetcher = useTypedFetcher(); + const [environmentIdOverride, setEnvironmentIdOverride] = useState(undefined); + + useEffect(() => { + const searchParams = new URLSearchParams(); + if (environmentIdOverride) { + searchParams.set("environmentIdOverride", environmentIdOverride); + } + + replayDataFetcher.load( + `/resources/taskruns/${runFriendlyId}/replay?${searchParams.toString()}` + ); + }, [runFriendlyId, environmentIdOverride]); + + const params = useParams(); useEffect(() => { - fetcher.load(`/resources/taskruns/${runFriendlyId}/replay`); - }, [runFriendlyId]); + if (params.organizationSlug && params.projectParam && params.envParam) { + const searchParams = new URLSearchParams(); + searchParams.set("type", "custom"); + searchParams.set("per_page", "100"); + + let envSlug = params.envParam; + + if (environmentIdOverride) { + const environmentOverride = replayDataFetcher.data?.environments.find( + (env) => env.id === environmentIdOverride + ); + envSlug = environmentOverride?.slug ?? envSlug; + } + + queueFetcher.load( + `/resources/orgs/${params.organizationSlug}/projects/${ + params.projectParam + }/env/${envSlug}/queues?${searchParams.toString()}` + ); + } + }, [params.organizationSlug, params.projectParam, params.envParam, environmentIdOverride]); + + const customQueues = useMemo(() => { + return queueFetcher.data?.queues ?? []; + }, [queueFetcher.data?.queues]); return ( - <> - Replay this run - {isLoading ? ( -
+
+ Replay this run + {isLoading && !replayDataFetcher.data ? ( +
- ) : fetcher.data ? ( + ) : replayDataFetcher.data ? ( ) : ( <>Failed to get run data )} - +
); } +const startingJson = "{\n\n}"; +const machinePresets = Object.values(MachinePresetName.enum); + function ReplayForm({ - payload, - payloadType, - environment, - environments, failedRedirect, runFriendlyId, -}: UseDataFunctionReturn & { failedRedirect: string; runFriendlyId: string }) { + replayData, + customQueues, + environmentIdOverride, + setEnvironmentIdOverride, +}: { + failedRedirect: string; + runFriendlyId: string; + replayData: UseDataFunctionReturn; + customQueues: UseDataFunctionReturn["queues"]; + environmentIdOverride: string | undefined; + setEnvironmentIdOverride: (environment: string) => void; +}) { const navigation = useNavigation(); const submit = useSubmit(); - const currentJson = useRef(payload); + + const [defaultPayloadJson, setDefaultPayloadJson] = useState( + replayData.payload ?? startingJson + ); + const setPayload = useCallback((code: string) => { + setDefaultPayloadJson(code); + }, []); + const currentPayloadJson = useRef(replayData.payload ?? startingJson); + + const [defaultMetadataJson, setDefaultMetadataJson] = useState( + replayData.metadata ?? startingJson + ); + const setMetadata = useCallback((code: string) => { + setDefaultMetadataJson(code); + }, []); + const currentMetadataJson = useRef(replayData.metadata ?? startingJson); + const formAction = `/resources/taskruns/${runFriendlyId}/replay`; + const isSubmitting = navigation.formAction === formAction; const editablePayload = - payloadType === "application/json" || payloadType === "application/super+json"; + replayData.payloadType === "application/json" || + replayData.payloadType === "application/super+json"; + + const [tab, setTab] = useState<"payload" | "metadata">("payload"); + + const { defaultTaskQueue } = replayData; - const submitForm = useCallback( - (e: React.FormEvent) => { - const formData = new FormData(e.currentTarget); - const data: Record = { - environment: formData.get("environment") as string, - failedRedirect: formData.get("failedRedirect") as string, - }; + const queues = + defaultTaskQueue && !customQueues.some((q) => q.id === defaultTaskQueue.id) + ? [defaultTaskQueue, ...customQueues] + : customQueues; + const queueItems = queues.map((q) => ({ + value: q.type === "task" ? `task/${q.name}` : q.name, + label: q.name, + type: q.type, + paused: q.paused, + })); + + const lastSubmission = useActionData(); + const [ + form, + { + environment, + payload, + metadata, + delaySeconds, + ttlSeconds, + idempotencyKey, + idempotencyKeyTTLSeconds, + queue, + concurrencyKey, + maxAttempts, + maxDurationSeconds, + tags, + version, + machine, + }, + ] = useForm({ + id: "replay-task", + lastSubmission: lastSubmission as any, + onSubmit(event, { formData }) { + event.preventDefault(); if (editablePayload) { - data.payload = currentJson.current; + formData.set(payload.name, currentPayloadJson.current); } + formData.set(metadata.name, currentMetadataJson.current); - submit(data, { - action: formAction, - method: "post", - }); - e.preventDefault(); + submit(formData, { method: "POST", action: formAction }); }, - [currentJson] - ); + onValidate({ formData }) { + return parse(formData, { schema: ReplayRunData }); + }, + }); return ( -
submitForm(e)} className="pt-2"> - {editablePayload ? ( - <> - - Replaying will create a new run using the same or modified payload, executing against - the latest version in your selected environment. - - Payload -
+ + + + + Replaying will create a new run in the selected environment. You can modify the payload, + metadata and run options. + + + +
{ - currentJson.current = v; + if (tab === "payload") { + currentPayloadJson.current = v; + setPayload(v); + } else { + currentMetadataJson.current = v; + setMetadata(v); + } }} - showClearButton={false} - showCopyButton={false} height="100%" min-height="100%" max-height="100%" + additionalActions={ + +
+ { + setTab("payload"); + }} + > + Payload + + { + setTab("metadata"); + }} + > + Metadata + +
+
+ } />
- - ) : null} - - - - - -
+ + + +
+
+ + Options enable you to control the execution behavior of your task.{" "} + Read the docs. + + + + + Overrides the machine preset. + {machine.error} + + + + + {replayData.disableVersionSelection ? ( + Only the latest version is available in the development environment. + ) : ( + Runs task on a specific version. + )} + {version.error} + + + + {replayData.allowArbitraryQueues ? ( + + ) : ( + + )} + Assign run to a specific queue. + {queue.error} + + + + + Add tags to easily filter runs. + {tags.error} + + + + { + // only allow entering integers > 1 + if (["-", "+", ".", "e", "E"].includes(e.key)) { + e.preventDefault(); + } + }} + onBlur={(e) => { + const value = parseInt(e.target.value); + if (value < 1 && e.target.value !== "") { + e.target.value = "1"; + } + }} + /> + Retries failed runs up to the specified number of attempts. + {maxAttempts.error} + + + + + Overrides the maximum compute time limit for the run. + {maxDurationSeconds.error} + + + + + {idempotencyKey.error} + + Specify an idempotency key to ensure that a task is only triggered once with the + same key. + + + + + + Keys expire after 30 days by default. + + {idempotencyKeyTTLSeconds.error} + + + + + + + Limits concurrency by creating a separate queue for each value of the key. + + {concurrencyKey.error} + + + + + Delays run by a specific duration. + {delaySeconds.error} + + + + + Expires the run if it hasn't started within the TTL. + {ttlSeconds.error} + + {form.error} +
+
+
+ + +
- +
+ + + + + +
); diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx index d869f08dc5..b3bc0802bc 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx @@ -357,7 +357,7 @@ function StandardTaskForm({ const currentPayloadJson = useRef(defaultPayloadJson); const [defaultMetadataJson, setDefaultMetadataJson] = useState( - lastRun?.seedMetadata ?? "{}" + lastRun?.seedMetadata ?? startingJson ); const setMetadata = useCallback((code: string) => { setDefaultMetadataJson(code); @@ -447,7 +447,7 @@ function StandardTaskForm({ setConcurrencyKeyValue(template.concurrencyKey ?? ""); setMaxAttemptsValue(template.maxAttempts ?? undefined); setMaxDurationValue(template.maxDurationSeconds ?? 0); - setMachineValue(template.machinePreset ?? ""); + setMachineValue(template.machinePreset ?? undefined); setTagsValue(template.tags ?? []); setQueueValue(template.queue ?? undefined); }} @@ -481,10 +481,10 @@ function StandardTaskForm({ onChange={(v) => { if (!tab || tab === "payload") { currentPayloadJson.current = v; - setDefaultPayloadJson(v); + setPayload(v); } else { currentMetadataJson.current = v; - setDefaultMetadataJson(v); + setMetadata(v); } }} height="100%" @@ -527,21 +527,55 @@ function StandardTaskForm({ Read the docs. - - - Delays run by a specific duration. - {delaySeconds.error} + + + Overrides the machine preset. + {machine.error} - - - Expires the run if it hasn't started within the TTL. - {ttlSeconds.error} + + + {disableVersionSelection ? ( + Only the latest version is available in the development environment. + ) : ( + Runs task on a specific version. + )} + {version.error} - - - Overrides the machine preset. - {machine.error} + + + Delays run by a specific duration. + {delaySeconds.error} - - - {disableVersionSelection ? ( - Only the latest version is available in the development environment. - ) : ( - Runs task on a specific version. - )} - {version.error} + + + Expires the run if it hasn't started within the TTL. + {ttlSeconds.error} {form.error} @@ -912,7 +912,7 @@ function ScheduledTaskForm({ setConcurrencyKeyValue(template.concurrencyKey ?? ""); setMaxAttemptsValue(template.maxAttempts ?? undefined); setMaxDurationValue(template.maxDurationSeconds ?? 0); - setMachineValue(template.machinePreset ?? ""); + setMachineValue(template.machinePreset ?? undefined); setTagsValue(template.tags ?? []); setQueueValue(template.queue ?? undefined); @@ -936,12 +936,12 @@ function ScheduledTaskForm({ setMaxDurationValue(run.maxDurationInSeconds); setTagsValue(run.runTags ?? []); setQueueValue(run.queue); - setMachineValue(run.machinePreset); + setMachineValue(run.machinePreset ?? undefined); }} />
-
+
diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 431cc9f34c..0e87a3d1bd 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -1,6 +1,6 @@ import { parse } from "@conform-to/zod"; import { type ActionFunction, json, type LoaderFunctionArgs } from "@remix-run/node"; -import { prettyPrintPacket } from "@trigger.dev/core/v3"; +import { type EnvironmentType, prettyPrintPacket } from "@trigger.dev/core/v3"; import { typedjson } from "remix-typedjson"; import { z } from "zod"; import { $replica, prisma } from "~/db.server"; @@ -11,20 +11,42 @@ import { requireUserId } from "~/services/session.server"; import { sortEnvironments } from "~/utils/environmentSort"; import { v3RunSpanPath } from "~/utils/pathBuilder"; import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server"; +import parseDuration from "parse-duration"; +import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; +import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; +import { ReplayRunData } from "~/v3/replayTask"; const ParamSchema = z.object({ runParam: z.string(), }); +const QuerySchema = z.object({ + environmentIdOverride: z.string().optional(), +}); + export async function loader({ request, params }: LoaderFunctionArgs) { const userId = await requireUserId(request); const { runParam } = ParamSchema.parse(params); + const { environmentIdOverride } = QuerySchema.parse( + Object.fromEntries(new URL(request.url).searchParams) + ); const run = await $replica.taskRun.findFirst({ select: { payload: true, payloadType: true, + seedMetadata: true, + seedMetadataType: true, runtimeEnvironmentId: true, + concurrencyKey: true, + maxAttempts: true, + maxDurationInSeconds: true, + machinePreset: true, + ttl: true, + idempotencyKey: true, + runTags: true, + queue: true, + taskIdentifier: true, project: { select: { environments: { @@ -66,52 +88,78 @@ export async function loader({ request, params }: LoaderFunctionArgs) { throw new Response("Not Found", { status: 404 }); } - const environment = run.project.environments.find((env) => env.id === run.runtimeEnvironmentId); + const runEnvironment = run.project.environments.find( + (env) => env.id === run.runtimeEnvironmentId + ); + const environmentOverride = run.project.environments.find( + (env) => env.id === environmentIdOverride + ); + const environment = environmentOverride ?? runEnvironment; if (!environment) { throw new Response("Environment not found", { status: 404 }); } + const [taskQueue, backgroundWorkers] = await Promise.all([ + findTaskQueue(environment, run.taskIdentifier), + listLatestBackgroundWorkers(environment), + ]); + + const latestVersions = backgroundWorkers.map((v) => v.version); + const disableVersionSelection = environment.type === "DEVELOPMENT"; + const allowArbitraryQueues = backgroundWorkers.at(0)?.engine === "V1"; + return typedjson({ + concurrencyKey: run.concurrencyKey, + maxAttempts: run.maxAttempts, + maxDurationSeconds: run.maxDurationInSeconds, + machinePreset: run.machinePreset, + ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined, + idempotencyKey: run.idempotencyKey, + runTags: run.runTags, payload: await prettyPrintPacket(run.payload, run.payloadType), payloadType: run.payloadType, + queue: run.queue, + metadata: run.seedMetadata + ? await prettyPrintPacket(run.seedMetadata, run.seedMetadataType) + : undefined, + defaultTaskQueue: taskQueue + ? { + id: taskQueue.friendlyId, + name: taskQueue.name.replace(/^task\//, ""), + type: queueTypeFromType(taskQueue.type), + paused: taskQueue.paused, + } + : undefined, + latestVersions, + disableVersionSelection, + allowArbitraryQueues, environment: { ...displayableEnvironment(environment, userId), branchName: environment.branchName ?? undefined, }, environments: sortEnvironments( - run.project.environments.map((environment) => { - return { - ...displayableEnvironment(environment, userId), - branchName: environment.branchName ?? undefined, - }; - }) - ).filter((env) => { - if (env.type === "PREVIEW" && !env.branchName) return false; - return true; - }), + run.project.environments + .filter((env) => env.type !== "PREVIEW" || env.branchName) + .map((env) => ({ + ...displayableEnvironment(env, userId), + branchName: env.branchName ?? undefined, + })) + ), }); } -const FormSchema = z.object({ - environment: z.string().optional(), - payload: z.string().optional(), - failedRedirect: z.string(), -}); - export const action: ActionFunction = async ({ request, params }) => { - const userId = await requireUserId(request); - const { runParam } = ParamSchema.parse(params); const formData = await request.formData(); - const submission = parse(formData, { schema: FormSchema }); + const submission = parse(formData, { schema: ReplayRunData }); if (!submission.value) { return json(submission); } try { - const taskRun = await prisma.taskRun.findUnique({ + const taskRun = await prisma.taskRun.findFirst({ where: { friendlyId: runParam, }, @@ -137,6 +185,18 @@ export const action: ActionFunction = async ({ request, params }) => { const newRun = await replayRunService.call(taskRun, { environmentId: submission.value.environment, payload: submission.value.payload, + metadata: submission.value.metadata, + tags: submission.value.tags, + queue: submission.value.queue, + concurrencyKey: submission.value.concurrencyKey, + maxAttempts: submission.value.maxAttempts, + maxDurationSeconds: submission.value.maxDurationSeconds, + machine: submission.value.machine, + delaySeconds: submission.value.delaySeconds, + idempotencyKey: submission.value.idempotencyKey, + idempotencyKeyTTLSeconds: submission.value.idempotencyKeyTTLSeconds, + ttlSeconds: submission.value.ttlSeconds, + version: submission.value.version, }); if (!newRun) { @@ -176,13 +236,78 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); return redirectWithErrorMessage(submission.value.failedRedirect, request, error.message); - } else { - logger.error("Failed to replay run", { error }); - return redirectWithErrorMessage( - submission.value.failedRedirect, - request, - JSON.stringify(error) - ); } + + logger.error("Failed to replay run", { error }); + return redirectWithErrorMessage( + submission.value.failedRedirect, + request, + JSON.stringify(error) + ); } }; + +async function findTask( + environment: { type: EnvironmentType; id: string }, + taskIdentifier: string +) { + if (environment.type === "DEVELOPMENT") { + return $replica.backgroundWorkerTask.findFirst({ + select: { + queueId: true, + }, + where: { + slug: taskIdentifier, + runtimeEnvironmentId: environment.id, + }, + orderBy: { + createdAt: "desc", + }, + }); + } + + const currentDeployment = await findCurrentWorkerDeployment({ + environmentId: environment.id, + }); + return currentDeployment?.worker?.tasks.find((t) => t.slug === taskIdentifier); +} + +async function findTaskQueue( + environment: { type: EnvironmentType; id: string }, + taskIdentifier: string +) { + const task = await findTask(environment, taskIdentifier); + + if (!task?.queueId) { + return undefined; + } + + return $replica.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: environment.id, + id: task.queueId, + }, + select: { + friendlyId: true, + name: true, + type: true, + paused: true, + }, + }); +} + +function listLatestBackgroundWorkers(environment: { id: string }, limit = 20) { + return $replica.backgroundWorker.findMany({ + where: { + runtimeEnvironmentId: environment.id, + }, + select: { + version: true, + engine: true, + }, + orderBy: { + createdAt: "desc", + }, + take: limit, + }); +} diff --git a/apps/webapp/app/v3/replayTask.ts b/apps/webapp/app/v3/replayTask.ts new file mode 100644 index 0000000000..b7283c3c3f --- /dev/null +++ b/apps/webapp/app/v3/replayTask.ts @@ -0,0 +1,47 @@ +import { z } from "zod"; +import { RunOptionsData } from "./testTask"; + +export const ReplayRunData = z + .object({ + environment: z.string().optional(), + payload: z + .string() + .optional() + .transform((val, ctx) => { + if (!val) { + return {}; + } + + try { + return JSON.parse(val); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "Payload must be a valid JSON string", + }); + return z.NEVER; + } + }), + metadata: z + .string() + .optional() + .transform((val, ctx) => { + if (!val) { + return {}; + } + + try { + return JSON.parse(val); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "Metadata must be a valid JSON string", + }); + return z.NEVER; + } + }), + failedRedirect: z.string(), + }) + .and(RunOptionsData); + +export type ReplayRunData = z.infer; diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 5b7ca7098b..2c45fa2b03 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -1,27 +1,26 @@ import { + type MachinePresetName, conditionallyImportPacket, - IOPacket, parsePacket, - RunTags, - stringifyIO, } from "@trigger.dev/core/v3"; -import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization"; -import { TaskRun } from "@trigger.dev/database"; +import { type TaskRun } from "@trigger.dev/database"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; import { getTagsForRunId } from "~/models/taskRunTag.server"; import { logger } from "~/services/logger.server"; import { BaseService } from "./baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; +import { type RunOptionsData } from "../testTask"; type OverrideOptions = { environmentId?: string; - payload?: string; -}; + payload?: unknown; + metadata?: unknown; +} & RunOptionsData; export class ReplayTaskRunService extends BaseService { - public async call(existingTaskRun: TaskRun, overrideOptions?: OverrideOptions) { + public async call(existingTaskRun: TaskRun, overrideOptions: OverrideOptions = {}) { const authenticatedEnvironment = await findEnvironmentById( - overrideOptions?.environmentId ?? existingTaskRun.runtimeEnvironmentId + overrideOptions.environmentId ?? existingTaskRun.runtimeEnvironmentId ); if (!authenticatedEnvironment) { return; @@ -36,57 +35,15 @@ export class ReplayTaskRunService extends BaseService { taskRunFriendlyId: existingTaskRun.friendlyId, }); - let payloadPacket: IOPacket; - - if (overrideOptions?.payload) { - if (existingTaskRun.payloadType === "application/super+json") { - const newPayload = await replaceSuperJsonPayload( - existingTaskRun.payload, - overrideOptions.payload - ); - payloadPacket = await stringifyIO(newPayload); - } else { - payloadPacket = await conditionallyImportPacket({ - data: overrideOptions.payload, - dataType: existingTaskRun.payloadType, - }); - } - } else { - payloadPacket = await conditionallyImportPacket({ - data: existingTaskRun.payload, - dataType: existingTaskRun.payloadType, - }); - } - - const parsedPayload = - payloadPacket.dataType === "application/json" - ? await parsePacket(payloadPacket) - : payloadPacket.data; - - logger.info("Replaying task run payload", { - taskRunId: existingTaskRun.id, - taskRunFriendlyId: existingTaskRun.friendlyId, - payloadPacketType: payloadPacket.dataType, - }); - - const metadata = existingTaskRun.seedMetadata - ? await parsePacket({ - data: existingTaskRun.seedMetadata, - dataType: existingTaskRun.seedMetadataType, - }) - : undefined; + const payload = overrideOptions.payload ?? (await this.getExistingPayload(existingTaskRun)); + const metadata = overrideOptions.metadata ?? (await this.getExistingMetadata(existingTaskRun)); + const tags = overrideOptions.tags ?? existingTaskRun.runTags; try { - const tags = await getTagsForRunId({ - friendlyId: existingTaskRun.friendlyId, - environmentId: authenticatedEnvironment.id, - }); - - //get the queue from the original run, so we can use the same settings on the replay const taskQueue = await this._prisma.taskQueue.findFirst({ where: { runtimeEnvironmentId: authenticatedEnvironment.id, - name: existingTaskRun.queue, + name: overrideOptions.queue ?? existingTaskRun.queue, }, }); @@ -95,18 +52,34 @@ export class ReplayTaskRunService extends BaseService { existingTaskRun.taskIdentifier, authenticatedEnvironment, { - payload: parsedPayload, + payload, options: { queue: taskQueue ? { name: taskQueue.name, } : undefined, - concurrencyKey: existingTaskRun.concurrencyKey ?? undefined, test: existingTaskRun.isTest, - payloadType: payloadPacket.dataType, - tags: tags?.map((t) => t.name) as RunTags, - metadata, + tags, + metadata: metadata, + delay: overrideOptions.delaySeconds + ? new Date(Date.now() + overrideOptions.delaySeconds * 1000) + : undefined, + ttl: overrideOptions.ttlSeconds, + idempotencyKey: overrideOptions.idempotencyKey, + idempotencyKeyTTL: overrideOptions.idempotencyKeyTTLSeconds + ? `${overrideOptions.idempotencyKeyTTLSeconds}s` + : undefined, + concurrencyKey: + overrideOptions.concurrencyKey ?? existingTaskRun.concurrencyKey ?? undefined, + maxAttempts: overrideOptions.maxAttempts, + maxDuration: overrideOptions.maxDurationSeconds, + machine: + overrideOptions.machine ?? + (existingTaskRun.machinePreset as MachinePresetName) ?? + undefined, + lockToVersion: + overrideOptions.version === "latest" ? undefined : overrideOptions.version, }, }, { @@ -131,4 +104,26 @@ export class ReplayTaskRunService extends BaseService { return; } } + + private async getExistingPayload(existingTaskRun: TaskRun) { + const existingPayloadPacket = await conditionallyImportPacket({ + data: existingTaskRun.payload, + dataType: existingTaskRun.payloadType, + }); + + return existingPayloadPacket.dataType === "application/json" + ? await parsePacket(existingPayloadPacket) + : existingPayloadPacket.data; + } + + private async getExistingMetadata(existingTaskRun: TaskRun) { + if (!existingTaskRun.seedMetadata) { + return undefined; + } + + return parsePacket({ + data: existingTaskRun.seedMetadata, + dataType: existingTaskRun.seedMetadataType, + }); + } } diff --git a/apps/webapp/app/v3/testTask.ts b/apps/webapp/app/v3/testTask.ts index b48b1fb59c..79d9e97ec3 100644 --- a/apps/webapp/app/v3/testTask.ts +++ b/apps/webapp/app/v3/testTask.ts @@ -1,6 +1,55 @@ import { z } from "zod"; import { MachinePresetName } from "@trigger.dev/core/v3/schemas"; +export const RunOptionsData = z.object({ + delaySeconds: z + .number() + .min(0) + .optional() + .transform((val) => (val === 0 ? undefined : val)), + ttlSeconds: z + .number() + .min(0) + .optional() + .transform((val) => (val === 0 ? undefined : val)), + idempotencyKey: z.string().optional(), + idempotencyKeyTTLSeconds: z + .number() + .min(0) + .optional() + .transform((val) => (val === 0 ? undefined : val)), + queue: z.string().optional(), + concurrencyKey: z.string().optional(), + maxAttempts: z.number().min(1).optional(), + machine: MachinePresetName.optional(), + maxDurationSeconds: z + .number() + .min(0) + .optional() + .transform((val) => (val === 0 ? undefined : val)), + tags: z + .string() + .optional() + .transform((val) => { + if (!val || val.trim() === "") { + return undefined; + } + return val + .split(",") + .map((tag) => tag.trim()) + .filter((tag) => tag.length > 0); + }) + .refine((tags) => !tags || tags.length <= 10, { + message: "Maximum 10 tags allowed", + }) + .refine((tags) => !tags || tags.every((tag) => tag.length <= 128), { + message: "Each tag must be at most 128 characters long", + }), + version: z.string().optional(), +}); + +export type RunOptionsData = z.infer; + export const TestTaskData = z .discriminatedUnion("triggerSource", [ z.object({ @@ -53,54 +102,11 @@ export const TestTaskData = z externalId: z.preprocess((val) => (val === "" ? undefined : val), z.string().optional()), }), ]) + .and(RunOptionsData) .and( z.object({ taskIdentifier: z.string(), environmentId: z.string(), - delaySeconds: z - .number() - .min(0) - .optional() - .transform((val) => (val === 0 ? undefined : val)), - ttlSeconds: z - .number() - .min(0) - .optional() - .transform((val) => (val === 0 ? undefined : val)), - idempotencyKey: z.string().optional(), - idempotencyKeyTTLSeconds: z - .number() - .min(0) - .optional() - .transform((val) => (val === 0 ? undefined : val)), - queue: z.string().optional(), - concurrencyKey: z.string().optional(), - maxAttempts: z.number().min(1).optional(), - machine: MachinePresetName.optional(), - maxDurationSeconds: z - .number() - .min(0) - .optional() - .transform((val) => (val === 0 ? undefined : val)), - tags: z - .string() - .optional() - .transform((val) => { - if (!val || val.trim() === "") { - return undefined; - } - return val - .split(",") - .map((tag) => tag.trim()) - .filter((tag) => tag.length > 0); - }) - .refine((tags) => !tags || tags.length <= 10, { - message: "Maximum 10 tags allowed", - }) - .refine((tags) => !tags || tags.every((tag) => tag.length <= 128), { - message: "Each tag must be at most 128 characters long", - }), - version: z.string().optional(), }) );