Skip to content

Commit

Permalink
MINOR: Add integration test for plugin aliases (#16621)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
C0urante authored Jul 19, 2024
1 parent f09ead1 commit 208bb1f
Showing 1 changed file with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
Expand Down Expand Up @@ -80,9 +82,12 @@
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
Expand Down Expand Up @@ -1375,6 +1380,57 @@ public void testRuntimePropertyReconfiguration() throws Exception {
);
}

@Test
public void testPluginAliases() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();

// Create a topic; not strictly necessary but prevents log spam when we start a source connector later
final String topic = "kafka17150";
connect.kafka().createTopic(topic, 1);

Map<String, String> baseConnectorConfig = new HashMap<>();
// General connector properties
baseConnectorConfig.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS));
// Aliased converter classes
baseConnectorConfig.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getSimpleName());
baseConnectorConfig.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getSimpleName());
baseConnectorConfig.put(HEADER_CONVERTER_CLASS_CONFIG, StringConverter.class.getSimpleName());
// Aliased SMT and predicate classes
baseConnectorConfig.put(TRANSFORMS_CONFIG, "filter");
baseConnectorConfig.put(TRANSFORMS_CONFIG + ".filter.type", Filter.class.getSimpleName());
baseConnectorConfig.put(TRANSFORMS_CONFIG + ".filter.predicate", "tombstone");
baseConnectorConfig.put(PREDICATES_CONFIG, "tombstone");
baseConnectorConfig.put(PREDICATES_CONFIG + ".tombstone.type", RecordIsTombstone.class.getSimpleName());

// Test a source connector
final String sourceConnectorName = "plugins-alias-test-source";
Map<String, String> sourceConnectorConfig = new HashMap<>(baseConnectorConfig);
// Aliased source connector class
sourceConnectorConfig.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
// Connector-specific properties
sourceConnectorConfig.put(TOPIC_CONFIG, topic);
sourceConnectorConfig.put("throughput", "10");
sourceConnectorConfig.put("messages.per.poll", String.valueOf(MESSAGES_PER_POLL));
// Create the connector and ensure it and its tasks can start
connect.configureConnector(sourceConnectorName, sourceConnectorConfig);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(sourceConnectorName, NUM_TASKS, "Connector and tasks did not start in time");
connect.deleteConnector(sourceConnectorName);

// Test a sink connector
final String sinkConnectorName = "plugins-alias-test-sink";
Map<String, String> sinkConnectorConfig = new HashMap<>(baseConnectorConfig);
// Aliased sink connector class
sinkConnectorConfig.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
// Connector-specific properties
sinkConnectorConfig.put(TOPICS_CONFIG, topic);
// Create the connector and ensure it and its tasks can start
connect.configureConnector(sinkConnectorName, sinkConnectorConfig);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(sinkConnectorName, NUM_TASKS, "Connector and tasks did not start in time");
connect.deleteConnector(sinkConnectorName);
}

private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup props for the source connector
Map<String, String> props = new HashMap<>();
Expand Down

0 comments on commit 208bb1f

Please sign in to comment.