Commit

feature: finish implementing QDRANT_ONLY mode with chunk_count and clear functionality preserved
skeptrunedev authored and densumesh committed Nov 24, 2024
1 parent de775d2 commit 0eee2f8
Showing 13 changed files with 679 additions and 267 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
@@ -7,7 +7,7 @@
"name": "Debug executable 'trieve-server'",
"cargo": {
"args": [
"+nightly",
"+default",
"build",
"--manifest-path=./server/Cargo.toml",
"--bin=trieve-server",
1 change: 1 addition & 0 deletions server/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion server/Cargo.toml
@@ -104,7 +104,7 @@ rust-argon2 = "2"
serde_json = { version = "1" }
serde = { version = "1" }
time = { version = "0.3" }
uuid = { version = "1", features = ["v4", "serde"] }
uuid = { version = "1", features = ["v4", "serde", "v5"] }
diesel_migrations = { version = "2.0" }
regex = "1.7.3"
openai_dive = { git = "https://github.com/devflowinc/openai-client.git", branch = "bugfix/parallel-tool-calls-public", features = ["stream"] }
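Note on the new `v5` feature: it enables name-based UUIDs, which the ingestion-worker changes below use to derive stable Qdrant point IDs from chunk tracking IDs. A minimal sketch of that derivation, assuming only the `uuid` crate with the `v4` and `v5` features enabled (the tracking id value is hypothetical):

```rust
fn main() {
    // Hypothetical tracking id; any stable external identifier works.
    let tracking_id = "product-1234";

    // v5 UUIDs are name-based: the same namespace and name always produce the same id,
    // so re-ingesting a chunk with the same tracking_id targets the same Qdrant point.
    let a = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, tracking_id.as_bytes());
    let b = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, tracking_id.as_bytes());
    assert_eq!(a, b);

    // v4 UUIDs are random, so chunks without tracking ids still get unique point ids.
    assert_ne!(uuid::Uuid::new_v4(), uuid::Uuid::new_v4());
}
```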
25 changes: 24 additions & 1 deletion server/src/bin/delete-worker.rs
@@ -314,6 +314,29 @@ pub async fn delete_or_clear_dataset(

if delete_worker_message.empty_dataset {
log::info!("Clearing dataset {:?}", delete_worker_message.dataset_id);

if dataset_config.QDRANT_ONLY {
bulk_delete_chunks_query(
None,
delete_worker_message.deleted_at,
delete_worker_message.dataset_id,
dataset_config.clone(),
web_pool.clone(),
)
.await
.map_err(|err| {
log::error!("Failed to bulk delete chunks: {:?}", err);
err
})?;

log::info!(
"Bulk deleted chunks for dataset: {:?}",
delete_worker_message.dataset_id
);

return Ok(());
}

clear_dataset_query(
delete_worker_message.dataset_id,
delete_worker_message.deleted_at,
@@ -412,7 +435,7 @@ pub async fn bulk_delete_chunks(
let dataset_config = DatasetConfiguration::from_json(dataset.server_configuration);

bulk_delete_chunks_query(
chunk_delete_message.filter,
Some(chunk_delete_message.filter),
chunk_delete_message.deleted_at,
chunk_delete_message.dataset_id,
dataset_config,
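Two behaviors in the delete worker are worth calling out: clearing a QDRANT_ONLY dataset short-circuits into a bulk Qdrant delete with no filter (there are no Postgres chunk rows to clear), and `bulk_delete_chunks_query` now takes its filter as an `Option`, where `None` means every chunk in the dataset. A toy sketch of that optional-filter pattern, using illustrative types rather than the repo's own API:

```rust
// Illustrative stand-in for the repo's filter type; not the actual API.
#[derive(Debug, Clone)]
struct ChunkFilter {
    must_have_tag: String,
}

// None => clear the whole dataset (the QDRANT_ONLY "empty dataset" path above).
// Some(filter) => delete only the chunks the caller's filter matches.
fn describe_bulk_delete(filter: Option<ChunkFilter>) -> String {
    match filter {
        None => "deleting every point in the dataset's collection".to_string(),
        Some(f) => format!("deleting points tagged '{}'", f.must_have_tag),
    }
}

fn main() {
    println!("{}", describe_bulk_delete(None));
    println!(
        "{}",
        describe_bulk_delete(Some(ChunkFilter {
            must_have_tag: "docs".to_string(),
        }))
    );
}
```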
150 changes: 88 additions & 62 deletions server/src/bin/ingestion-worker.rs
@@ -21,7 +21,7 @@ use trieve_server::handlers::group_handler::dataset_owns_group;
use trieve_server::operators::chunk_operator::{
bulk_insert_chunk_metadata_query, bulk_revert_insert_chunk_metadata_query,
get_row_count_for_organization_id_query, insert_chunk_boost, insert_chunk_metadata_query,
update_chunk_boost_query, update_chunk_metadata_query,
update_chunk_boost_query, update_chunk_metadata_query, update_dataset_chunk_count,
};
use trieve_server::operators::clickhouse_operator::{ClickHouseEvent, EventQueue};
use trieve_server::operators::dataset_operator::{
@@ -567,9 +567,9 @@ pub async fn bulk_upload_chunks(
"calling_BULK_insert_chunk_metadata_query",
);

let only_insert_qdrant = std::env::var("ONLY_INSERT_QDRANT").unwrap_or("false".to_string());
let only_insert_qdrant = dataset_config.QDRANT_ONLY;

let inserted_chunk_metadatas = if only_insert_qdrant == "true" {
let inserted_chunk_metadatas = if only_insert_qdrant {
ingestion_data
.clone()
.into_iter()
@@ -733,7 +733,13 @@ pub async fn bulk_upload_chunks(
))
.then(
|(chunk_data, embedding_vector, splade_vector, bm25_vector)| async {
let qdrant_point_id = chunk_data.chunk_metadata.qdrant_point_id;
let mut qdrant_point_id = chunk_data.chunk_metadata.qdrant_point_id;
if only_insert_qdrant {
if let Some(tracking_id) = chunk_data.clone().chunk_metadata.tracking_id {
qdrant_point_id =
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, tracking_id.as_bytes());
}
}

let chunk_tags: Option<Vec<Option<String>>> =
if let Some(ref group_ids) = chunk_data.group_ids {
@@ -789,7 +795,6 @@ pub async fn bulk_upload_chunks(
);
}

// If qdrant_point_id does not exist, does not get written to qdrant
Ok(PointStruct::new(
qdrant_point_id.to_string(),
vector_payload,
@@ -816,18 +821,31 @@
"calling_BULK_create_new_qdrant_points_query",
);

let create_point_result =
let create_point_result: Result<(), ServiceError> =
bulk_upsert_qdrant_points_query(qdrant_points, dataset_config.clone()).await;

insert_tx.finish();

if let Err(err) = create_point_result {
if !upsert_by_tracking_id_being_used {
bulk_revert_insert_chunk_metadata_query(inserted_chunk_metadata_ids, web_pool.clone())
if !only_insert_qdrant {
if let Err(err) = create_point_result {
if !upsert_by_tracking_id_being_used {
bulk_revert_insert_chunk_metadata_query(
inserted_chunk_metadata_ids,
web_pool.clone(),
)
.await?;
}
}

return Err(err);
return Err(err);
}
} else {
create_point_result?;
update_dataset_chunk_count(
payload.dataset_id,
inserted_chunk_metadata_ids.len() as i32,
web_pool.clone(),
)
.await?;
}

Ok(inserted_chunk_metadata_ids)
@@ -841,14 +859,16 @@ async fn upload_chunk(
web_pool: actix_web::web::Data<models::Pool>,
reqwest_client: reqwest::Client,
) -> Result<uuid::Uuid, ServiceError> {
let tx_ctx = sentry::TransactionContext::new(
"ingestion worker upload_chunk",
"ingestion worker upload_chunk",
);
let transaction = sentry::start_transaction(tx_ctx);
sentry::configure_scope(|scope| scope.set_span(Some(transaction.clone().into())));

let dataset_id = payload.dataset_id;
let qdrant_only = dataset_config.QDRANT_ONLY;
let mut qdrant_point_id = uuid::Uuid::new_v4();
if qdrant_only {
if let Some(tracking_id) = payload.chunk.tracking_id.clone() {
qdrant_point_id =
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, tracking_id.as_bytes());
}
}

let content = match payload.chunk.convert_html_to_text.unwrap_or(true) {
true => convert_html_to_text(&(payload.chunk.chunk_html.clone().unwrap_or_default())),
false => payload.chunk.chunk_html.clone().unwrap_or_default(),
@@ -1015,44 +1035,50 @@ async fn upload_chunk(

//if collision is not nil, insert chunk with collision
let chunk_metadata_id = {
let original_id = payload.ingest_specific_chunk_metadata.id;
let mut inserted_chunk_id = original_id;
payload.ingest_specific_chunk_metadata.qdrant_point_id = qdrant_point_id;

let insert_tx = transaction.start_child(
"calling_insert_chunk_metadata_query",
"calling_insert_chunk_metadata_query",
);

let inserted_chunk = insert_chunk_metadata_query(
chunk_metadata.clone(),
payload.chunk.group_ids.clone(),
payload.dataset_id,
payload.upsert_by_tracking_id,
web_pool.clone(),
)
.await?;

if payload.chunk.fulltext_boost.is_some() || payload.chunk.semantic_boost.is_some() {
insert_chunk_boost(
ChunkBoost {
chunk_id: inserted_chunk.id,
fulltext_boost_phrase: payload.chunk.fulltext_boost.clone().map(|x| x.phrase),
fulltext_boost_factor: payload.chunk.fulltext_boost.map(|x| x.boost_factor),
semantic_boost_phrase: payload.chunk.semantic_boost.clone().map(|x| x.phrase),
semantic_boost_factor: payload
.chunk
.semantic_boost
.map(|x| x.distance_factor as f64),
},
let group_tag_set = if qdrant_only {
None
} else {
let inserted_chunk = insert_chunk_metadata_query(
chunk_metadata.clone(),
payload.chunk.group_ids.clone(),
payload.dataset_id,
payload.upsert_by_tracking_id,
web_pool.clone(),
)
.await?;
}

insert_tx.finish();
inserted_chunk_id = inserted_chunk.id;

if payload.chunk.fulltext_boost.is_some() || payload.chunk.semantic_boost.is_some() {
insert_chunk_boost(
ChunkBoost {
chunk_id: inserted_chunk.id,
fulltext_boost_phrase: payload
.chunk
.fulltext_boost
.clone()
.map(|x| x.phrase),
fulltext_boost_factor: payload.chunk.fulltext_boost.map(|x| x.boost_factor),
semantic_boost_phrase: payload
.chunk
.semantic_boost
.clone()
.map(|x| x.phrase),
semantic_boost_factor: payload
.chunk
.semantic_boost
.map(|x| x.distance_factor as f64),
},
web_pool.clone(),
)
.await?;
}

qdrant_point_id = inserted_chunk.qdrant_point_id;
qdrant_point_id = inserted_chunk.qdrant_point_id;

let chunk_tags: Option<Vec<Option<String>>> =
if let Some(ref group_ids) = payload.chunk.group_ids {
Some(
get_groups_from_group_ids_query(group_ids.clone(), web_pool.clone())
Expand All @@ -1065,10 +1091,11 @@ async fn upload_chunk(
)
} else {
None
};
}
};

let qdrant_payload =
QdrantPayload::new(chunk_metadata, payload.chunk.group_ids, None, chunk_tags);
QdrantPayload::new(chunk_metadata, payload.chunk.group_ids, None, group_tag_set);

let vector_name = match &embedding_vector {
Some(embedding_vector) => match embedding_vector.len() {
@@ -1109,28 +1136,27 @@ async fn upload_chunk(
vector_payload,
qdrant_payload,
);
let insert_tx = transaction.start_child(
"calling_bulk_create_new_qdrant_points_query",
"calling_bulk_create_new_qdrant_points_query",
);

if let Err(e) = bulk_upsert_qdrant_points_query(vec![point], dataset_config).await {
let upsert_qdrant_point_result =
bulk_upsert_qdrant_points_query(vec![point], dataset_config).await;

if let Err(e) = upsert_qdrant_point_result {
log::error!("Failed to create qdrant point: {:?}", e);

if payload.upsert_by_tracking_id {
bulk_revert_insert_chunk_metadata_query(vec![inserted_chunk.id], web_pool.clone())
if !qdrant_only && (payload.upsert_by_tracking_id || original_id == inserted_chunk_id) {
bulk_revert_insert_chunk_metadata_query(vec![inserted_chunk_id], web_pool.clone())
.await?;
}

return Err(e);
};
if qdrant_only {
update_dataset_chunk_count(dataset_id, 1_i32, web_pool.clone()).await?;
}

insert_tx.finish();

inserted_chunk.id
inserted_chunk_id
};

transaction.finish();
Ok(chunk_metadata_id)
}

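Taken together, the ingestion-worker changes let QDRANT_ONLY datasets bypass the Postgres chunk tables: point IDs become v5 hashes of tracking IDs, Qdrant holds the data, and the dataset's chunk count is bumped via `update_dataset_chunk_count` after a successful upsert. A self-contained sketch of that flow under those assumptions, with toy types standing in for Postgres and Qdrant rather than the repo's real operators:

```rust
use std::collections::HashMap;

// Toy stand-ins for what this flow touches: a per-dataset chunk_count (Postgres)
// and a map of point id -> payload (Qdrant).
struct ToyState {
    chunk_counts: HashMap<String, i64>,
    qdrant_points: HashMap<uuid::Uuid, String>,
}

impl ToyState {
    // QDRANT_ONLY ingest of one batch: derive deterministic point ids from tracking ids,
    // upsert the points, then bump the dataset's chunk count by the number written,
    // mirroring the update_dataset_chunk_count call after a successful Qdrant upsert.
    fn ingest_batch(&mut self, dataset_id: &str, chunks: &[(&str, &str)]) {
        for (tracking_id, content) in chunks.iter().copied() {
            let point_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, tracking_id.as_bytes());
            self.qdrant_points.insert(point_id, content.to_string());
        }
        *self.chunk_counts.entry(dataset_id.to_string()).or_insert(0) += chunks.len() as i64;
    }
}

fn main() {
    let mut state = ToyState {
        chunk_counts: HashMap::new(),
        qdrant_points: HashMap::new(),
    };
    state.ingest_batch("dataset-a", &[("chunk-1", "hello"), ("chunk-2", "world")]);
    assert_eq!(state.chunk_counts["dataset-a"], 2);
    assert_eq!(state.qdrant_points.len(), 2);
}
```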