Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public class FlinkTableSource

// projection push down
@Nullable private int[] projectedFields;
@Nullable private int[][] lakeProjectedFields;

@Nullable private GenericRowData singleRowFilter;

Expand All @@ -170,6 +171,8 @@ public class FlinkTableSource
private final Map<String, String> tableOptions;

@Nullable private LakeSource<LakeSplit> lakeSource;
private boolean lakeFiltersPushedDown;
private List<Predicate> lakePredicates = Collections.emptyList();
@Nullable private Predicate logRecordBatchFilter;

/** Watermark strategy that is pushed down by the Flink optimizer. */
Expand Down Expand Up @@ -529,7 +532,17 @@ public DynamicTableSource copy() {
source.singleRowFilter = singleRowFilter;
source.modificationScanType = modificationScanType;
source.partitionFilters = partitionFilters;
source.lakeSource = lakeSource;
source.lakeProjectedFields = lakeProjectedFields;
source.lakeFiltersPushedDown = lakeFiltersPushedDown;
source.lakePredicates = new ArrayList<>(lakePredicates);
if (source.lakeSource != null) {
if (source.lakeProjectedFields != null) {
source.lakeSource.withProject(source.lakeProjectedFields);
}
if (source.lakeFiltersPushedDown) {
source.lakeSource.withFilters(source.lakePredicates);
}
}
source.logRecordBatchFilter = logRecordBatchFilter;
source.watermarkStrategy = watermarkStrategy;
// Note: availableStatsColumns is already computed in the constructor
Expand All @@ -549,6 +562,7 @@ public boolean supportsNestedProjection() {
@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
this.lakeProjectedFields = projectedFields;
this.producedDataType = producedDataType.getLogicalType();
if (lakeSource != null) {
lakeSource.withProject(projectedFields);
Expand Down Expand Up @@ -710,12 +724,16 @@ private void pushdownLakeFilters(
}
}

LakeSource.FilterPushDownResult filterPushDownResult =
checkNotNull(lakeSource).withFilters(lakePredicates);
this.lakePredicates =
lakePredicates.isEmpty()
? Collections.emptyList()
: new ArrayList<>(lakePredicates);
this.lakeFiltersPushedDown = true;
if (lakePredicates.isEmpty()) {
return;
}

LakeSource.FilterPushDownResult filterPushDownResult =
checkNotNull(lakeSource).withFilters(lakePredicates);
Set<Predicate> acceptedLakePredicates =
Collections.newSetFromMap(new IdentityHashMap<Predicate, Boolean>());
acceptedLakePredicates.addAll(filterPushDownResult.acceptedPredicates());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
import org.apache.fluss.lake.values.TestingValuesLakeSource;
import org.apache.fluss.lake.values.TestingValuesLakeStorage;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.row.BinaryString;
Expand All @@ -43,6 +46,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -356,6 +360,107 @@ void testEmptyFilters() {
}
}

@Nested
class LakeSourcePushDownTests {
private FlinkTableSource tableSource;
private TablePath tablePath;

@BeforeEach
void setUp() {
RowType tableOutputType =
(RowType)
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("value", DataTypes.BIGINT()))
.getLogicalType();

tablePath = TablePath.of("test_db", "test_lake_table");
Configuration flussConfig = new Configuration();
flussConfig.setString(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092");

Configuration tableConfig = new Configuration();
tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, "*");

FlinkConnectorOptionsUtils.StartupOptions startupOptions =
new FlinkConnectorOptionsUtils.StartupOptions();
startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.EARLIEST;

Map<String, String> tableOptions = Maps.newHashMap();
tableOptions.put(
ConfigOptions.TABLE_DATALAKE_FORMAT.key(), DataLakeFormat.LANCE.toString());

tableSource =
new FlinkTableSource(
tablePath,
flussConfig,
new TableConfig(tableConfig),
tableOutputType,
new int[] {}, // no primary key indexes
new int[] {}, // bucket key indexes
new int[] {}, // partition key indexes
true, // streaming
startupOptions,
false, // lookup async
false, // insert if not exists
null, // cache
1000L, // scan partition discovery interval
true, // is data lake enabled
null, // merge engine type
tableOptions,
null); // lease context
}

@Test
void testCopyUsesIndependentLakeSourceWithReplayedState() {
int[][] project = new int[][] {new int[] {0}, new int[] {2}};
tableSource.applyProjection(
project,
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("value", DataTypes.BIGINT())));

FieldReferenceExpression idFieldRef =
new FieldReferenceExpression("id", DataTypes.INT(), 0, 0);
CallExpression greaterThanCall =
new CallExpression(
BuiltInFunctionDefinitions.GREATER_THAN,
Arrays.asList(idFieldRef, new ValueLiteralExpression(5)),
DataTypes.BOOLEAN());

tableSource.applyFilters(Collections.singletonList(greaterThanCall));

TestingValuesLakeSource originalLakeSource =
TestingValuesLakeStorage.getLatestLakeSource(tablePath);
List<Predicate> originalPredicates =
originalLakeSource == null
? Collections.emptyList()
: originalLakeSource.getPredicates();
assertThat(originalLakeSource).isNotNull();
assertThat(originalPredicates).hasSize(1);

FlinkTableSource copiedSource = (FlinkTableSource) tableSource.copy();
TestingValuesLakeSource copiedLakeSource =
TestingValuesLakeStorage.getLatestLakeSource(tablePath);
assertThat(copiedLakeSource).isNotNull();
assertThat(copiedLakeSource).isNotSameAs(originalLakeSource);
assertThat(copiedLakeSource.getProject()).isDeepEqualTo(project);
assertThat(copiedLakeSource.getPredicates()).isEqualTo(originalPredicates);

CallExpression lessThanCall =
new CallExpression(
BuiltInFunctionDefinitions.LESS_THAN,
Arrays.asList(idFieldRef, new ValueLiteralExpression(10)),
DataTypes.BOOLEAN());

copiedSource.applyFilters(Collections.singletonList(lessThanCall));

assertThat(originalLakeSource.getPredicates()).isEqualTo(originalPredicates);
assertThat(copiedLakeSource.getPredicates()).hasSize(1);
assertThat(copiedLakeSource.getPredicates()).isNotEqualTo(originalPredicates);
}
}

