Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/querying/query-context-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Unless otherwise noted, the following parameters apply to all query types, and t
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly.|
|`queryableHistoricalTiers`|`null`|Set of Historical tier names that may be queried. When set, the Broker only queries Historical servers whose `druid.server.tier` is in this set. Segments without a replica on one of the listed tiers are skipped.|
|`realtimeSegmentsMode` |`include`| Controls whether realtime segments are queried. `include` queries all segments, including realtime. `exclude` skips realtime segments. `exclusive` queries only realtime segments. |
|`realtimeSegmentsOnly` |`false`| **Deprecated.** Use `realtimeSegmentsMode=exclusive` instead. When set to `true`, this is equivalent to `realtimeSegmentsMode=exclusive`. When set to `false`, this is equivalent to `realtimeSegmentsMode=include`.|

Expand Down Expand Up @@ -140,4 +141,3 @@ For more information, see the following topics:
- [Set query context](./query-context.md) to learn how to configure query context parameters.
- [SQL query context](sql-query-context.md) for query context parameters specific to Druid SQL.
- [SQL-based ingestion reference](../multi-stage-query/reference/#context-parameters) for context parameters used in SQL-based ingestion (MSQ).

20 changes: 20 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

/**
Expand Down Expand Up @@ -237,6 +238,19 @@ public <E extends Enum<E>> E getEnum(String key, Class<E> clazz, E defaultValue)
return QueryContexts.getAsEnum(key, get(key), clazz, defaultValue);
}

/**
* Return a value as a {@link Set} of strings, returning {@code null} if the
* context value is not set. The context value may be a JSON array-like
* {@link java.util.Collection} or a single string.
*
* @throws BadQueryContextException for an invalid value
*/
@Nullable
public Set<String> getStringSet(final String key)
{
return QueryContexts.getAsStringSet(key, get(key));
}

public Granularity getGranularity(String key, ObjectMapper jsonMapper)
{
final String granularityString = getString(key);
Expand Down Expand Up @@ -621,6 +635,12 @@ public CloneQueryMode getCloneQueryMode()
);
}

@Nullable
public Set<String> getQueryableHistoricalTiers()
{
return getStringSet(QueryContexts.QUERYABLE_HISTORICAL_TIERS);
}

