-
Notifications
You must be signed in to change notification settings - Fork 129
Record ZooKeeperCommandExecutor timings #1191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ikhoon
wants to merge
1
commit into
line:main
Choose a base branch
from
ikhoon:zookeeper-command-executor-metrics
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
85 changes: 85 additions & 0 deletions
85
...c/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationMetrics.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| /* | ||
| * Copyright 2025 LY Corporation | ||
| * | ||
| * LY Corporation licenses this file to you under the Apache License, | ||
| * version 2.0 (the "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at: | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
| * License for the specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package com.linecorp.centraldogma.server.internal.replication; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
|
|
||
| import com.linecorp.armeria.common.metric.MoreMeters; | ||
|
|
||
| import io.micrometer.core.instrument.MeterRegistry; | ||
| import io.micrometer.core.instrument.Tag; | ||
| import io.micrometer.core.instrument.Timer; | ||
|
|
||
| final class ReplicationMetrics { | ||
|
|
||
| private final String projectName; | ||
| private final Timer lockAcquireSuccessTimer; | ||
| private final Timer lockAcquireFailureTimer; | ||
| private final Timer commandExecutionTimer; | ||
| private final Timer logReplayTimer; | ||
| private final Timer logStoreTimer; | ||
|
|
||
| ReplicationMetrics(MeterRegistry registry, String projectName) { | ||
| this.projectName = projectName; | ||
| lockAcquireSuccessTimer = MoreMeters.newTimer(registry, "replication.lock.waiting", | ||
| ImmutableList.of(Tag.of("project", projectName), | ||
| Tag.of("acquired", "true"))); | ||
| lockAcquireFailureTimer = MoreMeters.newTimer(registry, "replication.lock.waiting", | ||
| ImmutableList.of(Tag.of("project", projectName), | ||
| Tag.of("acquired", "false"))); | ||
| commandExecutionTimer = MoreMeters.newTimer(registry, "replication.command.execution", | ||
| ImmutableList.of(Tag.of("project", projectName))); | ||
| logReplayTimer = MoreMeters.newTimer(registry, "replication.log.replay", | ||
| ImmutableList.of(Tag.of("project", projectName))); | ||
| logStoreTimer = MoreMeters.newTimer(registry, "replication.log.store", | ||
| ImmutableList.of(Tag.of("project", projectName))); | ||
| } | ||
|
|
||
| Timer lockAcquireSuccessTimer() { | ||
| return lockAcquireSuccessTimer; | ||
| } | ||
|
|
||
| Timer lockAcquireFailureTimer() { | ||
| return lockAcquireFailureTimer; | ||
| } | ||
|
|
||
| Timer commandExecutionTimer() { | ||
| return commandExecutionTimer; | ||
| } | ||
|
|
||
| Timer logReplayTimer() { | ||
| return logReplayTimer; | ||
| } | ||
|
|
||
| Timer logStoreTimer() { | ||
| return logStoreTimer; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (!(o instanceof ReplicationMetrics)) { | ||
| return false; | ||
| } | ||
| final ReplicationMetrics that = (ReplicationMetrics) o; | ||
| return projectName.equals(that.projectName); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return projectName.hashCode(); | ||
| } | ||
| } |
106 changes: 106 additions & 0 deletions
106
...c/main/java/com/linecorp/centraldogma/server/internal/replication/ReplicationTimings.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /* | ||
| * Copyright 2025 LY Corporation | ||
| * | ||
| * LY Corporation licenses this file to you under the Apache License, | ||
| * version 2.0 (the "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at: | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
| * License for the specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package com.linecorp.centraldogma.server.internal.replication; | ||
|
|
||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| import com.linecorp.armeria.common.util.TextFormatter; | ||
|
|
||
| final class ReplicationTimings { | ||
|
|
||
| @Nullable | ||
| private final ReplicationMetrics metrics; | ||
|
|
||
| private long lockAcquisitionStartNanos; | ||
| private long lockAcquisitionDurationNanos; | ||
| private boolean lockAcquired; | ||
| private long commandExecutionStartNanos; | ||
| private long commandExecutionDurationNanos; | ||
| private long logReplayStartNanos; | ||
| private long logReplayDurationNanos; | ||
| private long logStoreStartNanos; | ||
| private long logStoreDurationNanos; | ||
|
|
||
| ReplicationTimings(@Nullable ReplicationMetrics metrics) { | ||
| this.metrics = metrics; | ||
| } | ||
|
|
||
| void startLockAcquisition(long startNanos) { | ||
| lockAcquisitionStartNanos = startNanos; | ||
| } | ||
|
|
||
| void endLockAcquisition(boolean lockAcquired) { | ||
| lockAcquisitionDurationNanos = System.nanoTime() - lockAcquisitionStartNanos; | ||
| this.lockAcquired = lockAcquired; | ||
| } | ||
|
|
||
| void startCommandExecution() { | ||
| commandExecutionStartNanos = System.nanoTime(); | ||
| } | ||
|
|
||
| void endCommandExecution() { | ||
| commandExecutionDurationNanos = System.nanoTime() - commandExecutionStartNanos; | ||
| } | ||
|
|
||
| void startLogReplay() { | ||
| logReplayStartNanos = System.nanoTime(); | ||
| } | ||
|
|
||
| void endLogReplay() { | ||
| logReplayDurationNanos = System.nanoTime() - logReplayStartNanos; | ||
| } | ||
|
|
||
| void startLogStore() { | ||
| logStoreStartNanos = System.nanoTime(); | ||
| } | ||
|
|
||
| void endLogStore() { | ||
| logStoreDurationNanos = System.nanoTime() - logStoreStartNanos; | ||
| } | ||
|
|
||
| void record() { | ||
| if (metrics == null) { | ||
| return; | ||
| } | ||
|
|
||
| if (lockAcquired) { | ||
| metrics.lockAcquireSuccessTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS); | ||
| } else { | ||
| metrics.lockAcquireFailureTimer().record(lockAcquisitionDurationNanos, TimeUnit.NANOSECONDS); | ||
| } | ||
| metrics.commandExecutionTimer().record(commandExecutionDurationNanos, TimeUnit.NANOSECONDS); | ||
| metrics.logReplayTimer().record(logReplayDurationNanos, TimeUnit.NANOSECONDS); | ||
| metrics.logStoreTimer().record(logStoreDurationNanos, TimeUnit.NANOSECONDS); | ||
| } | ||
|
|
||
| String timingsString() { | ||
| final StringBuilder sb = new StringBuilder(); | ||
| sb.append("{lockAcquisition="); | ||
| TextFormatter.appendElapsed(sb, lockAcquisitionDurationNanos); | ||
| sb.append(", commandExecution="); | ||
| TextFormatter.appendElapsed(sb, commandExecutionDurationNanos); | ||
| sb.append(", logReplay="); | ||
| TextFormatter.appendElapsed(sb, logReplayDurationNanos); | ||
| sb.append(", logStore="); | ||
| TextFormatter.appendElapsed(sb, logStoreDurationNanos); | ||
| sb.append('}'); | ||
| return sb.toString(); | ||
| } | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,28 +82,24 @@ | |
| import com.google.common.util.concurrent.MoreExecutors; | ||
| import com.google.common.util.concurrent.Uninterruptibles; | ||
|
|
||
| import com.linecorp.armeria.common.metric.MoreMeters; | ||
| import com.linecorp.armeria.common.util.SafeCloseable; | ||
| import com.linecorp.centraldogma.common.LockAcquireTimeoutException; | ||
| import com.linecorp.centraldogma.common.Revision; | ||
| import com.linecorp.centraldogma.internal.Jackson; | ||
| import com.linecorp.centraldogma.server.ZooKeeperReplicationConfig; | ||
| import com.linecorp.centraldogma.server.ZooKeeperServerConfig; | ||
| import com.linecorp.centraldogma.server.command.AbstractCommandExecutor; | ||
| import com.linecorp.centraldogma.server.command.AbstractPushCommand; | ||
| import com.linecorp.centraldogma.server.command.Command; | ||
| import com.linecorp.centraldogma.server.command.CommandExecutor; | ||
| import com.linecorp.centraldogma.server.command.CommandType; | ||
| import com.linecorp.centraldogma.server.command.CommitResult; | ||
| import com.linecorp.centraldogma.server.command.ForcePushCommand; | ||
| import com.linecorp.centraldogma.server.command.NormalizableCommit; | ||
| import com.linecorp.centraldogma.server.command.TransformCommand; | ||
| import com.linecorp.centraldogma.server.command.RepositoryCommand; | ||
| import com.linecorp.centraldogma.server.command.UpdateServerStatusCommand; | ||
|
|
||
| import io.micrometer.core.instrument.Gauge; | ||
| import io.micrometer.core.instrument.MeterRegistry; | ||
| import io.micrometer.core.instrument.Tag; | ||
| import io.micrometer.core.instrument.Timer; | ||
| import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; | ||
| import io.netty.util.concurrent.DefaultThreadFactory; | ||
|
|
||
|
|
@@ -135,7 +131,7 @@ public final class ZooKeeperCommandExecutor | |
| private static final RetryPolicy RETRY_POLICY_NEVER = (retryCount, elapsedTimeMs, sleeper) -> false; | ||
|
|
||
| private final ConcurrentMap<String, InterProcessMutex> mutexMap = new ConcurrentHashMap<>(); | ||
| private final Map<ProjectNameAndAcquired, Timer> lockAcquiredTimers = new ConcurrentHashMap<>(); | ||
| private final Map<String, ReplicationMetrics> replicationTimings = new ConcurrentHashMap<>(); | ||
|
|
||
| private final ZooKeeperReplicationConfig cfg; | ||
| private final File revisionFile; | ||
|
|
@@ -918,13 +914,14 @@ public void childEvent(CuratorFramework unused, PathChildrenCacheEvent event) th | |
| oldLogRemover.touch(); | ||
| } | ||
|
|
||
| private SafeCloseable safeLock(Command<?> command) { | ||
| private SafeCloseable safeLock(Command<?> command, ReplicationTimings timings) { | ||
| final long lockTimeoutNanos = this.lockTimeoutNanos; | ||
| final String executionPath = command.executionPath(); | ||
| final InterProcessMutex mtx = mutexMap.computeIfAbsent( | ||
| executionPath, k -> new InterProcessMutex(curator, absolutePath(LOCK_PATH, k))); | ||
|
|
||
| final long startTime = System.nanoTime(); | ||
| timings.startLockAcquisition(startTime); | ||
| boolean lockAcquired = false; | ||
| Throwable cause = null; | ||
| try { | ||
|
|
@@ -958,13 +955,7 @@ private SafeCloseable safeLock(Command<?> command) { | |
| cause = e; | ||
| } | ||
|
|
||
| if (command instanceof AbstractPushCommand) { | ||
| final String projectName = ((AbstractPushCommand<?>) command).projectName(); | ||
| record(projectName, startTime, lockAcquired); | ||
| } else if (command instanceof TransformCommand) { | ||
| final String projectName = ((TransformCommand) command).projectName(); | ||
| record(projectName, startTime, lockAcquired); | ||
| } | ||
| timings.endLockAcquisition(lockAcquired); | ||
|
|
||
| if (!lockAcquired) { | ||
| if (cause != null) { | ||
|
|
@@ -982,15 +973,6 @@ private SafeCloseable safeLock(Command<?> command) { | |
| return () -> safeRelease(mtx); | ||
| } | ||
|
|
||
| private void record(String projectName, long startTime, boolean lockAcquired) { | ||
| final Timer timer = lockAcquiredTimers.computeIfAbsent( | ||
| new ProjectNameAndAcquired(projectName, lockAcquired), key -> MoreMeters.newTimer( | ||
| meterRegistry, "zookeeper.lock.acquired", | ||
| ImmutableList.of(Tag.of("project", projectName), | ||
| Tag.of("acquired", String.valueOf(lockAcquired))))); | ||
| timer.record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); | ||
| } | ||
|
|
||
| private static void safeRelease(InterProcessMutex mtx) { | ||
| try { | ||
| mtx.release(); | ||
|
|
@@ -1150,6 +1132,18 @@ private static String pathFromRevision(long revision) { | |
| return String.format("%010d", revision); | ||
| } | ||
|
|
||
| private <T> ReplicationTimings newReplicationTimings(Command<T> command) { | ||
| ReplicationMetrics metrics = null; | ||
| if (command instanceof RepositoryCommand) { | ||
| final RepositoryCommand<?> repoCommand = (RepositoryCommand<?>) command; | ||
| final String projectName = repoCommand.projectName(); | ||
| metrics = replicationTimings.computeIfAbsent(projectName, key -> { | ||
| return new ReplicationMetrics(meterRegistry, key); | ||
| }); | ||
| } | ||
| return new ReplicationTimings(metrics); | ||
| } | ||
|
|
||
| // Ensure that all logs are replayed, any other logs can not be added before end of this function. | ||
| @Override | ||
| protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception { | ||
|
|
@@ -1163,48 +1157,71 @@ protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exceptio | |
| } | ||
| } | ||
| executor.execute(() -> { | ||
| final ReplicationTimings timings = newReplicationTimings(command); | ||
| try { | ||
| future.complete(blockingExecute(command)); | ||
| future.complete(blockingExecute(command, timings)); | ||
| } catch (Throwable t) { | ||
| future.completeExceptionally(t); | ||
| } finally { | ||
| timings.record(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Understood that if failure occurs while replaying, certain metrics (e.g. |
||
| logger.debug("Elapsed times for {}: {}", command, timings.timingsString()); | ||
| } | ||
| }); | ||
| return future; | ||
| } | ||
|
|
||
| private <T> T blockingExecute(Command<T> command) throws Exception { | ||
| private <T> T blockingExecute(Command<T> command, ReplicationTimings timings) throws Exception { | ||
| createParentNodes(); | ||
|
|
||
| try (SafeCloseable ignored = safeLock(command)) { | ||
| try (SafeCloseable ignored = safeLock(command, timings)) { | ||
|
|
||
| // NB: We are sure no other replicas will append the conflicting logs (the commands with the | ||
| // same execution path) while we hold the lock for the command's execution path. | ||
| // | ||
| // Other replicas may still append the logs with different execution paths, because, by design, | ||
| // two commands never conflict with each other if they have different execution paths. | ||
|
|
||
| final List<String> recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH)); | ||
| if (!recentRevisions.isEmpty()) { | ||
| final long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max().getAsLong(); | ||
| replayLogs(lastRevision); | ||
| timings.startLogReplay(); | ||
| try { | ||
| final List<String> recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH)); | ||
| if (!recentRevisions.isEmpty()) { | ||
| final long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max() | ||
| .getAsLong(); | ||
| replayLogs(lastRevision); | ||
| } | ||
| } finally { | ||
| timings.endLogReplay(); | ||
| } | ||
|
|
||
| timings.startCommandExecution(); | ||
| final T result; | ||
| try { | ||
| result = delegate.execute(command).get(); | ||
| } finally { | ||
| timings.endCommandExecution(); | ||
| } | ||
|
|
||
| final T result = delegate.execute(command).get(); | ||
| timings.startLogStore(); | ||
| final long revision; | ||
| final ReplicationLog<?> log; | ||
| final Command<?> maybeUnwrapped = unwrapForcePush(command); | ||
| if (maybeUnwrapped instanceof NormalizableCommit) { | ||
| final NormalizableCommit normalizingPushCommand = (NormalizableCommit) maybeUnwrapped; | ||
| assert result instanceof CommitResult : result; | ||
| final CommitResult commitResult = (CommitResult) result; | ||
| final Command<Revision> pushAsIsCommand = normalizingPushCommand.asIs(commitResult); | ||
| log = new ReplicationLog<>(replicaId(), | ||
| maybeWrap(command, pushAsIsCommand), commitResult.revision()); | ||
| } else { | ||
| log = new ReplicationLog<>(replicaId(), command, result); | ||
| } | ||
| try { | ||
| final Command<?> maybeUnwrapped = unwrapForcePush(command); | ||
| if (maybeUnwrapped instanceof NormalizableCommit) { | ||
| final NormalizableCommit normalizingPushCommand = (NormalizableCommit) maybeUnwrapped; | ||
| assert result instanceof CommitResult : result; | ||
| final CommitResult commitResult = (CommitResult) result; | ||
| final Command<Revision> pushAsIsCommand = normalizingPushCommand.asIs(commitResult); | ||
| log = new ReplicationLog<>(replicaId(), | ||
| maybeWrap(command, pushAsIsCommand), commitResult.revision()); | ||
| } else { | ||
| log = new ReplicationLog<>(replicaId(), command, result); | ||
| } | ||
|
|
||
| // Store the command execution log to ZooKeeper. | ||
| final long revision = storeLog(log); | ||
| // Store the command execution log to ZooKeeper. | ||
| revision = storeLog(log); | ||
| } finally { | ||
| timings.endLogStore(); | ||
| } | ||
|
|
||
| // Update the ServerStatus to the CommandExecutor after the log is stored. | ||
| if (command.type() == CommandType.UPDATE_SERVER_STATUS) { | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid recording metrics for phases that never started.
If we fail before
startCommandExecution()/startLogReplay()/startLogStore()runs (e.g., lock acquisition timeout), these timers stillrecord(0, …), bumping the sample count and hiding that the phase never happened. Guard each record with*_StartNanos != 0so only completed phases contribute measurements, leaving lock failures to be represented solely by the lock timer.Apply this diff:
📝 Committable suggestion
🤖 Prompt for AI Agents