+ }
+ tooltipText="System prompt to use when chunking. This is an optional field which allows you to specify the system prompt to use when chunking the text. If not specified, the default system prompt is used. However, you may want to use a different system prompt."
+ />
+
+
diff --git a/pdf2md/server/src/operators/pdf_chunk.rs b/pdf2md/server/src/operators/pdf_chunk.rs
index c6423a6cd6..72c182508d 100644
--- a/pdf2md/server/src/operators/pdf_chunk.rs
+++ b/pdf2md/server/src/operators/pdf_chunk.rs
@@ -20,9 +20,17 @@ use regex::Regex;
use s3::creds::time::OffsetDateTime;
const CHUNK_SYSTEM_PROMPT: &str = "
- Convert the following PDF page to markdown.
- Return only the markdown with no explanation text.
- Do not exclude any content from the page.";
+Convert this PDF page to markdown formatting, following these requirements:
+
+1. Break the content into logical sections with clear markdown headings (# for main sections, ## for subsections, etc.)
+2. Create section headers that accurately reflect the content and hierarchy of each part
+3. Include all body content from the page
+4. Exclude any PDF headers and footers
+5. Return only the formatted markdown without any explanatory text
+6. Match the original document's content organization but with explicit markdown structure
+
+Please provide the markdown version using this structured approach.
+";
fn get_data_url_from_image(img: DynamicImage) -> Result {
let mut encoded = Vec::new();
@@ -108,7 +116,7 @@ async fn get_markdown_from_image(
if let Some(prev_md_doc) = prev_md_doc {
let prev_md_doc_message = ChatMessage::System {
content: ChatMessageContent::Text(format!(
- "Markdown must maintain consistent formatting with the following page: \n\n {}",
+ "Markdown must maintain consistent formatting with the following page, DO NOT INCLUDE CONTENT FROM THIS PAGE IN YOUR RESPONSE: \n\n {}",
prev_md_doc
)),
name: None,
diff --git a/server/src/bin/file-worker.rs b/server/src/bin/file-worker.rs
index 532620fc0c..37557438bb 100644
--- a/server/src/bin/file-worker.rs
+++ b/server/src/bin/file-worker.rs
@@ -7,7 +7,7 @@ use std::sync::{
Arc,
};
use trieve_server::{
- data::models::{self, FileWorkerMessage},
+ data::models::{self, ChunkGroup, FileWorkerMessage},
errors::ServiceError,
establish_connection, get_env,
handlers::chunk_handler::ChunkReqPayload,
@@ -17,9 +17,23 @@ use trieve_server::{
file_operator::{
create_file_chunks, create_file_query, get_aws_bucket, preprocess_file_to_chunks,
},
+ group_operator::{create_group_from_file_query, create_groups_query},
},
};
+const HEADING_CHUNKING_SYSTEM_PROMPT: &str = "
+Analyze this PDF page and restructure it into clear markdown sections based on the content topics. For each distinct topic or theme discussed:
+
+1. Create a meaningful section heading using markdown (# for main topics, ## for subtopics)
+2. Group related content under each heading
+3. Break up dense paragraphs into more readable chunks where appropriate
+4. Maintain the key information but organize it by subject matter
+5. Skip headers, footers, and page numbers
+6. Focus on semantic organization rather than matching the original layout
+
+Please provide just the reorganized markdown without any explanatory text.
+";
+
fn main() {
dotenvy::dotenv().ok();
env_logger::builder()
@@ -299,6 +313,70 @@ async fn upload_file(
)
.await?;
+ let file_size_mb = (file_data.len() as f64 / 1024.0 / 1024.0).round() as i64;
+
+ let created_file = create_file_query(
+ file_id,
+ file_size_mb,
+ file_worker_message.upload_file_data.clone(),
+ file_worker_message.dataset_id,
+ web_pool.clone(),
+ )
+ .await?;
+
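+ // A single file-level group is only created when split_headings is disabled; when
+ // split_headings is enabled, create_file_chunks builds one group per page instead.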
+ let group_id = if !file_worker_message
+ .upload_file_data
+ .pdf2md_options
+ .as_ref()
+ .is_some_and(|options| options.split_headings.unwrap_or(false))
+ {
+ let chunk_group = ChunkGroup::from_details(
+ Some(file_worker_message.upload_file_data.file_name.clone()),
+ file_worker_message.upload_file_data.description.clone(),
+ dataset_org_plan_sub.dataset.id,
+ file_worker_message
+ .upload_file_data
+ .group_tracking_id
+ .clone(),
+ None,
+ file_worker_message
+ .upload_file_data
+ .tag_set
+ .clone()
+ .map(|tag_set| tag_set.into_iter().map(Some).collect()),
+ );
+
+ let chunk_group_option = create_groups_query(vec![chunk_group], true, web_pool.clone())
+ .await
+ .map_err(|e| {
+ log::error!("Could not create group {:?}", e);
+ ServiceError::BadRequest("Could not create group".to_string())
+ })?
+ .pop();
+
+ let chunk_group = match chunk_group_option {
+ Some(group) => group,
+ None => {
+ return Err(ServiceError::BadRequest(
+ "Could not create group from file".to_string(),
+ ));
+ }
+ };
+
+ let group_id = chunk_group.id;
+
+ create_group_from_file_query(group_id, created_file.id, web_pool.clone())
+ .await
+ .map_err(|e| {
+ log::error!("Could not create group from file {:?}", e);
+ e
+ })?;
+
+ Some(group_id)
+ } else {
+ None
+ };
+
if file_name.ends_with(".pdf")
&& file_worker_message
.upload_file_data
@@ -330,6 +408,19 @@ async fn upload_file(
json_value["system_prompt"] = serde_json::json!(system_prompt);
}
+ if file_worker_message
+ .upload_file_data
+ .pdf2md_options
+ .as_ref()
+ .is_some_and(|options| options.split_headings.unwrap_or(false))
+ {
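+ // Append the heading-chunking instructions to whatever system prompt has been set so
+ // far (empty if none) before the request is sent to pdf2md.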
+ json_value["system_prompt"] = serde_json::json!(format!(
+ "{}\n\n{}",
+ json_value["system_prompt"].as_str().unwrap_or(""),
+ HEADING_CHUNKING_SYSTEM_PROMPT
+ ));
+ }
+
log::info!("Sending file to pdf2md");
let pdf2md_response = pdf2md_client
.post(format!("{}/api/task", pdf2md_url))
@@ -356,16 +447,6 @@ async fn upload_file(
}
};
- let file_size_mb = (file_data.len() as f64 / 1024.0 / 1024.0).round() as i64;
- let created_file = create_file_query(
- file_id,
- file_size_mb,
- file_worker_message.upload_file_data.clone(),
- file_worker_message.dataset_id,
- web_pool.clone(),
- )
- .await?;
-
log::info!("Waiting on Task {}", task_id);
let mut processed_pages = std::collections::HashSet::new();
let mut pagination_token: Option = None;
@@ -481,6 +562,7 @@ async fn upload_file(
file_worker_message.upload_file_data.clone(),
new_chunks.clone(),
dataset_org_plan_sub.clone(),
+ group_id,
web_pool.clone(),
event_queue.clone(),
redis_conn.clone(),
@@ -545,17 +627,6 @@ async fn upload_file(
));
}
- let file_size_mb = (file_data.len() as f64 / 1024.0 / 1024.0).round() as i64;
-
- let created_file = create_file_query(
- file_id,
- file_size_mb,
- file_worker_message.upload_file_data.clone(),
- file_worker_message.dataset_id,
- web_pool.clone(),
- )
- .await?;
-
if file_worker_message
.upload_file_data
.create_chunks
@@ -611,6 +682,7 @@ async fn upload_file(
file_worker_message.upload_file_data,
chunks,
dataset_org_plan_sub,
+ group_id,
web_pool.clone(),
event_queue.clone(),
redis_conn,
diff --git a/server/src/lib.rs b/server/src/lib.rs
index a5ecfa173c..3cbc8778ba 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -357,6 +357,7 @@ impl Modify for SecurityAddon {
handlers::file_handler::CreatePresignedUrlForCsvJsonlReqPayload,
handlers::file_handler::CreatePresignedUrlForCsvJsonResponseBody,
handlers::file_handler::UploadHtmlPageReqPayload,
+ handlers::file_handler::Pdf2MdOptions,
handlers::invitation_handler::InvitationData,
handlers::event_handler::GetEventsData,
handlers::organization_handler::CreateOrganizationReqPayload,
diff --git a/server/src/operators/file_operator.rs b/server/src/operators/file_operator.rs
index d99d1ff30b..a5bf10649e 100644
--- a/server/src/operators/file_operator.rs
+++ b/server/src/operators/file_operator.rs
@@ -122,59 +122,146 @@ pub fn preprocess_file_to_chunks(
Ok(chunk_htmls)
}
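+/// Splits a page of markdown into chunks at heading boundaries (`#`, `##`, ...), keeping each
+/// heading together with the body lines that follow it. A trailing heading with no body becomes
+/// its own chunk, and markdown containing no headings is returned as a single chunk.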
+pub fn split_markdown_by_headings(markdown_text: &str) -> Vec<String> {
+ let lines: Vec<&str> = markdown_text
+ .trim()
+ .lines()
+ .filter(|x| !x.trim().is_empty())
+ .collect();
+ let mut chunks = Vec::new();
+ let mut current_content = Vec::new();
+ let mut pending_heading: Option<String> = None;
+
+ fn is_heading(line: &str) -> bool {
+ line.trim().starts_with('#')
+ }
+
+ fn save_chunk(chunks: &mut Vec<String>, content: &[String]) {
+ if !content.is_empty() {
+ chunks.push(content.join("\n").trim().to_string());
+ }
+ }
+
+ for (i, line) in lines.iter().enumerate() {
+ if is_heading(line) {
+ if !current_content.is_empty() {
+ save_chunk(&mut chunks, &current_content);
+ current_content.clear();
+ }
+
+ if i + 1 < lines.len() && !is_heading(lines[i + 1]) {
+ if let Some(heading) = pending_heading.take() {
+ current_content.push(heading);
+ }
+ current_content.push(line.to_string());
+ } else {
+ pending_heading = Some(line.to_string());
+ }
+ } else if !line.trim().is_empty() || !current_content.is_empty() {
+ current_content.push(line.to_string());
+ }
+ }
+
+ if !current_content.is_empty() {
+ save_chunk(&mut chunks, &current_content);
+ }
+
+ if let Some(heading) = pending_heading {
+ chunks.push(heading);
+ }
+
+ if chunks.is_empty() && !lines.is_empty() {
+ chunks.push(lines.join("\n").trim().to_string());
+ }
+
+ chunks
+}
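+
+// A minimal illustrative test sketching the splitting behaviour described above: each heading
+// starts a new chunk containing the heading plus its body lines, and a trailing heading with
+// no body becomes its own chunk.
+#[cfg(test)]
+mod split_markdown_by_headings_tests {
+ use super::*;
+
+ #[test]
+ fn splits_page_markdown_at_headings() {
+ let markdown = "# Title\nIntro text\n## Section A\nContent A\n## Section B";
+ assert_eq!(
+ split_markdown_by_headings(markdown),
+ vec![
+ "# Title\nIntro text".to_string(),
+ "## Section A\nContent A".to_string(),
+ "## Section B".to_string(),
+ ]
+ );
+ }
+}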
+
#[allow(clippy::too_many_arguments)]
pub async fn create_file_chunks(
created_file_id: uuid::Uuid,
upload_file_data: UploadFileReqPayload,
mut chunks: Vec<ChunkReqPayload>,
dataset_org_plan_sub: DatasetAndOrgWithSubAndPlan,
+ group_id: Option<uuid::Uuid>,
pool: web::Data<models::Pool>,
event_queue: web::Data<EventQueue>,
mut redis_conn: MultiplexedConnection,
) -> Result<(), ServiceError> {
let name = upload_file_data.file_name.clone();
- let chunk_group = ChunkGroup::from_details(
- Some(name.clone()),
- upload_file_data.description.clone(),
- dataset_org_plan_sub.dataset.id,
- upload_file_data.group_tracking_id.clone(),
- None,
- upload_file_data
- .tag_set
- .clone()
- .map(|tag_set| tag_set.into_iter().map(Some).collect()),
- );
-
- let chunk_group_option = create_groups_query(vec![chunk_group], true, pool.clone())
- .await
- .map_err(|e| {
- log::error!("Could not create group {:?}", e);
- ServiceError::BadRequest("Could not create group".to_string())
- })?
- .pop();
-
- let chunk_group = match chunk_group_option {
- Some(group) => group,
- None => {
- return Err(ServiceError::BadRequest(
- "Could not create group from file".to_string(),
- ));
+ if upload_file_data
+ .pdf2md_options
+ .is_some_and(|x| x.split_headings.unwrap_or(false))
+ {
+ let mut new_chunks = Vec::new();
+
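+ // Each incoming chunk holds the markdown for one pdf2md page: create a group per page,
+ // then split that page's markdown into heading-delimited chunks within that group.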
+ for chunk in chunks {
+ let chunk_group = ChunkGroup::from_details(
+ Some(format!(
+ "{}-page-{}",
+ name,
+ chunk.metadata.as_ref().unwrap_or(&serde_json::json!({
+ "page_num": 0
+ }))["page_num"]
+ .as_i64()
+ .unwrap_or(0)
+ )),
+ upload_file_data.description.clone(),
+ dataset_org_plan_sub.dataset.id,
+ upload_file_data.group_tracking_id.clone(),
+ chunk.metadata.clone(),
+ upload_file_data
+ .tag_set
+ .clone()
+ .map(|tag_set| tag_set.into_iter().map(Some).collect()),
+ );
+
+ let chunk_group_option = create_groups_query(vec![chunk_group], true, pool.clone())
+ .await
+ .map_err(|e| {
+ log::error!("Could not create group {:?}", e);
+ ServiceError::BadRequest("Could not create group".to_string())
+ })?
+ .pop();
+
+ let chunk_group = match chunk_group_option {
+ Some(group) => group,
+ None => {
+ return Err(ServiceError::BadRequest(
+ "Could not create group from file".to_string(),
+ ));
+ }
+ };
+
+ let group_id = chunk_group.id;
+
+ create_group_from_file_query(group_id, created_file_id, pool.clone())
+ .await
+ .map_err(|e| {
+ log::error!("Could not create group from file {:?}", e);
+ e
+ })?;
+
+ let split_chunks =
+ split_markdown_by_headings(chunk.chunk_html.as_ref().unwrap_or(&String::new()));
+
+ for (i, split_chunk) in split_chunks.into_iter().enumerate() {
+ new_chunks.push(ChunkReqPayload {
+ chunk_html: Some(split_chunk),
+ tracking_id: chunk.tracking_id.clone().map(|x| format!("{}-{}", x, i)),
+ group_ids: Some(vec![group_id]),
+ ..chunk.clone()
+ });
+ }
}
- };
-
- let group_id = chunk_group.id;
- chunks.iter_mut().for_each(|chunk| {
- chunk.group_ids = Some(vec![group_id]);
- });
-
- create_group_from_file_query(group_id, created_file_id, pool.clone())
- .await
- .map_err(|e| {
- log::error!("Could not create group from file {:?}", e);
- e
- })?;
+ chunks = new_chunks;
+ } else {
+ chunks.iter_mut().for_each(|chunk| {
+ chunk.group_ids = group_id.map(|id| vec![id]);
+ });
+ }
let chunk_count = get_row_count_for_organization_id_query(
dataset_org_plan_sub.organization.organization.id,