From 75437c9938c8cd45dff36fd1bd813fd4751d89dc Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Thu, 28 Aug 2025 12:47:42 +0300 Subject: [PATCH] [kv] Supports compacted row as change log --- .../fluss/client/admin/FlussAdminITCase.java | 2 +- .../fluss/client/table/FlussTableITCase.java | 234 +++++++++++++++++ .../apache/fluss/config/ConfigOptions.java | 2 +- .../org/apache/fluss/metadata/LogFormat.java | 24 +- .../AbstractRowMemoryLogRecordsBuilder.java | 238 ++++++++++++++++++ .../fluss/record/CompactedLogRecord.java | 185 ++++++++++++++ .../fluss/record/DefaultLogRecordBatch.java | 29 +++ .../fluss/record/LogRecordReadContext.java | 31 +++ .../MemoryLogRecordsCompactedBuilder.java | 78 ++++++ .../MemoryLogRecordsIndexedBuilder.java | 196 +-------------- .../fluss/record/CompactedLogRecordTest.java | 89 +++++++ .../MemoryLogRecordsCompactedBuilderTest.java | 187 ++++++++++++++ .../org/apache/fluss/server/kv/KvTablet.java | 3 + .../server/kv/wal/CompactedWalBuilder.java | 92 +++++++ .../utils/TableDescriptorValidation.java | 8 +- 15 files changed, 1203 insertions(+), 195 deletions(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java create mode 100644 fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java create mode 100644 fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index bd79542844..62026ca2f5 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -659,7 +659,7 @@ void testCreateTableWithInvalidProperty() { .cause() .isInstanceOf(InvalidConfigException.class) .hasMessageContaining( - "Currently, Primary Key Table only supports ARROW log format if kv format is COMPACTED."); + "Currently, Primary Key Table supports ARROW or COMPACTED log format when kv format is COMPACTED."); } @Test diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index cf20846756..62d4209c0b 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -730,6 +730,11 @@ void testPutAndPoll(String kvFormat) throws Exception { verifyAppendOrPut(false, "ARROW", kvFormat); } + @Test + void testPutAndPollCompacted() throws Exception { + verifyAppendOrPut(false, "COMPACTED", "COMPACTED"); + } + void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat) throws Exception { Schema schema = @@ -1384,4 +1389,233 @@ void testFileSystemRecognizeConnectionConf() throws Exception { Collections.singletonMap("client.fs.test.key", "fs_test_value")); } } + + // ---------------------- PK with COMPACTED log tests ---------------------- + @Test + void testPkUpsertAndPollWithCompactedLog() throws Exception { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .column("c", DataTypes.STRING()) + .column("d", DataTypes.BIGINT()) + .primaryKey("a") + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .kvFormat(KvFormat.COMPACTED) + .logFormat(LogFormat.COMPACTED) + .build(); + TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_upsert_poll"); + createTable(tablePath, tableDescriptor, false); + + int expectedSize = 30; + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + for (int i = 0; i < expectedSize; i++) { + String value = i % 2 == 0 ? "hello, friend" + i : null; + GenericRow r = row(i, 100, value, i * 10L); + upsertWriter.upsert(r); + if (i % 10 == 0) { + upsertWriter.flush(); + } + } + upsertWriter.flush(); + + LogScanner logScanner = createLogScanner(table); + subscribeFromBeginning(logScanner, table); + int count = 0; + while (count < expectedSize) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + for (ScanRecord scanRecord : scanRecords) { + assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT); + InternalRow rr = scanRecord.getRow(); + assertThat(rr.getFieldCount()).isEqualTo(4); + assertThat(rr.getInt(0)).isEqualTo(count); + assertThat(rr.getInt(1)).isEqualTo(100); + if (count % 2 == 0) { + assertThat(rr.getString(2).toString()).isEqualTo("hello, friend" + count); + } else { + assertThat(rr.isNullAt(2)).isTrue(); + } + assertThat(rr.getLong(3)).isEqualTo(count * 10L); + count++; + } + } + assertThat(count).isEqualTo(expectedSize); + logScanner.close(); + } + } + + @Test + void testPkUpdateAndDeleteWithCompactedLog() throws Exception { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .primaryKey("a") + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .kvFormat(KvFormat.COMPACTED) + .logFormat(LogFormat.COMPACTED) + .build(); + TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_update_delete"); + createTable(tablePath, tableDescriptor, false); + + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + // initial insert + upsertWriter.upsert(row(1, 10)); + upsertWriter.flush(); + // update same key + upsertWriter.upsert(row(1, 20)); + upsertWriter.flush(); + // delete the key + upsertWriter.delete(row(1, 20)); + upsertWriter.flush(); + + LogScanner scanner = createLogScanner(table); + subscribeFromBeginning(scanner, table); + // Expect: +I(1,10), -U(1,10), +U(1,20), -D(1,20) + ChangeType[] expected = { + ChangeType.INSERT, + ChangeType.UPDATE_BEFORE, + ChangeType.UPDATE_AFTER, + ChangeType.DELETE + }; + int seen = 0; + while (seen < expected.length) { + ScanRecords recs = scanner.poll(Duration.ofSeconds(1)); + for (ScanRecord r : recs) { + assertThat(r.getChangeType()).isEqualTo(expected[seen]); + InternalRow row = r.getRow(); + assertThat(row.getInt(0)).isEqualTo(1); + if (expected[seen] == ChangeType.INSERT + || expected[seen] == ChangeType.UPDATE_BEFORE + || expected[seen] == ChangeType.UPDATE_AFTER) { + // value field present + if (expected[seen] == ChangeType.UPDATE_AFTER) { + assertThat(row.getInt(1)).isEqualTo(20); + } else { + assertThat(row.getInt(1)).isEqualTo(10); + } + } + seen++; + } + } + assertThat(seen).isEqualTo(expected.length); + scanner.close(); + } + } + + @Test + void testPkCompactedProject() throws Exception { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .column("c", DataTypes.STRING()) + .primaryKey("a") + .build(); + TableDescriptor td = + TableDescriptor.builder() + .schema(schema) + .kvFormat(KvFormat.COMPACTED) + .logFormat(LogFormat.COMPACTED) + .build(); + TablePath path = TablePath.of("test_db_1", "test_pk_compacted_project"); + createTable(path, td, false); + + try (Table table = conn.getTable(path)) { + UpsertWriter upsert = table.newUpsert().createWriter(); + for (int i = 0; i < 10; i++) { + String v = i % 2 == 0 ? "v" + i : null; + upsert.upsert(row(i, 100 + i, v)); + } + upsert.flush(); + + // Creating a projected log scanner for COMPACTED should fail + assertThatThrownBy(() -> createLogScanner(table, new int[] {0, 2})) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Projection is not supported for COMPACTED log format"); + } + } + + @Test + void testPkCompactedPollFromLatestNoRecords() throws Exception { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .primaryKey("a") + .build(); + TableDescriptor td = + TableDescriptor.builder() + .schema(schema) + .kvFormat(KvFormat.COMPACTED) + .logFormat(LogFormat.COMPACTED) + .build(); + TablePath path = TablePath.of("test_db_1", "test_pk_compacted_latest"); + createTable(path, td, false); + + try (Table table = conn.getTable(path)) { + LogScanner scanner = createLogScanner(table); + subscribeFromLatestOffset(path, null, null, table, scanner, admin); + // Now write a few rows and ensure only these are seen + UpsertWriter upsert = table.newUpsert().createWriter(); + for (int i = 0; i < 5; i++) { + upsert.upsert(row(i, i)); + } + upsert.flush(); + + int seen = 0; + while (seen < 5) { + ScanRecords recs = scanner.poll(Duration.ofSeconds(1)); + for (ScanRecord r : recs) { + assertThat(r.getChangeType()).isEqualTo(ChangeType.INSERT); + assertThat(r.getRow().getInt(0)).isBetween(0, 4); + seen++; + } + } + scanner.close(); + } + } + + @Test + void testPkDeleteNonExistentEmitsNoRecords() throws Exception { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .primaryKey("a") + .build(); + TableDescriptor td = + TableDescriptor.builder() + .schema(schema) + .kvFormat(KvFormat.COMPACTED) + .logFormat(LogFormat.COMPACTED) + .build(); + TablePath path = TablePath.of("test_db_1", "test_pk_compacted_delete_missing"); + createTable(path, td, false); + + try (Table table = conn.getTable(path)) { + UpsertWriter upsert = table.newUpsert().createWriter(); + // delete non-existent key + upsert.delete(row(42, 0)); + upsert.flush(); + + LogScanner scanner = createLogScanner(table); + subscribeFromBeginning(scanner, table); + int total = 0; + // poll a few times to ensure no accidental records + for (int i = 0; i < 3; i++) { + total += scanner.poll(Duration.ofSeconds(1)).count(); + } + assertThat(total).isEqualTo(0); + scanner.close(); + } + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index bcfeb061bc..b905b320ad 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1230,7 +1230,7 @@ public class ConfigOptions { .defaultValue(LogFormat.ARROW) .withDescription( "The format of the log records in log store. The default value is `arrow`. " - + "The supported formats are `arrow` and `indexed`."); + + "The supported formats are `arrow`, `indexed` and `compacted`."); public static final ConfigOption TABLE_LOG_ARROW_COMPRESSION_TYPE = key("table.log.arrow.compression.type") diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java b/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java index d1b6de4ba3..b5f460162f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java @@ -18,10 +18,15 @@ package org.apache.fluss.metadata; import org.apache.fluss.record.MemoryLogRecordsArrowBuilder; +import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder; import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder; +import org.apache.fluss.row.compacted.CompactedRow; import org.apache.fluss.row.indexed.IndexedRow; -/** The format of the log records in log store. The supported formats are 'arrow' and 'indexed'. */ +/** + * The format of the log records in log store. The supported formats are 'arrow', 'indexed' and + * 'compacted'. + */ public enum LogFormat { /** @@ -41,11 +46,20 @@ public enum LogFormat { * * @see MemoryLogRecordsIndexedBuilder */ - INDEXED; + INDEXED, + + /** + * The log record batches are stored in {@link CompactedRow} format which is a compact + * row-oriented format optimized for primary key tables to reduce storage while trading CPU for + * reads. + * + * @see MemoryLogRecordsCompactedBuilder + */ + COMPACTED; /** - * Creates a {@link LogFormat} from the given string. The string must be either 'arrow' or - * 'indexed'. + * Creates a {@link LogFormat} from the given string. The string must be either 'arrow', + * 'indexed' or 'compacted'. */ public static LogFormat fromString(String format) { switch (format.toUpperCase()) { @@ -53,6 +67,8 @@ public static LogFormat fromString(String format) { return ARROW; case "INDEXED": return INDEXED; + case "COMPACTED": + return COMPACTED; default: throw new IllegalArgumentException("Unsupported log format: " + format); } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java new file mode 100644 index 0000000000..f920b22a25 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.memory.AbstractPagedOutputView; +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.memory.MemorySegmentOutputView; +import org.apache.fluss.record.bytesview.BytesView; +import org.apache.fluss.record.bytesview.MultiBytesView; +import org.apache.fluss.utils.crc.Crc32C; + +import java.io.IOException; + +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; +import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset; +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** Abstract base builder for row-based MemoryLogRecords builders sharing common logic. */ +abstract class AbstractRowMemoryLogRecordsBuilder implements AutoCloseable { + protected static final int BUILDER_DEFAULT_OFFSET = 0; + + protected final long baseLogOffset; + protected final int schemaId; + protected final int writeLimit; + protected final byte magic; + protected final AbstractPagedOutputView pagedOutputView; + protected final MemorySegment firstSegment; + protected final boolean appendOnly; + + private BytesView builtBuffer = null; + private long writerId; + private int batchSequence; + private int currentRecordNumber; + private int sizeInBytes; + private volatile boolean isClosed; + private boolean aborted = false; + + protected AbstractRowMemoryLogRecordsBuilder( + long baseLogOffset, + int schemaId, + int writeLimit, + byte magic, + AbstractPagedOutputView pagedOutputView, + boolean appendOnly) { + this.appendOnly = appendOnly; + checkArgument( + schemaId <= Short.MAX_VALUE, + "schemaId shouldn't be greater than the max value of short: " + Short.MAX_VALUE); + this.baseLogOffset = baseLogOffset; + this.schemaId = schemaId; + this.writeLimit = writeLimit; + this.magic = magic; + this.pagedOutputView = pagedOutputView; + this.firstSegment = pagedOutputView.getCurrentSegment(); + this.writerId = NO_WRITER_ID; + this.batchSequence = NO_BATCH_SEQUENCE; + this.currentRecordNumber = 0; + this.isClosed = false; + + // Skip header initially; will be written in build() + int headerSize = recordBatchHeaderSize(magic); + this.pagedOutputView.setPosition(headerSize); + this.sizeInBytes = headerSize; + } + + protected AbstractRowMemoryLogRecordsBuilder( + int schemaId, int writeLimit, AbstractPagedOutputView outputView, boolean appendOnly) { + this( + BUILDER_DEFAULT_OFFSET, + schemaId, + writeLimit, + CURRENT_LOG_MAGIC_VALUE, + outputView, + appendOnly); + } + + /** Implement to return size of the record (including length field). */ + protected abstract int sizeOf(T row); + + /** Implement to write the record and return total written bytes including length field. */ + protected abstract int writeRecord(ChangeType changeType, T row) throws IOException; + + public boolean hasRoomFor(T row) { + return sizeInBytes + sizeOf(row) <= writeLimit; + } + + public void append(ChangeType changeType, T row) throws Exception { + appendRecord(changeType, row); + } + + private void appendRecord(ChangeType changeType, T row) throws IOException { + if (aborted) { + throw new IllegalStateException( + "Tried to append a record, but " + + getClass().getSimpleName() + + " has already been aborted"); + } + if (isClosed) { + throw new IllegalStateException( + "Tried to append a record, but MemoryLogRecordsBuilder is closed for record appends"); + } + if (appendOnly && changeType != ChangeType.APPEND_ONLY) { + throw new IllegalArgumentException( + "Only append-only change type is allowed for append-only row log builder, but got " + + changeType); + } + + int recordByteSizes = writeRecord(changeType, row); + currentRecordNumber++; + sizeInBytes += recordByteSizes; + } + + public BytesView build() throws IOException { + if (aborted) { + throw new IllegalStateException("Attempting to build an aborted record batch"); + } + if (builtBuffer != null) { + return builtBuffer; + } + writeBatchHeader(); + builtBuffer = + MultiBytesView.builder() + .addMemorySegmentByteViewList(pagedOutputView.getWrittenSegments()) + .build(); + return builtBuffer; + } + + public void setWriterState(long writerId, int batchBaseSequence) { + this.writerId = writerId; + this.batchSequence = batchBaseSequence; + } + + public void resetWriterState(long writerId, int batchSequence) { + // trigger to rewrite batch header + this.builtBuffer = null; + this.writerId = writerId; + this.batchSequence = batchSequence; + } + + public long writerId() { + return writerId; + } + + public int batchSequence() { + return batchSequence; + } + + public boolean isClosed() { + return isClosed; + } + + public void abort() { + aborted = true; + } + + @Override + public void close() throws IOException { + if (aborted) { + throw new IllegalStateException( + "Cannot close " + + getClass().getSimpleName() + + " as it has already been aborted"); + } + isClosed = true; + } + + public int getSizeInBytes() { + return sizeInBytes; + } + + // ----------------------- internal methods ------------------------------- + private void writeBatchHeader() throws IOException { + // pagedOutputView doesn't support seek to previous segment, + // so we create a new output view on the first segment + MemorySegmentOutputView outputView = new MemorySegmentOutputView(firstSegment); + outputView.setPosition(0); + // update header. + outputView.writeLong(baseLogOffset); + outputView.writeInt(sizeInBytes - BASE_OFFSET_LENGTH - LENGTH_LENGTH); + outputView.writeByte(magic); + + // write empty timestamp which will be overridden on server side + outputView.writeLong(0); + + // write empty leaderEpoch which will be overridden on server side + if (magic >= LOG_MAGIC_VALUE_V1) { + outputView.writeInt(NO_LEADER_EPOCH); + } + + // write empty crc first. + outputView.writeUnsignedInt(0); + + outputView.writeShort((short) schemaId); + // write attributes (currently only appendOnly flag) + outputView.writeBoolean(appendOnly); + // skip write attribute byte for now. + outputView.setPosition(lastOffsetDeltaOffset(magic)); + if (currentRecordNumber > 0) { + outputView.writeInt(currentRecordNumber - 1); + } else { + // If there is no record, we write 0 for filed lastOffsetDelta, see the comments about + // the field 'lastOffsetDelta' in DefaultLogRecordBatch. + outputView.writeInt(0); + } + outputView.writeLong(writerId); + outputView.writeInt(batchSequence); + outputView.writeInt(currentRecordNumber); + + // Update crc. + long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), schemaIdOffset(magic)); + outputView.setPosition(crcOffset(magic)); + outputView.writeUnsignedInt(crc); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java b/fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java new file mode 100644 index 0000000000..92b329aee7 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.memory.OutputView; +import org.apache.fluss.metadata.LogFormat; +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.compacted.CompactedRow; +import org.apache.fluss.row.compacted.CompactedRowDeserializer; +import org.apache.fluss.row.compacted.CompactedRowWriter; +import org.apache.fluss.types.DataType; +import org.apache.fluss.utils.MurmurHashUtils; + +import java.io.IOException; + +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH; + +/** + * An immutable log record for @CompactedRow which can be directly persisted. The on-wire schema is + * identical to IndexedLogRecord but the row payload uses the CompactedRow binary format: + * + *
    + *
  • Length => int32 (total number of bytes following this length field) + *
  • Attributes => int8 (low 4 bits encode {@link ChangeType}) + *
  • Value => {@link CompactedRow} (bytes in compacted row format) + *
