@@ -1,24 +1,31 @@
 # Demo: Kafka Connect Sink
 ## Install Connector
-Send a request to the Kafka Connect REST API to configure it to use Kafka Connect Redis:
+Send a request to the Kafka Connect REST API to configure it to use Kafka Connect Redis.

-### Avro
-**IMPORTANT:** The Avro demo utilizes multiple topics in order to work around [a bug in the Avro console producer](https://github.com/confluentinc/schema-registry/issues/898). A fix has been merged but Confluent has not published a new Docker image for it yet (6.1.0+). Kafka Connect Redis works with Avro on a single topic; this is just a problem with the console producer provided by Confluent.
+First, expose the Kafka Connect server:
+```bash
+kubectl -n kcr-demo port-forward service/kafka-connect :rest
+```
+
+Kubectl will choose an available local port for you; use that port as `$PORT` in the cURL commands below.

+### Avro
 ```bash
 curl --request POST \
-  --url "$(minikube -n kcr-demo service kafka-connect --url)/connectors" \
+  --url "localhost:$PORT/connectors" \
   --header 'content-type: application/json' \
   --data '{
     "name": "demo-redis-sink-connector",
     "config": {
       "connector.class": "io.github.jaredpetersen.kafkaconnectredis.sink.RedisSinkConnector",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
+      "key.converter.key.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
+      "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
       "tasks.max": "3",
-      "topics": "redis.commands.set,redis.commands.expire,redis.commands.expireat,redis.commands.pexpire,redis.commands.sadd,redis.commands.geoadd,redis.commands.arbitrary",
+      "topics": "redis.commands",
       "redis.uri": "redis://IEPfIr0eLF7UsfwrIlzy80yUaBG258j9@redis-cluster",
       "redis.cluster.enabled": true
     }
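An aside on the port-forward step above: because `:rest` lets kubectl pick a random local port, a script has to read the port back out of the `Forwarding from` line kubectl prints. A minimal sketch of that parsing, with a hard-coded `sample` line standing in for real kubectl output:

```bash
# kubectl port-forward announces the local port it picked on stdout, e.g.:
sample='Forwarding from 127.0.0.1:54321 -> 8083'

# Pull the port out so later cURLs can target localhost:$PORT.
PORT=$(printf '%s\n' "$sample" | sed -n 's/^Forwarding from 127\.0\.0\.1:\([0-9]*\).*/\1/p')
echo "$PORT"   # → 54321
```

In a real run you would redirect the port-forward's output to a file (running it in the background) and apply the same `sed` to the first `Forwarding from` line.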
@@ -28,7 +35,7 @@ curl --request POST \
 ### Connect JSON
 ```bash
 curl --request POST \
-  --url "$(minikube -n kcr-demo service kafka-connect --url)/connectors" \
+  --url "localhost:$PORT/connectors" \
   --header 'content-type: application/json' \
   --data '{
     "name": "demo-redis-sink-connector",
@@ -48,17 +55,18 @@ curl --request POST \
 ### Avro
 Create an interactive ephemeral query pod:
 ```bash
-kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-schema-registry:6.0.0 --command /bin/bash
+kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-schema-registry:6.1.0 --command /bin/bash
 ```

-Write records to the `redis.commands` topics:
+Write records to the `redis.commands` topic:

 ```bash
 kafka-avro-console-producer \
   --broker-list kafka-broker-0.kafka-broker:9092 \
   --property schema.registry.url='http://kafka-schema-registry:8081' \
+  --property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
   --property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisSetCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"value","type":"string"},{"name":"expiration","type":["null",{"name":"RedisSetCommandExpiration","type":"record","fields":[{"name":"type","type":{"name":"RedisSetCommandExpirationType","type":"enum","symbols":["EX","PX","KEEPTTL"]}},{"name":"time","type":["null","long"]}]}],"default":null},{"name":"condition","type":["null",{"name":"RedisSetCommandCondition","type":"enum","symbols":["NX","XX","KEEPTTL"]}],"default":null}]}' \
-  --topic redis.commands.set
+  --topic redis.commands
 > {"key":"{user.1}.username","value":"jetpackmelon22","expiration":null,"condition":null}
 > {"key":"{user.2}.username","value":"anchorgoat74","expiration":{"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommandExpiration":{"type":"EX","time":{"long":2100}}},"condition":{"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommandCondition":"NX"}}
 > {"key":"product.milk","value":"$2.29","expiration":null,"condition":null}
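A note on the second record above: in Avro's JSON encoding, a non-null value for a union field must be wrapped in an object keyed by the branch's fully-qualified type name, which is why `expiration` and `condition` carry an extra nesting level (and why the `long` inside the union-typed `time` field is wrapped too). A sketch assembling that record from its union branches, using the same values as above:

```bash
# Non-null union branches are wrapped as {"<full type name>": <value>};
# null branches are written as plain null.
expiration='{"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommandExpiration":{"type":"EX","time":{"long":2100}}}'
condition='{"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommandCondition":"NX"}'
record="{\"key\":\"{user.2}.username\",\"value\":\"anchorgoat74\",\"expiration\":$expiration,\"condition\":$condition}"
echo "$record"
```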
@@ -70,35 +78,39 @@ kafka-avro-console-producer \
 kafka-avro-console-producer \
   --broker-list kafka-broker-0.kafka-broker:9092 \
   --property schema.registry.url='http://kafka-schema-registry:8081' \
+  --property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
   --property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisExpireCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"seconds","type":"long"}]}' \
-  --topic redis.commands.expire
+  --topic redis.commands
 > {"key":"product.milk","seconds":1800}
 ```

 ```bash
 kafka-avro-console-producer \
   --broker-list kafka-broker-0.kafka-broker:9092 \
   --property schema.registry.url='http://kafka-schema-registry:8081' \
+  --property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
   --property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisExpireatCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"timestamp","type":"long"}]}' \
-  --topic redis.commands.expireat
+  --topic redis.commands
 > {"key":"product.bread","timestamp":4130464553}
 ```

 ```bash
 kafka-avro-console-producer \
   --broker-list kafka-broker-0.kafka-broker:9092 \
   --property schema.registry.url='http://kafka-schema-registry:8081' \
+  --property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
   --property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisPexpireCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"milliseconds","type":"long"}]}' \
-  --topic redis.commands.pexpire
+  --topic redis.commands
 > {"key":"product.waffles","milliseconds":1800000}
 ```

 ```bash
 kafka-avro-console-producer \
   --broker-list kafka-broker-0.kafka-broker:9092 \
   --property schema.registry.url='http://kafka-schema-registry:8081' \
+  --property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
   --property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisSaddCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"values","type":{"type":"array","items":"string"}}]}' \
-  --topic redis.commands.sadd
+  --topic redis.commands
 > {"key":"{user.1}.interests","values":["reading"]}
 > {"key":"{user.2}.interests","values":["sailing","woodworking","programming"]}
 ```
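Why a single topic now works where the old demo needed seven: with `TopicRecordNameStrategy`, the Schema Registry subject is derived from the topic name plus the fully-qualified record name, so each command schema gets its own subject (and its own compatibility history) even though every record lands on `redis.commands`. A sketch of the naming rule, assuming the strategy's documented `<topic>-<record name>` format:

```bash
# TopicRecordNameStrategy: subject = "<topic>-<fully-qualified record name>",
# letting many record types coexist on one topic without schema clashes.
topic='redis.commands'
record='io.github.jaredpetersen.kafkaconnectredis.RedisSaddCommand'
subject="${topic}-${record}"
echo "$subject"   # → redis.commands-io.github.jaredpetersen.kafkaconnectredis.RedisSaddCommand
```

The registered subjects can be listed from the Schema Registry's `GET /subjects` endpoint to confirm one subject per command type.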
@@ -107,17 +119,19 @@ kafka-avro-console-producer \
 kafka-avro-console-producer \
   --broker-list kafka-broker-0.kafka-broker:9092 \
   --property schema.registry.url='http://kafka-schema-registry:8081' \
+  --property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
   --property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisGeoaddCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"values","type":{"type":"array","items":{"name":"RedisGeoaddCommandGeolocation","type":"record","fields":[{"name":"longitude","type":"double"},{"name":"latitude","type":"double"},{"name":"member","type":"string"}]}}}]}' \
-  --topic redis.commands.geoadd
+  --topic redis.commands
 > {"key":"Sicily","values":[{"longitude":13.361389,"latitude":38.115556,"member":"Palermo"},{"longitude":15.087269,"latitude":37.502669,"member":"Catania"}]}
 ```

 ```bash
 kafka-avro-console-producer \
   --broker-list kafka-broker-0.kafka-broker:9092 \
   --property schema.registry.url='http://kafka-schema-registry:8081' \
+  --property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
   --property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisArbitraryCommand","type":"record","fields":[{"name":"command","type":"string"},{"name":"arguments","type":{"type":"array","items":"string"}}]}' \
-  --topic redis.commands.arbitrary
+  --topic redis.commands
 > {"command":"TS.CREATE","arguments":["temperature:3:11","RETENTION","60","LABELS","sensor_id","2","area_id","32"]}
 > {"command":"TS.ADD","arguments":["temperature:3:11","1548149181","30"]}
 > {"command":"TS.ADD","arguments":["temperature:3:11","1548149191","42"]}
@@ -126,7 +140,7 @@ kafka-avro-console-producer \
 ### Connect JSON
 Create an interactive ephemeral query pod:
 ```bash
-kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-kafka:6.0.0 --command /bin/bash
+kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-kafka:6.1.0 --command /bin/bash
 ```

 Write records to the `redis.commands` topic: