cleanup: only make one server call per batch for groups in ingest-worker
skeptrunedev authored and cdxker committed Dec 10, 2024
1 parent 9bcd16c commit 582e73b
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions server/src/bin/ingestion-worker.rs
@@ -437,6 +437,17 @@ pub async fn bulk_upload_chunks(
         .iter()
         .any(|message| message.upsert_by_tracking_id);
 
+    let all_group_ids = group_tracking_ids_to_group_ids
+        .values()
+        .copied()
+        .collect::<Vec<uuid::Uuid>>();
+    let all_groups = if all_group_ids.is_empty() {
+        vec![]
+    } else {
+        log::info!("Getting all groups for {:?} group_ids", all_group_ids.len());
+        get_groups_from_group_ids_query(all_group_ids, web_pool.clone()).await?
+    };
+
     let ingestion_data: Vec<ChunkData> = payload
         .ingestion_messages
         .iter()
@@ -721,24 +732,24 @@ pub async fn bulk_upload_chunks(
         }
 
         let group_tag_set: Option<Vec<Option<String>>> = if !qdrant_only {
-            // ENSURE chunk_data.group_ids is accruate
-            if let Some(ref group_ids) = chunk_data.group_ids {
-                log::info!(
-                    "Getting group tags for chunk with group_ids: {:?}",
-                    group_ids
-                );
-                Some(
-                    get_groups_from_group_ids_query(group_ids.clone(), web_pool.clone())
-                        .await?
+            chunk_data
+                .group_ids
+                .as_ref()
+                .filter(|group_ids| !group_ids.is_empty())
+                .map(|group_ids| {
+                    all_groups
                         .iter()
-                        .filter_map(|group| group.tag_set.clone())
+                        .filter_map(|group| {
+                            if group_ids.contains(&group.id) {
+                                group.tag_set.clone()
+                            } else {
+                                None
+                            }
+                        })
                         .flatten()
                         .dedup()
-                        .collect(),
-                )
-            } else {
-                None
-            }
+                        .collect()
+                })
         } else {
             None
         };
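For readers skimming the change, below is a minimal, self-contained sketch of the pattern the commit moves to: collect every group id referenced by the batch, fetch those groups with a single call, then derive each chunk's tag set from the prefetched list in memory. The ChunkGroup struct and the fetch_groups stand-in are illustrative assumptions for the sketch; the real worker calls get_groups_from_group_ids_query against the database pool and deduplicates tags with itertools, as the diff shows.

use uuid::Uuid;

// Illustrative stand-in for the server's ChunkGroup model.
#[derive(Clone)]
struct ChunkGroup {
    id: Uuid,
    tag_set: Option<Vec<Option<String>>>,
}

// Stand-in for the single per-batch call that
// get_groups_from_group_ids_query makes in the worker.
fn fetch_groups(_ids: &[Uuid]) -> Vec<ChunkGroup> {
    Vec::new()
}

fn group_tags_for_batch(per_chunk_group_ids: &[Vec<Uuid>]) -> Vec<Option<Vec<Option<String>>>> {
    // One call for the whole batch instead of one call per chunk.
    let all_ids: Vec<Uuid> = per_chunk_group_ids.iter().flatten().copied().collect();
    let all_groups = if all_ids.is_empty() {
        Vec::new()
    } else {
        fetch_groups(&all_ids)
    };

    // Each chunk's tag set is now derived from the prefetched list in memory.
    per_chunk_group_ids
        .iter()
        .map(|ids| {
            if ids.is_empty() {
                return None;
            }
            Some(
                all_groups
                    .iter()
                    .filter(|group| ids.contains(&group.id))
                    .filter_map(|group| group.tag_set.clone())
                    .flatten()
                    .collect(),
            )
        })
        .collect()
}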
