Add GCP Pub Sub Source Connector, FromJson SMT
The Google Cloud Pub/Sub Source Connector can be used to ingest data
from a GCP topic/subscription. The connector determines which
subscription to listen on via the following configuration properties:
- `gcp.pubsub.project.id`
- `gcp.pubsub.topic.id`
- `gcp.pubsub.subscription.id`

The GCP private key credentials are loaded from a file on the Docker
container file system; the key file itself can be downloaded from the
GCP console:
- `gcp.pubsub.credentials.path`

Alternatively, the JSON file contents can be inlined:
- `gcp.pubsub.credentials.json`

The full list of connector configuration properties can be found at
- https://docs.confluent.io/kafka-connectors/gcp-pubsub/current/configuration_options.html#pubsub-source-connector-config
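
As a minimal sketch (the ids below are the values used elsewhere in
this commit; treat them as placeholders for your own environment):

```json
{
  "name": "gcp-pubsub-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",
    "gcp.pubsub.project.id": "be-mygwtproject",
    "gcp.pubsub.topic.id": "kafka-connect-topic",
    "gcp.pubsub.subscription.id": "kafka-connect-topic-sub",
    "gcp.pubsub.credentials.path": "/credentials/gcp-pubsub-credentials.json",
    "kafka.topic": "pub-sub-topic"
  }
}
```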

---

The scenario covered by this connector expects a JSON string to be
published to the GCP topic. To convert that JSON object to a Kafka
Connect Struct, the `FromJson` SMT is used:
- `com.github.jcustenborder.kafka.connect.json.FromJson$Value`

The `FromJson` SMT requires a JSON schema so that it can map the JSON
properties to Struct fields. Here this is done by referencing a JSON
file on the Docker container file system:
- `"transforms.fromJson.json.schema.location": "Url"`
- `"transforms.fromJson.json.schema.url": "file:///schemas/gcp-pubsub-FromJson-schema.json"`

This JSON schema can also be inlined:
- `"transforms.fromJson.json.schema.location": "Inline"`
- `json.schema.inline`

The Pub/Sub payload arrives in the `MessageData` field of the record
value; the `ExtractField$Value` SMT extracts it before `FromJson` runs.
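
The relevant transform chain, copied from the connector config added in
this commit:

```json
{
  "transforms": "extractValue,fromJson,valueToKey,extractKey",
  "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractValue.field": "MessageData"
}
```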

---

The target Kafka topic has a JSON schema applied to its value subject
to ensure only valid data is produced to the topic. We use this
converter to perform the JSON schema validation:
- `"value.converter": "io.confluent.connect.json.JsonSchemaConverter"`

If the JSON object in the value of the record does not conform to the
JSON schema, the record will be rejected and an error with the JSON
validation details will be logged.
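
The converter settings used for this, copied from the connector config
added in this commit:

```json
{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter.auto.register.schemas": "false",
  "value.converter.use.latest.version": "true",
  "value.converter.json.fail.invalid.schema": "true"
}
```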

---

We use the `ValueToKey` and `ExtractField$Key` SMTs to extract the
`OrderNumber` and use it as the Kafka record key:
 - https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html
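
The key transforms, copied from the connector config added in this
commit:

```json
{
  "transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.valueToKey.fields": "OrderNumber",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "OrderNumber"
}
```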

See more at:
- https://www.confluent.io/hub/confluentinc/kafka-connect-gcp-pubsub
- https://github.com/jcustenborder/kafka-connect-json-schema
- https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/FromJson.html
bazzani committed Jun 24, 2024
1 parent 730a576 commit 06eb21c
Showing 6 changed files with 158 additions and 15 deletions.
5 changes: 5 additions & 0 deletions Dockerfile
@@ -17,12 +17,17 @@ USER appuser
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.7.6 && \
    confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.5

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-gcp-pubsub:1.2.5 && \
    confluent-hub install --no-prompt jcustenborder/kafka-connect-json-schema:0.2.5

USER root
RUN yum install jq -y
USER appuser

COPY connect-scripts /connect-scripts
COPY ./connect-smt-lib/build/libs/connect-smt-lib-*.jar /usr/share/java/kafka
COPY ./credentials /credentials
COPY schemas/gcp-pubsub-FromJson-schema.json /schemas/gcp-pubsub-FromJson-schema.json
COPY connect-connector-configs /connect-connector-configs

ENTRYPOINT ["sh","/connect-scripts/connect-entrypoint.sh"]
63 changes: 48 additions & 15 deletions README.md
@@ -10,22 +10,26 @@
[![Lines of Code](https://sonarcloud.io/api/project_badges/measure?project=bazzani_kafka-connect-stack&metric=ncloc)](https://sonarcloud.io/summary/new_code?id=bazzani_kafka-connect-stack)

<!-- TOC -->

* [Kafka Connect Stack with Integration Tests](#kafka-connect-stack-with-integration-tests)
* [TL;DR](#tldr)
* [Project purpose](#project-purpose)
* [Continuous Integration](#continuous-integration)
* [Circle CI](#circle-ci)
* [SonarCloud](#sonarcloud)
* [Project structure](#project-structure)
* [How do I extend this project?](#how-do-i-extend-this-project)
* [Technologies Used](#technologies-used)
* [Docker Compose stack](#docker-compose-stack)
* [Ports](#ports)
* [Integration Tests](#integration-tests)
* [Container logs](#container-logs)
* [Running Integration tests once](#running-integration-tests-once)
* [JaCoCo coverage](#jacoco-coverage)
* [TODOs](#todos)
* [TL;DR](#tldr)
* [Project purpose](#project-purpose)
* [Continuous Integration](#continuous-integration)
* [Circle CI](#circle-ci)
* [SonarCloud](#sonarcloud)
* [Project structure](#project-structure)
* [How do I extend this project?](#how-do-i-extend-this-project)
* [Technologies Used](#technologies-used)
* [Docker Compose stack](#docker-compose-stack)
* [Ports](#ports)
* [Integration Tests](#integration-tests)
* [Container logs](#container-logs)
* [Running Integration tests once](#running-integration-tests-once)
* [JaCoCo coverage](#jacoco-coverage)
* [Manual Tests](#manual-tests)
* [GCP Pub Sub Source Connector](#gcp-pub-sub-source-connector)
* [TODOs](#todos)

<!-- TOC -->

## TL;DR
Expand Down Expand Up @@ -308,6 +312,35 @@ directory.

---

## Manual Tests

### GCP Pub Sub Source Connector

1. Create the Kafka Topic specified in
   the [connector configuration](connect-connector-configs/gcp-pubsub-source-connector.json)
2. Apply the [JSON schema](schemas/gcp-pubsub-kafka-topic-schema.json) to the Topic value subject
3. Log in to the [GCP console](https://console.cloud.google.com)
4. Add the private key file contents for a Service Account that has the
   `Pub/Sub Subscriber` role (downloadable from
   https://console.cloud.google.com/iam-admin/serviceaccounts)
   to [credentials/gcp-pubsub-credentials.json](credentials/gcp-pubsub-credentials.json)
5. Create the GCP topic and subscription specified in
   the [connector configuration](connect-connector-configs/gcp-pubsub-source-connector.json)
   (a command sketch covering steps 2 and 5 follows this list)
6. Run this command from a local terminal to produce a message on the GCP topic:
   - `gcloud pubsub topics publish kafka-connect-topic --message="{\"OrderNumber\":\"8caebe13-3f79-4861-88df-a7953424381b\",\"SiteId\":\"SITE123456\",\"Locale\":\"en-US\",\"CreatedTime\":\"2024-06-19T11:49:46.936983\",\"AddressId\":123,\"OrderValue\":999.99}"`
   - the expanded payload looks like this:
     ```json
     {
       "OrderNumber": "8caebe13-3f79-4861-88df-a7953424381b",
       "SiteId": "SITE123456",
       "Locale": "en-US",
       "CreatedTime": "2024-06-19T11:49:46.936983",
       "AddressId": 123,
       "OrderValue": 999.99
     }
     ```
7. Check the Confluent Control Center UI for a record being produced to the Kafka Topic
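
A command sketch for steps 2 and 5, plus registering the connector in
case the container entrypoint does not do it automatically. Assumptions:
`gcloud` is authenticated against the right project, `jq` is available
locally, Schema Registry is reachable on `localhost:8081`, the Connect
REST API is on `localhost:8083` (its default port), and the value
subject name follows the default `TopicNameStrategy` (`<topic>-value`):

```bash
# Step 5: create the GCP topic and the subscription the connector listens on
gcloud pubsub topics create kafka-connect-topic
gcloud pubsub subscriptions create kafka-connect-topic-sub --topic=kafka-connect-topic

# Step 2: register the JSON schema on the value subject of the target Kafka topic
jq -n --arg schema "$(cat schemas/gcp-pubsub-kafka-topic-schema.json)" \
  '{schemaType: "JSON", schema: $schema}' |
  curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data @- http://localhost:8081/subjects/pub-sub-topic-value/versions

# Register the connector with the Connect REST API
curl -X POST -H "Content-Type: application/json" \
  --data @connect-connector-configs/gcp-pubsub-source-connector.json \
  http://localhost:8083/connectors
```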

---

## TODOs

Some outstanding tasks to make the project more complete can be found [here](todo/README.md)
35 changes: 35 additions & 0 deletions connect-connector-configs/gcp-pubsub-source-connector.json
@@ -0,0 +1,35 @@
{
  "name": "gcp-pubsub-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",
    "tasks.max": "1",
    "confluent.topic.bootstrap.servers": "broker:29092",
    "gcp.pubsub.project.id": "be-mygwtproject",
    "gcp.pubsub.topic.id": "kafka-connect-topic",
    "gcp.pubsub.subscription.id": "kafka-connect-topic-sub",
    "gcp.pubsub.credentials.path": "/credentials/gcp-pubsub-credentials.json",
    "gcp.pubsub.data.format": "utf_8",
    "kafka.topic": "pub-sub-topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.auto.register.schemas": "false",
    "value.converter.use.latest.version": "true",
    "value.converter.latest.compatibility.strict": "false",
    "value.converter.json.fail.invalid.schema": "true",
    "transforms": "extractValue,fromJson,valueToKey,extractKey",
    "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractValue.field": "MessageData",
    "transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.json.FromJson$Value",
    "transforms.fromJson.json.schema.location": "Url",
    "transforms.fromJson.json.schema.url": "file:///schemas/gcp-pubsub-FromJson-schema.json",
    "transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.valueToKey.fields": "OrderNumber",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "OrderNumber",
    "errors.tolerance": "all",
    "errors.log.include.messages": "true",
    "errors.log.enable": "true"
  }
}
13 changes: 13 additions & 0 deletions credentials/gcp-pubsub-credentials.json
@@ -0,0 +1,13 @@
{
  "type": "service_account",
  "project_id": "",
  "private_key_id": "",
  "private_key": "",
  "client_email": "",
  "client_id": "",
  "auth_uri": "",
  "token_uri": "",
  "auth_provider_x509_cert_url": "",
  "client_x509_cert_url": "",
  "universe_domain": ""
}
25 changes: 25 additions & 0 deletions schemas/gcp-pubsub-FromJson-schema.json
@@ -0,0 +1,25 @@
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "PubSubData",
  "type": "object",
  "properties": {
    "OrderNumber": {
      "type": "string"
    },
    "SiteId": {
      "type": "string"
    },
    "Locale": {
      "type": "string"
    },
    "CreatedTime": {
      "type": "string"
    },
    "AddressId": {
      "type": "integer"
    },
    "OrderValue": {
      "type": "number"
    }
  }
}
32 changes: 32 additions & 0 deletions schemas/gcp-pubsub-kafka-topic-schema.json
@@ -0,0 +1,32 @@
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "description": "JSON Data sourced from GCP Pub Sub",
  "title": "pub-sub-data",
  "type": "object",
  "properties": {
    "AddressId": {
      "type": "integer"
    },
    "CreatedTime": {
      "maxLength": 26,
      "type": "string"
    },
    "Locale": {
      "type": "string",
      "minLength": 5,
      "maxLength": 5
    },
    "OrderNumber": {
      "maxLength": 36,
      "type": "string"
    },
    "OrderValue": {
      "type": "number",
      "minimum": 10
    },
    "SiteId": {
      "maxLength": 10,
      "type": "string"
    }
  }
}
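
For example, a record value like the following would fail validation
against this schema (and be rejected by the converter), because
`OrderValue` is below the `minimum` of 10 and `Locale` is shorter than
the `minLength` of 5:

```json
{
  "OrderNumber": "8caebe13-3f79-4861-88df-a7953424381b",
  "SiteId": "SITE123456",
  "Locale": "en",
  "CreatedTime": "2024-06-19T11:49:46.936983",
  "AddressId": 123,
  "OrderValue": 9.99
}
```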
