Skip to content

Commit

Permalink
feature: add track events for users
Browse files Browse the repository at this point in the history
  • Loading branch information
densumesh authored and cdxker committed Oct 2, 2024
1 parent 25eefd1 commit ee715fd
Showing 1 changed file with 86 additions and 12 deletions.
98 changes: 86 additions & 12 deletions server/src/operators/dittofeed_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::{
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DittofeedIdentifyUser {
pub r#type: String,
pub message_id: uuid::Uuid,
pub user_id: uuid::Uuid,
pub traits: DittofeedUserTraits,
Expand All @@ -35,16 +36,15 @@ pub struct DittofeedUserTraits {
pub email: String,
pub name: Option<String>,
pub created_at: chrono::NaiveDateTime,
pub org_usages: Vec<DittoOrgUsage>,
pub organization_count: i32,
pub dataset_count: i32,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DittoOrgUsage {
pub organization: Organization,
pub dataset_count: i32,
pub dataset_usages: Vec<DittoDatasetUsage>,
pub top_search_datasets: Vec<Dataset>,
pub top_rag_datasets: Vec<Dataset>,
pub top_recommendation_datasets: Vec<Dataset>,
Expand All @@ -61,13 +61,43 @@ pub struct DittoDatasetUsage {
pub rag_count: u32,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DittoTrackProperties {
DittoDatasetUsage(DittoDatasetUsage),
DittoOrgUsage(DittoOrgUsage),
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DittoTrackRequest {
pub r#type: String,
pub message_id: uuid::Uuid,
pub event: String,
pub properties: DittoTrackProperties,
pub user_id: uuid::Uuid,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DittoBatchRequestTypes {
Identify(DittofeedIdentifyUser),
Track(DittoTrackRequest),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DittoBatchRequest {
pub batch: Vec<DittoBatchRequestTypes>,
}

pub async fn get_user_ditto_identity(
user: SlimUser,
pool: web::Data<Pool>,
clickhouse_client: &clickhouse::Client,
) -> Result<DittofeedIdentifyUser, ServiceError> {
) -> Result<DittoBatchRequest, ServiceError> {
let organization_count = user.orgs.len() as i32;
let mut org_usages: Vec<DittoOrgUsage> = vec![];
let mut dataset_usages: Vec<DittoDatasetUsage> = vec![];

for organization in user.orgs {
let usage = get_org_usage_by_id_query(organization.id, pool.clone()).await?;
Expand Down Expand Up @@ -163,8 +193,6 @@ pub async fn get_user_ditto_identity(
)
.await?;

let mut dataset_usages: Vec<DittoDatasetUsage> = vec![];

for dataset in datasets {
let search_metrics =
get_search_metrics_query(dataset.dataset.id, None, clickhouse_client).await?;
Expand Down Expand Up @@ -194,7 +222,6 @@ pub async fn get_user_ditto_identity(
let org_usage = DittoOrgUsage {
organization: organization.clone(),
dataset_count: usage.dataset_count,
dataset_usages,
top_search_datasets,
top_rag_datasets,
top_recommendation_datasets,
Expand All @@ -203,30 +230,77 @@ pub async fn get_user_ditto_identity(
org_usages.push(org_usage);
}

Ok(DittofeedIdentifyUser {
let org_track_requests = org_usages
.into_iter()
.map(|usage| DittoTrackRequest {
r#type: "track".to_string(),
message_id: usage.organization.id,
event: "ORGANIZATION_USAGE".to_string(),
properties: DittoTrackProperties::DittoOrgUsage(usage),
user_id: user.id,
})
.collect::<Vec<_>>();

let dataset_track_requests = dataset_usages
.into_iter()
.map(|usage| DittoTrackRequest {
r#type: "track".to_string(),
message_id: usage.dataset.id,
event: "DATASET_USAGE".to_string(),
properties: DittoTrackProperties::DittoDatasetUsage(usage),
user_id: user.id,
})
.collect::<Vec<_>>();

let dataset_count = dataset_track_requests.len() as i32;

let track_requests = org_track_requests
.into_iter()
.chain(dataset_track_requests.into_iter())
.collect::<Vec<_>>();

let batch_requests = vec![DittoBatchRequestTypes::Identify(DittofeedIdentifyUser {
r#type: "identify".to_string(),
message_id: uuid::Uuid::new_v4(),
user_id: user.id,
traits: DittofeedUserTraits {
email: user.email,
name: user.name,
created_at: user.created_at,
organization_count,
org_usages,
dataset_count,
},
})
})];

let batch_requests = batch_requests
.into_iter()
.chain(
track_requests
.into_iter()
.map(DittoBatchRequestTypes::Track),
)
.collect::<Vec<_>>();

let batch_request = DittoBatchRequest {
batch: batch_requests,
};

Ok(batch_request)
}

pub async fn send_user_ditto_identity(request: DittofeedIdentifyUser) -> Result<(), ServiceError> {
pub async fn send_user_ditto_identity(
batch_request: DittoBatchRequest,
) -> Result<(), ServiceError> {
let dittofeed_url =
std::env::var("DITTOFEED_URL").unwrap_or("https://app.dittofeed.com".to_string());
let api_key = std::env::var("DITTOFEED_API_KEY").expect("DITTOFEED_API_KEY is not set");

let client = reqwest::Client::new();

client
.post(format!("{}/api/public/apps/identify", dittofeed_url))
.post(format!("{}/api/public/apps/batch", dittofeed_url))
.header("Authorization", format!("Basic {}", api_key))
.json(&request)
.json(&batch_request)
.send()
.await
.map_err(|e| ServiceError::BadRequest(e.to_string()))?
Expand Down

0 comments on commit ee715fd

Please sign in to comment.