Skip to content

Commit a3fc935

Browse files
Paddy0523 authored and FlechazoW committed
[feat-#846][hdfs]hdfs sql support partitionColumn
1 parent d6c7f89 commit a3fc935

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsDynamicTableSource.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,17 @@ public class HdfsDynamicTableSource implements ScanTableSource {
4848

4949
private final HdfsConf hdfsConf;
5050
private final TableSchema tableSchema;
51+
private final List<String> partitionKeyList;
5152

5253
/**
 * Creates a source with no partition columns.
 *
 * <p>Delegates to the three-argument constructor with an empty, mutable
 * partition-key list, preserving the pre-existing two-argument call sites.
 *
 * @param hdfsConf HDFS connector configuration
 * @param tableSchema physical schema of the table being read
 */
public HdfsDynamicTableSource(HdfsConf hdfsConf, TableSchema tableSchema) {
    this(hdfsConf, tableSchema, new ArrayList<>());
}
56+
57+
/**
 * Creates a source that knows which columns are partition columns.
 *
 * <p>The partition keys are later matched against the schema's field names
 * when building the column list, so partition fields can be flagged on their
 * {@code FieldConf} entries.
 *
 * @param hdfsConf HDFS connector configuration
 * @param tableSchema physical schema of the table being read
 * @param partitionKeyList names of the catalog table's partition columns
 */
public HdfsDynamicTableSource(
        HdfsConf hdfsConf, TableSchema tableSchema, List<String> partitionKeyList) {
    this.hdfsConf = hdfsConf;
    this.tableSchema = tableSchema;
    this.partitionKeyList = partitionKeyList;
}
5663

5764
@Override
@@ -61,10 +68,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
6168
String[] fieldNames = tableSchema.getFieldNames();
6269
List<FieldConf> columnList = new ArrayList<>(fieldNames.length);
6370
for (int i = 0; i < fieldNames.length; i++) {
71+
String fieldName = fieldNames[i];
6472
FieldConf field = new FieldConf();
65-
field.setName(fieldNames[i]);
73+
field.setName(fieldName);
6674
field.setType(rowType.getTypeAt(i).asSummaryString());
6775
field.setIndex(i);
76+
if (partitionKeyList.contains(fieldName)) {
77+
field.setPart(true);
78+
}
6879
columnList.add(field);
6980
}
7081
hdfsConf.setColumn(columnList);

chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.flink.table.utils.TableSchemaUtils;
3737

3838
import java.util.HashSet;
39+
import java.util.List;
3940
import java.util.Set;
4041

4142
/**
@@ -94,11 +95,12 @@ public DynamicTableSource createDynamicTableSource(Context context) {
9495
TableSchema physicalSchema =
9596
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
9697
HdfsConf hdfsConf = getHdfsConf(config);
98+
List<String> partitionKeys = context.getCatalogTable().getPartitionKeys();
9799
hdfsConf.setParallelism(config.get(SourceOptions.SCAN_PARALLELISM));
98100
hdfsConf.setHadoopConfig(
99101
HdfsOptions.getHadoopConfig(context.getCatalogTable().getOptions()));
100102

101-
return new HdfsDynamicTableSource(hdfsConf, physicalSchema);
103+
return new HdfsDynamicTableSource(hdfsConf, physicalSchema, partitionKeys);
102104
}
103105

104106
@Override

0 commit comments

Comments (0)