From 7efeb3c429df34aca518562bf3fad1ac426dae1a Mon Sep 17 00:00:00 2001 From: "nastassia.dailidava" Date: Thu, 17 Oct 2024 18:20:32 +0200 Subject: [PATCH] added prometheus dependency --- .../envoycontrol/groups/GroupChangeWatcher.kt | 4 ++-- .../envoycontrol/snapshot/SnapshotUpdater.kt | 23 ++++++++++--------- .../synchronization/GlobalStateChanges.kt | 9 ++++---- .../RemoteClusterStateChanges.kt | 8 ++++--- .../servicemesh/envoycontrol/utils/Metrics.kt | 6 ++++- .../consul/services/ConsulServiceChanges.kt | 8 +++---- 6 files changed, 33 insertions(+), 25 deletions(-) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt index 56cd44685..dfba4912c 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt @@ -10,8 +10,8 @@ import io.envoyproxy.controlplane.cache.XdsRequest import io.micrometer.core.instrument.MeterRegistry import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics import pl.allegro.tech.servicemesh.envoycontrol.logger +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHANGE_WATCHER_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink @@ -38,7 +38,7 @@ internal class GroupChangeWatcher( return groupsChanged .measureBuffer("group-change-watcher", meterRegistry) .checkpoint("group-change-watcher-emitted") - .name(REACTOR_METRIC) + .name(CHANGE_WATCHER_METRIC) .tag(WATCH_TYPE_TAG, "group") .metrics() .doOnSubscribe { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index 2ca421461..cf679c0ff 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -10,15 +10,17 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer @@ -62,7 +64,7 @@ class SnapshotUpdater( ) .measureBuffer("snapshot-updater", meterRegistry, innerSources = 2) .checkpoint("snapshot-updater-merged") - .name(REACTOR_METRIC) + .name(SNAPSHOT_METRIC) .tag(METRIC_EMITTER_TAG, "snapshot-updater") .tag(SNAPSHOT_STATUS_TAG, "merged") .tag(UPDATE_TRIGGER_TAG, "global") @@ -106,7 +108,7 @@ class SnapshotUpdater( .map { groups -> UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups) } - .name(REACTOR_METRIC) + .name(SNAPSHOT_METRIC) .tag(METRIC_EMITTER_TAG, "snapshot-updater") .tag(SNAPSHOT_STATUS_TAG, "published") .tag(UPDATE_TRIGGER_TAG, "groups") @@ -124,17 +126,16 @@ class SnapshotUpdater( internal fun services(states: Flux): Flux { return states - .name(REACTOR_METRIC) - .tag(UPDATE_TRIGGER_TAG, "services") - .tag(STATUS_TAG, "sampled") + .name(SERVICES_STATE_METRIC) + .tag(METRIC_EMITTER_TAG, "snapshot-updater") + .tag(CHECKPOINT_TAG, "sampled") .onBackpressureLatestMeasured("snapshot-updater", meterRegistry) // prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure .publishOn(globalSnapshotScheduler, 1) .measureBuffer("snapshot-updater", meterRegistry) .checkpoint("snapshot-updater-services-published") - .name(REACTOR_METRIC) - .tag(UPDATE_TRIGGER_TAG, "services") - .tag(STATUS_TAG, "published") + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "published") .metrics() .createClusterConfigurations() .map { (states, clusters) -> @@ -213,7 +214,7 @@ class SnapshotUpdater( } else if (result.xdsSnapshot != null && group.communicationMode == XDS) { updateSnapshotForGroup(group, result.xdsSnapshot) } else { - meterRegistry.counter(ERRORS_TOTAL_METRIC, Tags.of("type", "communication-mode")).increment() + meterRegistry.counter(COMMUNICATION_MODE_ERROR_METRIC).increment() logger.error( "Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " + "Handling Envoy with not supported communication mode should have been rejected before." + diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt index 912b6f57f..5bb778dc1 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt @@ -6,7 +6,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured @@ -47,8 +47,9 @@ class GlobalStateChanges( .logSuppressedError("combineLatest() suppressed exception") .measureBuffer("global-service-changes-combinator", meterRegistry) .checkpoint("global-service-changes-emitted") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "global-service-changes-combinator") + .name(SERVICES_STATE_METRIC) + .tag(METRIC_EMITTER_TAG, "global-service-changes") + .tag(CHECKPOINT_TAG, "combined") .metrics() } @@ -76,7 +77,7 @@ class GlobalStateChanges( .logSuppressedError("combineLatest() suppressed exception") .measureBuffer("global-service-changes-combine-latest", meterRegistry) .checkpoint("global-service-changes-emitted") - .name(REACTOR_METRIC) + .name(SERVICES_STATE_METRIC) .tag(METRIC_EMITTER_TAG, "global-service-changes") .tag(CHECKPOINT_TAG, "emitted") .onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt index ee85877b8..e20075b6a 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt @@ -3,8 +3,9 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC import reactor.core.publisher.Flux class RemoteClusterStateChanges( @@ -16,7 +17,8 @@ class RemoteClusterStateChanges( .getChanges(properties.sync.pollingInterval) .startWith(MultiClusterState.empty()) .distinctUntilChanged() - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "cross-dc-synchronisation") + .name(SERVICES_STATE_METRIC) + .tag(METRIC_EMITTER_TAG, "remote-cluster-changes") + .tag(CHECKPOINT_TAG, "cross-dc") .metrics() } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt index 6a99f0fa0..678e67b19 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt @@ -5,8 +5,11 @@ import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.noop.NoopTimer val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER)) -const val REACTOR_METRIC = "reactor" +const val REACTOR_METRIC = "reactor.stats" +const val SERVICES_STATE_METRIC = "services.state" +const val SNAPSHOT_METRIC = "snapshot" const val ERRORS_TOTAL_METRIC = "errors.total" +const val COMMUNICATION_MODE_ERROR_METRIC = "communication.errors.total" const val CONNECTIONS_METRIC = "connections" const val REQUESTS_METRIC = "requests.total" const val WATCH_METRIC = "watch" @@ -19,6 +22,7 @@ const val SIMPLE_CACHE_METRIC = "simple.cache.duration.seconds" const val PROTOBUF_CACHE_METRIC = "protobuf.cache.serialize.time" const val CACHE_GROUP_COUNT_METRIC = "cache.groups.count" const val SNAPSHOT_FACTORY_SECONDS_METRIC = "snapshot.factory.seconds" +const val CHANGE_WATCHER_METRIC = "group.change.watcher" const val CONNECTION_TYPE_TAG = "connection-type" const val STREAM_TYPE_TAG = "stream-type" diff --git a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt index 39b01cc79..fe190af33 100644 --- a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt +++ b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt @@ -13,11 +13,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.server.ReadinessStateHandler import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState -import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.time.Duration @@ -57,7 +57,7 @@ class ConsulServiceChanges( ) .measureDiscardedItems("consul-service-changes", metrics.meterRegistry) .checkpoint("consul-service-changes-emitted") - .name(REACTOR_METRIC) + .name(SERVICES_STATE_METRIC) .tag(METRIC_EMITTER_TAG, "consul-service-changes") .tag(CHECKPOINT_TAG, "emitted") .checkpoint("consul-service-changes-emitted-distinct")