FlinkX使用了flink内置Accumulator和Metric来记录任务的一些统计指标:
指标名称 | 含义 |
---|---|
numRead | 累计读取数据条数 |
byteRead | 累计读取数据字节数 |
readDuration | 读取数据的总时间 |
numWrite | 累计写入数据条数 |
byteWrite | 累计写入数据字节数 |
writeDuration | 写入数据的总时间 |
nErrors | 累计错误记录数 |
nullErrors | 累计空指针错误记录数 |
duplicateErrors | 累计主键冲突错误记录数 |
conversionErrors | 累计类型转换错误记录数 |
otherErrors | 累计其它错误记录数 |
local模式运行时,任务结束后会在控制台打印这些指标:
---------------------------------
numWrite | 100
last_write_num_0 | 0
conversionErrors | 0
writeDuration | 12251
numRead | 100
duplicateErrors | 0
snapshotWrite | 0
readDuration | 12247
otherErrors | 0
byteRead | 2329
last_write_location_0 | 0
byteWrite | 2329
nullErrors | 0
nErrors | 0
---------------------------------
任务运行期间,可以通过Flink REST接口获取Accumulator数据,名称和上面给出的一致。
返回数据示例:
{
"job-accumulators": [],
"user-task-accumulators": [
{
"name": "numWrite",
"type": "LongCounter",
"value": "0"
},
{
"name": "last_write_num_0",
"type": "LongCounter",
"value": "0"
},
{
"name": "conversionErrors",
"type": "LongCounter",
"value": "0"
},
{
"name": "writeDuration",
"type": "LongCounter",
"value": "0"
},
{
"name": "numRead",
"type": "LongCounter",
"value": "0"
},
{
"name": "duplicateErrors",
"type": "LongCounter",
"value": "0"
},
{
"name": "snapshotWrite",
"type": "LongCounter",
"value": "0"
},
{
"name": "readDuration",
"type": "LongCounter",
"value": "0"
},
{
"name": "otherErrors",
"type": "LongCounter",
"value": "0"
},
{
"name": "byteRead",
"type": "LongCounter",
"value": "0"
},
{
"name": "last_write_location_0",
"type": "LongCounter",
"value": "0"
},
{
"name": "byteWrite",
"type": "LongCounter",
"value": "0"
},
{
"name": "nullErrors",
"type": "LongCounter",
"value": "0"
},
{
"name": "nErrors",
"type": "LongCounter",
"value": "0"
}
],
"serialized-user-task-accumulators": {}
}
比如将指标输出到prometheus,在flink的配置文件里增加配置即可:
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.interval: 500 MILLISECONDS
metrics.reporter.promgateway.host: 127.0.0.1
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: testjob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
通过prometheus获取数据时的名称为:
FlinkX中指标名称 | prometheus中指标名称 |
---|---|
numRead | flink_taskmanager_job_task_operator_flinkx_byteRead |
byteRead | flink_taskmanager_job_task_operator_flinkx_byteRead |
readDuration | flink_taskmanager_job_task_operator_flinkx_readDuration |
numWrite | flink_taskmanager_job_task_operator_flinkx_numWrite |
byteWrite | flink_taskmanager_job_task_operator_flinkx_byteWrite |
writeDuration | flink_taskmanager_job_task_operator_flinkx_writeDuration |
nErrors | flink_taskmanager_job_task_operator_flinkx_nErrors |
nullErrors | flink_taskmanager_job_task_operator_flinkx_nullErrors |
duplicateErrors | flink_taskmanager_job_task_operator_flinkx_duplicateErrors |
conversionErrors | flink_taskmanager_job_task_operator_flinkx_conversionErrors |
otherErrors | flink_taskmanager_job_task_operator_flinkx_otherErrors |