-
Notifications
You must be signed in to change notification settings - Fork 897
/
launch-connect-es-sink-worker-container_gcloud.sh
executable file
·143 lines (138 loc) · 6.27 KB
/
launch-connect-es-sink-worker-container_gcloud.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/bin/bash
#
# Robin Moffatt <[email protected]>
# 12 November 2019
#
# This script will launch a Kafka Connect worker on GCP connecting to Confluent Cloud.
# It will install a connector plugin and once the worker is running submit a connector
# configuration to it.
#
# Since Kafka Connect configuration is stored in Kafka itself, the state for this worker
# is not tied to its instance but to the Kafka cluster (Confluent Cloud). This script
# uses the timestamp to uniquely label the instance of the Kafka Connect cluster's topics
# for this reason.
#
# Use:
# ----
#
# 1. Populate .env with the following:
# CONFLUENTPLATFORM_VERSION=5.4.0-beta1
#
#   CCLOUD_BROKER_HOST=
#   CCLOUD_API_KEY=
#   CCLOUD_API_SECRET=
#   CCLOUD_SCHEMA_REGISTRY_URL=
#   CCLOUD_SCHEMA_REGISTRY_API_KEY=
#   CCLOUD_SCHEMA_REGISTRY_API_SECRET=
#   ELASTIC_URL=
#   ELASTIC_USERNAME=
#   ELASTIC_PASSWORD=
#
# 2. Install the gcloud CLI and run `gcloud init` to set up authentication
#
# 3. Tweak the connector config, plugin name, target topic name etc.
# -------------------
# Epoch will be our unique ID.
# If you need something more unique, you can generate it here ;)
# Epoch seconds act as a unique suffix: since Kafka Connect state lives in
# Kafka (Confluent Cloud), each launch gets its own group id and internal
# topics so workers don't collide.
epoch=$(date +%s)

# Load credentials (CONFLUENTPLATFORM_VERSION, CCLOUD_*, ELASTIC_*).
# Warn rather than abort if .env is missing so the generated properties
# file can still be inspected.
if [[ -f .env ]]; then
  source .env
else
  echo "warning: .env not found - continuing with current environment" >&2
fi

# Build the worker env file. Values are interpolated here because passing
# computed env vars natively through the gcloud CLI is impractical; the
# file is handed to the VM via --container-env-file below.
PROPERTIES_FILE="/tmp/connect-worker-${epoch}_gcloud_env.properties"
printf '%s\n' "${PROPERTIES_FILE}"

cat > "${PROPERTIES_FILE}" <<EOF
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN=[%d] %p %X{connector.context}%m (%c:%L)%n
CONNECT_CUB_KAFKA_TIMEOUT=300
CONNECT_BOOTSTRAP_SERVERS=${CCLOUD_BROKER_HOST}:9092
CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect-01
CONNECT_REST_PORT=8083
CONNECT_GROUP_ID=kafka-connect-group-es-${epoch}
CONNECT_CONFIG_STORAGE_TOPIC=_kafka-connect-group-es-${epoch}-configs
CONNECT_OFFSET_STORAGE_TOPIC=_kafka-connect-group-es-${epoch}-offsets
CONNECT_STATUS_STORAGE_TOPIC=_kafka-connect-group-es-${epoch}-status
CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=${CCLOUD_SCHEMA_REGISTRY_URL}
CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=${CCLOUD_SCHEMA_REGISTRY_URL}
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
EOF
# Provision a Container-Optimized OS VM on GCE that runs the Kafka Connect
# worker image, wired to the env file generated above (--container-env-file).
# The --container-arg payload is one single-quoted inline bash script run
# inside the container: install the Elasticsearch sink plugin, start the
# worker in the background, poll the REST API on localhost:8083 until it
# returns 200, then PUT the connector config, and finally sleep forever to
# keep the container alive.
# NOTE(review): ELASTIC_URL / ELASTIC_USERNAME / ELASTIC_PASSWORD are
# interpolated on the *launching host* via the '"'"' quote-break pattern,
# so they must be set in .env before this runs - confirm.
# NOTE(review): instance/disk names say "mqtt" while the Connect group id
# says "es" - presumably copied from a sibling script; confirm intended.
gcloud beta compute \
--project=devx-testing instances create-with-container rmoff-connect-mqtt-es-${epoch} \
--machine-type=n1-standard-1 \
--subnet=default \
--metadata=google-logging-enabled=true \
--maintenance-policy=MIGRATE \
--image=cos-stable-77-12371-114-0 \
--image-project=cos-cloud \
--no-scopes \
--no-service-account \
--boot-disk-size=10GB \
--boot-disk-type=pd-standard \
--boot-disk-device-name=rmoff-connect-mqtt-${epoch} \
--container-restart-policy=always \
--labels=container-vm=cos-stable-77-12371-114-0 \
--container-image=confluentinc/cp-kafka-connect-base:${CONFLUENTPLATFORM_VERSION} \
--container-env-file=${PROPERTIES_FILE} \
--container-command=bash \
--container-arg=-c \
--container-arg='set -x
echo "Installing connector plugins"
confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:5.3.1
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
while : ; do
curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
if [ $curl_status -eq 200 ] ; then
break
fi
sleep 5
done
#
echo -e "\n--\n+> Creating Kafka Connect Elasticsearch sink"
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-elastic-phone-mqtt/config \
-d '"'"'{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "'${ELASTIC_URL}'",
"connection.username": "'${ELASTIC_USERNAME}'",
"connection.password": "'${ELASTIC_PASSWORD}'",
"type.name": "",
"behavior.on.malformed.documents": "warn",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true,
"topics.regex": "pksqlc-e8gj5PHONE_DATA",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}'"'"'
#
sleep infinity'
# Remove the interpolated properties file promptly - it holds the Confluent
# Cloud API key/secret in plain text on the launching host.
rm -f -- "${PROPERTIES_FILE}"
# NOTE(review): /tmp/config-${epoch}.properties is never created by this
# script (looks like a leftover from a sibling script); -f keeps the
# defensive delete from emitting a spurious "No such file" error.
rm -f -- "/tmp/config-${epoch}.properties"