Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRILL-7191 / DRILL-7026]: RM state blob persistence in Zookeeper and Integration of Distributed queue configuration with Planner #1762

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
*/
package org.apache.drill.exec.store.kafka;

import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.kafka.common.serialization.StringSerializer;

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -32,10 +32,8 @@
@Category({KafkaStorageTest.class, SlowTest.class})
public class KafkaFilterPushdownTest extends KafkaTestBase {
private static final int NUM_PARTITIONS = 5;
private static final String expectedSubStr = " \"kafkaScanSpec\" : {\n" +
" \"topicName\" : \"drill-pushdown-topic\"\n" +
" },\n" +
" \"cost\"";
private static final String expectedPattern = "kafkaScanSpec.*\\n.*\"topicName\" : \"drill-pushdown-topic\"\\n(" +
".*\\n)?(.*\\n)?(.*\\n)?.*cost\"(.*\\n)(.*\\n).*outputRowCount\" : (%s.0)";

@BeforeClass
public static void setup() throws Exception {
Expand Down Expand Up @@ -63,7 +61,9 @@ public void testPushdownOnOffset() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString, expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -79,7 +79,9 @@ public void testPushdownOnPartition() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString, expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -95,7 +97,9 @@ public void testPushdownOnTimestamp() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -112,7 +116,9 @@ public void testPushdownUnorderedTimestamp() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowInPlan)},
new String[]{});
}

/**
Expand All @@ -128,7 +134,9 @@ public void testPushdownWhenTimestampDoesNotExist() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -144,7 +152,9 @@ public void testPushdownWhenPartitionDoesNotExist() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -161,7 +171,9 @@ public void testPushdownForEmptyScanSpec() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -178,42 +190,54 @@ public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws E
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"equal" such that value < startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"greater_than" such that value = endOffset-1
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"greater_than_or_equal" such that value = endOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"less_than" such that value = startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"less_than_or_equal" such that value < startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -230,21 +254,27 @@ public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws E
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");

runKafkaSQLVerifyCount(queryString, expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"greater_than" such that value = endOffset-2
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});

//"greater_than_or_equal" such that value = endOffset-1
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -262,7 +292,9 @@ public void testPushdownWithOr() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -280,7 +312,9 @@ public void testPushdownWithOr1() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowInPlan)},
new String[]{});
}

/**
Expand All @@ -299,7 +333,9 @@ public void testPushdownWithAndOrCombo() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCount)},
new String[]{});
}

/**
Expand All @@ -319,7 +355,9 @@ public void testPushdownWithAndOrCombo2() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
new String[]{});
}

/**
Expand All @@ -338,7 +376,9 @@ public void testPushdownTimestampWithNonMetaField() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
new String[]{});
}

/**
Expand All @@ -358,7 +398,9 @@ public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);

runKafkaSQLVerifyCount(queryString,expectedRowCount);
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
new String[]{});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ private ExecConstants() {
new OptionDescription("Indicates how long a query can wait in queue before the query fails. Range: 0-9223372036854775807"));

// New Smart RM boot time configs
public static final String RM_WAIT_THREAD_INTERVAL = "exec.rm.wait_thread_interval";
public static final String RM_QUERY_TAGS_KEY = "exec.rm.queryTags";
public static final StringValidator RM_QUERY_TAGS_VALIDATOR = new StringValidator(RM_QUERY_TAGS_KEY,
new OptionDescription("Allows user to set coma separated list of tags for all the queries submitted over a session"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
*/
package org.apache.drill.exec.coord;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
* as well as understand other node's existence and capabilities.
Expand Down Expand Up @@ -60,6 +61,10 @@ public abstract class ClusterCoordinator implements AutoCloseable {
*/
public abstract Collection<DrillbitEndpoint> getAvailableEndpoints();

/**
 * Returns available Drillbit endpoints keyed by their UUID string.
 * <p>
 * Default implementation always throws; only the ZooKeeper-based cluster
 * coordinator (when running outside YARN) is expected to override this with
 * a real implementation — see the exception message below.
 *
 * @return map from UUID to available {@link DrillbitEndpoint} (in overriding implementations)
 * @throws UnsupportedOperationException unless overridden by a supporting coordinator
 */
public Map<String, DrillbitEndpoint> getAvailableEndpointsUUID() {
throw new UnsupportedOperationException("Only supported by Zookeeper Cluster Coordinator outside YARN");
}

/**
* Get a collection of ONLINE drillbit endpoints by excluding the drillbits
* that are in QUIESCENT state (drillbits that are shutting down). Primarily used by the planner
Expand All @@ -70,6 +75,10 @@ public abstract class ClusterCoordinator implements AutoCloseable {

public abstract Collection<DrillbitEndpoint> getOnlineEndPoints();

/**
 * Returns ONLINE Drillbit endpoints mapped to their UUID string.
 * <p>
 * Note the inverted key/value orientation compared to
 * {@code getAvailableEndpointsUUID()}: here the endpoint is the key.
 * Default implementation always throws; only the ZooKeeper-based cluster
 * coordinator (when running outside YARN) is expected to override this.
 *
 * @return map from online {@link DrillbitEndpoint} to its UUID (in overriding implementations)
 * @throws UnsupportedOperationException unless overridden by a supporting coordinator
 */
public Map<DrillbitEndpoint, String> getOnlineEndpointsUUID() {
throw new UnsupportedOperationException("Only supported by Zookeeper Cluster Coordinator outside YARN");
}

public abstract RegistrationHandle update(RegistrationHandle handle, State state);

public interface RegistrationHandle {
Expand All @@ -79,6 +88,8 @@ public interface RegistrationHandle {
*/
public abstract DrillbitEndpoint getEndPoint();

public abstract String getId();

public abstract void setEndPoint(DrillbitEndpoint endpoint);
}

Expand Down
Loading