Skip to content

Commit

Permalink
Add ComponentPath to UdfIdentifier (#27943)
Browse files Browse the repository at this point in the history
This PR adds `ComponentPath` to `UdfIdentifier`, which we need for namespacing logs and usage. I updated the `/app_metrics` API endpoints to take in component path. I'll follow up with adding that to the dashboard app_metrics calls after backend is pushed with this.

GitOrigin-RevId: f87f9667711b865a387a83afcbdcc1dd62d779ef
  • Loading branch information
emmaling27 authored and Convex, Inc. committed Jul 17, 2024
1 parent 172bebb commit 0fa8a6c
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 26 deletions.
36 changes: 31 additions & 5 deletions crates/application/src/function_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ use std::{
};

use common::{
components::CanonicalizedComponentFunctionPath,
components::{
CanonicalizedComponentFunctionPath,
ComponentFunctionPath,
ComponentPath,
},
errors::{
report_error,
JsError,
Expand Down Expand Up @@ -191,7 +195,14 @@ impl FunctionExecution {

fn identifier(&self) -> UdfIdentifier {
match &self.params {
UdfParams::Function { identifier, .. } => UdfIdentifier::Function(identifier.clone()),
UdfParams::Function { identifier, .. } => {
let component = ComponentPath::TODO();
let path = ComponentFunctionPath {
component,
udf_path: identifier.clone().strip(),
};
UdfIdentifier::Function(path.canonicalize())
},
UdfParams::Http { identifier, .. } => UdfIdentifier::Http(identifier.clone()),
}
}
Expand Down Expand Up @@ -622,8 +633,13 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
TrackUsage::Track(usage_tracker) => {
let usage_stats = usage_tracker.gather_user_stats();
let aggregated = usage_stats.aggregate();
let component = ComponentPath::TODO();
let path = ComponentFunctionPath {
component,
udf_path: udf_path.clone().strip(),
};
self.usage_tracking.track_call(
UdfIdentifier::Function(udf_path.clone()),
UdfIdentifier::Function(path.canonicalize()),
context.execution_id.clone(),
if was_cached {
CallType::CachedQuery
Expand Down Expand Up @@ -759,8 +775,13 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
TrackUsage::Track(usage_tracker) => {
let usage_stats = usage_tracker.gather_user_stats();
let aggregated = usage_stats.aggregate();
let component = ComponentPath::TODO();
let path = ComponentFunctionPath {
component,
udf_path: udf_path.clone().strip(),
};
self.usage_tracking.track_call(
UdfIdentifier::Function(udf_path.clone()),
UdfIdentifier::Function(path.canonicalize()),
context.execution_id.clone(),
CallType::Mutation,
usage_stats,
Expand Down Expand Up @@ -854,8 +875,13 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
TrackUsage::Track(usage_tracker) => {
let usage_stats = usage_tracker.gather_user_stats();
let aggregated = usage_stats.aggregate();
let component = ComponentPath::TODO();
let path = ComponentFunctionPath {
component,
udf_path: udf_path.clone().strip(),
};
self.usage_tracking.track_call(
UdfIdentifier::Function(udf_path.clone()),
UdfIdentifier::Function(path.canonicalize()),
completion.context.execution_id.clone(),
CallType::Action {
env: completion.environment,
Expand Down
7 changes: 7 additions & 0 deletions crates/common/src/components/component_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ impl ComponentPath {
format!(" in '{}'", String::from(self.clone()))
}
}

pub fn deserialize(path: Option<&str>) -> anyhow::Result<Self> {
match path {
Some(p) => p.parse(),
None => Ok(ComponentPath::root()),
}
}
}

impl Deref for ComponentPath {
Expand Down
10 changes: 6 additions & 4 deletions crates/common/src/types/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ use serde::{
Deserialize,
Serialize,
};
use sync_types::CanonicalizedUdfPath;
use value::{
heap_size::HeapSize,
id_v6::DeveloperDocumentId,
};

use super::HttpActionRoute;
use crate::version::ClientVersion;
use crate::{
components::CanonicalizedComponentFunctionPath,
version::ClientVersion,
};

#[derive(Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -104,15 +106,15 @@ impl From<UdfTypeProto> for UdfType {
/// A unique identifier for a UDF
#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum UdfIdentifier {
Function(CanonicalizedUdfPath),
Function(CanonicalizedComponentFunctionPath),
Http(HttpActionRoute),
Cli(String),
}

impl fmt::Display for UdfIdentifier {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
UdfIdentifier::Function(path) => write!(f, "{}", path),
UdfIdentifier::Function(path) => write!(f, "{}", path.debug_str()),
UdfIdentifier::Http(route) => write!(f, "{}", route.path),
UdfIdentifier::Cli(command) => write!(f, "_cli/{command}"),
}
Expand Down
37 changes: 26 additions & 11 deletions crates/database/src/tests/usage_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ use std::time::Duration;
use common::{
assert_obj,
bootstrap_model::index::IndexMetadata,
components::{
ComponentFunctionPath,
ComponentPath,
},
execution_context::ExecutionId,
maybe_val,
query::{
Expand All @@ -22,6 +26,7 @@ use keybroker::Identity;
use maplit::btreeset;
use pretty_assertions::assert_eq;
use runtime::testing::TestRuntime;
use sync_types::CanonicalizedUdfPath;
use usage_tracking::{
CallType,
FunctionUsageTracker,
Expand All @@ -41,6 +46,16 @@ use crate::{
UserFacingModel,
};

fn test_udf_identifier() -> UdfIdentifier {
let udf_path: CanonicalizedUdfPath = "test.js:default".parse().unwrap();
let component = ComponentPath::root();
let path = ComponentFunctionPath {
component,
udf_path: udf_path.strip(),
};
UdfIdentifier::Function(path.canonicalize())
}

#[convex_macro::test_runtime]
async fn vector_insert_with_no_index_does_not_count_usage(rt: TestRuntime) -> anyhow::Result<()> {
let fixtures = VectorFixtures::new(rt).await?;
Expand All @@ -55,7 +70,7 @@ async fn vector_insert_with_no_index_does_not_count_usage(rt: TestRuntime) -> an
add_document_vec_array(&mut tx, &table_name, [3f64, 4f64]).await?;
fixtures.db.commit(tx).await?;
fixtures.db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Action {
env: ModuleEnvironment::Isolate,
Expand Down Expand Up @@ -84,7 +99,7 @@ async fn vector_insert_counts_usage_for_backfilling_indexes(rt: TestRuntime) ->
add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?;
fixtures.db.commit(tx).await?;
fixtures.db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Mutation,
tx_usage.gather_user_stats(),
Expand Down Expand Up @@ -117,7 +132,7 @@ async fn vector_insert_counts_usage_for_enabled_indexes(rt: TestRuntime) -> anyh
add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?;
fixtures.db.commit(tx).await?;
fixtures.db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Action {
env: ModuleEnvironment::Isolate,
Expand Down Expand Up @@ -150,7 +165,7 @@ async fn vectors_in_segment_count_as_usage(rt: TestRuntime) -> anyhow::Result<()
add_document_vec_array(&mut tx, index_name.table(), [3f64, 4f64]).await?;
fixtures.db.commit(tx).await?;
fixtures.db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Action {
env: ModuleEnvironment::Isolate,
Expand Down Expand Up @@ -199,7 +214,7 @@ async fn vector_query_counts_bandwidth(rt: TestRuntime) -> anyhow::Result<()> {
tx_usage.add(usage_stats);

fixtures.db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Action {
env: ModuleEnvironment::Isolate,
Expand Down Expand Up @@ -241,7 +256,7 @@ async fn test_usage_tracking_basic_insert_and_get(rt: TestRuntime) -> anyhow::Re
.await?;
db.commit(tx).await?;
db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Mutation,
tx_usage.gather_user_stats(),
Expand All @@ -266,7 +281,7 @@ async fn test_usage_tracking_basic_insert_and_get(rt: TestRuntime) -> anyhow::Re
.await?;
db.commit(tx).await?;
db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Mutation,
tx_usage.gather_user_stats(),
Expand Down Expand Up @@ -308,7 +323,7 @@ async fn test_usage_tracking_insert_with_index(rt: TestRuntime) -> anyhow::Resul
.unwrap_or_else(|e| panic!("Failed to add index for {} {:?}", "by_key", e));
db.commit(tx).await?;
db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Mutation,
tx_usage.gather_user_stats(),
Expand All @@ -332,7 +347,7 @@ async fn test_usage_tracking_insert_with_index(rt: TestRuntime) -> anyhow::Resul
.await?;
db.commit(tx).await?;
db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Mutation,
tx_usage.gather_user_stats(),
Expand All @@ -359,7 +374,7 @@ async fn test_usage_tracking_insert_with_index(rt: TestRuntime) -> anyhow::Resul
while query_stream.next(&mut tx, None).await?.is_some() {}
db.commit(tx).await?;
db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::Mutation,
tx_usage.gather_user_stats(),
Expand All @@ -386,7 +401,7 @@ async fn http_action_counts_compute(rt: TestRuntime) -> anyhow::Result<()> {

let tx_usage = FunctionUsageTracker::new();
db.usage_counter().track_call(
UdfIdentifier::Function("test.js:default".parse()?),
test_udf_identifier(),
ExecutionId::new(),
CallType::HttpAction {
duration: Duration::from_secs(5),
Expand Down
15 changes: 10 additions & 5 deletions crates/local_backend/src/node_action_callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use serde_json::{
};
use sync_types::{
AuthenticationToken,
CanonicalizedUdfPath,
UdfPath,
};
use usage_tracking::FunctionUsageTracker;
Expand Down Expand Up @@ -321,12 +322,16 @@ pub async fn vector_search(
if let Some(action_name) = action_name {
let usage = FunctionUsageTracker::new();
usage.add(usage_stats);
let component = ComponentPath::TODO();
let udf_path: CanonicalizedUdfPath = action_name
.parse()
.context(format!("Unexpected udf path format, got {action_name}"))?;
let path = ComponentFunctionPath {
component,
udf_path: udf_path.clone().strip(),
};
st.application.usage_counter().track_function_usage(
UdfIdentifier::Function(
action_name
.parse()
.context(format!("Unexpected udf path format, got {action_name}"))?,
),
UdfIdentifier::Function(path.canonicalize()),
// TODO(CX-6045) - have the action send the ExecutionId as a request header
context.execution_id,
usage.gather_user_stats(),
Expand Down
2 changes: 1 addition & 1 deletion crates/usage_tracking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl UsageCounter {
// Because system udfs might cause usage before any data is added by the user,
// we do not count their calls. We do count their bandwidth.
let (should_track_calls, udf_id_type) = match &udf_path {
UdfIdentifier::Function(path) => (!path.is_system(), "function"),
UdfIdentifier::Function(path) => (!path.udf_path.is_system(), "function"),
UdfIdentifier::Http(_) => (true, "http"),
UdfIdentifier::Cli(_) => (false, "cli"),
};
Expand Down

0 comments on commit 0fa8a6c

Please sign in to comment.