Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ void testCreateTableWithInvalidProperty() {
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining(
"Currently, Primary Key Table only supports ARROW log format if kv format is COMPACTED.");
"Currently, Primary Key Table supports ARROW or COMPACTED log format when kv format is COMPACTED.");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,11 @@ void testPutAndPoll(String kvFormat) throws Exception {
verifyAppendOrPut(false, "ARROW", kvFormat);
}

@Test
// COMPACTED log format is only valid when the kv format is also COMPACTED,
// so this combination is exercised separately from the parameterized
// testPutAndPoll(kvFormat) runs above.
void testPutAndPollCompacted() throws Exception {
verifyAppendOrPut(false, "COMPACTED", "COMPACTED");
}

void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat)
throws Exception {
Schema schema =
Expand Down Expand Up @@ -1384,4 +1389,233 @@ void testFileSystemRecognizeConnectionConf() throws Exception {
Collections.singletonMap("client.fs.test.key", "fs_test_value"));
}
}

// ---------------------- PK with COMPACTED log tests ----------------------
@Test
void testPkUpsertAndPollWithCompactedLog() throws Exception {
    // Verifies that a primary-key table configured with COMPACTED kv format and
    // COMPACTED log format accepts upserts and that a log scanner replays every
    // row — including null string values — in insertion order as +I events.
    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.INT())
                    .column("c", DataTypes.STRING())
                    .column("d", DataTypes.BIGINT())
                    .primaryKey("a")
                    .build();
    TableDescriptor tableDescriptor =
            TableDescriptor.builder()
                    .schema(schema)
                    .kvFormat(KvFormat.COMPACTED)
                    .logFormat(LogFormat.COMPACTED)
                    .build();
    TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_upsert_poll");
    createTable(tablePath, tableDescriptor, false);

    int expectedSize = 30;
    try (Table table = conn.getTable(tablePath)) {
        UpsertWriter upsertWriter = table.newUpsert().createWriter();
        for (int i = 0; i < expectedSize; i++) {
            // Odd keys carry a null value to exercise null handling in COMPACTED rows.
            String value = i % 2 == 0 ? "hello, friend" + i : null;
            GenericRow r = row(i, 100, value, i * 10L);
            upsertWriter.upsert(r);
            if (i % 10 == 0) {
                // Periodic flush spreads the writes over several log batches.
                upsertWriter.flush();
            }
        }
        upsertWriter.flush();

        // try-with-resources: the original only closed the scanner on the success
        // path, leaking it whenever an assertion below failed.
        try (LogScanner logScanner = createLogScanner(table)) {
            subscribeFromBeginning(logScanner, table);
            int count = 0;
            while (count < expectedSize) {
                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
                for (ScanRecord scanRecord : scanRecords) {
                    // Fresh keys on a pk table must surface as pure inserts.
                    assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
                    InternalRow rr = scanRecord.getRow();
                    assertThat(rr.getFieldCount()).isEqualTo(4);
                    assertThat(rr.getInt(0)).isEqualTo(count);
                    assertThat(rr.getInt(1)).isEqualTo(100);
                    if (count % 2 == 0) {
                        assertThat(rr.getString(2).toString()).isEqualTo("hello, friend" + count);
                    } else {
                        assertThat(rr.isNullAt(2)).isTrue();
                    }
                    assertThat(rr.getLong(3)).isEqualTo(count * 10L);
                    count++;
                }
            }
            assertThat(count).isEqualTo(expectedSize);
        }
    }
}

@Test
void testPkUpdateAndDeleteWithCompactedLog() throws Exception {
    // Verifies the full CDC sequence (+I, -U, +U, -D) emitted for a single key by
    // a primary-key table whose kv and log formats are both COMPACTED.
    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.INT())
                    .primaryKey("a")
                    .build();
    TableDescriptor tableDescriptor =
            TableDescriptor.builder()
                    .schema(schema)
                    .kvFormat(KvFormat.COMPACTED)
                    .logFormat(LogFormat.COMPACTED)
                    .build();
    TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_update_delete");
    createTable(tablePath, tableDescriptor, false);

    try (Table table = conn.getTable(tablePath)) {
        UpsertWriter upsertWriter = table.newUpsert().createWriter();
        // initial insert
        upsertWriter.upsert(row(1, 10));
        upsertWriter.flush();
        // update same key
        upsertWriter.upsert(row(1, 20));
        upsertWriter.flush();
        // delete the key
        upsertWriter.delete(row(1, 20));
        upsertWriter.flush();

        // try-with-resources: the original only closed the scanner on the success
        // path, leaking it whenever an assertion below failed.
        try (LogScanner scanner = createLogScanner(table)) {
            subscribeFromBeginning(scanner, table);
            // Expect: +I(1,10), -U(1,10), +U(1,20), -D(1,20)
            ChangeType[] expected = {
                ChangeType.INSERT,
                ChangeType.UPDATE_BEFORE,
                ChangeType.UPDATE_AFTER,
                ChangeType.DELETE
            };
            int seen = 0;
            while (seen < expected.length) {
                ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
                for (ScanRecord r : recs) {
                    assertThat(r.getChangeType()).isEqualTo(expected[seen]);
                    InternalRow row = r.getRow();
                    // The key column is always present, whatever the change type.
                    assertThat(row.getInt(0)).isEqualTo(1);
                    switch (expected[seen]) {
                        case INSERT:
                        case UPDATE_BEFORE:
                            // Pre-update image carries the original value.
                            assertThat(row.getInt(1)).isEqualTo(10);
                            break;
                        case UPDATE_AFTER:
                            // Post-update image carries the new value.
                            assertThat(row.getInt(1)).isEqualTo(20);
                            break;
                        default:
                            // DELETE: value column intentionally not asserted,
                            // matching the original test's contract.
                            break;
                    }
                    seen++;
                }
            }
            assertThat(seen).isEqualTo(expected.length);
        }
    }
}

