File tree Expand file tree Collapse file tree 1 file changed +7
-1
lines changed Expand file tree Collapse file tree 1 file changed +7
-1
lines changed Original file line number Diff line number Diff line change @@ -86,7 +86,11 @@ fs.cosn.bucket.endpoint_suffix: cos.ap-guangzhou.myqcloud.com
86
86
4.在作业的 write 或 sink 路径中填写格式为:` ` ` cosn://bucket-appid/path```的路径信息即可,例如:
87
87
88
88
` ` ` java
89
- ...
89
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
90
+ // 采用 Streaming File Sink 写入的话,必须启用 checkpoint,这里使用 COS 作为 StateBackend 举例子,也可以使用其他 checkpoint storage。
91
+ env.setStateBackend(new FsStateBackend("cosn://bucket-name-125xxxxxx/checkpoint"));
92
+ env.enableCheckpointing(1000);
93
+ // 构造 Streaming File Sink 写入
90
94
StreamingFileSink<String> streamingFileSink =
91
95
StreamingFileSink.forRowFormat(
92
96
new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
@@ -100,6 +104,8 @@ fs.cosn.bucket.endpoint_suffix: cos.ap-guangzhou.myqcloud.com
100
104
...
101
105
` ` `
102
106
107
+ ⚠️**注意**:如果使用 Streaming File Sink 方式写入,需要同时启用 Flink 的 checkpoint,否则写入的数据始终处于 inprogress 不可见状态,无法被读取。
108
+
103
109
# ## 使用示例
104
110
105
111
以下给出 Flink Job 读写 COS 的示例代码:
You can’t perform that action at this time.
0 commit comments