Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.internal.replication;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.metric.MoreMeters;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;

/**
 * Holds the replication-related {@link Timer}s of a single project.
 *
 * <p>All timers are created via {@link MoreMeters#newTimer} and carry a {@code project} tag whose value is
 * the project name given to the constructor. The two lock timers share the meter name
 * {@code replication.lock.waiting} and are distinguished by the {@code acquired} tag.
 *
 * <p>Two instances are considered equal when their project names are equal.
 */
final class ReplicationMetrics {

    private final String projectName;
    private final Timer lockAcquireSuccessTimer;
    private final Timer lockAcquireFailureTimer;
    private final Timer commandExecutionTimer;
    private final Timer logReplayTimer;
    private final Timer logStoreTimer;

    /**
     * Creates the timers for the specified project.
     *
     * @param registry the registry passed to {@link MoreMeters#newTimer} for every timer
     * @param projectName the value of the {@code project} tag; also used by {@link #equals(Object)}
     *                    and {@link #hashCode()}
     */
    ReplicationMetrics(MeterRegistry registry, String projectName) {
        this.projectName = projectName;

        // Every timer of this instance carries the same project tag.
        final Tag projectTag = Tag.of("project", projectName);

        lockAcquireSuccessTimer = timer(registry, "replication.lock.waiting",
                                        projectTag, Tag.of("acquired", "true"));
        lockAcquireFailureTimer = timer(registry, "replication.lock.waiting",
                                        projectTag, Tag.of("acquired", "false"));
        commandExecutionTimer = timer(registry, "replication.command.execution", projectTag);
        logReplayTimer = timer(registry, "replication.log.replay", projectTag);
        logStoreTimer = timer(registry, "replication.log.store", projectTag);
    }

    /**
     * Creates a timer with the given meter name and tags, keeping the tags in the given order.
     */
    private static Timer timer(MeterRegistry registry, String name, Tag... tags) {
        return MoreMeters.newTimer(registry, name, ImmutableList.copyOf(tags));
    }

    /**
     * Returns the {@code replication.lock.waiting} timer tagged with {@code acquired=true}.
     */
    Timer lockAcquireSuccessTimer() {
        return lockAcquireSuccessTimer;
    }

    /**
     * Returns the {@code replication.lock.waiting} timer tagged with {@code acquired=false}.
     */
    Timer lockAcquireFailureTimer() {
        return lockAcquireFailureTimer;
    }

    /**
     * Returns the {@code replication.command.execution} timer.
     */
    Timer commandExecutionTimer() {
        return commandExecutionTimer;
    }

    /**
     * Returns the {@code replication.log.replay} timer.
     */
    Timer logReplayTimer() {
        return logReplayTimer;
    }

    /**
     * Returns the {@code replication.log.store} timer.
     */
    Timer logStoreTimer() {
        return logStoreTimer;
    }

    @Override
    public boolean equals(Object o) {
        // Identity is defined solely by the project name; 'instanceof' also rejects null.
        return o instanceof ReplicationMetrics &&
               projectName.equals(((ReplicationMetrics) o).projectName);
    }

