Skip to content

Commit bb0c8c1

Browse files
committed
mostly finish DeploymentIdLayer and StateChangeLayer
1 parent 74b8fe9 commit bb0c8c1

File tree

17 files changed

+355
-389
lines changed

17 files changed

+355
-389
lines changed

auth/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::io;
22

33
use clap::Parser;
4-
use shuttle_common::backends::tracing::setup_tracing;
4+
use shuttle_common::{backends::tracing::setup_tracing, log::Backend};
55
use sqlx::migrate::Migrator;
66
use tracing::{info, trace};
77

@@ -15,7 +15,7 @@ async fn main() -> io::Result<()> {
1515

1616
trace!(args = ?args, "parsed args");
1717

18-
setup_tracing(tracing_subscriber::registry(), "auth");
18+
setup_tracing(tracing_subscriber::registry(), Backend::Auth);
1919

2020
let db_path = args.state.join("authentication.sqlite");
2121

common/src/backends/tracing.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ use tracing::{debug_span, instrument::Instrumented, Instrument, Span, Subscriber
2020
use tracing_opentelemetry::OpenTelemetrySpanExt;
2121
use tracing_subscriber::{fmt, prelude::*, registry::LookupSpan, EnvFilter};
2222

23+
use crate::log::Backend;
24+
2325
const OTLP_ADDRESS: &str = "http://otel-collector:4317";
2426

25-
pub fn setup_tracing<S>(subscriber: S, service_name: &str)
27+
pub fn setup_tracing<S>(subscriber: S, backend: Backend)
2628
where
2729
S: Subscriber + for<'a> LookupSpan<'a> + Send + Sync,
2830
{
@@ -46,7 +48,7 @@ where
4648
.with_trace_config(
4749
trace::config().with_resource(Resource::new(vec![KeyValue::new(
4850
"service.name",
49-
service_name.to_string(),
51+
backend.to_string(),
5052
)])),
5153
)
5254
.install_batch(Tokio)
@@ -196,15 +198,17 @@ pub fn serde_json_map_to_key_value_list(
196198
/// Convert an [AnyValue] to a [serde_json::Value]
197199
pub fn from_any_value_to_serde_json_value(any_value: AnyValue) -> serde_json::Value {
198200
let Some(value) = any_value.value else {
199-
return serde_json::Value::Null
201+
return serde_json::Value::Null;
200202
};
201203

202204
match value {
203205
any_value::Value::StringValue(s) => serde_json::Value::String(s),
204206
any_value::Value::BoolValue(b) => serde_json::Value::Bool(b),
205207
any_value::Value::IntValue(i) => serde_json::Value::Number(i.into()),
206208
any_value::Value::DoubleValue(f) => {
207-
let Some(number) = serde_json::Number::from_f64(f) else {return serde_json::Value::Null};
209+
let Some(number) = serde_json::Number::from_f64(f) else {
210+
return serde_json::Value::Null;
211+
};
208212
serde_json::Value::Number(number)
209213
}
210214
any_value::Value::ArrayValue(a) => {

common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub type DeploymentId = Uuid;
1212
#[cfg(feature = "service")]
1313
pub mod log;
1414
#[cfg(feature = "service")]
15-
pub use log::Item as LogItem;
15+
pub use log::LogItem;
1616
#[cfg(feature = "models")]
1717
pub mod models;
1818
#[cfg(feature = "service")]

common/src/log.rs

Lines changed: 131 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,59 @@
11
use chrono::{DateTime, Utc};
22
#[cfg(feature = "display")]
3-
use crossterm::style::{StyledContent, Stylize};
3+
use crossterm::style::Stylize;
44
use serde::{Deserialize, Serialize};
55
use strum::EnumString;
6+
use tracing::{field::Visit, span, warn, Metadata, Subscriber};
7+
use tracing_subscriber::Layer;
68
#[cfg(feature = "openapi")]
79
use utoipa::ToSchema;
810
use uuid::Uuid;
911

12+
/// Used to determine settings based on which backend crate does what
1013
#[derive(Clone, Debug, EnumString, Eq, PartialEq, Deserialize, Serialize)]
1114
#[cfg_attr(feature = "display", derive(strum::Display))]
1215
#[cfg_attr(feature = "openapi", derive(ToSchema))]
13-
pub enum InternalLogOrigin {
16+
pub enum Backend {
17+
/// Is considered an error
1418
Unknown,
15-
Deployer,
19+
20+
Auth,
1621
// Builder,
17-
// ResourceRecorder,
22+
Deployer,
23+
Gateway,
24+
Logger,
25+
Provisioner,
26+
ResourceRecorder,
1827
}
1928

20-
impl Default for InternalLogOrigin {
29+
impl Default for Backend {
2130
fn default() -> Self {
2231
Self::Unknown
2332
}
2433
}
2534

2635
#[derive(Clone, Debug, Deserialize, Serialize)]
2736
#[cfg_attr(feature = "openapi", derive(ToSchema))]
28-
#[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::Item))]
29-
pub struct Item {
37+
#[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::LogItem))]
38+
pub struct LogItem {
39+
/// Deployment id
3040
#[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::Uuid))]
3141
pub id: Uuid,
42+
43+
/// Internal service that produced this log
3244
#[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::InternalLogOrigin))]
33-
pub internal_origin: InternalLogOrigin,
45+
pub internal_origin: Backend,
46+
47+
/// Time log was produced
3448
#[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::DateTime))]
3549
pub timestamp: DateTime<Utc>,
50+
51+
/// The log line
3652
pub line: String,
3753
}
3854

3955
#[cfg(feature = "display")]
40-
impl std::fmt::Display for Item {
56+
impl std::fmt::Display for LogItem {
4157
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4258
let datetime: chrono::DateTime<chrono::Local> = DateTime::from(self.timestamp);
4359

@@ -51,6 +67,110 @@ impl std::fmt::Display for Item {
5167
}
5268
}
5369

70+
/// Records logs for the deployment progress
71+
pub trait LogRecorder: Clone + Send + 'static {
72+
fn record(&self, log: LogItem);
73+
}
74+
75+
/// Tracing subscriber layer which logs based on if the log
76+
/// is from a span that is tagged with a deployment id
77+
pub struct DeploymentLogLayer<R>
78+
where
79+
R: LogRecorder + Send + Sync,
80+
{
81+
pub recorder: R,
82+
pub internal_service: Backend,
83+
}
84+
85+
impl<R, S> Layer<S> for DeploymentLogLayer<R>
86+
where
87+
S: Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
88+
R: LogRecorder + Send + Sync + 'static,
89+
{
90+
fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
91+
// We only care about events in some scope
92+
let scope = if let Some(scope) = ctx.event_scope(event) {
93+
scope
94+
} else {
95+
return;
96+
};
97+
98+
// Find the outermost scope with the scope details containing the current state
99+
for span in scope.from_root() {
100+
let extensions = span.extensions();
101+
102+
if let Some(details) = extensions.get::<ScopeDetails>() {
103+
self.recorder.record(LogItem {
104+
id: details.id,
105+
internal_origin: self.internal_service.clone(),
106+
timestamp: Utc::now(),
107+
line: "Test".into(),
108+
});
109+
break;
110+
}
111+
}
112+
}
113+
fn on_new_span(
114+
&self,
115+
attrs: &span::Attributes<'_>,
116+
id: &span::Id,
117+
ctx: tracing_subscriber::layer::Context<'_, S>,
118+
) {
119+
// We only care about spans that change the state
120+
if !DeploymentIdVisitor::is_valid(attrs.metadata()) {
121+
return;
122+
}
123+
let mut visitor = DeploymentIdVisitor::default();
124+
attrs.record(&mut visitor);
125+
let details = visitor.details;
126+
127+
if details.id.is_nil() {
128+
warn!("scope details does not have a valid id");
129+
return;
130+
}
131+
132+
// Safe to unwrap since this is the `on_new_span` method
133+
let span = ctx.span(id).unwrap();
134+
let mut extensions = span.extensions_mut();
135+
136+
self.recorder.record(LogItem {
137+
id: details.id,
138+
internal_origin: self.internal_service.clone(),
139+
timestamp: Utc::now(),
140+
line: "Test".into(),
141+
});
142+
143+
extensions.insert::<ScopeDetails>(details);
144+
}
145+
}
146+
147+
#[derive(Debug, Default)]
148+
struct ScopeDetails {
149+
id: Uuid,
150+
}
151+
/// To extract `id` field for scopes that have it
152+
#[derive(Default)]
153+
struct DeploymentIdVisitor {
154+
details: ScopeDetails,
155+
}
156+
157+
impl DeploymentIdVisitor {
158+
/// Field containing the deployment identifier
159+
const ID_IDENT: &'static str = "id";
160+
161+
fn is_valid(metadata: &Metadata) -> bool {
162+
metadata.is_span() && metadata.fields().field(Self::ID_IDENT).is_some()
163+
}
164+
}
165+
166+
impl Visit for DeploymentIdVisitor {
167+
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
168+
if field.name() == Self::ID_IDENT {
169+
self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default();
170+
}
171+
}
172+
}
173+
54174
#[cfg(test)]
55175
mod tests {
56176
use super::*;
@@ -66,9 +186,9 @@ mod tests {
66186

67187
#[test]
68188
fn test_timezone_formatting() {
69-
let item = Item {
189+
let item = LogItem {
70190
id: Uuid::new_v4(),
71-
internal_origin: InternalLogOrigin::Deployer,
191+
internal_origin: Backend::Deployer,
72192
timestamp: Utc::now(),
73193
line: r#"{"message": "Building"}"#.to_owned(),
74194
};

deployer/src/deployment/mod.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
1-
pub mod deploy_layer;
2-
pub mod gateway_client;
3-
mod queue;
4-
mod run;
5-
61
use std::{path::PathBuf, sync::Arc};
72

83
pub use queue::Queued;
94
pub use run::{ActiveDeploymentsGetter, Built};
10-
use shuttle_common::storage_manager::ArtifactsStorageManager;
5+
use shuttle_common::{log::LogRecorder, storage_manager::ArtifactsStorageManager};
116
use shuttle_proto::logger::logger_client::LoggerClient;
7+
use tokio::{
8+
sync::{mpsc, Mutex},
9+
task::JoinSet,
10+
};
1211
use tracing::{instrument, Span};
1312
use tracing_opentelemetry::OpenTelemetrySpanExt;
13+
use uuid::Uuid;
1414

15+
pub mod gateway_client;
16+
mod queue;
17+
mod run;
18+
pub mod state_change_layer;
19+
20+
use self::gateway_client::BuildQueueClient;
1521
use crate::{
1622
persistence::{DeploymentUpdater, ResourceManager, SecretGetter, SecretRecorder, State},
1723
RuntimeManager,
1824
};
19-
use tokio::{
20-
sync::{mpsc, Mutex},
21-
task::JoinSet,
22-
};
23-
use uuid::Uuid;
24-
25-
use self::{deploy_layer::LogRecorder, gateway_client::BuildQueueClient};
2625

2726
const QUEUE_BUFFER_SIZE: usize = 100;
2827
const RUN_BUFFER_SIZE: usize = 100;

deployer/src/deployment/queue.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ use chrono::Utc;
1111
use crossbeam_channel::Sender;
1212
use flate2::read::GzDecoder;
1313
use opentelemetry::global;
14-
use shuttle_common::claims::Claim;
15-
use shuttle_service::builder::{build_workspace, BuiltService};
14+
use shuttle_common::log::LogRecorder;
1615
use tar::Archive;
1716
use tokio::fs;
1817
use tokio::task::JoinSet;
@@ -22,12 +21,17 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
2221
use ulid::Ulid;
2322
use uuid::Uuid;
2423

25-
use super::deploy_layer::{Log, LogRecorder};
24+
use shuttle_common::{
25+
claims::Claim,
26+
storage_manager::{ArtifactsStorageManager, StorageManager},
27+
LogItem,
28+
};
29+
use shuttle_service::builder::{build_workspace, BuiltService};
30+
2631
use super::gateway_client::BuildQueueClient;
2732
use super::{Built, QueueReceiver, RunSender, State};
2833
use crate::error::{Error, Result, TestError};
2934
use crate::persistence::{DeploymentUpdater, SecretRecorder};
30-
use shuttle_common::storage_manager::{ArtifactsStorageManager, StorageManager};
3135

3236
pub async fn task(
3337
mut recv: QueueReceiver,
@@ -192,18 +196,13 @@ impl Queued {
192196
trace!(?message, "received cargo message");
193197
// TODO: change these to `info!(...)` as [valuable] support increases.
194198
// Currently it is not possible to turn these serde `message`s into a `valuable`, but once it is the passing down of `log_recorder` should be removed.
195-
let log = match message {
196-
Message::TextLine(line) => Log {
197-
deployment_id: self.id,
198-
internal_origin: shuttle_common::log::InternalLogOrigin::Deployer, // will change to Builder
199-
tx_timestamp: Utc::now(),
200-
line,
201-
},
202-
message => Log {
203-
deployment_id: self.id,
204-
internal_origin: shuttle_common::log::InternalLogOrigin::Deployer, // will change to Builder
205-
tx_timestamp: Utc::now(),
206-
line: serde_json::to_string(&message).unwrap(),
199+
let log = LogItem {
200+
id: self.id,
201+
internal_origin: shuttle_common::log::Backend::Deployer, // will change to Builder
202+
timestamp: Utc::now(),
203+
line: match message {
204+
Message::TextLine(line) => line,
205+
message => serde_json::to_string(&message).unwrap(),
207206
},
208207
};
209208
log_recorder.record(log);

0 commit comments

Comments
 (0)