@Nested
class PartitionedKvTableTests {
private FlinkTableSource tableSource;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.values;

import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.lake.source.Planner;
import org.apache.fluss.lake.source.RecordReader;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.utils.CloseableIterator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Testing values lake source for Flink connector tests. */
public class TestingValuesLakeSource implements LakeSource<LakeSplit> {

private List<Predicate> predicates = Collections.emptyList();
private int[][] project;

@Override
public void withProject(int[][] project) {
this.project = project;
}

@Override
public void withLimit(int limit) {}

@Override
public FilterPushDownResult withFilters(List<Predicate> predicates) {
this.predicates = new ArrayList<>(predicates);
return FilterPushDownResult.of(predicates, Collections.emptyList());
}

@Override
public Planner<LakeSplit> createPlanner(PlannerContext context) {
return Collections::emptyList;
}

@Override
public RecordReader createRecordReader(ReaderContext<LakeSplit> context) {
return CloseableIterator::emptyIterator;
}

@Override
public SimpleVersionedSerializer<LakeSplit> getSplitSerializer() {
return new SimpleVersionedSerializer<LakeSplit>() {
@Override
public int getVersion() {
return 0;
}

@Override
public byte[] serialize(LakeSplit obj) throws IOException {
return new byte[0];
}

@Override
public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
throw new IOException("Unsupported testing split deserialization.");
}
};
}

public List<Predicate> getPredicates() {
return predicates;
}

public int[][] getProject() {
return project;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.metadata.TablePath;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Implementation of {@link LakeStorage} for values lake. */
public class TestingValuesLakeStorage implements LakeStorage {

private static final Map<TablePath, TestingValuesLakeSource> LAKE_SOURCES =
new ConcurrentHashMap<>();

@Override
public LakeTieringFactory<?, ?> createLakeTieringFactory() {
return new TestingValuesLakeTieringFactory();
Expand All @@ -39,6 +46,12 @@ public LakeCatalog createLakeCatalog() {

@Override
public LakeSource<?> createLakeSource(TablePath tablePath) {
throw new UnsupportedOperationException("Not impl.");
TestingValuesLakeSource lakeSource = new TestingValuesLakeSource();
LAKE_SOURCES.put(tablePath, lakeSource);
return lakeSource;
}

public static TestingValuesLakeSource getLatestLakeSource(TablePath tablePath) {
return LAKE_SOURCES.get(tablePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public void withLimit(int limit) {
public FilterPushDownResult withFilters(List<Predicate> predicates) {
List<Predicate> unConsumedPredicates = new ArrayList<>();
List<Predicate> consumedPredicates = new ArrayList<>();
if (predicates.isEmpty()) {
filter = null;
return FilterPushDownResult.of(consumedPredicates, unConsumedPredicates);
}
List<Expression> converted = new ArrayList<>();
Schema schema = getSchema(tablePath);
for (Predicate predicate : predicates) {
Expand All @@ -82,9 +86,7 @@ public FilterPushDownResult withFilters(List<Predicate> predicates) {
unConsumedPredicates.add(predicate);
}
}
if (!converted.isEmpty()) {
filter = converted.stream().reduce(Expressions::and).orElse(null);
}
filter = converted.stream().reduce(Expressions::and).orElse(null);
return FilterPushDownResult.of(consumedPredicates, unConsumedPredicates);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,64 @@ void testWithFilters() throws Exception {
assertThat(filterPushDownResult.remainingPredicates().toString())
.isEqualTo(allFilters.toString());
}

@Test
void testUnacceptedFiltersClearPreviousAcceptedFilters() throws Exception {
TablePath tablePath = TablePath.of("fluss", "test_clear_previous_filters");
createTable(tablePath, SCHEMA, PARTITION_SPEC);

Table table = getTable(tablePath);
List<Record> rows = new ArrayList<>();
for (int i = 1; i <= 4; i++) {
rows.add(
createIcebergRecord(
SCHEMA,
i,
"name" + i,
0,
(long) i,
OffsetDateTime.now(ZoneOffset.UTC)));
}
writeRecord(table, rows, null, 0);
table.refresh();

Predicate acceptedFilter = FLUSS_BUILDER.greaterOrEqual(0, 3);
Predicate nonConvertibleFilter = FLUSS_BUILDER.endsWith(1, BinaryString.fromString("name"));

LakeSource<IcebergSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
LakeSource.FilterPushDownResult filterPushDownResult =
lakeSource.withFilters(Collections.singletonList(acceptedFilter));
assertThat(filterPushDownResult.acceptedPredicates())
.containsExactlyInAnyOrder(acceptedFilter);
assertThat(readRows(lakeSource, table.currentSnapshot().snapshotId()).toString())
.isEqualTo("[+I[3, name3], +I[4, name4]]");

filterPushDownResult =
lakeSource.withFilters(Collections.singletonList(nonConvertibleFilter));
assertThat(filterPushDownResult.acceptedPredicates()).isEmpty();
assertThat(filterPushDownResult.remainingPredicates())
.containsExactly(nonConvertibleFilter);

assertThat(readRows(lakeSource, table.currentSnapshot().snapshotId()).toString())
.isEqualTo("[+I[1, name1], +I[2, name2], +I[3, name3], +I[4, name4]]");
}

private List<Row> readRows(LakeSource<IcebergSplit> lakeSource, long snapshotId)
throws Exception {
List<Row> actual = new ArrayList<>();
org.apache.fluss.row.InternalRow.FieldGetter[] fieldGetters =
org.apache.fluss.row.InternalRow.createFieldGetters(
RowType.of(new IntType(), new StringType()));
for (IcebergSplit icebergSplit : lakeSource.createPlanner(() -> snapshotId).plan()) {
RecordReader recordReader = lakeSource.createRecordReader(() -> icebergSplit);
try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
actual.addAll(
convertToFlinkRow(
fieldGetters,
TransformingCloseableIterator.transform(
iterator, LogRecord::getRow)));
}
}
return actual;
}
}
Loading