Add GCP Pub Sub Source Connector, FromJSON SMT #37
The Google Cloud Pub/Sub Source Connector can be used to ingest data from a Pub/Sub topic/subscription. The connector determines which GCP subscription to listen to for messages using the following configuration properties:
gcp.pubsub.project.id
gcp.pubsub.topic.id
gcp.pubsub.subscription.id
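As a sketch, a minimal connector configuration using these properties might look like the following. The connector name, class, target Kafka topic, and the project/topic/subscription values are placeholders for this example:

```json
{
  "name": "gcp-pubsub-source",
  "config": {
    "connector.class": "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",
    "kafka.topic": "orders",
    "gcp.pubsub.project.id": "my-gcp-project",
    "gcp.pubsub.topic.id": "orders-topic",
    "gcp.pubsub.subscription.id": "orders-subscription",
    "tasks.max": "1"
  }
}
```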
The GCP private key credentials are loaded from a file on the Docker container file system; the key file can be downloaded from the GCP console:
gcp.pubsub.credentials.path
Alternatively, the JSON file contents can be inlined:
gcp.pubsub.credentials.json
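For illustration, the file-based credentials option might be configured as follows; the path is a placeholder for wherever the key file is mounted in the container:

```json
"gcp.pubsub.credentials.path": "/credentials/gcp-credentials.json"
```

The inline alternative would instead supply the full JSON key file contents as the value of `gcp.pubsub.credentials.json`.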
The full list of connector configuration properties can be found at
The scenario covered by this connector expects a JSON string to be published to the GCP topic. To convert that JSON object to a Kafka Connect Struct, the FromJson SMT is used: com.github.jcustenborder.kafka.connect.json.FromJson$Value
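For context, registering the SMT in the connector configuration might look like this (the transform alias `fromJson` is just a label chosen for the example):

```json
"transforms": "fromJson",
"transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.json.FromJson$Value"
```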
The FromJson SMT requires a JSON schema so that it can map the JSON properties to the Struct fields. This is done by referencing a JSON file on the Docker container file system:

"transforms.fromJson.json.schema.location": "Url"
"transforms.fromJson.json.schema.url": "file:///schemas/FromJson-pub-sub-schema.json"
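As an illustration only, a schema file such as FromJson-pub-sub-schema.json could describe an order payload along these lines; the field names and types here are invented for the example:

```json
{
  "title": "Order",
  "type": "object",
  "properties": {
    "OrderNumber": { "type": "string" },
    "OrderTotal": { "type": "number" }
  }
}
```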
This JSON schema can also be inlined:

"transforms.fromJson.json.schema.location": "Inline"

with the schema contents supplied via the json.schema.inline property.
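A sketch of the inline variant, with the schema content abbreviated and hypothetical:

```json
"transforms.fromJson.json.schema.location": "Inline",
"transforms.fromJson.json.schema.inline": "{\"title\":\"Order\",\"type\":\"object\",\"properties\":{\"OrderNumber\":{\"type\":\"string\"}}}"
```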
The payload from GCP is stored in the MessageData field and extracted.

The target Kafka topic has a JSON schema applied to the value subject to ensure only valid data is produced to the topic. We use this converter to perform the JSON schema validation:

"value.converter": "io.confluent.connect.json.JsonSchemaConverter"

If the JSON object in the value of the record does not conform to the JSON schema, the record will be rejected and an error with the JSON validation details will be logged.
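A sketch of the value converter settings; the Schema Registry URL below is a placeholder for whatever registry the environment uses:

```json
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
```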
We use the ValueToKey and ExtractField$Key SMTs to extract the OrderNumber and use it as the Kafka record key:
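For illustration, the key-extraction SMTs might be configured as follows; the transform aliases are arbitrary labels chosen for this example:

```json
"transforms": "copyFieldToKey,extractKeyFromStruct",
"transforms.copyFieldToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyFieldToKey.fields": "OrderNumber",
"transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field": "OrderNumber"
```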
See more at: