Skip to content

Commit eca1f8e

Browse files
authored
Merge pull request #2240 from atlanhq/PLT-1173-main
Plt 1173 - Add support for micrometer in metastore
2 parents cb05bd5 + 26babf6 commit eca1f8e

File tree

15 files changed

+318
-7
lines changed

15 files changed

+318
-7
lines changed

client-keycloak/src/main/java/org/apache/atlas/keycloak/client/AbstractKeycloakClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package org.apache.atlas.keycloak.client;
22

33
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.micrometer.core.instrument.Timer;
45
import okhttp3.*;
56
import okhttp3.logging.HttpLoggingInterceptor;
67
import org.apache.atlas.AtlasErrorCode;
78
import org.apache.atlas.exception.AtlasBaseException;
89
import org.apache.atlas.keycloak.client.config.KeycloakConfig;
910
import org.apache.atlas.keycloak.client.service.AtlasKeycloakAuthService;
11+
import org.apache.atlas.service.metrics.MetricUtils;
1012
import org.slf4j.Logger;
1113
import org.slf4j.LoggerFactory;
1214
import org.springframework.lang.NonNull;
@@ -33,11 +35,14 @@ abstract class AbstractKeycloakClient {
3335
private static final String AUTHORIZATION = "Authorization";
3436
private static final String BEARER = "Bearer ";
3537
private static final int TIMEOUT_IN_SEC = 60;
38+
private static final String INTEGRATION = "integration";
39+
private static final String KEYCLOAK = "keycloak";
3640

3741
protected final KeycloakConfig keycloakConfig;
3842
protected final RetrofitKeycloakClient retrofit;
3943

4044
private final AtlasKeycloakAuthService authService;
45+
private MetricUtils metricUtils = null;
4146

4247
static {
4348
ERROR_CODE_MAP.put(HTTP_NOT_FOUND, RESOURCE_NOT_FOUND);
@@ -46,6 +51,7 @@ abstract class AbstractKeycloakClient {
4651

4752
public AbstractKeycloakClient(KeycloakConfig keycloakConfig) {
4853
this.keycloakConfig = keycloakConfig;
54+
this.metricUtils = new MetricUtils();
4955
HttpLoggingInterceptor httpInterceptor = new HttpLoggingInterceptor();
5056
httpInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
5157
OkHttpClient okHttpClient = new OkHttpClient.Builder()
@@ -70,7 +76,11 @@ public AbstractKeycloakClient(KeycloakConfig keycloakConfig) {
7076
*/
7177
Interceptor responseLoggingInterceptor = chain -> {
7278
Request request = chain.request();
79+
String rawPath = request.url().uri().getRawPath();
80+
Timer.Sample timerSample = this.metricUtils.start(rawPath);
7381
okhttp3.Response response = chain.proceed(request);
82+
this.metricUtils.recordHttpTimer(timerSample, request.method(), rawPath, response.code(),
83+
INTEGRATION, KEYCLOAK);
7484
LOG.info("Keycloak: Request for url {} Status:{}", request.url(), response.code());
7585
return response;
7686
};

common/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@
104104
<scope>compile</scope>
105105
</dependency>
106106

107+
<dependency>
108+
<groupId>io.micrometer</groupId>
109+
<artifactId>micrometer-registry-prometheus</artifactId>
110+
<version>${micrometer.version}</version>
111+
</dependency>
112+
107113
<dependency>
108114
<groupId>com.google.guava</groupId>
109115
<artifactId>guava</artifactId>
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package org.apache.atlas.service.metrics;
2+
3+
import io.micrometer.core.instrument.Metrics;
4+
import io.micrometer.core.instrument.Tags;
5+
import io.micrometer.core.instrument.Timer;
6+
import io.micrometer.prometheus.PrometheusConfig;
7+
import io.micrometer.prometheus.PrometheusMeterRegistry;
8+
import org.apache.atlas.ApplicationProperties;
9+
import org.apache.commons.lang.StringUtils;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.springframework.stereotype.Component;
13+
14+
import java.util.*;
15+
import java.util.stream.Collectors;
16+
17+
import static org.apache.commons.lang.StringUtils.EMPTY;
18+
19+
@Component
20+
public class MetricUtils {
21+
private final static Logger LOG = LoggerFactory.getLogger(MetricUtils.class);
22+
23+
private static final String URI = "uri";
24+
private static final String LOCAL = "local";
25+
private static final String STATUS = "status";
26+
private static final String METHOD = "method";
27+
private static final String SERVICE = "service";
28+
private static final String INTEGRATION = "integration";
29+
private static final String ATLAS_METASTORE = "atlas-metastore";
30+
private static final String REGEX_URI_PLACEHOLDER = "\\[\\^/\\]\\+";
31+
private static final String HTTP_SERVER_REQUESTS = "http.server.requests";
32+
private static final String ATLAS_METRICS_URI_PATTERNS = "atlas.metrics.uri_patterns";
33+
private static final double[] PERCENTILES = {0.5, 0.90, 0.99};
34+
35+
private static Map<String, String> METRIC_URI_PATTERNS_MAP;
36+
private static final PrometheusMeterRegistry METER_REGISTRY;
37+
38+
static {
39+
METER_REGISTRY = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
40+
METER_REGISTRY.config().withHighCardinalityTagsDetector().commonTags(SERVICE, ATLAS_METASTORE, INTEGRATION, LOCAL);
41+
Metrics.globalRegistry.add(METER_REGISTRY);
42+
}
43+
44+
public MetricUtils() {
45+
try {
46+
METRIC_URI_PATTERNS_MAP = Arrays.stream(ApplicationProperties.get().getStringArray(ATLAS_METRICS_URI_PATTERNS))
47+
.distinct().collect(Collectors.toMap(uri->uri, uri->uri.replaceAll(REGEX_URI_PLACEHOLDER, "*")));
48+
} catch (Exception e) {
49+
LOG.error("Failed to load 'atlas.metrics.uri_patterns from properties");
50+
}
51+
}
52+
53+
public Timer.Sample start(String uri) {
54+
return matchCanonicalPattern(uri).isPresent() ? Timer.start(getMeterRegistry()) : null;
55+
}
56+
57+
public void recordHttpTimer(Timer.Sample sample, String method, String rawPath, int code, String... additionalTags) {
58+
if (Objects.isNull(sample)) {
59+
return;
60+
}
61+
sample.stop(getTimer(HTTP_SERVER_REQUESTS, method, code, rawPath, additionalTags));
62+
}
63+
64+
private Timer getTimer(String timerName, String method, int code, String rawPath, String... additionalTags) {
65+
Tags tags = getTags(method, code, rawPath);
66+
if (Objects.nonNull(additionalTags) && additionalTags.length > 0) {
67+
tags = tags.and(additionalTags);
68+
}
69+
return Timer.builder(timerName)
70+
.publishPercentiles(PERCENTILES)
71+
.tags(tags)
72+
.register(getMeterRegistry());
73+
}
74+
75+
private Tags getTags(String httpMethod, int httpResponseStatus, String uri) {
76+
return Tags.of(METHOD, httpMethod,
77+
STATUS, String.valueOf(httpResponseStatus),
78+
URI, matchCanonicalPattern(uri).get());
79+
}
80+
81+
private Optional<String> matchCanonicalPattern(String uri) {
82+
if (Objects.isNull(uri) || uri.isEmpty()) {
83+
return Optional.empty();
84+
}
85+
if (uri.endsWith("/")) {
86+
uri = uri.substring(0, uri.lastIndexOf("/"));
87+
}
88+
String updatedUrl = uri;
89+
Optional<String> patternOp = METRIC_URI_PATTERNS_MAP.keySet().stream()
90+
.filter(pattern -> updatedUrl.matches(pattern + "$"))
91+
.findFirst();
92+
return Optional.ofNullable(METRIC_URI_PATTERNS_MAP.get(patternOp.orElse(EMPTY)));
93+
}
94+
95+
public static PrometheusMeterRegistry getMeterRegistry() {
96+
return METER_REGISTRY;
97+
}
98+
99+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.apache.atlas.service.metrics;
2+
3+
import org.apache.atlas.utils.AtlasPerfMetrics;
4+
5+
import java.io.IOException;
6+
import java.io.PrintWriter;
7+
8+
public interface MetricsRegistry {
9+
10+
void collect(String requestId, AtlasPerfMetrics metrics);
11+
12+
void scrape(PrintWriter writer) throws IOException;
13+
14+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package org.apache.atlas.service.metrics;
2+
3+
import io.micrometer.core.instrument.Meter;
4+
import io.micrometer.core.instrument.Metrics;
5+
import io.micrometer.core.instrument.Tags;
6+
import io.micrometer.core.instrument.binder.BaseUnits;
7+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
8+
import io.micrometer.prometheus.PrometheusMeterRegistry;
9+
import org.apache.atlas.ApplicationProperties;
10+
import org.apache.atlas.AtlasException;
11+
import org.apache.atlas.utils.AtlasPerfMetrics;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import org.springframework.stereotype.Component;
15+
16+
import javax.inject.Inject;
17+
import java.io.IOException;
18+
import java.io.PrintWriter;
19+
import java.time.Duration;
20+
21+
import static org.apache.atlas.service.metrics.MetricUtils.getMeterRegistry;
22+
23+
24+
@Component
25+
public class MetricsRegistryServiceImpl implements MetricsRegistry {
26+
27+
private static final Logger LOG = LoggerFactory.getLogger(MetricsRegistryServiceImpl.class);
28+
29+
private static final String NAME = "name";
30+
private static final String METHOD_DIST_SUMMARY = "method_dist_summary";
31+
private static final double[] PERCENTILES = {0.99};
32+
private static final int SEC_MILLIS_SCALE = 1;
33+
private static final String METHOD_LEVEL_METRICS_ENABLE = "atlas.metrics.method_level.enable";
34+
35+
private final DistributionStatisticConfig distributionStatisticConfig;
36+
37+
@Inject
38+
public MetricsRegistryServiceImpl() {
39+
this.distributionStatisticConfig = DistributionStatisticConfig.builder().percentilePrecision(2)
40+
.percentiles(PERCENTILES)
41+
.bufferLength(3)
42+
.percentilesHistogram(false)
43+
.minimumExpectedValue(1.0)
44+
.maximumExpectedValue(Double.MAX_VALUE)
45+
.expiry(Duration.ofMinutes(2)).build();
46+
}
47+
48+
@Override
49+
public void collect(String requestId, AtlasPerfMetrics metrics) {
50+
try {
51+
if (!ApplicationProperties.get().getBoolean(METHOD_LEVEL_METRICS_ENABLE, false)) {
52+
return;
53+
}
54+
} catch (AtlasException e) {
55+
LOG.error("Failed to read {} property from atlas config", METHOD_LEVEL_METRICS_ENABLE, e);
56+
return;
57+
}
58+
for (String name : metrics.getMetricsNames()) {
59+
AtlasPerfMetrics.Metric metric = metrics.getMetric(name);
60+
getMeterRegistry().newDistributionSummary(new Meter.Id(METHOD_DIST_SUMMARY,
61+
Tags.of(NAME, metric.getName()), BaseUnits.MILLISECONDS, METHOD_DIST_SUMMARY,
62+
Meter.Type.TIMER), distributionStatisticConfig, SEC_MILLIS_SCALE)
63+
.record(metric.getTotalTimeMSecs());
64+
}
65+
}
66+
67+
@Override
68+
public void scrape(PrintWriter writer) {
69+
Metrics.globalRegistry.getRegistries().forEach(r -> {
70+
try {
71+
((PrometheusMeterRegistry) r).scrape(writer);
72+
writer.flush();
73+
} catch (IOException e) {
74+
LOG.warn("Failed to write metrics while scraping", e);
75+
}finally {
76+
writer.close();
77+
}
78+
});
79+
}
80+
81+
}

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,7 @@
777777
<woodstox-core.version>5.0.3</woodstox-core.version>
778778
<zookeeper.version>3.4.6</zookeeper.version>
779779
<redis.client.version>3.20.1</redis.client.version>
780+
<micrometer.version>1.11.1</micrometer.version>
780781
</properties>
781782

782783
<modules>

server-api/src/main/java/org/apache/atlas/RequestContext.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.atlas.model.instance.*;
2222
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
2323
import org.apache.atlas.model.tasks.AtlasTask;
24+
import org.apache.atlas.service.metrics.MetricsRegistry;
2425
import org.apache.atlas.utils.AtlasPerfMetrics;
2526
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
2627
import org.apache.commons.collections.CollectionUtils;
@@ -92,6 +93,7 @@ public class RequestContext {
9293
private String traceId;
9394
private final Map<AtlasObjectId, Object> relationshipEndToVertexIdMap = new HashMap<>();
9495
private boolean allowDuplicateDisplayName;
96+
private MetricsRegistry metricsRegistry;
9597

9698
private RequestContext() {
9799
}
@@ -149,15 +151,17 @@ public void clearCache() {
149151
this.relationshipEndToVertexIdMap.clear();
150152
this.relationshipMutationMap.clear();
151153
this.currentTask = null;
152-
setTraceId(null);
153154

154155
this.isPoliciesBootstrappingInProgress = false;
155156

156157
if (metrics != null && !metrics.isEmpty()) {
157158
METRICS.debug(metrics.toString());
158-
159+
if (Objects.nonNull(this.metricsRegistry)){
160+
this.metricsRegistry.collect(traceId, metrics);
161+
}
159162
metrics.clear();
160163
}
164+
setTraceId(null);
161165

162166
if (this.entityGuidInRequest != null) {
163167
this.entityGuidInRequest.clear();
@@ -624,6 +628,10 @@ public boolean includeClassifications() {
624628
return this.includeClassifications;
625629
}
626630

631+
public void setMetricRegistry(MetricsRegistry metricsRegistry) {
632+
this.metricsRegistry = metricsRegistry;
633+
}
634+
627635
public class EntityGuidPair {
628636
private final Object entity;
629637
private final String guid;

webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.atlas.*;
2222
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
23+
import org.apache.atlas.service.metrics.MetricsRegistry;
2324
import org.apache.atlas.util.AtlasRepositoryConfiguration;
2425
import org.apache.atlas.web.util.DateTimeHelper;
2526
import org.apache.atlas.web.util.Servlets;
@@ -29,7 +30,9 @@
2930
import org.slf4j.LoggerFactory;
3031
import org.slf4j.MDC;
3132
import org.springframework.stereotype.Component;
33+
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
3234

35+
import javax.inject.Inject;
3336
import javax.servlet.Filter;
3437
import javax.servlet.FilterChain;
3538
import javax.servlet.FilterConfig;
@@ -59,13 +62,16 @@ public class AuditFilter implements Filter {
5962
private boolean deleteTypeOverrideEnabled = false;
6063
private boolean createShellEntityForNonExistingReference = false;
6164

65+
@Inject
66+
private MetricsRegistry metricsRegistry;
67+
6268
@Override
6369
public void init(FilterConfig filterConfig) throws ServletException {
6470
LOG.info("AuditFilter initialization started");
6571

6672
deleteTypeOverrideEnabled = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
6773
createShellEntityForNonExistingReference = REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
68-
74+
SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
6975
LOG.info("REST_API_ENABLE_DELETE_TYPE_OVERRIDE={}", deleteTypeOverrideEnabled);
7076
}
7177

@@ -95,6 +101,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
95101
requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
96102
requestContext.setForwardedAddresses(AtlasAuthorizationUtils.getForwardedAddressesFromRequest(httpRequest));
97103
requestContext.setSkipFailedEntities(skipFailedEntities);
104+
requestContext.setMetricRegistry(metricsRegistry);
98105
MDC.put(TRACE_ID, internalRequestId);
99106
MDC.put(X_ATLAN_REQUEST_ID, ofNullable(httpRequest.getHeader(X_ATLAN_REQUEST_ID)).orElse(EMPTY));
100107
if (StringUtils.isNotEmpty(deleteType)) {
@@ -120,8 +127,8 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
120127
httpResponse.setHeader(TRACE_ID, internalRequestId);
121128
httpResponse.setHeader(X_ATLAN_REQUEST_ID, MDC.get(X_ATLAN_REQUEST_ID));
122129
currentThread.setName(oldName);
123-
MDC.clear();
124130
RequestContext.clear();
131+
MDC.clear();
125132
}
126133
}
127134

0 commit comments

Comments
 (0)