Skip to content

Commit

Permalink
W-15793257: Optimize metrics query regex
Browse files Browse the repository at this point in the history
  • Loading branch information
peterzxu-crm committed May 24, 2024
1 parent 662dd22 commit 629962a
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.demandware.carbonj.service.db.index.QueryUtils.filter;
Expand Down Expand Up @@ -109,9 +110,11 @@ public class MetricIndexImpl implements MetricIndex, ApplicationListener<NameInd
private final CacheStatsReporter metricCacheStatsReporter;
private CacheStatsReporter metricIdCacheStatsReporter;
private final CacheStatsReporter queryCacheStatsReporter;
private final CacheStatsReporter queryPatternCacheStatsReporter;
private final LoadingCache<String, Metric> metricCache;
private LoadingCache<Long, Metric> metricIdCache;
private final LoadingCache<String, List<Metric>> queryCache;
private final LoadingCache<String, Pattern> queryPatternCache;

private final String metricsStoreConfigFile;
private final NameUtils nameUtils;
Expand Down Expand Up @@ -161,10 +164,13 @@ public MetricIndexImpl( MetricRegistry metricRegistry, String metricsStoreConfig
NameUtils nameUtils, StorageAggregationPolicySource aggrPolicySource,
int nameIndexQueryCacheMaxSize, int expireAfterWriteQueryCacheInSeconds,
boolean idCacheEnabled,
boolean longId) {
boolean longId,
int nameIndexQueryPatternCacheMaxSize, int expireAfterWriteQueryPatternCacheInSeconds) {
this(metricRegistry, metricsStoreConfigFile, nameIndex, idIndex, dbMetrics, nameIndexMaxCacheSize,
expireAfterAccessInMinutes, nameUtils, aggrPolicySource, nameIndexQueryCacheMaxSize,
expireAfterWriteQueryCacheInSeconds, idCacheEnabled, longId, new NamespaceCounter(metricRegistry, 7200), false, true, 100);
expireAfterWriteQueryCacheInSeconds, idCacheEnabled, longId,
new NamespaceCounter(metricRegistry, 7200), false, true, 100,
nameIndexQueryPatternCacheMaxSize, expireAfterWriteQueryPatternCacheInSeconds);
}

public MetricIndexImpl( MetricRegistry metricRegistry, String metricsStoreConfigFile,
Expand All @@ -177,7 +183,8 @@ public MetricIndexImpl( MetricRegistry metricRegistry, String metricsStoreConfig
NamespaceCounter namespaceCounter,
boolean rocksdbReadonly,
boolean syncSecondaryDb,
int nameIndexKeyQueueSizeLimit) {
int nameIndexKeyQueueSizeLimit,
int nameIndexQueryPatternCacheMaxSize, int expireAfterWriteQueryPatternCacheInSeconds) {
this.metricRegistry = metricRegistry;
this.metricsStoreConfigFile = metricsStoreConfigFile;
this.nameUtils = Preconditions.checkNotNull(nameUtils);
Expand Down Expand Up @@ -263,6 +270,22 @@ public List<Metric> load(String pattern) {
});
this.queryCacheStatsReporter = new CacheStatsReporter( metricRegistry, "MetricsQueryCache", nameIndexQueryCacheMaxSize, queryCache );

this.queryPatternCache =
CacheBuilder.newBuilder()
.initialCapacity(nameIndexQueryPatternCacheMaxSize)
.maximumSize(nameIndexQueryPatternCacheMaxSize)
.recordStats()
.concurrencyLevel(8)
.expireAfterWrite(expireAfterWriteQueryPatternCacheInSeconds, TimeUnit.SECONDS)
.build(new CacheLoader<>() {
@SuppressWarnings("NullableProblems")
@Override
public Pattern load(String pattern) {
return Pattern.compile(pattern);
}
});
this.queryPatternCacheStatsReporter = new CacheStatsReporter( metricRegistry, "MetricsQueryPatternCache", nameIndexQueryPatternCacheMaxSize, queryPatternCache );

this.aggrPolicySource = Preconditions.checkNotNull( aggrPolicySource );

if (syncSecondaryDb) {
Expand Down Expand Up @@ -417,6 +440,7 @@ public void dumpStats()
metricIdCacheStatsReporter.dumpStats();
}
queryCacheStatsReporter.dumpStats();
queryPatternCacheStatsReporter.dumpStats();
dumpDbPropertyStats(nameIndexStorePropertyMetricMap, nameIndex);
dumpDbPropertyStats(idIndexStorePropertyMetricMap, idIndex);
}
Expand Down Expand Up @@ -544,8 +568,7 @@ public List<Metric> deleteMetric( String name, boolean recursive, boolean testRu
}