    @Override
    public int hashCode() {
        return projectName.hashCode();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.internal.replication;

import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import com.linecorp.armeria.common.util.TextFormatter;

final class ReplicationTimings {

@Nullable
private final ReplicationMetrics metrics;

private long lockAcquisitionStartNanos;
private long lockAcquisitionDurationNanos;
private boolean lockAcquired;
private long commandExecutionStartNanos;
private long commandExecutionDurationNanos;
private long logReplayStartNanos;
private long logReplayDurationNanos;
private long logStoreStartNanos;
private long logStoreDurationNanos;

/**
 * Creates a new instance whose recorded durations are all initially zero.
 *
 * @param metrics the timers that {@code record()} reports to, or {@code null} to make
 *                {@code record()} a no-op
 */
ReplicationTimings(@Nullable ReplicationMetrics metrics) {
this.metrics = metrics;
}

/**
 * Stores the timestamp at which lock acquisition began.
 *
 * @param startNanos the start timestamp; {@code endLockAcquisition(boolean)} subtracts it from
 *                   {@link System#nanoTime()}, so it is expected to come from the same clock
 */
void startLockAcquisition(long startNanos) {
lockAcquisitionStartNanos = startNanos;
}

/**
 * Finishes the lock acquisition phase: stores the time elapsed since the stored start timestamp and
 * remembers whether the lock was obtained.
 *
 * @param lockAcquired {@code true} if the lock was acquired, {@code false} otherwise
 */
void endLockAcquisition(boolean lockAcquired) {
    final long endNanos = System.nanoTime();
    this.lockAcquired = lockAcquired;
    lockAcquisitionDurationNanos = endNanos - lockAcquisitionStartNanos;
}

/**
 * Marks the beginning of the command execution phase with the current {@link System#nanoTime()}.
 */
void startCommandExecution() {
    final long nowNanos = System.nanoTime();
    commandExecutionStartNanos = nowNanos;
}

/**
 * Marks the end of the command execution phase and stores the time elapsed since
 * {@code startCommandExecution()} stored its timestamp.
 */
void endCommandExecution() {
    final long endNanos = System.nanoTime();
    commandExecutionDurationNanos = endNanos - commandExecutionStartNanos;
}

/**
 * Marks the beginning of the log replay phase with the current {@link System#nanoTime()}.
 */
void startLogReplay() {
    final long nowNanos = System.nanoTime();
    logReplayStartNanos = nowNanos;
}

/**
 * Marks the end of the log replay phase and stores the time elapsed since
 * {@code startLogReplay()} stored its timestamp.
 */
void endLogReplay() {
    final long endNanos = System.nanoTime();
    logReplayDurationNanos = endNanos - logReplayStartNanos;
}

/**
 * Marks the beginning of the log store phase with the current {@link System#nanoTime()}.
 */
void startLogStore() {
    final long nowNanos = System.nanoTime();
    logStoreStartNanos = nowNanos;
}

/**
 * Marks the end of the log store phase and stores the time elapsed since
 * {@code startLogStore()} stored its timestamp.
 */
void endLogStore() {
    final long endNanos = System.nanoTime();
    logStoreDurationNanos = endNanos - logStoreStartNanos;
}

/**
 * Reports the captured durations to the timers of the {@link ReplicationMetrics} given at construction
 * time. Does nothing if no metrics were given.
 *
 * <p>A phase is reported only when its start timestamp has been set. Reporting a phase that never
 * started would add a zero-duration sample to its timer, which inflates the sample count and drags
 * the mean down. In particular, a failure that happens before lock acquisition is attempted is no longer
 * counted as a lock acquisition failure.
 */
void record() {
    if (metrics == null) {
        return;
    }

    // The start timestamp fields keep their initial value of 0 until the matching start method is called,
    // so 0 is used as the "phase never started" marker.
    if (lockAcquisitionStartNanos != 0) {
        if (lockAcquired) {
            metrics.lockAcquireSuccessTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
        } else {
            metrics.lockAcquireFailureTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
        }
    }
    if (commandExecutionStartNanos != 0) {
        metrics.commandExecutionTimer().record(commandExecutionDurationNanos, TimeUnit.NANOSECONDS);
    }
    if (logReplayStartNanos != 0) {
        metrics.logReplayTimer().record(logReplayDurationNanos, TimeUnit.NANOSECONDS);
    }
    if (logStoreStartNanos != 0) {
        metrics.logStoreTimer().record(logStoreDurationNanos, TimeUnit.NANOSECONDS);
    }
}
Comment on lines +77 to +90
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid recording metrics for phases that never started.

If we fail before startCommandExecution()/startLogReplay()/startLogStore() runs (e.g., on a lock acquisition timeout), these timers still call record(0, …), bumping the sample count and hiding the fact that the phase never happened. Guard each record(...) call with a `*StartNanos != 0` check so that only phases that actually started contribute measurements, leaving lock failures to be represented solely by the lock timer.

Apply this diff:

@@
-        if (lockAcquired) {
-            metrics.lockAcquireSuccessTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
-        } else {
-            metrics.lockAcquireFailureTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
-        }
-        metrics.commandExecutionTimer().record(commandExecutionDurationNanos, TimeUnit.NANOSECONDS);
-        metrics.logReplayTimer().record(logReplayDurationNanos, TimeUnit.NANOSECONDS);
-        metrics.logStoreTimer().record(logStoreDurationNanos, TimeUnit.NANOSECONDS);
+        if (lockAcquisitionStartNanos != 0) {
+            if (lockAcquired) {
+                metrics.lockAcquireSuccessTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
+            } else {
+                metrics.lockAcquireFailureTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
+            }
+        }
+        if (commandExecutionStartNanos != 0) {
+            metrics.commandExecutionTimer().record(commandExecutionDurationNanos, TimeUnit.NANOSECONDS);
+        }
+        if (logReplayStartNanos != 0) {
+            metrics.logReplayTimer().record(logReplayDurationNanos, TimeUnit.NANOSECONDS);
+        }
+        if (logStoreStartNanos != 0) {
+            metrics.logStoreTimer().record(logStoreDurationNanos, TimeUnit.NANOSECONDS);
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void record() {
if (metrics == null) {
return;
}
if (lockAcquired) {
metrics.lockAcquireSuccessTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
} else {
metrics.lockAcquireFailureTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
}
metrics.commandExecutionTimer().record(commandExecutionDurationNanos, TimeUnit.NANOSECONDS);
metrics.logReplayTimer().record(logReplayDurationNanos, TimeUnit.NANOSECONDS);
metrics.logStoreTimer().record(logStoreDurationNanos, TimeUnit.NANOSECONDS);
}
void record() {
if (metrics == null) {
return;
}
if (lockAcquisitionStartNanos != 0) {
if (lockAcquired) {
metrics.lockAcquireSuccessTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
} else {
metrics.lockAcquireFailureTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS);
}
}
if (commandExecutionStartNanos != 0) {
metrics.commandExecutionTimer().record(commandExecutionDurationNanos, TimeUnit.NANOSECONDS);
}
if (logReplayStartNanos != 0) {
metrics.logReplayTimer().record(logReplayDurationNanos, TimeUnit.NANOSECONDS);
}
if (logStoreStartNanos != 0) {
metrics.logStoreTimer().record(logStoreDurationNanos, TimeUnit.NANOSECONDS);
}
}
🤖 Prompt for AI Agents
In
server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationTimings.java
around lines 77 to 90, the record() method currently records zero-duration
metrics for phases that never started; update the method to only call each phase
timer.record(...) if that phase's start timestamp field (e.g.,
commandExecutionStartNanos, logReplayStartNanos, logStoreStartNanos) is non-zero
so that only completed phases contribute measurements; keep the existing lock
timer behavior (it should still record based on
lockAcquired/lockAcquisitionDurationNanos) and add simple guards like if
(commandExecutionStartNanos != 0) { ... } for each of the three phase timers.


/**
 * Returns a human-readable summary of the captured durations in the form
 * {@code {lockAcquisition=..., commandExecution=..., logReplay=..., logStore=...}}, where each value is
 * rendered by {@link TextFormatter#appendElapsed(StringBuilder, long)}.
 */
String timingsString() {
    // Labels and durations are kept in parallel arrays, in the order they appear in the output.
    final String[] labels = { "lockAcquisition", "commandExecution", "logReplay", "logStore" };
    final long[] durationsNanos = {
            lockAcquisitionDurationNanos,
            commandExecutionDurationNanos,
            logReplayDurationNanos,
            logStoreDurationNanos
    };

    final StringBuilder buf = new StringBuilder();
    buf.append('{');
    for (int i = 0; i < labels.length; i++) {
        if (i > 0) {
            buf.append(", ");
        }
        buf.append(labels[i]).append('=');
        TextFormatter.appendElapsed(buf, durationsNanos[i]);
    }
    return buf.append('}').toString();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -82,28 +82,24 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;

import com.linecorp.armeria.common.metric.MoreMeters;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.centraldogma.common.LockAcquireTimeoutException;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.server.ZooKeeperReplicationConfig;
import com.linecorp.centraldogma.server.ZooKeeperServerConfig;
import com.linecorp.centraldogma.server.command.AbstractCommandExecutor;
import com.linecorp.centraldogma.server.command.AbstractPushCommand;
import com.linecorp.centraldogma.server.command.Command;
import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.command.CommandType;
import com.linecorp.centraldogma.server.command.CommitResult;
import com.linecorp.centraldogma.server.command.ForcePushCommand;
import com.linecorp.centraldogma.server.command.NormalizableCommit;
import com.linecorp.centraldogma.server.command.TransformCommand;
import com.linecorp.centraldogma.server.command.RepositoryCommand;
import com.linecorp.centraldogma.server.command.UpdateServerStatusCommand;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.netty.util.concurrent.DefaultThreadFactory;

Expand Down Expand Up @@ -135,7 +131,7 @@ public final class ZooKeeperCommandExecutor
private static final RetryPolicy RETRY_POLICY_NEVER = (retryCount, elapsedTimeMs, sleeper) -> false;

private final ConcurrentMap<String, InterProcessMutex> mutexMap = new ConcurrentHashMap<>();
private final Map<ProjectNameAndAcquired, Timer> lockAcquiredTimers = new ConcurrentHashMap<>();
private final Map<String, ReplicationMetrics> replicationTimings = new ConcurrentHashMap<>();

private final ZooKeeperReplicationConfig cfg;
private final File revisionFile;
Expand Down Expand Up @@ -918,13 +914,14 @@ public void childEvent(CuratorFramework unused, PathChildrenCacheEvent event) th
oldLogRemover.touch();
}

private SafeCloseable safeLock(Command<?> command) {
private SafeCloseable safeLock(Command<?> command, ReplicationTimings timings) {
final long lockTimeoutNanos = this.lockTimeoutNanos;
final String executionPath = command.executionPath();
final InterProcessMutex mtx = mutexMap.computeIfAbsent(
executionPath, k -> new InterProcessMutex(curator, absolutePath(LOCK_PATH, k)));

final long startTime = System.nanoTime();
timings.startLockAcquisition(startTime);
boolean lockAcquired = false;
Throwable cause = null;
try {
Expand Down Expand Up @@ -958,13 +955,7 @@ private SafeCloseable safeLock(Command<?> command) {
cause = e;
}

if (command instanceof AbstractPushCommand) {
final String projectName = ((AbstractPushCommand<?>) command).projectName();
record(projectName, startTime, lockAcquired);
} else if (command instanceof TransformCommand) {
final String projectName = ((TransformCommand) command).projectName();
record(projectName, startTime, lockAcquired);
}
timings.endLockAcquisition(lockAcquired);

if (!lockAcquired) {
if (cause != null) {
Expand All @@ -982,15 +973,6 @@ private SafeCloseable safeLock(Command<?> command) {
return () -> safeRelease(mtx);
}

private void record(String projectName, long startTime, boolean lockAcquired) {
final Timer timer = lockAcquiredTimers.computeIfAbsent(
new ProjectNameAndAcquired(projectName, lockAcquired), key -> MoreMeters.newTimer(
meterRegistry, "zookeeper.lock.acquired",
ImmutableList.of(Tag.of("project", projectName),
Tag.of("acquired", String.valueOf(lockAcquired)))));
timer.record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}

private static void safeRelease(InterProcessMutex mtx) {
try {
mtx.release();
Expand Down Expand Up @@ -1150,6 +1132,18 @@ private static String pathFromRevision(long revision) {
return String.format("%010d", revision);
}

/**
 * Creates the {@link ReplicationTimings} for a single execution of the specified command.
 *
 * <p>For a {@link RepositoryCommand}, the timings are bound to the {@link ReplicationMetrics} of the
 * command's project, which is created on first use and then reused from the {@code replicationTimings}
 * map. For any other command, the timings are created without metrics.
 */
private <T> ReplicationTimings newReplicationTimings(Command<T> command) {
    if (!(command instanceof RepositoryCommand)) {
        return new ReplicationTimings(null);
    }

    final String projectName = ((RepositoryCommand<?>) command).projectName();
    final ReplicationMetrics projectMetrics = replicationTimings.computeIfAbsent(
            projectName, name -> new ReplicationMetrics(meterRegistry, name));
    return new ReplicationTimings(projectMetrics);
}

// Ensure that all logs are replayed; no other logs can be added before the end of this function.
@Override
protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception {
Expand All @@ -1163,48 +1157,71 @@ protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exceptio
}
}
executor.execute(() -> {
final ReplicationTimings timings = newReplicationTimings(command);
try {
future.complete(blockingExecute(command));
future.complete(blockingExecute(command, timings));
} catch (Throwable t) {
future.completeExceptionally(t);
} finally {
timings.record();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood: if a failure occurs while replaying, certain metrics (e.g. commandExecutionTimer) will record 0 instead of not recording at all.

logger.debug("Elapsed times for {}: {}", command, timings.timingsString());
}
});
return future;
}

private <T> T blockingExecute(Command<T> command) throws Exception {
private <T> T blockingExecute(Command<T> command, ReplicationTimings timings) throws Exception {
createParentNodes();

try (SafeCloseable ignored = safeLock(command)) {
try (SafeCloseable ignored = safeLock(command, timings)) {

// NB: We are sure no other replicas will append the conflicting logs (the commands with the
// same execution path) while we hold the lock for the command's execution path.
//
// Other replicas may still append the logs with different execution paths, because, by design,
// two commands never conflict with each other if they have different execution paths.

final List<String> recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH));
if (!recentRevisions.isEmpty()) {
final long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max().getAsLong();
replayLogs(lastRevision);
timings.startLogReplay();
try {
final List<String> recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH));
if (!recentRevisions.isEmpty()) {
final long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max()
.getAsLong();
replayLogs(lastRevision);
}
} finally {
timings.endLogReplay();
}

timings.startCommandExecution();
final T result;
try {
result = delegate.execute(command).get();
} finally {
timings.endCommandExecution();
}

final T result = delegate.execute(command).get();
timings.startLogStore();
final long revision;
final ReplicationLog<?> log;
final Command<?> maybeUnwrapped = unwrapForcePush(command);
if (maybeUnwrapped instanceof NormalizableCommit) {
final NormalizableCommit normalizingPushCommand = (NormalizableCommit) maybeUnwrapped;
assert result instanceof CommitResult : result;
final CommitResult commitResult = (CommitResult) result;
final Command<Revision> pushAsIsCommand = normalizingPushCommand.asIs(commitResult);
log = new ReplicationLog<>(replicaId(),
maybeWrap(command, pushAsIsCommand), commitResult.revision());
} else {
log = new ReplicationLog<>(replicaId(), command, result);
}
try {
final Command<?> maybeUnwrapped = unwrapForcePush(command);
if (maybeUnwrapped instanceof NormalizableCommit) {
final NormalizableCommit normalizingPushCommand = (NormalizableCommit) maybeUnwrapped;
assert result instanceof CommitResult : result;
final CommitResult commitResult = (CommitResult) result;
final Command<Revision> pushAsIsCommand = normalizingPushCommand.asIs(commitResult);
log = new ReplicationLog<>(replicaId(),
maybeWrap(command, pushAsIsCommand), commitResult.revision());
} else {
log = new ReplicationLog<>(replicaId(), command, result);
}

// Store the command execution log to ZooKeeper.
final long revision = storeLog(log);
// Store the command execution log to ZooKeeper.
revision = storeLog(log);
} finally {
timings.endLogStore();
}

// Update the ServerStatus to the CommandExecutor after the log is stored.
if (command.type() == CommandType.UPDATE_SERVER_STATUS) {
Expand Down
Loading