Skip to content

Commit

Permalink
added prometheus dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 17, 2024
1 parent f43cedc commit 7efeb3c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import io.envoyproxy.controlplane.cache.XdsRequest
import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHANGE_WATCHER_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
Expand All @@ -38,7 +38,7 @@ internal class GroupChangeWatcher(
return groupsChanged
.measureBuffer("group-change-watcher", meterRegistry)
.checkpoint("group-change-watcher-emitted")
.name(REACTOR_METRIC)
.name(CHANGE_WATCHER_METRIC)
.tag(WATCH_TYPE_TAG, "group")
.metrics()
.doOnSubscribe {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
Expand Down Expand Up @@ -62,7 +64,7 @@ class SnapshotUpdater(
)
.measureBuffer("snapshot-updater", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name(REACTOR_METRIC)
.name(SNAPSHOT_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "merged")
.tag(UPDATE_TRIGGER_TAG, "global")
Expand Down Expand Up @@ -106,7 +108,7 @@ class SnapshotUpdater(
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
.name(REACTOR_METRIC)
.name(SNAPSHOT_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "published")
.tag(UPDATE_TRIGGER_TAG, "groups")
Expand All @@ -124,17 +126,16 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "sampled")
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(CHECKPOINT_TAG, "sampled")
.onBackpressureLatestMeasured("snapshot-updater", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "published")
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "published")
.metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
Expand Down Expand Up @@ -213,7 +214,7 @@ class SnapshotUpdater(
} else if (result.xdsSnapshot != null && group.communicationMode == XDS) {
updateSnapshotForGroup(group, result.xdsSnapshot)
} else {
meterRegistry.counter(ERRORS_TOTAL_METRIC, Tags.of("type", "communication-mode")).increment()
meterRegistry.counter(COMMUNICATION_MODE_ERROR_METRIC).increment()
logger.error(
"Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
"Handling Envoy with not supported communication mode should have been rejected before." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
Expand Down Expand Up @@ -47,8 +47,9 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combinator", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes-combinator")
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes")
.tag(CHECKPOINT_TAG, "combined")
.metrics()
}

Expand Down Expand Up @@ -76,7 +77,7 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(REACTOR_METRIC)
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes")
.tag(CHECKPOINT_TAG, "emitted")
.onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import reactor.core.publisher.Flux

class RemoteClusterStateChanges(
Expand All @@ -16,7 +17,8 @@ class RemoteClusterStateChanges(
.getChanges(properties.sync.pollingInterval)
.startWith(MultiClusterState.empty())
.distinctUntilChanged()
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "cross-dc-synchronisation")
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "remote-cluster-changes")
.tag(CHECKPOINT_TAG, "cross-dc")
.metrics()
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.noop.NoopTimer

val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER))
const val REACTOR_METRIC = "reactor"
const val REACTOR_METRIC = "reactor.stats"
const val SERVICES_STATE_METRIC = "services.state"
const val SNAPSHOT_METRIC = "snapshot"
const val ERRORS_TOTAL_METRIC = "errors.total"
const val COMMUNICATION_MODE_ERROR_METRIC = "communication.errors.total"
const val CONNECTIONS_METRIC = "connections"
const val REQUESTS_METRIC = "requests.total"
const val WATCH_METRIC = "watch"
Expand All @@ -19,6 +22,7 @@ const val SIMPLE_CACHE_METRIC = "simple.cache.duration.seconds"
const val PROTOBUF_CACHE_METRIC = "protobuf.cache.serialize.time"
const val CACHE_GROUP_COUNT_METRIC = "cache.groups.count"
const val SNAPSHOT_FACTORY_SECONDS_METRIC = "snapshot.factory.seconds"
const val CHANGE_WATCHER_METRIC = "group.change.watcher"

const val CONNECTION_TYPE_TAG = "connection-type"
const val STREAM_TYPE_TAG = "stream-type"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.server.ReadinessStateHandler
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.time.Duration
Expand Down Expand Up @@ -57,7 +57,7 @@ class ConsulServiceChanges(
)
.measureDiscardedItems("consul-service-changes", metrics.meterRegistry)
.checkpoint("consul-service-changes-emitted")
.name(REACTOR_METRIC)
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "consul-service-changes")
.tag(CHECKPOINT_TAG, "emitted")
.checkpoint("consul-service-changes-emitted-distinct")
Expand Down

0 comments on commit 7efeb3c

Please sign in to comment.