This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Add messageKey option to Kafka transport (#315)
Signed-off-by: Мартынов Максим Сергеевич <[email protected]>
dolfinus authored Apr 18, 2024
1 parent 7e204a0 commit b5f1bdd
Showing 2 changed files with 40 additions and 6 deletions.
32 changes: 26 additions & 6 deletions docs/client/java/partials/java_transport.md
@@ -232,13 +232,25 @@ This transport requires the artifact `org.apache.kafka:kafka-clients:3.1.0` (or

- `type` - string, must be `"kafka"`. Required.
- `topicName` - string specifying the topic to which events will be sent. Required.
- `localServerId` - string, id of local server. Required.
- `properties` - a dictionary containing a Kafka producer config as in [Kafka producer config](http://kafka.apache.org/0100/documentation.html#producerconfigs). Required.
- `localServerId` - **deprecated**, renamed to `messageKey` since v1.13.0.
- `messageKey` - string, key for all Kafka messages produced by transport. Optional, default value described below. Added in v1.13.0.

Default values for `messageKey` are:
- `run:{parentJob.namespace}/{parentJob.name}` - for RunEvent with parent facet
- `run:{job.namespace}/{job.name}` - for RunEvent
- `job:{job.namespace}/{job.name}` - for JobEvent
- `dataset:{dataset.namespace}/{dataset.name}` - for DatasetEvent

#### Behavior

Events are serialized to JSON and then dispatched to the Kafka topic.

#### Notes

It is recommended to provide `messageKey` if a job hierarchy is used. It can be any string, but it should be the same for all jobs in the
hierarchy, e.g. `Airflow task -> Spark application -> Spark task runs`.
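The reason a shared key matters can be sketched as follows: Kafka chooses a partition by hashing the message key, so events that share a `messageKey` land on the same partition and keep their relative order. The sketch below is illustrative only; `partition_for` is a toy stand-in for Kafka's real (murmur2-based) partitioner, not the library's actual code.

```python
# Toy illustration: same key -> same partition -> ordering preserved.
# The real Kafka partitioner uses murmur2, but the property is the same.

def partition_for(key: str, num_partitions: int = 3) -> int:
    # Deterministic toy hash of the key modulo the partition count.
    return sum(key.encode()) % num_partitions

# All events of one job hierarchy share a key, hence a partition:
key = "run:my_namespace/my_dag.my_task"
events = ["START", "RUNNING", "COMPLETE"]
partitions = {partition_for(key) for _ in events}
assert len(partitions) == 1  # every event of the hierarchy is co-located
```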

#### Examples

<Tabs groupId="integrations">
@@ -254,7 +266,7 @@ transport:
retries: 3
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
localServerId: some-value
messageKey: some-value
```
</TabItem>
@@ -263,13 +275,12 @@ transport:
```ini
spark.openlineage.transport.type=kafka
spark.openlineage.transport.topicName=openlineage.events
spark.openlineage.transport.localServerId=xxxxxxxx
spark.openlineage.transport.properties.bootstrap.servers=localhost:9092,another.host:9092
spark.openlineage.transport.properties.acks=all
spark.openlineage.transport.properties.retries=3
spark.openlineage.transport.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.properties.localServerId=some-value
spark.openlineage.transport.messageKey=some-value
```

</TabItem>
@@ -278,13 +289,12 @@ spark.openlineage.transport.properties.localServerId=some-value
```ini
openlineage.transport.type=kafka
openlineage.transport.topicName=openlineage.events
openlineage.transport.localServerId=xxxxxxxx
openlineage.transport.properties.bootstrap.servers=localhost:9092,another.host:9092
openlineage.transport.properties.acks=all
openlineage.transport.properties.retries=3
openlineage.transport.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.properties.localServerId=some-value
openlineage.transport.messageKey=some-value
```

</TabItem>
@@ -318,6 +328,16 @@ OpenLineageClient client = OpenLineageClient.builder()
</TabItem>
</Tabs>

*Notes*:
It is recommended to provide `messageKey` if a job hierarchy is used. It can be any string, but it should be the same for all jobs in the
hierarchy, e.g. `Airflow task -> Spark application`.

Default values are:
- `run:{parentJob.namespace}/{parentJob.name}/{parentRun.id}` - for RunEvent with parent facet
- `run:{job.namespace}/{job.name}/{run.id}` - for RunEvent
- `job:{job.namespace}/{job.name}` - for JobEvent
- `dataset:{dataset.namespace}/{dataset.name}` - for DatasetEvent

### [Kinesis](https://github.com/OpenLineage/OpenLineage/blob/main/client/java/src/main/java/io/openlineage/client/transports/KinesisTransport.java)

If a transport type is set to `kinesis`, then the below parameters would be read and used when building KinesisProducer.
14 changes: 14 additions & 0 deletions docs/client/python.md
@@ -181,12 +181,24 @@ It can be installed also by specifying kafka client extension: `pip install open
- `topic` - string specifying the topic to which events will be sent. Required.
- `config` - a dictionary containing a Kafka producer config as in [Kafka producer config](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration). Required.
- `flush` - boolean specifying whether Kafka should flush after each event. Optional, default: `true`.
- `messageKey` - string, key for all Kafka messages produced by transport. Optional, default value described below. Added in v1.13.0.

Default values for `messageKey` are:
- `run:{parentJob.namespace}/{parentJob.name}` - for RunEvent with parent facet
- `run:{job.namespace}/{job.name}` - for RunEvent
- `job:{job.namespace}/{job.name}` - for JobEvent
- `dataset:{dataset.namespace}/{dataset.name}` - for DatasetEvent
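As a rough illustration (not the client's actual implementation), the default-key rules above could be expressed like this; `default_message_key` and the dict-shaped events are hypothetical stand-ins for the real event classes:

```python
# Hypothetical sketch of the default messageKey rules listed above.
# Events are modeled as plain dicts; the real client uses typed event classes.

def default_message_key(event: dict) -> str:
    if "run" in event:
        # RunEvent: prefer the parent facet's job coordinates when present.
        parent = event["run"].get("facets", {}).get("parent")
        if parent:
            job = parent["job"]
            return f"run:{job['namespace']}/{job['name']}"
        job = event["job"]
        return f"run:{job['namespace']}/{job['name']}"
    if "job" in event:
        job = event["job"]  # JobEvent
        return f"job:{job['namespace']}/{job['name']}"
    ds = event["dataset"]  # DatasetEvent
    return f"dataset:{ds['namespace']}/{ds['name']}"

run_event = {"run": {"facets": {}}, "job": {"namespace": "food", "name": "etl"}}
print(default_message_key(run_event))  # run:food/etl
```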

#### Behavior

- Events are serialized to JSON and then dispatched to the Kafka topic.
- If `flush` is `true`, messages will be flushed to the topic after each event is sent.
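The `flush` semantics can be sketched as follows, assuming a confluent-kafka style API where `produce()` enqueues asynchronously and `flush()` blocks until delivery; `FakeProducer` and `emit` here are hypothetical stand-ins, not the transport's real code:

```python
# Minimal sketch of flush-per-event behavior. A real KafkaTransport wraps a
# confluent-kafka Producer; FakeProducer just models its queue/flush contract.

class FakeProducer:
    def __init__(self):
        self.queue, self.delivered = [], []

    def produce(self, topic, key=None, value=None):
        self.queue.append((topic, key, value))  # async enqueue, not yet sent

    def flush(self):
        self.delivered.extend(self.queue)  # block until everything is delivered
        self.queue.clear()

def emit(producer, event_json, topic="openlineage.events",
         message_key="some-value", flush=True):
    producer.produce(topic, key=message_key, value=event_json)
    if flush:
        # flush=true: every event is delivered before emit() returns.
        producer.flush()

p = FakeProducer()
emit(p, '{"eventType": "START"}')
assert p.delivered and not p.queue
```

With `flush=False`, events stay queued until the producer flushes on its own schedule, which trades delivery latency for throughput.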

#### Notes

It is recommended to provide `messageKey` if a job hierarchy is used. It can be any string, but it should be the same for all jobs in the
hierarchy, e.g. `Airflow task -> Spark application -> Spark task runs`.

#### Using with Airflow integration

There's a caveat for using `KafkaTransport` with Airflow integration. In this integration, a Kafka producer needs to be created
@@ -208,6 +220,7 @@ transport:
acks: all
retries: 3
flush: true
messageKey: some-value
```
</TabItem>
@@ -225,6 +238,7 @@ kafka_config = KafkaConfig(
"retries": "3",
},
flush=True,
messageKey="some",
)

client = OpenLineageClient(transport=KafkaTransport(kafka_config))
