Add GCP Pub Sub Source Connector, FromJSON SMT #37

Open · wants to merge 1 commit into `develop` from `add-gcp-pub-sub-source-connector`
Conversation

@bazzani (Owner) commented Jun 24, 2024

The Google Cloud Pub/Sub Source Connector can be used to ingest data from a topic/subscription. The connector knows which GCP subscription to listen for messages on using the following configuration properties:

  • `gcp.pubsub.project.id`
  • `gcp.pubsub.topic.id`
  • `gcp.pubsub.subscription.id`

The GCP private key credentials are loaded from a file on the Docker container file system; the key file can be downloaded from the GCP console:

  • `gcp.pubsub.credentials.path`

Alternatively, the JSON file contents can be inlined:

  • `gcp.pubsub.credentials.json`
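Putting the properties above together, a minimal connector configuration might look like the following sketch. The connector name, Kafka topic, GCP identifiers, and credentials path are placeholder values, and the `connector.class` is assumed to be the class published on Confluent Hub:

```json
{
  "name": "gcp-pubsub-source",
  "config": {
    "connector.class": "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "pub-sub-orders",
    "gcp.pubsub.project.id": "my-gcp-project",
    "gcp.pubsub.topic.id": "orders",
    "gcp.pubsub.subscription.id": "orders-subscription",
    "gcp.pubsub.credentials.path": "/credentials/gcp-pub-sub.json"
  }
}
```

To use inlined credentials instead, `gcp.pubsub.credentials.path` would be swapped for `gcp.pubsub.credentials.json` holding the service-account key contents.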

The full list of connector configuration properties can be found at:

  • https://docs.confluent.io/kafka-connectors/gcp-pubsub/current/configuration_options.html#pubsub-source-connector-config

The scenario covered by this connector expects a JSON string to be published to the GCP topic. To convert that JSON object to a Kafka Connect Struct, the `FromJson` SMT is used:

  • `com.github.jcustenborder.kafka.connect.json.FromJson$Value`

The `FromJson` SMT requires a JSON schema so that it can map the JSON properties to Struct fields. Here the schema is referenced as a JSON file on the Docker container file system:

  • `"transforms.fromJson.json.schema.location": "Url"`
  • `"transforms.fromJson.json.schema.url": "file:///schemas/FromJson-pub-sub-schema.json"`
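In the connector config, the transform might be declared like this (the `fromJson` alias matches the property names above; the type and schema properties are taken from the values in this description):

```json
{
  "transforms": "fromJson",
  "transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.json.FromJson$Value",
  "transforms.fromJson.json.schema.location": "Url",
  "transforms.fromJson.json.schema.url": "file:///schemas/FromJson-pub-sub-schema.json"
}
```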

The JSON schema can also be inlined:

  • `"transforms.fromJson.json.schema.location": "Inline"`
  • `"transforms.fromJson.json.schema.inline"`
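For reference, a schema file such as `FromJson-pub-sub-schema.json` contains an ordinary JSON Schema document. The sketch below is purely illustrative: `OrderNumber` is the field used for the record key later in this description, while the title, the second field, and the draft version are assumptions:

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order",
  "type": "object",
  "properties": {
    "OrderNumber": { "type": "string" },
    "OrderDate": { "type": "string" }
  },
  "required": ["OrderNumber"]
}
```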

The payload from GCP is stored in the `MessageData` field, which is then extracted.
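Extracting that field before `FromJson` runs can be done with the stock Kafka Connect `ExtractField$Value` SMT; the `extractData` alias and its position at the front of the transform chain are assumptions in this sketch:

```json
{
  "transforms": "extractData,fromJson",
  "transforms.extractData.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractData.field": "MessageData"
}
```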


The target Kafka topic has a JSON schema applied to its value subject to ensure only valid data is produced to the topic. We use this converter to perform the JSON Schema validation:

  • `"value.converter": "io.confluent.connect.json.JsonSchemaConverter"`

If the JSON object in the value of the record does not conform to the JSON schema, the record will be rejected and an error with the JSON validation details will be logged.
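As a sketch, the converter configuration needs a Schema Registry to resolve the value subject's schema; the registry URL here is an assumed Docker-network address:

```json
{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```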


We use the `ValueToKey` and `ExtractField$Key` SMTs to extract the `OrderNumber` and use it as the Kafka record key:

  • https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html
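Sketched as connector properties — the transform aliases are illustrative, while `fields`/`field` are the documented parameter names for these two stock SMTs:

```json
{
  "transforms": "orderNumberToKey,extractKey",
  "transforms.orderNumberToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.orderNumberToKey.fields": "OrderNumber",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "OrderNumber"
}
```

`ValueToKey` copies the `OrderNumber` field into a Struct key, and `ExtractField$Key` then unwraps that Struct so the raw value becomes the record key.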

See more at:

  • https://www.confluent.io/hub/confluentinc/kafka-connect-gcp-pubsub
  • https://github.com/jcustenborder/kafka-connect-json-schema
  • https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/FromJson.html

@bazzani force-pushed the add-gcp-pub-sub-source-connector branch 2 times, most recently from 06eb21c to a26a381 on June 24, 2024

@bazzani force-pushed the add-gcp-pub-sub-source-connector branch from a26a381 to f6f51e3 on July 6, 2024