diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/DefaultReplicationTimings.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/DefaultReplicationTimings.java new file mode 100644 index 000000000..4e9f3cf8a --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/DefaultReplicationTimings.java @@ -0,0 +1,164 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.server.internal.replication; + +import java.util.concurrent.TimeUnit; + +import com.linecorp.armeria.common.util.TextFormatter; + +final class DefaultReplicationTimings implements ReplicationTimings { + + private final ReplicationMetrics metrics; + + private long executorSubmitStartNanos; + private long executorQueueLatencyNanos; + + private long lockAcquisitionStartNanos; + private long lockAcquisitionDurationNanos; + private boolean lockAcquired; + + private long lockReleaseStartNanos; + private long lockReleaseDurationNanos; + + private long commandExecutionStartNanos; + private long commandExecutionDurationNanos; + private boolean commandExecutionEnded; + + private long logReplayStartNanos; + private long logReplayDurationNanos; + private boolean logReplayEnded; + + private long logStoreStartNanos; + private long logStoreDurationNanos; + private boolean logStoreEnded; + private long logStoreEndNanos; + + DefaultReplicationTimings(ReplicationMetrics metrics) { + this.metrics = metrics; + } + + @Override + public void startExecutorSubmit() { + executorSubmitStartNanos = System.nanoTime(); + } + + @Override + public void startExecutorExecution() { + executorQueueLatencyNanos = System.nanoTime() - executorSubmitStartNanos; + } + + @Override + public void startLockAcquisition(long startNanos) { + lockAcquisitionStartNanos = startNanos; + } + + @Override + public void endLockAcquisition(boolean lockAcquired) { + lockAcquisitionDurationNanos = System.nanoTime() - lockAcquisitionStartNanos; + this.lockAcquired = lockAcquired; + } + + @Override + public void startLockRelease() { + lockReleaseStartNanos = System.nanoTime(); + } + + @Override + public void endLockRelease() { + lockReleaseDurationNanos = System.nanoTime() - lockReleaseStartNanos; + } + + @Override + public void startCommandExecution() { + commandExecutionStartNanos = System.nanoTime(); + } + + @Override + public void endCommandExecution() { + commandExecutionDurationNanos = System.nanoTime() - commandExecutionStartNanos; + commandExecutionEnded = true; + } + + @Override + public void startLogReplay() { + logReplayStartNanos = System.nanoTime(); + } + + @Override + public void endLogReplay() { + logReplayDurationNanos = System.nanoTime() - logReplayStartNanos; + logReplayEnded = true; + } + + @Override + public void startLogStore() { + logStoreStartNanos = System.nanoTime(); + } + + @Override + public void endLogStore() { + logStoreEndNanos = System.nanoTime(); + logStoreDurationNanos = logStoreEndNanos - logStoreStartNanos; + logStoreEnded = true; + } + + @Override + public void record() { + metrics.executorQueueLatencyTimer().record(executorQueueLatencyNanos, TimeUnit.NANOSECONDS); + if (lockAcquired) { + metrics.lockAcquireSuccessTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS); + } else { + metrics.lockAcquireFailureTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS); + } + metrics.lockReleaseTimer().record(lockReleaseDurationNanos, TimeUnit.NANOSECONDS); + if (commandExecutionEnded) { + metrics.commandExecutionTimer().record(commandExecutionDurationNanos, TimeUnit.NANOSECONDS); + } + if (logReplayEnded) { + metrics.logReplayTimer().record(logReplayDurationNanos, TimeUnit.NANOSECONDS); + } + if (logStoreEnded) { + metrics.logStoreTimer().record(logStoreDurationNanos, TimeUnit.NANOSECONDS); + } + } + + @Override + public String toText() { + if (!logStoreEnded) { + return "{ not completed yet }"; + } + + final StringBuilder sb = new StringBuilder(); + sb.append("{total="); + TextFormatter.appendElapsed(sb, logStoreEndNanos - executorSubmitStartNanos); + sb.append(", executorQueueLatency="); + TextFormatter.appendElapsed(sb, executorQueueLatencyNanos); + sb.append(", lockAcquisition="); + TextFormatter.appendElapsed(sb, lockAcquisitionDurationNanos); + sb.append(", lockRelease="); + TextFormatter.appendElapsed(sb, lockReleaseDurationNanos); + sb.append(", commandExecution="); + TextFormatter.appendElapsed(sb, commandExecutionDurationNanos); + sb.append(", logReplay="); + TextFormatter.appendElapsed(sb, logReplayDurationNanos); + sb.append(", logStore="); + TextFormatter.appendElapsed(sb, logStoreDurationNanos); + sb.append('}'); + return sb.toString(); + } +} + diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/NoopReplicationTimings.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/NoopReplicationTimings.java new file mode 100644 index 000000000..a1cc1efd6 --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/NoopReplicationTimings.java @@ -0,0 +1,65 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.server.internal.replication; + +enum NoopReplicationTimings implements ReplicationTimings { + INSTANCE; + + @Override + public void startExecutorSubmit() {} + + @Override + public void startExecutorExecution() {} + + @Override + public void startLockAcquisition(long startNanos) {} + + @Override + public void endLockAcquisition(boolean lockAcquired) {} + + @Override + public void startLockRelease() {} + + @Override + public void endLockRelease() {} + + @Override + public void startCommandExecution() {} + + @Override + public void endCommandExecution() {} + + @Override + public void startLogReplay() {} + + @Override + public void endLogReplay() {} + + @Override + public void startLogStore() {} + + @Override + public void endLogStore() {} + + @Override + public void record() {} + + @Override + public String toText() { + return "{ no timings recorded }"; + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationMetrics.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationMetrics.java new file mode 100644 index 000000000..068ffada7 --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationMetrics.java @@ -0,0 +1,99 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.server.internal.replication; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.common.metric.MoreMeters; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; + +final class ReplicationMetrics { + + private final String projectName; + private final Timer executorQueueLatencyTimer; + private final Timer lockAcquireSuccessTimer; + private final Timer lockAcquireFailureTimer; + private final Timer lockReleaseTimer; + private final Timer commandExecutionTimer; + private final Timer logReplayTimer; + private final Timer logStoreTimer; + + ReplicationMetrics(MeterRegistry registry, String projectName) { + this.projectName = projectName; + executorQueueLatencyTimer = MoreMeters.newTimer(registry, "replication.executor.queue.latency", + ImmutableList.of(Tag.of("project", projectName))); + lockAcquireSuccessTimer = MoreMeters.newTimer(registry, "replication.lock.acquisition", + ImmutableList.of(Tag.of("project", projectName), + Tag.of("acquired", "true"))); + lockAcquireFailureTimer = MoreMeters.newTimer(registry, "replication.lock.acquisition", + ImmutableList.of(Tag.of("project", projectName), + Tag.of("acquired", "false"))); + lockReleaseTimer = MoreMeters.newTimer(registry, "replication.lock.release", + ImmutableList.of(Tag.of("project", projectName))); + commandExecutionTimer = MoreMeters.newTimer(registry, "replication.command.execution", + ImmutableList.of(Tag.of("project", projectName))); + logReplayTimer = MoreMeters.newTimer(registry, "replication.log.replay", + ImmutableList.of(Tag.of("project", projectName))); + logStoreTimer = MoreMeters.newTimer(registry, "replication.log.store", + ImmutableList.of(Tag.of("project", projectName))); + } + + Timer executorQueueLatencyTimer() { + return executorQueueLatencyTimer; + } + + Timer lockAcquireSuccessTimer() { + return lockAcquireSuccessTimer; + } + + Timer lockAcquireFailureTimer() { + return lockAcquireFailureTimer; + } + + Timer lockReleaseTimer() { + return lockReleaseTimer; + } + + Timer commandExecutionTimer() { + return commandExecutionTimer; + } + + Timer logReplayTimer() { + return logReplayTimer; + } + + Timer logStoreTimer() { + return logStoreTimer; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ReplicationMetrics)) { + return false; + } + final ReplicationMetrics that = (ReplicationMetrics) o; + return projectName.equals(that.projectName); + } + + @Override + public int hashCode() { + return projectName.hashCode(); + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationTimings.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationTimings.java new file mode 100644 index 000000000..4e566e347 --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationTimings.java @@ -0,0 +1,57 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.server.internal.replication; + +import javax.annotation.Nullable; + +interface ReplicationTimings { + + static ReplicationTimings of(@Nullable ReplicationMetrics metrics) { + if (metrics == null) { + return NoopReplicationTimings.INSTANCE; + } + return new DefaultReplicationTimings(metrics); + } + + void startExecutorSubmit(); + + void startExecutorExecution(); + + void startLockAcquisition(long startNanos); + + void endLockAcquisition(boolean lockAcquired); + + void startLockRelease(); + + void endLockRelease(); + + void startCommandExecution(); + + void endCommandExecution(); + + void startLogReplay(); + + void endLogReplay(); + + void startLogStore(); + + void endLogStore(); + + void record(); + + String toText(); +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java index d1934197e..82d356ce0 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java @@ -79,7 +79,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; -import com.linecorp.armeria.common.metric.MoreMeters; import com.linecorp.armeria.common.util.SafeCloseable; import com.linecorp.centraldogma.common.LockAcquireTimeoutException; import com.linecorp.centraldogma.common.Revision; @@ -87,20 +86,17 @@ import com.linecorp.centraldogma.server.ZooKeeperReplicationConfig; import com.linecorp.centraldogma.server.ZooKeeperServerConfig; import com.linecorp.centraldogma.server.command.AbstractCommandExecutor; -import com.linecorp.centraldogma.server.command.AbstractPushCommand; import com.linecorp.centraldogma.server.command.Command; import com.linecorp.centraldogma.server.command.CommandExecutor; import com.linecorp.centraldogma.server.command.CommandType; import com.linecorp.centraldogma.server.command.CommitResult; import com.linecorp.centraldogma.server.command.ForcePushCommand; import com.linecorp.centraldogma.server.command.NormalizableCommit; -import com.linecorp.centraldogma.server.command.TransformCommand; +import com.linecorp.centraldogma.server.command.RepositoryCommand; import com.linecorp.centraldogma.server.command.UpdateServerStatusCommand; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; import io.netty.util.concurrent.DefaultThreadFactory; @@ -132,7 +128,7 @@ public final class ZooKeeperCommandExecutor private static final RetryPolicy RETRY_POLICY_NEVER = (retryCount, elapsedTimeMs, sleeper) -> false; private final ConcurrentMap mutexMap = new ConcurrentHashMap<>(); - private final Map lockAcquiredTimers = new ConcurrentHashMap<>(); + private final Map replicationTimings = new ConcurrentHashMap<>(); private final ZooKeeperReplicationConfig cfg; private final File revisionFile; @@ -913,13 +909,14 @@ public void childEvent(CuratorFramework unused, PathChildrenCacheEvent event) th oldLogRemover.touch(); } - private SafeCloseable safeLock(Command command) { + private SafeCloseable safeLock(Command command, ReplicationTimings timings) { + final long startTime = System.nanoTime(); + timings.startLockAcquisition(startTime); final long lockTimeoutNanos = this.lockTimeoutNanos; final String executionPath = command.executionPath(); final InterProcessMutex mtx = mutexMap.computeIfAbsent( executionPath, k -> new InterProcessMutex(curator, absolutePath(LOCK_PATH, k))); - final long startTime = System.nanoTime(); boolean lockAcquired = false; Throwable cause = null; try { @@ -953,13 +950,7 @@ private SafeCloseable safeLock(Command command) { cause = e; } - if (command instanceof AbstractPushCommand) { - final String projectName = ((AbstractPushCommand) command).projectName(); - record(projectName, startTime, lockAcquired); - } else if (command instanceof TransformCommand) { - final String projectName = ((TransformCommand) command).projectName(); - record(projectName, startTime, lockAcquired); - } + timings.endLockAcquisition(lockAcquired); if (!lockAcquired) { if (cause != null) { @@ -974,23 +965,17 @@ private SafeCloseable safeLock(Command command) { } } - return () -> safeRelease(mtx); - } - - private void record(String projectName, long startTime, boolean lockAcquired) { - final Timer timer = lockAcquiredTimers.computeIfAbsent( - new ProjectNameAndAcquired(projectName, lockAcquired), key -> MoreMeters.newTimer( - meterRegistry, "zookeeper.lock.acquired", - ImmutableList.of(Tag.of("project", projectName), - Tag.of("acquired", String.valueOf(lockAcquired))))); - timer.record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + return () -> safeRelease(mtx, timings); } - private static void safeRelease(InterProcessMutex mtx) { + private static void safeRelease(InterProcessMutex mtx, ReplicationTimings timings) { try { + timings.startLockRelease(); mtx.release(); } catch (Exception ignored) { // Ignore. + } finally { + timings.endLockRelease(); } } @@ -1097,6 +1082,18 @@ private static String pathFromRevision(long revision) { return String.format("%010d", revision); } + private ReplicationTimings newReplicationTimings(Command command) { + ReplicationMetrics metrics = null; + if (command instanceof RepositoryCommand) { + final RepositoryCommand repoCommand = (RepositoryCommand) command; + final String projectName = repoCommand.projectName(); + metrics = replicationTimings.computeIfAbsent(projectName, key -> { + return new ReplicationMetrics(meterRegistry, key); + }); + } + return ReplicationTimings.of(metrics); + } + // Ensure that all logs are replayed, any other logs can not be added before end of this function. @Override protected CompletableFuture doExecute(Command command) throws Exception { @@ -1109,20 +1106,28 @@ protected CompletableFuture doExecute(Command command) throws Exceptio executor = ForkJoinPool.commonPool(); } } + final ReplicationTimings timings = newReplicationTimings(command); + timings.startExecutorSubmit(); executor.execute(() -> { try { - future.complete(blockingExecute(command)); + timings.startExecutorExecution(); + future.complete(blockingExecute(command, timings)); } catch (Throwable t) { future.completeExceptionally(t); + } finally { + timings.record(); + if (logger.isDebugEnabled() && timings != NoopReplicationTimings.INSTANCE) { + logger.debug("Elapsed times for {}: {}", command, timings.toText()); + } } }); return future; } - private T blockingExecute(Command command) throws Exception { + private T blockingExecute(Command command, ReplicationTimings timings) throws Exception { createParentNodes(); - try (SafeCloseable ignored = safeLock(command)) { + try (SafeCloseable ignored = safeLock(command, timings)) { // NB: We are sure no other replicas will append the conflicting logs (the commands with the // same execution path) while we hold the lock for the command's execution path. @@ -1130,28 +1135,47 @@ private T blockingExecute(Command command) throws Exception { // Other replicas may still append the logs with different execution paths, because, by design, // two commands never conflict with each other if they have different execution paths. - final List recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH)); - if (!recentRevisions.isEmpty()) { - final long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max().getAsLong(); - replayLogs(lastRevision); + timings.startLogReplay(); + try { + final List recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH)); + if (!recentRevisions.isEmpty()) { + final long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max() + .getAsLong(); + replayLogs(lastRevision); + } + } finally { + timings.endLogReplay(); } - final T result = delegate.execute(command).get(); - final ReplicationLog log; - final Command maybeUnwrapped = unwrapForcePush(command); - if (maybeUnwrapped instanceof NormalizableCommit) { - final NormalizableCommit normalizingPushCommand = (NormalizableCommit) maybeUnwrapped; - assert result instanceof CommitResult : result; - final CommitResult commitResult = (CommitResult) result; - final Command pushAsIsCommand = normalizingPushCommand.asIs(commitResult); - log = new ReplicationLog<>(replicaId(), - maybeWrap(command, pushAsIsCommand), commitResult.revision()); - } else { - log = new ReplicationLog<>(replicaId(), command, result); + timings.startCommandExecution(); + final T result; + try { + result = delegate.execute(command).get(); + } finally { + timings.endCommandExecution(); } - // Store the command execution log to ZooKeeper. - final long revision = storeLog(log); + timings.startLogStore(); + final long revision; + final ReplicationLog log; + try { + final Command maybeUnwrapped = unwrapForcePush(command); + if (maybeUnwrapped instanceof NormalizableCommit) { + final NormalizableCommit normalizingPushCommand = (NormalizableCommit) maybeUnwrapped; + assert result instanceof CommitResult : result; + final CommitResult commitResult = (CommitResult) result; + final Command pushAsIsCommand = normalizingPushCommand.asIs(commitResult); + log = new ReplicationLog<>(replicaId(), + maybeWrap(command, pushAsIsCommand), commitResult.revision()); + } else { + log = new ReplicationLog<>(replicaId(), command, result); + } + + // Store the command execution log to ZooKeeper. + revision = storeLog(log); + } finally { + timings.endLogStore(); + } // Update the ServerStatus to the CommandExecutor after the log is stored. if (command.type() == CommandType.UPDATE_SERVER_STATUS) { @@ -1205,32 +1229,4 @@ private void createZkPathIfMissing(String zkPath) throws Exception { public void setLockTimeoutMillis(long lockTimeoutMillis) { lockTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(lockTimeoutMillis); } - - private static class ProjectNameAndAcquired { - private final String projectName; - private final boolean lockAcquired; - - ProjectNameAndAcquired(String projectName, boolean lockAcquired) { - this.projectName = projectName; - this.lockAcquired = lockAcquired; - } - - @Override - public int hashCode() { - return Objects.hash(projectName, lockAcquired); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof ProjectNameAndAcquired)) { - return false; - } - final ProjectNameAndAcquired that = (ProjectNameAndAcquired) obj; - return lockAcquired == that.lockAcquired && - projectName.equals(that.projectName); - } - } } diff --git a/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ReplicationTimingsTest.java b/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ReplicationTimingsTest.java new file mode 100644 index 000000000..8e62c26ae --- /dev/null +++ b/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ReplicationTimingsTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.server.internal.replication; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.linecorp.armeria.common.metric.MoreMeters; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.server.CentralDogmaBuilder; +import com.linecorp.centraldogma.testing.internal.CentralDogmaReplicationExtension; + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + +class ReplicationTimingsTest { + + private static final Map meterRegistryMap = new HashMap<>(); + + @RegisterExtension + static CentralDogmaReplicationExtension replica = new CentralDogmaReplicationExtension(3) { + @Override + protected void configureEach(int serverId, CentralDogmaBuilder builder) { + final SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); + meterRegistryMap.put(serverId, meterRegistry); + builder.meterRegistry(meterRegistry); + } + }; + + @Test + void pushAndRecordMetrics() { + final CentralDogma client0 = replica.servers().get(0).client(); + client0.createProject("fooProject").join(); + final CentralDogmaRepository repo = client0.createRepository("fooProject", "barRepo").join(); + repo.commit("Test", Change.ofTextUpsert("/hello.txt", "Hello, World!")) + .push().join(); + + final SimpleMeterRegistry meterRegistry0 = meterRegistryMap.get(1); + await().untilAsserted(() -> { + final Map metrics = MoreMeters.measureAll(meterRegistry0); + assertMetricExists(metrics, "replication.executor.queue.latency.percentile#value"); + assertMetricExists(metrics, "replication.lock.acquisition.percentile#value{acquired="); + assertMetricExists(metrics, "replication.lock.release.percentile#value"); + assertMetricExists(metrics, "replication.command.execution.percentile#value"); + assertMetricExists(metrics, "replication.log.replay.percentile#value"); + assertMetricExists(metrics, "replication.log.store.percentile#value"); + }); + } + + private static void assertMetricExists(Map metrics, String metricName) { + assertThat(metrics).anySatisfy((name, value) -> { + assertThat(name).startsWith(metricName); + }); + } +}