|
| 1 | +--- |
| 2 | +version: '3' |
| 3 | +services: |
| 4 | + |
| 5 | + connect: |
| 6 | + image: confluentinc/cp-kafka-connect-base:6.1.0 |
| 7 | + container_name: connect |
| 8 | + ports: |
| 9 | + - 8083:8083 |
| 10 | + environment: |
| 11 | + CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n" |
| 12 | + CONNECT_CUB_KAFKA_TIMEOUT: 300 |
| 13 | + CONNECT_BOOTSTRAP_SERVERS: "{{CCLOUD_ENDPOINT}}" |
| 14 | + CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-ccloud' |
| 15 | + CONNECT_REST_PORT: 8083 |
| 16 | + CONNECT_GROUP_ID: kafka-connect-datagen-v01 |
| 17 | + CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-datagen-v01-configs |
| 18 | + CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-datagen-v01-offsets |
| 19 | + CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-datagen-v01-status |
| 20 | + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter |
| 21 | + CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter |
| 22 | + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "{{SCHEMA_REGISTRY_URL}}" |
| 23 | + CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO" |
| 24 | + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "{{SR_KEY}}:{{SR_SECRET}}" |
| 25 | + CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO' |
| 26 | + CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR' |
| 27 | + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3' |
| 28 | + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3' |
| 29 | + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3' |
| 30 | + CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/' |
| 31 | + # Confluent Cloud config |
| 32 | + CONNECT_REQUEST_TIMEOUT_MS: "20000" |
| 33 | + CONNECT_RETRY_BACKOFF_MS: "500" |
| 34 | + CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https" |
| 35 | + CONNECT_SASL_MECHANISM: "PLAIN" |
| 36 | + CONNECT_SECURITY_PROTOCOL: "SASL_SSL" |
| 37 | + CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{CCLOUD_KEY}}\" password=\"{{CCLOUD_SECRET}}";" |
| 38 | + # |
| 39 | + CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL" |
| 40 | + CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https" |
| 41 | + CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN" |
| 42 | + CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{CCLOUD_KEY}}\" password=\"{{CCLOUD_SECRET}}";" |
| 43 | + CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000" |
| 44 | + CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500" |
| 45 | + # |
| 46 | + CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL" |
| 47 | + CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https" |
| 48 | + CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN" |
| 49 | + CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{CCLOUD_KEY}}\" password=\"{{CCLOUD_SECRET}}";" |
| 50 | + CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000" |
| 51 | + CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500" |
| 52 | + command: |
| 53 | + # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option' |
| 54 | + - bash |
| 55 | + - -c |
| 56 | + - | |
| 57 | + echo "Installing DataGen connector plugin" |
| 58 | + mkdir -p /usr/share/confluent-hub-components/ |
| 59 | + confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.5.0 |
| 60 | + # |
| 61 | + echo "Launching Kafka Connect worker" |
| 62 | + /etc/confluent/docker/run & |
| 63 | + # |
| 64 | + echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳" |
| 65 | + while : ; do |
| 66 | + curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) |
| 67 | + echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)" |
| 68 | + if [ $$curl_status -eq 200 ] ; then |
| 69 | + break |
| 70 | + fi |
| 71 | + sleep 5 |
| 72 | + done |
| 73 | + echo -e "\n--\n+> Creating Kafka Connect source connector" |
| 74 | + curl -X PUT http://localhost:8083/connectors/dg_shirts/config \ |
| 75 | + -i -H "Content-Type: application/json" -d '{ |
| 76 | + "connector.class" : "io.confluent.kafka.connect.datagen.DatagenConnector", |
| 77 | + "kafka.topic" : "shirts", |
| 78 | + "schema.string" : "{\"type\":\"record\",\"name\":\"shirt\",\"fields\":[{\"name\":\"color\",\"type\":{\"type\": \"string\",\"arg.properties\":{\"options\":[\"Gray\",\"Black\",\"Tie-dye\"]}}},{\"name\":\"size\",\"type\":{\"type\": \"string\",\"arg.properties\":{\"options\":[\"Small\",\"Medium\",\"Large\"]}}},{\"name\":\"logo\",\"type\":{\"type\": \"string\",\"arg.properties\":{\"options\":[\"Kafka\",\"Confluent\",\"Kafka Streams\"]}}}]}", |
| 79 | + "max.interval" : 1000, |
| 80 | + "iterations" : 1000, |
| 81 | + "topic.creation.default.partitions" : 1, |
| 82 | + "topic.creation.default.replication.factor" : 3 |
| 83 | + }' |
| 84 | + # |
| 85 | + sleep infinity |
0 commit comments