Allow more granular retry strategy control. (#13)
buinauskas authored Oct 4, 2023
1 parent 15c2eb0 commit 9be1383
Showing 5 changed files with 177 additions and 146 deletions.
236 changes: 126 additions & 110 deletions README.md
@@ -10,176 +10,192 @@ This connector has not yet been published to Confluent Hub. To install it, downl
install it using the `confluent-hub` command-line tool.

```sh
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.6/vinted-kafka-connect-vespa-1.0.6-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.6-SNAPSHOT.zip -q
```

```sh
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.6-SNAPSHOT.zip
```

### Operational modes

The connector can work in two operational modes, `UPSERT` and `RAW`, which behave very differently and serve two
distinct use cases. The mode is set with the `vespa.operational.mode` configuration parameter.

#### UPSERT

Upsert mode replaces existing documents, and tombstone messages are converted to delete operations. Document IDs are
constructed using the format `namespace:documenttype:id`, where `id` is the Kafka record key. By default, the topic name
is used for both the namespace and the document type, but these can be overridden using the `vespa.namespace` and
`vespa.document.type` configuration properties. If you need something more complex, consider using single message
transforms to change the topic name.
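
The mapping above can be sketched in shell. This only restates the documented rules and is not the connector's
implementation: `namespace` and `document_type` are hypothetical stand-ins for `vespa.namespace` and
`vespa.document.type`, and calling the helper without a value models a tombstone (null value).

```sh
namespace=''      # stand-in for vespa.namespace; empty means "use the topic name"
document_type=''  # stand-in for vespa.document.type; empty means "use the topic name"

# upsert_operation <topic> <key> [value]
# Prints the operation and document ID; a missing value models a tombstone.
upsert_operation() {
  local topic="$1" key="$2"
  local id="${namespace:-$topic}:${document_type:-$topic}:$key"
  if [ "$#" -lt 3 ]; then
    echo "remove $id"   # tombstone (null value) becomes a delete
  else
    echo "put $id"      # any other value replaces the document
  fi
}
```

For example, `upsert_operation items 42 '{"title":"a"}'` prints `put items:items:42`, and `upsert_operation items 42`
prints `remove items:items:42`.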

#### RAW

Raw mode executes Kafka messages as Vespa Document API operations written in the document JSON format. If operations
must be executed in order (for example, several operations on the same document), make sure they share the same Kafka
key so that they are written to the same Kafka partition. Beyond this effect on partitioning, the connector ignores
Kafka keys in this mode.
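
A minimal sketch of preparing RAW-mode messages, assuming a hypothetical document ID `id:mynamespace:music::123` and a
`music` document type with a `title` field. The helper prints `key|value` lines in which the document ID doubles as the
Kafka key, so with Kafka's default partitioner all three operations land in the same partition and keep their order.
The values follow the put, update and remove shapes of the Vespa document JSON format.

```sh
doc_id='id:mynamespace:music::123'  # hypothetical Vespa document ID

# raw_operations: prints "key|value" lines, one per Vespa operation.
# The key is the same for every line, so the operations share a partition.
raw_operations() {
  printf '%s|%s\n' "$doc_id" '{"put": "'"$doc_id"'", "fields": {"title": "Hello"}}'
  printf '%s|%s\n' "$doc_id" '{"update": "'"$doc_id"'", "fields": {"title": {"assign": "Hello again"}}}'
  printf '%s|%s\n' "$doc_id" '{"remove": "'"$doc_id"'"}'
}
```

To produce them, the output can be piped into the console producer that ships with Kafka, for example
`raw_operations | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic music --property parse.key=true --property key.separator='|'`
(the broker address and topic name are placeholders).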

### Configuration

#### Connector

`vespa.endpoint`
The comma-separated list of one or more Vespa URLs, such as `https://node1:8080,http://node2:8080`
or `https://node3:8080`. HTTPS is used for all connections if any of the URLs starts with `https:`. A URL without a
protocol is treated as `http`.

* Type: list
* Default: http://localhost:8080
* Valid Values:
* Importance: high
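
The two URL rules above can be sketched as shell helpers. They restate the documented behaviour for illustration only:
the function names are hypothetical, the rules are shown separately rather than combined, and whitespace around commas
is not trimmed.

```sh
# normalize_endpoints <comma-separated list>
# Prints one URL per line; entries without a protocol are prefixed with http://.
normalize_endpoints() {
  local IFS=','
  local url
  for url in $1; do
    case "$url" in
      http://*|https://*) printf '%s\n' "$url" ;;
      *) printf 'http://%s\n' "$url" ;;
    esac
  done
}

# connection_scheme <comma-separated list>
# Prints "https" if any entry starts with "https:", otherwise "http".
connection_scheme() {
  case ",$1" in
    *,https:*) echo https ;;
    *) echo http ;;
  esac
}
```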

`vespa.connections.per.endpoint`
Sets the number of connections this client will use per endpoint. A reasonable value lets all feed clients (if more
than one) collectively have a number of connections that is a small multiple of the number of containers in the
cluster being fed, so load can be balanced across these containers. In general, this value should be kept as low as
possible, but poor connectivity between feeder and cluster may warrant a higher number of connections.

* Type: int
* Default: 8
* Valid Values: [1,...]
* Importance: low

`vespa.max.streams.per.connection`
Sets the maximum number of streams per HTTP/2 connection. This determines the maximum number of concurrent, in-flight
requests for this client, which is `maxConnections * maxStreamsPerConnection`. Prefer more streams to more connections,
when possible. The feed client automatically throttles load to achieve the best throughput, and the actual number of
streams per connection is usually lower than the maximum.

* Type: int
* Default: 128
* Valid Values: [1,...]
* Importance: low
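
Worked numbers for the two settings above, using their defaults. The task and container counts are hypothetical, and
the sketch assumes a single Vespa endpoint with each connector task running its own feed client.

```sh
connections_per_endpoint=8      # vespa.connections.per.endpoint (default)
max_streams_per_connection=128  # vespa.max.streams.per.connection (default)
tasks=4                         # hypothetical number of connector tasks
containers=8                    # hypothetical number of Vespa container nodes

# Upper bound on concurrent in-flight requests for one feed client.
max_in_flight=$(( connections_per_endpoint * max_streams_per_connection ))

# Connections opened by all feed clients together, compared with the
# container count (the connections guidance asks for a small multiple).
total_connections=$(( tasks * connections_per_endpoint ))
ratio=$(( total_connections / containers ))

echo "in-flight per client: $max_in_flight"                         # in-flight per client: 1024
echo "total connections: $total_connections (${ratio}x containers)" # total connections: 32 (4x containers)
```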

`vespa.dryrun`
Turns on dryrun mode, where each operation succeeds after a given delay, rather than being sent across the network.

* Type: boolean
* Default: false
* Importance: low

`vespa.speedtest`
Turns on speed test mode, where each operation succeeds immediately, rather than being sent across the network.

* Type: boolean
* Default: false
* Importance: low

`vespa.max.failure.ms`

The duration, in milliseconds, of consecutive failures tolerated before shutting down.

* Type: int
* Default: 60000 (1 minute)
* Valid Values: [10000,...]
* Importance: low
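
A sketch of how a consecutive-failure window like `vespa.max.failure.ms` can be tracked, assuming (as the word
"consecutive" suggests) that any successful operation resets the window. `record_result` is a hypothetical helper, not
part of the connector.

```sh
max_failure_ms=60000   # vespa.max.failure.ms (default)
first_failure_ms=''    # start of the current failure streak; empty when healthy

# record_result <ok|fail> <now in ms>
# Returns non-zero once failures have continued for max_failure_ms or longer.
record_result() {
  if [ "$1" = ok ]; then
    first_failure_ms=''
    return 0
  fi
  : "${first_failure_ms:=$2}"
  [ $(( $2 - first_failure_ms )) -lt "$max_failure_ms" ]
}
```

Under this reading, 59 seconds of failures followed by a single success does not trigger a shutdown; only an unbroken
streak of at least 60 seconds does.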

`vespa.namespace`
User-specified part of each document ID. The namespace cannot be used in queries, other than as part of the full
document ID. However, it can be used for document selection, where the namespace can be accessed and compared to a given
string, for instance. An example use case is visiting a subset of documents. Defaults to the topic name if not
specified.

* Type: string
* Default: null
* Valid Values: non-empty string without ISO control characters
* Importance: high

`vespa.document.type`
Document type as defined in `services.xml` and the schema. Defaults to the topic name if not specified.

* Type: string
* Default: null
* Valid Values: non-empty string without ISO control characters
* Importance: high

`vespa.operational.mode`
The operational mode of the connector. Valid options are `UPSERT` and `RAW`. Upsert mode updates existing documents and
inserts new ones; tombstone messages are converted to delete operations. Raw mode executes all operations using the
document JSON format described in https://docs.vespa.ai/en/reference/document-json-format.html.

* Type: string
* Default: UPSERT
* Valid Values: Matches: `UPSERT`, `RAW`
* Importance: high
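
A sketch of a connector definition for the Kafka Connect REST API using the settings covered so far. The connector
name, topic, namespace and document type are placeholders, and the `connector.class` value is an assumption based on
the project's group ID; check it against the packaged connector before use.

```sh
# connector_config: prints a Kafka Connect connector definition as JSON.
# All values are placeholders; connector.class is an assumed class name.
connector_config() {
  cat <<'EOF'
{
  "name": "vespa-sink",
  "config": {
    "connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
    "tasks.max": "1",
    "topics": "music",
    "vespa.endpoint": "http://localhost:8080",
    "vespa.operational.mode": "UPSERT",
    "vespa.namespace": "mynamespace",
    "vespa.document.type": "music"
  }
}
EOF
}
```

It can be submitted with
`connector_config | curl -s -X POST -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors`,
where `localhost:8083` assumes a Connect worker listening on its default REST port.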

`vespa.retry.strategy.retries`

Number of retries per operation for assumed transient, non-backpressure problems.

* Type: int
* Default: 10
* Valid Values: [0,...,2147483647]
* Importance: low

`vespa.retry.strategy.operation.types`

Operation types to retry. Failed operations of any other type are not retried.

* Type: list
* Default: PUT,UPDATE,REMOVE
* Valid Values: Matches: `PUT`, `UPDATE`, `REMOVE`
* Importance: low
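
The two retry settings can be read as a simple predicate. This sketch is hypothetical (the variable and function names
are illustrative) and assumes that `vespa.retry.strategy.retries` counts retries made after the first attempt.

```sh
retries=10                       # vespa.retry.strategy.retries (default)
retry_types='PUT,UPDATE,REMOVE'  # vespa.retry.strategy.operation.types (default)

# should_retry <operation type> <retries already made>
# Succeeds if the operation type is listed and the retry budget is not used up.
should_retry() {
  case ",$retry_types," in
    *",$1,"*) ;;
    *) return 1 ;;
  esac
  [ "$2" -lt "$retries" ]
}
```

With `retry_types='PUT,UPDATE'`, for example, failed removes would not be retried under this reading.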

#### Operation

`vespa.operation.timeout.ms`

Feed operation timeout, in milliseconds.

* Type: int
* Default: 60000 (1 minute)
* Valid Values: [0,...,2147483647]
* Importance: low

`vespa.operation.route`

Target Vespa route for feed operations.

* Type: string
* Default: null
* Valid Values: non-empty string without ISO control characters
* Importance: low

`vespa.operation.tracelevel`

The trace level of network traffic.

* Type: int
* Default: 0
* Valid Values: [0,...,9]
* Importance: low

#### Data Conversion

`vespa.drop.invalid.message`
Whether to drop a Kafka message when it cannot be converted to an output message.

* Type: boolean
* Default: false
* Importance: low

`vespa.behavior.on.malformed.documents`
How to handle records that Vespa rejects due to document malformation. Valid options are `IGNORE`, `WARN`, and `FAIL`.

* Type: string
* Default: FAIL
* Valid Values: Matches: `IGNORE`, `WARN`, `FAIL`
* Importance: low
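
A sketch of the three `vespa.behavior.on.malformed.documents` options, assuming `FAIL` surfaces as an error that stops
processing while `IGNORE` and `WARN` let it continue. `handle_malformed` is a hypothetical helper.

```sh
behavior='FAIL'  # vespa.behavior.on.malformed.documents (default)

# handle_malformed <document id>: returns 0 to continue, non-zero to fail
handle_malformed() {
  case "$behavior" in
    IGNORE) return 0 ;;
    WARN) echo "WARN: Vespa rejected malformed document $1" >&2; return 0 ;;
    FAIL) echo "ERROR: Vespa rejected malformed document $1" >&2; return 1 ;;
    *) echo "unknown behavior: $behavior" >&2; return 2 ;;
  esac
}
```

This setting differs from `vespa.drop.invalid.message`: that one covers Kafka messages that cannot be converted to an
output message at all, whereas this one covers documents that Vespa itself rejects.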

#### Examples

2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.vinted.kafka.connect.vespa</groupId>
<artifactId>kafka-connect-vespa</artifactId>
<version>1.0.5-SNAPSHOT</version>
<version>1.0.6-SNAPSHOT</version>
<name>kafka-connect-vespa</name>
<description>The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine.</description>
<url>https://github.com/vinted/kafka-connect-vespa</url>