|
18 | 18 |
|
19 | 19 | package com.dtstack.chunjun.connector.ftp.table;
|
20 | 20 |
|
| 21 | +import com.dtstack.chunjun.conf.FieldConf; |
21 | 22 | import com.dtstack.chunjun.connector.ftp.conf.FtpConfig;
|
22 | 23 | import com.dtstack.chunjun.connector.ftp.options.FtpOptions;
|
23 | 24 | import com.dtstack.chunjun.connector.ftp.sink.FtpDynamicTableSink;
|
|
38 | 39 | import org.apache.flink.table.factories.DynamicTableSourceFactory;
|
39 | 40 | import org.apache.flink.table.factories.FactoryUtil;
|
40 | 41 | import org.apache.flink.table.factories.SerializationFormatFactory;
|
| 42 | +import org.apache.flink.table.types.logical.RowType; |
41 | 43 | import org.apache.flink.table.utils.TableSchemaUtils;
|
42 | 44 |
|
| 45 | +import java.util.ArrayList; |
43 | 46 | import java.util.HashSet;
|
| 47 | +import java.util.List; |
44 | 48 | import java.util.Set;
|
45 | 49 |
|
46 | 50 | /**
|
@@ -94,7 +98,18 @@ public DynamicTableSource createDynamicTableSource(Context context) {
|
94 | 98 | helper.discoverDecodingFormat(
|
95 | 99 | DeserializationFormatFactory.class, FtpOptions.FORMAT);
|
96 | 100 |
|
| 101 | + RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); |
97 | 102 | FtpConfig ftpConfig = getFtpConfByOptions(config);
|
| 103 | + String[] fieldNames = physicalSchema.getFieldNames(); |
| 104 | + List<FieldConf> columnList = new ArrayList<>(fieldNames.length); |
| 105 | + for (int i = 0; i < fieldNames.length; i++) { |
| 106 | + FieldConf field = new FieldConf(); |
| 107 | + field.setName(fieldNames[i]); |
| 108 | + field.setType(rowType.getTypeAt(i).asSummaryString()); |
| 109 | + field.setIndex(i); |
| 110 | + columnList.add(field); |
| 111 | + } |
| 112 | + ftpConfig.setColumn(columnList); |
98 | 113 |
|
99 | 114 | return new FtpDynamicTableSource(physicalSchema, ftpConfig, decodingFormat);
|
100 | 115 | }
|
|
0 commit comments