@Test
void testPkCompactedProject() throws Exception {
    // Column projection is not implemented for the COMPACTED log format, so
    // requesting a projected log scanner on such a table must be rejected.
    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.INT())
                    .column("c", DataTypes.STRING())
                    .primaryKey("a")
                    .build();
    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(schema)
                    .kvFormat(KvFormat.COMPACTED)
                    .logFormat(LogFormat.COMPACTED)
                    .build();
    TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_project");
    createTable(tablePath, descriptor, false);

    try (Table table = conn.getTable(tablePath)) {
        // Seed some data first (including null values) so the rejection is not
        // an artifact of an empty table.
        UpsertWriter writer = table.newUpsert().createWriter();
        for (int i = 0; i < 10; i++) {
            writer.upsert(row(i, 100 + i, i % 2 == 0 ? "v" + i : null));
        }
        writer.flush();

        // Creating a projected log scanner for COMPACTED should fail
        int[] projection = new int[] {0, 2};
        assertThatThrownBy(() -> createLogScanner(table, projection))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Projection is not supported for COMPACTED log format");
    }
}

@Test
void testPkCompactedPollFromLatestNoRecords() throws Exception {
    // Subscribing from the latest offset on a COMPACTED-log pk table must skip
    // all history: only rows written after the subscription should be polled.
    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.INT())
                    .primaryKey("a")
                    .build();
    TableDescriptor td =
            TableDescriptor.builder()
                    .schema(schema)
                    .kvFormat(KvFormat.COMPACTED)
                    .logFormat(LogFormat.COMPACTED)
                    .build();
    TablePath path = TablePath.of("test_db_1", "test_pk_compacted_latest");
    createTable(path, td, false);

    try (Table table = conn.getTable(path)) {
        // try-with-resources: the original only closed the scanner on the success
        // path, leaking it whenever an assertion below failed.
        try (LogScanner scanner = createLogScanner(table)) {
            subscribeFromLatestOffset(path, null, null, table, scanner, admin);
            // Now write a few rows and ensure only these are seen
            UpsertWriter upsert = table.newUpsert().createWriter();
            for (int i = 0; i < 5; i++) {
                upsert.upsert(row(i, i));
            }
            upsert.flush();

            int seen = 0;
            while (seen < 5) {
                ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
                for (ScanRecord r : recs) {
                    // All post-subscription writes are fresh keys, so pure inserts.
                    assertThat(r.getChangeType()).isEqualTo(ChangeType.INSERT);
                    assertThat(r.getRow().getInt(0)).isBetween(0, 4);
                    seen++;
                }
            }
        }
    }
}

@Test
void testPkDeleteNonExistentEmitsNoRecords() throws Exception {
    // Deleting a key that was never inserted must produce no change events in
    // the COMPACTED log of a primary-key table.
    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.INT())
                    .primaryKey("a")
                    .build();
    TableDescriptor td =
            TableDescriptor.builder()
                    .schema(schema)
                    .kvFormat(KvFormat.COMPACTED)
                    .logFormat(LogFormat.COMPACTED)
                    .build();
    TablePath path = TablePath.of("test_db_1", "test_pk_compacted_delete_missing");
    createTable(path, td, false);

    try (Table table = conn.getTable(path)) {
        UpsertWriter upsert = table.newUpsert().createWriter();
        // delete non-existent key
        upsert.delete(row(42, 0));
        upsert.flush();

        // try-with-resources: the original only closed the scanner on the success
        // path, leaking it whenever the assertion below failed.
        try (LogScanner scanner = createLogScanner(table)) {
            subscribeFromBeginning(scanner, table);
            int total = 0;
            // poll a few times to ensure no accidental records
            for (int i = 0; i < 3; i++) {
                total += scanner.poll(Duration.ofSeconds(1)).count();
            }
            assertThat(total).isEqualTo(0);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ public class ConfigOptions {
.defaultValue(LogFormat.ARROW)
.withDescription(
"The format of the log records in log store. The default value is `arrow`. "
+ "The supported formats are `arrow` and `indexed`.");
+ "The supported formats are `arrow`, `indexed` and `compacted`.");

public static final ConfigOption<ArrowCompressionType> TABLE_LOG_ARROW_COMPRESSION_TYPE =
key("table.log.arrow.compression.type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package org.apache.fluss.metadata;

import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.row.indexed.IndexedRow;

/** The format of the log records in log store. The supported formats are 'arrow' and 'indexed'. */
/**
* The format of the log records in log store. The supported formats are 'arrow', 'indexed' and
* 'compacted'.
*/
public enum LogFormat {

/**
Expand All @@ -41,18 +46,29 @@ public enum LogFormat {
*
* @see MemoryLogRecordsIndexedBuilder
*/
INDEXED;
INDEXED,

/**
* The log record batches are stored in {@link CompactedRow} format which is a compact
* row-oriented format optimized for primary key tables to reduce storage while trading CPU for
* reads.
*
* @see MemoryLogRecordsCompactedBuilder
*/
COMPACTED;

/**
* Creates a {@link LogFormat} from the given string. The string must be either 'arrow' or
* 'indexed'.
* Creates a {@link LogFormat} from the given string. The string must be either 'arrow',
* 'indexed' or 'compacted'.
*/
public static LogFormat fromString(String format) {
switch (format.toUpperCase()) {
case "ARROW":
return ARROW;
case "INDEXED":
return INDEXED;
case "COMPACTED":
return COMPACTED;
default:
throw new IllegalArgumentException("Unsupported log format: " + format);
}
Expand Down
Loading