Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(table): support partition & upsert config #2248

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,37 +81,37 @@ public class TopicConfig {

public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " +
"You can not disable this config once it is enabled. It will be provided in future versions.";
"You can not disable this config once it is enabled. It will be provided in future versions.";

public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms";
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " +
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";

public static final String LOCAL_LOG_RETENTION_BYTES_CONFIG = "local.retention.bytes";
public static final String LOCAL_LOG_RETENTION_BYTES_DOC = "The maximum size of local log segments that can grow for a partition before it " +
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";

public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable";
public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," +
" and no more data uploading on a topic. Once this config is set to true, the local retention configuration " +
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
"(i.e. retention.ms/bytes).";
" and no more data uploading on a topic. Once this config is set to true, the local retention configuration " +
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
"(i.e. retention.ms/bytes).";

public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +
"set `remote.storage.enable` from true to false";
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +
"set `remote.storage.enable` from true to false";

public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
public static final String MAX_MESSAGE_BYTES_DOC =
"The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";

public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +
Expand Down Expand Up @@ -167,7 +167,7 @@ public class TopicConfig {
"not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " +
"loss.<p>Note: In KRaft mode, when enabling this config dynamically, it needs to wait for the unclean leader election" +
"thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " +
"to trigger the unclean leader election immediately if needed.</p>";
"to trigger the unclean leader election immediately if needed.</p>";

public static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";
public static final String MIN_IN_SYNC_REPLICAS_DOC = "When a producer sets acks to \"all\" (or \"-1\"), " +
Expand Down Expand Up @@ -267,6 +267,15 @@ public class TopicConfig {
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
// Primary-key columns of the table, given as a comma-separated list.
public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
// The two literals are concatenated verbatim, so the first one must end with a space;
// otherwise the rendered doc reads "tables.ex. [region, name]".
public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables. "
    + "ex. [region, name]";
// Partition fields of the table, e.g. transform expressions such as bucket(name) or month(timestamp).
public static final String TABLE_TOPIC_PARTITION_BY_CONFIG = "automq.table.topic.partition.by";
public static final String TABLE_TOPIC_PARTITION_BY_DOC = "The partition fields of the table. ex. [bucket(name), month(timestamp)]";
// Switch controlling whether table topic upsert is enabled.
public static final String TABLE_TOPIC_UPSERT_ENABLE_CONFIG = "automq.table.topic.upsert.enable";
public static final String TABLE_TOPIC_UPSERT_ENABLE_DOC = "The configuration controls whether enable table topic upsert";
// Name of the record field that carries the CDC operation code (I, U, or D).
public static final String TABLE_TOPIC_CDC_FIELD_CONFIG = "automq.table.topic.cdc.field";
public static final String TABLE_TOPIC_CDC_FIELD_DOC = "The name of the field containing the CDC operation, I, U, or D";
// AutoMQ inject end

}
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
.define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
// TODO: add validator
.define(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_ID_COLUMNS_DOC)
.define(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_PARTITION_BY_DOC)
.define(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_DOC)
.define(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_CDC_FIELD_DOC)
// AutoMQ inject end
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
Expand Down Expand Up @@ -396,6 +401,10 @@ public Optional<String> serverConfigName(String configName) {
public final long tableTopicCommitInterval;
public final String tableTopicNamespace;
public final TableTopicSchemaType tableTopicSchemaType;
// String value of TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG; the config is defined with a null default, so this may be null.
public final String tableTopicIdColumns;
// String value of TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG; defined with a null default, so this may be null.
public final String tableTopicPartitionBy;
// Boolean value of TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG; the config is defined with a default of false.
public final boolean tableTopicUpsertEnable;
// String value of TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG; defined with a null default, so this may be null.
public final String tableTopicCdcField;
// AutoMQ inject end

private final int maxMessageSize;
Expand Down Expand Up @@ -453,6 +462,10 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
this.tableTopicIdColumns = getString(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG);
this.tableTopicPartitionBy = getString(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG);
this.tableTopicUpsertEnable = getBoolean(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG);
this.tableTopicCdcField = getString(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG);
// AutoMQ inject end

remoteLogConfig = new RemoteLogConfig(this);
Expand Down
Loading