From a6acb33434bb7ee928424d064be1bdae37cdb7bc Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 11:00:59 +0000 Subject: [PATCH 01/16] use own folder for memory --- backend/windmill-worker/src/memory_common.rs | 8 +++----- docker-compose.yml | 7 +++++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/backend/windmill-worker/src/memory_common.rs b/backend/windmill-worker/src/memory_common.rs index 3b934c2ee3361..16821c555b289 100644 --- a/backend/windmill-worker/src/memory_common.rs +++ b/backend/windmill-worker/src/memory_common.rs @@ -2,12 +2,11 @@ use crate::ai::types::OpenAIMessage; use std::path::PathBuf; use tokio::{fs, io::AsyncWriteExt}; use uuid::Uuid; -use windmill_common::worker::TMP_LOGS_DIR; +use windmill_common::worker::TMP_MEMORY_DIR; /// Get the file path for storing memory for a specific AI agent step pub fn path_for(workspace_id: &str, conversation_id: Uuid, step_id: &str) -> PathBuf { - PathBuf::from(TMP_LOGS_DIR) - .join("memory") + PathBuf::from(TMP_MEMORY_DIR) .join(workspace_id) .join(conversation_id.to_string()) .join(format!("{step_id}.json")) @@ -61,8 +60,7 @@ pub async fn delete_conversation_from_disk( workspace_id: &str, conversation_id: Uuid, ) -> anyhow::Result<()> { - let conversation_path = PathBuf::from(TMP_LOGS_DIR) - .join("memory") + let conversation_path = PathBuf::from(TMP_MEMORY_DIR) .join(workspace_id) .join(conversation_id.to_string()); diff --git a/docker-compose.yml b/docker-compose.yml index f8831523ae899..af1e2a1b3a616 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -46,6 +46,9 @@ services: condition: service_healthy volumes: - worker_logs:/tmp/windmill/logs + # for AI agent memory + - worker_memory:/tmp/windmill/memory + logging: *default-logging windmill_worker: @@ -72,6 +75,9 @@ services: - /var/run/docker.sock:/var/run/docker.sock - worker_dependency_cache:/tmp/windmill/cache - worker_logs:/tmp/windmill/logs + # for AI agent memory + - worker_memory:/tmp/windmill/memory + logging: *default-logging ## This worker is specialized for "native" jobs. Native jobs run in-process and thus are much more lightweight than other jobs @@ -188,6 +194,7 @@ volumes: db_data: null worker_dependency_cache: null worker_logs: null + worker_memory: null windmill_index: null lsp_cache: null caddy_data: null From cb70f03fa57dbe939b2fb8388c2f72b25d717d1d Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 11:08:52 +0000 Subject: [PATCH 02/16] fixes --- .../windmill-api/src/flow_conversations.rs | 2 +- backend/windmill-api/src/jobs.rs | 26 ++++---- backend/windmill-common/src/worker.rs | 1 + backend/windmill-queue/src/jobs.rs | 65 +++++++------------ backend/windmill-worker/src/ai_executor.rs | 13 ++-- 5 files changed, 44 insertions(+), 63 deletions(-) diff --git a/backend/windmill-api/src/flow_conversations.rs b/backend/windmill-api/src/flow_conversations.rs index 96ba5f3af6ca1..b4d3b5571b6db 100644 --- a/backend/windmill-api/src/flow_conversations.rs +++ b/backend/windmill-api/src/flow_conversations.rs @@ -88,7 +88,7 @@ async fn list_conversations( .offset(offset as i64); let sql = sqlb.sql().map_err(|e| { - windmill_common::error::Error::InternalErr(format!("Failed to build SQL: {}", e)) + windmill_common::error::Error::internal_err(format!("Failed to build SQL: {}", e)) })?; let conversations = sqlx::query_as::(&sql) diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index e721facc6ea3e..7f5a9bbd7da5c 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -3942,7 +3942,7 @@ async fn handle_chat_conversation_messages( w_id: &str, flow_path: &str, run_query: &RunJobQuery, - args: &PushArgsOwned, + user_message: Option<&str>, uuid: Uuid, ) -> error::Result<()> { let memory_id = run_query.memory_id.ok_or_else(|| { @@ -3951,13 +3951,11 @@ async fn handle_chat_conversation_messages( ) })?; - let user_msg_raw = args.args.get("user_message").ok_or_else(|| { - windmill_common::error::Error::BadRequest( + if user_message.is_none() { + return Err(windmill_common::error::Error::BadRequest( "user_message argument is required for chat-enabled flows".to_string(), - ) - })?; - - let user_msg = serde_json::from_str::(user_msg_raw.get())?; + )); + } // Create conversation with provided ID (or get existing one) flow_conversations::get_or_create_conversation_with_id( @@ -3965,7 +3963,7 @@ async fn handle_chat_conversation_messages( w_id, flow_path, &authed.username, - &user_msg, + &user_message.unwrap(), memory_id, ) .await?; @@ -3975,7 +3973,7 @@ async fn handle_chat_conversation_messages( tx, memory_id, MessageType::User, - &user_msg, + &user_message.unwrap(), None, // No job_id for user message w_id, ) @@ -4082,7 +4080,7 @@ pub async fn run_flow_by_path_inner( apply_preprocessor: !run_query.skip_preprocessor.unwrap_or(false) && has_preprocessor.unwrap_or(false), }, - PushArgs { args: &args.args, extra: args.extra.clone() }, + PushArgs { args: &args.args, extra: args.extra }, authed.display_username(), email, permissioned_as, @@ -4113,6 +4111,7 @@ pub async fn run_flow_by_path_inner( } // Handle conversation messages for chat-enabled flows + let user_message = args.args.get("user_message").map(|v| v.get()); if chat_input_enabled.unwrap_or(false) { handle_chat_conversation_messages( &mut tx, @@ -4120,7 +4119,7 @@ pub async fn run_flow_by_path_inner( &w_id, &flow_path.to_string(), &run_query, - &args, + user_message, uuid, ) .await?; @@ -5581,7 +5580,7 @@ pub async fn run_wait_result_flow_by_path_internal( apply_preprocessor: !run_query.skip_preprocessor.unwrap_or(false) && has_preprocessor.unwrap_or(false), }, - PushArgs { args: &args.args, extra: args.extra.clone() }, + PushArgs { args: &args.args, extra: args.extra }, authed.display_username(), email, permissioned_as, @@ -5612,6 +5611,7 @@ pub async fn run_wait_result_flow_by_path_internal( } // Handle conversation messages for chat-enabled flows + let user_message = args.args.get("user_message").map(|v| v.get()); if chat_input_enabled.unwrap_or(false) { handle_chat_conversation_messages( &mut tx, @@ -5619,7 +5619,7 @@ pub async fn run_wait_result_flow_by_path_internal( &w_id, &flow_path.to_string(), &run_query, - &args, + user_message, uuid, ) .await?; diff --git a/backend/windmill-common/src/worker.rs b/backend/windmill-common/src/worker.rs index 912ba5c1f74dc..1aaecc2fe33d7 100644 --- a/backend/windmill-common/src/worker.rs +++ b/backend/windmill-common/src/worker.rs @@ -457,6 +457,7 @@ pub async fn store_pull_query(wc: &WorkerConfig) { pub const TMP_DIR: &str = "/tmp/windmill"; pub const TMP_LOGS_DIR: &str = concatcp!(TMP_DIR, "/logs"); +pub const TMP_MEMORY_DIR: &str = concatcp!(TMP_DIR, "/memory"); pub const HUB_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "hub"); diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 3010874ac50ad..f8cf45b5c311b 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -831,53 +831,34 @@ pub async fn add_completed_job( // Update conversation message if it's a flow and it's done (both success and error cases) if !skipped && flow_is_done { let chat_input_enabled = queued_job.parse_chat_input_enabled(); + let value = serde_json::to_value(result.0) + .map_err(|e| Error::InternalErr(format!("Failed to serialize result: {e}")))?; if chat_input_enabled.unwrap_or(false) { - let content = if let Ok(wrapper) = serde_json::from_value::( - serde_json::to_value(result.0).unwrap_or(serde_json::Value::Null), - ) { - // Successfully deserialized to OutputWrapper, use the output field - wrapper.output - } else { - // No string output field, use the whole result - serde_json::to_value(result.0) - .ok() - .and_then(|v| { - if let serde_json::Value::String(s) = v { - Some(s) - } else { - serde_json::to_string_pretty(&v).ok() - } - }) - .unwrap_or_else(|| { - if success { - "Job completed successfully".to_string() - } else { - "Job failed".to_string() - } - }) - }; - - // check if flow_conversation_message exists - let flow_conversation_message_exists = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM flow_conversation_message WHERE job_id = $1 AND message_type = 'assistant')", - queued_job.id - ) - .fetch_one(db) - .await?; + let content = + if let Ok(wrapper) = serde_json::from_value::(value.clone()) { + // Successfully deserialized to OutputWrapper, use the output field + wrapper.output + } else { + // No string output field, use the whole result + if let serde_json::Value::String(s) = value { + s + } else { + serde_json::to_string_pretty(&value) + .unwrap_or_else(|e| format!("Failed to serialize result: {e}")) + } + }; - if flow_conversation_message_exists.unwrap_or(false) { - // Update the assistant message using direct DB access - let _ = sqlx::query!( - "UPDATE flow_conversation_message + // Update the assistant message + let _ = sqlx::query!( + "UPDATE flow_conversation_message SET content = $1 WHERE job_id = $2 ", - content, - queued_job.id, - ) - .execute(db) - .await; - } + content, + queued_job.id, + ) + .execute(db) + .await; } } diff --git a/backend/windmill-worker/src/ai_executor.rs b/backend/windmill-worker/src/ai_executor.rs index 08c8ede63af7c..66c82b7c7916e 100644 --- a/backend/windmill-worker/src/ai_executor.rs +++ b/backend/windmill-worker/src/ai_executor.rs @@ -471,13 +471,12 @@ pub async fn run_agent( // Take the last n messages let start_idx = loaded_messages.len().saturating_sub(context_length); let mut messages_to_load = loaded_messages[start_idx..].to_vec(); + let first_non_tool_message_index = + messages_to_load.iter().position(|m| m.role != "tool"); - // Remove the first message if its role is "tool" to avoid OpenAI API error - // "messages with role 'tool' must be a response to a preceeding message with 'tool_calls'" - if let Some(first_msg) = messages_to_load.first() { - if first_msg.role == "tool" { - messages_to_load.remove(0); - } + // Remove the first messages if their role is "tool" to avoid OpenAI API error + if let Some(index) = first_non_tool_message_index { + messages_to_load = messages_to_load[index..].to_vec(); } messages.extend(messages_to_load); @@ -934,7 +933,7 @@ pub async fn run_agent( let (handle_result, updated_occupancy) = join_handle.await.map_err(|e| { Error::internal_err(format!( - "Tool execution task panicked: {}", + "Tool execution task failed: {}", e )) })?; From 4714e5f66cd88afc11f47e91eab326c40730ca2a Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 11:30:18 +0000 Subject: [PATCH 03/16] better chat interface --- .../lib/components/details/DetailPageLayout.svelte | 8 ++++++-- frontend/src/lib/components/flows/flowInfers.ts | 2 +- .../(root)/(logged)/flows/get/[...path]/+page.svelte | 12 +++++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/frontend/src/lib/components/details/DetailPageLayout.svelte b/frontend/src/lib/components/details/DetailPageLayout.svelte index 57ecc6cff0261..9a68b91aa2d03 100644 --- a/frontend/src/lib/components/details/DetailPageLayout.svelte +++ b/frontend/src/lib/components/details/DetailPageLayout.svelte @@ -8,6 +8,7 @@ flow_json?: any | undefined selected: string forceSmallScreen?: boolean + isChatMode?: boolean header?: import('svelte').Snippet form?: import('svelte').Snippet scriptRender?: import('svelte').Snippet @@ -21,6 +22,7 @@ flow_json = undefined, selected = $bindable(), forceSmallScreen = false, + isChatMode = false, header, form, scriptRender: script, @@ -74,8 +76,10 @@ {@render header?.()}
- Run form - Inputs + {isChatMode ? 'Chat' : 'Run form'} + {#if !isChatMode} + Inputs + {/if} {#if !isOperator} Triggers {/if} diff --git a/frontend/src/lib/components/flows/flowInfers.ts b/frontend/src/lib/components/flows/flowInfers.ts index 652713f7d7d44..378dd310947c2 100644 --- a/frontend/src/lib/components/flows/flowInfers.ts +++ b/frontend/src/lib/components/flows/flowInfers.ts @@ -30,7 +30,7 @@ export const AI_AGENT_SCHEMA = { streaming: { type: 'boolean', description: 'Whether to stream the output of the AI agent.', - default: false, + default: true, showExpr: "fields.output_type === 'text'" }, messages_context_length: { diff --git a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte index 8b4b289d95a3d..c2ee38d3546fe 100644 --- a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte @@ -508,6 +508,7 @@ bind:selected={rightPaneSelected} isOperator={$userStore?.operator} forceSmallScreen={chatInputEnabled} + isChatMode={chatInputEnabled} flow_json={{ value: flow?.value, summary: flow?.summary, @@ -583,7 +584,9 @@ {#if flow}
{#if flow?.archived} This flow was archived @@ -612,8 +615,7 @@ {#if chatInputEnabled}
-
+
{/if}
-
+
Date: Mon, 6 Oct 2025 11:38:09 +0000 Subject: [PATCH 04/16] fix export tab --- frontend/src/lib/components/details/DetailPageLayout.svelte | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/frontend/src/lib/components/details/DetailPageLayout.svelte b/frontend/src/lib/components/details/DetailPageLayout.svelte index 9a68b91aa2d03..a1f4ea47344e4 100644 --- a/frontend/src/lib/components/details/DetailPageLayout.svelte +++ b/frontend/src/lib/components/details/DetailPageLayout.svelte @@ -2,6 +2,7 @@ import { Tabs, Tab, TabContent } from '$lib/components/common' import { Pane, Splitpanes } from 'svelte-splitpanes' import DetailPageDetailPanel from './DetailPageDetailPanel.svelte' + import FlowViewerInner from '../FlowViewerInner.svelte' interface Props { isOperator?: boolean @@ -101,6 +102,11 @@ {@render triggers?.()} + {#if flow_json} + + + + {/if} {@render script?.()} From bd9fbdce4e113e088ddb9597d7d89385983a8104 Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 11:48:45 +0000 Subject: [PATCH 05/16] move in folder --- frontend/src/lib/components/FlowPreviewContent.svelte | 2 +- frontend/src/lib/components/flows/content/FlowInput.svelte | 2 +- .../flows/{ => conversations}/FlowChatInterface.svelte | 0 .../flows/{ => conversations}/FlowChatMessage.svelte | 0 .../flows/{ => conversations}/FlowConversationsSidebar.svelte | 0 .../routes/(root)/(logged)/flows/get/[...path]/+page.svelte | 4 ++-- 6 files changed, 4 insertions(+), 4 deletions(-) rename frontend/src/lib/components/flows/{ => conversations}/FlowChatInterface.svelte (100%) rename frontend/src/lib/components/flows/{ => conversations}/FlowChatMessage.svelte (100%) rename frontend/src/lib/components/flows/{ => conversations}/FlowConversationsSidebar.svelte (100%) diff --git a/frontend/src/lib/components/FlowPreviewContent.svelte b/frontend/src/lib/components/FlowPreviewContent.svelte index f7e78a4449125..69541052dbef0 100644 --- a/frontend/src/lib/components/FlowPreviewContent.svelte +++ b/frontend/src/lib/components/FlowPreviewContent.svelte @@ -38,7 +38,7 @@ import { getStepHistoryLoaderContext } from './stepHistoryLoader.svelte' import { aiChatManager } from './copilot/chat/AIChatManager.svelte' import { stateSnapshot } from '$lib/svelte5Utils.svelte' - import FlowChatInterface from './flows/FlowChatInterface.svelte' + import FlowChatInterface from './flows/conversations/FlowChatInterface.svelte' interface Props { previewMode: 'upTo' | 'whole' diff --git a/frontend/src/lib/components/flows/content/FlowInput.svelte b/frontend/src/lib/components/flows/content/FlowInput.svelte index ea15d8f4714d9..80d8c4c4b2d09 100644 --- a/frontend/src/lib/components/flows/content/FlowInput.svelte +++ b/frontend/src/lib/components/flows/content/FlowInput.svelte @@ -44,7 +44,7 @@ import { refreshStateStore } from '$lib/svelte5Utils.svelte' import type { ScriptLang } from '$lib/gen' import { deepEqual } from 'fast-equals' - import FlowChatInterface from '../FlowChatInterface.svelte' + import FlowChatInterface from '$lib/components/flows/conversations/FlowChatInterface.svelte' import Toggle from '$lib/components/Toggle.svelte' import { AI_AGENT_SCHEMA } from '../flowInfers' import { nextId } from '../flowModuleNextId' diff --git a/frontend/src/lib/components/flows/FlowChatInterface.svelte b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte similarity index 100% rename from frontend/src/lib/components/flows/FlowChatInterface.svelte rename to frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte diff --git a/frontend/src/lib/components/flows/FlowChatMessage.svelte b/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte similarity index 100% rename from frontend/src/lib/components/flows/FlowChatMessage.svelte rename to frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte diff --git a/frontend/src/lib/components/flows/FlowConversationsSidebar.svelte b/frontend/src/lib/components/flows/conversations/FlowConversationsSidebar.svelte similarity index 100% rename from frontend/src/lib/components/flows/FlowConversationsSidebar.svelte rename to frontend/src/lib/components/flows/conversations/FlowConversationsSidebar.svelte diff --git a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte index c2ee38d3546fe..d8db733b7baa9 100644 --- a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte @@ -17,8 +17,8 @@ import { Badge as HeaderBadge, Alert } from '$lib/components/common' import MoveDrawer from '$lib/components/MoveDrawer.svelte' import RunForm from '$lib/components/RunForm.svelte' - import FlowChatInterface from '$lib/components/flows/FlowChatInterface.svelte' - import FlowConversationsSidebar from '$lib/components/flows/FlowConversationsSidebar.svelte' + import FlowChatInterface from '$lib/components/flows/conversations/FlowChatInterface.svelte' + import FlowConversationsSidebar from '$lib/components/flows/conversations/FlowConversationsSidebar.svelte' import ShareModal from '$lib/components/ShareModal.svelte' import { enterpriseLicense, userStore, workspaceStore } from '$lib/stores' import { sendUserToast } from '$lib/toast' From 107d6610b466fa768834a3194149e308c2415945 Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 12:13:05 +0000 Subject: [PATCH 06/16] dont show flow graph if chat mode --- .../conversations/FlowChatInterface.svelte | 10 +++-- .../(logged)/flows/get/[...path]/+page.svelte | 44 ++++++++++--------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte index 3919627e50663..539f1643e4a7d 100644 --- a/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte +++ b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte @@ -411,7 +411,7 @@ } -
+
Send a message to run the flow and see the results

{:else} - {#each messages as message (message.id)} - - {/each} +
+ {#each messages as message (message.id)} + + {/each} +
{/if}
diff --git a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte index d8db733b7baa9..8c09140928ce8 100644 --- a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte @@ -706,27 +706,29 @@
{/if}
-
- { - if (e.detail) { - stepDetail = e.detail - rightPaneSelected = 'flow_step' - } else { - stepDetail = undefined - rightPaneSelected = 'saved_inputs' - } - }} - on:triggerDetail={(e) => { - rightPaneSelected = 'triggers' - }} - /> -
+ {#if !chatInputEnabled} +
+ { + if (e.detail) { + stepDetail = e.detail + rightPaneSelected = 'flow_step' + } else { + stepDetail = undefined + rightPaneSelected = 'saved_inputs' + } + }} + on:triggerDetail={(e) => { + rightPaneSelected = 'triggers' + }} + /> +
+ {/if}
{/if} {/snippet} From cee1d9b96fd206099b1002328da0c2e3046fe5ad Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 12:34:12 +0000 Subject: [PATCH 07/16] fix --- backend/windmill-queue/src/jobs.rs | 2 +- .../components/flows/conversations/FlowChatInterface.svelte | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index f8cf45b5c311b..3f8699bff1afb 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -832,7 +832,7 @@ pub async fn add_completed_job( if !skipped && flow_is_done { let chat_input_enabled = queued_job.parse_chat_input_enabled(); let value = serde_json::to_value(result.0) - .map_err(|e| Error::InternalErr(format!("Failed to serialize result: {e}")))?; + .map_err(|e| Error::internal_err(format!("Failed to serialize result: {e}")))?; if chat_input_enabled.unwrap_or(false) { let content = if let Ok(wrapper) = serde_json::from_value::(value.clone()) { diff --git a/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte index 539f1643e4a7d..514bcfe696745 100644 --- a/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte +++ b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte @@ -423,11 +423,11 @@ {/if} {#if isLoadingMessages} -
- +
+
{:else if messages.length === 0} -
+

Start a conversation

Send a message to run the flow and see the results

From 9b05c40f67f205e248478a100e98d04a60ae5704 Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 12:59:53 +0000 Subject: [PATCH 08/16] fix too long title --- backend/windmill-api/src/flow_conversations.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend/windmill-api/src/flow_conversations.rs b/backend/windmill-api/src/flow_conversations.rs index b4d3b5571b6db..efe11c2045eca 100644 --- a/backend/windmill-api/src/flow_conversations.rs +++ b/backend/windmill-api/src/flow_conversations.rs @@ -123,6 +123,12 @@ pub async fn get_or_create_conversation_with_id( return Ok(existing); } + // Truncate title to 25 char characters max + let title = if title.len() > 25 { + format!("{}...", &title[..25]) + } else { + title.to_string() + }; // Create new conversation with provided ID let conversation = sqlx::query_as!( FlowConversation, From af0e810fe794e9358ba849e756ac3639b5656d89 Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 13:06:43 +0000 Subject: [PATCH 09/16] fix user message --- backend/windmill-api/src/jobs.rs | 26 +++++++----- .../conversations/FlowChatInterface.svelte | 41 ++++++++++--------- .../conversations/FlowChatMessage.svelte | 5 ++- 3 files changed, 41 insertions(+), 31 deletions(-) diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 7f5a9bbd7da5c..813e912d6e0ca 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -3942,7 +3942,7 @@ async fn handle_chat_conversation_messages( w_id: &str, flow_path: &str, run_query: &RunJobQuery, - user_message: Option<&str>, + user_message_raw: Option<&Box>, uuid: Uuid, ) -> error::Result<()> { let memory_id = run_query.memory_id.ok_or_else(|| { @@ -3951,11 +3951,17 @@ async fn handle_chat_conversation_messages( ) })?; - if user_message.is_none() { - return Err(windmill_common::error::Error::BadRequest( + let user_message_raw = user_message_raw.ok_or_else(|| { + windmill_common::error::Error::BadRequest( "user_message argument is required for chat-enabled flows".to_string(), - )); - } + ) + })?; + + // Deserialize the RawValue to get the actual string without quotes + let user_message: String = serde_json::from_str(user_message_raw.get()) + .map_err(|e| windmill_common::error::Error::BadRequest( + format!("Failed to deserialize user_message: {}", e) + ))?; // Create conversation with provided ID (or get existing one) flow_conversations::get_or_create_conversation_with_id( @@ -3963,7 +3969,7 @@ async fn handle_chat_conversation_messages( w_id, flow_path, &authed.username, - &user_message.unwrap(), + &user_message, memory_id, ) .await?; @@ -3973,7 +3979,7 @@ async fn handle_chat_conversation_messages( tx, memory_id, MessageType::User, - &user_message.unwrap(), + &user_message, None, // No job_id for user message w_id, ) @@ -4111,7 +4117,6 @@ pub async fn run_flow_by_path_inner( } // Handle conversation messages for chat-enabled flows - let user_message = args.args.get("user_message").map(|v| v.get()); if chat_input_enabled.unwrap_or(false) { handle_chat_conversation_messages( &mut tx, @@ -4119,7 +4124,7 @@ pub async fn run_flow_by_path_inner( &w_id, &flow_path.to_string(), &run_query, - user_message, + args.args.get("user_message"), uuid, ) .await?; @@ -5611,7 +5616,6 @@ pub async fn run_wait_result_flow_by_path_internal( } // Handle conversation messages for chat-enabled flows - let user_message = args.args.get("user_message").map(|v| v.get()); if chat_input_enabled.unwrap_or(false) { handle_chat_conversation_messages( &mut tx, @@ -5619,7 +5623,7 @@ pub async fn run_wait_result_flow_by_path_internal( &w_id, &flow_path.to_string(), &run_query, - user_message, + args.args.get("user_message"), uuid, ) .await?; diff --git a/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte index 514bcfe696745..52f7140e0cd53 100644 --- a/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte +++ b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte @@ -21,6 +21,7 @@ interface ChatMessage extends FlowConversationMessage { loading?: boolean + streaming?: boolean } let { @@ -48,17 +49,6 @@ const conversationsCache = $state>({}) - // Auto-scroll to bottom when messages change - $effect(() => { - const scroll = async () => { - if (messages.length > 0) { - await tick() - scrollToBottom() - } - } - scroll() - }) - // Cleanup EventSource on unmount $effect(() => { return () => { @@ -156,6 +146,14 @@ } } + function scrollToUserMessage(messageId: string) { + if (!messagesContainer) return + const messageElement = messagesContainer.querySelector(`[data-message-id="${messageId}"]`) + if (messageElement) { + messageElement.scrollIntoView({ behavior: 'smooth', block: 'start' }) + } + } + async function pollJobResult(jobId: string, messageId: string) { try { const result = await waitJob(jobId) @@ -270,11 +268,15 @@ message_type: 'assistant', conversation_id: currentConversationId, job_id: '', - loading: true + loading: true, + streaming: useStreaming && path } messages = [...messages, assistantMessage] + await tick() + scrollToUserMessage(userMessage.id) + if (useStreaming && path) { // Close any existing EventSource if (currentEventSource) { @@ -332,7 +334,8 @@ ? { ...msg, content: finalContent, - loading: false + loading: false, + streaming: false } : msg ) @@ -353,7 +356,8 @@ ? { ...msg, content: accumulatedContent || 'Stream error occurred', - loading: false + loading: false, + streaming: false } : msg ) @@ -369,7 +373,8 @@ ? { ...msg, content: 'Failed to connect to stream', - loading: false + loading: false, + streaming: false } : msg ) @@ -384,8 +389,6 @@ } pollJobResult(jobId, assistantMessageId) } - - scrollToBottom() } catch (error) { console.error('Error running flow:', error) sendUserToast('Failed to run flow: ' + error, true) @@ -416,7 +419,7 @@
{#if deploymentInProgress} @@ -433,7 +436,7 @@

Send a message to run the flow and see the results

{:else} -
+
{#each messages as message (message.id)} {/each} diff --git a/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte b/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte index 44f5a0e625ecf..ab92c18ccd16a 100644 --- a/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte +++ b/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte @@ -13,7 +13,10 @@ let { message }: Props = $props() -
+
( db: &Pool, queued_job: &MiniPulledJob, @@ -834,19 +829,26 @@ pub async fn add_completed_job( let value = serde_json::to_value(result.0) .map_err(|e| Error::internal_err(format!("Failed to serialize result: {e}")))?; if chat_input_enabled.unwrap_or(false) { - let content = - if let Ok(wrapper) = serde_json::from_value::(value.clone()) { - // Successfully deserialized to OutputWrapper, use the output field - wrapper.output - } else { - // No string output field, use the whole result - if let serde_json::Value::String(s) = value { - s + let content = match &value { + // If it's an Object with "output" key AND the output is a String, return it + serde_json::Value::Object(map) + if map.contains_key("output") + && matches!(map.get("output"), Some(serde_json::Value::String(_))) => + { + if let Some(serde_json::Value::String(s)) = map.get("output") { + s.clone() } else { - serde_json::to_string_pretty(&value) + // serialize the whole result + serde_json::to_string(&value) .unwrap_or_else(|e| format!("Failed to serialize result: {e}")) } - }; + } + // Otherwise, if the whole value is a String, return it + serde_json::Value::String(s) => s.clone(), + // Otherwise, prettify the whole result + _ => serde_json::to_string(&value) + .unwrap_or_else(|e| format!("Failed to serialize result: {e}")), + }; // Update the assistant message let _ = sqlx::query!( From 61afb82457561b9c551f3fd455505fc9a0b3f00b Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 14:55:51 +0000 Subject: [PATCH 14/16] cleaning --- backend/windmill-queue/src/jobs.rs | 14 +++++++------- .../flows/conversations/FlowChatMessage.svelte | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index b15cc319cf14e..6dc4aafc5aefa 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -829,24 +829,24 @@ pub async fn add_completed_job( let value = serde_json::to_value(result.0) .map_err(|e| Error::internal_err(format!("Failed to serialize result: {e}")))?; if chat_input_enabled.unwrap_or(false) { - let content = match &value { + let content = match value { // If it's an Object with "output" key AND the output is a String, return it - serde_json::Value::Object(map) + serde_json::Value::Object(mut map) if map.contains_key("output") && matches!(map.get("output"), Some(serde_json::Value::String(_))) => { - if let Some(serde_json::Value::String(s)) = map.get("output") { - s.clone() + if let Some(serde_json::Value::String(s)) = map.remove("output") { + s } else { // serialize the whole result - serde_json::to_string(&value) + serde_json::to_string_pretty(&serde_json::Value::Object(map)) .unwrap_or_else(|e| format!("Failed to serialize result: {e}")) } } // Otherwise, if the whole value is a String, return it - serde_json::Value::String(s) => s.clone(), + serde_json::Value::String(s) => s, // Otherwise, prettify the whole result - _ => serde_json::to_string(&value) + v => serde_json::to_string_pretty(&v) .unwrap_or_else(|e| format!("Failed to serialize result: {e}")), }; diff --git a/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte b/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte index ebb7e36b98b32..9bb8172e2cf54 100644 --- a/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte +++ b/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte @@ -30,7 +30,7 @@ Processing...
{:else if message.content} -
+
Date: Mon, 6 Oct 2025 15:01:36 +0000 Subject: [PATCH 15/16] cleaning --- backend/windmill-queue/src/jobs.rs | 2 +- .../src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 6dc4aafc5aefa..ef15fb9e99ea6 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -839,7 +839,7 @@ pub async fn add_completed_job( s } else { // serialize the whole result - serde_json::to_string_pretty(&serde_json::Value::Object(map)) + serde_json::to_string_pretty(&map) .unwrap_or_else(|e| format!("Failed to serialize result: {e}")) } } diff --git a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte index 8c09140928ce8..3eeea26bf57f0 100644 --- a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte @@ -424,6 +424,8 @@ flowChatInterface.clearMessages() } + flowChatInterface?.focusInput() + return newConversationId } From 9bfee05dbf3ca5387ed18e57fea5c16ca4da2083 Mon Sep 17 00:00:00 2001 From: centdix Date: Mon, 6 Oct 2025 15:02:37 +0000 Subject: [PATCH 16/16] cleaning --- backend/windmill-queue/src/jobs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index ef15fb9e99ea6..d0055bbe3ac69 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -838,7 +838,7 @@ pub async fn add_completed_job( if let Some(serde_json::Value::String(s)) = map.remove("output") { s } else { - // serialize the whole result + // prettify the whole result serde_json::to_string_pretty(&map) .unwrap_or_else(|e| format!("Failed to serialize result: {e}")) }