feature: add timings to all search and recommendation routes for both chunk's and chunk_group's
skeptrunedev committed Apr 9, 2024
1 parent dabdacf commit 7df3f1c
Showing 5 changed files with 146 additions and 142 deletions.
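The change threads a `simple_server_timing_header::Timer` through each search and recommendation handler: the handler marks named phases as it crosses each backend boundary (Qdrant, Postgres), then emits the accumulated timings in a standard `Server-Timing` response header (a comma-separated list of `<phase>;dur=<ms>` entries) that browser devtools and HTTP clients can inspect. Below is a minimal sketch of the pattern, using only the `Timer` calls that appear in this diff (`Timer::new`, `add`, `Timer::header_key`, `header_value`); the handler name, phase labels, and response body are illustrative, not part of the commit.

use actix_web::HttpResponse;
use simple_server_timing_header::Timer;

// Illustrative handler showing the instrumentation pattern this commit
// applies to the real search and recommendation routes.
async fn example_route() -> HttpResponse {
    let mut timer = Timer::new();

    timer.add("start resolving ids to qdrant_point_ids");
    // ... resolve chunk/group ids to Qdrant point ids ...
    timer.add("finish resolving ids; start qdrant query");
    // ... run the Qdrant search or recommendation query ...
    timer.add("finish qdrant query; start loading metadata from postgres");
    // ... hydrate chunk metadata from Postgres ...
    timer.add("finish loading metadata and return results");

    // Surface the phase timings to the client as a `Server-Timing` header.
    HttpResponse::Ok()
        .insert_header((Timer::header_key(), timer.header_value()))
        .body("...")
}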
6 changes: 3 additions & 3 deletions server/src/bin/ingestion-microservice.rs
@@ -15,7 +15,7 @@ use trieve_server::operators::chunk_operator::{
update_chunk_metadata_query,
};
use trieve_server::operators::event_operator::create_event_query;
-use trieve_server::operators::model_operator::{create_embeddings, get_splade_embedding};
+use trieve_server::operators::model_operator::{create_embeddings, get_sparse_vector};
use trieve_server::operators::parse_operator::{
average_embeddings, coarse_doc_chunker, convert_html_to_text,
};
@@ -420,7 +420,7 @@ async fn upload_chunk(
};

let splade_vector = if dataset_config.FULLTEXT_ENABLED {
-match get_splade_embedding(&content.clone(), "doc").await {
+match get_sparse_vector(&content.clone(), "doc").await {
Ok(v) => v,
Err(_) => vec![(0, 0.0)],
}
@@ -606,7 +606,7 @@ async fn update_chunk(
.map_err(|_| ServiceError::BadRequest("chunk not found".into()))?;

let splade_vector = if server_dataset_config.FULLTEXT_ENABLED {
-match get_splade_embedding(&content, "doc").await {
+match get_sparse_vector(&content, "doc").await {
Ok(v) => v,
Err(_) => vec![(0, 0.0)],
}
18 changes: 16 additions & 2 deletions server/src/handlers/chunk_handler.rs
@@ -1286,6 +1286,7 @@ pub async fn search_chunk(
pool,
dataset_org_plan_sub.dataset,
server_dataset_config,
+&mut timer,
)
.await?
}
@@ -1297,6 +1298,7 @@
pool,
dataset_org_plan_sub.dataset,
server_dataset_config,
+&mut timer,
)
.await?
}
@@ -1307,8 +1309,8 @@
page,
pool,
dataset_org_plan_sub.dataset,
-&mut timer,
server_dataset_config,
+&mut timer,
)
.await?
}
@@ -1507,6 +1509,10 @@ pub async fn get_recommended_chunks(

let mut positive_qdrant_ids = vec![];

+let mut timer = Timer::new();

timer.add("start extending tracking_ids and chunk_ids to qdrant_point_ids");

if let Some(positive_chunk_ids) = positive_chunk_ids {
positive_qdrant_ids.extend(
get_point_ids_from_unified_chunk_ids(
@@ -1589,6 +1595,8 @@
)
}

timer.add("finish extending tracking_ids and chunk_ids to qdrant_point_ids; start recommend_qdrant_query");

let recommended_qdrant_point_ids = recommend_qdrant_query(
positive_qdrant_ids,
negative_qdrant_ids,
@@ -1603,6 +1611,8 @@
ServiceError::BadRequest(format!("Could not get recommended chunks: {}", err))
})?;

timer.add("finish recommend_qdrant_query; start get_metadata_from_point_ids");

let recommended_chunk_metadatas =
get_metadata_from_point_ids(recommended_qdrant_point_ids, pool)
.await
@@ -1613,6 +1623,8 @@
))
})?;

timer.add("finish get_metadata_from_point_ids and return results");

if data.slim_chunks.unwrap_or(false) {
let res = recommended_chunk_metadatas
.into_iter()
@@ -1622,7 +1634,9 @@
return Ok(HttpResponse::Ok().json(res));
}

-Ok(HttpResponse::Ok().json(recommended_chunk_metadatas))
+Ok(HttpResponse::Ok()
+    .insert_header((Timer::header_key(), timer.header_value()))
+    .json(recommended_chunk_metadatas))
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
32 changes: 28 additions & 4 deletions server/src/handlers/group_handler.rs
@@ -26,6 +26,7 @@ use crate::{
};
use actix_web::{web, HttpResponse};
use serde::{Deserialize, Serialize};
+use simple_server_timing_header::Timer;
use utoipa::{IntoParams, ToSchema};

#[tracing::instrument(skip(pool))]
@@ -955,6 +956,10 @@ pub async fn get_recommended_groups(
ServerDatasetConfiguration::from_json(dataset_org_plan_sub.dataset.server_configuration);
let dataset_id = dataset_org_plan_sub.dataset.id;

+let mut timer = Timer::new();

timer.add("start to extend qdrant_point_ids for group_tracking_ids and group_ids");

let mut positive_qdrant_ids = vec![];

if let Some(positive_group_ids) = positive_group_ids {
@@ -1039,6 +1044,8 @@
);
}

timer.add("finish to extend qdrant_point_ids for group_tracking_ids and group_ids; start to recommend_qdrant_groups_query from qdrant");

let recommended_qdrant_point_ids = recommend_qdrant_groups_query(
positive_qdrant_ids,
negative_qdrant_ids,
@@ -1059,9 +1066,13 @@
total_chunk_pages: (recommended_qdrant_point_ids.len() as f64 / 10.0).ceil() as i64,
};

timer.add("finish to recommend_qdrant_groups_query from qdrant; start to get_metadata_from_groups from postgres");

let recommended_chunk_metadatas =
get_metadata_from_groups(group_query_result, Some(false), pool).await?;

timer.add("finish to get_metadata_from_groups from postgres and return results");

if data.slim_chunks.unwrap_or(false) {
let res = recommended_chunk_metadatas
.into_iter()
@@ -1075,10 +1086,14 @@
})
.collect::<Vec<GroupSlimChunksDTO>>();

-return Ok(HttpResponse::Ok().json(res));
+return Ok(HttpResponse::Ok()
+    .insert_header((Timer::header_key(), timer.header_value()))
+    .json(res));
}

-Ok(HttpResponse::Ok().json(recommended_chunk_metadatas))
+Ok(HttpResponse::Ok()
+    .insert_header((Timer::header_key(), timer.header_value()))
+    .json(recommended_chunk_metadatas))
}

#[derive(Serialize, Deserialize, Clone, Debug, ToSchema, IntoParams)]
@@ -1332,6 +1347,8 @@ pub async fn search_over_groups(

let parsed_query = parse_query(data.query.clone());

+let mut timer = Timer::new();

let result_chunks = match data.search_type.as_str() {
"fulltext" => {
if !server_dataset_config.FULLTEXT_ENABLED {
@@ -1348,6 +1365,7 @@
pool,
dataset_org_plan_sub.dataset,
server_dataset_config,
+&mut timer,
)
.await?
}
@@ -1359,6 +1377,7 @@
pool,
dataset_org_plan_sub.dataset,
server_dataset_config,
+&mut timer,
)
.await?
}
@@ -1370,6 +1389,7 @@
pool,
dataset_org_plan_sub.dataset,
server_dataset_config,
+&mut timer,
)
.await?
}
@@ -1394,8 +1414,12 @@
total_chunk_pages: result_chunks.total_chunk_pages,
};

-return Ok(HttpResponse::Ok().json(res));
+return Ok(HttpResponse::Ok()
+    .insert_header((Timer::header_key(), timer.header_value()))
+    .json(res));
}

-Ok(HttpResponse::Ok().json(result_chunks))
+Ok(HttpResponse::Ok()
+    .insert_header((Timer::header_key(), timer.header_value()))
+    .json(result_chunks))
}
2 changes: 1 addition & 1 deletion server/src/operators/model_operator.rs
@@ -164,7 +164,7 @@ pub struct CustomSparseEmbedData {
}

#[tracing::instrument]
-pub async fn get_splade_embedding(
+pub async fn get_sparse_vector(
message: &str,
embed_type: &str,
) -> Result<Vec<(u32, f32)>, ServiceError> {
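The rename in model_operator.rs is mechanical: get_splade_embedding becomes get_sparse_vector with the same signature. The returned Vec<(u32, f32)> is the usual sparse-embedding encoding, (dimension index, weight) pairs for the few non-zero dimensions. A hedged sketch of how the ingestion callers above consume it; the "doc" embed type and the zero-weight fallback mirror the diff, while the wrapper function itself is illustrative:

use trieve_server::operators::model_operator::get_sparse_vector;

// Illustrative caller mirroring the ingestion microservice: request a
// sparse vector for a document, falling back to a single zero-weight
// pair if the embedding server errors, as the diff does.
async fn sparse_vector_or_fallback(content: &str) -> Vec<(u32, f32)> {
    match get_sparse_vector(content, "doc").await {
        Ok(v) => v,               // (dimension index, weight) pairs
        Err(_) => vec![(0, 0.0)], // degenerate fallback from the diff
    }
}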
