
Commit 34c64f9

Source connector parallelization (#17)
1 parent 4bae458 commit 34c64f9

17 files changed: +222 -38 lines changed


CHANGELOG.md

Lines changed: 7 additions & 0 deletions
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.1.0] - 2020-12-11
+### Added
+- Parallelization for source connector based on channels/patterns
+
+### Removed
+- Default configuration for Kafka topic
+
 ## [1.0.4] - 2020-11-29
 ### Added
 - Added support for sinking arbitrary Redis commands, primarily for use with Redis modules

docs/connectors/SINK.md

Lines changed: 3 additions & 0 deletions
@@ -424,6 +424,9 @@ Keys are ignored.
 ```
 
 ## Configuration
+### Parallelization
+Splitting the workload between multiple tasks is possible via the configuration property `tasks.max`. The configured number will exactly determine the number of tasks that are created.
+
 ### Connector Properties
 | Name | Type | Default | Importance | Description |
 | ----------------------- | ------- | ------- | ---------- | ------------------------------------------------------- |
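
For context on the sink side: the connector itself does no splitting. It hands every task an identical copy of the configuration, and Kafka Connect balances the subscribed topic partitions across those tasks, which is why `tasks.max` translates one-for-one into task count. A minimal sketch of that replication step (mirroring the `RedisSinkConnector.taskConfigs` change further down in this commit; the class name and property values here are illustrative, not part of the plugin):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SinkTaskConfigSketch {
  // Every sink task receives the same configuration map, so tasks.max alone
  // determines how many tasks Kafka Connect creates.
  static List<Map<String, String>> taskConfigs(Map<String, String> sharedConfig, int maxTasks) {
    final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) {
      configs.add(sharedConfig);
    }
    return configs;
  }

  public static void main(String[] args) {
    // Illustrative connector config; any Redis URI would do here.
    final Map<String, String> connectorConfig =
        Collections.singletonMap("redis.uri", "redis://localhost:6379");
    // "tasks.max": "3" yields exactly three task configurations.
    System.out.println(taskConfigs(connectorConfig, 3).size()); // prints 3
  }
}
```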

docs/connectors/SOURCE.md

Lines changed: 2 additions & 2 deletions
@@ -82,13 +82,13 @@ In the case of subscribing to Redis keyspace notifications, it may be useful to
 The plugin can be configured to use an alternative partitioning strategy if desired. Set the configuration property `connector.client.config.override.policy` to value `All` on the Kafka Connect worker (the overall Kafka Connect application that runs plugins). This will allow the override of the internal Kafka producer and consumer configurations. To override the partitioner for an individual connector plugin, add the configuration property `producer.override.partitioner.class` to the connector plugin with a value that points to a class implementing the [Partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java) interface, e.g. `org.apache.kafka.clients.producer.internals.DefaultPartitioner`.
 
 ## Parallelization
-Splitting the workload between multiple tasks via the configuration property `max.tasks` is not supported at this time. Support for this will be added in the future.
+Splitting the workload between multiple tasks is possible via the configuration property `tasks.max`. The connector splits the work based on the number of configured channels/patterns. If the max tasks configuration exceeds the number of channels/patterns, the number of channels/patterns will be used instead as the maximum.
 
 ## Configuration
 ### Connector Properties
 | Name | Type | Default | Importance | Description |
 | --------------------------------- | ------- | -------------- | ---------- | ------------------------------------------------------- |
-| `topic` | string | `redis.events` | High | Topic to write to. |
+| `topic` | string | | High | Topic to write to. |
 | `redis.uri` | string | | High | Redis connection information provided via a URI string. |
 | `redis.cluster.enabled` | boolean | false | High | Target Redis is running as a cluster. |
 | `redis.channels` | string | | High | Redis channels to subscribe to separated by commas. |
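
To make the capping rule concrete: the connector (see the `RedisSourceConnector.taskConfigs` change later in this commit) groups the configured channels with Kafka Connect's `ConnectorUtils.groupPartitions`, clamping the group count to the channel count. A small sketch of the split, using made-up channel names:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.connect.util.ConnectorUtils;

public class SourcePartitioningSketch {
  public static void main(String[] args) {
    // Hypothetical redis.channels value, already split into a list.
    final List<String> channels = Arrays.asList("boats", "planes", "trains");
    final int maxTasks = 5;

    // tasks.max = 5 but only three channels exist, so the effective group
    // count is min(number of channels, tasks.max) = 3.
    final List<List<String>> grouped =
        ConnectorUtils.groupPartitions(channels, Math.min(channels.size(), maxTasks));

    // Each inner list becomes one task's redis.channels value,
    // e.g. [boats], [planes], [trains].
    grouped.forEach(group -> System.out.println(String.join(",", group)));
  }
}
```

When `tasks.max` is smaller than the channel count, `groupPartitions` instead spreads several channels into each task's comma-separated `redis.channels` value.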

docs/demo/README.md

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ docker build -t jaredpetersen/redis:latest .
 
 Next, we'll need to build a docker image for Kafka Connect Redis. Navigate to `demo/docker/kafka-connect-redis` and run the following commands:
 ```bash
-curl -O https://oss.sonatype.org/service/local/repositories/releases/content/io/github/jaredpetersen/kafka-connect-redis/1.0.4/kafka-connect-redis-1.0.4.jar
+curl -O https://repo1.maven.org/maven2/io/github/jaredpetersen/kafka-connect-redis/1.1.0/kafka-connect-redis-1.1.0.jar
 docker build -t jaredpetersen/kafka-connect-redis:latest .
 ```
 
docs/demo/SINK.md

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ curl --request POST \
 "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
 "value.converter": "io.confluent.connect.avro.AvroConverter",
 "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
-"tasks.max": "1",
+"tasks.max": "3",
 "topics": "redis.commands.set,redis.commands.expire,redis.commands.expireat,redis.commands.pexpire,redis.commands.sadd,redis.commands.geoadd,redis.commands.arbitrary",
 "redis.uri": "redis://IEPfIr0eLF7UsfwrIlzy80yUaBG258j9@redis-cluster",
 "redis.cluster.enabled": true

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
 
   <groupId>io.github.jaredpetersen</groupId>
   <artifactId>kafka-connect-redis</artifactId>
-  <version>1.0.4</version>
+  <version>1.1.0</version>
   <packaging>jar</packaging>
 
   <name>kafka-connect-redis</name>

src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/RedisSinkConnector.java

Lines changed: 10 additions & 3 deletions
@@ -6,14 +6,16 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkConnector;
 
 /**
  * Entry point for Kafka Connect Redis Sink.
  */
 public class RedisSinkConnector extends SinkConnector {
-  private Map<String, String> config;
+  private RedisSinkConfig config;
 
   @Override
   public String version() {
@@ -22,7 +24,12 @@ public String version() {
 
   @Override
   public void start(final Map<String, String> props) {
-    this.config = props;
+    try {
+      this.config = new RedisSinkConfig(props);
+    }
+    catch (ConfigException configException) {
+      throw new ConnectException("connector configuration error");
+    }
   }
 
   @Override
@@ -35,7 +42,7 @@ public List<Map<String, String>> taskConfigs(final int maxTasks) {
     List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
 
     for (int configIndex = 0; configIndex < maxTasks; ++configIndex) {
-      taskConfigs.add(this.config);
+      taskConfigs.add(this.config.originalsStrings());
     }
 
     return taskConfigs;

src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/RedisSinkTask.java

Lines changed: 10 additions & 1 deletion
@@ -12,6 +12,7 @@
 import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
 import java.util.Collection;
 import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -43,8 +44,16 @@ public String version() {
   @Override
   public void start(final Map<String, String> props) {
     // Map the task properties to config object
-    final RedisSinkConfig config = new RedisSinkConfig(props);
+    final RedisSinkConfig config;
 
+    try {
+      config = new RedisSinkConfig(props);
+    }
+    catch (ConfigException configException) {
+      throw new ConnectException("task configuration error");
+    }
+
+    // Set up the writer
     if (config.isRedisClusterEnabled()) {
       this.redisClusterClient = RedisClusterClient.create(config.getRedisUri());
       this.redisClusterConnection = this.redisClusterClient.connect();

src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/config/RedisSinkConfig.java

Lines changed: 2 additions & 3 deletions
@@ -7,12 +7,11 @@
 import org.apache.kafka.common.config.ConfigDef.Type;
 
 public class RedisSinkConfig extends AbstractConfig {
-  // TODO Store as password
-  private static final String REDIS_URI = "redis.uri";
+  public static final String REDIS_URI = "redis.uri";
   private static final String REDIS_URI_DOC = "Redis uri.";
   private final String redisUri;
 
-  private static final String REDIS_CLUSTER_ENABLED = "redis.cluster.enabled";
+  public static final String REDIS_CLUSTER_ENABLED = "redis.cluster.enabled";
   private static final String REDIS_CLUSTER_ENABLED_DOC = "Redis cluster mode enabled.";
   private static final boolean REDIS_CLUSTER_ENABLED_DEFAULT = false;
   private final boolean redisClusterEnabled;

src/main/java/io/github/jaredpetersen/kafkaconnectredis/source/RedisSourceConnector.java

Lines changed: 26 additions & 5 deletions
@@ -2,18 +2,22 @@
 
 import io.github.jaredpetersen.kafkaconnectredis.source.config.RedisSourceConfig;
 import io.github.jaredpetersen.kafkaconnectredis.util.VersionUtil;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 
 /**
  * Entry point for Kafka Connect Redis Sink.
  */
 public class RedisSourceConnector extends SourceConnector {
-  private Map<String, String> config;
+  private RedisSourceConfig config;
 
   @Override
   public String version() {
@@ -22,7 +26,13 @@ public String version() {
 
   @Override
   public void start(final Map<String, String> props) {
-    this.config = props;
+    // Map the connector properties to config object
+    try {
+      this.config = new RedisSourceConfig(props);
+    }
+    catch (ConfigException configException) {
+      throw new ConnectException("connector configuration error", configException);
+    }
   }
 
   @Override
@@ -32,8 +42,19 @@ public Class<? extends Task> taskClass() {
 
   @Override
   public List<Map<String, String>> taskConfigs(final int maxTasks) {
-    // TODO create a task for each channel/pattern
-    return Collections.singletonList(this.config);
+    // Partition the configs based on channels
+    final List<List<String>> partitionedRedisChannels = ConnectorUtils
+      .groupPartitions(this.config.getRedisChannels(), Math.min(this.config.getRedisChannels().size(), maxTasks));
+
+    // Create task configs based on the partitions
+    return partitionedRedisChannels.stream()
+      .map(redisChannels -> {
+        final Map<String, String> taskConfig = new HashMap<>(this.config.originalsStrings());
+        taskConfig.put(RedisSourceConfig.REDIS_CHANNELS, String.join(",", redisChannels));

+        return taskConfig;
+      })
+      .collect(Collectors.toList());
   }
 
   @Override
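
As a rough illustration of what the new `taskConfigs` hands to each task (all values hypothetical): every task receives a full copy of the connector properties, with `redis.channels` narrowed to the subset assigned by the partitioning above.

```java
import java.util.HashMap;
import java.util.Map;

public class TaskConfigIllustration {
  public static void main(String[] args) {
    // Hypothetical connector-level properties, as returned by originalsStrings().
    final Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put("topic", "redis.events");
    connectorProps.put("redis.uri", "redis://localhost:6379");
    connectorProps.put("redis.channels", "boats,planes");

    // With two channels and tasks.max >= 2, one resulting task config looks like
    // the connector config but with redis.channels cut down to its assigned share.
    final Map<String, String> taskConfig = new HashMap<>(connectorProps);
    taskConfig.put("redis.channels", "boats");

    System.out.println(taskConfig);
  }
}
```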
