feature: incrementally add pages
densumesh committed Dec 11, 2024
1 parent 07191b4 commit 2cb34d3
Showing 2 changed files with 46 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pdf2md/server/src/operators/clickhouse.rs
@@ -178,7 +178,7 @@ pub async fn get_task_pages(

     let pages: Vec<ChunkClickhouse> = clickhouse_client
         .query(
-            "SELECT ?fields FROM file_chunks WHERE task_id = ? AND id > ? ORDER BY page LIMIT ?",
+            "SELECT ?fields FROM file_chunks WHERE task_id = ? AND id > ? ORDER BY id LIMIT ?",
         )
         .bind(task.id.clone())
         .bind(offset_id.unwrap_or(uuid::Uuid::nil()))
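The one-line change above aligns the sort key with the keyset predicate: the query filters on `id > ?` but previously ordered by `page`, so the row whose id the caller saved as its cursor was not necessarily the max id in the batch, and consecutive polls could return overlapping or skipped rows. Ordering by `id` makes each batch stable and strictly after the cursor. A minimal sketch of the pattern with the `clickhouse` crate (the `Chunk` row type and `next_batch` helper are hypothetical; assumes the crate's `uuid` feature):

use clickhouse::{Client, Row};
use serde::Deserialize;

// Hypothetical row type standing in for ChunkClickhouse.
#[derive(Row, Deserialize)]
struct Chunk {
    id: uuid::Uuid,
    content: String,
}

// Keyset pagination: filter and sort on the same column so each call
// returns the next non-overlapping batch after `cursor`.
async fn next_batch(
    client: &Client,
    task_id: String,
    cursor: Option<uuid::Uuid>,
    limit: u64,
) -> Result<Vec<Chunk>, clickhouse::error::Error> {
    client
        .query("SELECT ?fields FROM file_chunks WHERE task_id = ? AND id > ? ORDER BY id LIMIT ?")
        .bind(task_id)
        .bind(cursor.unwrap_or_else(uuid::Uuid::nil))
        .bind(limit)
        .fetch_all::<Chunk>()
        .await
}

The `?fields` placeholder is expanded by the crate to the columns of the bound Row type, matching the query in the diff above.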
80 changes: 45 additions & 35 deletions server/src/bin/file-worker.rs
@@ -341,8 +341,18 @@ async fn upload_file(
         }
     };
 
+    let file_size_mb = (file_data.len() as f64 / 1024.0 / 1024.0).round() as i64;
+    let created_file = create_file_query(
+        file_id,
+        file_size_mb,
+        file_worker_message.upload_file_data.clone(),
+        file_worker_message.dataset_id,
+        web_pool.clone(),
+    )
+    .await?;
+
     log::info!("Waiting on Task {}", task_id);
+    #[allow(unused_assignments)]
+    let mut processed_pages = std::collections::HashSet::new();
     let mut chunk_htmls: Vec<String> = vec![];
     let mut pagination_token: Option<String> = None;
     let mut completed = false;
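The hunk above hoists `create_file_query` out of the post-loop section (its deletion appears further down): the file row now exists before the first batch of chunks is written, so each incremental `create_file_chunks` call has a `created_file.id` to attach to. A toy sketch of that parent-first ordering constraint (all names here are hypothetical):

// Parent-first ordering: the file record must exist before any chunk
// batch can reference its id.
struct File {
    id: uuid::Uuid,
}

async fn create_file() -> File {
    File { id: uuid::Uuid::new_v4() }
}

async fn create_chunks(file_id: uuid::Uuid, batch: Vec<String>) {
    // ... insert `batch` rows keyed by `file_id` ...
    let _ = (file_id, batch);
}

async fn ingest(batches: Vec<Vec<String>>) {
    let file = create_file().await; // create the parent row first
    for batch in batches {
        create_chunks(file.id, batch).await; // then attach batches as they arrive
    }
}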
@@ -351,7 +361,7 @@ async fn upload_file(
         if completed && pagination_token.is_none() {
             break;
         }
-        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
 
         let request = if let Some(pagination_token) = &pagination_token {
             log::info!(
                 "Polling on task {} with pagination token {}",
@@ -402,47 +412,47 @@ async fn upload_file(
             ))
         })?;
 
-        if task_response.status == "Completed" && task_response.pages.is_some() {
-            completed = true;
-            pagination_token = task_response.pagination_token.clone();
-            if let Some(pages) = task_response.pages {
-                log::info!("Got {} pages from task {}", pages.len(), task_id);
-                for page in pages {
-                    log::info!(".");
-                    chunk_htmls.push(page.content.clone());
-                }
-            }
-            continue;
-        }
+        let mut new_chunks = Vec::new();
+        if let Some(pages) = task_response.pages {
+            log::info!("Got {} pages from task {}", pages.len(), task_id);
+
+            for page in pages {
+                let page_id = format!("{}", page.page_num);
+
+                if !processed_pages.contains(&page_id) {
+                    processed_pages.insert(page_id);
+                    new_chunks.push(page.content.clone());
+                    chunk_htmls.push(page.content);
+                }
+            }
+
+            // If we have new chunks, process them
+            if !new_chunks.is_empty() {
+                create_file_chunks(
+                    created_file.id,
+                    file_worker_message.upload_file_data.clone(),
+                    new_chunks.clone(),
+                    dataset_org_plan_sub.clone(),
+                    web_pool.clone(),
+                    event_queue.clone(),
+                    redis_conn.clone(),
+                )
+                .await?;
+            }
+        }
+
+        completed = task_response.status == "Completed";
+        pagination_token = if let Some(token) = task_response.pagination_token.clone() {
+            Some(token)
+        } else {
+            log::info!("Task {} not ready", task_id);
+            pagination_token
+        };
+
+        if !completed && (pagination_token.is_none() || new_chunks.is_empty()) {
+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+            continue;
+        }
     }

-    // Poll Chunks from pdf chunks from service
-    let file_size_mb = (file_data.len() as f64 / 1024.0 / 1024.0).round() as i64;
-    let created_file = create_file_query(
-        file_id,
-        file_size_mb,
-        file_worker_message.upload_file_data.clone(),
-        file_worker_message.dataset_id,
-        web_pool.clone(),
-    )
-    .await?;

     create_file_chunks(
         created_file.id,
         file_worker_message.upload_file_data,
         chunk_htmls,
         dataset_org_plan_sub,
         web_pool.clone(),
         event_queue.clone(),
         redis_conn,
     )
     .await?;
 
     return Ok(Some(file_id));
 }
 }
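Taken together, the loop now interleaves polling, de-duplication, and chunk creation instead of buffering every page until the task completes. A distilled sketch of that poll/dedup/flush pattern (the `TaskResponse`/`Page` shapes and the `fetch`/`flush` closures are hypothetical stand-ins for the HTTP poll and `create_file_chunks`):

use std::collections::HashSet;
use std::time::Duration;

// Hypothetical shapes mirroring the polling API used above.
struct Page {
    page_num: u32,
    content: String,
}

struct TaskResponse {
    status: String,
    pages: Option<Vec<Page>>,
    pagination_token: Option<String>,
}

// Distilled poll/dedup/flush loop: fetch a batch, skip pages already
// seen, flush only the new ones, and back off before re-polling when
// the task is still running and nothing new arrived.
async fn poll_incrementally(
    mut fetch: impl FnMut(Option<String>) -> TaskResponse, // stand-in for the HTTP poll
    mut flush: impl FnMut(Vec<String>),                    // stand-in for create_file_chunks
) {
    let mut seen: HashSet<String> = HashSet::new();
    let mut token: Option<String> = None;
    let mut completed = false;

    loop {
        if completed && token.is_none() {
            break;
        }
        let resp = fetch(token.clone());

        // HashSet::insert returns false for already-seen page numbers,
        // which makes re-delivered pages idempotent.
        let mut fresh = Vec::new();
        for page in resp.pages.unwrap_or_default() {
            if seen.insert(page.page_num.to_string()) {
                fresh.push(page.content);
            }
        }

        let got_new = !fresh.is_empty();
        if got_new {
            flush(fresh);
        }

        completed = resp.status == "Completed";
        token = resp.pagination_token.or(token);

        if !completed && (token.is_none() || !got_new) {
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }
}

Keeping the old token when the response carries none, as the diff does, lets a retry resume from the same cursor rather than restarting from the beginning.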
