diff --git a/client/src/lib.rs b/client/src/lib.rs index 4d6088f87..8b54aa1a6 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -591,7 +591,7 @@ impl ClientOptions { client: TemporalServiceClient::new(svc), options: Arc::new(self.clone()), capabilities: None, - workers: Arc::new(ClientWorkerSet::new()), + workers: Arc::new(ClientWorkerSet::new(false)), }; if !self.skip_get_system_info { match client diff --git a/client/src/raw.rs b/client/src/raw.rs index 92e6a7956..f461de84d 100644 --- a/client/src/raw.rs +++ b/client/src/raw.rs @@ -1345,6 +1345,15 @@ proxier! { r.extensions_mut().insert(labels); } ); + ( + describe_worker, + DescribeWorkerRequest, + DescribeWorkerResponse, + |r| { + let labels = namespaced_request!(r); + r.extensions_mut().insert(labels); + } + ); ( record_worker_heartbeat, RecordWorkerHeartbeatRequest, diff --git a/client/src/worker_registry/mod.rs b/client/src/worker_registry/mod.rs index f10b128ce..f97adab1a 100644 --- a/client/src/worker_registry/mod.rs +++ b/client/src/worker_registry/mod.rs @@ -46,15 +46,19 @@ struct ClientWorkerSetImpl { all_workers: HashMap>, /// Maps namespace to shared worker for worker heartbeating shared_worker: HashMap>, + /// Disables erroring when multiple workers on the same namespace+task queue are registered. + /// This is used with testing, where multiple tests run in parallel on the same client + disable_dupe_check: bool, } impl ClientWorkerSetImpl { /// Factory method. - fn new() -> Self { + fn new(disable_dupe_check: bool) -> Self { Self { slot_providers: Default::default(), all_workers: Default::default(), shared_worker: Default::default(), + disable_dupe_check, } } @@ -81,7 +85,7 @@ impl ClientWorkerSetImpl { worker.namespace().to_string(), worker.task_queue().to_string(), ); - if self.slot_providers.contains_key(&slot_key) { + if self.slot_providers.contains_key(&slot_key) && !self.disable_dupe_check { bail!( "Registration of multiple workers on the same namespace and task queue for the same client not allowed: {slot_key:?}, worker_instance_key: {:?}.", worker.worker_instance_key() @@ -133,14 +137,8 @@ impl ClientWorkerSetImpl { if let Some(w) = self.shared_worker.get_mut(worker.namespace()) { let (callback, is_empty) = w.unregister_callback(worker.worker_instance_key()); - if let Some(cb) = callback { - if is_empty { - self.shared_worker.remove(worker.namespace()); - } - - // To maintain single ownership of the callback, we must re-register the callback - // back to the ClientWorker - worker.register_callback(cb); + if callback.is_some() && is_empty { + self.shared_worker.remove(worker.namespace()); } } @@ -188,16 +186,16 @@ pub struct ClientWorkerSet { impl Default for ClientWorkerSet { fn default() -> Self { - Self::new() + Self::new(false) } } impl ClientWorkerSet { /// Factory method. - pub fn new() -> Self { + pub fn new(disable_dupe_check: bool) -> Self { Self { worker_grouping_key: Uuid::new_v4(), - worker_manager: RwLock::new(ClientWorkerSetImpl::new()), + worker_manager: RwLock::new(ClientWorkerSetImpl::new(disable_dupe_check)), } } @@ -212,14 +210,6 @@ impl ClientWorkerSet { .try_reserve_wft_slot(namespace, task_queue) } - /// Unregisters a local worker, typically when that worker starts shutdown. - pub fn unregister_worker( - &self, - worker_instance_key: Uuid, - ) -> Result, anyhow::Error> { - self.worker_manager.write().unregister(worker_instance_key) - } - /// Register a local worker that can provide WFT processing slots and potentially worker heartbeating. 
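The duplicate-registration gate above keys workers by (namespace, task queue) and only enforces uniqueness when disable_dupe_check is false. A minimal, self-contained sketch of that behavior follows; DupeCheckedRegistry and its string-valued worker id are illustrative stand-ins, not the real ClientWorkerSetImpl, which also tracks slot providers and shared heartbeat workers.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the real registry: only the dupe-check logic is modeled.
struct DupeCheckedRegistry {
    // Keyed the same way as the real slot_providers map: (namespace, task_queue).
    entries: HashMap<(String, String), String>,
    // Mirrors `disable_dupe_check`: tests that share one client across parallel
    // workers can opt out of the uniqueness constraint.
    disable_dupe_check: bool,
}

impl DupeCheckedRegistry {
    fn new(disable_dupe_check: bool) -> Self {
        Self { entries: HashMap::new(), disable_dupe_check }
    }

    fn register(&mut self, namespace: &str, task_queue: &str, worker_id: String) -> Result<(), String> {
        let key = (namespace.to_string(), task_queue.to_string());
        if self.entries.contains_key(&key) && !self.disable_dupe_check {
            return Err(format!(
                "worker already registered for namespace/task queue {key:?}"
            ));
        }
        self.entries.insert(key, worker_id);
        Ok(())
    }
}

fn main() {
    // Production-like behavior: a second registration on the same key is rejected.
    let mut strict = DupeCheckedRegistry::new(false);
    assert!(strict.register("default", "tq", "w1".into()).is_ok());
    assert!(strict.register("default", "tq", "w2".into()).is_err());

    // Test behavior (as used by the mock client registry): duplicates are allowed.
    let mut relaxed = DupeCheckedRegistry::new(true);
    assert!(relaxed.register("default", "tq", "w1".into()).is_ok());
    assert!(relaxed.register("default", "tq", "w2".into()).is_ok());
}
```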
pub fn register_worker( &self, @@ -228,6 +218,14 @@ impl ClientWorkerSet { self.worker_manager.write().register(worker) } + /// Unregisters a local worker, typically when that worker starts shutdown. + pub fn unregister_worker( + &self, + worker_instance_key: Uuid, + ) -> Result, anyhow::Error> { + self.worker_manager.write().unregister(worker_instance_key) + } + /// Returns the worker grouping key, which is unique for each worker. pub fn worker_grouping_key(&self) -> Uuid { self.worker_grouping_key @@ -256,7 +254,7 @@ impl std::fmt::Debug for ClientWorkerSet { } /// Contains a worker heartbeat callback, wrapped for mocking -pub type HeartbeatCallback = Box WorkerHeartbeat + Send + Sync>; +pub type HeartbeatCallback = Arc WorkerHeartbeat + Send + Sync>; /// Represents a complete worker that can handle both slot management /// and worker heartbeat functionality. @@ -276,7 +274,7 @@ pub trait ClientWorker: Send + Sync { fn try_reserve_wft_slot(&self) -> Option>; /// Unique identifier for this worker instance. - /// This must be stable across the worker's lifetime but unique per instance. + /// This must be stable across the worker's lifetime and unique per instance. fn worker_instance_key(&self) -> Uuid; /// Indicates if worker heartbeating is enabled for this client worker. @@ -289,9 +287,6 @@ pub trait ClientWorker: Send + Sync { fn new_shared_namespace_worker( &self, ) -> Result, anyhow::Error>; - - /// Registers a worker heartbeat callback, typically when a worker is unregistered from a client - fn register_callback(&self, callback: HeartbeatCallback); } #[cfg(test)] @@ -340,7 +335,7 @@ mod tests { #[test] fn registry_keeps_one_provider_per_namespace() { - let manager = ClientWorkerSet::new(); + let manager = ClientWorkerSet::new(false); let mut worker_keys = vec![]; let mut successful_registrations = 0; @@ -453,7 +448,7 @@ mod tests { if heartbeat_enabled { mock_provider .expect_heartbeat_callback() - .returning(|| Some(Box::new(WorkerHeartbeat::default))); + .returning(|| Some(Arc::new(WorkerHeartbeat::default))); let namespace_clone = namespace.clone(); mock_provider @@ -463,8 +458,6 @@ mod tests { namespace_clone.clone(), ))) }); - - mock_provider.expect_register_callback().returning(|_| {}); } mock_provider @@ -472,7 +465,7 @@ mod tests { #[test] fn duplicate_namespace_task_queue_registration_fails() { - let manager = ClientWorkerSet::new(); + let manager = ClientWorkerSet::new(false); let worker1 = new_mock_provider_with_heartbeat( "test_namespace".to_string(), @@ -511,7 +504,7 @@ mod tests { #[test] fn multiple_workers_same_namespace_share_heartbeat_manager() { - let manager = ClientWorkerSet::new(); + let manager = ClientWorkerSet::new(false); let worker1 = new_mock_provider_with_heartbeat( "shared_namespace".to_string(), @@ -544,7 +537,7 @@ mod tests { #[test] fn different_namespaces_get_separate_heartbeat_managers() { - let manager = ClientWorkerSet::new(); + let manager = ClientWorkerSet::new(false); let worker1 = new_mock_provider_with_heartbeat( "namespace1".to_string(), "queue1".to_string(), @@ -572,7 +565,7 @@ mod tests { #[test] fn unregister_heartbeat_workers_cleans_up_shared_worker_when_last_removed() { - let manager = ClientWorkerSet::new(); + let manager = ClientWorkerSet::new(false); // Create two workers with same namespace but different task queues let worker1 = new_mock_provider_with_heartbeat( diff --git a/core-api/Cargo.toml b/core-api/Cargo.toml index ab2640dca..83042665a 100644 --- a/core-api/Cargo.toml +++ b/core-api/Cargo.toml @@ -31,6 +31,7 @@ tonic = { 
workspace = true } tracing = "0.1" tracing-core = "0.1" url = "2.5" +uuid = { version = "1.18.1", features = ["v4"] } [dependencies.temporal-sdk-core-protos] path = "../sdk-core-protos" diff --git a/core-api/src/lib.rs b/core-api/src/lib.rs index ca65ccae9..511c9383f 100644 --- a/core-api/src/lib.rs +++ b/core-api/src/lib.rs @@ -19,6 +19,7 @@ use temporal_sdk_core_protos::coresdk::{ workflow_activation::WorkflowActivation, workflow_completion::WorkflowActivationCompletion, }; +use uuid::Uuid; /// This trait is the primary way by which language specific SDKs interact with the core SDK. /// It represents one worker, which has a (potentially shared) client for connecting to the service @@ -138,6 +139,10 @@ pub trait Worker: Send + Sync { /// This should be called only after [Worker::shutdown] has resolved and/or both polling /// functions have returned `ShutDown` errors. async fn finalize_shutdown(self); + + /// Unique identifier for this worker instance. + /// This must be stable across the worker's lifetime and unique per instance. + fn worker_instance_key(&self) -> Uuid; } #[async_trait::async_trait] @@ -205,6 +210,10 @@ where async fn finalize_shutdown(self) { panic!("Can't finalize shutdown on Arc'd worker") } + + fn worker_instance_key(&self) -> Uuid { + (**self).worker_instance_key() + } } macro_rules! dbg_panic { diff --git a/core-api/src/telemetry/metrics.rs b/core-api/src/telemetry/metrics.rs index 407603f8d..bb276f9bd 100644 --- a/core-api/src/telemetry/metrics.rs +++ b/core-api/src/telemetry/metrics.rs @@ -1,4 +1,5 @@ use crate::dbg_panic; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{ any::Any, borrow::Cow, @@ -26,6 +27,18 @@ pub trait CoreMeter: Send + Sync + Debug { attribs: NewAttributes, ) -> MetricAttributes; fn counter(&self, params: MetricParameters) -> Counter; + + /// Create a counter with in-memory tracking for worker heartbeating reporting + fn counter_with_in_memory( + &self, + params: MetricParameters, + in_memory_counter: HeartbeatMetricType, + ) -> Counter { + let primary_counter = self.counter(params); + + Counter::new_with_in_memory(primary_counter.primary.metric.clone(), in_memory_counter) + } + fn histogram(&self, params: MetricParameters) -> Histogram; fn histogram_f64(&self, params: MetricParameters) -> HistogramF64; /// Create a histogram which records Durations. Implementations should choose to emit in @@ -33,10 +46,217 @@ pub trait CoreMeter: Send + Sync + Debug { /// [MetricParameters::unit] should be overwritten by implementations to be `ms` or `s` /// accordingly. fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration; + + /// Create a histogram duration with in-memory tracking for worker heartbeating reporting + fn histogram_duration_with_in_memory( + &self, + params: MetricParameters, + in_memory_hist: HeartbeatMetricType, + ) -> HistogramDuration { + let primary_hist = self.histogram_duration(params); + + HistogramDuration::new_with_in_memory(primary_hist.primary.metric.clone(), in_memory_hist) + } fn gauge(&self, params: MetricParameters) -> Gauge; + + /// Create a gauge with in-memory tracking for worker heartbeating reporting + fn gauge_with_in_memory( + &self, + params: MetricParameters, + in_memory_metrics: HeartbeatMetricType, + ) -> Gauge { + let primary_gauge = self.gauge(params.clone()); + Gauge::new_with_in_memory(primary_gauge.primary.metric.clone(), in_memory_metrics) + } + fn gauge_f64(&self, params: MetricParameters) -> GaugeF64; } +/// Provides a generic way to record metrics in memory. 
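A hedged sketch of the dual-recording idea behind the new counter_with_in_memory, gauge_with_in_memory, and histogram_duration_with_in_memory helpers: every update still reaches the primary meter, while an Arc<AtomicU64> mirror is bumped so the heartbeat path can read a snapshot without querying the metrics backend. PrimarySink, LoggingSink, and MirroredCounter are simplified stand-ins for the CoreMeter machinery.

```rust
use std::sync::{
    Arc,
    atomic::{AtomicU64, Ordering},
};

/// Stand-in for the real metric backend (Prometheus/OTel/etc.).
trait PrimarySink: Send + Sync {
    fn add(&self, value: u64);
}

struct LoggingSink;
impl PrimarySink for LoggingSink {
    fn add(&self, value: u64) {
        println!("primary sink recorded +{value}");
    }
}

/// Counter that records into the primary sink and, optionally, an in-memory mirror
/// that worker heartbeating can read later without touching the metrics backend.
struct MirroredCounter {
    primary: Arc<dyn PrimarySink>,
    in_memory: Option<Arc<AtomicU64>>,
}

impl MirroredCounter {
    fn add(&self, value: u64) {
        self.primary.add(value);
        if let Some(mirror) = &self.in_memory {
            mirror.fetch_add(value, Ordering::Relaxed);
        }
    }
}

fn main() {
    let mirror = Arc::new(AtomicU64::new(0));
    let counter = MirroredCounter {
        primary: Arc::new(LoggingSink),
        in_memory: Some(mirror.clone()),
    };

    counter.add(2);
    counter.add(3);

    // The heartbeat builder can now read the running total directly.
    assert_eq!(mirror.load(Ordering::Relaxed), 5);
}
```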
+/// This can be done either with individual metrics or more fine-grained metrics +/// that vary by a set of labels for the same metric. +#[derive(Clone, Debug)] +pub enum HeartbeatMetricType { + Individual(Arc), + WithLabel { + label_key: String, + metrics: HashMap>, + }, +} + +impl HeartbeatMetricType { + fn record_counter(&self, delta: u64) { + match self { + HeartbeatMetricType::Individual(metric) => { + metric.fetch_add(delta, Ordering::Relaxed); + } + HeartbeatMetricType::WithLabel { .. } => { + dbg_panic!("Counter does not support in-memory metric with labels"); + } + } + } + + fn record_histogram_observation(&self) { + match self { + HeartbeatMetricType::Individual(metric) => { + metric.fetch_add(1, Ordering::Relaxed); + } + HeartbeatMetricType::WithLabel { .. } => { + dbg_panic!("Histogram does not support in-memory metric with labels"); + } + } + } + + fn record_gauge(&self, value: u64, attributes: &MetricAttributes) { + match self { + HeartbeatMetricType::Individual(metric) => { + metric.store(value, Ordering::Relaxed); + } + HeartbeatMetricType::WithLabel { label_key, metrics } => { + if let Some(metric) = label_value_from_attributes(attributes, label_key.as_str()) + .and_then(|label_value| metrics.get(label_value.as_str())) + { + metric.store(value, Ordering::Relaxed) + } + } + } + } +} + +fn label_value_from_attributes(attributes: &MetricAttributes, key: &str) -> Option { + match attributes { + MetricAttributes::Prometheus { labels } => labels.as_prom_labels().get(key).cloned(), + #[cfg(feature = "otel_impls")] + MetricAttributes::OTel { kvs } => kvs + .iter() + .find(|kv| kv.key.as_str() == key) + .map(|kv| kv.value.to_string()), + MetricAttributes::NoOp(labels) => labels.get(key).cloned(), + _ => None, + } +} + +#[derive(Default, Debug)] +pub struct NumPollersMetric { + pub wft_current_pollers: Arc, + pub sticky_wft_current_pollers: Arc, + pub activity_current_pollers: Arc, + pub nexus_current_pollers: Arc, +} + +impl NumPollersMetric { + pub fn as_map(&self) -> HashMap> { + HashMap::from([ + ( + "workflow_task".to_string(), + self.wft_current_pollers.clone(), + ), + ( + "sticky_workflow_task".to_string(), + self.sticky_wft_current_pollers.clone(), + ), + ( + "activity_task".to_string(), + self.activity_current_pollers.clone(), + ), + ("nexus_task".to_string(), self.nexus_current_pollers.clone()), + ]) + } +} + +#[derive(Default, Debug)] +pub struct SlotMetrics { + pub workflow_worker: Arc, + pub activity_worker: Arc, + pub nexus_worker: Arc, + pub local_activity_worker: Arc, +} + +impl SlotMetrics { + pub fn as_map(&self) -> HashMap> { + HashMap::from([ + ("WorkflowWorker".to_string(), self.workflow_worker.clone()), + ("ActivityWorker".to_string(), self.activity_worker.clone()), + ("NexusWorker".to_string(), self.nexus_worker.clone()), + ( + "LocalActivityWorker".to_string(), + self.local_activity_worker.clone(), + ), + ]) + } +} + +#[derive(Default, Debug)] +pub struct WorkerHeartbeatMetrics { + pub sticky_cache_size: Arc, + pub total_sticky_cache_hit: Arc, + pub total_sticky_cache_miss: Arc, + pub num_pollers: NumPollersMetric, + pub worker_task_slots_used: SlotMetrics, + pub worker_task_slots_available: SlotMetrics, + pub workflow_task_execution_failed: Arc, + pub activity_execution_failed: Arc, + pub nexus_task_execution_failed: Arc, + pub local_activity_execution_failed: Arc, + pub activity_execution_latency: Arc, + pub local_activity_execution_latency: Arc, + pub workflow_task_execution_latency: Arc, + pub nexus_task_execution_latency: Arc, +} + +impl 
WorkerHeartbeatMetrics { + pub fn get_metric(&self, name: &str) -> Option { + match name { + "sticky_cache_size" => Some(HeartbeatMetricType::Individual( + self.sticky_cache_size.clone(), + )), + "sticky_cache_hit" => Some(HeartbeatMetricType::Individual( + self.total_sticky_cache_hit.clone(), + )), + "sticky_cache_miss" => Some(HeartbeatMetricType::Individual( + self.total_sticky_cache_miss.clone(), + )), + "num_pollers" => Some(HeartbeatMetricType::WithLabel { + label_key: "poller_type".to_string(), + metrics: self.num_pollers.as_map(), + }), + "worker_task_slots_used" => Some(HeartbeatMetricType::WithLabel { + label_key: "worker_type".to_string(), + metrics: self.worker_task_slots_used.as_map(), + }), + "worker_task_slots_available" => Some(HeartbeatMetricType::WithLabel { + label_key: "worker_type".to_string(), + metrics: self.worker_task_slots_available.as_map(), + }), + "workflow_task_execution_failed" => Some(HeartbeatMetricType::Individual( + self.workflow_task_execution_failed.clone(), + )), + "activity_execution_failed" => Some(HeartbeatMetricType::Individual( + self.activity_execution_failed.clone(), + )), + "nexus_task_execution_failed" => Some(HeartbeatMetricType::Individual( + self.nexus_task_execution_failed.clone(), + )), + "local_activity_execution_failed" => Some(HeartbeatMetricType::Individual( + self.local_activity_execution_failed.clone(), + )), + "activity_execution_latency" => Some(HeartbeatMetricType::Individual( + self.activity_execution_latency.clone(), + )), + "local_activity_execution_latency" => Some(HeartbeatMetricType::Individual( + self.local_activity_execution_latency.clone(), + )), + "workflow_task_execution_latency" => Some(HeartbeatMetricType::Individual( + self.workflow_task_execution_latency.clone(), + )), + "nexus_task_execution_latency" => Some(HeartbeatMetricType::Individual( + self.nexus_task_execution_latency.clone(), + )), + _ => None, + } + } +} + #[derive(Debug, Clone, derive_builder::Builder)] pub struct MetricParameters { /// The name for the new metric/instrument @@ -124,6 +344,7 @@ pub enum MetricAttributes { }, Buffer(BufferAttributes), Dynamic(Arc), + NoOp(Arc>), Empty, } @@ -155,6 +376,16 @@ where } } +impl From for HashMap { + fn from(value: NewAttributes) -> Self { + value + .attributes + .into_iter() + .map(|kv| (kv.key, kv.value.to_string())) + .collect() + } +} + /// A K/V pair that can be used to label a specific recording of a metric #[derive(Clone, Debug, PartialEq)] pub struct MetricKeyValue { @@ -227,43 +458,79 @@ impl LazyBoundMetric { pub trait CounterBase: Send + Sync { fn adds(&self, value: u64); } -pub type Counter = LazyBoundMetric< + +pub type CounterImpl = LazyBoundMetric< Arc> + Send + Sync>, Arc, >; + +#[derive(Clone)] +pub struct Counter { + primary: CounterImpl, + in_memory: Option, +} impl Counter { pub fn new(inner: Arc> + Send + Sync>) -> Self { Self { - metric: inner, - attributes: MetricAttributes::Empty, - bound_cache: OnceLock::new(), + primary: LazyBoundMetric { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + }, + in_memory: None, + } + } + + pub fn new_with_in_memory( + primary: Arc> + Send + Sync>, + in_memory: HeartbeatMetricType, + ) -> Self { + Self { + primary: LazyBoundMetric { + metric: primary, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + }, + in_memory: Some(in_memory), } } + pub fn add(&self, value: u64, attributes: &MetricAttributes) { - match self.metric.with_attributes(attributes) { - Ok(base) => { - base.adds(value); - } + 
match self.primary.metric.with_attributes(attributes) { + Ok(base) => base.adds(value), Err(e) => { - dbg_panic!("Failed to initialize metric, will drop values: {e:?}",); + dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}"); } } + + if let Some(ref in_mem) = self.in_memory { + in_mem.record_counter(value); + } + } + + pub fn update_attributes(&mut self, new_attributes: MetricAttributes) { + self.primary.update_attributes(new_attributes.clone()); } } impl CounterBase for Counter { fn adds(&self, value: u64) { // TODO: Replace all of these with below when stable // https://doc.rust-lang.org/std/sync/struct.OnceLock.html#method.get_or_try_init - let bound = self.bound_cache.get_or_init(|| { - self.metric - .with_attributes(&self.attributes) + let bound = self.primary.bound_cache.get_or_init(|| { + self.primary + .metric + .with_attributes(&self.primary.attributes) .map(Into::into) .unwrap_or_else(|e| { - dbg_panic!("Failed to initialize metric, will drop values: {e:?}"); + dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}"); Arc::new(NoOpInstrument) as Arc }) }); bound.adds(value); + + if let Some(ref in_mem) = self.in_memory { + in_mem.record_counter(value); + } } } impl MetricAttributable for Counter { @@ -271,10 +538,15 @@ impl MetricAttributable for Counter { &self, attributes: &MetricAttributes, ) -> Result> { - Ok(Self { - metric: self.metric.clone(), + let primary = LazyBoundMetric { + metric: self.primary.metric.clone(), attributes: attributes.clone(), bound_cache: OnceLock::new(), + }; + + Ok(Counter { + primary, + in_memory: self.in_memory.clone(), }) } } @@ -390,22 +662,45 @@ impl MetricAttributable for HistogramF64 { pub trait HistogramDurationBase: Send + Sync { fn records(&self, value: Duration); } -pub type HistogramDuration = LazyBoundMetric< + +pub type HistogramDurationImpl = LazyBoundMetric< Arc> + Send + Sync>, Arc, >; + +#[derive(Clone)] +pub struct HistogramDuration { + primary: HistogramDurationImpl, + in_memory: Option, +} impl HistogramDuration { pub fn new( inner: Arc> + Send + Sync>, ) -> Self { Self { - metric: inner, - attributes: MetricAttributes::Empty, - bound_cache: OnceLock::new(), + primary: LazyBoundMetric { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + }, + in_memory: None, + } + } + pub fn new_with_in_memory( + primary: Arc> + Send + Sync>, + in_memory: HeartbeatMetricType, + ) -> Self { + Self { + primary: LazyBoundMetric { + metric: primary, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + }, + in_memory: Some(in_memory), } } pub fn record(&self, value: Duration, attributes: &MetricAttributes) { - match self.metric.with_attributes(attributes) { + match self.primary.metric.with_attributes(attributes) { Ok(base) => { base.records(value); } @@ -413,13 +708,22 @@ impl HistogramDuration { dbg_panic!("Failed to initialize metric, will drop values: {e:?}",); } } + + if let Some(ref in_mem) = self.in_memory { + in_mem.record_histogram_observation(); + } + } + + pub fn update_attributes(&mut self, new_attributes: MetricAttributes) { + self.primary.update_attributes(new_attributes.clone()); } } impl HistogramDurationBase for HistogramDuration { fn records(&self, value: Duration) { - let bound = self.bound_cache.get_or_init(|| { - self.metric - .with_attributes(&self.attributes) + let bound = self.primary.bound_cache.get_or_init(|| { + self.primary + .metric + .with_attributes(&self.primary.attributes) .map(Into::into) .unwrap_or_else(|e| { 
dbg_panic!("Failed to initialize metric, will drop values: {e:?}"); @@ -427,6 +731,10 @@ impl HistogramDurationBase for HistogramDuration { }) }); bound.records(value); + + if let Some(ref in_mem) = self.in_memory { + in_mem.record_histogram_observation(); + } } } impl MetricAttributable for HistogramDuration { @@ -434,10 +742,15 @@ impl MetricAttributable for HistogramDuration { &self, attributes: &MetricAttributes, ) -> Result> { - Ok(Self { - metric: self.metric.clone(), + let primary = LazyBoundMetric { + metric: self.primary.metric.clone(), attributes: attributes.clone(), bound_cache: OnceLock::new(), + }; + + Ok(HistogramDuration { + primary, + in_memory: self.in_memory.clone(), }) } } @@ -445,41 +758,77 @@ impl MetricAttributable for HistogramDuration { pub trait GaugeBase: Send + Sync { fn records(&self, value: u64); } -pub type Gauge = LazyBoundMetric< + +pub type GaugeImpl = LazyBoundMetric< Arc> + Send + Sync>, Arc, >; + +#[derive(Clone)] +pub struct Gauge { + primary: GaugeImpl, + in_memory: Option, +} impl Gauge { pub fn new(inner: Arc> + Send + Sync>) -> Self { Self { - metric: inner, - attributes: MetricAttributes::Empty, - bound_cache: OnceLock::new(), + primary: LazyBoundMetric { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + }, + in_memory: None, + } + } + + pub fn new_with_in_memory( + primary: Arc> + Send + Sync>, + in_memory: HeartbeatMetricType, + ) -> Self { + Self { + primary: LazyBoundMetric { + metric: primary, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + }, + in_memory: Some(in_memory), } } + pub fn record(&self, value: u64, attributes: &MetricAttributes) { - match self.metric.with_attributes(attributes) { - Ok(base) => { - base.records(value); - } + match self.primary.metric.with_attributes(attributes) { + Ok(base) => base.records(value), Err(e) => { - dbg_panic!("Failed to initialize metric, will drop values: {e:?}",); + dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}"); } } + + if let Some(ref in_mem) = self.in_memory { + in_mem.record_gauge(value, attributes); + } + } + + pub fn update_attributes(&mut self, new_attributes: MetricAttributes) { + self.primary.update_attributes(new_attributes.clone()); } } impl GaugeBase for Gauge { fn records(&self, value: u64) { - let bound = self.bound_cache.get_or_init(|| { - self.metric - .with_attributes(&self.attributes) + let bound = self.primary.bound_cache.get_or_init(|| { + self.primary + .metric + .with_attributes(&self.primary.attributes) .map(Into::into) .unwrap_or_else(|e| { - dbg_panic!("Failed to initialize metric, will drop values: {e:?}"); + dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}"); Arc::new(NoOpInstrument) as Arc }) }); bound.records(value); + + if let Some(ref in_mem) = self.in_memory { + in_mem.record_gauge(value, &self.primary.attributes); + } } } impl MetricAttributable for Gauge { @@ -487,10 +836,15 @@ impl MetricAttributable for Gauge { &self, attributes: &MetricAttributes, ) -> Result> { - Ok(Self { - metric: self.metric.clone(), + let primary = LazyBoundMetric { + metric: self.primary.metric.clone(), attributes: attributes.clone(), bound_cache: OnceLock::new(), + }; + + Ok(Gauge { + primary, + in_memory: self.in_memory.clone(), }) } } @@ -633,12 +987,23 @@ impl LazyRef { #[derive(Debug)] pub struct NoOpCoreMeter; impl CoreMeter for NoOpCoreMeter { - fn new_attributes(&self, _: NewAttributes) -> MetricAttributes { - MetricAttributes::Dynamic(Arc::new(NoOpAttributes)) + fn 
new_attributes(&self, attribs: NewAttributes) -> MetricAttributes { + MetricAttributes::NoOp(Arc::new(attribs.into())) } - fn extend_attributes(&self, existing: MetricAttributes, _: NewAttributes) -> MetricAttributes { - existing + fn extend_attributes( + &self, + existing: MetricAttributes, + attribs: NewAttributes, + ) -> MetricAttributes { + if let MetricAttributes::NoOp(labels) = existing { + let mut labels = (*labels).clone(); + labels.extend::>(attribs.into()); + MetricAttributes::NoOp(Arc::new(labels)) + } else { + dbg_panic!("Must use NoOp attributes with a NoOp metric implementation"); + existing + } } fn counter(&self, _: MetricParameters) -> Counter { @@ -701,11 +1066,41 @@ impl_no_op!(HistogramDurationBase, Duration); impl_no_op!(GaugeBase, u64); impl_no_op!(GaugeF64Base, f64); -#[derive(Debug, Clone)] -pub struct NoOpAttributes; -impl CustomMetricAttributes for NoOpAttributes { - fn as_any(self: Arc) -> Arc { - self as Arc +#[cfg(test)] +mod tests { + use super::*; + use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + }; + + #[test] + fn in_memory_attributes_provide_label_values() { + let meter = NoOpCoreMeter; + let base_attrs = meter.new_attributes(NewAttributes::default()); + let attrs = meter.extend_attributes( + base_attrs, + NewAttributes::from(vec![MetricKeyValue::new("poller_type", "workflow_task")]), + ); + + let value = Arc::new(AtomicU64::new(0)); + let mut metrics = HashMap::new(); + metrics.insert("workflow_task".to_string(), value.clone()); + let heartbeat_metric = HeartbeatMetricType::WithLabel { + label_key: "poller_type".to_string(), + metrics, + }; + + heartbeat_metric.record_gauge(3, &attrs); + + assert_eq!(value.load(Ordering::Relaxed), 3); + assert_eq!( + label_value_from_attributes(&attrs, "poller_type").as_deref(), + Some("workflow_task") + ); } } diff --git a/core-api/src/worker.rs b/core-api/src/worker.rs index d92efeec0..8fffb3eb0 100644 --- a/core-api/src/worker.rs +++ b/core-api/src/worker.rs @@ -11,6 +11,7 @@ use temporal_sdk_core_protos::{ coresdk::{ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo}, temporal, temporal::api::enums::v1::VersioningBehavior, + temporal::api::worker::v1::PluginInfo, }; /// Defines per-worker configuration options @@ -161,6 +162,10 @@ pub struct WorkerConfig { /// A versioning strategy for this worker. pub versioning_strategy: WorkerVersioningStrategy, + + /// List of plugins used by lang + #[builder(default)] + pub plugins: Vec, } impl WorkerConfig { @@ -357,6 +362,12 @@ pub trait SlotSupplier { fn available_slots(&self) -> Option { None } + + /// Returns a human-friendly identifier describing this supplier implementation for + /// diagnostics and telemetry. + fn slot_supplier_kind(&self) -> String { + "Custom".to_string() + } } pub trait SlotReservationContext: Send + Sync { diff --git a/core/src/abstractions.rs b/core/src/abstractions.rs index d4b86cb35..0d5a53206 100644 --- a/core/src/abstractions.rs +++ b/core/src/abstractions.rs @@ -25,6 +25,7 @@ use tokio_util::sync::CancellationToken; #[derive(Clone)] pub(crate) struct MeteredPermitDealer { supplier: Arc + Send + Sync>, + slot_supplier_kind: SlotSupplierKind, /// The number of permit owners who have acquired a permit, but are not yet meaningfully using /// that permit. 
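To illustrate how the new slot_supplier_kind hook is intended to feed diagnostics, here is a small sketch under simplifying assumptions: KindedSlotSupplier stands in for the real SlotSupplier trait (the reservation methods are omitted), and kind_from_label mirrors the SlotSupplierKind::from_label mapping applied when the permit dealer is built.

```rust
/// Simplified stand-in for the real SlotSupplier trait; only the new
/// diagnostics hook is modeled here.
trait KindedSlotSupplier {
    /// Defaults to "Custom", matching the new trait method's default impl.
    fn slot_supplier_kind(&self) -> String {
        "Custom".to_string()
    }
}

struct FixedSupplier;
impl KindedSlotSupplier for FixedSupplier {
    fn slot_supplier_kind(&self) -> String {
        "Fixed".to_string()
    }
}

/// A user-provided supplier that does not override the hook.
struct MyLangSupplier;
impl KindedSlotSupplier for MyLangSupplier {}

#[derive(Debug, PartialEq)]
enum SupplierKind {
    Fixed,
    ResourceBased,
    Custom(String),
}

/// Mirrors the label-to-kind mapping done when the permit dealer is constructed.
fn kind_from_label(label: &str) -> SupplierKind {
    match label {
        "Fixed" => SupplierKind::Fixed,
        "ResourceBased" => SupplierKind::ResourceBased,
        other => SupplierKind::Custom(other.to_string()),
    }
}

fn main() {
    assert_eq!(kind_from_label(&FixedSupplier.slot_supplier_kind()), SupplierKind::Fixed);
    assert_eq!(
        kind_from_label(&MyLangSupplier.slot_supplier_kind()),
        SupplierKind::Custom("Custom".to_string())
    );
}
```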
This is useful for giving a more semantically accurate count of used task /// slots, since we typically wait for a permit first before polling, but that slot isn't used @@ -54,6 +55,35 @@ pub(crate) struct PermitDealerContextData { pub(crate) worker_deployment_version: Option, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum SlotSupplierKind { + Fixed, + ResourceBased, + Custom(String), +} + +impl SlotSupplierKind { + fn from_label(label: &str) -> Self { + if label == "Fixed" { + SlotSupplierKind::Fixed + } else if label == "ResourceBased" { + SlotSupplierKind::ResourceBased + } else { + SlotSupplierKind::Custom(label.to_string()) + } + } +} + +impl std::fmt::Display for SlotSupplierKind { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + SlotSupplierKind::Fixed => f.write_str("Fixed"), + SlotSupplierKind::ResourceBased => f.write_str("ResourceBased"), + SlotSupplierKind::Custom(name) => f.write_str(name.as_str()), + } + } +} + impl MeteredPermitDealer where SK: SlotKind + 'static, @@ -65,8 +95,11 @@ where context_data: Arc, meter: Option, ) -> Self { + let supplier_kind_label = supplier.slot_supplier_kind(); + let slot_supplier_kind = SlotSupplierKind::from_label(supplier_kind_label.as_ref()); Self { supplier, + slot_supplier_kind, unused_claimants: Arc::new(AtomicUsize::new(0)), extant_permits: watch::channel(0), metrics_ctx, @@ -81,6 +114,10 @@ where self.supplier.available_slots() } + pub(crate) fn slot_supplier_kind(&self) -> &SlotSupplierKind { + &self.slot_supplier_kind + } + #[cfg(test)] pub(crate) fn unused_permits(&self) -> Option { self.available_permits() @@ -492,4 +529,10 @@ pub(crate) mod tests { // Now it'll proceed acquire_fut.await; } + + #[test] + fn captures_slot_supplier_kind() { + let dealer = fixed_size_permit_dealer::(1); + assert_eq!(*dealer.slot_supplier_kind(), SlotSupplierKind::Fixed); + } } diff --git a/core/src/core_tests/workers.rs b/core/src/core_tests/workers.rs index f5288a442..68314c082 100644 --- a/core/src/core_tests/workers.rs +++ b/core/src/core_tests/workers.rs @@ -321,12 +321,12 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool) if api_success { mock.expect_shutdown_worker() .times(1) - .returning(|_| Ok(ShutdownWorkerResponse {})); + .returning(|_, _| Ok(ShutdownWorkerResponse {})); } else { // worker.shutdown() should succeed even if shutdown_worker fails mock.expect_shutdown_worker() .times(1) - .returning(|_| Err(tonic::Status::unavailable("fake shutdown error"))); + .returning(|_, _| Err(tonic::Status::unavailable("fake shutdown error"))); } } else { mock.expect_shutdown_worker().times(0); diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index e5550938b..8d5df2b7a 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -2996,7 +2996,6 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() { Arc::new(mock_client), None, None, - false, ) .unwrap(); diff --git a/core/src/lib.rs b/core/src/lib.rs index 3995b7562..a73eb43ad 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -41,10 +41,9 @@ pub use temporal_sdk_core_protos as protos; pub use temporal_sdk_core_protos::TaskToken; pub use url::Url; pub use worker::{ - FixedSizeSlotSupplier, RealSysInfo, ResourceBasedSlotsOptions, - ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner, ResourceSlotOptions, SlotSupplierOptions, - TunerBuilder, TunerHolder, TunerHolderOptions, TunerHolderOptionsBuilder, Worker, WorkerConfig, - 
WorkerConfigBuilder, + FixedSizeSlotSupplier, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, + ResourceBasedTuner, ResourceSlotOptions, SlotSupplierOptions, TunerBuilder, TunerHolder, + TunerHolderOptions, TunerHolderOptionsBuilder, Worker, WorkerConfig, WorkerConfigBuilder, }; /// Expose [WorkerClient] symbols @@ -123,7 +122,6 @@ where client_bag.clone(), Some(&runtime.telemetry), runtime.heartbeat_interval, - false, ) } diff --git a/core/src/pollers/poll_buffer.rs b/core/src/pollers/poll_buffer.rs index 7bc4311fb..786b195e4 100644 --- a/core/src/pollers/poll_buffer.rs +++ b/core/src/pollers/poll_buffer.rs @@ -6,8 +6,10 @@ use crate::{ client::{PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient}, }, }; +use crossbeam_utils::atomic::AtomicCell; use futures_util::{FutureExt, StreamExt, future::BoxFuture}; use governor::{Quota, RateLimiter}; +use std::time::SystemTime; use std::{ cmp, fmt::Debug, @@ -74,9 +76,15 @@ impl LongPollBuffer { shutdown: CancellationToken, num_pollers_handler: Option, options: WorkflowTaskOptions, + last_successful_poll_time: Arc>>, ) -> Self { let is_sticky = sticky_queue.is_some(); - let poll_scaler = PollScaler::new(poller_behavior, num_pollers_handler, shutdown.clone()); + let poll_scaler = PollScaler::new( + poller_behavior, + num_pollers_handler, + shutdown.clone(), + last_successful_poll_time, + ); if let Some(wftps) = options.wft_poller_shared.as_ref() { if is_sticky { wftps.set_sticky_active(poll_scaler.active_rx.clone()); @@ -136,6 +144,7 @@ impl LongPollBuffer { } impl LongPollBuffer { + #[allow(clippy::too_many_arguments)] pub(crate) fn new_activity_task( client: Arc, task_queue: String, @@ -144,6 +153,7 @@ impl LongPollBuffer { shutdown: CancellationToken, num_pollers_handler: Option, options: ActivityTaskOptions, + last_successful_poll_time: Arc>>, ) -> Self { let pre_permit_delay = options .max_worker_acts_per_second @@ -183,7 +193,12 @@ impl LongPollBuffer { } }; - let poll_scaler = PollScaler::new(poller_behavior, num_pollers_handler, shutdown.clone()); + let poll_scaler = PollScaler::new( + poller_behavior, + num_pollers_handler, + shutdown.clone(), + last_successful_poll_time, + ); Self::new( poll_fn, permit_dealer, @@ -196,6 +211,7 @@ impl LongPollBuffer { } impl LongPollBuffer { + #[allow(clippy::too_many_arguments)] pub(crate) fn new_nexus_task( client: Arc, task_queue: String, @@ -203,6 +219,7 @@ impl LongPollBuffer { permit_dealer: MeteredPermitDealer, shutdown: CancellationToken, num_pollers_handler: Option, + last_successful_poll_time: Arc>>, send_heartbeat: bool, ) -> Self { let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. 
}) { @@ -232,7 +249,12 @@ impl LongPollBuffer { poll_fn, permit_dealer, shutdown.clone(), - PollScaler::new(poller_behavior, num_pollers_handler, shutdown), + PollScaler::new( + poller_behavior, + num_pollers_handler, + shutdown, + last_successful_poll_time, + ), None:: BoxFuture<'static, ()>>, None::, ) @@ -417,6 +439,7 @@ where behavior: PollerBehavior, num_pollers_handler: Option, shutdown: CancellationToken, + last_successful_poll_time: Arc>>, ) -> Self { let (active_tx, active_rx) = watch::channel(0); let num_pollers_handler = num_pollers_handler.map(Arc::new); @@ -437,6 +460,7 @@ where ingested_this_period: Default::default(), ingested_last_period: Default::default(), scale_up_allowed: AtomicBool::new(true), + last_successful_poll_time, }); let rhc = report_handle.clone(); let ingestor_task = if behavior.is_autoscaling() { @@ -499,6 +523,7 @@ struct PollScalerReportHandle { ingested_this_period: AtomicUsize, ingested_last_period: AtomicUsize, scale_up_allowed: AtomicBool, + last_successful_poll_time: Arc>>, } impl PollScalerReportHandle { @@ -506,6 +531,8 @@ impl PollScalerReportHandle { fn poll_result(&self, res: &Result) -> bool { match res { Ok(res) => { + self.last_successful_poll_time + .store(Some(SystemTime::now())); if let PollerBehavior::SimpleMaximum(_) = self.behavior { // We don't do auto-scaling with the simple max return true; @@ -739,6 +766,7 @@ mod tests { WorkflowTaskOptions { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))), }, + Arc::new(AtomicCell::new(None)), ); // Poll a bunch of times, "interrupting" it each time, we should only actually have polled @@ -794,6 +822,7 @@ mod tests { WorkflowTaskOptions { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(1)))), }, + Arc::new(AtomicCell::new(None)), ); // Should not see error, unwraps should get empty response diff --git a/core/src/replay/mod.rs b/core/src/replay/mod.rs index 1e4990000..03f0003be 100644 --- a/core/src/replay/mod.rs +++ b/core/src/replay/mod.rs @@ -114,7 +114,7 @@ where hist_allow_tx.send("Failed".to_string()).unwrap(); async move { Ok(RespondWorkflowTaskFailedResponse::default()) }.boxed() }); - let mut worker = Worker::new(self.config, None, Arc::new(client), None, None, false)?; + let mut worker = Worker::new(self.config, None, Arc::new(client), None, None)?; worker.set_post_activate_hook(post_activate); shutdown_tok(worker.shutdown_token()); Ok(worker) diff --git a/core/src/telemetry/metrics.rs b/core/src/telemetry/metrics.rs index fe6aec8e1..d39bf02b0 100644 --- a/core/src/telemetry/metrics.rs +++ b/core/src/telemetry/metrics.rs @@ -13,7 +13,7 @@ use temporal_sdk_core_api::telemetry::metrics::{ GaugeF64, GaugeF64Base, Histogram, HistogramBase, HistogramDuration, HistogramDurationBase, HistogramF64, HistogramF64Base, LazyBufferInstrument, MetricAttributable, MetricAttributes, MetricCallBufferer, MetricEvent, MetricKeyValue, MetricKind, MetricParameters, MetricUpdateVal, - NewAttributes, NoOpCoreMeter, TemporalMeter, + NewAttributes, NoOpCoreMeter, TemporalMeter, WorkerHeartbeatMetrics, }; use temporal_sdk_core_protos::temporal::api::{ enums::v1::WorkflowTaskFailedCause, failure::v1::Failure, @@ -25,6 +25,7 @@ pub(crate) struct MetricsContext { meter: Arc, kvs: MetricAttributes, instruments: Arc, + in_memory_metrics: Option>, } #[derive(Clone)] @@ -70,11 +71,13 @@ impl MetricsContext { pub(crate) fn no_op() -> Self { let meter = Arc::new(NoOpCoreMeter); let kvs = meter.new_attributes(Default::default()); - let instruments = Arc::new(Instruments::new(meter.as_ref())); + 
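The poll buffers above now stamp a shared Arc<AtomicCell<Option<SystemTime>>> whenever a poll succeeds, so heartbeats can report poller liveness. A minimal sketch of that pattern, assuming the crossbeam-utils crate that poll_buffer.rs already imports; on_successful_poll and time_since_last_poll are illustrative names, not the real report-handle methods.

```rust
use crossbeam_utils::atomic::AtomicCell;
use std::{sync::Arc, time::SystemTime};

/// Shared timestamp handle, mirroring the Arc<AtomicCell<Option<SystemTime>>>
/// that the pollers and the heartbeat builder both hold.
type LastPollTime = Arc<AtomicCell<Option<SystemTime>>>;

/// Stand-in for the poll-report path: stamp the cell on every successful poll.
fn on_successful_poll(last_poll: &LastPollTime) {
    last_poll.store(Some(SystemTime::now()));
}

/// Stand-in for the heartbeat path: read the latest stamp without locking.
fn time_since_last_poll(last_poll: &LastPollTime) -> Option<std::time::Duration> {
    last_poll
        .load()
        .and_then(|t| SystemTime::now().duration_since(t).ok())
}

fn main() {
    let last_poll: LastPollTime = Arc::new(AtomicCell::new(None));
    assert!(time_since_last_poll(&last_poll).is_none());

    on_successful_poll(&last_poll);
    // Immediately after a poll the elapsed time should be (near) zero.
    assert!(time_since_last_poll(&last_poll).unwrap().as_secs() < 1);
}
```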
let in_memory_metrics = Some(Arc::new(WorkerHeartbeatMetrics::default())); + let instruments = Arc::new(Instruments::new(meter.as_ref(), in_memory_metrics.clone())); Self { kvs, instruments, meter, + in_memory_metrics, } } @@ -95,12 +98,14 @@ impl MetricsContext { .push(MetricKeyValue::new(KEY_NAMESPACE, namespace)); meter.default_attribs.attributes.push(task_queue(tq)); let kvs = meter.inner.new_attributes(meter.default_attribs); - let mut instruments = Instruments::new(meter.inner.as_ref()); + let in_memory_metrics = Some(Arc::new(WorkerHeartbeatMetrics::default())); + let mut instruments = Instruments::new(meter.inner.as_ref(), in_memory_metrics.clone()); instruments.update_attributes(&kvs); Self { kvs, instruments: Arc::new(instruments), meter: meter.inner, + in_memory_metrics, } } else { Self::no_op() @@ -121,9 +126,14 @@ impl MetricsContext { instruments: Arc::new(instruments), kvs, meter: self.meter.clone(), + in_memory_metrics: self.in_memory_metrics.clone(), } } + pub(crate) fn in_memory_meter(&self) -> Option> { + self.in_memory_metrics.clone() + } + /// A workflow task queue poll succeeded pub(crate) fn wf_tq_poll_ok(&self) { self.instruments.wf_task_queue_poll_succeed_counter.adds(1); @@ -299,7 +309,31 @@ impl MetricsContext { } impl Instruments { - fn new(meter: &dyn CoreMeter) -> Self { + fn new(meter: &dyn CoreMeter, in_memory: Option>) -> Self { + let counter_with_in_mem = |params: MetricParameters| -> Counter { + in_memory + .clone() + .and_then(|in_mem| in_mem.get_metric(¶ms.name)) + .map(|metric| meter.counter_with_in_memory(params.clone(), metric)) + .unwrap_or_else(|| meter.counter(params)) + }; + + let gauge_with_in_mem = |params: MetricParameters| -> Gauge { + in_memory + .clone() + .and_then(|in_mem| in_mem.get_metric(¶ms.name)) + .map(|metric| meter.gauge_with_in_memory(params.clone(), metric)) + .unwrap_or_else(|| meter.gauge(params)) + }; + + let histogram_with_in_mem = |params: MetricParameters| -> HistogramDuration { + in_memory + .clone() + .and_then(|in_mem| in_mem.get_metric(¶ms.name)) + .map(|metric| meter.histogram_duration_with_in_memory(params.clone(), metric)) + .unwrap_or_else(|| meter.histogram_duration(params)) + }; + Self { wf_completed_counter: meter.counter(MetricParameters { name: "workflow_completed".into(), @@ -331,12 +365,12 @@ impl Instruments { description: "Count of workflow task queue poll timeouts (no new task)".into(), unit: "".into(), }), - wf_task_queue_poll_succeed_counter: meter.counter(MetricParameters { + wf_task_queue_poll_succeed_counter: counter_with_in_mem(MetricParameters { name: "workflow_task_queue_poll_succeed".into(), description: "Count of workflow task queue poll successes".into(), unit: "".into(), }), - wf_task_execution_failure_counter: meter.counter(MetricParameters { + wf_task_execution_failure_counter: counter_with_in_mem(MetricParameters { name: "workflow_task_execution_failed".into(), description: "Count of workflow task execution failures".into(), unit: "".into(), @@ -351,7 +385,7 @@ impl Instruments { unit: "duration".into(), description: "Histogram of workflow task replay latencies".into(), }), - wf_task_execution_latency: meter.histogram_duration(MetricParameters { + wf_task_execution_latency: histogram_with_in_mem(MetricParameters { name: WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of workflow task execution (not replay) latencies".into(), @@ -361,12 +395,12 @@ impl Instruments { description: "Count of activity task queue poll timeouts (no new 
task)".into(), unit: "".into(), }), - act_task_received_counter: meter.counter(MetricParameters { + act_task_received_counter: counter_with_in_mem(MetricParameters { name: "activity_task_received".into(), description: "Count of activity task queue poll successes".into(), unit: "".into(), }), - act_execution_failed: meter.counter(MetricParameters { + act_execution_failed: counter_with_in_mem(MetricParameters { name: "activity_execution_failed".into(), description: "Count of activity task execution failures".into(), unit: "".into(), @@ -376,7 +410,7 @@ impl Instruments { unit: "duration".into(), description: "Histogram of activity schedule-to-start latencies".into(), }), - act_exec_latency: meter.histogram_duration(MetricParameters { + act_exec_latency: histogram_with_in_mem(MetricParameters { name: ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME.into(), unit: "duration".into(), description: "Histogram of activity execution latencies".into(), @@ -397,7 +431,7 @@ impl Instruments { description: "Count of local activity executions that failed".into(), unit: "".into(), }), - la_exec_latency: meter.histogram_duration(MetricParameters { + la_exec_latency: histogram_with_in_mem(MetricParameters { name: "local_activity_execution_latency".into(), unit: "duration".into(), description: "Histogram of local activity execution latencies".into(), @@ -409,7 +443,7 @@ impl Instruments { "Histogram of local activity execution latencies for successful local activities" .into(), }), - la_total: meter.counter(MetricParameters { + la_total: counter_with_in_mem(MetricParameters { name: "local_activity_total".into(), description: "Count of local activities executed".into(), unit: "".into(), @@ -429,12 +463,12 @@ impl Instruments { unit: "duration".into(), description: "Histogram of nexus task end-to-end latencies".into(), }), - nexus_task_execution_latency: meter.histogram_duration(MetricParameters { + nexus_task_execution_latency: histogram_with_in_mem(MetricParameters { name: "nexus_task_execution_latency".into(), unit: "duration".into(), description: "Histogram of nexus task execution latencies".into(), }), - nexus_task_execution_failed: meter.counter(MetricParameters { + nexus_task_execution_failed: counter_with_in_mem(MetricParameters { name: "nexus_task_execution_failed".into(), description: "Count of nexus task execution failures".into(), unit: "".into(), @@ -445,35 +479,34 @@ impl Instruments { description: "Count of the number of initialized workers".into(), unit: "".into(), }), - num_pollers: meter.gauge(MetricParameters { + num_pollers: gauge_with_in_mem(MetricParameters { name: NUM_POLLERS_NAME.into(), description: "Current number of active pollers per queue type".into(), unit: "".into(), }), - task_slots_available: meter.gauge(MetricParameters { + task_slots_available: gauge_with_in_mem(MetricParameters { name: TASK_SLOTS_AVAILABLE_NAME.into(), description: "Current number of available slots per task type".into(), unit: "".into(), }), - task_slots_used: meter.gauge(MetricParameters { + task_slots_used: gauge_with_in_mem(MetricParameters { name: TASK_SLOTS_USED_NAME.into(), description: "Current number of used slots per task type".into(), unit: "".into(), }), - sticky_cache_hit: meter.counter(MetricParameters { + sticky_cache_hit: counter_with_in_mem(MetricParameters { name: "sticky_cache_hit".into(), description: "Count of times the workflow cache was used for a new workflow task" .into(), unit: "".into(), }), - sticky_cache_miss: meter.counter(MetricParameters { + sticky_cache_miss: 
counter_with_in_mem(MetricParameters { name: "sticky_cache_miss".into(), description: - "Count of times the workflow cache was missing a workflow for a sticky task" - .into(), + "Count of times the workflow cache was missing a workflow for a sticky task".into(), unit: "".into(), }), - sticky_cache_size: meter.gauge(MetricParameters { + sticky_cache_size: gauge_with_in_mem(MetricParameters { name: STICKY_CACHE_SIZE_NAME.into(), description: "Current number of cached workflows".into(), unit: "".into(), diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index 4f4536938..94457f697 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -39,6 +39,7 @@ use std::{ atomic::{AtomicBool, Ordering}, }, }; +pub(crate) use temporal_sdk_core_api::telemetry::metrics::WorkerHeartbeatMetrics; use temporal_sdk_core_api::telemetry::{ CoreLog, CoreTelemetry, Logger, TelemetryOptions, TelemetryOptionsBuilder, metrics::{CoreMeter, MetricKeyValue, NewAttributes, TemporalMeter}, diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 410e63a83..50ecddd39 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -156,6 +156,7 @@ pub fn build_otlp_metric_exporter( opts.histogram_bucket_overrides, )? .build(); + Ok::<_, anyhow::Error>(CoreOtelMeter { meter: mp.meter(TELEM_SERVICE_NAME), use_seconds_for_durations: opts.use_seconds_for_durations, diff --git a/core/src/worker/activities.rs b/core/src/worker/activities.rs index 505d5c840..a4edb4f2c 100644 --- a/core/src/worker/activities.rs +++ b/core/src/worker/activities.rs @@ -728,6 +728,7 @@ mod tests { prost_dur, worker::client::mocks::mock_worker_client, }; + use crossbeam_utils::atomic::AtomicCell; use temporal_sdk_core_api::worker::PollerBehavior; use temporal_sdk_core_protos::coresdk::activity_result::ActivityExecutionResult; @@ -773,6 +774,7 @@ mod tests { max_worker_acts_per_second: Some(2.0), max_tps: None, }, + Arc::new(AtomicCell::new(None)), ); let atm = WorkerActivityTasks::new( sem.clone(), @@ -864,6 +866,7 @@ mod tests { max_worker_acts_per_second: None, max_tps: None, }, + Arc::new(AtomicCell::new(None)), ); let atm = WorkerActivityTasks::new( sem.clone(), @@ -937,6 +940,7 @@ mod tests { max_worker_acts_per_second: None, max_tps: None, }, + Arc::new(AtomicCell::new(None)), ); let atm = WorkerActivityTasks::new( sem.clone(), diff --git a/core/src/worker/client.rs b/core/src/worker/client.rs index 5d773330e..65250fc58 100644 --- a/core/src/worker/client.rs +++ b/core/src/worker/client.rs @@ -2,13 +2,18 @@ pub(crate) mod mocks; use crate::protosext::legacy_query_failure; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration as PbDuration; +use std::collections::HashMap; +use std::time::SystemTime; use std::{sync::Arc, time::Duration}; use temporal_client::{ Client, ClientWorkerSet, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching, RetryClient, WorkflowService, }; use temporal_sdk_core_api::worker::WorkerVersioningStrategy; +use temporal_sdk_core_protos::temporal::api::enums::v1::WorkerStatus; +use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerSlotsInfo; use temporal_sdk_core_protos::{ TaskToken, coresdk::{workflow_commands::QueryResult, workflow_completion}, @@ -48,6 +53,7 @@ pub(crate) struct WorkerClientBag { namespace: String, identity: String, worker_versioning_strategy: WorkerVersioningStrategy, + worker_heartbeat_map: Arc>>, } impl WorkerClientBag { @@ -62,6 +68,7 @@ impl WorkerClientBag { namespace, identity, 
worker_versioning_strategy, + worker_heartbeat_map: Arc::new(Mutex::new(HashMap::new())), } } @@ -211,7 +218,11 @@ pub trait WorkerClient: Sync + Send { /// Describe the namespace async fn describe_namespace(&self) -> Result; /// Shutdown the worker - async fn shutdown_worker(&self, sticky_task_queue: String) -> Result; + async fn shutdown_worker( + &self, + sticky_task_queue: String, + final_heartbeat: Option, + ) -> Result; /// Record a worker heartbeat async fn record_worker_heartbeat( &self, @@ -233,6 +244,9 @@ pub trait WorkerClient: Sync + Send { fn identity(&self) -> String; /// Get worker grouping key fn worker_grouping_key(&self) -> Uuid; + /// Sets the client-reliant fields for WorkerHeartbeat. This also updates client-level tracking + /// of heartbeat fields, like last heartbeat timestamp. + fn set_heartbeat_client_fields(&self, heartbeat: &mut WorkerHeartbeat); } /// Configuration options shared by workflow, activity, and Nexus polling calls @@ -640,13 +654,22 @@ impl WorkerClient for WorkerClientBag { .into_inner()) } - async fn shutdown_worker(&self, sticky_task_queue: String) -> Result { + async fn shutdown_worker( + &self, + sticky_task_queue: String, + final_heartbeat: Option, + ) -> Result { + let mut final_heartbeat = final_heartbeat; + if let Some(w) = final_heartbeat.as_mut() { + w.status = WorkerStatus::Shutdown.into(); + self.set_heartbeat_client_fields(w); + } let request = ShutdownWorkerRequest { namespace: self.namespace.clone(), identity: self.identity.clone(), sticky_task_queue, reason: "graceful shutdown".to_string(), - worker_heartbeat: None, + worker_heartbeat: final_heartbeat, }; Ok( @@ -708,6 +731,67 @@ impl WorkerClient for WorkerClientBag { .get_client() .worker_grouping_key() } + + fn set_heartbeat_client_fields(&self, heartbeat: &mut WorkerHeartbeat) { + if let Some(host_info) = heartbeat.host_info.as_mut() { + host_info.process_key = self.worker_grouping_key().to_string(); + } + heartbeat.worker_identity = self.identity(); + let sdk_name_and_ver = self.sdk_name_and_version(); + heartbeat.sdk_name = sdk_name_and_ver.0; + heartbeat.sdk_version = sdk_name_and_ver.1; + + let now = SystemTime::now(); + heartbeat.heartbeat_time = Some(now.into()); + let mut heartbeat_map = self.worker_heartbeat_map.lock(); + let client_heartbeat_data = heartbeat_map + .entry(heartbeat.worker_instance_key.clone()) + .or_default(); + let elapsed_since_last_heartbeat = + client_heartbeat_data.last_heartbeat_time.map(|hb_time| { + let dur = now.duration_since(hb_time).unwrap_or(Duration::ZERO); + PbDuration { + seconds: dur.as_secs() as i64, + nanos: dur.subsec_nanos() as i32, + } + }); + heartbeat.elapsed_since_last_heartbeat = elapsed_since_last_heartbeat; + client_heartbeat_data.last_heartbeat_time = Some(now); + + if let Some(wft_slot_info) = heartbeat.workflow_task_slots_info.as_mut() { + wft_slot_info.last_interval_processed_tasks = wft_slot_info.total_processed_tasks + - client_heartbeat_data + .workflow_task_slots_info + .total_processed_tasks; + wft_slot_info.last_interval_failure_tasks = wft_slot_info.total_failed_tasks + - client_heartbeat_data + .workflow_task_slots_info + .total_failed_tasks; + + client_heartbeat_data + .workflow_task_slots_info + .total_processed_tasks = wft_slot_info.total_processed_tasks; + client_heartbeat_data + .workflow_task_slots_info + .total_failed_tasks = wft_slot_info.total_failed_tasks; + } + update_slots( + &mut heartbeat.workflow_task_slots_info, + &mut client_heartbeat_data.workflow_task_slots_info, + ); + update_slots( + &mut 
heartbeat.activity_task_slots_info, + &mut client_heartbeat_data.activity_task_slots_info, + ); + update_slots( + &mut heartbeat.nexus_task_slots_info, + &mut client_heartbeat_data.nexus_task_slots_info, + ); + update_slots( + &mut heartbeat.local_activity_slots_info, + &mut client_heartbeat_data.local_activity_slots_info, + ); + } } impl NamespacedClient for WorkerClientBag { @@ -745,3 +829,31 @@ pub struct WorkflowTaskCompletion { /// Versioning behavior of the workflow, if any. pub versioning_behavior: VersioningBehavior, } + +#[derive(Clone, Default)] +struct SlotsInfo { + total_processed_tasks: i32, + total_failed_tasks: i32, +} + +#[derive(Clone, Default)] +struct ClientHeartbeatData { + last_heartbeat_time: Option, + + workflow_task_slots_info: SlotsInfo, + activity_task_slots_info: SlotsInfo, + nexus_task_slots_info: SlotsInfo, + local_activity_slots_info: SlotsInfo, +} + +fn update_slots(slots_info: &mut Option, client_heartbeat_data: &mut SlotsInfo) { + if let Some(wft_slot_info) = slots_info.as_mut() { + wft_slot_info.last_interval_processed_tasks = + wft_slot_info.total_processed_tasks - client_heartbeat_data.total_processed_tasks; + wft_slot_info.last_interval_failure_tasks = + wft_slot_info.total_failed_tasks - client_heartbeat_data.total_failed_tasks; + + client_heartbeat_data.total_processed_tasks = wft_slot_info.total_processed_tasks; + client_heartbeat_data.total_failed_tasks = wft_slot_info.total_failed_tasks; + } +} diff --git a/core/src/worker/client/mocks.rs b/core/src/worker/client/mocks.rs index 93984c364..addd09708 100644 --- a/core/src/worker/client/mocks.rs +++ b/core/src/worker/client/mocks.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, LazyLock}; use temporal_client::ClientWorkerSet; pub(crate) static DEFAULT_WORKERS_REGISTRY: LazyLock> = - LazyLock::new(|| Arc::new(ClientWorkerSet::new())); + LazyLock::new(|| Arc::new(ClientWorkerSet::new(true))); pub(crate) static DEFAULT_TEST_CAPABILITIES: &Capabilities = &Capabilities { signal_and_query_header: true, @@ -30,12 +30,18 @@ pub fn mock_worker_client() -> MockWorkerClient { .returning(|| DEFAULT_WORKERS_REGISTRY.clone()); r.expect_is_mock().returning(|| true); r.expect_shutdown_worker() - .returning(|_| Ok(ShutdownWorkerResponse {})); + .returning(|_, _| Ok(ShutdownWorkerResponse {})); r.expect_sdk_name_and_version() .returning(|| ("test-core".to_string(), "0.0.0".to_string())); r.expect_identity() .returning(|| "test-identity".to_string()); r.expect_worker_grouping_key().returning(Uuid::new_v4); + r.expect_set_heartbeat_client_fields().returning(|hb| { + hb.sdk_name = "test-core".to_string(); + hb.sdk_version = "0.0.0".to_string(); + hb.worker_identity = "test-identity".to_string(); + hb.heartbeat_time = Some(SystemTime::now().into()); + }); r } @@ -148,7 +154,7 @@ mockall::mock! { impl Future> + Send + 'b where 'a: 'b, Self: 'b; - fn shutdown_worker<'a, 'b>(&self, sticky_task_queue: String) -> impl Future> + Send + 'b + fn shutdown_worker<'a, 'b>(&self, sticky_task_queue: String, worker_heartbeat: Option) -> impl Future> + Send + 'b where 'a: 'b, Self: 'b; fn record_worker_heartbeat<'a, 'b>( @@ -164,5 +170,6 @@ mockall::mock! 
{ fn sdk_name_and_version(&self) -> (String, String); fn identity(&self) -> String; fn worker_grouping_key(&self) -> Uuid; + fn set_heartbeat_client_fields(&self, heartbeat: &mut WorkerHeartbeat); } } diff --git a/core/src/worker/heartbeat.rs b/core/src/worker/heartbeat.rs index 88774647f..7ec2f7aa5 100644 --- a/core/src/worker/heartbeat.rs +++ b/core/src/worker/heartbeat.rs @@ -1,12 +1,9 @@ use crate::WorkerClient; use crate::worker::{TaskPollers, WorkerTelemetry}; use parking_lot::Mutex; -use prost_types::Duration as PbDuration; use std::collections::HashMap; -use std::{ - sync::Arc, - time::{Duration, SystemTime}, -}; +use std::sync::Arc; +use std::time::Duration; use temporal_client::SharedNamespaceWorkerTrait; use temporal_sdk_core_api::worker::{ PollerBehavior, WorkerConfigBuilder, WorkerVersioningStrategy, @@ -17,7 +14,7 @@ use tokio_util::sync::CancellationToken; use uuid::Uuid; /// Callback used to collect heartbeat data from each worker at the time of heartbeat -pub(crate) type HeartbeatFn = Box WorkerHeartbeat + Send + Sync>; +pub(crate) type HeartbeatFn = Arc WorkerHeartbeat + Send + Sync>; /// SharedNamespaceWorker is responsible for polling nexus-delivered worker commands and sending /// worker heartbeats to the server. This invokes callbacks on all workers in the same process that @@ -49,7 +46,7 @@ impl SharedNamespaceWorker { .nexus_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize)) .build() .expect("all required fields should be implemented"); - let worker = crate::worker::Worker::new_with_pollers_inner( + let worker = crate::worker::Worker::new_with_pollers( config, None, client.clone(), @@ -59,8 +56,6 @@ impl SharedNamespaceWorker { true, )?; - let last_heartbeat_time_map = Mutex::new(HashMap::new()); - let reset_notify = Arc::new(Notify::new()); let cancel = CancellationToken::new(); let cancel_clone = cancel.clone(); @@ -77,34 +72,13 @@ impl SharedNamespaceWorker { tokio::select! { _ = ticker.tick() => { let mut hb_to_send = Vec::new(); - for (instance_key, heartbeat_callback) in heartbeat_map_clone.lock().iter() { + for (_instance_key, heartbeat_callback) in heartbeat_map_clone.lock().iter() { let mut heartbeat = heartbeat_callback(); - let mut last_heartbeat_time_map = last_heartbeat_time_map.lock(); - let now = SystemTime::now(); - let elapsed_since_last_heartbeat = last_heartbeat_time_map.get(instance_key).cloned().map( - |hb_time| { - let dur = now.duration_since(hb_time).unwrap_or(Duration::ZERO); - PbDuration { - seconds: dur.as_secs() as i64, - nanos: dur.subsec_nanos() as i32, - } - } - ); - - heartbeat.elapsed_since_last_heartbeat = elapsed_since_last_heartbeat; - heartbeat.heartbeat_time = Some(now.into()); - // All of these heartbeat details rely on a client. 
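A rough sketch of the Arc-based heartbeat callback registry used by SharedNamespaceWorker: callbacks are keyed by worker instance Uuid, and unregistering reports whether the namespace's map is now empty so the shared worker can be torn down. HeartbeatData, CallbackRegistry, and collect are simplified stand-ins (the real callbacks return the WorkerHeartbeat proto), and the parking_lot and uuid crates are assumed, as elsewhere in the diff.

```rust
use std::{collections::HashMap, sync::Arc};
use parking_lot::Mutex;
use uuid::Uuid;

/// Stand-in for the heartbeat payload; the real type is the WorkerHeartbeat proto.
#[derive(Default, Debug)]
struct HeartbeatData {
    task_queue: String,
}

/// Mirrors the new Arc-based HeartbeatFn alias so callbacks can be shared
/// rather than moved back and forth between worker and shared worker.
type HeartbeatFn = Arc<dyn Fn() -> HeartbeatData + Send + Sync>;

#[derive(Default)]
struct CallbackRegistry {
    callbacks: Mutex<HashMap<Uuid, HeartbeatFn>>,
}

impl CallbackRegistry {
    fn register(&self, key: Uuid, cb: HeartbeatFn) {
        self.callbacks.lock().insert(key, cb);
    }

    /// Returns the removed callback plus whether the registry is now empty,
    /// matching the (Option<HeartbeatFn>, bool) shape used above.
    fn unregister(&self, key: Uuid) -> (Option<HeartbeatFn>, bool) {
        let mut map = self.callbacks.lock();
        let cb = map.remove(&key);
        (cb, map.is_empty())
    }

    /// Stand-in for the heartbeat tick: snapshot data from every registered worker.
    fn collect(&self) -> Vec<HeartbeatData> {
        self.callbacks.lock().values().map(|cb| cb.as_ref()()).collect()
    }
}

fn main() {
    let registry = CallbackRegistry::default();
    let key = Uuid::new_v4();
    registry.register(
        key,
        Arc::new(|| HeartbeatData { task_queue: "tq".into() }),
    );

    assert_eq!(registry.collect().len(), 1);

    let (cb, now_empty) = registry.unregister(key);
    assert!(cb.is_some());
    assert!(now_empty);
}
```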
To avoid circular // dependencies, this must be populated from within SharedNamespaceWorker // to get info from the current client - heartbeat.worker_identity = client_clone.identity(); - let sdk_name_and_ver = client_clone.sdk_name_and_version(); - heartbeat.sdk_name = sdk_name_and_ver.0; - heartbeat.sdk_version = sdk_name_and_ver.1; - + client_clone.set_heartbeat_client_fields(&mut heartbeat); hb_to_send.push(heartbeat); - - last_heartbeat_time_map.insert(*instance_key, now); } if let Err(e) = client_clone.record_worker_heartbeat(namespace_clone.clone(), hb_to_send).await { if matches!(e.code(), tonic::Code::Unimplemented) { @@ -137,19 +111,12 @@ impl SharedNamespaceWorkerTrait for SharedNamespaceWorker { self.namespace.clone() } - fn register_callback( - &self, - worker_instance_key: Uuid, - heartbeat_callback: Box WorkerHeartbeat + Send + Sync>, - ) { + fn register_callback(&self, worker_instance_key: Uuid, heartbeat_callback: HeartbeatFn) { self.heartbeat_map .lock() .insert(worker_instance_key, heartbeat_callback); } - fn unregister_callback( - &self, - worker_instance_key: Uuid, - ) -> (Option WorkerHeartbeat + Send + Sync>>, bool) { + fn unregister_callback(&self, worker_instance_key: Uuid) -> (Option, bool) { let mut heartbeat_map = self.heartbeat_map.lock(); let heartbeat_callback = heartbeat_map.remove(&worker_instance_key); if heartbeat_map.is_empty() { @@ -225,7 +192,6 @@ mod tests { client.clone(), None, Some(Duration::from_millis(100)), - false, ) .unwrap(); diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 5428c067a..a5037cb38 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -1,6 +1,6 @@ mod activities; pub(crate) mod client; -mod heartbeat; +pub(crate) mod heartbeat; mod nexus; mod slot_provider; pub(crate) mod tuner; @@ -8,10 +8,11 @@ mod workflow; pub use temporal_sdk_core_api::worker::{WorkerConfig, WorkerConfigBuilder}; pub use tuner::{ - FixedSizeSlotSupplier, RealSysInfo, ResourceBasedSlotsOptions, - ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner, ResourceSlotOptions, SlotSupplierOptions, - TunerBuilder, TunerHolder, TunerHolderOptions, TunerHolderOptionsBuilder, + FixedSizeSlotSupplier, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, + ResourceBasedTuner, ResourceSlotOptions, SlotSupplierOptions, TunerBuilder, TunerHolder, + TunerHolderOptions, TunerHolderOptionsBuilder, }; +pub(crate) use tuner::{RealSysInfo, SystemResourceInfo}; pub(crate) use activities::{ ExecutingLAId, LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution, @@ -20,6 +21,7 @@ pub(crate) use activities::{ pub(crate) use wft_poller::WFTPollerShared; pub use workflow::LEGACY_QUERY_ID; +use crate::telemetry::WorkerHeartbeatMetrics; use crate::worker::heartbeat::{HeartbeatFn, SharedNamespaceWorker}; use crate::{ ActivityHeartbeat, CompleteActivityError, PollError, WorkerTrait, @@ -49,10 +51,13 @@ use crate::{ }; use activities::WorkerActivityTasks; use anyhow::bail; +use crossbeam_utils::atomic::AtomicCell; use futures_util::{StreamExt, stream}; use gethostname::gethostname; use parking_lot::{Mutex, RwLock}; use slot_provider::SlotProvider; +use std::sync::atomic::AtomicU64; +use std::time::SystemTime; use std::{ convert::TryInto, future, @@ -67,11 +72,18 @@ use temporal_client::{ ConfiguredClient, SharedNamespaceWorkerTrait, TemporalServiceClientWithMetrics, }; use temporal_sdk_core_api::telemetry::metrics::TemporalMeter; +use temporal_sdk_core_api::worker::{ + ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, SlotKind, 
WorkflowSlotKind, +}; use temporal_sdk_core_api::{ errors::{CompleteNexusError, WorkerValidationError}, worker::PollerBehavior, }; -use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo}; +use temporal_sdk_core_protos::temporal::api::deployment; +use temporal_sdk_core_protos::temporal::api::enums::v1::WorkerStatus; +use temporal_sdk_core_protos::temporal::api::worker::v1::{ + WorkerHeartbeat, WorkerHostInfo, WorkerPollerInfo, WorkerSlotsInfo, +}; use temporal_sdk_core_protos::{ TaskToken, coresdk::{ @@ -131,6 +143,8 @@ pub struct Worker { all_permits_tracker: tokio::sync::Mutex, /// Used to track worker client client_worker_registrator: Arc, + /// Status of the worker + status: Arc>, } struct AllPermitsTracker { @@ -249,18 +263,15 @@ impl WorkerTrait for Worker { ); } self.shutdown_token.cancel(); - // First, unregister worker from the client - if let Err(e) = self - .client - .workers() - .unregister_worker(self.worker_instance_key) { - error!( - task_queue=%self.config.task_queue, - namespace=%self.config.namespace, - error=%e, - "Failed to unregister worker on shutdown", - ); + *self.status.lock() = WorkerStatus::ShuttingDown; + } + // First, unregister worker from the client + if !self.client_worker_registrator.shared_namespace_worker { + let _res = self + .client + .workers() + .unregister_worker(self.worker_instance_key); } // Second, we want to stop polling of both activity and workflow tasks @@ -288,6 +299,10 @@ impl WorkerTrait for Worker { async fn finalize_shutdown(self) { self.finalize_shutdown().await } + + fn worker_instance_key(&self) -> Uuid { + self.worker_instance_key + } } impl Worker { @@ -301,18 +316,23 @@ impl Worker { client: Arc, telem_instance: Option<&TelemetryInstance>, worker_heartbeat_interval: Option, - shared_namespace_worker: bool, ) -> Result { info!(task_queue=%config.task_queue, namespace=%config.namespace, "Initializing worker"); + let worker_telemetry = telem_instance.map(|telem| WorkerTelemetry { + metric_meter: telem.get_metric_meter(), + temporal_metric_meter: telem.get_temporal_metric_meter(), + trace_subscriber: telem.trace_subscriber(), + }); + Self::new_with_pollers( config, sticky_queue_name, client, TaskPollers::Real, - telem_instance, + worker_telemetry, worker_heartbeat_interval, - shared_namespace_worker, + false, ) } @@ -347,36 +367,10 @@ impl Worker { #[cfg(test)] pub(crate) fn new_test(config: WorkerConfig, client: impl WorkerClient + 'static) -> Self { - Self::new(config, None, Arc::new(client), None, None, false).unwrap() + Self::new(config, None, Arc::new(client), None, None).unwrap() } pub(crate) fn new_with_pollers( - config: WorkerConfig, - sticky_queue_name: Option, - client: Arc, - task_pollers: TaskPollers, - telem_instance: Option<&TelemetryInstance>, - worker_heartbeat_interval: Option, - shared_namespace_worker: bool, - ) -> Result { - let worker_telemetry = telem_instance.map(|telem| WorkerTelemetry { - metric_meter: telem.get_metric_meter(), - temporal_metric_meter: telem.get_temporal_metric_meter(), - trace_subscriber: telem.trace_subscriber(), - }); - - Worker::new_with_pollers_inner( - config, - sticky_queue_name, - client, - task_pollers, - worker_telemetry, - worker_heartbeat_interval, - shared_namespace_worker, - ) - } - - pub(crate) fn new_with_pollers_inner( config: WorkerConfig, sticky_queue_name: Option, client: Arc, @@ -398,11 +392,13 @@ impl Worker { (MetricsContext::no_op(), None) }; - let tuner = config - .tuner - .as_ref() - .cloned() - .unwrap_or_else(|| 
Arc::new(TunerBuilder::from_config(&config).build())); + let mut sys_info = None; + let tuner = config.tuner.as_ref().cloned().unwrap_or_else(|| { + let mut tuner_builder = TunerBuilder::from_config(&config); + sys_info = tuner_builder.get_sys_info(); + Arc::new(tuner_builder.build()) + }); + let sys_info = sys_info.unwrap_or_else(|| Arc::new(RealSysInfo::new())); metrics.worker_registered(); let shutdown_token = CancellationToken::new(); @@ -434,6 +430,12 @@ impl Worker { ); let act_permits = act_slots.get_extant_count_rcv(); let (external_wft_tx, external_wft_rx) = unbounded_channel(); + + let wf_last_suc_poll_time = Arc::new(AtomicCell::new(None)); + let wf_sticky_last_suc_poll_time = Arc::new(AtomicCell::new(None)); + let act_last_suc_poll_time = Arc::new(AtomicCell::new(None)); + let nexus_last_suc_poll_time = Arc::new(AtomicCell::new(None)); + let nexus_slots = MeteredPermitDealer::new( tuner.nexus_task_slot_supplier(), metrics.with_new_attrs([nexus_worker_type()]), @@ -450,6 +452,8 @@ impl Worker { &metrics, &shutdown_token, &wft_slots, + wf_last_suc_poll_time.clone(), + wf_sticky_last_suc_poll_time.clone(), ); let wft_stream = if !client.is_mock() { // Some replay tests combine a mock client with real pollers, @@ -475,11 +479,13 @@ impl Worker { max_worker_acts_per_second: config.max_worker_activities_per_second, max_tps: config.max_task_queue_activities_per_second, }, + act_last_suc_poll_time.clone(), ); Some(Box::from(ap) as BoxedActPoller) }; let np_metrics = metrics.with_new_attrs([nexus_poller()]); + let nexus_poll_buffer = Box::new(LongPollBuffer::new_nexus_task( client.clone(), config.task_queue.clone(), @@ -487,6 +493,7 @@ impl Worker { nexus_slots.clone(), shutdown_token.child_token(), Some(move |np| np_metrics.record_num_pollers(np)), + nexus_last_suc_poll_time.clone(), shared_namespace_worker, )) as BoxedNexusPoller; @@ -531,13 +538,13 @@ impl Worker { let la_permits = la_permit_dealer.get_extant_count_rcv(); let local_act_mgr = Arc::new(LocalActivityManager::new( config.namespace.clone(), - la_permit_dealer, + la_permit_dealer.clone(), hb_tx, metrics.clone(), )); let at_task_mgr = act_poller.map(|ap| { WorkerActivityTasks::new( - act_slots, + act_slots.clone(), ap, client.clone(), metrics.clone(), @@ -548,7 +555,7 @@ impl Worker { ) }); let poll_on_non_local_activities = at_task_mgr.is_some(); - if !poll_on_non_local_activities { + if !poll_on_non_local_activities && !shared_namespace_worker { info!("Activity polling is disabled for this worker"); }; let la_sink = LAReqSink::new(local_act_mgr.clone()); @@ -567,14 +574,29 @@ impl Worker { external_wft_tx, ); let worker_instance_key = Uuid::new_v4(); + let worker_status = Arc::new(Mutex::new(WorkerStatus::Running)); let sdk_name_and_ver = client.sdk_name_and_version(); let worker_heartbeat = worker_heartbeat_interval.map(|hb_interval| { + let hb_metrics = HeartbeatMetrics { + in_mem_metrics: metrics.in_memory_meter(), + wft_slots: wft_slots.clone(), + act_slots, + nexus_slots, + la_slots: la_permit_dealer, + wf_last_suc_poll_time, + wf_sticky_last_suc_poll_time, + act_last_suc_poll_time, + nexus_last_suc_poll_time, + status: worker_status.clone(), + sys_info, + }; WorkerHeartbeatManager::new( config.clone(), worker_instance_key, hb_interval, worker_telemetry.clone(), + hb_metrics, ) }); @@ -583,6 +605,7 @@ impl Worker { slot_provider: provider, heartbeat_manager: worker_heartbeat, client: RwLock::new(client.clone()), + shared_namespace_worker, }); if !shared_namespace_worker { @@ -650,6 +673,7 @@ impl Worker { }), 
nexus_mgr, client_worker_registrator, + status: worker_status, }) } @@ -658,8 +682,14 @@ impl Worker { async fn shutdown(&self) { self.initiate_shutdown(); if let Some(name) = self.workflows.get_sticky_queue_name() { + let heartbeat = self + .client_worker_registrator + .heartbeat_manager + .as_ref() + .map(|hm| hm.heartbeat_callback.clone()()); + // This is a best effort call and we can still shutdown the worker if it fails - match self.client.shutdown_worker(name).await { + match self.client.shutdown_worker(name, heartbeat).await { Err(err) if !matches!( err.code(), @@ -955,6 +985,7 @@ struct ClientWorkerRegistrator { slot_provider: SlotProvider, heartbeat_manager: Option, client: RwLock>, + shared_namespace_worker: bool, } impl ClientWorker for ClientWorkerRegistrator { @@ -979,12 +1010,12 @@ impl ClientWorker for ClientWorkerRegistrator { fn heartbeat_callback(&self) -> Option { if let Some(hb_mgr) = self.heartbeat_manager.as_ref() { - let mut heartbeat_manager = hb_mgr.heartbeat_callback.lock(); - heartbeat_manager.take() + Some(hb_mgr.heartbeat_callback.clone()) } else { None } } + fn new_shared_namespace_worker( &self, ) -> Result, anyhow::Error> { @@ -999,12 +1030,20 @@ impl ClientWorker for ClientWorkerRegistrator { bail!("Shared namespace worker creation never be called without a heartbeat manager"); } } +} - fn register_callback(&self, callback: HeartbeatCallback) { - if let Some(hb_mgr) = self.heartbeat_manager.as_ref() { - hb_mgr.heartbeat_callback.lock().replace(callback); - } - } +struct HeartbeatMetrics { + in_mem_metrics: Option>, + wft_slots: MeteredPermitDealer, + act_slots: MeteredPermitDealer, + nexus_slots: MeteredPermitDealer, + la_slots: MeteredPermitDealer, + wf_last_suc_poll_time: Arc>>, + wf_sticky_last_suc_poll_time: Arc>>, + act_last_suc_poll_time: Arc>>, + nexus_last_suc_poll_time: Arc>>, + status: Arc>, + sys_info: Arc, } struct WorkerHeartbeatManager { @@ -1013,7 +1052,7 @@ struct WorkerHeartbeatManager { /// Telemetry instance, needed to initialize [SharedNamespaceWorker] when replacing client telemetry: Option, /// Heartbeat callback - heartbeat_callback: Mutex WorkerHeartbeat + Send + Sync>>>, + heartbeat_callback: Arc WorkerHeartbeat + Send + Sync>, } impl WorkerHeartbeatManager { @@ -1022,48 +1061,136 @@ impl WorkerHeartbeatManager { worker_instance_key: Uuid, heartbeat_interval: Duration, telemetry_instance: Option, + heartbeat_manager_metrics: HeartbeatMetrics, ) -> Self { - let worker_instance_key_clone = worker_instance_key.to_string(); - let task_queue = config.task_queue.clone(); + let start_time = Some(SystemTime::now().into()); + let worker_heartbeat_callback: HeartbeatFn = Arc::new(move || { + let deployment_version = config.computed_deployment_version().map(|dv| { + deployment::v1::WorkerDeploymentVersion { + deployment_name: dv.deployment_name, + build_id: dv.build_id, + } + }); - // TODO: requires the metrics changes to get the rest of these fields - let worker_heartbeat_callback: HeartbeatFn = Box::new(move || { - WorkerHeartbeat { - worker_instance_key: worker_instance_key_clone.clone(), + let mut worker_heartbeat = WorkerHeartbeat { + worker_instance_key: worker_instance_key.to_string(), host_info: Some(WorkerHostInfo { host_name: gethostname().to_string_lossy().to_string(), process_id: std::process::id().to_string(), - ..Default::default() + current_host_cpu_usage: heartbeat_manager_metrics.sys_info.used_cpu_percent() + as f32, + current_host_mem_usage: heartbeat_manager_metrics.sys_info.used_mem_percent() + as f32, + + // Set by 
SharedNamespaceWorker because it relies on the client + process_key: String::new(), }), - task_queue: task_queue.clone(), - deployment_version: None, - - status: 0, - start_time: Some(std::time::SystemTime::now().into()), - workflow_task_slots_info: None, - activity_task_slots_info: None, - nexus_task_slots_info: None, - local_activity_slots_info: None, - workflow_poller_info: None, - workflow_sticky_poller_info: None, - activity_poller_info: None, - nexus_poller_info: None, - total_sticky_cache_hit: 0, - total_sticky_cache_miss: 0, - current_sticky_cache_size: 0, - plugins: vec![], - - // sdk_name, sdk_version, and worker_identity must be set by + task_queue: config.task_queue.clone(), + deployment_version, + + status: (*heartbeat_manager_metrics.status.lock()) as i32, + start_time, + plugins: config.plugins.clone(), + + // Some Metrics dependent fields are set below, and + // some fields like sdk_name, sdk_version, and worker_identity, must be set by // SharedNamespaceWorker because they rely on the client, and // need to be pulled from the current client used by SharedNamespaceWorker ..Default::default() + }; + + if let Some(in_mem) = heartbeat_manager_metrics.in_mem_metrics.as_ref() { + worker_heartbeat.total_sticky_cache_hit = + in_mem.total_sticky_cache_hit.load(Ordering::Relaxed) as i32; + worker_heartbeat.total_sticky_cache_miss = + in_mem.total_sticky_cache_miss.load(Ordering::Relaxed) as i32; + worker_heartbeat.current_sticky_cache_size = + in_mem.sticky_cache_size.load(Ordering::Relaxed) as i32; + + worker_heartbeat.workflow_poller_info = Some(WorkerPollerInfo { + current_pollers: in_mem + .num_pollers + .wft_current_pollers + .load(Ordering::Relaxed) as i32, + last_successful_poll_time: heartbeat_manager_metrics + .wf_last_suc_poll_time + .load() + .map(|time| time.into()), + is_autoscaling: config.workflow_task_poller_behavior.is_autoscaling(), + }); + worker_heartbeat.workflow_sticky_poller_info = Some(WorkerPollerInfo { + current_pollers: in_mem + .num_pollers + .sticky_wft_current_pollers + .load(Ordering::Relaxed) as i32, + last_successful_poll_time: heartbeat_manager_metrics + .wf_sticky_last_suc_poll_time + .load() + .map(|time| time.into()), + is_autoscaling: config.workflow_task_poller_behavior.is_autoscaling(), + }); + worker_heartbeat.activity_poller_info = Some(WorkerPollerInfo { + current_pollers: in_mem + .num_pollers + .activity_current_pollers + .load(Ordering::Relaxed) as i32, + last_successful_poll_time: heartbeat_manager_metrics + .act_last_suc_poll_time + .load() + .map(|time| time.into()), + is_autoscaling: config.activity_task_poller_behavior.is_autoscaling(), + }); + worker_heartbeat.nexus_poller_info = Some(WorkerPollerInfo { + current_pollers: in_mem + .num_pollers + .nexus_current_pollers + .load(Ordering::Relaxed) as i32, + last_successful_poll_time: heartbeat_manager_metrics + .nexus_last_suc_poll_time + .load() + .map(|time| time.into()), + is_autoscaling: config.nexus_task_poller_behavior.is_autoscaling(), + }); + + worker_heartbeat.workflow_task_slots_info = make_slots_info( + &heartbeat_manager_metrics.wft_slots, + in_mem.worker_task_slots_available.workflow_worker.clone(), + in_mem.worker_task_slots_used.workflow_worker.clone(), + in_mem.workflow_task_execution_latency.clone(), + in_mem.workflow_task_execution_failed.clone(), + ); + worker_heartbeat.activity_task_slots_info = make_slots_info( + &heartbeat_manager_metrics.act_slots, + in_mem.worker_task_slots_available.activity_worker.clone(), + 
in_mem.worker_task_slots_used.activity_worker.clone(), + in_mem.activity_execution_latency.clone(), + in_mem.activity_execution_failed.clone(), + ); + worker_heartbeat.nexus_task_slots_info = make_slots_info( + &heartbeat_manager_metrics.nexus_slots, + in_mem.worker_task_slots_available.nexus_worker.clone(), + in_mem.worker_task_slots_used.nexus_worker.clone(), + in_mem.nexus_task_execution_latency.clone(), + in_mem.nexus_task_execution_failed.clone(), + ); + worker_heartbeat.local_activity_slots_info = make_slots_info( + &heartbeat_manager_metrics.la_slots, + in_mem + .worker_task_slots_available + .local_activity_worker + .clone(), + in_mem.worker_task_slots_used.local_activity_worker.clone(), + in_mem.local_activity_execution_latency.clone(), + in_mem.local_activity_execution_failed.clone(), + ); } + worker_heartbeat }); WorkerHeartbeatManager { heartbeat_interval, telemetry: telemetry_instance, - heartbeat_callback: Mutex::new(Some(worker_heartbeat_callback)), + heartbeat_callback: worker_heartbeat_callback, } } } @@ -1105,6 +1232,31 @@ fn wft_poller_behavior(config: &WorkerConfig, is_sticky: bool) -> PollerBehavior } } +fn make_slots_info( + dealer: &MeteredPermitDealer, + slots_available: Arc, + slots_used: Arc, + total_processed: Arc, + total_failed: Arc, +) -> Option +where + SK: SlotKind + 'static, +{ + Some(WorkerSlotsInfo { + current_available_slots: i32::try_from(slots_available.load(Ordering::Relaxed)) + .unwrap_or(-1), + current_used_slots: i32::try_from(slots_used.load(Ordering::Relaxed)).unwrap_or(-1), + slot_supplier_kind: dealer.slot_supplier_kind().to_string(), + total_processed_tasks: i32::try_from(total_processed.load(Ordering::Relaxed)) + .unwrap_or(i32::MIN), + total_failed_tasks: i32::try_from(total_failed.load(Ordering::Relaxed)).unwrap_or(i32::MIN), + + // Filled in by heartbeat later + last_interval_processed_tasks: 0, + last_interval_failure_tasks: 0, + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/worker/tuner.rs b/core/src/worker/tuner.rs index ed592a6f7..0a7cadcc9 100644 --- a/core/src/worker/tuner.rs +++ b/core/src/worker/tuner.rs @@ -3,10 +3,12 @@ mod resource_based; pub use fixed_size::FixedSizeSlotSupplier; pub use resource_based::{ - RealSysInfo, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner, + ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner, ResourceSlotOptions, }; +pub(crate) use resource_based::{RealSysInfo, SystemResourceInfo}; + use std::sync::Arc; use temporal_sdk_core_api::worker::{ ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, SlotKind, SlotSupplier, WorkerConfig, @@ -126,6 +128,9 @@ impl TunerHolderOptions { } None => {} } + if let Some(tuner) = rb_tuner { + builder.sys_info(tuner.sys_info()); + } Ok(builder.build()) } } @@ -187,6 +192,7 @@ pub struct TunerBuilder { local_activity_slot_supplier: Option + Send + Sync>>, nexus_slot_supplier: Option + Send + Sync>>, + sys_info: Option>, } impl TunerBuilder { @@ -243,6 +249,17 @@ impl TunerBuilder { self } + /// Sets a field that implements [SystemResourceInfo] + pub fn sys_info(&mut self, sys_info: Arc) -> &mut Self { + self.sys_info = Some(sys_info); + self + } + + /// Gets the field that implements [SystemResourceInfo] + pub fn get_sys_info(&self) -> Option> { + self.sys_info.clone() + } + /// Build a [WorkerTuner] from the configured slot suppliers pub fn build(&mut self) -> TunerHolder { TunerHolder { diff --git a/core/src/worker/tuner/fixed_size.rs b/core/src/worker/tuner/fixed_size.rs index 
aa737dc8b..e1bf53d6e 100644 --- a/core/src/worker/tuner/fixed_size.rs +++ b/core/src/worker/tuner/fixed_size.rs @@ -50,4 +50,8 @@ where fn available_slots(&self) -> Option { Some(self.sem.available_permits()) } + + fn slot_supplier_kind(&self) -> String { + "Fixed".to_string() + } } diff --git a/core/src/worker/tuner/resource_based.rs b/core/src/worker/tuner/resource_based.rs index 173418413..88606add3 100644 --- a/core/src/worker/tuner/resource_based.rs +++ b/core/src/worker/tuner/resource_based.rs @@ -1,11 +1,13 @@ use crossbeam_utils::atomic::AtomicCell; use parking_lot::Mutex; +use std::sync::mpsc; use std::{ marker::PhantomData, sync::{ Arc, OnceLock, atomic::{AtomicU64, AtomicUsize, Ordering}, }, + thread, time::{Duration, Instant}, }; use temporal_sdk_core_api::{ @@ -31,6 +33,8 @@ pub struct ResourceBasedTuner { act_opts: Option, la_opts: Option, nexus_opts: Option, + + sys_info: Arc, } impl ResourceBasedTuner { @@ -42,25 +46,28 @@ impl ResourceBasedTuner { .target_cpu_usage(target_cpu_usage) .build() .expect("default resource based slot options can't fail to build"); - let controller = ResourceController::new_with_sysinfo(opts, RealSysInfo::new()); + let controller = ResourceController::new_with_sysinfo(opts, Arc::new(RealSysInfo::new())); Self::new_from_controller(controller) } /// Create an instance using the fully configurable set of PID controller options pub fn new_from_options(options: ResourceBasedSlotsOptions) -> Self { - let controller = ResourceController::new_with_sysinfo(options, RealSysInfo::new()); + let controller = + ResourceController::new_with_sysinfo(options, Arc::new(RealSysInfo::new())); Self::new_from_controller(controller) } } impl ResourceBasedTuner { fn new_from_controller(controller: ResourceController) -> Self { + let sys_info = controller.sys_info_supplier.clone(); Self { slots: Arc::new(controller), wf_opts: None, act_opts: None, la_opts: None, nexus_opts: None, + sys_info, } } @@ -87,6 +94,11 @@ impl ResourceBasedTuner { self.nexus_opts = Some(opts); self } + + /// Get sys info + pub fn sys_info(&self) -> Arc { + self.sys_info.clone() + } } const DEFAULT_WF_SLOT_OPTS: ResourceSlotOptions = ResourceSlotOptions { @@ -121,7 +133,7 @@ pub struct ResourceSlotOptions { struct ResourceController { options: ResourceBasedSlotsOptions, - sys_info_supplier: MI, + sys_info_supplier: Arc, metrics: OnceLock>, pids: Mutex, last_metric_vals: Arc>, @@ -314,6 +326,10 @@ where } } } + + fn slot_supplier_kind(&self) -> String { + "ResourceBased".to_string() + } } impl ResourceBasedSlotsForType @@ -421,7 +437,7 @@ impl ResourceController { Arc::new(ResourceBasedSlotsForType::new(self.clone(), opts)) } - fn new_with_sysinfo(options: ResourceBasedSlotsOptions, sys_info: MI) -> Self { + fn new_with_sysinfo(options: ResourceBasedSlotsOptions, sys_info: Arc) -> Self { Self { pids: Mutex::new(PidControllers::new(&options)), options, @@ -474,37 +490,14 @@ impl ResourceController { /// Implements [SystemResourceInfo] using the [sysinfo] crate #[derive(Debug)] -pub struct RealSysInfo { +struct RealSysInfoInner { sys: Mutex, total_mem: AtomicU64, cur_mem_usage: AtomicU64, cur_cpu_usage: AtomicU64, - last_refresh: AtomicCell, } -impl RealSysInfo { - fn new() -> Self { - let mut sys = sysinfo::System::new(); - sys.refresh_memory(); - let total_mem = sys.total_memory(); - let s = Self { - sys: Mutex::new(sys), - last_refresh: AtomicCell::new(Instant::now()), - cur_mem_usage: AtomicU64::new(0), - cur_cpu_usage: AtomicU64::new(0), - total_mem: AtomicU64::new(total_mem), - }; - 
s.refresh(); - s - } - - fn refresh_if_needed(&self) { - // This is all quite expensive and meaningfully slows everything down if it's allowed to - // happen more often. A better approach than a lock would be needed to go faster. - if (Instant::now() - self.last_refresh.load()) > Duration::from_millis(100) { - self.refresh(); - } - } +impl RealSysInfoInner { fn refresh(&self) { let mut lock = self.sys.lock(); lock.refresh_memory(); @@ -522,25 +515,73 @@ impl RealSysInfo { self.cur_mem_usage.store(mem, Ordering::Release); } self.cur_cpu_usage.store(cpu.to_bits(), Ordering::Release); - self.last_refresh.store(Instant::now()); + } +} + +/// Tracks host resource usage by refreshing metrics on a background thread. +pub struct RealSysInfo { + inner: Arc, + shutdown_tx: mpsc::Sender<()>, + shutdown_handle: Mutex>>, +} + +impl RealSysInfo { + pub(crate) fn new() -> Self { + let mut sys = sysinfo::System::new(); + sys.refresh_memory(); + let total_mem = sys.total_memory(); + let inner = Arc::new(RealSysInfoInner { + sys: Mutex::new(sys), + cur_mem_usage: AtomicU64::new(0), + cur_cpu_usage: AtomicU64::new(0), + total_mem: AtomicU64::new(total_mem), + }); + inner.refresh(); + + let thread_clone = inner.clone(); + let (tx, rx) = mpsc::channel::<()>(); + let handle = thread::Builder::new() + .name("temporal-real-sysinfo".to_string()) + .spawn(move || { + const REFRESH_INTERVAL: Duration = Duration::from_millis(100); + loop { + thread_clone.refresh(); + let r = rx.recv_timeout(REFRESH_INTERVAL); + if matches!(r, Err(mpsc::RecvTimeoutError::Disconnected)) || r.is_ok() { + return; + } + } + }) + .expect("failed to spawn RealSysInfo refresh thread"); + + Self { + inner, + shutdown_tx: tx, + shutdown_handle: Mutex::new(Some(handle)), + } } } impl SystemResourceInfo for RealSysInfo { fn total_mem(&self) -> u64 { - self.total_mem.load(Ordering::Acquire) + self.inner.total_mem.load(Ordering::Acquire) } fn used_mem(&self) -> u64 { - // TODO: This should really happen on a background thread since it's getting called from - // the async reserve - self.refresh_if_needed(); - self.cur_mem_usage.load(Ordering::Acquire) + self.inner.cur_mem_usage.load(Ordering::Acquire) } fn used_cpu_percent(&self) -> f64 { - self.refresh_if_needed(); - f64::from_bits(self.cur_cpu_usage.load(Ordering::Acquire)) + f64::from_bits(self.inner.cur_cpu_usage.load(Ordering::Acquire)) + } +} + +impl Drop for RealSysInfo { + fn drop(&mut self) { + let _res = self.shutdown_tx.send(()); + if let Some(handle) = self.shutdown_handle.lock().take() { + let _ = handle.join(); + } } } @@ -558,9 +599,9 @@ mod tests { used: Arc, } impl FakeMIS { - fn new() -> (Self, Arc) { + fn new() -> (Arc, Arc) { let used = Arc::new(AtomicU64::new(0)); - (Self { used: used.clone() }, used) + (Arc::new(Self { used: used.clone() }), used) } } impl SystemResourceInfo for FakeMIS { diff --git a/core/src/worker/workflow/wft_poller.rs b/core/src/worker/workflow/wft_poller.rs index 3cc2da579..0a00ad179 100644 --- a/core/src/worker/workflow/wft_poller.rs +++ b/core/src/worker/workflow/wft_poller.rs @@ -6,13 +6,16 @@ use crate::{ telemetry::metrics::{workflow_poller, workflow_sticky_poller}, worker::{client::WorkerClient, wft_poller_behavior}, }; +use crossbeam_utils::atomic::AtomicCell; use futures_util::{Stream, stream}; use std::sync::{Arc, OnceLock}; +use std::time::SystemTime; use temporal_sdk_core_api::worker::{WorkerConfig, WorkflowSlotKind}; use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse; use tokio::sync::watch; use 
tokio_util::sync::CancellationToken; +#[allow(clippy::too_many_arguments)] pub(crate) fn make_wft_poller( config: &WorkerConfig, sticky_queue_name: &Option, @@ -20,6 +23,8 @@ pub(crate) fn make_wft_poller( metrics: &MetricsContext, shutdown_token: &CancellationToken, wft_slots: &MeteredPermitDealer, + last_successful_poll_time: Arc>>, + sticky_last_successful_poll_time: Arc>>, ) -> impl Stream< Item = Result< ( @@ -52,6 +57,7 @@ pub(crate) fn make_wft_poller( WorkflowTaskOptions { wft_poller_shared: wft_poller_shared.clone(), }, + last_successful_poll_time, ); let sticky_queue_poller = sticky_queue_name.as_ref().map(|sqn| { let sticky_metrics = metrics.with_new_attrs([workflow_sticky_poller()]); @@ -66,6 +72,7 @@ pub(crate) fn make_wft_poller( sticky_metrics.record_num_pollers(np); }), WorkflowTaskOptions { wft_poller_shared }, + sticky_last_successful_poll_time, ) }); let wf_task_poll_buffer = Box::new(WorkflowTaskPoller::new( diff --git a/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json b/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json index 8591cb0be..cfed16ffd 100644 --- a/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json +++ b/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json @@ -2130,6 +2130,51 @@ ] } }, + "/api/v1/namespaces/{namespace}/worker-deployments/{deploymentName}/set-manager": { + "post": { + "summary": "Set/unset the ManagerIdentity of a Worker Deployment.\nExperimental. This API might significantly change or be removed in a future release.", + "operationId": "SetWorkerDeploymentManager2", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1SetWorkerDeploymentManagerResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "deploymentName", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/WorkflowServiceSetWorkerDeploymentManagerBody" + } + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/api/v1/namespaces/{namespace}/worker-deployments/{deploymentName}/set-ramping-version": { "post": { "summary": "Set/unset the Ramping Version of a Worker Deployment and its ramp percentage. Can be used for\ngradual ramp to unversioned workers too.\nExperimental. 
This API might significantly change or be removed in a future release.", @@ -2296,6 +2341,45 @@ ] } }, + "/api/v1/namespaces/{namespace}/workers/describe/{workerInstanceKey}": { + "get": { + "summary": "DescribeWorker returns information about the specified worker.", + "operationId": "DescribeWorker2", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1DescribeWorkerResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "description": "Namespace this worker belongs to.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "workerInstanceKey", + "description": "Worker instance key to describe.", + "in": "path", + "required": true, + "type": "string" + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/api/v1/namespaces/{namespace}/workers/fetch-config": { "post": { "summary": "FetchWorkerConfig returns the worker configuration for a specific worker.", @@ -2822,7 +2906,7 @@ }, "/api/v1/namespaces/{namespace}/workflows/{execution.workflowId}/history-reverse": { "get": { - "summary": "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse \norder (starting from last event). Fails with`NotFound` if the specified workflow execution is \nunknown to the service.", + "summary": "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse\norder (starting from last event). Fails with`NotFound` if the specified workflow execution is\nunknown to the service.", "operationId": "GetWorkflowExecutionHistoryReverse2", "responses": { "200": { @@ -5871,6 +5955,51 @@ ] } }, + "/namespaces/{namespace}/worker-deployments/{deploymentName}/set-manager": { + "post": { + "summary": "Set/unset the ManagerIdentity of a Worker Deployment.\nExperimental. This API might significantly change or be removed in a future release.", + "operationId": "SetWorkerDeploymentManager", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1SetWorkerDeploymentManagerResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "deploymentName", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/WorkflowServiceSetWorkerDeploymentManagerBody" + } + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/namespaces/{namespace}/worker-deployments/{deploymentName}/set-ramping-version": { "post": { "summary": "Set/unset the Ramping Version of a Worker Deployment and its ramp percentage. Can be used for\ngradual ramp to unversioned workers too.\nExperimental. 
This API might significantly change or be removed in a future release.", @@ -6037,6 +6166,45 @@ ] } }, + "/namespaces/{namespace}/workers/describe/{workerInstanceKey}": { + "get": { + "summary": "DescribeWorker returns information about the specified worker.", + "operationId": "DescribeWorker", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1DescribeWorkerResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "description": "Namespace this worker belongs to.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "workerInstanceKey", + "description": "Worker instance key to describe.", + "in": "path", + "required": true, + "type": "string" + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/namespaces/{namespace}/workers/fetch-config": { "post": { "summary": "FetchWorkerConfig returns the worker configuration for a specific worker.", @@ -6563,7 +6731,7 @@ }, "/namespaces/{namespace}/workflows/{execution.workflowId}/history-reverse": { "get": { - "summary": "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse \norder (starting from last event). Fails with`NotFound` if the specified workflow execution is \nunknown to the service.", + "summary": "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse\norder (starting from last event). Fails with`NotFound` if the specified workflow execution is\nunknown to the service.", "operationId": "GetWorkflowExecutionHistoryReverse", "responses": { "200": { @@ -7861,7 +8029,7 @@ }, "type": { "type": "string", - "description": "Pause all running activities of this type." + "description": "Pause all running activities of this type.\nNote: Experimental - the behavior of pause by activity type might change in a future release." }, "reason": { "type": "string", @@ -8314,10 +8482,37 @@ "ignoreMissingTaskQueues": { "type": "boolean", "description": "Optional. By default this request would be rejected if not all the expected Task Queues are\nbeing polled by the new Version, to protect against accidental removal of Task Queues, or\nworker health issues. Pass `true` here to bypass this protection.\nThe set of expected Task Queues is the set of all the Task Queues that were ever poller by\nthe existing Current Version of the Deployment, with the following exclusions:\n - Task Queues that are not used anymore (inferred by having empty backlog and a task\n add_rate of 0.)\n - Task Queues that are moved to another Worker Deployment (inferred by the Task Queue\n having a different Current Version than the Current Version of this deployment.)\nWARNING: Do not set this flag unless you are sure that the missing task queue pollers are not\nneeded. If the request is unexpectedly rejected due to missing pollers, then that means the\npollers have not reached to the server yet. Only set this if you expect those pollers to\nnever arrive." + }, + "allowNoPollers": { + "type": "boolean", + "description": "Optional. By default this request will be rejected if no pollers have been seen for the proposed\nCurrent Version, in order to protect users from routing tasks to pollers that do not exist, leading\nto possible timeouts. Pass `true` here to bypass this protection." } }, "description": "Set/unset the Current Version of a Worker Deployment." 
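Note on the new allowNoPollers field above: it opts out of the server-side protection that rejects promoting a Current Version for which no pollers have been seen. A minimal sketch of setting it from Rust, assuming the prost-generated request type in temporal_sdk_core_protos (the function name is illustrative; field names come from the proto hunk later in this diff, and all other fields are left at their defaults):

    use temporal_sdk_core_protos::temporal::api::workflowservice::v1::SetWorkerDeploymentCurrentVersionRequest;

    fn allow_unpolled_current_version() -> SetWorkerDeploymentCurrentVersionRequest {
        SetWorkerDeploymentCurrentVersionRequest {
            // Bypass the "no pollers seen for the proposed Current Version" check.
            allow_no_pollers: true,
            // Keep the existing task-queue coverage protection in place.
            ignore_missing_task_queues: false,
            ..Default::default()
        }
    }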
}, + "WorkflowServiceSetWorkerDeploymentManagerBody": { + "type": "object", + "properties": { + "managerIdentity": { + "type": "string", + "description": "Arbitrary value for `manager_identity`.\nEmpty will unset the field." + }, + "self": { + "type": "boolean", + "description": "True will set `manager_identity` to `identity`." + }, + "conflictToken": { + "type": "string", + "format": "byte", + "description": "Optional. This can be the value of conflict_token from a Describe, or another Worker\nDeployment API. Passing a non-nil conflict token will cause this request to fail if the\nDeployment's configuration has been modified between the API call that generated the\ntoken and this one." + }, + "identity": { + "type": "string", + "description": "Required. The identity of the client who initiated this request." + } + }, + "description": "Update the ManagerIdentity of a Worker Deployment." + }, "WorkflowServiceSetWorkerDeploymentRampingVersionBody": { "type": "object", "properties": { @@ -8346,6 +8541,10 @@ "ignoreMissingTaskQueues": { "type": "boolean", "description": "Optional. By default this request would be rejected if not all the expected Task Queues are\nbeing polled by the new Version, to protect against accidental removal of Task Queues, or\nworker health issues. Pass `true` here to bypass this protection.\nThe set of expected Task Queues equals to all the Task Queues ever polled from the existing\nCurrent Version of the Deployment, with the following exclusions:\n - Task Queues that are not used anymore (inferred by having empty backlog and a task\n add_rate of 0.)\n - Task Queues that are moved to another Worker Deployment (inferred by the Task Queue\n having a different Current Version than the Current Version of this deployment.)\nWARNING: Do not set this flag unless you are sure that the missing task queue poller are not\nneeded. If the request is unexpectedly rejected due to missing pollers, then that means the\npollers have not reached to the server yet. Only set this if you expect those pollers to\nnever arrive.\nNote: this check only happens when the ramping version is about to change, not every time\nthat the percentage changes. Also note that the check is against the deployment's Current\nVersion, not the previous Ramping Version." + }, + "allowNoPollers": { + "type": "boolean", + "description": "Optional. By default this request will be rejected if no pollers have been seen for the proposed\nCurrent Version, in order to protect users from routing tasks to pollers that do not exist, leading\nto possible timeouts. Pass `true` here to bypass this protection." } }, "description": "Set/unset the Ramping Version of a Worker Deployment and its ramp percentage." @@ -8643,6 +8842,10 @@ "priority": { "$ref": "#/definitions/v1Priority", "title": "Priority metadata" + }, + "eagerWorkerDeploymentOptions": { + "$ref": "#/definitions/v1WorkerDeploymentOptions", + "description": "Deployment Options of the worker who will process the eager task. Passed when `request_eager_execution=true`." 
} } }, @@ -10875,6 +11078,14 @@ } } }, + "v1DescribeWorkerResponse": { + "type": "object", + "properties": { + "workerInfo": { + "$ref": "#/definitions/v1WorkerInfo" + } + } + }, "v1DescribeWorkflowExecutionResponse": { "type": "object", "properties": { @@ -11198,6 +11409,14 @@ }, "visibilityStore": { "type": "string" + }, + "initialFailoverVersion": { + "type": "string", + "format": "int64" + }, + "failoverVersionIncrement": { + "type": "string", + "format": "int64" } }, "description": "GetClusterInfoResponse contains information about Temporal cluster." @@ -12167,6 +12386,14 @@ "asyncUpdate": { "type": "boolean", "title": "True if the namespace supports async update" + }, + "workerHeartbeats": { + "type": "boolean", + "title": "True if the namespace supports worker heartbeats" + }, + "reportedProblemsSearchAttribute": { + "type": "boolean", + "title": "True if the namespace supports reported problems search attribute" } }, "description": "Namespace capability details. Should contain what features are enabled in a namespace." @@ -13154,7 +13381,7 @@ "priorityKey": { "type": "integer", "format": "int32", - "description": "Priority key is a positive integer from 1 to n, where smaller integers\ncorrespond to higher priorities (tasks run sooner). In general, tasks in\na queue should be processed in close to priority order, although small\ndeviations are possible.\n\nThe maximum priority value (minimum priority) is determined by server\nconfiguration, and defaults to 5.\n\nIf priority is not present (or zero), then the effective priority will be\nthe default priority, which is is calculated by (min+max)/2. With the\ndefault max of 5, and min of 1, that comes out to 3." + "description": "Priority key is a positive integer from 1 to n, where smaller integers\ncorrespond to higher priorities (tasks run sooner). In general, tasks in\na queue should be processed in close to priority order, although small\ndeviations are possible.\n\nThe maximum priority value (minimum priority) is determined by server\nconfiguration, and defaults to 5.\n\nIf priority is not present (or zero), then the effective priority will be\nthe default priority, which is calculated by (min+max)/2. With the\ndefault max of 5, and min of 1, that comes out to 3." }, "fairnessKey": { "type": "string", @@ -14343,6 +14570,20 @@ } } }, + "v1SetWorkerDeploymentManagerResponse": { + "type": "object", + "properties": { + "conflictToken": { + "type": "string", + "format": "byte", + "description": "This value is returned so that it can be optionally passed to APIs\nthat write to the Worker Deployment state to ensure that the state\ndid not change between this API call and a future write." + }, + "previousManagerIdentity": { + "type": "string", + "description": "What the `manager_identity` field was before this change." + } + } + }, "v1SetWorkerDeploymentRampingVersionResponse": { "type": "object", "properties": { @@ -14891,6 +15132,10 @@ "priority": { "$ref": "#/definitions/v1Priority", "title": "Priority metadata" + }, + "eagerWorkerDeploymentOptions": { + "$ref": "#/definitions/v1WorkerDeploymentOptions", + "description": "Deployment Options of the worker who will process the eager task. Passed when `request_eager_execution=true`." } } }, @@ -15453,7 +15698,7 @@ "additionalProperties": { "type": "string" }, - "description": "A key-value map for any customized purpose.\nIf data already exists on the namespace, \nthis will merge with the existing key values." 
+ "description": "A key-value map for any customized purpose.\nIf data already exists on the namespace,\nthis will merge with the existing key values." }, "state": { "$ref": "#/definitions/v1NamespaceState", @@ -15812,6 +16057,10 @@ "lastModifierIdentity": { "type": "string", "description": "Identity of the last client who modified the configuration of this Deployment. Set to the\n`identity` value sent by APIs such as `SetWorkerDeploymentCurrentVersion` and\n`SetWorkerDeploymentRampingVersion`." + }, + "managerIdentity": { + "type": "string", + "description": "Identity of the client that has the exclusive right to make changes to this Worker Deployment.\nEmpty by default.\nIf this is set, clients whose identity does not match `manager_identity` will not be able to make changes\nto this Worker Deployment. They can either set their own identity as the manager or unset the field to proceed." } }, "description": "A Worker Deployment (Deployment, for short) represents all workers serving \na shared set of Task Queues. Typically, a Deployment represents one service or \napplication.\nA Deployment contains multiple Deployment Versions, each representing a different \nversion of workers. (see documentation of WorkerDeploymentVersionInfo)\nDeployment records are created in Temporal server automatically when their\nfirst poller arrives to the server.\nExperimental. Worker Deployments are experimental and might significantly change in the future." diff --git a/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml b/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml index 7d587b366..88f8737d0 100644 --- a/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml +++ b/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml @@ -1914,6 +1914,44 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /api/v1/namespaces/{namespace}/worker-deployments/{deploymentName}/set-manager: + post: + tags: + - WorkflowService + description: |- + Set/unset the ManagerIdentity of a Worker Deployment. + Experimental. This API might significantly change or be removed in a future release. + operationId: SetWorkerDeploymentManager + parameters: + - name: namespace + in: path + required: true + schema: + type: string + - name: deploymentName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/SetWorkerDeploymentManagerRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/SetWorkerDeploymentManagerResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /api/v1/namespaces/{namespace}/worker-deployments/{deploymentName}/set-ramping-version: post: tags: @@ -2087,6 +2125,38 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /api/v1/namespaces/{namespace}/workers/describe/{workerInstanceKey}: + get: + tags: + - WorkflowService + description: DescribeWorker returns information about the specified worker. + operationId: DescribeWorker + parameters: + - name: namespace + in: path + description: Namespace this worker belongs to. + required: true + schema: + type: string + - name: workerInstanceKey + in: path + description: Worker instance key to describe. 
+ required: true + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/DescribeWorkerResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /api/v1/namespaces/{namespace}/workers/fetch-config: post: tags: @@ -2540,7 +2610,10 @@ paths: get: tags: - WorkflowService - description: "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse \n order (starting from last event). Fails with`NotFound` if the specified workflow execution is \n unknown to the service." + description: |- + GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse + order (starting from last event). Fails with`NotFound` if the specified workflow execution is + unknown to the service. operationId: GetWorkflowExecutionHistoryReverse parameters: - name: namespace @@ -5265,6 +5338,44 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /namespaces/{namespace}/worker-deployments/{deploymentName}/set-manager: + post: + tags: + - WorkflowService + description: |- + Set/unset the ManagerIdentity of a Worker Deployment. + Experimental. This API might significantly change or be removed in a future release. + operationId: SetWorkerDeploymentManager + parameters: + - name: namespace + in: path + required: true + schema: + type: string + - name: deploymentName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/SetWorkerDeploymentManagerRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/SetWorkerDeploymentManagerResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /namespaces/{namespace}/worker-deployments/{deploymentName}/set-ramping-version: post: tags: @@ -5438,6 +5549,38 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /namespaces/{namespace}/workers/describe/{workerInstanceKey}: + get: + tags: + - WorkflowService + description: DescribeWorker returns information about the specified worker. + operationId: DescribeWorker + parameters: + - name: namespace + in: path + description: Namespace this worker belongs to. + required: true + schema: + type: string + - name: workerInstanceKey + in: path + description: Worker instance key to describe. + required: true + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/DescribeWorkerResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /namespaces/{namespace}/workers/fetch-config: post: tags: @@ -5891,7 +6034,10 @@ paths: get: tags: - WorkflowService - description: "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse \n order (starting from last event). Fails with`NotFound` if the specified workflow execution is \n unknown to the service." + description: |- + GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse + order (starting from last event). Fails with`NotFound` if the specified workflow execution is + unknown to the service. 
operationId: GetWorkflowExecutionHistoryReverse parameters: - name: namespace @@ -7921,6 +8067,11 @@ components: Only set if `report_task_queue_stats` is set to true in the request. (-- api-linter: core::0140::prepositions=disabled aip.dev/not-precedent: "by" is used to clarify the key. --) + DescribeWorkerResponse: + type: object + properties: + workerInfo: + $ref: '#/components/schemas/WorkerInfo' DescribeWorkflowExecutionResponse: type: object properties: @@ -8242,6 +8393,10 @@ components: type: string visibilityStore: type: string + initialFailoverVersion: + type: string + failoverVersionIncrement: + type: string description: GetClusterInfoResponse contains information about Temporal cluster. GetCurrentDeploymentResponse: type: object @@ -9055,6 +9210,12 @@ components: asyncUpdate: type: boolean description: True if the namespace supports async update + workerHeartbeats: + type: boolean + description: True if the namespace supports worker heartbeats + reportedProblemsSearchAttribute: + type: boolean + description: True if the namespace supports reported problems search attribute description: Namespace capability details. Should contain what features are enabled in a namespace. NamespaceReplicationConfig: type: object @@ -9461,7 +9622,9 @@ components: description: Only the activity with this ID will be paused. type: type: string - description: Pause all running activities of this type. + description: |- + Pause all running activities of this type. + Note: Experimental - the behavior of pause by activity type might change in a future release. reason: type: string description: Reason to pause the activity. @@ -9939,7 +10102,7 @@ components: configuration, and defaults to 5. If priority is not present (or zero), then the effective priority will be - the default priority, which is is calculated by (min+max)/2. With the + the default priority, which is calculated by (min+max)/2. With the default max of 5, and min of 1, that comes out to 3. format: int32 fairnessKey: @@ -11347,6 +11510,12 @@ components: needed. If the request is unexpectedly rejected due to missing pollers, then that means the pollers have not reached to the server yet. Only set this if you expect those pollers to never arrive. + allowNoPollers: + type: boolean + description: |- + Optional. By default this request will be rejected if no pollers have been seen for the proposed + Current Version, in order to protect users from routing tasks to pollers that do not exist, leading + to possible timeouts. Pass `true` here to bypass this protection. description: Set/unset the Current Version of a Worker Deployment. SetWorkerDeploymentCurrentVersionResponse: type: object @@ -11365,6 +11534,46 @@ components: allOf: - $ref: '#/components/schemas/WorkerDeploymentVersion' description: The version that was current before executing this operation. + SetWorkerDeploymentManagerRequest: + type: object + properties: + namespace: + type: string + deploymentName: + type: string + managerIdentity: + type: string + description: |- + Arbitrary value for `manager_identity`. + Empty will unset the field. + self: + type: boolean + description: True will set `manager_identity` to `identity`. + conflictToken: + type: string + description: |- + Optional. This can be the value of conflict_token from a Describe, or another Worker + Deployment API. Passing a non-nil conflict token will cause this request to fail if the + Deployment's configuration has been modified between the API call that generated the + token and this one. 
+ format: bytes + identity: + type: string + description: Required. The identity of the client who initiated this request. + description: Update the ManagerIdentity of a Worker Deployment. + SetWorkerDeploymentManagerResponse: + type: object + properties: + conflictToken: + type: string + description: |- + This value is returned so that it can be optionally passed to APIs + that write to the Worker Deployment state to ensure that the state + did not change between this API call and a future write. + format: bytes + previousManagerIdentity: + type: string + description: What the `manager_identity` field was before this change. SetWorkerDeploymentRampingVersionRequest: type: object properties: @@ -11415,6 +11624,12 @@ components: Note: this check only happens when the ramping version is about to change, not every time that the percentage changes. Also note that the check is against the deployment's Current Version, not the previous Ramping Version. + allowNoPollers: + type: boolean + description: |- + Optional. By default this request will be rejected if no pollers have been seen for the proposed + Current Version, in order to protect users from routing tasks to pollers that do not exist, leading + to possible timeouts. Pass `true` here to bypass this protection. description: Set/unset the Ramping Version of a Worker Deployment and its ramp percentage. SetWorkerDeploymentRampingVersionResponse: type: object @@ -11960,6 +12175,10 @@ components: allOf: - $ref: '#/components/schemas/Priority' description: Priority metadata + eagerWorkerDeploymentOptions: + allOf: + - $ref: '#/components/schemas/WorkerDeploymentOptions' + description: Deployment Options of the worker who will process the eager task. Passed when `request_eager_execution=true`. StartWorkflowExecutionResponse: type: object properties: @@ -12551,7 +12770,10 @@ components: type: object additionalProperties: type: string - description: "A key-value map for any customized purpose.\n If data already exists on the namespace, \n this will merge with the existing key values." + description: |- + A key-value map for any customized purpose. + If data already exists on the namespace, + this will merge with the existing key values. state: enum: - NAMESPACE_STATE_UNSPECIFIED @@ -13093,6 +13315,13 @@ components: Identity of the last client who modified the configuration of this Deployment. Set to the `identity` value sent by APIs such as `SetWorkerDeploymentCurrentVersion` and `SetWorkerDeploymentRampingVersion`. + managerIdentity: + type: string + description: |- + Identity of the client that has the exclusive right to make changes to this Worker Deployment. + Empty by default. + If this is set, clients whose identity does not match `manager_identity` will not be able to make changes + to this Worker Deployment. They can either set their own identity as the manager or unset the field to proceed. description: "A Worker Deployment (Deployment, for short) represents all workers serving \n a shared set of Task Queues. Typically, a Deployment represents one service or \n application.\n A Deployment contains multiple Deployment Versions, each representing a different \n version of workers. (see documentation of WorkerDeploymentVersionInfo)\n Deployment records are created in Temporal server automatically when their\n first poller arrives to the server.\n Experimental. Worker Deployments are experimental and might significantly change in the future." 
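The SetWorkerDeploymentManagerRequest schema above reduces to a small request message. A minimal sketch of building it, assuming the prost-generated type lives under workflowservice::v1 like the other requests in this diff (field names mirror the schema; the `self` and conflict-token fields are left at their defaults):

    use temporal_sdk_core_protos::temporal::api::workflowservice::v1::SetWorkerDeploymentManagerRequest;

    fn claim_deployment(namespace: &str, deployment: &str, me: &str) -> SetWorkerDeploymentManagerRequest {
        SetWorkerDeploymentManagerRequest {
            namespace: namespace.to_string(),
            deployment_name: deployment.to_string(),
            // Setting manager_identity restricts future config changes to this identity;
            // sending an empty string instead clears the manager.
            manager_identity: me.to_string(),
            // Required by the API: the identity of the client making this request.
            identity: me.to_string(),
            ..Default::default()
        }
    }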
WorkerDeploymentInfo_WorkerDeploymentVersionSummary: type: object diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto b/sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto index 51acfaa2e..838f5fefc 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto @@ -280,7 +280,7 @@ message Priority { // configuration, and defaults to 5. // // If priority is not present (or zero), then the effective priority will be - // the default priority, which is is calculated by (min+max)/2. With the + // the default priority, which is calculated by (min+max)/2. With the // default max of 5, and min of 1, that comes out to 3. int32 priority_key = 1; diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto b/sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto index 14b4205c5..8f6685a5d 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto @@ -195,6 +195,12 @@ message WorkerDeploymentInfo { // `SetWorkerDeploymentRampingVersion`. string last_modifier_identity = 5; + // Identity of the client that has the exclusive right to make changes to this Worker Deployment. + // Empty by default. + // If this is set, clients whose identity does not match `manager_identity` will not be able to make changes + // to this Worker Deployment. They can either set their own identity as the manager or unset the field to proceed. + string manager_identity = 6; + message WorkerDeploymentVersionSummary { // Deprecated. Use `deployment_version`. string version = 1 [deprecated = true]; diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/namespace/v1/message.proto b/sdk-core-protos/protos/api_upstream/temporal/api/namespace/v1/message.proto index 405cd53c9..79c44cb05 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/namespace/v1/message.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/namespace/v1/message.proto @@ -34,6 +34,10 @@ message NamespaceInfo { bool sync_update = 2; // True if the namespace supports async update bool async_update = 3; + // True if the namespace supports worker heartbeats + bool worker_heartbeats = 4; + // True if the namespace supports reported problems search attribute + bool reported_problems_search_attribute = 5; } // Whether scheduled workflows are supported on this namespace. This is only needed @@ -68,8 +72,8 @@ message UpdateNamespaceInfo { string description = 1; string owner_email = 2; // A key-value map for any customized purpose. - // If data already exists on the namespace, - // this will merge with the existing key values. + // If data already exists on the namespace, + // this will merge with the existing key values. map data = 3; // New namespace state, server will reject if transition is not allowed. 
// Allowed transitions are: diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto index 5059575dc..37ad083c4 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto @@ -194,6 +194,8 @@ message StartWorkflowExecutionRequest { temporal.api.workflow.v1.OnConflictOptions on_conflict_options = 26; // Priority metadata temporal.api.common.v1.Priority priority = 27; + // Deployment Options of the worker who will process the eager task. Passed when `request_eager_execution=true`. + temporal.api.deployment.v1.WorkerDeploymentOptions eager_worker_deployment_options = 28; } message StartWorkflowExecutionResponse { @@ -1157,6 +1159,8 @@ message GetClusterInfoResponse { int32 history_shard_count = 6; string persistence_store = 7; string visibility_store = 8; + int64 initial_failover_version = 9; + int64 failover_version_increment = 10; } message GetSystemInfoRequest { @@ -1938,6 +1942,7 @@ message PauseActivityRequest { // Only the activity with this ID will be paused. string id = 4; // Pause all running activities of this type. + // Note: Experimental - the behavior of pause by activity type might change in a future release. string type = 5; } @@ -2163,6 +2168,10 @@ message SetWorkerDeploymentCurrentVersionRequest { // pollers have not reached to the server yet. Only set this if you expect those pollers to // never arrive. bool ignore_missing_task_queues = 6; + // Optional. By default this request will be rejected if no pollers have been seen for the proposed + // Current Version, in order to protect users from routing tasks to pollers that do not exist, leading + // to possible timeouts. Pass `true` here to bypass this protection. + bool allow_no_pollers = 9; } message SetWorkerDeploymentCurrentVersionResponse { @@ -2215,6 +2224,10 @@ message SetWorkerDeploymentRampingVersionRequest { // that the percentage changes. Also note that the check is against the deployment's Current // Version, not the previous Ramping Version. bool ignore_missing_task_queues = 7; + // Optional. By default this request will be rejected if no pollers have been seen for the proposed + // Current Version, in order to protect users from routing tasks to pollers that do not exist, leading + // to possible timeouts. Pass `true` here to bypass this protection. + bool allow_no_pollers = 10; } message SetWorkerDeploymentRampingVersionResponse { @@ -2248,8 +2261,8 @@ message ListWorkerDeploymentsResponse { google.protobuf.Timestamp create_time = 2; temporal.api.deployment.v1.RoutingConfig routing_config = 3; // Summary of the version that was added most recently in the Worker Deployment. - temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary latest_version_summary = 4; - // Summary of the current version of the Worker Deployment. + temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary latest_version_summary = 4; + // Summary of the current version of the Worker Deployment. temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary current_version_summary = 5; // Summary of the ramping version of the Worker Deployment. 
temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary ramping_version_summary = 6; @@ -2309,6 +2322,39 @@ message UpdateWorkerDeploymentVersionMetadataResponse { temporal.api.deployment.v1.VersionMetadata metadata = 1; } +// Update the ManagerIdentity of a Worker Deployment. +message SetWorkerDeploymentManagerRequest { + string namespace = 1; + string deployment_name = 2; + + oneof new_manager_identity { + // Arbitrary value for `manager_identity`. + // Empty will unset the field. + string manager_identity = 3; + + // True will set `manager_identity` to `identity`. + bool self = 4; + } + + // Optional. This can be the value of conflict_token from a Describe, or another Worker + // Deployment API. Passing a non-nil conflict token will cause this request to fail if the + // Deployment's configuration has been modified between the API call that generated the + // token and this one. + bytes conflict_token = 5; + + // Required. The identity of the client who initiated this request. + string identity = 6; +} + +message SetWorkerDeploymentManagerResponse { + // This value is returned so that it can be optionally passed to APIs + // that write to the Worker Deployment state to ensure that the state + // did not change between this API call and a future write. + bytes conflict_token = 1; + + // What the `manager_identity` field was before this change. + string previous_manager_identity = 2; +} // Returns the Current Deployment of a deployment series. // [cleanup-wv-pre-release] Pre-release deployment APIs, clean up later @@ -2537,3 +2583,15 @@ message UpdateWorkerConfigResponse { // Once we support sending update to a multiple workers - it will be converted into a batch job, and job id will be returned. } } + +message DescribeWorkerRequest { + // Namespace this worker belongs to. + string namespace = 1; + + // Worker instance key to describe. + string worker_instance_key = 2; +} + +message DescribeWorkerResponse { + temporal.api.worker.v1.WorkerInfo worker_info = 1; +} diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto index cc74230af..dc33b84ef 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto @@ -133,9 +133,9 @@ service WorkflowService { } }; } - - // GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse - // order (starting from last event). Fails with`NotFound` if the specified workflow execution is + + // GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse + // order (starting from last event). Fails with`NotFound` if the specified workflow execution is // unknown to the service. rpc GetWorkflowExecutionHistoryReverse (GetWorkflowExecutionHistoryReverseRequest) returns (GetWorkflowExecutionHistoryReverseResponse) { option (google.api.http) = { @@ -458,7 +458,8 @@ service WorkflowService { }; } - // ScanWorkflowExecutions is a visibility API to list large amount of workflow executions in a specific namespace without order. + // ScanWorkflowExecutions _was_ a visibility API to list large amount of workflow executions in a specific namespace without order. + // It has since been deprecated in favor of `ListWorkflowExecutions` and rewritten to use `ListWorkflowExecutions` internally. 
// // Deprecated: Replaced with `ListWorkflowExecutions`. // (-- api-linter: core::0127::http-annotation=disabled @@ -669,8 +670,8 @@ service WorkflowService { // members are compatible with one another. // // A single build id may be mapped to multiple task queues using this API for cases where a single process hosts - // multiple workers. - // + // multiple workers. + // // To query which workers can be retired, use the `GetWorkerTaskReachability` API. // // NOTE: The number of task queues mapped to a single build id is limited by the `limit.taskQueuesPerBuildId` @@ -923,6 +924,19 @@ service WorkflowService { }; } + // Set/unset the ManagerIdentity of a Worker Deployment. + // Experimental. This API might significantly change or be removed in a future release. + rpc SetWorkerDeploymentManager (SetWorkerDeploymentManagerRequest) returns (SetWorkerDeploymentManagerResponse) { + option (google.api.http) = { + post: "/namespaces/{namespace}/worker-deployments/{deployment_name}/set-manager" + body: "*" + additional_bindings { + post: "/api/v1/namespaces/{namespace}/worker-deployments/{deployment_name}/set-manager" + body: "*" + } + }; + } + // Invokes the specified Update function on user Workflow code. rpc UpdateWorkflowExecution(UpdateWorkflowExecutionRequest) returns (UpdateWorkflowExecutionResponse) { option (google.api.http) = { @@ -1235,4 +1249,14 @@ service WorkflowService { } }; } + + // DescribeWorker returns information about the specified worker. + rpc DescribeWorker (DescribeWorkerRequest) returns (DescribeWorkerResponse) { + option (google.api.http) = { + get: "/namespaces/{namespace}/workers/describe/{worker_instance_key}" + additional_bindings { + get: "/api/v1/namespaces/{namespace}/workers/describe/{worker_instance_key}" + } + }; + } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 35b2825c3..859629512 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -38,6 +38,7 @@ use temporal_sdk::{ WorkerInterceptor, }, }; +pub(crate) use temporal_sdk_core::test_help::NAMESPACE; use temporal_sdk_core::{ ClientOptions, ClientOptionsBuilder, CoreRuntime, RuntimeOptions, RuntimeOptionsBuilder, WorkerConfigBuilder, init_replay_worker, init_worker, @@ -67,8 +68,7 @@ use temporal_sdk_core_protos::{ use tokio::{sync::OnceCell, task::AbortHandle}; use tracing::{debug, warn}; use url::Url; - -pub(crate) use temporal_sdk_core::test_help::NAMESPACE; +use uuid::Uuid; /// The env var used to specify where the integ tests should point pub(crate) const INTEG_SERVER_TARGET_ENV_VAR: &str = "TEMPORAL_SERVICE_ADDRESS"; pub(crate) const INTEG_NAMESPACE_ENV_VAR: &str = "TEMPORAL_NAMESPACE"; @@ -498,6 +498,10 @@ impl TestWorker { &mut self.inner } + pub(crate) fn worker_instance_key(&self) -> Uuid { + self.core_worker.worker_instance_key() + } + // TODO: Maybe trait-ify? 
pub(crate) fn register_wf>( &mut self, diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index dc7caa812..901e7c178 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -762,15 +762,8 @@ async fn docker_metrics_with_prometheus( assert!(!data.is_empty(), "No metrics found for query: {test_uid}"); assert_eq!(data[0]["metric"]["exported_job"], "temporal-core-sdk"); assert_eq!(data[0]["metric"]["job"], "otel-collector"); - // Worker heartbeating nexus worker assert!( data[0]["metric"]["task_queue"] - .as_str() - .unwrap() - .starts_with("temporal-sys/worker-commands/default/") - ); - assert!( - data[1]["metric"]["task_queue"] .as_str() .unwrap() .starts_with(test_name) diff --git a/tests/integ_tests/worker_heartbeat_tests.rs b/tests/integ_tests/worker_heartbeat_tests.rs new file mode 100644 index 000000000..bdd962ce6 --- /dev/null +++ b/tests/integ_tests/worker_heartbeat_tests.rs @@ -0,0 +1,635 @@ +use crate::common::{ANY_PORT, CoreWfStarter, get_integ_telem_options}; +use anyhow::anyhow; +use crossbeam_utils::atomic::AtomicCell; +use prost_types::Duration as PbDuration; +use prost_types::Timestamp; +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use temporal_client::{Client, NamespacedClient, RetryClient, WorkflowService}; +use temporal_sdk::{ActContext, ActivityOptions, WfContext}; +use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter}; +use temporal_sdk_core::{ + CoreRuntime, ResourceBasedTuner, ResourceSlotOptions, RuntimeOptionsBuilder, +}; +use temporal_sdk_core_api::telemetry::{ + OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, +}; +use temporal_sdk_core_api::worker::PollerBehavior; +use temporal_sdk_core_protos::coresdk::AsJsonPayloadExt; +use temporal_sdk_core_protos::temporal::api::common::v1::RetryPolicy; +use temporal_sdk_core_protos::temporal::api::enums::v1::WorkerStatus; +use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat; +use temporal_sdk_core_protos::temporal::api::workflowservice::v1::DescribeWorkerRequest; +use temporal_sdk_core_protos::temporal::api::workflowservice::v1::ListWorkersRequest; +use tokio::sync::Semaphore; +use tokio::time::sleep; +use url::Url; + +fn within_two_minutes_ts(ts: Timestamp) -> bool { + let ts_time = UNIX_EPOCH + Duration::new(ts.seconds as u64, ts.nanos as u32); + + let now = SystemTime::now(); + // ts should be at most 2 minutes before the current time + now.duration_since(ts_time).unwrap() <= Duration::from_secs(2 * 60) +} + +fn within_duration(dur: PbDuration, threshold: Duration) -> bool { + let std_dur = Duration::new(dur.seconds as u64, dur.nanos as u32); + std_dur <= threshold +} + +fn new_no_metrics_starter(wf_name: &str) -> CoreWfStarter { + let runtimeopts = RuntimeOptionsBuilder::default() + .telemetry_options(TelemetryOptionsBuilder::default().build().unwrap()) + .heartbeat_interval(Some(Duration::from_millis(100))) + .build() + .unwrap(); + CoreWfStarter::new_with_runtime(wf_name, CoreRuntime::new_assume_tokio(runtimeopts).unwrap()) +} + +async fn list_worker_heartbeats( + client: &Arc>, + query: impl Into, +) -> Vec { + let mut raw_client = client.as_ref().clone(); + WorkflowService::list_workers( + &mut raw_client, + ListWorkersRequest { + namespace: client.namespace().to_owned(), + page_size: 200, + next_page_token: Vec::new(), + query: query.into(), + 
}, + ) + .await + .unwrap() + .into_inner() + .workers_info + .into_iter() + .filter_map(|info| info.worker_heartbeat) + .collect() +} + +// Tests that rely on Prometheus running in a docker container need to start +// with `docker_` and set the `DOCKER_PROMETHEUS_RUNNING` env variable to run +#[rstest::rstest] +#[tokio::test] +async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] backing: &str) { + let telemopts = if backing == "no_metrics" { + TelemetryOptionsBuilder::default().build().unwrap() + } else { + get_integ_telem_options() + }; + let runtimeopts = RuntimeOptionsBuilder::default() + .telemetry_options(telemopts) + .heartbeat_interval(Some(Duration::from_millis(100))) + .build() + .unwrap(); + let mut rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap(); + match backing { + "otel" => { + let url = Some("grpc://localhost:4317") + .map(|x| x.parse::().unwrap()) + .unwrap(); + let mut opts_build = OtelCollectorOptionsBuilder::default(); + let opts = opts_build.url(url).build().unwrap(); + rt.telemetry_mut() + .attach_late_init_metrics(Arc::new(build_otlp_metric_exporter(opts).unwrap())); + } + "prom" => { + let mut opts_build = PrometheusExporterOptionsBuilder::default(); + opts_build.socket_addr(ANY_PORT.parse().unwrap()); + let opts = opts_build.build().unwrap(); + rt.telemetry_mut() + .attach_late_init_metrics(start_prometheus_metric_exporter(opts).unwrap().meter); + } + "no_metrics" => {} + _ => unreachable!(), + } + let wf_name = format!("worker_heartbeat_basic_{backing}"); + let mut starter = CoreWfStarter::new_with_runtime(&wf_name, rt); + starter + .worker_config + .max_outstanding_workflow_tasks(5_usize) + .max_cached_workflows(5_usize) + .max_outstanding_activities(5_usize); + let mut worker = starter.worker().await; + let worker_instance_key = worker.worker_instance_key(); + + // Run a workflow + worker.register_wf(wf_name.to_string(), |ctx: WfContext| async move { + ctx.activity(ActivityOptions { + activity_type: "pass_fail_act".to_string(), + input: "pass".as_json_payload().expect("serializes fine"), + start_to_close_timeout: Some(Duration::from_secs(1)), + ..Default::default() + }) + .await; + Ok(().into()) + }); + + static ACTS_STARTED: Semaphore = Semaphore::const_new(0); + static ACTS_DONE: Semaphore = Semaphore::const_new(0); + worker.register_activity("pass_fail_act", |_ctx: ActContext, i: String| async move { + ACTS_STARTED.add_permits(1); + let _ = ACTS_DONE.acquire().await.unwrap(); + Ok(i) + }); + + starter + .start_with_worker(wf_name.clone(), &mut worker) + .await; + + let start_time = AtomicCell::new(None); + let heartbeat_time = AtomicCell::new(None); + + let test_fut = async { + // Give enough time to ensure heartbeat interval has been hit + tokio::time::sleep(Duration::from_millis(150)).await; + let _ = ACTS_STARTED.acquire().await.unwrap(); + let client = starter.get_client().await; + let mut raw_client = (*client).clone(); + let workers_list = WorkflowService::list_workers( + &mut raw_client, + ListWorkersRequest { + namespace: client.namespace().to_owned(), + page_size: 100, + next_page_token: Vec::new(), + query: String::new(), + }, + ) + .await + .unwrap() + .into_inner(); + let worker_info = workers_list + .workers_info + .iter() + .find(|worker_info| { + if let Some(hb) = worker_info.worker_heartbeat.as_ref() { + hb.worker_instance_key == worker_instance_key.to_string() + } else { + false + } + }) + .unwrap(); + let heartbeat = worker_info.worker_heartbeat.as_ref().unwrap(); + in_activity_checks(heartbeat, &start_time, 
&heartbeat_time); + ACTS_DONE.add_permits(1); + }; + + let runner = async move { + worker.run_until_done().await.unwrap(); + }; + tokio::join!(test_fut, runner); + + let client = starter.get_client().await; + let mut raw_client = (*client).clone(); + let workers_list = WorkflowService::list_workers( + &mut raw_client, + ListWorkersRequest { + namespace: client.namespace().to_owned(), + page_size: 100, + next_page_token: Vec::new(), + query: String::new(), + }, + ) + .await + .unwrap() + .into_inner(); + // Since list_workers finds all workers in the namespace, must find specific worker used in this + // test + let worker_info = workers_list + .workers_info + .iter() + .find(|worker_info| { + if let Some(hb) = worker_info.worker_heartbeat.as_ref() { + hb.worker_instance_key == worker_instance_key.to_string() + } else { + false + } + }) + .unwrap(); + let heartbeat = worker_info.worker_heartbeat.as_ref().unwrap(); + after_shutdown_checks(heartbeat, &wf_name, &start_time, &heartbeat_time); +} + +// Tests that rely on Prometheus running in a docker container need to start +// with `docker_` and set the `DOCKER_PROMETHEUS_RUNNING` env variable to run +#[tokio::test] +async fn docker_worker_heartbeat_tuner() { + let runtimeopts = RuntimeOptionsBuilder::default() + .telemetry_options(get_integ_telem_options()) + .heartbeat_interval(Some(Duration::from_millis(100))) + .build() + .unwrap(); + let mut rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap(); + + let url = Some("grpc://localhost:4317") + .map(|x| x.parse::().unwrap()) + .unwrap(); + let mut opts_build = OtelCollectorOptionsBuilder::default(); + let opts = opts_build.url(url).build().unwrap(); + + rt.telemetry_mut() + .attach_late_init_metrics(Arc::new(build_otlp_metric_exporter(opts).unwrap())); + let wf_name = "worker_heartbeat_tuner"; + let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); + let mut tuner = ResourceBasedTuner::new(0.0, 0.0); + tuner + .with_workflow_slots_options(ResourceSlotOptions::new(2, 10, Duration::from_millis(0))) + .with_activity_slots_options(ResourceSlotOptions::new(5, 10, Duration::from_millis(50))); + starter + .worker_config + .workflow_task_poller_behavior(PollerBehavior::Autoscaling { + minimum: 1, + maximum: 200, + initial: 5, + }) + .nexus_task_poller_behavior(PollerBehavior::Autoscaling { + minimum: 1, + maximum: 200, + initial: 5, + }) + .clear_max_outstanding_opts() + .tuner(Arc::new(tuner)); + let mut worker = starter.worker().await; + let worker_instance_key = worker.worker_instance_key(); + + // Run a workflow + worker.register_wf(wf_name.to_string(), |ctx: WfContext| async move { + ctx.activity(ActivityOptions { + activity_type: "pass_fail_act".to_string(), + input: "pass".as_json_payload().expect("serializes fine"), + start_to_close_timeout: Some(Duration::from_secs(1)), + ..Default::default() + }) + .await; + Ok(().into()) + }); + worker.register_activity("pass_fail_act", |_ctx: ActContext, i: String| async move { + Ok(i) + }); + + starter.start_with_worker(wf_name, &mut worker).await; + worker.run_until_done().await.unwrap(); + + let client = starter.get_client().await; + let mut raw_client = (*client).clone(); + let workers_list = WorkflowService::list_workers( + &mut raw_client, + ListWorkersRequest { + namespace: client.namespace().to_owned(), + page_size: 100, + next_page_token: Vec::new(), + query: String::new(), + }, + ) + .await + .unwrap() + .into_inner(); + // Since list_workers finds all workers in the namespace, must find specific worker used in this + // test + let 
worker_info = workers_list + .workers_info + .iter() + .find(|worker_info| { + if let Some(hb) = worker_info.worker_heartbeat.as_ref() { + hb.worker_instance_key == worker_instance_key.to_string() + } else { + false + } + }) + .unwrap(); + let heartbeat = worker_info.worker_heartbeat.as_ref().unwrap(); + assert!(heartbeat.task_queue.starts_with(wf_name)); + + assert_eq!( + heartbeat + .workflow_task_slots_info + .clone() + .unwrap() + .slot_supplier_kind, + "ResourceBased" + ); + assert_eq!( + heartbeat + .activity_task_slots_info + .clone() + .unwrap() + .slot_supplier_kind, + "ResourceBased" + ); + assert_eq!( + heartbeat + .nexus_task_slots_info + .clone() + .unwrap() + .slot_supplier_kind, + "ResourceBased" + ); + assert_eq!( + heartbeat + .local_activity_slots_info + .clone() + .unwrap() + .slot_supplier_kind, + "ResourceBased" + ); + + let workflow_poller_info = heartbeat.workflow_poller_info.unwrap(); + assert!(workflow_poller_info.is_autoscaling); + assert!(within_two_minutes_ts( + workflow_poller_info.last_successful_poll_time.unwrap() + )); + let sticky_poller_info = heartbeat.workflow_sticky_poller_info.unwrap(); + assert!(sticky_poller_info.is_autoscaling); + assert!(within_two_minutes_ts( + sticky_poller_info.last_successful_poll_time.unwrap() + )); + let nexus_poller_info = heartbeat.nexus_poller_info.unwrap(); + assert!(nexus_poller_info.is_autoscaling); + assert!(nexus_poller_info.last_successful_poll_time.is_none()); + let activity_poller_info = heartbeat.activity_poller_info.unwrap(); + assert!(!activity_poller_info.is_autoscaling); + assert!(within_two_minutes_ts( + activity_poller_info.last_successful_poll_time.unwrap() + )); +} + +fn in_activity_checks( + heartbeat: &WorkerHeartbeat, + start_time: &AtomicCell>, + heartbeat_time: &AtomicCell>, +) { + assert_eq!(heartbeat.status, WorkerStatus::Running as i32); + + let workflow_task_slots = heartbeat.workflow_task_slots_info.clone().unwrap(); + assert_eq!(workflow_task_slots.total_processed_tasks, 1); + assert_eq!(workflow_task_slots.current_available_slots, 5); + assert_eq!(workflow_task_slots.current_used_slots, 0); + assert_eq!(workflow_task_slots.slot_supplier_kind, "Fixed"); + let activity_task_slots = heartbeat.activity_task_slots_info.clone().unwrap(); + assert_eq!(activity_task_slots.current_available_slots, 4); + assert_eq!(activity_task_slots.current_used_slots, 1); + assert_eq!(activity_task_slots.slot_supplier_kind, "Fixed"); + let nexus_task_slots = heartbeat.nexus_task_slots_info.clone().unwrap(); + assert_eq!(nexus_task_slots.current_available_slots, 0); + assert_eq!(nexus_task_slots.current_used_slots, 0); + assert_eq!(nexus_task_slots.slot_supplier_kind, "Fixed"); + let local_activity_task_slots = heartbeat.local_activity_slots_info.clone().unwrap(); + assert_eq!(local_activity_task_slots.current_available_slots, 100); + assert_eq!(local_activity_task_slots.current_used_slots, 0); + assert_eq!(local_activity_task_slots.slot_supplier_kind, "Fixed"); + + let workflow_poller_info = heartbeat.workflow_poller_info.unwrap(); + assert_eq!(workflow_poller_info.current_pollers, 1); + let sticky_poller_info = heartbeat.workflow_sticky_poller_info.unwrap(); + assert_ne!(sticky_poller_info.current_pollers, 0); + let nexus_poller_info = heartbeat.nexus_poller_info.unwrap(); + assert_eq!(nexus_poller_info.current_pollers, 0); + let activity_poller_info = heartbeat.activity_poller_info.unwrap(); + assert_ne!(activity_poller_info.current_pollers, 0); + assert_ne!(heartbeat.current_sticky_cache_size, 0); + 
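// Stash the reported start and heartbeat timestamps so after_shutdown_checks can verify
+    // that start_time is unchanged across shutdown while heartbeat_time has been updated.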
start_time.store(Some(heartbeat.start_time.unwrap())); + heartbeat_time.store(Some(heartbeat.heartbeat_time.unwrap())); +} + +fn after_shutdown_checks( + heartbeat: &WorkerHeartbeat, + wf_name: &str, + start_time: &AtomicCell>, + heartbeat_time: &AtomicCell>, +) { + assert_eq!(heartbeat.worker_identity, "integ_tester"); + let host_info = heartbeat.host_info.clone().unwrap(); + assert!(!host_info.host_name.is_empty()); + assert!(!host_info.process_key.is_empty()); + assert!(!host_info.process_id.is_empty()); + assert_ne!(host_info.current_host_cpu_usage, 0.0); + assert_ne!(host_info.current_host_mem_usage, 0.0); + assert!(heartbeat.task_queue.starts_with(wf_name)); + assert_eq!( + heartbeat.deployment_version.clone().unwrap().build_id, + "test_build_id" + ); + assert_eq!(heartbeat.sdk_name, "temporal-core"); + assert_eq!(heartbeat.sdk_version, "0.1.0"); + assert_eq!(heartbeat.status, WorkerStatus::Shutdown as i32); + assert_eq!(start_time.load().unwrap(), heartbeat.start_time.unwrap()); + assert_ne!( + heartbeat_time.load().unwrap(), + heartbeat.heartbeat_time.unwrap() + ); + // TODO: heartbeat.heartbeat_time comes after heartbeat_time + assert!(within_two_minutes_ts(heartbeat.start_time.unwrap())); + assert!(within_two_minutes_ts(heartbeat.heartbeat_time.unwrap())); + assert!(within_duration( + heartbeat.elapsed_since_last_heartbeat.unwrap(), + Duration::from_secs(200) + )); + let workflow_task_slots = heartbeat.workflow_task_slots_info.clone().unwrap(); + assert_eq!(workflow_task_slots.current_available_slots, 5); + // TODO: Could be a bug here with "+ extra" from when the metric is recorded in MeteredPermitDealer.build_owned() + assert_eq!(workflow_task_slots.current_used_slots, 1); + assert_eq!(workflow_task_slots.total_processed_tasks, 2); + assert_eq!(workflow_task_slots.slot_supplier_kind, "Fixed"); + let activity_task_slots = heartbeat.activity_task_slots_info.clone().unwrap(); + assert_eq!(activity_task_slots.current_available_slots, 5); + // TODO: Could be a bug here with "+ extra" from when the metric is recorded in MeteredPermitDealer.build_owned() + assert_eq!(workflow_task_slots.current_used_slots, 1); + assert_eq!(activity_task_slots.slot_supplier_kind, "Fixed"); + assert_eq!(activity_task_slots.last_interval_processed_tasks, 1); + let nexus_task_slots = heartbeat.nexus_task_slots_info.clone().unwrap(); + assert_eq!(nexus_task_slots.current_available_slots, 0); + assert_eq!(nexus_task_slots.current_used_slots, 0); + assert_eq!(nexus_task_slots.slot_supplier_kind, "Fixed"); + let local_activity_task_slots = heartbeat.local_activity_slots_info.clone().unwrap(); + assert_eq!(local_activity_task_slots.current_available_slots, 100); + assert_eq!(local_activity_task_slots.current_used_slots, 0); + assert_eq!(local_activity_task_slots.slot_supplier_kind, "Fixed"); + + let workflow_poller_info = heartbeat.workflow_poller_info.unwrap(); + assert!(!workflow_poller_info.is_autoscaling); + assert!(within_two_minutes_ts( + workflow_poller_info.last_successful_poll_time.unwrap() + )); + let sticky_poller_info = heartbeat.workflow_sticky_poller_info.unwrap(); + assert!(!sticky_poller_info.is_autoscaling); + assert!(within_two_minutes_ts( + sticky_poller_info.last_successful_poll_time.unwrap() + )); + let nexus_poller_info = heartbeat.nexus_poller_info.unwrap(); + assert!(!nexus_poller_info.is_autoscaling); + assert!(nexus_poller_info.last_successful_poll_time.is_none()); + let activity_poller_info = heartbeat.activity_poller_info.unwrap(); + assert!(!activity_poller_info.is_autoscaling); + 
assert!(within_two_minutes_ts( + activity_poller_info.last_successful_poll_time.unwrap() + )); + + assert_eq!(heartbeat.total_sticky_cache_hit, 2); + // TODO: total_sticky_cache_miss + assert_eq!(heartbeat.current_sticky_cache_size, 0); + // TODO: plugin +} + +#[tokio::test] +async fn docker_worker_heartbeat_multiple_workers() { + let wf_name = "worker_heartbeat_multi_workers"; + let mut starter = new_no_metrics_starter(wf_name); + starter + .worker_config + .max_outstanding_workflow_tasks(5_usize) + .max_cached_workflows(5_usize); + + let client = starter.get_client().await; + let starting_hb_len = list_worker_heartbeats(&client, String::new()).await.len(); + + let mut worker_a = starter.worker().await; + worker_a.register_wf(wf_name.to_string(), |_ctx: WfContext| async move { + Ok(().into()) + }); + worker_a.register_activity("failing_act", |_ctx: ActContext, _: String| async move { + Ok(()) + }); + + let mut starter_b = starter.clone_no_worker(); + let mut worker_b = starter_b.worker().await; + worker_b.register_wf(wf_name.to_string(), |_ctx: WfContext| async move { + Ok(().into()) + }); + worker_b.register_activity("failing_act", |_ctx: ActContext, _: String| async move { + Ok(()) + }); + + let worker_a_key = worker_a.worker_instance_key().to_string(); + let worker_b_key = worker_b.worker_instance_key().to_string(); + let _ = starter.start_with_worker(wf_name, &mut worker_a).await; + worker_a.run_until_done().await.unwrap(); + + let _ = starter_b.start_with_worker(wf_name, &mut worker_b).await; + worker_b.run_until_done().await.unwrap(); + + sleep(Duration::from_millis(200)).await; + + let all = list_worker_heartbeats(&client, String::new()).await; + let keys: HashSet<_> = all + .iter() + .map(|hb| hb.worker_instance_key.clone()) + .collect(); + assert!(keys.contains(&worker_a_key)); + assert!(keys.contains(&worker_b_key)); + + // Verify both heartbeats contain the same shared process_key + let process_keys: HashSet<_> = all + .iter() + .filter_map(|hb| hb.host_info.as_ref().map(|info| info.process_key.clone())) + .collect(); + assert!(process_keys.len() > starting_hb_len); + + let filtered = + list_worker_heartbeats(&client, format!("WorkerInstanceKey=\"{worker_a_key}\"")).await; + assert_eq!(filtered.len(), 1); + assert_eq!(filtered[0].worker_instance_key, worker_a_key); + + // Verify describe worker gives the same heartbeat as listworker + let mut raw_client = client.as_ref().clone(); + let describe_worker_a = WorkflowService::describe_worker( + &mut raw_client, + DescribeWorkerRequest { + namespace: client.namespace().to_owned(), + worker_instance_key: worker_a_key.to_string(), + }, + ) + .await + .unwrap() + .into_inner() + .worker_info + .unwrap() + .worker_heartbeat + .unwrap(); + assert_eq!(describe_worker_a, filtered[0]); + + let filtered_b = + list_worker_heartbeats(&client, format!("WorkerInstanceKey = \"{worker_b_key}\"")).await; + assert_eq!(filtered_b.len(), 1); + assert_eq!(filtered_b[0].worker_instance_key, worker_b_key); + let describe_worker_b = WorkflowService::describe_worker( + &mut raw_client, + DescribeWorkerRequest { + namespace: client.namespace().to_owned(), + worker_instance_key: worker_b_key.to_string(), + }, + ) + .await + .unwrap() + .into_inner() + .worker_info + .unwrap() + .worker_heartbeat + .unwrap(); + assert_eq!(describe_worker_b, filtered_b[0]); +} + +#[tokio::test] +async fn docker_worker_heartbeat_failure_metrics() { + let wf_name = "worker_heartbeat_failure_metrics"; + let mut starter = new_no_metrics_starter(wf_name); + 
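// Drive a workflow whose activity exhausts its retry attempts and whose first workflow
+    // task panics, so the heartbeat's slot info should report failed-task counts for both
+    // (asserted below).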
starter.worker_config.max_outstanding_activities(5_usize); + + let mut worker = starter.worker().await; + static COUNT: AtomicU64 = AtomicU64::new(0); + + worker.register_wf(wf_name.to_string(), |ctx: WfContext| async move { + println!("[WF] starting"); + COUNT.store(COUNT.load(Ordering::Relaxed) + 1, Ordering::Relaxed); + let _asdf = ctx + .activity(ActivityOptions { + activity_type: "failing_act".to_string(), + input: "boom".as_json_payload().expect("serialize"), + start_to_close_timeout: Some(Duration::from_secs(1)), // TODO: use retry policy instead + retry_policy: Some(RetryPolicy { + maximum_attempts: 3, + ..Default::default() + }), + ..Default::default() + }) + .await; + if COUNT.load(Ordering::Relaxed) == 1 { + println!("[WF] returning error"); + panic!("expected WF panic"); + } + Ok(().into()) + }); + worker.register_activity("failing_act", |_ctx: ActContext, _: String| async move { + if COUNT.load(Ordering::Relaxed) >= 3 { + return Ok(()); + } + Err(anyhow!("Expected error").into()) + }); + + let worker_key = worker.worker_instance_key().to_string(); + starter.workflow_options.retry_policy = Some(RetryPolicy { + maximum_attempts: 2, + ..Default::default() + }); + let _ = starter.start_with_worker(wf_name, &mut worker).await; + + worker.run_until_done().await.unwrap(); + + sleep(Duration::from_millis(150)).await; + let client = starter.get_client().await; + let mut heartbeats = + list_worker_heartbeats(&client, format!("WorkerInstanceKey=\"{worker_key}\"")).await; + assert_eq!(heartbeats.len(), 1); + let heartbeat = heartbeats.pop().unwrap(); + + let activity_slots = heartbeat.activity_task_slots_info.unwrap(); + assert_eq!(activity_slots.total_failed_tasks, 3); + assert!(activity_slots.last_interval_failure_tasks >= 1); + + let workflow_slots = heartbeat.workflow_task_slots_info.unwrap(); + assert_eq!(workflow_slots.total_failed_tasks, 1); +} diff --git a/tests/main.rs b/tests/main.rs index 8a71f03ca..d48a68bd5 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -22,6 +22,7 @@ mod integ_tests { mod queries_tests; mod update_tests; mod visibility_tests; + mod worker_heartbeat_tests; mod worker_tests; mod worker_versioning_tests; mod workflow_tests; diff --git a/tests/runner.rs b/tests/runner.rs index f2d843968..af763cb4a 100644 --- a/tests/runner.rs +++ b/tests/runner.rs @@ -121,6 +121,10 @@ async fn main() -> Result<(), anyhow::Error> { "system.enableDeploymentVersions=true".to_owned(), "--dynamic-config-value".to_owned(), "component.nexusoperations.recordCancelRequestCompletionEvents=true".to_owned(), + "--dynamic-config-value".to_owned(), + "frontend.WorkerHeartbeatsEnabled=true".to_owned(), + "--dynamic-config-value".to_owned(), + "frontend.ListWorkersEnabled=true".to_owned(), "--http-port".to_string(), "7243".to_string(), "--search-attribute".to_string(),