Skip to content

Commit 616c782

Browse files
allegro-internal/flex-roadmap#819 returned all other metrics
1 parent c191571 commit 616c782

File tree

9 files changed

+100
-31
lines changed

9 files changed

+100
-31
lines changed

envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import io.micrometer.core.instrument.MeterRegistry
66
import io.micrometer.core.instrument.Tags
77
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC
88
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
9+
import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG
10+
import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC
911
import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG
1012
import java.util.concurrent.atomic.AtomicInteger
1113
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest
@@ -58,14 +60,30 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
5860
}
5961

6062
override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) {
61-
// noop
63+
meterRegistry.counter(
64+
REQUESTS_METRIC,
65+
Tags.of(
66+
CONNECTION_TYPE_TAG, "grpc",
67+
STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
68+
DISCOVERY_REQ_TYPE_TAG, "total"
69+
)
70+
)
71+
.increment()
6272
}
6373

6474
override fun onV3StreamDeltaRequest(
6575
streamId: Long,
6676
request: V3DeltaDiscoveryRequest
6777
) {
68-
// noop
78+
meterRegistry.counter(
79+
REQUESTS_METRIC,
80+
Tags.of(
81+
CONNECTION_TYPE_TAG, "grpc",
82+
STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
83+
DISCOVERY_REQ_TYPE_TAG, "delta"
84+
)
85+
)
86+
.increment()
6987
}
7088

7189
override fun onStreamCloseWithError(streamId: Long, typeUrl: String, error: Throwable) {

envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger
1212
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
1313
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
1414
import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC
15-
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
1615
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
1716
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
1817
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
@@ -186,8 +185,7 @@ class SnapshotUpdater(
186185
SNAPSHOT_GROUP_ERROR_METRIC,
187186
Tags.of(
188187
SERVICE_TAG, group.serviceName,
189-
OPERATION_TAG, "create-snapshot",
190-
METRIC_EMITTER_TAG, "snapshot-updater"
188+
OPERATION_TAG, "create-snapshot"
191189
)
192190
).increment()
193191
logger.error("Unable to create snapshot for group ${group.serviceName}", e)

envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_TAG
1212
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_CANCELLED_METRIC
1313
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_SECONDS_METRIC
1414
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_TOTAL_METRIC
15-
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
1615
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
1716
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_ERRORS_METRIC
1817
import reactor.core.publisher.Flux
@@ -75,8 +74,7 @@ class RemoteServices(
7574
SERVICES_STATE_ERRORS_METRIC,
7675
Tags.of(
7776
CLUSTER_TAG, cluster,
78-
OPERATION_TAG, "get-state",
79-
METRIC_EMITTER_TAG, "cross-dc-synchronization"
77+
OPERATION_TAG, "get-state"
8078
)
8179
).increment()
8280
logger.warn("Error synchronizing instances ${it.message}", it)
@@ -93,8 +91,7 @@ class RemoteServices(
9391
SERVICES_STATE_ERRORS_METRIC,
9492
Tags.of(
9593
CLUSTER_TAG, cluster,
96-
OPERATION_TAG, "get-instances",
97-
METRIC_EMITTER_TAG, "cross-dc-synchronization"
94+
OPERATION_TAG, "get-instances"
9895
)
9996
).increment()
10097
logger.warn("Failed fetching instances from $cluster", e)

envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ import io.micrometer.core.instrument.noop.NoopTimer
77
val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER))
88
const val REACTOR_METRIC = "reactor.stats"
99
const val SERVICES_STATE_METRIC = "services.state"
10-
const val SERVICES_STATE_ERRORS_METRIC = "services.state.errors"
10+
const val SERVICES_STATE_ERRORS_METRIC = "services.state.errors.total"
1111
const val SNAPSHOT_METRIC = "snapshot"
1212
const val SNAPSHOT_UPDATE_DURATION_METRIC = "snapshot.update.duration.seconds"
1313
const val SNAPSHOT_ERROR_METRIC = "snapshot.errors"
14-
const val SNAPSHOT_GROUP_ERROR_METRIC = "snapshot.group.errors"
15-
const val WATCH_ERRORS_METRIC = "watch.errors.total"
14+
const val SNAPSHOT_GROUP_ERROR_METRIC = "snapshot.group.errors.total"
1615
const val COMMUNICATION_MODE_ERROR_METRIC = "communication.errors.total"
1716
const val CONNECTIONS_METRIC = "connections.stats"
18-
const val REQUESTS_METRIC = "stream.requests"
19-
const val WATCH_METRIC = "watch.stats"
17+
const val REQUESTS_METRIC = "requests.stats"
18+
const val WATCH_ERRORS_METRIC = "service.watch.errors.total"
19+
const val WATCH_METRIC = "service.watch"
2020
const val ENVOY_CONTROL_WARM_UP_METRIC = "envoy.control.warmup.seconds"
2121
const val CROSS_DC_SYNC_METRIC = "cross.dc.synchronization"
2222
const val CROSS_DC_SYNC_CANCELLED_METRIC = "$CROSS_DC_SYNC_METRIC.cancelled.total"
@@ -35,7 +35,6 @@ const val WATCH_TYPE_TAG = "watch-type"
3535
const val DISCOVERY_REQ_TYPE_TAG = "discovery-request-type"
3636
const val METRIC_TYPE_TAG = "metric-type"
3737
const val METRIC_EMITTER_TAG = "metric-emitter"
38-
const val SNAPSHOT_STATUS_TAG = "snapshot-status"
3938
const val UPDATE_TRIGGER_TAG = "update-trigger"
4039
const val SERVICE_TAG = "service"
4140
const val OPERATION_TAG = "operation"

envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pl.allegro.tech.servicemesh.envoycontrol.utils
22

33
import io.micrometer.core.instrument.MeterRegistry
4+
import io.micrometer.core.instrument.Tags
45
import org.reactivestreams.Subscription
56
import org.slf4j.LoggerFactory
67
import reactor.core.Disposable
@@ -11,6 +12,7 @@ import reactor.core.scheduler.Scheduler
1112
import reactor.core.scheduler.Schedulers
1213
import java.time.Duration
1314
import java.util.concurrent.TimeUnit
15+
import kotlin.streams.asSequence
1416

1517
private val logger = LoggerFactory.getLogger("pl.allegro.tech.servicemesh.envoycontrol.utils.ReactorUtils")
1618
private val defaultScheduler by lazy { Schedulers.newSingle("reactor-utils-scheduler") }
@@ -110,7 +112,12 @@ private fun measureQueueSubscriptionBuffer(
110112
name: String,
111113
meterRegistry: MeterRegistry
112114
) {
113-
logger.info("subscription $subscription name: $name meterRegistry: $meterRegistry")
115+
meterRegistry.gauge(
116+
REACTOR_METRIC,
117+
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
118+
subscription,
119+
queueSubscriptionBufferExtractor
120+
)
114121
}
115122

116123
private fun measureScannableBuffer(
@@ -119,7 +126,49 @@ private fun measureScannableBuffer(
119126
innerSources: Int,
120127
meterRegistry: MeterRegistry
121128
) {
122-
logger.info("scannable $scannable name: $name innerSources: $innerSources meterRegistry: $meterRegistry")
129+
val buffered = scannable.scan(Scannable.Attr.BUFFERED)
130+
if (buffered == null) {
131+
logger.error(
132+
"Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG: $name'. Buffer size not available. " +
133+
"Use measureBuffer() only on supported reactor operators"
134+
)
135+
return
136+
}
137+
138+
meterRegistry.gauge(
139+
REACTOR_METRIC,
140+
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
141+
scannable,
142+
scannableBufferExtractor
143+
)
144+
145+
/**
146+
* Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual
147+
* buffer (that is controlled by `prefetch` parameter) size. Instead it returns simply number of connected sources.
148+
*
149+
* To access actual buffer size, we need to extract it from inners(). We don't know how many sources will
150+
* be available, so it must be stated explicitly as innerSources parameter.
151+
*/
152+
for (i in 0 until innerSources) {
153+
meterRegistry.gauge(
154+
REACTOR_METRIC,
155+
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "${(name)}_$i"),
156+
scannable,
157+
innerBufferExtractor(i)
158+
)
159+
}
160+
}
161+
162+
private val scannableBufferExtractor = { s: Scannable -> s.scan(Scannable.Attr.BUFFERED)?.toDouble() ?: -1.0 }
163+
private fun innerBufferExtractor(index: Int) = { s: Scannable ->
164+
s.inners().asSequence()
165+
.elementAtOrNull(index)
166+
?.let(scannableBufferExtractor)
167+
?: -1.0
168+
}
169+
170+
private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> ->
171+
s.size.toDouble()
123172
}
124173

125174
sealed class ParallelizableScheduler

envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyIn
5757
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.RequestPolicyMapper
5858
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator
5959
import pl.allegro.tech.servicemesh.envoycontrol.utils.DirectScheduler
60-
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
6160
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
6261
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelScheduler
6362
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
@@ -477,8 +476,7 @@ class SnapshotUpdaterTest {
477476
.tags(
478477
Tags.of(
479478
SERVICE_TAG, "example-service",
480-
OPERATION_TAG, "create-snapshot",
481-
METRIC_EMITTER_TAG, "snapshot-updater"
479+
OPERATION_TAG, "create-snapshot"
482480
)
483481
)
484482
.counter()?.count()

envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils
33
import io.micrometer.core.instrument.Tags
44
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
55
import org.assertj.core.api.Assertions.assertThat
6-
import org.junit.jupiter.api.Disabled
76
import org.junit.jupiter.api.Test
87
import org.junit.jupiter.api.fail
98
import org.testcontainers.shaded.org.awaitility.Awaitility
@@ -13,7 +12,6 @@ import java.util.concurrent.CountDownLatch
1312
import java.util.concurrent.TimeUnit
1413
import java.util.function.BiFunction
1514

16-
@Disabled
1715
class ReactorUtilsTest {
1816

1917
@Test

envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.infrastructure
33
import com.ecwid.consul.v1.ConsulClient
44
import com.fasterxml.jackson.databind.ObjectMapper
55
import io.micrometer.core.instrument.MeterRegistry
6+
import io.micrometer.core.instrument.Tags
67
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
78
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
89
import org.springframework.boot.context.properties.ConfigurationProperties
@@ -40,6 +41,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.RegexServi
4041
import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.ServiceInstancesTransformer
4142
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters
4243
import pl.allegro.tech.servicemesh.envoycontrol.synchronization.GlobalStateChanges
44+
import pl.allegro.tech.servicemesh.envoycontrol.utils.CACHE_GROUP_COUNT_METRIC
45+
import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG
46+
import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_ERRORS_METRIC
47+
import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_METRIC
4348
import reactor.core.scheduler.Schedulers
4449
import java.net.URI
4550

@@ -172,7 +177,18 @@ class ControlPlaneConfig {
172177
ConsulClient(properties.host, properties.port).agentSelf.value?.config?.datacenter ?: "local"
173178

174179
fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics {
175-
return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry)
180+
return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also {
181+
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "added"), it.servicesAdded)
182+
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "removed"), it.servicesRemoved)
183+
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "instance-changed"), it.instanceChanges)
184+
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "snapshot-changed"), it.snapshotChanges)
185+
meterRegistry.gauge(CACHE_GROUP_COUNT_METRIC, it.cacheGroupsCount)
186+
it.meterRegistry.more().counter(
187+
WATCH_ERRORS_METRIC,
188+
listOf(),
189+
it.errorWatchingServices
190+
)
191+
}
176192
}
177193

178194
@Bean

envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package pl.allegro.tech.servicemesh.envoycontrol
22

33
import io.micrometer.core.instrument.Tags
44
import org.assertj.core.api.Assertions.assertThat
5-
import org.junit.jupiter.api.Disabled
65
import org.junit.jupiter.api.Test
76
import org.junit.jupiter.api.extension.RegisterExtension
87
import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted
@@ -21,15 +20,14 @@ import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscover
2120
import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.RDS
2221
import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.SDS
2322
import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.UNKNOWN
24-
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
2523
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC
24+
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
2625
import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG
2726
import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC
2827
import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG
2928
import java.util.function.Consumer
3029
import java.util.function.Predicate
3130

32-
@Disabled
3331
class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTest {
3432
companion object {
3533

@@ -89,7 +87,6 @@ class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTe
8987
)
9088
}
9189

92-
@Disabled
9390
class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTest {
9491
companion object {
9592

@@ -149,7 +146,6 @@ class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTes
149146
)
150147
}
151148

152-
@Disabled
153149
class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTest {
154150
companion object {
155151

@@ -209,7 +205,6 @@ class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbac
209205
)
210206
}
211207

212-
@Disabled
213208
interface MetricsDiscoveryServerCallbacksTest {
214209
companion object {
215210
private val logger by logger()
@@ -251,7 +246,8 @@ interface MetricsDiscoveryServerCallbacksTest {
251246
).isNotNull
252247
assertThat(
253248
meterRegistry.get(metric)
254-
.tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge().value()
249+
.tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge()
250+
.value()
255251
.toInt()
256252
).isEqualTo(value)
257253
}

0 commit comments

Comments
 (0)