Skip to content

Commit b942eee

Browse files
authored
feat(zerozone): support zero cross az traffic cost (#2505)
Signed-off-by: Robin Han <[email protected]>
1 parent 8c61afd commit b942eee

32 files changed

+3780
-5
lines changed

core/src/main/java/kafka/automq/AutoMQConfig.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.ArrayList;
3838
import java.util.Collections;
3939
import java.util.List;
40+
import java.util.Optional;
4041
import java.util.concurrent.TimeUnit;
4142

4243
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
@@ -194,6 +195,13 @@ public class AutoMQConfig {
194195
public static final String S3_BACK_PRESSURE_COOLDOWN_MS_DOC = "The cooldown time in milliseconds to wait between two regulator actions";
195196
public static final long S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15);
196197

198+
public static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG = "automq.table.topic.schema.registry.url";
199+
private static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC = "The schema registry url for table topic";
200+
201+
public static final String ZONE_ROUTER_CHANNELS_CONFIG = "automq.zonerouter.channels";
202+
public static final String ZONE_ROUTER_CHANNELS_DOC = "The channels to use for cross zone router. Currently it only support object storage channel."
203+
+ " The format is '0@s3://$bucket?region=$region[&batchInterval=250][&maxBytesInBatch=8388608]'";
204+
197205
// Deprecated config start
198206
public static final String S3_ENDPOINT_CONFIG = "s3.endpoint";
199207
public static final String S3_ENDPOINT_DOC = "[DEPRECATED]please use s3.data.buckets. The object storage endpoint, ex. <code>https://s3.us-east-1.amazonaws.com</code>.";
@@ -245,9 +253,6 @@ public class AutoMQConfig {
245253
public static final String S3_TELEMETRY_OPS_ENABLED_CONFIG = "s3.telemetry.ops.enabled";
246254
public static final String S3_TELEMETRY_OPS_ENABLED_DOC = "[DEPRECATED] use s3.telemetry.metrics.uri instead.";
247255

248-
public static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG = "automq.table.topic.schema.registry.url";
249-
private static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC = "The schema registry url for table topic";
250-
251256
// Deprecated config end
252257

253258
public static void define(ConfigDef configDef) {
@@ -285,6 +290,7 @@ public static void define(ConfigDef configDef) {
285290
.define(AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_CONFIG, STRING, null, MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_DOC)
286291
.define(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, BOOLEAN, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DOC)
287292
.define(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, LONG, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DOC)
293+
.define(AutoMQConfig.ZONE_ROUTER_CHANNELS_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, AutoMQConfig.ZONE_ROUTER_CHANNELS_DOC)
288294
// Deprecated config start
289295
.define(AutoMQConfig.S3_ENDPOINT_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_ENDPOINT_DOC)
290296
.define(AutoMQConfig.S3_REGION_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_REGION_DOC)
@@ -310,13 +316,15 @@ public static void define(ConfigDef configDef) {
310316
private String walConfig;
311317
private String metricsExporterURI;
312318
private List<Pair<String, String>> baseLabels;
319+
private Optional<BucketURI> zoneRouterChannels;
313320

314321
public AutoMQConfig setup(KafkaConfig config) {
315322
dataBuckets = genDataBuckets(config);
316323
opsBuckets = genOpsBuckets(config);
317324
walConfig = genWALConfig(config);
318325
metricsExporterURI = genMetricsExporterURI(config);
319326
baseLabels = parseBaseLabels(config);
327+
zoneRouterChannels = genZoneRouterChannels(config);
320328
return this;
321329
}
322330

@@ -340,6 +348,10 @@ public List<Pair<String, String>> baseLabels() {
340348
return baseLabels;
341349
}
342350

351+
public Optional<BucketURI> zoneRouterChannels() {
352+
return zoneRouterChannels;
353+
}
354+
343355
private static List<BucketURI> genDataBuckets(KafkaConfig config) {
344356
String dataBuckets = config.getString(S3_DATA_BUCKETS_CONFIG);
345357
String oldEndpoint = config.getString(S3_ENDPOINT_CONFIG);
@@ -463,4 +475,20 @@ private static List<Pair<String, String>> parseBaseLabels(KafkaConfig config) {
463475
}
464476
return labels;
465477
}
478+
479+
480+
private static Optional<BucketURI> genZoneRouterChannels(KafkaConfig config) {
481+
String str = config.getString(ZONE_ROUTER_CHANNELS_CONFIG);
482+
if (StringUtils.isBlank(str)) {
483+
return Optional.empty();
484+
}
485+
List<BucketURI> buckets = BucketURI.parseBuckets(str);
486+
if (buckets.isEmpty()) {
487+
return Optional.empty();
488+
} else if (buckets.size() > 1) {
489+
throw new IllegalArgumentException(ZONE_ROUTER_CHANNELS_CONFIG + " only supports one object storage, but it's config with " + str);
490+
} else {
491+
return Optional.of(buckets.get(0));
492+
}
493+
}
466494
}
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package kafka.automq.zerozone;
21+
22+
import kafka.server.KafkaConfig;
23+
24+
import org.apache.kafka.clients.ApiVersions;
25+
import org.apache.kafka.clients.ClientRequest;
26+
import org.apache.kafka.clients.ClientResponse;
27+
import org.apache.kafka.clients.ManualMetadataUpdater;
28+
import org.apache.kafka.clients.MetadataRecoveryStrategy;
29+
import org.apache.kafka.clients.NetworkClient;
30+
import org.apache.kafka.clients.NetworkClientUtils;
31+
import org.apache.kafka.clients.RequestCompletionHandler;
32+
import org.apache.kafka.common.Node;
33+
import org.apache.kafka.common.metrics.Metrics;
34+
import org.apache.kafka.common.network.ChannelBuilder;
35+
import org.apache.kafka.common.network.ChannelBuilders;
36+
import org.apache.kafka.common.network.NetworkReceive;
37+
import org.apache.kafka.common.network.Selectable;
38+
import org.apache.kafka.common.network.Selector;
39+
import org.apache.kafka.common.requests.AbstractRequest;
40+
import org.apache.kafka.common.security.JaasContext;
41+
import org.apache.kafka.common.utils.LogContext;
42+
import org.apache.kafka.common.utils.Time;
43+
44+
import com.automq.stream.utils.Threads;
45+
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
48+
49+
import java.net.SocketTimeoutException;
50+
import java.util.Collections;
51+
import java.util.Map;
52+
import java.util.Queue;
53+
import java.util.concurrent.CompletableFuture;
54+
import java.util.concurrent.ConcurrentHashMap;
55+
import java.util.concurrent.ConcurrentLinkedQueue;
56+
import java.util.concurrent.ConcurrentMap;
57+
import java.util.concurrent.ExecutorService;
58+
import java.util.concurrent.atomic.AtomicBoolean;
59+
60+
public interface AsyncSender {
61+
62+
<T extends AbstractRequest> CompletableFuture<ClientResponse> sendRequest(
63+
Node node,
64+
AbstractRequest.Builder<T> requestBuilder
65+
);
66+
67+
void initiateClose();
68+
69+
void close();
70+
71+
class BrokersAsyncSender implements AsyncSender {
72+
private static final Logger LOGGER = LoggerFactory.getLogger(BrokersAsyncSender.class);
73+
private final NetworkClient networkClient;
74+
private final Time time;
75+
private final ExecutorService executorService;
76+
private final AtomicBoolean shouldRun = new AtomicBoolean(true);
77+
78+
public BrokersAsyncSender(
79+
KafkaConfig brokerConfig,
80+
Metrics metrics,
81+
String metricGroupPrefix,
82+
Time time,
83+
String clientId,
84+
LogContext logContext
85+
) {
86+
87+
ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder(
88+
brokerConfig.interBrokerSecurityProtocol(),
89+
JaasContext.Type.SERVER,
90+
brokerConfig,
91+
brokerConfig.interBrokerListenerName(),
92+
brokerConfig.saslMechanismInterBrokerProtocol(),
93+
time,
94+
brokerConfig.saslInterBrokerHandshakeRequestEnable(),
95+
logContext
96+
);
97+
Selector selector = new Selector(
98+
NetworkReceive.UNLIMITED,
99+
brokerConfig.connectionsMaxIdleMs(),
100+
metrics,
101+
time,
102+
metricGroupPrefix,
103+
Collections.emptyMap(),
104+
false,
105+
channelBuilder,
106+
logContext
107+
);
108+
this.networkClient = new NetworkClient(
109+
selector,
110+
new ManualMetadataUpdater(),
111+
clientId,
112+
5,
113+
0,
114+
0,
115+
Selectable.USE_DEFAULT_BUFFER_SIZE,
116+
brokerConfig.replicaSocketReceiveBufferBytes(),
117+
brokerConfig.requestTimeoutMs(),
118+
brokerConfig.connectionSetupTimeoutMs(),
119+
brokerConfig.connectionSetupTimeoutMaxMs(),
120+
time,
121+
false,
122+
new ApiVersions(),
123+
logContext,
124+
MetadataRecoveryStrategy.REBOOTSTRAP
125+
);
126+
this.time = time;
127+
executorService = Threads.newFixedThreadPoolWithMonitor(1, metricGroupPrefix, true, LOGGER);
128+
executorService.submit(this::run);
129+
}
130+
131+
private final ConcurrentMap<Node, Queue<Request>> waitingSendRequests = new ConcurrentHashMap<>();
132+
133+
@Override
134+
public <T extends AbstractRequest> CompletableFuture<ClientResponse> sendRequest(Node node,
135+
AbstractRequest.Builder<T> requestBuilder) {
136+
CompletableFuture<ClientResponse> cf = new CompletableFuture<>();
137+
waitingSendRequests.compute(node, (n, queue) -> {
138+
if (queue == null) {
139+
queue = new ConcurrentLinkedQueue<>();
140+
}
141+
queue.add(new Request(requestBuilder, cf));
142+
return queue;
143+
});
144+
return cf;
145+
}
146+
147+
private void run() {
148+
Map<Node, Long> lastUnreadyTimes = new ConcurrentHashMap<>();
149+
while (shouldRun.get()) {
150+
// TODO: graceful shutdown
151+
try {
152+
long now = time.milliseconds();
153+
waitingSendRequests.forEach((node, queue) -> {
154+
if (queue.isEmpty()) {
155+
return;
156+
}
157+
if (NetworkClientUtils.isReady(networkClient, node, now)) {
158+
lastUnreadyTimes.remove(node);
159+
Request request = queue.poll();
160+
ClientRequest clientRequest = networkClient.newClientRequest(Integer.toString(node.id()), request.requestBuilder, now, true, 3000, new RequestCompletionHandler() {
161+
@Override
162+
public void onComplete(ClientResponse response) {
163+
request.cf.complete(response);
164+
}
165+
});
166+
networkClient.send(clientRequest, now);
167+
} else {
168+
Long lastUnreadyTime = lastUnreadyTimes.get(node);
169+
if (lastUnreadyTime == null) {
170+
networkClient.ready(node, now);
171+
lastUnreadyTimes.put(node, now);
172+
} else {
173+
if (now - lastUnreadyTime > 3000) {
174+
for (; ; ) {
175+
Request request = queue.poll();
176+
if (request == null) {
177+
break;
178+
}
179+
request.cf.completeExceptionally(new SocketTimeoutException(String.format("Cannot connect to node=%s", node)));
180+
}
181+
lastUnreadyTimes.remove(node);
182+
} else {
183+
// attempt to reconnect
184+
networkClient.ready(node, now);
185+
}
186+
}
187+
}
188+
});
189+
networkClient.poll(1, now);
190+
} catch (Throwable e) {
191+
LOGGER.error("Processor get uncaught exception", e);
192+
}
193+
}
194+
}
195+
196+
@Override
197+
public void initiateClose() {
198+
networkClient.initiateClose();
199+
}
200+
201+
@Override
202+
public void close() {
203+
networkClient.close();
204+
shouldRun.set(false);
205+
}
206+
}
207+
208+
class Request {
209+
final AbstractRequest.Builder<?> requestBuilder;
210+
final CompletableFuture<ClientResponse> cf;
211+
212+
public Request(AbstractRequest.Builder<?> requestBuilder, CompletableFuture<ClientResponse> cf) {
213+
this.requestBuilder = requestBuilder;
214+
this.cf = cf;
215+
}
216+
}
217+
218+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package kafka.automq.zerozone;
21+
22+
import kafka.automq.interceptor.ClientIdMetadata;
23+
24+
public interface ClientRackProvider {
25+
26+
String rack(ClientIdMetadata clientId);
27+
28+
}

0 commit comments

Comments
 (0)