1
1
package pl.allegro.tech.servicemesh.envoycontrol.utils
2
2
3
3
import io.micrometer.core.instrument.MeterRegistry
4
+ import io.micrometer.core.instrument.Tags
4
5
import org.reactivestreams.Subscription
5
6
import org.slf4j.LoggerFactory
6
7
import reactor.core.Disposable
@@ -11,6 +12,7 @@ import reactor.core.scheduler.Scheduler
11
12
import reactor.core.scheduler.Schedulers
12
13
import java.time.Duration
13
14
import java.util.concurrent.TimeUnit
15
+ import kotlin.streams.asSequence
14
16
15
17
private val logger = LoggerFactory .getLogger(" pl.allegro.tech.servicemesh.envoycontrol.utils.ReactorUtils" )
16
18
private val defaultScheduler by lazy { Schedulers .newSingle(" reactor-utils-scheduler" ) }
@@ -50,8 +52,7 @@ fun <T> Flux<T>.measureBuffer(
50
52
fun <T > Flux<T>.measureDiscardedItems (name : String , meterRegistry : MeterRegistry ): Flux <T > = this
51
53
.doOnDiscard(Any ::class .java) {
52
54
meterRegistry.counter(
53
- REACTOR_METRIC ,
54
- METRIC_TYPE_TAG , " discarded-items" ,
55
+ REACTOR_DISCARDED_METRIC ,
55
56
METRIC_EMITTER_TAG , name
56
57
).increment()
57
58
}
@@ -110,7 +111,12 @@ private fun measureQueueSubscriptionBuffer(
110
111
name : String ,
111
112
meterRegistry : MeterRegistry
112
113
) {
113
- logger.info(" subscription $subscription name: $name meterRegistry: $meterRegistry " )
114
+ meterRegistry.gauge(
115
+ REACTOR_METRIC ,
116
+ Tags .of(METRIC_TYPE_TAG , " buffer-size" , METRIC_EMITTER_TAG , name),
117
+ subscription,
118
+ queueSubscriptionBufferExtractor
119
+ )
114
120
}
115
121
116
122
private fun measureScannableBuffer (
@@ -119,7 +125,49 @@ private fun measureScannableBuffer(
119
125
innerSources : Int ,
120
126
meterRegistry : MeterRegistry
121
127
) {
122
- logger.info(" scannable $scannable name: $name innerSources: $innerSources meterRegistry: $meterRegistry " )
128
+ val buffered = scannable.scan(Scannable .Attr .BUFFERED )
129
+ if (buffered == null ) {
130
+ logger.error(
131
+ " Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG : $name '. Buffer size not available. " +
132
+ " Use measureBuffer() only on supported reactor operators"
133
+ )
134
+ return
135
+ }
136
+
137
+ meterRegistry.gauge(
138
+ REACTOR_METRIC ,
139
+ Tags .of(METRIC_TYPE_TAG , " buffer-size" , METRIC_EMITTER_TAG , name),
140
+ scannable,
141
+ scannableBufferExtractor
142
+ )
143
+
144
+ /* *
145
+ * Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual
146
+ * buffer (that is controlled by `prefetch` parameter) size. Instead it returns simply number of connected sources.
147
+ *
148
+ * To access actual buffer size, we need to extract it from inners(). We don't know how many sources will
149
+ * be available, so it must be stated explicitly as innerSources parameter.
150
+ */
151
+ for (i in 0 until innerSources) {
152
+ meterRegistry.gauge(
153
+ REACTOR_METRIC ,
154
+ Tags .of(METRIC_TYPE_TAG , " buffer-size" , METRIC_EMITTER_TAG , " ${(name)} _$i " ),
155
+ scannable,
156
+ innerBufferExtractor(i)
157
+ )
158
+ }
159
+ }
160
+
161
+ private val scannableBufferExtractor = { s: Scannable -> s.scan(Scannable .Attr .BUFFERED )?.toDouble() ? : - 1.0 }
162
+ private fun innerBufferExtractor (index : Int ) = { s: Scannable ->
163
+ s.inners().asSequence()
164
+ .elementAtOrNull(index)
165
+ ?.let (scannableBufferExtractor)
166
+ ? : - 1.0
167
+ }
168
+
169
+ private val queueSubscriptionBufferExtractor = { s: Fuseable .QueueSubscription <* > ->
170
+ s.size.toDouble()
123
171
}
124
172
125
173
sealed class ParallelizableScheduler
0 commit comments