Skip to content

Commit

Permalink
feature: add webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
densumesh authored and skeptrunedev committed Nov 19, 2024
1 parent d2d1a83 commit 93c865b
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pdf2md/server/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct ErrorResponseBody {
pub message: String,
}

#[derive(Debug, Display, Clone)]
#[derive(Debug, Display, Clone, Eq, PartialEq)]
pub enum ServiceError {
#[display("Internal Server Error: {_0}")]
InternalServerError(String),
Expand Down
48 changes: 45 additions & 3 deletions pdf2md/server/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct ChunkingTask {
pub id: uuid::Uuid,
pub file_name: String,
pub page_range: (u32, u32),
pub model_params: ModelParams,
pub params: ChunkingParams,
pub attempt_number: u8,
}

Expand Down Expand Up @@ -74,21 +74,63 @@ pub struct UploadFileReqPayload {
pub llm_api_key: Option<String>,
/// The System prompt that will be used for the conversion of the file.
pub system_prompt: Option<String>,
/// Optional webhook URL to receive notifications for each page processed.
pub webhook_url: Option<String>,
/// Optional webhook payload template with placeholder values.
/// Supports the following template variables:
/// - {{status}} : Current status of the processing
/// - {{file_name}} : Original file name
/// - {{result}} : Processing result/output
/// - {{error}} : Error message if any
/// Example: {"status": "{{status}}", "data": {"output": "{{result}}"}}
/// If not provided, the default template will be used.
pub webhook_payload_template: Option<String>,
}

/// Flattened snapshot of a file task plus one processed page, used as the
/// variable source when rendering a webhook notification payload.
#[derive(Debug)]
pub struct WebhookPayloadData {
// Stringified UUID of the file-conversion task.
pub task_id: String,
// Name of the uploaded file. NOTE(review): from_tasks currently fills
// this with the task id — confirm intent before relying on it.
pub file_name: String,
// Total number of pages in the file.
pub pages: u32,
// Number of pages processed so far.
pub pages_processed: u32,
// Markdown content of the page (chunk) that was just processed.
pub content: String,
// Metadata of the processed page, as stored in ClickHouse.
pub metadata: String,
// Current task status (e.g. chunking / completed).
pub status: String,
// Task creation time, stringified.
pub timestamp: String,
}

impl WebhookPayloadData {
/// Combines a task row (`FileTaskClickhouse`) with a freshly inserted
/// page row (`ChunkClickhouse`) into the data used to render a webhook
/// notification for that page.
pub fn from_tasks(task: FileTaskClickhouse, page: ChunkClickhouse) -> Self {
Self {
task_id: task.id.clone(),
// NOTE(review): file_name is populated with the task *id*, not a
// file name — looks like a copy-paste bug, unless
// FileTaskClickhouse has no file_name column. Confirm against the
// FileTaskClickhouse definition (ChunkingTask does carry file_name).
file_name: task.id.clone(),
pages: task.pages,
pages_processed: task.pages_processed,
content: page.content.clone(),
metadata: page.metadata.clone(),
status: task.status,
timestamp: task.created_at.to_string(),
}
}
}

#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
pub struct ModelParams {
pub struct ChunkingParams {
pub llm_model: Option<String>,
pub llm_api_key: Option<String>,
pub system_prompt: Option<String>,
pub webhook_url: Option<String>,
pub webhook_payload_template: Option<String>,
}

impl From<UploadFileReqPayload> for ModelParams {
impl From<UploadFileReqPayload> for ChunkingParams {
fn from(payload: UploadFileReqPayload) -> Self {
Self {
llm_model: payload.llm_model,
llm_api_key: payload.llm_api_key,
system_prompt: payload.system_prompt,
webhook_url: payload.webhook_url,
webhook_payload_template: payload.webhook_payload_template,
}
}
}
Expand Down
40 changes: 26 additions & 14 deletions pdf2md/server/src/operators/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{
errors::ServiceError,
models::{ChunkClickhouse, ChunkingTask, FileTaskClickhouse, FileTaskStatus, RedisPool},
models::{
ChunkClickhouse, ChunkingTask, FileTaskClickhouse, FileTaskStatus, RedisPool,
WebhookPayloadData,
},
};

pub async fn insert_task(
Expand Down Expand Up @@ -30,7 +33,7 @@ pub async fn insert_page(
page: ChunkClickhouse,
clickhouse_client: &clickhouse::Client,
redis_pool: &RedisPool,
) -> Result<(), ServiceError> {
) -> Result<WebhookPayloadData, ServiceError> {
let mut page_inserter = clickhouse_client.insert("file_chunks").map_err(|e| {
log::error!("Error getting page_inserter: {:?}", e);
ServiceError::InternalServerError(format!("Error getting page_inserter: {:?}", e))
Expand Down Expand Up @@ -70,24 +73,25 @@ pub async fn insert_page(
prev_task.pages
);

update_task_status(
task.id,
FileTaskStatus::ChunkingFile(total_pages_processed),
clickhouse_client,
)
.await?;
if total_pages_processed >= prev_task.pages {
update_task_status(task.id, FileTaskStatus::Completed, clickhouse_client).await?;
}
let task = if total_pages_processed >= prev_task.pages {
update_task_status(task.id, FileTaskStatus::Completed, clickhouse_client).await?
} else {
update_task_status(
task.id,
FileTaskStatus::ChunkingFile(total_pages_processed),
clickhouse_client,
)
.await?
};

Ok(())
Ok(WebhookPayloadData::from_tasks(task, page))
}

pub async fn update_task_status(
task_id: uuid::Uuid,
status: FileTaskStatus,
clickhouse_client: &clickhouse::Client,
) -> Result<(), ServiceError> {
) -> Result<FileTaskClickhouse, ServiceError> {
let query = match status {
FileTaskStatus::ProcessingFile(pages) => {
format!(
Expand Down Expand Up @@ -129,7 +133,15 @@ pub async fn update_task_status(
ServiceError::BadRequest("Failed to update task status".to_string())
})?;

Ok(())
clickhouse_client
.query("SELECT ?fields FROM file_tasks WHERE id = ?")
.bind(task_id)
.fetch_one()
.await
.map_err(|err| {
log::error!("Failed to get task {:?}", err);
ServiceError::BadRequest("Failed to get task".to_string())
})
}

pub async fn get_task(
Expand Down
1 change: 1 addition & 0 deletions pdf2md/server/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod clickhouse;
pub mod pdf_chunk;
pub mod redis;
pub mod s3;
pub mod webhook_template;
22 changes: 15 additions & 7 deletions pdf2md/server/src/operators/pdf_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::models::RedisPool;
use crate::{
errors::ServiceError,
get_env,
models::{ChunkClickhouse, ChunkingTask, ModelParams},
operators::clickhouse::insert_page,
models::{ChunkClickhouse, ChunkingParams, ChunkingTask},
operators::{clickhouse::insert_page, webhook_template::send_webhook},
};
use base64::Engine;
use image::{codecs::png::PngEncoder, ImageEncoder};
Expand Down Expand Up @@ -48,7 +48,7 @@ fn get_data_url_from_image(img: DynamicImage) -> Result<String, ServiceError> {
Ok(final_encoded)
}

fn get_llm_client(params: ModelParams) -> Client {
fn get_llm_client(params: ChunkingParams) -> Client {
let base_url = get_env!("LLM_BASE_URL", "LLM_BASE_URL should be set").into();

let llm_api_key: String = params.llm_api_key.unwrap_or(
Expand Down Expand Up @@ -77,7 +77,7 @@ async fn get_pages_from_image(
client: Client,
) -> Result<ChunkClickhouse, ServiceError> {
let llm_model: String = task
.model_params
.params
.llm_model
.unwrap_or(get_env!("LLM_MODEL", "LLM_MODEL should be set").into());

Expand All @@ -86,7 +86,7 @@ async fn get_pages_from_image(
let mut messages = vec![
ChatMessage::System {
content: (ChatMessageContent::Text(
task.model_params
task.params
.system_prompt
.unwrap_or(CHUNK_SYSTEM_PROMPT.to_string()),
)),
Expand Down Expand Up @@ -192,7 +192,7 @@ pub async fn chunk_sub_pages(

let mut result_pages = vec![];

let client = get_llm_client(task.model_params.clone());
let client = get_llm_client(task.params.clone());
let mut prev_md_doc = None;

for (page_image, page_num) in pages.into_iter().zip(task.page_range.0..task.page_range.1) {
Expand All @@ -205,7 +205,15 @@ pub async fn chunk_sub_pages(
)
.await?;
prev_md_doc = Some(page.content.clone());
insert_page(task.clone(), page.clone(), clickhouse_client, redis_pool).await?;

let data = insert_page(task.clone(), page.clone(), clickhouse_client, redis_pool).await?;

send_webhook(
task.params.webhook_url.clone(),
task.params.webhook_payload_template.clone(),
data,
)
.await?;
log::info!("Page {} processed", page_num);

result_pages.push(page);
Expand Down
Loading

0 comments on commit 93c865b

Please sign in to comment.