Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion backend/windmill-api/src/flow_conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn list_conversations(
.offset(offset as i64);

let sql = sqlb.sql().map_err(|e| {
windmill_common::error::Error::InternalErr(format!("Failed to build SQL: {}", e))
windmill_common::error::Error::internal_err(format!("Failed to build SQL: {}", e))
})?;

let conversations = sqlx::query_as::<Postgres, FlowConversation>(&sql)
Expand Down Expand Up @@ -123,6 +123,12 @@ pub async fn get_or_create_conversation_with_id(
return Ok(existing);
}

// Truncate title to 25 char characters max
let title = if title.len() > 25 {
format!("{}...", &title[..25])
} else {
title.to_string()
};
// Create new conversation with provided ID
let conversation = sqlx::query_as!(
FlowConversation,
Expand Down
22 changes: 13 additions & 9 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3942,7 +3942,7 @@ async fn handle_chat_conversation_messages(
w_id: &str,
flow_path: &str,
run_query: &RunJobQuery,
args: &PushArgsOwned,
user_message_raw: Option<&Box<serde_json::value::RawValue>>,
uuid: Uuid,
) -> error::Result<()> {
let memory_id = run_query.memory_id.ok_or_else(|| {
Expand All @@ -3951,21 +3951,25 @@ async fn handle_chat_conversation_messages(
)
})?;

let user_msg_raw = args.args.get("user_message").ok_or_else(|| {
let user_message_raw = user_message_raw.ok_or_else(|| {
windmill_common::error::Error::BadRequest(
"user_message argument is required for chat-enabled flows".to_string(),
)
})?;

let user_msg = serde_json::from_str::<String>(user_msg_raw.get())?;
// Deserialize the RawValue to get the actual string without quotes
let user_message: String = serde_json::from_str(user_message_raw.get())
.map_err(|e| windmill_common::error::Error::BadRequest(
format!("Failed to deserialize user_message: {}", e)
))?;

// Create conversation with provided ID (or get existing one)
flow_conversations::get_or_create_conversation_with_id(
tx,
w_id,
flow_path,
&authed.username,
&user_msg,
&user_message,
memory_id,
)
.await?;
Expand All @@ -3975,7 +3979,7 @@ async fn handle_chat_conversation_messages(
tx,
memory_id,
MessageType::User,
&user_msg,
&user_message,
None, // No job_id for user message
w_id,
)
Expand Down Expand Up @@ -4082,7 +4086,7 @@ pub async fn run_flow_by_path_inner(
apply_preprocessor: !run_query.skip_preprocessor.unwrap_or(false)
&& has_preprocessor.unwrap_or(false),
},
PushArgs { args: &args.args, extra: args.extra.clone() },
PushArgs { args: &args.args, extra: args.extra },
authed.display_username(),
email,
permissioned_as,
Expand Down Expand Up @@ -4120,7 +4124,7 @@ pub async fn run_flow_by_path_inner(
&w_id,
&flow_path.to_string(),
&run_query,
&args,
args.args.get("user_message"),
uuid,
)
.await?;
Expand Down Expand Up @@ -5581,7 +5585,7 @@ pub async fn run_wait_result_flow_by_path_internal(
apply_preprocessor: !run_query.skip_preprocessor.unwrap_or(false)
&& has_preprocessor.unwrap_or(false),
},
PushArgs { args: &args.args, extra: args.extra.clone() },
PushArgs { args: &args.args, extra: args.extra },
authed.display_username(),
email,
permissioned_as,
Expand Down Expand Up @@ -5619,7 +5623,7 @@ pub async fn run_wait_result_flow_by_path_internal(
&w_id,
&flow_path.to_string(),
&run_query,
&args,
args.args.get("user_message"),
uuid,
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ pub async fn store_pull_query(wc: &WorkerConfig) {

pub const TMP_DIR: &str = "/tmp/windmill";
pub const TMP_LOGS_DIR: &str = concatcp!(TMP_DIR, "/logs");
pub const TMP_MEMORY_DIR: &str = concatcp!(TMP_DIR, "/memory");

pub const HUB_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "hub");

Expand Down
75 changes: 29 additions & 46 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,11 +746,6 @@ lazy_static::lazy_static! {
pub static ref MAX_RESULT_SIZE_MB: usize = std::env::var("MAX_RESULT_SIZE_MB").unwrap_or("500".to_string()).parse().unwrap_or(500);
}

#[derive(Deserialize)]
struct OutputWrapper {
output: String,
}

pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
db: &Pool<Postgres>,
queued_job: &MiniPulledJob,
Expand Down Expand Up @@ -831,53 +826,41 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
// Update conversation message if it's a flow and it's done (both success and error cases)
if !skipped && flow_is_done {
let chat_input_enabled = queued_job.parse_chat_input_enabled();
let value = serde_json::to_value(result.0)
.map_err(|e| Error::internal_err(format!("Failed to serialize result: {e}")))?;
if chat_input_enabled.unwrap_or(false) {
let content = if let Ok(wrapper) = serde_json::from_value::<OutputWrapper>(
serde_json::to_value(result.0).unwrap_or(serde_json::Value::Null),
) {
// Successfully deserialized to OutputWrapper, use the output field
wrapper.output
} else {
// No string output field, use the whole result
serde_json::to_value(result.0)
.ok()
.and_then(|v| {
if let serde_json::Value::String(s) = v {
Some(s)
} else {
serde_json::to_string_pretty(&v).ok()
}
})
.unwrap_or_else(|| {
if success {
"Job completed successfully".to_string()
} else {
"Job failed".to_string()
}
})
let content = match value {
// If it's an Object with "output" key AND the output is a String, return it
serde_json::Value::Object(mut map)
if map.contains_key("output")
&& matches!(map.get("output"), Some(serde_json::Value::String(_))) =>
{
if let Some(serde_json::Value::String(s)) = map.remove("output") {
s
} else {
// prettify the whole result
serde_json::to_string_pretty(&map)
.unwrap_or_else(|e| format!("Failed to serialize result: {e}"))
}
}
// Otherwise, if the whole value is a String, return it
serde_json::Value::String(s) => s,
// Otherwise, prettify the whole result
v => serde_json::to_string_pretty(&v)
.unwrap_or_else(|e| format!("Failed to serialize result: {e}")),
};

// check if flow_conversation_message exists
let flow_conversation_message_exists = sqlx::query_scalar!(
"SELECT EXISTS(SELECT 1 FROM flow_conversation_message WHERE job_id = $1 AND message_type = 'assistant')",
queued_job.id
)
.fetch_one(db)
.await?;

if flow_conversation_message_exists.unwrap_or(false) {
// Update the assistant message using direct DB access
let _ = sqlx::query!(
"UPDATE flow_conversation_message
// Update the assistant message
let _ = sqlx::query!(
"UPDATE flow_conversation_message
SET content = $1
WHERE job_id = $2
",
content,
queued_job.id,
)
.execute(db)
.await;
}
content,
queued_job.id,
)
.execute(db)
.await;
}
}

Expand Down
13 changes: 6 additions & 7 deletions backend/windmill-worker/src/ai_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,12 @@ pub async fn run_agent(
// Take the last n messages
let start_idx = loaded_messages.len().saturating_sub(context_length);
let mut messages_to_load = loaded_messages[start_idx..].to_vec();
let first_non_tool_message_index =
messages_to_load.iter().position(|m| m.role != "tool");

// Remove the first message if its role is "tool" to avoid OpenAI API error
// "messages with role 'tool' must be a response to a preceeding message with 'tool_calls'"
if let Some(first_msg) = messages_to_load.first() {
if first_msg.role == "tool" {
messages_to_load.remove(0);
}
// Remove the first messages if their role is "tool" to avoid OpenAI API error
if let Some(index) = first_non_tool_message_index {
messages_to_load = messages_to_load[index..].to_vec();
}

messages.extend(messages_to_load);
Expand Down Expand Up @@ -934,7 +933,7 @@ pub async fn run_agent(
let (handle_result, updated_occupancy) =
join_handle.await.map_err(|e| {
Error::internal_err(format!(
"Tool execution task panicked: {}",
"Tool execution task failed: {}",
e
))
})?;
Expand Down
8 changes: 3 additions & 5 deletions backend/windmill-worker/src/memory_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use crate::ai::types::OpenAIMessage;
use std::path::PathBuf;
use tokio::{fs, io::AsyncWriteExt};
use uuid::Uuid;
use windmill_common::worker::TMP_LOGS_DIR;
use windmill_common::worker::TMP_MEMORY_DIR;

/// Get the file path for storing memory for a specific AI agent step
pub fn path_for(workspace_id: &str, conversation_id: Uuid, step_id: &str) -> PathBuf {
PathBuf::from(TMP_LOGS_DIR)
.join("memory")
PathBuf::from(TMP_MEMORY_DIR)
.join(workspace_id)
.join(conversation_id.to_string())
.join(format!("{step_id}.json"))
Expand Down Expand Up @@ -61,8 +60,7 @@ pub async fn delete_conversation_from_disk(
workspace_id: &str,
conversation_id: Uuid,
) -> anyhow::Result<()> {
let conversation_path = PathBuf::from(TMP_LOGS_DIR)
.join("memory")
let conversation_path = PathBuf::from(TMP_MEMORY_DIR)
.join(workspace_id)
.join(conversation_id.to_string());

Expand Down
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ services:
condition: service_healthy
volumes:
- worker_logs:/tmp/windmill/logs

logging: *default-logging

windmill_worker:
Expand All @@ -72,6 +73,9 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
- worker_dependency_cache:/tmp/windmill/cache
- worker_logs:/tmp/windmill/logs
# for AI agent memory
- worker_memory:/tmp/windmill/memory

logging: *default-logging

## This worker is specialized for "native" jobs. Native jobs run in-process and thus are much more lightweight than other jobs
Expand Down Expand Up @@ -188,6 +192,7 @@ volumes:
db_data: null
worker_dependency_cache: null
worker_logs: null
worker_memory: null
windmill_index: null
lsp_cache: null
caddy_data: null
2 changes: 1 addition & 1 deletion frontend/src/lib/components/FlowPreviewContent.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import { getStepHistoryLoaderContext } from './stepHistoryLoader.svelte'
import { aiChatManager } from './copilot/chat/AIChatManager.svelte'
import { stateSnapshot } from '$lib/svelte5Utils.svelte'
import FlowChatInterface from './flows/FlowChatInterface.svelte'
import FlowChatInterface from './flows/conversations/FlowChatInterface.svelte'

interface Props {
previewMode: 'upTo' | 'whole'
Expand Down
14 changes: 12 additions & 2 deletions frontend/src/lib/components/details/DetailPageLayout.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import { Tabs, Tab, TabContent } from '$lib/components/common'
import { Pane, Splitpanes } from 'svelte-splitpanes'
import DetailPageDetailPanel from './DetailPageDetailPanel.svelte'
import FlowViewerInner from '../FlowViewerInner.svelte'

interface Props {
isOperator?: boolean
flow_json?: any | undefined
selected: string
forceSmallScreen?: boolean
isChatMode?: boolean
header?: import('svelte').Snippet
form?: import('svelte').Snippet
scriptRender?: import('svelte').Snippet
Expand All @@ -21,6 +23,7 @@
flow_json = undefined,
selected = $bindable(),
forceSmallScreen = false,
isChatMode = false,
header,
form,
scriptRender: script,
Expand Down Expand Up @@ -74,8 +77,10 @@
{@render header?.()}
<div class="grow min-h-0 w-full flex flex-col">
<Tabs bind:selected={mobileTab} wrapperClass="flex-none">
<Tab value="form">Run form</Tab>
<Tab value="saved_inputs">Inputs</Tab>
<Tab value="form">{isChatMode ? 'Chat' : 'Run form'}</Tab>
{#if !isChatMode}
<Tab value="saved_inputs">Inputs</Tab>
{/if}
{#if !isOperator}
<Tab value="triggers">Triggers</Tab>
{/if}
Expand All @@ -97,6 +102,11 @@
<TabContent value="triggers" class="flex flex-col flex-1 h-full mt-[-2px]">
{@render triggers?.()}
</TabContent>
{#if flow_json}
<TabContent value="raw" class="flex flex-col flex-1 h-full overflow-auto p-2">
<FlowViewerInner flow={flow_json} />
</TabContent>
{/if}
<TabContent value="script" class="flex flex-col flex-1 h-full">
{@render script?.()}
</TabContent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import { refreshStateStore } from '$lib/svelte5Utils.svelte'
import type { ScriptLang } from '$lib/gen'
import { deepEqual } from 'fast-equals'
import FlowChatInterface from '../FlowChatInterface.svelte'
import FlowChatInterface from '$lib/components/flows/conversations/FlowChatInterface.svelte'
import Toggle from '$lib/components/Toggle.svelte'
import { AI_AGENT_SCHEMA } from '../flowInfers'
import { nextId } from '../flowModuleNextId'
Expand Down
Loading