@Override
public DeleteAPIResult deleteAPI(String name, boolean delete, Set<String> exclude)
{
public DeleteAPIResult deleteAPI(String name, boolean delete, Set<String> exclude) {
List<String> metricNames = new ArrayList<>();
// id starting with **. is considered as segment delete. Ex: **.order.count
if( name.startsWith("**.") )
Expand All @@ -554,8 +577,12 @@ public DeleteAPIResult deleteAPI(String name, boolean delete, Set<String> exclud
}
else
{
metricNames.addAll( findMetrics( rootKey, 0, splitQuery( name ), false, Integer.MAX_VALUE, false )
.stream().map(m -> m.name ).toList());
try {
metricNames.addAll( findMetrics( rootKey, 0, splitQuery( name ), false, Integer.MAX_VALUE, false )
.stream().map(m -> m.name ).toList());
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

DeleteResult deleteResult = new DeleteResult();
Expand Down Expand Up @@ -672,9 +699,13 @@ public List<String> getChildNames( String metricName )
}

@Override
public List<Metric> findMetrics( String pattern )
{
return findMetrics(rootKey, 0, splitQuery( pattern ), false, Integer.MAX_VALUE, false );
public List<Metric> findMetrics( String pattern ) {
try {
return findMetrics(rootKey, 0, splitQuery( pattern ), false, Integer.MAX_VALUE, false );
} catch (ExecutionException e) {
log.error("Failed to find metrics for pattern {} - {}", pattern, e.getMessage());
return new ArrayList<>();
}
}


Expand Down Expand Up @@ -707,8 +738,7 @@ public List<Metric> findMetrics(String pattern, boolean leafOnly, boolean useThr
}
}

private List<Metric> findMetricsNoCache(String pattern, boolean leafOnly, boolean useThreshold, boolean skipInvalid)
{
private List<Metric> findMetricsNoCache(String pattern, boolean leafOnly, boolean useThreshold, boolean skipInvalid) {
List<Metric> metrics;
int threshold = enforceMaxSeriesPerRequest && useThreshold ? maxSeriesPerRequest : Integer.MAX_VALUE;

Expand All @@ -724,6 +754,9 @@ private List<Metric> findMetricsNoCache(String pattern, boolean leafOnly, boolea
"Pattern [%s], configured threshold value: %s", pattern, threshold );
log.warn(msg);
throw new TooManyMetricsFoundException(e.getLimit(), msg);
} catch (ExecutionException e) {
log.error("Failed to query metrics for pattern {} - {}", pattern, e.getMessage());
metrics = new ArrayList<>();
}

// can happen if enforceMaxSeriesPerRequest is false
Expand All @@ -737,16 +770,15 @@ private List<Metric> findMetricsNoCache(String pattern, boolean leafOnly, boolea
return metrics;
}

private List<Metric> findMetrics( String parentKey, int queryPartIdx, String[] queryParts, boolean leafOnly, int max,
boolean excludeInvalid )
{
private List<Metric> findMetrics( String parentKey, int queryPartIdx, QueryPart[] queryParts, boolean leafOnly, int max,
boolean excludeInvalid ) throws ExecutionException {
Metric parent = getMetric( parentKey );
if ( parent == null )
{
DatabaseMetrics.deletedMetricAccessError.mark();
return Collections.emptyList();
}
List<String> matches = filter( parent.children(), queryParts[queryPartIdx] );
List<String> matches = filter( parent.children(), queryParts[queryPartIdx], queryPatternCache );
boolean isLastQuerySegment = queryPartIdx + 1 >= queryParts.length;
if ( isLastQuerySegment )
{
Expand Down Expand Up @@ -797,15 +829,15 @@ private List<Metric> findMetrics( String parentKey, int queryPartIdx, String[] q

}

private List<String> findMetricWithSegment(String parentKey, int queryPartIdx, String[] queryParts)
private List<String> findMetricWithSegment(String parentKey, int queryPartIdx, QueryPart[] queryParts)
{
Metric parent = getMetric( parentKey );
List<String> matched = new ArrayList<>();
for ( String childName: parent.children() )
{
String childKey = toMetricName( parentKey, childName );
int qIndex = queryPartIdx;
if(childName.equals(queryParts[queryPartIdx]))
if(childName.equals(queryParts[queryPartIdx].getQuery()))
{
boolean isLastSegment = queryPartIdx + 1 >= queryParts.length;
if( isLastSegment )
Expand All @@ -821,14 +853,14 @@ private List<String> findMetricWithSegment(String parentKey, int queryPartIdx, S
}
else
{
qIndex = queryPartIdx != 0 ? (childName.equals(queryParts[0])? 1 : 0) : queryPartIdx;
qIndex = queryPartIdx != 0 ? (childName.equals(queryParts[0].getQuery())? 1 : 0) : queryPartIdx;
}
matched.addAll(findMetricWithSegment(childKey, qIndex, queryParts));
}
return matched;
}

private List<String> findAllMetricsWithSegment( String parentKey, int queryPartIdx, String[] queryParts )
private List<String> findAllMetricsWithSegment( String parentKey, int queryPartIdx, QueryPart[] queryParts )
{
return new ArrayList<>(findMetricWithSegment(parentKey, queryPartIdx, queryParts));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2018, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
package com.demandware.carbonj.service.db.index;

public class QueryPart {

private final String query;

private final boolean isRegEx;

QueryPart(String query, boolean isRegEx) {
this.query = query;
this.isRegEx = isRegEx;
}

public String getQuery() {
return query;
}

public boolean isRegEx() {
return isRegEx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@
*/
package com.demandware.carbonj.service.db.index;

import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class QueryUtils
{

private static final Logger log = LoggerFactory.getLogger( QueryUtils.class );

public static String patternToRegEx(String p)
{
String r = p.replaceAll("\\*", ".*");
Expand All @@ -22,9 +31,18 @@ public static String patternToRegEx(String p)
return "^" + r + "$";
}

public static List<String> filter( List<String> entries, String pattern)
{
return entries.stream().filter( s -> match( s, pattern ) ).collect( Collectors.toList());
public static List<String> filter(List<String> entries, QueryPart pattern, LoadingCache<String, Pattern> queryPatternCache) {
List<String> matched = new ArrayList<>();
for (String entry : entries) {
try {
if (match(entry, pattern, queryPatternCache)) {
matched.add(entry);
}
} catch (ExecutionException e) {
log.error("Failed to check query pattern {} for query {} - {}", pattern.getQuery(), entry, e.getMessage());
}
}
return matched;
}

/*
Expand All @@ -46,17 +64,20 @@ public static boolean isPattern(String s)
return s.indexOf( '*' ) > -1 || s.indexOf( '[' ) > -1 || s.indexOf( '{' ) > -1;
}

public static String[] splitQuery(String query)
public static QueryPart[] splitQuery(String query)
{
String[] parts = query.split( "\\." );
QueryPart[] queryParts = new QueryPart[parts.length];
for(int i = 0; i < parts.length; i++)
{
if( isPattern(parts[i]) )
{
parts[i] = patternToRegEx(parts[i]);
queryParts[i] = new QueryPart(patternToRegEx(parts[i]), true);
} else {
queryParts[i] = new QueryPart(parts[i], false);
}
}
return parts;
return queryParts;
}


Expand All @@ -80,10 +101,16 @@ public static String[] splitQuery(String query)
// return matching



public static boolean match(String namePart, String pattern)
{
return namePart.matches( pattern );
public static boolean match(String namePart, QueryPart pattern, LoadingCache<String, Pattern> queryPatternCache) throws ExecutionException {
if (pattern.isRegEx()) {
if (queryPatternCache != null) {
Matcher matcher = queryPatternCache.get(pattern.getQuery()).matcher(namePart);
return matcher.matches();
} else {
return namePart.matches(pattern.getQuery());

Check failure

Code scanning / CodeQL

Regular expression injection High

This regular expression is constructed from a
user-provided value
.
This regular expression is constructed from a
user-provided value
.
This regular expression is constructed from a
user-provided value
.
}
} else {
return namePart.equals(pattern.getQuery());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ public class cfgMetricIndex
@Value( "${metrics.cacheExpireAfterAccessInMinutes:120}" )
private int metricCacheExpireAfterAccessInMinutes = 120;

@Value( "${metrics.tasks.emptyQueuePauseMillis:10000}" )
private int emptyQueuePauseMillis = 1000;

@Value( "${metrics.tasks.queueReadBatchSize:10000}" )
private int queueReadBatchSize = 10000;

@Value( "${storage.aggregation.rules:config/storage-aggregation.conf}" )
private String storageAggregationRulesConfigFile = "config/storage-aggregation.conf";

Expand All @@ -64,6 +58,12 @@ public class cfgMetricIndex
@Value( "${metrics.store.expireAfterWriteQueryCacheInSeconds:120}" )
private int expireAfterWriteQueryCacheInSeconds;

@Value( "${metrics.store.queryPatternCacheMaxSize:10000}" )
private int nameIndexQueryPatternCacheMaxSize;

@Value( "${metrics.store.expireAfterWriteQueryPatternCacheInSeconds:120}" )
private int expireAfterWriteQueryPatternCacheInSeconds;

@Value( "${metrics.store.enableIdCache:false}" )
private boolean enableIdCache;

Expand Down Expand Up @@ -115,10 +115,10 @@ public StorageAggregationPolicySource storageAggregationPolicySource(ScheduledEx

File rulesFile = locateConfigFile( serviceDir, storageAggregationRulesConfigFile );
StorageAggregationRulesLoader rulesLoader = new StorageAggregationRulesLoader( rulesFile );
s.scheduleWithFixedDelay( ( ) -> rulesLoader.reload(), 60, 45, TimeUnit.SECONDS );
s.scheduleWithFixedDelay(rulesLoader::reload, 60, 45, TimeUnit.SECONDS );

StorageAggregationPolicySource policySource = new StorageAggregationPolicySource( rulesLoader );
s.scheduleWithFixedDelay( () -> policySource.cleanup(), 10, 120, TimeUnit.MINUTES );
s.scheduleWithFixedDelay(policySource::cleanup, 10, 120, TimeUnit.MINUTES );
return policySource;
}

Expand All @@ -133,7 +133,8 @@ MetricIndex metricIndex(@Qualifier( "metricNameIndexStore" ) IndexStore<String,
MetricIndexImpl metricIndex = new MetricIndexImpl(metricRegistry, metricStoreConfigFile, nameIndex, idIndex, dbMetrics,
nameIndexMaxCacheSize, metricCacheExpireAfterAccessInMinutes, nameUtils, policySource,
nameIndexQueryCacheMaxSize, expireAfterWriteQueryCacheInSeconds, enableIdCache, longId,
namespaceCounter, rocksdbReadonly, syncSecondaryDb, nameIndexKeyQueueSizeLimit);
namespaceCounter, rocksdbReadonly, syncSecondaryDb, nameIndexKeyQueueSizeLimit,
nameIndexQueryPatternCacheMaxSize, expireAfterWriteQueryPatternCacheInSeconds);
s.scheduleWithFixedDelay(metricIndex::reload, 300, 300, TimeUnit.SECONDS);
return metricIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.*;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -21,8 +28,6 @@
public class DynamoDbCheckPointMgr implements CheckPointMgr<Date> {
private static final Logger log = LoggerFactory.getLogger(DynamoDbCheckPointMgr.class);

private static final String VERSION = "1.0";

private final String tableName;
private final int defaultOffsetMins;

Expand All @@ -47,7 +52,7 @@ private void createTable(String tableName, int provisionedThroughput) throws Exc
.withKeySchema(
new KeySchemaElement("checkPointType", KeyType.HASH))
.withProvisionedThroughput(
new ProvisionedThroughput(new Long(provisionedThroughput), new Long(provisionedThroughput)))
new ProvisionedThroughput((long)provisionedThroughput, (long)provisionedThroughput))
.withTableName(tableName);
log.info("Issuing CreateTable request for " + tableName);
Table newlyCreatedTable = dynamoDB.createTable(request);
Expand Down
Loading

0 comments on commit 629962a

Please sign in to comment.