Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,24 @@ public class ConfigOptions {
"The interval for cleaning up expired producer offsets "
+ "and orphan files in remote storage. Default is 1 hour.");

/**
 * Backoff applied by the coordinator between retries of a control-plane request to a tablet
 * server after a transient RPC-layer failure. Key: {@code coordinator.request.retry-backoff};
 * default: 100 milliseconds.
 */
public static final ConfigOption<Duration> COORDINATOR_REQUEST_RETRY_BACKOFF =
key("coordinator.request.retry-backoff")
.durationType()
.defaultValue(Duration.ofMillis(100))
.withDescription(
"The backoff duration the coordinator waits before retrying a "
+ "control-plane request to a tablet server after a "
+ "transient RPC-layer failure.");

/**
 * Time the coordinator's sender thread waits for a response to a control-plane request before
 * treating the request as failed and retrying it. Key: {@code coordinator.request.timeout};
 * default: 30 seconds.
 */
public static final ConfigOption<Duration> COORDINATOR_REQUEST_TIMEOUT =
key("coordinator.request.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The timeout the sender thread waits for a response to a "
+ "control-plane request before treating it as failed "
+ "and retrying.");

// ------------------------------------------------------------------------
// ConfigOptions for Tablet Server
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ public class MetricNames {
public static final String BUCKET_COUNT = "bucketCount";
public static final String PARTITION_COUNT = "partitionCount";
public static final String REPLICAS_TO_DELETE_COUNT = "replicasToDeleteCount";
public static final String DELETION_INELIGIBLE_COUNT = "deletionIneligibleCount";

// for coordinator sender (per-tablet-server control request sender threads)
// SENDER_QUEUE_SIZE is the name under which CoordinatorChannelManager registers a gauge over
// each tablet server's pending-request queue size.
// NOTE(review): the remaining names are not referenced in the code visible here; confirm their
// metric kind (gauge/counter/histogram) and unit at the registration site.
public static final String SENDER_QUEUE_SIZE = "senderQueueSize";
public static final String SENDER_QUEUE_TIME_MS = "senderQueueTimeMs";
public static final String SENDER_RETRY_COUNT = "senderRetryCount";
public static final String SENDER_STALE_DROP_COUNT = "senderStaleDropCount";
public static final String SENDER_ALIVE = "senderAlive";

// for coordinator event processor
public static final String EVENT_QUEUE_SIZE = "eventQueueSize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** An abstract thread that is shutdownable . */
public abstract class ShutdownableThread extends Thread {
Expand Down Expand Up @@ -110,6 +111,16 @@ public void run() {
log.info("Stopped");
}

/**
 * Blocks the calling thread until shutdown has been initiated or the given timeout has
 * elapsed, whichever happens first.
 *
 * @param timeout the maximum time to wait
 * @param unit the unit of {@code timeout}
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void pause(long timeout, TimeUnit unit) throws InterruptedException {
    final boolean shutdownRequested = shutdownInitiated.await(timeout, unit);
    if (!shutdownRequested) {
        // Timed out without a shutdown request; nothing to report.
        return;
    }
    log.trace("shutdownInitiated latch count reached zero. Shutdown called.");
}

/** Returns {@code true} as long as shutdown has not been initiated for this thread. */
public boolean isRunning() {
    final boolean shutdownRequested = isShutdownInitiated();
    return !shutdownRequested;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.fluss.metrics.Gauge;
import org.apache.fluss.metrics.Histogram;
import org.apache.fluss.metrics.Meter;
import org.apache.fluss.metrics.Metric;
import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.metrics.groups.MetricGroup;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
Expand All @@ -37,6 +39,7 @@ public class TestMetricGroup implements MetricGroup {
private final Map<String, String> variables;
private final BiFunction<String, Optional<CharacterFilter>, String> metricIdentifierFunction;
private final BiFunction<CharacterFilter, Optional<Character>, String> logicalScopeFunction;
private final Map<String, Metric> metrics = new HashMap<>();

public TestMetricGroup(
String[] scopeComponents,
Expand All @@ -49,32 +52,48 @@ public TestMetricGroup(
this.logicalScopeFunction = logicalScopeFunction;
}

/**
 * Builds a {@link TestMetricGroup} from an unconfigured builder, for unit tests that do not
 * need custom scope or variable settings.
 */
public static TestMetricGroup createTestMetricGroup() {
    TestMetricGroupBuilder defaultBuilder = newBuilder();
    return defaultBuilder.build();
}

/** Returns a fresh builder for configuring a {@link TestMetricGroup}. */
public static TestMetricGroupBuilder newBuilder() {
    TestMetricGroupBuilder builder = new TestMetricGroupBuilder();
    return builder;
}

/**
 * Looks up a metric that was registered on this group.
 *
 * @param name the name the metric was registered under
 * @return the registered metric, or {@code null} if nothing is registered under {@code name}
 */
public Metric getMetric(String name) {
    Metric registered = metrics.get(name);
    return registered;
}

/**
 * Creates a new {@link SimpleCounter}, records it under {@code name} so tests can retrieve it
 * via {@code getMetric}, and returns it.
 */
@Override
public Counter counter(String name) {
    return trackMetric(name, new SimpleCounter());
}

/** Records the supplied counter under {@code name} and returns it unchanged. */
@Override
public <C extends Counter> C counter(String name, C counter) {
    return trackMetric(name, counter);
}

/** Records the supplied gauge under {@code name} and returns it unchanged. */
@Override
public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
    return trackMetric(name, gauge);
}

/** Records the supplied histogram under {@code name} and returns it unchanged. */
@Override
public <H extends Histogram> H histogram(String name, H histogram) {
    return trackMetric(name, histogram);
}

/** Records the supplied meter under {@code name} and returns it unchanged. */
@Override
public <M extends Meter> M meter(String name, M meter) {
    return trackMetric(name, meter);
}

/**
 * Stores {@code metric} in the name-to-metric map and hands it back, so every registration
 * method shares one code path. A later registration under the same name replaces the earlier
 * one.
 */
private <R extends Metric> R trackMetric(String name, R metric) {
    metrics.put(name, metric);
    return metric;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableBucketReplica;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.ApiMessage;
Expand All @@ -35,56 +39,134 @@
import org.apache.fluss.rpc.messages.StopReplicaResponse;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.server.coordinator.channel.ControlRequestSendThread;
import org.apache.fluss.server.coordinator.channel.QueueItem;
import org.apache.fluss.server.coordinator.channel.TabletServerChannelState;
import org.apache.fluss.server.utils.RpcGatewayManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.Nullable;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.IntSupplier;

import static org.apache.fluss.utils.Preconditions.checkState;

/**
* Used by the coordinator server to manage RPC channels to tablet servers and send requests.
* Mutations are guarded by {@code channelLock} so the metric reporter thread can safely read queue
* sizes. Mirrors Kafka's {@code ControllerChannelManager} which uses a {@code brokerLock} for the
* same purpose.
*/
@NotThreadSafe
public class CoordinatorChannelManager {

private static final Logger LOG = LoggerFactory.getLogger(CoordinatorChannelManager.class);

private static final int WINDOW_SIZE = 100;

/** A manager for the rpc gateways to tablet servers. */
private final RpcGatewayManager<TabletServerGateway> rpcGatewayManager;

/** Supplies the coordinator epoch; handed to every per-tablet-server sender thread. */
private final IntSupplier epochSupplier;

/** Configuration handed to every per-tablet-server sender thread. */
private final Configuration conf;

/** Parent metric group under which one sub-group per tablet server is created. */
private final MetricGroup coordinatorMetricGroup;

/** Guards all reads and writes of {@link #channelStates}. */
private final Object channelLock = new Object();

/** Channel state (queue, sender thread, metric group) keyed by tablet server id. */
private final Map<Integer, TabletServerChannelState> channelStates = new HashMap<>();

/**
 * Creates a channel manager with no registered tablet servers.
 *
 * @param rpcClient client used to create the gateways to tablet servers
 * @param epochSupplier supplier of the coordinator epoch, passed to the sender threads
 * @param conf configuration passed to the sender threads
 * @param coordinatorMetricGroup parent group for the per-tablet-server metric groups
 */
public CoordinatorChannelManager(
        RpcClient rpcClient,
        IntSupplier epochSupplier,
        Configuration conf,
        MetricGroup coordinatorMetricGroup) {
    this.rpcGatewayManager = new RpcGatewayManager<>(rpcClient, TabletServerGateway.class);
    this.epochSupplier = epochSupplier;
    this.conf = conf;
    this.coordinatorMetricGroup = coordinatorMetricGroup;
}

/**
 * Registers a channel for every given tablet server and only afterwards starts the sender
 * threads of all registered channels.
 *
 * @param serverNodes the tablet servers to register; each must be of type TABLET_SERVER
 */
public void startup(Collection<ServerNode> serverNodes) {
    // Phase 1: register only. Using addTabletServer here would start each sender thread
    // immediately and repeat the gateway registration, defeating the two-phase startup.
    for (ServerNode serverNode : serverNodes) {
        addNewTabletServer(serverNode);
    }
    // Phase 2: start every sender that has not been started yet.
    synchronized (channelLock) {
        for (TabletServerChannelState state : channelStates.values()) {
            startSendThread(state);
        }
    }
}

/**
 * Closes the underlying RPC gateway manager.
 *
 * <p>NOTE(review): this method does not stop the per-tablet-server sender threads; the
 * {@code shutdown()} method of this class does that. Confirm that callers invoke both.
 *
 * @throws Exception propagated from closing the gateway manager
 */
public void close() throws Exception {
rpcGatewayManager.close();
}

/**
 * Registers a tablet server at runtime and starts its sender thread right away, unless the
 * channel state is no longer present by the time the lock is taken.
 */
public void addTabletServer(ServerNode serverNode) {
    addNewTabletServer(serverNode);
    synchronized (channelLock) {
        TabletServerChannelState registered = channelStates.get(serverNode.id());
        if (registered == null) {
            return;
        }
        startSendThread(registered);
    }
}

/**
 * Registers the RPC gateway for the given tablet server and, if no channel state exists for
 * its id yet, creates one consisting of a request queue, a per-server metric group with a
 * queue-size gauge, and a sender thread that is created but not started.
 */
private void addNewTabletServer(ServerNode serverNode) {
    checkState(
            serverNode.serverType().equals(ServerType.TABLET_SERVER),
            "The server type should be TABLET_SERVER, but was " + serverNode.serverType());

    rpcGatewayManager.addServer(serverNode);

    final int serverId = serverNode.id();
    synchronized (channelLock) {
        if (!channelStates.containsKey(serverId)) {
            BlockingQueue<QueueItem> pendingRequests = new LinkedBlockingQueue<>();

            MetricGroup serverMetrics =
                    coordinatorMetricGroup.addGroup(
                            "tablet-server-id", String.valueOf(serverId));
            serverMetrics.gauge(MetricNames.SENDER_QUEUE_SIZE, pendingRequests::size);

            ControlRequestSendThread sender =
                    new ControlRequestSendThread(
                            serverId,
                            pendingRequests,
                            () -> rpcGatewayManager.getRpcGateway(serverId),
                            epochSupplier,
                            conf,
                            serverMetrics);
            TabletServerChannelState channel =
                    new TabletServerChannelState(
                            serverId, serverNode, pendingRequests, sender, serverMetrics);
            channelStates.put(serverId, channel);
        }
    }
}

/** Starts the sender thread of the given channel state if it has never been started. */
private void startSendThread(TabletServerChannelState state) {
    ControlRequestSendThread sender = state.getSendThread();
    if (sender.getState() != Thread.State.NEW) {
        // Already started (or finished); a thread must not be started twice.
        return;
    }
    sender.start();
}

public void removeTabletServer(Integer serverId) {
TabletServerChannelState state;
synchronized (channelLock) {
state = channelStates.remove(serverId);
}
teardownChannelState(serverId, state);

rpcGatewayManager
.removeServer(serverId)
.exceptionally(
Expand All @@ -97,6 +179,39 @@ public void removeTabletServer(Integer serverId) {
});
}

/**
 * Detaches every registered channel state under the lock and then tears each one down
 * outside the lock (sender thread shutdown, queue clearing, metric group close).
 */
public void shutdown() {
    final Map<Integer, TabletServerChannelState> detached;
    synchronized (channelLock) {
        detached = new HashMap<>(channelStates);
        channelStates.clear();
    }
    detached.forEach(this::teardownChannelState);
}

/**
 * Releases everything held by one channel state: shuts down its sender thread, clears its
 * queue and closes its metric group. Does nothing for a {@code null} state. Any failure is
 * logged and not propagated.
 */
private void teardownChannelState(int serverId, @Nullable TabletServerChannelState state) {
    if (state != null) {
        try {
            stopSenderThread(serverId, state);
            state.getQueue().clear();
            state.getMetricGroup().close();
        } catch (Throwable failure) {
            LOG.error("Error tearing down channel state for tabletServer {}", serverId, failure);
        }
    }
}

/**
 * Shuts down the sender thread of the given state. An interrupt during shutdown restores the
 * interrupt flag and is logged, so the caller can continue with the remaining cleanup.
 */
private void stopSenderThread(int serverId, TabletServerChannelState state) {
    try {
        state.getSendThread().shutdown();
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
        LOG.warn(
                "Interrupted while shutting down sender thread for tabletServer {}",
                serverId,
                interrupted);
    }
}

/** Send NotifyLeaderAndIsr request to the server and handle the response. */
public void sendBucketLeaderAndIsrRequest(
int receiveServerId,
Expand All @@ -109,16 +224,46 @@ public void sendBucketLeaderAndIsrRequest(
responseConsumer);
}

/**
 * Enqueues a StopReplica request onto the per-tablet-server sender queue. The sender thread
 * retries on network-level failures until the TS responds or is removed.
 *
 * <p>The request is never sent directly from this method. If no channel state is registered
 * for the target server, the request is dropped with a warning and {@code responseConsumer}
 * is not invoked by this method.
 *
 * @param receiveServerId id of the tablet server the request is addressed to
 * @param stopReplicaRequest the request to enqueue
 * @param coordinatorEpoch the coordinator epoch recorded on the queue item
 * @param deletionReplicas replicas recorded on the queue item; may be {@code null}
 * @param responseConsumer callback recorded on the queue item (presumably invoked by the
 *     sender thread with the response or the failure; confirm in ControlRequestSendThread)
 */
public void sendStopBucketReplicaRequest(
        int receiveServerId,
        StopReplicaRequest stopReplicaRequest,
        int coordinatorEpoch,
        @Nullable Set<TableBucketReplica> deletionReplicas,
        BiConsumer<StopReplicaResponse, ? super Throwable> responseConsumer) {
    TabletServerChannelState state;
    synchronized (channelLock) {
        state = channelStates.get(receiveServerId);
    }
    if (state == null) {
        LOG.warn(
                "No channel state for tabletServer {}; dropping stopReplica (epoch={}). "
                        + "Correctness for any deletion replicas is handled by "
                        + "processDeadTabletServer.",
                receiveServerId,
                coordinatorEpoch);
        return;
    }
    // The fifth argument is the wall-clock time at which the item is created.
    QueueItem item =
            new QueueItem(
                    ApiKeys.STOP_REPLICA,
                    stopReplicaRequest,
                    responseConsumer,
                    coordinatorEpoch,
                    System.currentTimeMillis(),
                    deletionReplicas);
    try {
        state.getQueue().put(item);
    } catch (InterruptedException e) {
        // Restore the flag so callers further up the stack can observe the interrupt.
        Thread.currentThread().interrupt();
        LOG.warn(
                "Interrupted while enqueueing stopReplica for tabletServer {}",
                receiveServerId,
                e);
    }
}

/** Send UpdateMetadataRequest to the server and handle the response. */
Expand Down
Loading