Welcome to the Microsoft Kafka Bootcamp. We will get started with the basics of Kafka and its associated components.
- Confluent Cloud Account - Please register an account if you have not already done so.
- Docker and docker-compose installed on your machine
- Docker Engine - https://docs.docker.com/engine/install/
- Docker compose - https://docs.docker.com/compose/install/other/
- Gradle installed
- Java 11 installed and configured as the current Java version for the environment. Verify that `java -version` outputs version 11 and ensure that the `JAVA_HOME` environment variable is set to the Java installation directory containing `bin` (see the sketch after this list).
- IDE of your choice - Visual Studio or IntelliJ
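If you want to double-check the Java prerequisite from Java itself, here is a tiny sketch (the class name `CheckJava` is ours, purely illustrative):

```java
// Tiny sanity check for the Java prerequisite: prints the runtime version
// (it should start with "11") and the JAVA_HOME environment variable.
public class CheckJava {
    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("JAVA_HOME    = " + System.getenv("JAVA_HOME"));
    }
}
```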
- Create a Basic Kafka cluster on your Confluent Cloud account.
- Create an Essentials Stream Governance package in the same environment as your Kafka cluster.
- You can obtain the Kafka cluster bootstrap server configuration from the Cluster settings section under Cluster Overview.
- Similarly, you can obtain the Schema Registry URL endpoint from the Stream Governance section.
- We will need a Kafka API key for the clients to connect to the Kafka cluster, so let's create one.
- Create an API key by navigating to the API Keys section under Cluster Overview. The scope of the API key will be global.
Note: Global scopes are not recommended in production. Creating an API key with granular permissions using ACLs is out of scope for this bootcamp.
- Download the CLI archive:

```bash
curl -O https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
```

- Extract the CLI:

```bash
tar zxf kafka_2.13-3.3.1.tgz
```

- Add the CLI bin directory to the environment PATH:

```bash
export PATH=$PATH:$PWD/kafka_2.13-3.3.1/bin
```
- We will need to configure the `client.properties` file for the Admin Client to connect to the Kafka cluster:

```properties
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<Kafka-api-key>" \
  password="<Kafka-api-secret>";
```

We will need the bootstrap server in addition to the above.
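The same properties also work from Java's `AdminClient`; here is a minimal sketch (the class name `ListTopicsExample` is illustrative, and it assumes `bootstrap.servers` has been added to the file, since the Java client cannot take it as a command-line flag):

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

// Illustrative sketch: lists topics using the client.properties file above.
public class ListTopicsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumes bootstrap.servers has been added to client.properties.
        props.load(Files.newInputStream(Paths.get("client.properties")));
        try (AdminClient admin = AdminClient.create(props)) {
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}
```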
- List topics from the Kafka cluster:

```bash
kafka-topics.sh --bootstrap-server <kafka-server> --command-config client.properties --list
```

- Create a topic:

```bash
kafka-topics.sh --bootstrap-server <kafka-server> --command-config client.properties --create --topic test_topic
```

- Describe a topic:

```bash
kafka-topics.sh --bootstrap-server <kafka-server> --command-config client.properties --describe --topic test_topic
```

- Delete a topic:

```bash
kafka-topics.sh --bootstrap-server <kafka-server> --command-config client.properties --delete --topic test_topic
```
- Create a `producer.properties` file with the connection details as well as producer-related configurations. For more details on the available producer configurations, visit this link.

```properties
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<Kafka-api-key>" \
  password="<Kafka-api-secret>";
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
batch.size=16384
linger.ms=0
acks=all
```

- Produce to a topic from the console:

```bash
kafka-console-producer.sh --bootstrap-server <kafka-server> --producer.config producer.properties --topic test_topic
```
- Create a `consumer.properties` file with the connection details as well as consumer-related configurations. For more details on the available consumer configurations, visit this link.

```properties
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<Kafka-api-key>" \
  password="<Kafka-api-secret>";
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
fetch.min.bytes=1
auto.offset.reset=earliest
```

- Consume from a topic in the console:

```bash
kafka-console-consumer.sh --bootstrap-server <kafka-server> --consumer.config consumer.properties --topic test_topic --group test_group
```
- The consumer group id of the consumer can be defined with the `--group` flag, as seen above.
- Go to the Schema Registry example directory:

```bash
cd java-client/
```

- Sample producer code is at `java-client/src/main/java/examples/ProducerAvroExample.java`.
- The Gradle build file with the required dependencies is stored as `build.gradle`.
- You can test the code before proceeding by compiling with:

```bash
gradle build
```

You should see:

```
BUILD SUCCESSFUL
```
- To build a JAR that we can run from the command line, first run:

```bash
gradle shadowJar
```
- Create a `producer.properties` file in the Java folder with the Schema Registry credentials:

```properties
bootstrap.servers=<kafka-bootstrap-server>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<Kafka-api-key>" \
  password="<Kafka-api-secret>";
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
batch.size=16384
linger.ms=0
acks=all
schema.registry.url=<schema-registry-url>
schema.registry.basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
```
- Run the following command to execute the producer application, which will produce some random data events to the `test_avro` topic:

```bash
java -cp build/libs/kafka-clients-java-7.3.1.jar main.java.examples.ProducerAvroExample producer.properties test_avro
```
- Sample consumer code is at `src/main/java/examples/ConsumerAvroExample.java`.
- You can test the code before proceeding by compiling with:

```bash
gradle build
```

You should see:

```
BUILD SUCCESSFUL
```
- To build a JAR that we can run from the command line, first run:

```bash
gradle shadowJar
```
- Create a `consumer.properties` file in the Java folder:

```properties
bootstrap.servers=<kafka-bootstrap-server>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<Kafka-api-key>" \
  password="<Kafka-api-secret>";
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
fetch.min.bytes=1
auto.offset.reset=earliest
group.id=test_avro_consumer
schema.registry.url=<schema-registry-url>
schema.registry.basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
```
- From another terminal, run the following command to run the consumer application, which will read the events from the `test_avro` topic and write the information to the terminal:

```bash
java -cp build/libs/kafka-clients-java-7.3.1.jar main.java.examples.ConsumerAvroExample consumer.properties test_avro
```
- The consumer application will start, print any events it has not yet consumed, and then wait for more events to arrive. Once you are done with the consumer, press `Ctrl-C` to terminate the consumer application.
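As with the producer, here is a rough sketch of what an Avro consumer like `ConsumerAvroExample` can look like (the polling loop and class name are illustrative):

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative sketch of an Avro consumer; the real code is in
// src/main/java/examples/ConsumerAvroExample.java.
public class AvroConsumerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.load(Files.newInputStream(Paths.get(args[0]))); // consumer.properties
        String topic = args[1];                               // e.g. test_avro

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) { // terminate with Ctrl-C
                ConsumerRecords<String, GenericRecord> records =
                    consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, GenericRecord> r : records) {
                    System.out.printf("key=%s value=%s%n", r.key(), r.value());
                }
            }
        }
    }
}
```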
- First, produce AVRO-formatted data to the customers topic:

```bash
java -cp build/libs/kafka-clients-java-7.3.1.jar main.java.examples.CustomersAvroProducer ../kstreams/producer.properties customers
```

- Then run the KStreams application in the kstreams directory:

```bash
java -cp target/kstreams-1.0-SNAPSHOT-jar-with-dependencies.jar com.bootcamp.CustomerStreams producer.properties customers
```
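The bootcamp's actual topology lives in `com.bootcamp.CustomerStreams`; purely as a rough sketch (the topology logic, serde setup, and application id below are our assumptions, not the real class), a Streams application over the `customers` topic could look like this:

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

// Illustrative sketch only; see com.bootcamp.CustomerStreams for the real topology.
public class CustomerStreamsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.load(Files.newInputStream(Paths.get(args[0]))); // connection + schema registry configs
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-streams-sketch");
        // Default serdes: String keys, Avro GenericRecord values (GenericAvroSerde
        // picks up schema.registry.url from the loaded properties).
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

        StreamsBuilder builder = new StreamsBuilder();
        // Read the customers topic and log each event as it flows through.
        KStream<String, GenericRecord> customers = builder.stream(args[1]);
        customers.peek((key, value) -> System.out.println("customer event: " + value));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```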
- We will deploy a standalone Kafka Connect cluster using the `cp-kafka-connect` image, which is configured to connect to the Kafka cluster in Confluent Cloud.
- The Kafka Connect configurations should be given as environment variables. Please refer to this link for the format the configurations should follow.
- The default `cp-kafka-connect` image only contains a handful of connectors and does not contain popular connectors like JDBC, the S3 sink, etc.
- We need to bake a new image by installing the required connectors on top of the `cp-kafka-connect` image. For this, we will use a `Dockerfile` like the one below:

```dockerfile
# Stage 1 -- install connectors
FROM confluentinc/cp-server-connect:7.3.1 AS install-connectors

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

# Install the required connectors from Confluent Hub
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.6.0
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-azure-blob-storage-source:2.5.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-azure-blob-storage:1.6.15
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-azure-data-lake-gen2-storage:1.6.15
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-azure-functions:2.0.2
```
- The above `Dockerfile` will be referenced in the `docker-compose.yml` when starting up the Docker containers.
- Please refer to the `docker-compose.yml` file for reference.
- Please refer to the sample connector config file as instructed.
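To give a rough idea of the JSON shape such a connector config file takes, here is a minimal Java sketch that POSTs a hypothetical JDBC source connector config to the Connect REST API (the connector name and all connection settings are placeholders, not the bootcamp's sample file; the curl call below does the same thing from the shell):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative sketch: registers a hypothetical JDBC source connector through
// the Kafka Connect REST API on localhost:8083.
public class CreateConnectorSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connector config; replace with your actual settings.
        String config = "{"
            + "\"name\": \"jdbc-source-example\","
            + "\"config\": {"
            + "  \"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\","
            + "  \"connection.url\": \"jdbc:postgresql://<host>:5432/<database>\","
            + "  \"connection.user\": \"<user>\","
            + "  \"connection.password\": \"<password>\","
            + "  \"mode\": \"incrementing\","
            + "  \"incrementing.column.name\": \"id\","
            + "  \"topic.prefix\": \"jdbc-\","
            + "  \"tasks.max\": \"1\""
            + "}}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(config))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```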
- Create a connector through an HTTP POST call:

```bash
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '@<file-path>'
```

- Check the status of the connectors:

```bash
curl -X GET http://localhost:8083/connectors?expand=status
```

- Delete a connector:

```bash
curl -X DELETE http://localhost:8083/connectors/<connector-name>
```

- Stop the Kafka Connect worker:

```bash
docker-compose down -v
```
- We will deploy a standalone KSQL server using the `cp-ksqldb-server` image, which is configured to connect to the Kafka cluster in Confluent Cloud.
- For the configuration format, please visit this link.
- Please refer to the `docker-compose.yml` file for reference. More details on the KSQL configurations to follow.
- Connect to the KSQL server using the KSQL CLI:

```bash
# Connect the ksqlDB CLI to the server.
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
```

- Stop the KSQL cluster:

```bash
docker-compose down -v
```