Commit bbb9d67

feature: process 1 page at a time

densumesh authored and skeptrunedev committed Nov 19, 2024
1 parent 93c865b

Showing 13 changed files with 133 additions and 246 deletions.
8 changes: 5 additions & 3 deletions pdf2md/cli/src/operators/create_task.rs
@@ -1,15 +1,17 @@
+use base64::Engine;
+
 pub fn create_task(file: &str, base_url: &str, api_key: &str) {
-    let file = std::fs::read(file).expect("Failed to read file");
-    let file = base64::prelude::BASE64_STANDARD.encode(file);
+    let file_buf = std::fs::read(file).expect("Failed to read file");
+    let file_base64 = base64::prelude::BASE64_STANDARD.encode(file_buf);
 
     let request = ureq::post(format!("{}/api/task", base_url).as_str())
         .set("Content-Type", "application/json")
         .set("Authorization", api_key)
         .send_json(serde_json::json!({
-            "base64_file": file,
+            "base64_file": file_base64,
             "file_name": file,
         }))
         .map_err(|e| e.to_string())
         .expect("Failed to send request");
 
     let response: serde_json::Value = request.into_json().expect("Failed to parse response");
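The variable renames here fix a shadowing bug: previously the `file` path argument was rebound to the base64 payload, so the `file_name` field in the request body carried the encoded file contents instead of the path. A standalone sketch of the pitfall (hypothetical names, not from this repo):

    use base64::Engine;

    fn main() {
        let file = "report.pdf"; // starts out as the path
        let file = base64::prelude::BASE64_STANDARD.encode(std::fs::read(file).unwrap());
        // From here on `file` is the encoded payload, so a later `"file_name": file`
        // would send the base64 blob rather than the path, which the rename avoids.
        println!("payload is {} bytes of base64", file.len());
    }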
5 changes: 1 addition & 4 deletions pdf2md/cli/src/operators/poll_task.rs
@@ -8,10 +8,7 @@ pub fn poll_task(task_id: &str, base_url: &str, api_key: &str) {
 
     let response: serde_json::Value = request.into_json().expect("Failed to parse response");
 
-    if (response["status"] == "Completed"
-        || response["total_document_pages"].as_i64() != Some(0))
-        && response["pages"].as_array() != Some(&vec![])
-    {
+    if response["status"] == "Completed" || response["status"] == "Failed" {
         println!("{}", response);
         break;
     } else {
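The simplified condition treats `Completed` and `Failed` as the only terminal states, instead of inferring completion from page counts and a non-empty `pages` array. For context, a sketch of the full polling loop around this check (the GET endpoint shape and sleep interval are assumptions, not taken from this diff):

    use std::{thread, time::Duration};

    pub fn poll_task(task_id: &str, base_url: &str, api_key: &str) {
        loop {
            // Assumed endpoint shape; only the status check is from this commit.
            let request = ureq::get(format!("{}/api/task/{}", base_url, task_id).as_str())
                .set("Authorization", api_key)
                .call()
                .expect("Failed to send request");

            let response: serde_json::Value = request.into_json().expect("Failed to parse response");

            if response["status"] == "Completed" || response["status"] == "Failed" {
                println!("{}", response);
                break;
            } else {
                thread::sleep(Duration::from_secs(1));
            }
        }
    }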
37 changes: 0 additions & 37 deletions pdf2md/server/Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion pdf2md/server/Cargo.toml
@@ -36,7 +36,6 @@ actix-cors = "0.7.0"
 reqwest = "0.12.9"
 clickhouse = { version = "0.13.1", features = ["time"] }
 chm = "0.1.17"
-lopdf = "0.34.0"
 base64 = "0.22.1"
 pdf2image = "0.1.2"
 image = "0.25.5"
@@ -1 +1 @@
-ALTER TABLE file_tasks ADD COLUMN IF NOT EXISTS file_name TEXT NOT NULL;
+ALTER TABLE file_tasks ADD COLUMN IF NOT EXISTS file_name String;
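`String` is ClickHouse's native string type, and ClickHouse columns are non-nullable unless declared `Nullable(...)`, so the migration now uses ClickHouse's own DDL spelling in place of the Postgres-style `TEXT NOT NULL`.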
@@ -0,0 +1,12 @@
+DROP TABLE IF EXISTS file_chunks;
+CREATE TABLE IF NOT EXISTS file_chunks (
+    id String,
+    task_id String,
+    content String,
+    metadata String,
+    created_at DateTime,
+) ENGINE = MergeTree()
+ORDER BY (task_id, id)
+PARTITION BY
+    (task_id)
+TTL created_at + INTERVAL 30 DAY;
@@ -0,0 +1,13 @@
+DROP TABLE IF EXISTS default.file_chunks;
+CREATE TABLE IF NOT EXISTS file_chunks (
+    id String,
+    task_id String,
+    content String,
+    usage String,
+    page UInt32,
+    created_at DateTime,
+) ENGINE = MergeTree()
+ORDER BY (task_id, page, id)
+PARTITION BY
+    (task_id)
+TTL created_at + INTERVAL 30 DAY;
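These two files appear to be the down and up halves of one migration: the first restores the previous `file_chunks` shape with its free-form `metadata` string, while the second rebuilds the table with an explicit `page` number and a `usage` column, sorted by `(task_id, page, id)` so chunks come back in page order.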
17 changes: 11 additions & 6 deletions pdf2md/server/src/models.rs
@@ -37,7 +37,7 @@ impl TaskMessage for FileTask {
 pub struct ChunkingTask {
     pub id: uuid::Uuid,
     pub file_name: String,
-    pub page_range: (u32, u32),
+    pub page_num: u32,
     pub params: ChunkingParams,
     pub attempt_number: u8,
 }
@@ -94,7 +94,8 @@ pub struct WebhookPayloadData {
     pub pages: u32,
     pub pages_processed: u32,
     pub content: String,
-    pub metadata: String,
+    pub page_num: u32,
+    pub usage: String,
     pub status: String,
     pub timestamp: String,
 }
@@ -107,7 +108,8 @@ impl WebhookPayloadData {
             pages: task.pages,
             pages_processed: task.pages_processed,
             content: page.content.clone(),
-            metadata: page.metadata.clone(),
+            page_num: page.page,
+            usage: page.usage.clone(),
             status: task.status,
             timestamp: task.created_at.to_string(),
         }
@@ -151,7 +153,8 @@ pub struct ChunkClickhouse {
     pub id: String,
     pub task_id: String,
     pub content: String,
-    pub metadata: String,
+    pub page: u32,
+    pub usage: String,
     #[serde(with = "clickhouse::serde::time::datetime")]
     pub created_at: OffsetDateTime,
 }
@@ -161,7 +164,8 @@ pub struct Chunk {
     pub id: String,
     pub task_id: String,
     pub content: String,
-    pub metadata: serde_json::Value,
+    pub page_num: u32,
+    pub usage: serde_json::Value,
     pub created_at: String,
 }
 
@@ -171,7 +175,8 @@ impl From<ChunkClickhouse> for Chunk {
             id: c.id,
             task_id: c.task_id,
             content: c.content,
-            metadata: serde_json::from_str(&c.metadata).unwrap(),
+            page_num: c.page,
+            usage: serde_json::from_str(&c.usage).unwrap(),
             created_at: c.created_at.to_string(),
         }
     }
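With `page_range` gone, each queued message now targets exactly one page. A sketch of how a producer might fan a document out into per-page tasks (illustrative only; assumes `ChunkingParams: Clone`, and the helper name is hypothetical):

    // One ChunkingTask per page instead of one task per page range.
    fn enqueue_page_tasks(
        task_id: uuid::Uuid,
        file_name: &str,
        total_pages: u32,
        params: ChunkingParams,
    ) -> Vec<ChunkingTask> {
        (1..=total_pages)
            .map(|page_num| ChunkingTask {
                id: task_id,
                file_name: file_name.to_string(),
                page_num, // was page_range: (u32, u32)
                params: params.clone(),
                attempt_number: 0,
            })
            .collect()
    }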
25 changes: 24 additions & 1 deletion pdf2md/server/src/operators/clickhouse.rs
@@ -74,6 +74,12 @@ pub async fn insert_page(
     );
 
     let task = if total_pages_processed >= prev_task.pages {
+        update_task_status(
+            task.id,
+            FileTaskStatus::ChunkingFile(total_pages_processed),
+            clickhouse_client,
+        )
+        .await?;
         update_task_status(task.id, FileTaskStatus::Completed, clickhouse_client).await?
     } else {
         update_task_status(
@@ -172,7 +178,7 @@ pub async fn get_task_pages(
 
     let pages: Vec<ChunkClickhouse> = clickhouse_client
         .query(
-            "SELECT ?fields FROM file_chunks WHERE task_id = ? AND id > ? ORDER BY id LIMIT ?",
+            "SELECT ?fields FROM file_chunks WHERE task_id = ? AND id > ? ORDER BY page LIMIT ?",
         )
         .bind(task.id.clone())
         .bind(offset_id.unwrap_or(uuid::Uuid::nil()))
@@ -189,3 +195,20 @@
 
     Ok(vec![])
 }
+
+pub async fn get_last_page(
+    task_id: uuid::Uuid,
+    clickhouse_client: &clickhouse::Client,
+) -> Result<Option<ChunkClickhouse>, ServiceError> {
+    let page = clickhouse_client
+        .query("SELECT ?fields FROM file_chunks WHERE task_id = ? ORDER BY created_at DESC LIMIT 1")
+        .bind(task_id)
+        .fetch_optional()
+        .await
+        .map_err(|err| {
+            log::error!("Failed to get page {:?}", err);
+            ServiceError::BadRequest("Failed to get page".to_string())
+        })?;
+
+    Ok(page)
+}
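The helper returns the most recently inserted chunk for a task (latest `created_at`), which the worker uses to seed the next page's prompt; that matches the previous page only while pages are processed in order. A sketch of the call site, mirroring the pdf_chunk.rs change below:

    // Fetch prior-page markdown as context, if this is not the first page.
    let prev_md_doc = if task.page_num > 1 {
        get_last_page(task.id, clickhouse_client)
            .await?
            .map(|p| p.content)
    } else {
        None
    };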
74 changes: 36 additions & 38 deletions pdf2md/server/src/operators/pdf_chunk.rs
@@ -1,4 +1,5 @@
 use crate::models::RedisPool;
+use crate::operators::clickhouse::get_last_page;
 use crate::{
     errors::ServiceError,
     get_env,
@@ -14,7 +15,7 @@ use openai_dive::v1::{
         ChatMessageImageContentPart, ImageUrlType,
     },
 };
-use pdf2image::{image::DynamicImage, PDF};
+use pdf2image::image::DynamicImage;
 use regex::Regex;
 use s3::creds::time::OffsetDateTime;
 
@@ -69,7 +70,7 @@ fn get_llm_client(params: ChunkingParams) -> Client {
     }
 }
 
-async fn get_pages_from_image(
+async fn get_markdown_from_image(
     img: DynamicImage,
     prev_md_doc: Option<String>,
     page: u32,
@@ -150,18 +151,17 @@ async fn get_pages_from_image(
         }
     };
 
-    let mut metadata = serde_json::json!({
-        "page": page,
-    });
+    let mut metadata = serde_json::json!({});
     if let Some(usage) = response.usage {
-        metadata["usage"] = serde_json::json!(usage);
+        metadata = serde_json::json!(usage);
     }
 
     Ok(ChunkClickhouse {
         id: uuid::Uuid::new_v4().to_string(),
         task_id: task.id.to_string().clone(),
         content: format_markdown(&content),
-        metadata: metadata.to_string(),
+        page,
+        usage: metadata.to_string(),
         created_at: OffsetDateTime::now_utc(),
     })
 }
@@ -181,43 +181,41 @@ pub async fn chunk_sub_pages(
     task: ChunkingTask,
     clickhouse_client: &clickhouse::Client,
     redis_pool: &RedisPool,
-) -> Result<Vec<ChunkClickhouse>, ServiceError> {
-    log::info!("Chunking pages for {:?} size {}", task.id, data.len());
-    let pdf = PDF::from_bytes(data)
-        .map_err(|err| ServiceError::BadRequest(format!("Failed to open PDF file {:?}", err)))?;
+) -> Result<(), ServiceError> {
+    log::info!("Chunking page {} for {:?}", task.page_num, task.id);
 
-    let pages = pdf
-        .render(pdf2image::Pages::All, None)
+    let page = image::load_from_memory_with_format(&data, image::ImageFormat::Jpeg)
         .map_err(|err| ServiceError::BadRequest(format!("Failed to render PDF file {:?}", err)))?;
 
-    let mut result_pages = vec![];
-
     let client = get_llm_client(task.params.clone());
-    let mut prev_md_doc = None;
 
-    for (page_image, page_num) in pages.into_iter().zip(task.page_range.0..task.page_range.1) {
-        let page = get_pages_from_image(
-            page_image,
-            prev_md_doc,
-            page_num,
-            task.clone(),
-            client.clone(),
-        )
-        .await?;
-        prev_md_doc = Some(page.content.clone());
-
-        let data = insert_page(task.clone(), page.clone(), clickhouse_client, redis_pool).await?;
+    let prev_md_doc = if task.page_num > 1 {
+        let prev_page = get_last_page(task.id, clickhouse_client).await?;
 
-        send_webhook(
-            task.params.webhook_url.clone(),
-            task.params.webhook_payload_template.clone(),
-            data,
-        )
-        .await?;
-        log::info!("Page {} processed", page_num);
+        prev_page.map(|p| p.content)
+    } else {
+        None
+    };
 
-        result_pages.push(page);
-    }
+    let page = get_markdown_from_image(
+        page,
+        prev_md_doc,
+        task.page_num,
+        task.clone(),
+        client.clone(),
+    )
+    .await?;
+
+    let data = insert_page(task.clone(), page.clone(), clickhouse_client, redis_pool).await?;
+
+    send_webhook(
+        task.params.webhook_url.clone(),
+        task.params.webhook_payload_template.clone(),
+        data,
+    )
+    .await?;
+
+    log::info!("Page {} processed", task.page_num);
 
-    Ok(result_pages)
+    Ok(())
 }
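Net effect: the worker no longer opens or rasterizes PDFs itself (`PDF::from_bytes` and the `lopdf` dependency are gone), each queue message now carries a single JPEG-rendered page decoded with `image::load_from_memory_with_format`, and cross-page markdown context is recovered from ClickHouse via `get_last_page` rather than threaded through an in-process loop over a page range.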