Skip to content

Latest commit

 

History

History
169 lines (123 loc) · 10.5 KB

File metadata and controls

169 lines (123 loc) · 10.5 KB

Kafka 连接器

上级文档:连接器

Kafka 连接器支持如下功能点:

  • 批式场景下支持 At Least Once 的写入
  • 流式场景下支持 Exactly Once 的读取

依赖引入

Kafka连接器内部使用 1.0.1 版本的org.apache.kafka:kafka-clients进行数据写入,因此在使用kafka写连接器时时需要注意目标kafka集群能使用此版本kafka-clients连接。

<dependency>
   <groupId>com.bytedance.bitsail</groupId>
   <artifactId>bitsail-connector-kafka</artifactId>
   <version>${revision}</version>
</dependency>

Kafka读连接器

参数配置

参数名 参数是否必需 参数默认值 参数说明
class Kafka读连接器类名,只能为com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder
child_connector_type 指定连接的消息队列种类,只能为kafka
reader_parallelism_num 读并发数

KafkaConsumer属性设置

Kafka连接器底层使用FlinkKafkaConsumer进行读取。初始化FlinkKafkaConsumer的属性或者kafka信息均通过选项job.reader.connector传入,具体形式如下:

{
  "job": {
    "reader": {
      "connector": {
        "prop_key": "prop_value"   // "prop_key"和"prop_value"代指属性key和属性value
      }
    }
  }
}

job.reader.connector支持 <string,string> 形式的KV配置,其中:

  • prop_key: KafkaConsumer属性key
  • prop_value: KafkaConsumer属性value

下面列举了一些常见的属性key:

1. kafka集群属性

属性key 属性是否必需 属性默认值 属性可选值 属性说明
connector.bootstrap.servers 读取的kafka集群地址
connector.topic 读取的topic
connector.group.id kafka消费组

2. kafka起始消费属性

属性key 属性是否必需 属性默认值 属性可选值 属性说明
connector.startup-mode group-offsets 1. ealiest-offset: 从partition的最早offset开始消费
2. latest-offset: 从partition的最新offset开始消费
3. group-offsets: 设置从当前consumer group的offset开始消费
4. specific-offsets: 指定各个partition的起始消费offset,配合connector.specific-offsets使用
5. specific-timestamp: 指定消费某个时间点后的数据,配合connector.specific-timestamp使用
决定kafka从何处开始消费
connector.specific-offsets 配合specific-offsets使用,格式为标准json字符串,例如:
[{"partition":1,"offset":100},{"partition":2,"offset":200}]
connector.specific-timestamp 配合specific-timestamp使用,指定KafkaConsumer的消费起始时间戳,单位ms

3. 其他FlinkKafkaConsumer参数

FlinkKafkaConsumer支持许多参数,详情可参考ConsumerConfig(2.1.0) API]

若用户需要设置这些参数,可通过此 connector.XXX 来进行配置。 例如,若要设置MAX_PARTITION_FETCH_BYTES_CONFIG为1024,则添加参数:

{
  "job": {
    "reader": {
      "connector": {
        "connector.max.partition.fetch.bytes": "1024"
      }
    }
  }
}

debug参数

Kafka读连接器用于流式场景,正常情况下会一直消费。若用户想通过只消费有限数量的数据进行debug,则可通过以下参数进行配置。注意这些参数需要添加job.reader前缀。

参数名 参数是否必需 参数默认值 参数说明
enable_count_mode false 是否在发送一段数据后结束当前任务,一般用于测试
count_mode_record_threshold 10000 若enable_count_mode=true,则在kafka消费count_mode_record_threshold条数据后结束当前任务
count_mode_run_time_threshold 600 若enable_count_mode=true,则在任务运行count_mode_record_threshold秒后结束当前任务

支持的消息解析模式

从KafkaConsumer可以拉取到消息ConsumerRecordBitSail 支持两种对ConsumerRecord的处理方式。 用户可通过参数job.reader.format_type决定使用哪种方式:

  • job.reader.format_type="json": 按照json格式解析
    • 在此模式下,BitSail 按照用户设置的参数 job.reader.columns ,对ConsumerRecord中value代表的json格式字符串进行解析。
    • 因此此模式下必需job.reader.columns参数
  • job.reader.format_type="streaming_file": 不解析
    • 在此模式下,会直接将ConsumerRecord中的信息传出,具体结构如下:
    [
   {"index":0, "name":"key", "type":"binary"},     // 消息key
   {"index":1, "name":"value", "type":"binary"},   // 消息value
   {"index":2, "name":"partition", "type":"string"}, // 消息来源partition
   {"index":3, "name":"offset", "type":"long"}     // 消息在partition中的offset
    ]

kafka写连接器

参数配置

注意这些参数需要添加job.writer前缀。

必需参数

参数名 参数默认值 参数说明
class kafka写连接器类名,只能为com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat
kafka_servers kafka的bootstrap server地址,多个bootstrap server地址由逗号 ',' 分隔
topic_name 要写入的topic
columns 数据字段名称及字段类型

可选参数

参数名 参数默认值 参数说明
writer_parallelism_num - 写并发数
partition_field - partition_field包含“job.writer.columns”中的一个或几个field,以逗号分隔(例如:"id,timestamp")。
若partition_field不为空,在发送数据到kafka topic时,会根据record中的这几个field的值决定写入哪个topic
log_failures_only false 在KafkaProducer执行异步send操作失败时:
1. 若log_failures_only=true,则只通过log.error发送失败信息
2. 若log_failures_only=false,则抛出异常
retries 10 KafkaProducer的失败重试次数
retry_backoff_ms 1000 KafkaProducer的失败重试间隔,单位ms
linger_ms 5000 KafkaProducer创建单个batch的最大等待时间,单位ms

其他参数

在初始化KafkaProducer时,用户可通过参数 job.common.optional 传入指定的初始化参数,示例如下:

{
   "job": {
      "common": {
         "optional": {
            "batch.size": 16384,
            "buffer.memory": 33554432
         }
      }
   }
}

相关文档

配置示例文档:Kafka 连接器示例