1.使用git工具把项目clone到本地
git clone https://github.com/DTStack/flinkx.git
cd flinkx
在flinkx home目录下执行
mvn clean package -DskipTests
或者执行
sh build/build.sh
解决办法:在$FLINKX_HOME/jars目录下有这些驱动包,可以手动安装,也可以使用插件提供的脚本安装:
## windows平台
./install_jars.bat
## unix平台
./install_jars.sh
NOTE:项目中的flinkx-examples模块下提供了大量 数据同步案例 和 SQL案例
首先准备要运行的任务json,这里以stream插件为例(flinkx-examples
文件夹下有大量案例):
{
"job": {
"content": [
{
"reader": {
"parameter": {
"column": [
{
"name": "id",
"type": "id"
},
{
"name": "name",
"type": "string"
},
{
"name": "content",
"type": "string"
}
],
"sliceRecordCount": ["30"],
"permitsPerSecond": 1
},
"table": {
"tableName": "sourceTable"
},
"name": "streamreader"
},
"writer": {
"parameter": {
"column": [
{
"name": "id",
"type": "id"
},
{
"name": "name",
"type": "string"
},
{
"name": "content",
"type": "timestamp"
}
],
"print": true
},
"table": {
"tableName": "sinkTable"
},
"name": "streamwriter"
},
"transformer": {
"transformSql": "select id,name, NOW() from sourceTable where CHAR_LENGTH(name) < 50 and CHAR_LENGTH(content) < 50"
}
}
],
"setting": {
"errorLimit": {
"record": 100
},
"speed": {
"bytes": 0,
"channel": 1,
"readerChannel": 1,
"writerChannel": 1
}
}
}
}
NOTE:flinkX和flinkSql connector共用
或者准备要运行的flinksql任务,这里以stream插件为例(flinkx-examples
文件夹下有大量案例):
CREATE TABLE source
(
id INT,
name STRING,
money DECIMAL(32, 2),
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp(6),
datenigth timestamp(9),
dtdate date,
dttime time
) WITH (
'connector' = 'stream-x',
'number-of-rows' = '10', -- 输入条数,默认无限
'rows-per-second' = '1' -- 每秒输入条数,默认不限制
);
CREATE TABLE sink
(
id INT,
name STRING,
money DECIMAL(32, 2),
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp(6),
datenigth timestamp(9),
dtdate date,
dttime time
) WITH (
'connector' = 'stream-x',
'print' = 'true'
);
insert into sink
select *
from source;
命令模板:
bin/flinkx \
-mode local \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-flinkxDistDir flinkx-dist
可以在flink-conf.yaml配置文件里配置端口:
## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888
使用下面的命令运行任务:
bin/flinkx \
-mode local \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-flinkxDistDir flinkx-dist
任务运行后可以通过8888端口访问flink界面查看任务运行情况:
NOTE:将flinkx-dist目录拷贝到$FLINK_HOME/lib下,并修改$FLINK_HOME/conf/flink-conf.yml中的classloader为classloader.resolve-order: parent-first
命令模板:
bin/flinkx \
-mode standalone \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-flinkxDistDir flinkx-dist \
-flinkConfDir $FLINK_HOME/conf \
-confProp "{\"flink.checkpoint.interval\":60000}"
首先启动flink集群:
# flink集群默认端口是8081
$FLINK_HOME/bin/start-cluster.sh
通过8081端口检查集群是否启动成功
把任务提交到集群上运行:
./bin/flinkx \
-mode standalone \
-jobType sync \
-flinkxDistDir flinkx-dist \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-flinkConfDir $FLINK_HOME/conf
在集群上查看任务运行情况
NOTE:可以先在现在flinkx-clients模块YarnSessionClientUtil类中启动一个session,然后修改$FLINK_HOME/conf/flink-conf.yml中的classloader为classloader.resolve-order: parent-first
命令示例:
bin/flinkx \
-mode yarn-session \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-flinkxDistDir flinkx-dist \
-flinkConfDir $FLINK_HOME/conf \
-hadoopConfDir $HADOOP_HOME/etc/hadoop \
-confProp "{\"flink.checkpoint.interval\":60000}"
首先确保yarn集群是可用的,然后手动启动一个yarn session:
$FLINK_HOME/bin/yarn-session.sh -n 1 -s 1 -jm 1024 -tm 1024
把任务提交到这个yarn session上:
bin/flinkx \
-mode yarn-session \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-flinkConfDir $FLINK_HOME/conf \
-flinkxDistDir flinkx-dist \
-hadoopConfDir $HADOOP_HOME/etc/hadoop
然后在flink界面查看任务运行情况:
命令示例:
bin/flinkx \
-mode yarn-per-job \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-flinkxDistDir flinkx-dist \
-flinkConfDir $FLINK_HOME/conf \
-hadoopConfDir $HADOOP_HOME/etc/hadoop \
-flinkLibDir $FLINK_HOME/lib \
-confProp "{\"flink.checkpoint.interval\":60000,\"yarn.application.queue\":\"default\"}" \
首先确保yarn集群是可用的,启动一个Yarn Application运行任务:
bin/flinkx \
-mode yarn-per-job \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-flinkxDistDir flinkx-dist \
-hadoopConfDir $HADOOP_HOME/etc/hadoop \
-flinkLibDir $FLINK_HOME/lib \
然后在集群上查看任务运行情况
命令示例:
bin/flinkx \
-mode kubernetes-session \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-jobName kubernetes-job \
-jobType sync \
-flinkxDistDir flinkx-dist \
-flinkLibDir $FLINK_HOME/lib \
-flinkConfDir $FLINK_HOME/conf \
-confProp "{\"kubernetes.config.file\":\"${kubernetes_config_path}\",\"kubernetes.cluster-id\":\"${cluster_id}\",\"kubernetes.namespace\":\"${namespace}\"}"
需要提前手动在kubernetes上启动kubernetes session
$FLINK_HOME/bin/kubernetes-session.sh -Dkubernetes.cluster-id=flink-session-test -Dclassloader.resolve-order=parent-first -Dkubernetes.container.image=${image_name}
注意:需要提前构建flinkx镜像 flinkx镜像构建说明
命令示例:
bin/flinkx \
-mode kubernetes-application \
-jobType sync \
-job flinkx-local-test/src/main/demo/json/stream/stream.json \
-jobName kubernetes-job \
-jobType sync \
-flinkxDistDir flinkx-dist \
-remotePluginPath /opt/flinkx-dist \
-pluginLoadMode classpath \
-flinkLibDir $FLINK_HOME/lib \
-flinkConfDir $FLINK_HOME/conf \
-confProp "{\"kubernetes.config.file\":\"${kubernetes_config_path}\",\"kubernetes.container.image\":\"${image_name}\",\"kubernetes.namespace\":\"${namespace}\"}"
注意:需要提前构建flinkx镜像 flinkx镜像构建说明
名称 | 说明 | 可选值 | 是否必填 | 默认值 |
---|---|---|---|---|
mode | 执行模式,也就是flink集群的工作模式 | 1.local: 本地模式 2.standalone: 独立部署模式的flink集群 3.yarn-session: yarn-session模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster" 4.yarn-per-job: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster" 5.kubernetes-session: kubernetes session模式提交任务,需要提前在kubernetes上启动flink session 6.kubernetes-application: kubernetes run application模式提交任务 |
否 | local |
jobType | 任务类型 | 1.sync:数据同步任务 2.sql:flinksql任务 |
是 | 无 |
job | 同步、flinksql任务描述文件的存放路径;该描述文件中使用json、sql存放任务信息 | 无 | 是 | 无 |
jobName | 任务名称 | 无 | 否 | Flink Job |
flinkxDistDir | 插件根目录地址,也就是打包后产生的flinkx-dist目录。 | 无 | 否 | $FLINKX_HOME/flinkx-dist |
flinkConfDir | flink配置文件所在的目录(单机模式下不需要) | $FLINK_HOME/conf | 否 | $FLINK_HOME/conf |
flinkLibDir | flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.10.1/lib | $FLINK_HOME/lib | 否 | $FLINK_HOME/lib |
hadoopConfDir | Hadoop配置文件(包括hdfs和yarn)所在的目录 | $HADOOP_HOME/etc/hadoop | 否 | $HADOOP_HOME/etc/hadoop |
pluginLoadMode | yarn session模式插件加载方式 | 1.classpath:提交任务时不上传插件包,需要在yarn-node节点flinkx-dist目录下部署插件包,但任务启动速度较快,session模式建议使用 2.shipfile:提交任务时上传flinkx-dist目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境,yarnPer模式建议使用 |
否 | shipfile |
confProp | flink官方所有配置参数 | 否 | 无 | |
p | 自定义入参,用于替换脚本中的占位符,如脚本中存在占位符${pt1},${pt2},则该参数可配置为pt1=20200101,pt2=20200102 | 否 | 无 |