public boolean getEnableRewriteJoinToFilter()
{
return getBoolean(
Expand Down
29 changes: 29 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -67,6 +71,7 @@ public class QueryContexts
public static final String MAX_NUMERIC_IN_FILTERS = "maxNumericInFilters";
public static final String CURSOR_AUTO_ARRANGE_FILTERS = "cursorAutoArrangeFilters";
public static final String CLONE_QUERY_MODE = "cloneQueryMode";
public static final String QUERYABLE_HISTORICAL_TIERS = "queryableHistoricalTiers";
/**
* This flag controls whether {@link AggregatorFactory#optimizeForSegment(PerSegmentQueryOptimizationContext)}
* is used. It is undocumented because its main purpose is to help developers debug issues with the optimizations.
Expand Down Expand Up @@ -325,6 +330,30 @@ public static String getAsString(
throw badTypeException(key, "a String", value);
}

@Nullable
public static Set<String> getAsStringSet(
final String key,
final Object value
)
{
if (value == null) {
return null;
} else if (value instanceof String) {
return Collections.singleton((String) value);
} else if (value instanceof Collection) {
final Set<String> values = new LinkedHashSet<>();
for (final Object element : (Collection<?>) value) {
if (!(element instanceof String)) {
throw badValueException(key, "a collection of Strings", value);
}
values.add((String) element);
}
return Collections.unmodifiableSet(values);
}

throw badTypeException(key, "a String or collection of Strings", value);
}

@Nullable
public static Boolean getAsBoolean(
final String key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import nl.jqno.equalsverifier.EqualsVerifier;
import nl.jqno.equalsverifier.Warning;
Expand All @@ -46,6 +48,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -121,6 +124,37 @@ public void testGetString()
assertThrows(BadQueryContextException.class, () -> context.getString("key2"));
}

@Test
public void testGetStringSet()
{
final QueryContext context = QueryContext.of(
ImmutableMap.of(
"key1", ImmutableList.of("hot", "cold", "hot"),
"key2", "hot",
"key3", ImmutableList.of("hot", 1),
"key4", 1
)
);

assertEquals(ImmutableSet.of("hot", "cold"), context.getStringSet("key1"));
assertEquals(Collections.singleton("hot"), context.getStringSet("key2"));
assertNull(context.getStringSet("non-exist"));

assertThrows(BadQueryContextException.class, () -> context.getStringSet("key3"));
assertThrows(BadQueryContextException.class, () -> context.getStringSet("key4"));
}

@Test
public void testGetQueryableHistoricalTiers()
{
final QueryContext context = QueryContext.of(
ImmutableMap.of(QueryContexts.QUERYABLE_HISTORICAL_TIERS, ImmutableList.of("hot", "cold"))
);

final Set<String> queryableHistoricalTiers = context.getQueryableHistoricalTiers();
assertEquals(ImmutableSet.of("hot", "cold"), queryableHistoricalTiers);
}

@Test
public void testGetBoolean()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,10 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
final SegmentPruner segmentPruner = ev.getSegmentPruner();

RealtimeSegmentsMode realtimeSegmentsMode = query.context().getRealtimeSegmentsMode();
final QueryContext queryContext = query.context();
final RealtimeSegmentsMode realtimeSegmentsMode = queryContext.getRealtimeSegmentsMode();
final CloneQueryMode cloneQueryMode = queryContext.getCloneQueryMode();
final Set<String> queryableHistoricalTiers = queryContext.getQueryableHistoricalTiers();
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
final Collection<PartitionChunk<ServerSelector>> filteredChunks;
Expand All @@ -458,7 +461,7 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
filteredChunks = Sets.newLinkedHashSet(holder.getObject());
}
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
ServerSelector server = chunk.getObject();
final ServerSelector server = chunk.getObject();
switch (realtimeSegmentsMode) {
case EXCLUSIVE:
if (!server.isRealtimeSegment()) {
Expand All @@ -473,6 +476,10 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
case INCLUDE:
break;
}
if (queryableHistoricalTiers != null
&& !server.hasQueryableServer(queryableHistoricalTiers, cloneQueryMode)) {
continue;
}
final SegmentDescriptor segment = new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
Expand Down Expand Up @@ -604,10 +611,15 @@ private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(
CloneQueryMode cloneQueryMode
)
{
final Set<String> queryableHistoricalTiers = query.context().getQueryableHistoricalTiers();
final SortedMap<DruidServer, List<SegmentDescriptor>> serverSegments = new TreeMap<>();
for (SegmentServerSelector segmentServer : segments) {
final QueryableDruidServer queryableDruidServer = segmentServer.getServer()
.pick(query, cloneQueryMode);
.pick(
query,
cloneQueryMode,
queryableHistoricalTiers
);

if (queryableDruidServer == null) {
log.makeAlert(
Expand Down Expand Up @@ -823,10 +835,15 @@ String computeResultLevelCachingEtag(
@Nullable byte[] queryCacheKey
)
{
Hasher hasher = Hashing.sha1().newHasher();
final Hasher hasher = Hashing.sha256().newHasher();
final Set<String> queryableHistoricalTiers = query.context().getQueryableHistoricalTiers();
boolean hasOnlyHistoricalSegments = true;
for (SegmentServerSelector p : segments) {
QueryableDruidServer queryableServer = p.getServer().pick(query, cloneQueryMode);
final QueryableDruidServer queryableServer = p.getServer().pick(
query,
cloneQueryMode,
queryableHistoricalTiers
);
if (queryableServer == null || !queryableServer.getServer().isSegmentReplicationTarget()) {
hasOnlyHistoricalSegments = false;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.client.QueryableDruidServer;
import org.apache.druid.query.CloneQueryMode;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -190,16 +191,123 @@ public List<DruidServerMetadata> getAllServers(CloneQueryMode cloneQueryMode)
}

@Nullable
public <T> QueryableDruidServer pick(@Nullable Query<T> query, CloneQueryMode cloneQueryMode)
public <T> QueryableDruidServer pick(@Nullable final Query<T> query, final CloneQueryMode cloneQueryMode)
{
return pick(query, cloneQueryMode, getQueryableHistoricalTiers(query));
}

@Nullable
public <T> QueryableDruidServer pick(
@Nullable final Query<T> query,
final CloneQueryMode cloneQueryMode,
@Nullable final Set<String> queryableHistoricalTiers
)
{
synchronized (this) {
if (!historicalServers.isEmpty()) {
return historicalTierStrategy.pick(query, filter.getQueryableServers(historicalServers, cloneQueryMode), segment.get());
if (queryableHistoricalTiers != null && queryableHistoricalTiers.isEmpty()) {
return null;
}
if (!historicalServers.isEmpty() || queryableHistoricalTiers != null) {
final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> queryableHistoricalServers =
getQueryableHistoricalServers(
filter.getQueryableServers(historicalServers, cloneQueryMode),
queryableHistoricalTiers
);
return historicalTierStrategy.pick(query, queryableHistoricalServers, segment.get());
}
return realtimeTierStrategy.pick(query, realtimeServers, segment.get());
}
}

public <T> boolean hasQueryableServer(@Nullable final Query<T> query, final CloneQueryMode cloneQueryMode)
{
return hasQueryableServer(getQueryableHistoricalTiers(query), cloneQueryMode);
}

public boolean hasQueryableServer(
@Nullable final Set<String> queryableHistoricalTiers,
final CloneQueryMode cloneQueryMode
)
{
synchronized (this) {
if (queryableHistoricalTiers != null) {
return !queryableHistoricalTiers.isEmpty()
&& !historicalServers.isEmpty()
&& hasQueryableHistoricalServer(
filter.getQueryableServers(historicalServers, cloneQueryMode),
queryableHistoricalTiers
);
}
if (!historicalServers.isEmpty()) {
return hasServers(filter.getQueryableServers(historicalServers, cloneQueryMode));
}
return hasServers(realtimeServers);
}
}

@Nullable
private static <T> Set<String> getQueryableHistoricalTiers(@Nullable final Query<T> query)
{
if (query == null) {
return null;
}

final QueryContext queryContext = query.context();
return queryContext == null ? null : queryContext.getQueryableHistoricalTiers();
}

private Int2ObjectRBTreeMap<Set<QueryableDruidServer>> getQueryableHistoricalServers(
final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> queryableServers,
@Nullable final Set<String> queryableHistoricalTiers
)
{
if (queryableHistoricalTiers == null) {
return queryableServers;
}

final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> filteredServers =
new Int2ObjectRBTreeMap<>(historicalTierStrategy.getComparator());
for (final int priority : queryableServers.keySet()) {
final Set<QueryableDruidServer> priorityServers = new HashSet<>();
for (final QueryableDruidServer server : queryableServers.get(priority)) {
if (queryableHistoricalTiers.contains(server.getServer().getTier())) {
priorityServers.add(server);
}
}
if (!priorityServers.isEmpty()) {
filteredServers.put(priority, priorityServers);
}
}

return filteredServers;
}

private static boolean hasQueryableHistoricalServer(
final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> queryableServers,
final Set<String> queryableHistoricalTiers
)
{
for (final Set<QueryableDruidServer> priorityServers : queryableServers.values()) {
for (final QueryableDruidServer server : priorityServers) {
if (queryableHistoricalTiers.contains(server.getServer().getTier())) {
return true;
}
}
}

return false;
}

private static boolean hasServers(final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> servers)
{
for (final Set<QueryableDruidServer> priorityServers : servers.values()) {
if (!priorityServers.isEmpty()) {
return true;
}
}
return false;
}

@Override
public boolean overshadows(ServerSelector other)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private SegmentServerSelector makeServerSelector(boolean isHistorical, int parti
SegmentId segmentId = SegmentId.dummy("data-source", partitionNumber);
DataSegment segment = DataSegment.builder(segmentId).shardSpec(new NumberedShardSpec(partitionNumber, 10)).build();
expect(server.isSegmentReplicationTarget()).andReturn(isHistorical).anyTimes();
expect(serverSelector.pick(query, CloneQueryMode.EXCLUDECLONES)).andReturn(queryableDruidServer).anyTimes();
expect(serverSelector.pick(query, CloneQueryMode.EXCLUDECLONES, null)).andReturn(queryableDruidServer).anyTimes();
expect(queryableDruidServer.getServer()).andReturn(server).anyTimes();
expect(serverSelector.getSegment()).andReturn(segment).anyTimes();
replay(serverSelector, queryableDruidServer, server);
Expand Down
Loading
Loading