Skip to content

Commit

Permalink
feature: include dataset tracking_ids on get top datasets query
Browse files Browse the repository at this point in the history
  • Loading branch information
skeptrunedev committed Aug 26, 2024
1 parent d60c164 commit 632f29d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 23 deletions.
4 changes: 3 additions & 1 deletion server/src/data/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4203,10 +4203,11 @@ impl SearchAnalyticsFilter {
/// API response row for the "top datasets" analytics query: one dataset
/// together with how many queries were issued against it.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct TopDatasetsResponse {
    /// Postgres/ClickHouse id of the dataset.
    pub dataset_id: uuid::Uuid,
    /// Human-readable tracking id of the dataset; `None` when the dataset has
    /// no tracking id (the `From<TopDatasetsResponseClickhouse>` impl leaves it
    /// `None` and callers fill it in from Postgres afterwards).
    pub dataset_tracking_id: Option<String>,
    /// Total number of queries recorded against this dataset.
    pub total_queries: i64,
}

#[derive(Debug, Serialize, Deserialize, ToSchema, Row)]
#[derive(Debug, Serialize, Deserialize, ToSchema, Row, Clone)]
pub struct TopDatasetsResponseClickhouse {
#[serde(with = "clickhouse::serde::uuid")]
pub dataset_id: uuid::Uuid,
Expand All @@ -4217,6 +4218,7 @@ impl From<TopDatasetsResponseClickhouse> for TopDatasetsResponse {
/// Converts the raw ClickHouse row into the API response shape.
///
/// `dataset_tracking_id` is intentionally left as `None` here: tracking ids
/// live in Postgres, so the caller looks them up and patches them in after
/// this conversion.
fn from(clickhouse_response: TopDatasetsResponseClickhouse) -> TopDatasetsResponse {
    TopDatasetsResponse {
        // The field is already a `uuid::Uuid` (deserialized via
        // `clickhouse::serde::uuid`), so the previous
        // `Uuid::from_bytes(*id.as_bytes())` round-trip was a no-op copy.
        dataset_id: clickhouse_response.dataset_id,
        dataset_tracking_id: None,
        total_queries: clickhouse_response.total_queries,
    }
}
Expand Down
3 changes: 2 additions & 1 deletion server/src/handlers/analytics_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,10 @@ pub async fn get_top_datasets(
_user: AdminOnly,
data: web::Json<GetTopDatasetsRequestBody>,
clickhouse_client: web::Data<clickhouse::Client>,
pool: web::Data<Pool>,
) -> Result<HttpResponse, ServiceError> {
let top_datasets =
get_top_datasets_query(data.into_inner(), clickhouse_client.get_ref()).await?;
get_top_datasets_query(data.into_inner(), clickhouse_client.get_ref(), pool).await?;

Ok(HttpResponse::Ok().json(top_datasets))
}
52 changes: 40 additions & 12 deletions server/src/operators/analytics_operator.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
use actix_web::web;
use futures::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ureq::json;
use utoipa::ToSchema;

use super::chunk_operator::get_metadata_from_tracking_id_query;
use crate::{
data::models::{
ClusterAnalyticsFilter, ClusterTopicsClickhouse, DatasetAnalytics, Granularity,
Expand All @@ -26,8 +20,14 @@ use crate::{
CTRDataRequestBody, GetTopDatasetsRequestBody, RateQueryRequest,
},
};

use super::chunk_operator::get_metadata_from_tracking_id_query;
use actix_web::web;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use futures::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ureq::json;
use utoipa::ToSchema;

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct SearchClusterResponse {
Expand Down Expand Up @@ -1359,7 +1359,10 @@ pub async fn set_query_rating_query(
pub async fn get_top_datasets_query(
data: GetTopDatasetsRequestBody,
clickhouse_client: &clickhouse::Client,
pool: web::Data<Pool>,
) -> Result<Vec<TopDatasetsResponse>, ServiceError> {
use crate::data::schema::datasets::dsl as datasets_columns;

let mut query_string = format!(
"SELECT
dataset_id,
Expand Down Expand Up @@ -1393,7 +1396,7 @@ pub async fn get_top_datasets_query(
LIMIT 10",
);

let clickhouse_query = clickhouse_client
let clickhouse_resp_data = clickhouse_client
.query(query_string.as_str())
.fetch_all::<TopDatasetsResponseClickhouse>()
.await
Expand All @@ -1402,9 +1405,34 @@ pub async fn get_top_datasets_query(
ServiceError::InternalServerError("Error fetching query".to_string())
})?;

let response = clickhouse_query
let dataset_ids = clickhouse_resp_data
.iter()
.map(|x| x.dataset_id)
.collect::<Vec<_>>();
let mut conn = pool
.get()
.await
.map_err(|_| ServiceError::BadRequest("Could not get database connection".to_string()))?;
let dataset_id_and_tracking_ids = datasets_columns::datasets
.select((datasets_columns::id, datasets_columns::tracking_id))
.filter(datasets_columns::id.eq_any(dataset_ids))
.load::<(uuid::Uuid, Option<String>)>(&mut conn)
.await
.map_err(|e| {
log::error!("Error fetching dataset ids: {:?}", e);
ServiceError::InternalServerError("Error fetching dataset ids".to_string())
})?;

let response = clickhouse_resp_data
.into_iter()
.map(|x| x.into())
.map(|x| {
let mut top_dataset_resps = TopDatasetsResponse::from(x.clone());
top_dataset_resps.dataset_tracking_id = dataset_id_and_tracking_ids
.iter()
.find(|(id, _)| id == &x.dataset_id)
.and_then(|(_, tracking_id)| tracking_id.clone());
top_dataset_resps
})
.collect::<Vec<_>>();

Ok(response)
Expand Down
13 changes: 4 additions & 9 deletions server/src/operators/message_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,10 @@ pub async fn create_message_query(
) -> Result<(), ServiceError> {
use crate::data::schema::messages::dsl::messages;

let mut conn = match pool.get().await {
Ok(conn) => conn,
Err(e) => {
log::error!("Error getting connection from pool: {:?}", e);
return Err(ServiceError::InternalServerError(
"Error getting postgres connection from pool".into(),
));
}
};
let mut conn = pool
.get()
.await
.map_err(|_| ServiceError::BadRequest("Could not get database connection".to_string()))?;

diesel::insert_into(messages)
.values(&new_message)
Expand Down

0 comments on commit 632f29d

Please sign in to comment.