+ * + *

Differences vs {@link IndexedLogRecord}: - Uses CompactedRow encoding which is space-optimized + * (VLQ for ints/longs, per-row null bitset) and trades CPU for smaller storage; random access to + * fields is not supported without decoding. - Deserialization is lazy: we wrap the underlying bytes + * in a CompactedRow with a {@link CompactedRowDeserializer} and only decode to object values when a + * field is accessed. - The record header (Length + Attributes) layout and attribute semantics are + * the same. + * + *

The offset computes the difference relative to the base offset of the batch containing this + * record. + * + * @since 0.8 + */ +@PublicEvolving +public class CompactedLogRecord implements LogRecord { + + private static final int ATTRIBUTES_LENGTH = 1; + + private final long logOffset; + private final long timestamp; + private final DataType[] fieldTypes; + private final CompactedRowDeserializer compactedDeserializer; + + private MemorySegment segment; + private int offset; + private int sizeInBytes; + + CompactedLogRecord(long logOffset, long timestamp, DataType[] fieldTypes) { + this.logOffset = logOffset; + this.timestamp = timestamp; + this.fieldTypes = fieldTypes; + this.compactedDeserializer = new CompactedRowDeserializer(fieldTypes); + } + + private void pointTo(MemorySegment segment, int offset, int sizeInBytes) { + this.segment = segment; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public int getSizeInBytes() { + return sizeInBytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompactedLogRecord that = (CompactedLogRecord) o; + return sizeInBytes == that.sizeInBytes + && segment.equalTo(that.segment, offset, that.offset, sizeInBytes); + } + + @Override + public int hashCode() { + return MurmurHashUtils.hashBytes(segment, offset, sizeInBytes); + } + + @Override + public long logOffset() { + return logOffset; + } + + @Override + public long timestamp() { + return timestamp; + } + + @Override + public ChangeType getChangeType() { + byte attributes = segment.get(offset + LENGTH_LENGTH); + return ChangeType.fromByteValue(attributes); + } + + @Override + public InternalRow getRow() { + int rowOffset = LENGTH_LENGTH + ATTRIBUTES_LENGTH; + return deserializeCompactedRow( + sizeInBytes - rowOffset, + segment, + offset + rowOffset, + fieldTypes, + LogFormat.COMPACTED); + } + + /** Write the record to output and return total bytes written including length field. */ + public static int writeTo(OutputView outputView, ChangeType changeType, CompactedRow row) + throws IOException { + int sizeInBytes = calculateSizeInBytes(row); + // write record total bytes size (excluding this int itself) + outputView.writeInt(sizeInBytes); + // write attributes + outputView.writeByte(changeType.toByteValue()); + // write row payload + CompactedRowWriter.serializeCompactedRow(row, outputView); + return sizeInBytes + LENGTH_LENGTH; + } + + public static CompactedLogRecord readFrom( + MemorySegment segment, + int position, + long logOffset, + long logTimestamp, + DataType[] colTypes) { + int sizeInBytes = segment.getInt(position); + CompactedLogRecord logRecord = new CompactedLogRecord(logOffset, logTimestamp, colTypes); + logRecord.pointTo(segment, position, sizeInBytes + LENGTH_LENGTH); + return logRecord; + } + + public static int sizeOf(BinaryRow row) { + int sizeInBytes = calculateSizeInBytes(row); + return sizeInBytes + LENGTH_LENGTH; + } + + private static int calculateSizeInBytes(BinaryRow row) { + int size = 1; // one byte for attributes + size += row.getSizeInBytes(); + return size; + } + + private InternalRow deserializeCompactedRow( + int length, + MemorySegment segment, + int position, + DataType[] fieldTypes, + LogFormat logFormat) { + if (logFormat == LogFormat.COMPACTED) { + CompactedRow compactedRow = new CompactedRow(fieldTypes.length, compactedDeserializer); + compactedRow.pointTo(segment, position, length); + return compactedRow; + } else { + throw new IllegalArgumentException( + "No such compacted row deserializer for: " + logFormat); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java index 88cba54978..188a62bbd0 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java @@ -233,6 +233,8 @@ public CloseableIterator records(ReadContext context) { case INDEXED: return rowRecordIterator( rowType, context.getOutputProjectedRow(schemaId), timestamp); + case COMPACTED: + return compactedRowRecordIterator(rowType, timestamp); default: throw new IllegalArgumentException("Unsupported log format: " + logFormat); } @@ -295,6 +297,33 @@ public void close() {} }; } + private CloseableIterator compactedRowRecordIterator( + RowType rowType, long timestamp) { + DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]); + return new LogRecordIterator() { + int position = DefaultLogRecordBatch.this.position + recordBatchHeaderSize(magic); + int rowId = 0; + + @Override + protected LogRecord readNext(long baseOffset) { + CompactedLogRecord logRecord = + CompactedLogRecord.readFrom( + segment, position, baseOffset + rowId, timestamp, fieldTypes); + rowId++; + position += logRecord.getSizeInBytes(); + return logRecord; + } + + @Override + protected boolean ensureNoneRemaining() { + return true; + } + + @Override + public void close() {} + }; + } + private CloseableIterator columnRecordIterator( RowType rowType, @Nullable ProjectedRow outputProjection, diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 7289d2d67e..d3ee22057c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -79,6 +79,12 @@ public static LogRecordReadContext createReadContext( Projection.of( IntStream.range(0, rowType.getFieldCount()).toArray(), tableInfo.getSchema()); + } else { + // Explicitly forbid any projection for COMPACTED log format + if (logFormat == LogFormat.COMPACTED) { + throw new IllegalArgumentException( + "Projection is not supported for COMPACTED log format. Please remove projection."); + } } if (logFormat == LogFormat.ARROW) { @@ -103,6 +109,9 @@ public static LogRecordReadContext createReadContext( } else if (logFormat == LogFormat.INDEXED) { int[] selectedFields = projection.getProjectionPositions(); return createIndexedReadContext(rowType, schemaId, selectedFields, schemaGetter); + } else if (logFormat == LogFormat.COMPACTED) { + int[] selectedFields = projection.getProjectionPositions(); + return createCompactedRowReadContext(rowType, schemaId, selectedFields); } else { throw new IllegalArgumentException("Unsupported log format: " + logFormat); } @@ -156,6 +165,13 @@ public static LogRecordReadContext createIndexedReadContext( return createIndexedReadContext(rowType, schemaId, selectedFields, schemaGetter); } + /** Creates a LogRecordReadContext for COMPACTED log format. */ + public static LogRecordReadContext createCompactedRowReadContext( + RowType rowType, int schemaId) { + int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray(); + return createCompactedRowReadContext(rowType, schemaId, selectedFields); + } + /** * Creates a LogRecordReadContext for INDEXED log format. * @@ -172,6 +188,21 @@ public static LogRecordReadContext createIndexedReadContext( LogFormat.INDEXED, rowType, schemaId, null, fieldGetters, false, schemaGetter); } + /** + * Creates a LogRecordReadContext for COMPACTED log format. + * + * @param rowType the schema of the read data + * @param schemaId the schemaId of the table + * @param selectedFields the final selected fields of the read data + */ + public static LogRecordReadContext createCompactedRowReadContext( + RowType rowType, int schemaId, int[] selectedFields) { + FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields); + // for COMPACTED log format, the projection is NEVER push downed to the server side + return new LogRecordReadContext( + LogFormat.COMPACTED, rowType, schemaId, null, fieldGetters, false, null); + } + private LogRecordReadContext( LogFormat logFormat, RowType targetDataRowType, diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java new file mode 100644 index 0000000000..e23aa5cc3b --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.memory.AbstractPagedOutputView; +import org.apache.fluss.metadata.LogFormat; +import org.apache.fluss.row.compacted.CompactedRow; + +import java.io.IOException; + +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; + +/** + * Default builder for {@link MemoryLogRecords} of log records in {@link LogFormat#COMPACTED} + * format. + */ +public class MemoryLogRecordsCompactedBuilder + extends AbstractRowMemoryLogRecordsBuilder { + + private MemoryLogRecordsCompactedBuilder( + long baseLogOffset, + int schemaId, + int writeLimit, + byte magic, + AbstractPagedOutputView pagedOutputView, + boolean appendOnly) { + super(baseLogOffset, schemaId, writeLimit, magic, pagedOutputView, appendOnly); + } + + public static MemoryLogRecordsCompactedBuilder builder( + int schemaId, int writeLimit, AbstractPagedOutputView outputView, boolean appendOnly) { + return new MemoryLogRecordsCompactedBuilder( + BUILDER_DEFAULT_OFFSET, + schemaId, + writeLimit, + CURRENT_LOG_MAGIC_VALUE, + outputView, + appendOnly); + } + + @VisibleForTesting + public static MemoryLogRecordsCompactedBuilder builder( + long baseLogOffset, + int schemaId, + int writeLimit, + byte magic, + AbstractPagedOutputView outputView) + throws IOException { + return new MemoryLogRecordsCompactedBuilder( + baseLogOffset, schemaId, writeLimit, magic, outputView, false); + } + + @Override + protected int sizeOf(CompactedRow row) { + return CompactedLogRecord.sizeOf(row); + } + + @Override + protected int writeRecord(ChangeType changeType, CompactedRow row) throws IOException { + return CompactedLogRecord.writeTo(pagedOutputView, changeType, row); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java index 8488c3351b..a713c34ee2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java @@ -19,51 +19,17 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.memory.AbstractPagedOutputView; -import org.apache.fluss.memory.MemorySegment; -import org.apache.fluss.memory.MemorySegmentOutputView; -import org.apache.fluss.metadata.LogFormat; -import org.apache.fluss.record.bytesview.BytesView; -import org.apache.fluss.record.bytesview.MultiBytesView; import org.apache.fluss.row.indexed.IndexedRow; -import org.apache.fluss.utils.crc.Crc32C; import java.io.IOException; import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; -import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH; -import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH; -import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; -import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; -import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH; -import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; -import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset; -import static org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset; -import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; -import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset; -import static org.apache.fluss.utils.Preconditions.checkArgument; /** - * Default builder for {@link MemoryLogRecords} of log records in {@link LogFormat#INDEXED} format. + * Default builder for {@link MemoryLogRecords} of log records in {@link + * org.apache.fluss.metadata.LogFormat#INDEXED} format. */ -public class MemoryLogRecordsIndexedBuilder implements AutoCloseable { - private static final int BUILDER_DEFAULT_OFFSET = 0; - - private final long baseLogOffset; - private final int schemaId; - // The max bytes can be appended. - private final int writeLimit; - private final byte magic; - private final AbstractPagedOutputView pagedOutputView; - private final MemorySegment firstSegment; - private final boolean appendOnly; - - private BytesView builtBuffer = null; - private long writerId; - private int batchSequence; - private int currentRecordNumber; - private int sizeInBytes; - private volatile boolean isClosed; - private boolean aborted = false; +public class MemoryLogRecordsIndexedBuilder extends AbstractRowMemoryLogRecordsBuilder { private MemoryLogRecordsIndexedBuilder( long baseLogOffset, @@ -72,25 +38,7 @@ private MemoryLogRecordsIndexedBuilder( byte magic, AbstractPagedOutputView pagedOutputView, boolean appendOnly) { - this.appendOnly = appendOnly; - checkArgument( - schemaId <= Short.MAX_VALUE, - "schemaId shouldn't be greater than the max value of short: " + Short.MAX_VALUE); - this.baseLogOffset = baseLogOffset; - this.schemaId = schemaId; - this.writeLimit = writeLimit; - this.magic = magic; - this.pagedOutputView = pagedOutputView; - this.firstSegment = pagedOutputView.getCurrentSegment(); - this.writerId = NO_WRITER_ID; - this.batchSequence = NO_BATCH_SEQUENCE; - this.currentRecordNumber = 0; - this.isClosed = false; - - // We don't need to write header information while the builder creating, - // we'll skip it first. - this.pagedOutputView.setPosition(recordBatchHeaderSize(magic)); - this.sizeInBytes = recordBatchHeaderSize(magic); + super(baseLogOffset, schemaId, writeLimit, magic, pagedOutputView, appendOnly); } public static MemoryLogRecordsIndexedBuilder builder( @@ -116,139 +64,13 @@ public static MemoryLogRecordsIndexedBuilder builder( baseLogOffset, schemaId, writeLimit, magic, outputView, false); } - /** - * Check if we have room for a new record containing the given row. If no records have been - * appended, then this returns true. - */ - public boolean hasRoomFor(IndexedRow row) { - return sizeInBytes + IndexedLogRecord.sizeOf(row) <= writeLimit; - } - - public void append(ChangeType changeType, IndexedRow row) throws Exception { - appendRecord(changeType, row); - } - - private void appendRecord(ChangeType changeType, IndexedRow row) throws IOException { - if (aborted) { - throw new IllegalStateException( - "Tried to append a record, but MemoryLogRecordsIndexedBuilder has already been aborted"); - } - - if (isClosed) { - throw new IllegalStateException( - "Tried to append a record, but MemoryLogRecordsBuilder is closed for record appends"); - } - if (appendOnly && changeType != ChangeType.APPEND_ONLY) { - throw new IllegalArgumentException( - "Only append-only change type is allowed for append-only arrow log builder, but got " - + changeType); - } - - int recordByteSizes = IndexedLogRecord.writeTo(pagedOutputView, changeType, row); - currentRecordNumber++; - sizeInBytes += recordByteSizes; - } - - public BytesView build() throws IOException { - if (aborted) { - throw new IllegalStateException("Attempting to build an aborted record batch"); - } - - if (builtBuffer != null) { - return builtBuffer; - } - - writeBatchHeader(); - builtBuffer = - MultiBytesView.builder() - .addMemorySegmentByteViewList(pagedOutputView.getWrittenSegments()) - .build(); - return builtBuffer; - } - - public void setWriterState(long writerId, int batchBaseSequence) { - this.writerId = writerId; - this.batchSequence = batchBaseSequence; - } - - public void resetWriterState(long writerId, int batchSequence) { - // trigger to rewrite batch header - this.builtBuffer = null; - this.writerId = writerId; - this.batchSequence = batchSequence; - } - - public long writerId() { - return writerId; - } - - public int batchSequence() { - return batchSequence; - } - - public boolean isClosed() { - return isClosed; - } - - public void abort() { - aborted = true; - } - @Override - public void close() throws IOException { - if (aborted) { - throw new IllegalStateException( - "Cannot close MemoryLogRecordsIndexedBuilder as it has already been aborted"); - } - - isClosed = true; - } - - public int getSizeInBytes() { - return sizeInBytes; + protected int sizeOf(IndexedRow row) { + return IndexedLogRecord.sizeOf(row); } - // ----------------------- internal methods ------------------------------- - private void writeBatchHeader() throws IOException { - // pagedOutputView doesn't support seek to previous segment, - // so we create a new output view on the first segment - MemorySegmentOutputView outputView = new MemorySegmentOutputView(firstSegment); - outputView.setPosition(0); - // update header. - outputView.writeLong(baseLogOffset); - outputView.writeInt(sizeInBytes - BASE_OFFSET_LENGTH - LENGTH_LENGTH); - outputView.writeByte(magic); - - // write empty timestamp which will be overridden on server side - outputView.writeLong(0); - - // write empty leaderEpoch which will be overridden on server side - if (magic >= LOG_MAGIC_VALUE_V1) { - outputView.writeInt(NO_LEADER_EPOCH); - } - - // write empty crc first. - outputView.writeUnsignedInt(0); - - outputView.writeShort((short) schemaId); - // write attributes (currently only appendOnly flag) - outputView.writeBoolean(appendOnly); - // skip write attribute byte for now. - outputView.setPosition(lastOffsetDeltaOffset(magic)); - if (currentRecordNumber > 0) { - outputView.writeInt(currentRecordNumber - 1); - } else { - // If there is no record, we write 0 for filed lastOffsetDelta, see the comments about - // the field 'lastOffsetDelta' in DefaultLogRecordBatch. - outputView.writeInt(0); - } - outputView.writeLong(writerId); - outputView.writeInt(batchSequence); - outputView.writeInt(currentRecordNumber); - - // Update crc. - long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), schemaIdOffset(magic)); - outputView.setPosition(crcOffset(magic)); - outputView.writeUnsignedInt(crc); + @Override + protected int writeRecord(ChangeType changeType, IndexedRow row) throws IOException { + return IndexedLogRecord.writeTo(pagedOutputView, changeType, row); } } diff --git a/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java b/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java new file mode 100644 index 0000000000..cfec405f1c --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.TestInternalRowGenerator; +import org.apache.fluss.row.compacted.CompactedRow; +import org.apache.fluss.row.compacted.CompactedRowDeserializer; +import org.apache.fluss.row.compacted.CompactedRowWriter; +import org.apache.fluss.types.DataType; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CompactedLogRecord}. */ +class CompactedLogRecordTest extends LogTestBase { + + @Test + void testBase() throws IOException { + DataType[] fieldTypes = baseRowType.getChildren().toArray(new DataType[0]); + + CompactedRowWriter writer = new CompactedRowWriter(fieldTypes.length); + // field 0: int 10 + writer.writeInt(10); + // field 1: string "abc" + writer.writeString(BinaryString.fromString("abc")); + byte[] bytes = writer.toBytes(); + CompactedRow row = + CompactedRow.from(fieldTypes, bytes, new CompactedRowDeserializer(fieldTypes)); + + CompactedLogRecord.writeTo(outputView, ChangeType.APPEND_ONLY, row); + + CompactedLogRecord logRecord = + CompactedLogRecord.readFrom( + MemorySegment.wrap(outputView.getCopyOfBuffer()), + 0, + 1000, + 10001, + fieldTypes); + + assertThat(logRecord.getSizeInBytes()).isEqualTo(1 + row.getSizeInBytes() + 4); + assertThat(logRecord.logOffset()).isEqualTo(1000); + assertThat(logRecord.timestamp()).isEqualTo(10001); + assertThat(logRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY); + assertThat(logRecord.getRow()).isEqualTo(row); + } + + @Test + void testWriteToAndReadFromWithRandomData() throws IOException { + // generate a compacted row for all supported types + DataType[] allColTypes = + TestInternalRowGenerator.createAllRowType().getChildren().toArray(new DataType[0]); + CompactedRow row = TestInternalRowGenerator.genCompactedRowForAllType(); + + CompactedLogRecord.writeTo(outputView, ChangeType.APPEND_ONLY, row); + + LogRecord logRecord = + CompactedLogRecord.readFrom( + MemorySegment.wrap(outputView.getCopyOfBuffer()), + 0, + 1000, + 10001, + allColTypes); + + assertThat(logRecord.logOffset()).isEqualTo(1000); + assertThat(logRecord.timestamp()).isEqualTo(10001); + assertThat(logRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY); + assertThat(logRecord.getRow()).isEqualTo(row); + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java new file mode 100644 index 0000000000..e576edb7a4 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.memory.ManagedPagedOutputView; +import org.apache.fluss.memory.TestingMemorySegmentPool; +import org.apache.fluss.row.compacted.CompactedRow; +import org.apache.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.apache.fluss.record.ChangeType.APPEND_ONLY; +import static org.apache.fluss.record.TestData.BASE_OFFSET; +import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; +import static org.apache.fluss.record.TestData.DEFAULT_MAGIC; +import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID; +import static org.apache.fluss.testutils.DataTestUtils.compactedRow; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link MemoryLogRecordsCompactedBuilder}. */ +public class MemoryLogRecordsCompactedBuilderTest { + + @Test + void testAppendAndBuild() throws Exception { + MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 4, 1024); + + List expected = new ArrayList<>(); + expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"})); + expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {2, "b"})); + expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {3, "c"})); + + for (CompactedRow row : expected) { + assertThat(builder.hasRoomFor(row)).isTrue(); + builder.append(APPEND_ONLY, row); + } + + builder.setWriterState(7L, 13); + builder.close(); + MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build()); + + Iterator it = records.batches().iterator(); + assertThat(it.hasNext()).isTrue(); + LogRecordBatch batch = it.next(); + assertThat(it.hasNext()).isFalse(); + + assertThat(batch.getRecordCount()).isEqualTo(expected.size()); + assertThat(batch.baseLogOffset()).isEqualTo(0); + assertThat(batch.writerId()).isEqualTo(7L); + assertThat(batch.batchSequence()).isEqualTo(13); + + try (LogRecordReadContext ctx = + LogRecordReadContext.createCompactedRowReadContext( + DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID); + CloseableIterator recIt = batch.records(ctx)) { + for (CompactedRow expRow : expected) { + assertThat(recIt.hasNext()).isTrue(); + LogRecord rec = recIt.next(); + assertThat(rec.getChangeType()).isEqualTo(APPEND_ONLY); + assertThat(rec.getRow()).isEqualTo(expRow); + } + assertThat(recIt.hasNext()).isFalse(); + } + } + + @Test + void testAbortSemantics() throws Exception { + MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 2, 512); + builder.append(APPEND_ONLY, compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"})); + builder.abort(); + + // append after abort + assertThatThrownBy( + () -> + builder.append( + APPEND_ONLY, + compactedRow(DATA1_ROW_TYPE, new Object[] {2, "b"}))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("MemoryLogRecordsCompactedBuilder has already been aborted"); + + // build after abort + assertThatThrownBy(builder::build) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Attempting to build an aborted record batch"); + + // close after abort + assertThatThrownBy(builder::close) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "Cannot close MemoryLogRecordsCompactedBuilder as it has already been aborted"); + } + + @Test + void testNoRecordAppendAndBaseOffset() throws Exception { + // base offset 0 + MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 1, 1024); + MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build()); + assertThat(records.sizeInBytes()) + .isEqualTo( + LogRecordBatchFormat.recordBatchHeaderSize( + DEFAULT_MAGIC)); // only batch header + LogRecordBatch batch = records.batches().iterator().next(); + batch.ensureValid(); + assertThat(batch.getRecordCount()).isEqualTo(0); + assertThat(batch.baseLogOffset()).isEqualTo(0); + assertThat(batch.lastLogOffset()).isEqualTo(0); + assertThat(batch.nextLogOffset()).isEqualTo(1); + try (LogRecordReadContext ctx = + LogRecordReadContext.createCompactedRowReadContext( + DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID); + CloseableIterator it = batch.records(ctx)) { + assertThat(it.hasNext()).isFalse(); + } + + // base offset 100 + builder = createBuilder(100, 1, 1024); + records = MemoryLogRecords.pointToBytesView(builder.build()); + assertThat(records.sizeInBytes()) + .isEqualTo(LogRecordBatchFormat.recordBatchHeaderSize(DEFAULT_MAGIC)); + batch = records.batches().iterator().next(); + batch.ensureValid(); + assertThat(batch.getRecordCount()).isEqualTo(0); + assertThat(batch.baseLogOffset()).isEqualTo(100); + assertThat(batch.lastLogOffset()).isEqualTo(100); + assertThat(batch.nextLogOffset()).isEqualTo(101); + try (LogRecordReadContext ctx = + LogRecordReadContext.createCompactedRowReadContext( + DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID); + CloseableIterator it = batch.records(ctx)) { + assertThat(it.hasNext()).isFalse(); + } + } + + @Test + void testResetWriterState() throws Exception { + MemoryLogRecordsCompactedBuilder builder = createBuilder(BASE_OFFSET, 2, 1024); + List expected = new ArrayList<>(); + expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"})); + expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {2, "b"})); + for (CompactedRow row : expected) { + builder.append(APPEND_ONLY, row); + } + builder.setWriterState(5L, 0); + builder.close(); + MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build()); + LogRecordBatch batch = records.batches().iterator().next(); + assertThat(batch.writerId()).isEqualTo(5L); + assertThat(batch.batchSequence()).isEqualTo(0); + + // reset writer state and rebuild with new sequence + builder.resetWriterState(5L, 1); + records = MemoryLogRecords.pointToBytesView(builder.build()); + batch = records.batches().iterator().next(); + assertThat(batch.writerId()).isEqualTo(5L); + assertThat(batch.batchSequence()).isEqualTo(1); + } + + private MemoryLogRecordsCompactedBuilder createBuilder( + long baseOffset, int maxPages, int pageSizeInBytes) throws IOException { + return MemoryLogRecordsCompactedBuilder.builder( + baseOffset, + DEFAULT_SCHEMA_ID, + maxPages * pageSizeInBytes, + DEFAULT_MAGIC, + new ManagedPagedOutputView(new TestingMemorySegmentPool(pageSizeInBytes))); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index b6957c5dc0..e6542d8f47 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -54,6 +54,7 @@ import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader; import org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot; import org.apache.fluss.server.kv.wal.ArrowWalBuilder; +import org.apache.fluss.server.kv.wal.CompactedWalBuilder; import org.apache.fluss.server.kv.wal.IndexWalBuilder; import org.apache.fluss.server.kv.wal.WalBuilder; import org.apache.fluss.server.log.LogAppendInfo; @@ -442,6 +443,8 @@ private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Except "Primary Key Table with COMPACTED kv format doesn't support INDEXED cdc log format."); } return new IndexWalBuilder(schemaId, memorySegmentPool); + case COMPACTED: + return new CompactedWalBuilder(schemaId, rowType, memorySegmentPool); case ARROW: return new ArrowWalBuilder( schemaId, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java new file mode 100644 index 0000000000..8c80a48d91 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.wal; + +import org.apache.fluss.memory.ManagedPagedOutputView; +import org.apache.fluss.memory.MemorySegmentPool; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder; +import org.apache.fluss.record.bytesview.BytesView; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.InternalRow.FieldGetter; +import org.apache.fluss.row.compacted.CompactedRow; +import org.apache.fluss.row.encode.CompactedRowEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import java.io.IOException; + +/** A {@link WalBuilder} that builds a {@link MemoryLogRecords} with Compacted log format. */ +public class CompactedWalBuilder implements WalBuilder { + + private final MemorySegmentPool memorySegmentPool; + private final ManagedPagedOutputView outputView; + private final MemoryLogRecordsCompactedBuilder recordsBuilder; + + private final CompactedRowEncoder rowEncoder; + private final FieldGetter[] fieldGetters; + private final int fieldCount; + + public CompactedWalBuilder(int schemaId, RowType rowType, MemorySegmentPool memorySegmentPool) + throws IOException { + this.memorySegmentPool = memorySegmentPool; + this.outputView = new ManagedPagedOutputView(memorySegmentPool); + // unlimited write size as we don't know the WAL size in advance + this.recordsBuilder = + MemoryLogRecordsCompactedBuilder.builder( + schemaId, Integer.MAX_VALUE, outputView, /*appendOnly*/ false); + DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]); + this.rowEncoder = new CompactedRowEncoder(fieldTypes); + this.fieldGetters = InternalRow.createFieldGetters(rowType); + this.fieldCount = rowType.getFieldCount(); + } + + @Override + public void append(ChangeType changeType, InternalRow row) throws Exception { + final CompactedRow compactedRow; + if (row instanceof CompactedRow) { + compactedRow = (CompactedRow) row; + } else { + rowEncoder.startNewRow(); + for (int i = 0; i < fieldCount; i++) { + rowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(row)); + } + compactedRow = rowEncoder.finishRow(); + } + recordsBuilder.append(changeType, compactedRow); + } + + @Override + public MemoryLogRecords build() throws Exception { + recordsBuilder.close(); + BytesView bytesView = recordsBuilder.build(); + // Convert BytesView to MemoryLogRecords (may copy if composite) + return MemoryLogRecords.pointToByteBuffer(bytesView.getByteBuf().nioBuffer()); + } + + @Override + public void setWriterState(long writerId, int batchSequence) { + recordsBuilder.setWriterState(writerId, batchSequence); + } + + @Override + public void deallocate() { + memorySegmentPool.returnAll(outputView.allocatedPooledSegments()); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 250fc30da6..0941e32c16 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -221,9 +221,13 @@ private static void checkReplicationFactor(Configuration tableConf) { private static void checkLogFormat(Configuration tableConf, boolean hasPrimaryKey) { KvFormat kvFormat = tableConf.get(ConfigOptions.TABLE_KV_FORMAT); LogFormat logFormat = tableConf.get(ConfigOptions.TABLE_LOG_FORMAT); - if (hasPrimaryKey && kvFormat == KvFormat.COMPACTED && logFormat != LogFormat.ARROW) { + + // Allow COMPACTED and ARROW log formats when KV format is COMPACTED for primary key tables + if (hasPrimaryKey + && kvFormat == KvFormat.COMPACTED + && !(logFormat == LogFormat.ARROW || logFormat == LogFormat.COMPACTED)) { throw new InvalidConfigException( - "Currently, Primary Key Table only supports ARROW log format if kv format is COMPACTED."); + "Currently, Primary Key Table supports ARROW or COMPACTED log format when kv format is COMPACTED."); } }