Skip to content

Commit

Permalink
Add AttemptNumber to worker traces (#8641)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Sep 1, 2023
1 parent 8a85fee commit 94e8350
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.concurrency.BoundedConcurrentLinkedQueue;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.converters.ThreadedTimeTracker;
Expand Down Expand Up @@ -136,6 +139,7 @@ public BufferedReplicationWorker(final String jobId,
this.processFromDestStopwatch = new Stopwatch();
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
Expand All @@ -145,7 +149,7 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
try {
final ReplicationContext replicationContext = getReplicationContext(syncInput);
final ReplicationFeatureFlags flags = replicationFeatureFlagReader.readReplicationFeatureFlags(syncInput);
replicationWorkerHelper.initialize(replicationContext, flags);
replicationWorkerHelper.initialize(replicationContext, flags, jobRoot);

// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
new ReplicationContext(syncInput.getIsReset(), syncInput.getConnectionId(), syncInput.getSourceId(),
syncInput.getDestinationId(), Long.parseLong(jobId),
attempt, syncInput.getWorkspaceId());
ApmTraceUtils.addTagsToTrace(replicationContext.connectionId(), jobId, jobRoot);

final ReplicationFeatureFlags flags = replicationFeatureFlagReader.readReplicationFeatureFlags(syncInput);
replicationWorkerHelper.initialize(replicationContext, flags);
replicationWorkerHelper.initialize(replicationContext, flags, jobRoot);

replicate(jobRoot, syncInput);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.config.SyncStats;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
Expand Down Expand Up @@ -122,11 +123,13 @@ public void markFailed() {
hasFailed.set(true);
}

public void initialize(final ReplicationContext replicationContext, final ReplicationFeatureFlags replicationFeatureFlags) {
public void initialize(final ReplicationContext replicationContext, final ReplicationFeatureFlags replicationFeatureFlags, final Path jobRoot) {
this.replicationContext = replicationContext;
this.replicationFeatureFlags = replicationFeatureFlags;
this.timeTracker.trackReplicationStartTime();
this.metricAttrs = toConnectionAttrs(replicationContext);
ApmTraceUtils.addTagsToTrace(replicationContext.connectionId(), replicationContext.attempt().longValue(),
replicationContext.jobId().toString(), jobRoot);
}

public void startDestination(final AirbyteDestination destination, final StandardSyncInput syncInput, final Path jobRoot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
podName,
mainContainerInfo);

ApmTraceUtils.addTagsToTrace(connectionId, jobRunConfig.getJobId(), jobRoot);
ApmTraceUtils.addTagsToTrace(connectionId, jobRunConfig.getAttemptId(), jobRunConfig.getJobId(), jobRoot);

final String schedulerName = featureFlagClient.stringVariation(UseCustomK8sScheduler.INSTANCE, new Connection(connectionId));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.metrics.lib;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
Expand Down Expand Up @@ -77,12 +78,15 @@ public static void addTagsToTrace(final Span span, final Map<String, Object> tag
* All tags added via this method will use the default {@link #TAG_PREFIX} namespace. Any null
* values will be ignored.
*/
public static void addTagsToTrace(final UUID connectionId, final String jobId, final Path jobRoot) {
public static void addTagsToTrace(final UUID connectionId, final Long attemptNumber, final String jobId, final Path jobRoot) {
final Map<String, Object> tags = new HashMap<>();

if (connectionId != null) {
tags.put(CONNECTION_ID_KEY, connectionId);
}
if (attemptNumber != null) {
tags.put(ATTEMPT_NUMBER_KEY, attemptNumber);
}
if (jobId != null) {
tags.put(JOB_ID_KEY, jobId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.metrics.lib;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
Expand Down Expand Up @@ -134,27 +135,39 @@ void testAddingTagsWithNullChecks() {
final UUID connectionID = UUID.randomUUID();
final String jobId = UUID.randomUUID().toString();
final Path jobRoot = Path.of("dev", "null");
final Long attemptNumber = Long.valueOf(2L);

ApmTraceUtils.addTagsToTrace(connectionID, jobId, jobRoot);
ApmTraceUtils.addTagsToTrace(connectionID, attemptNumber, jobId, jobRoot);
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, CONNECTION_ID_KEY), connectionID.toString());
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, ATTEMPT_NUMBER_KEY), attemptNumber.toString());
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ID_KEY), jobId);
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ROOT_KEY), jobRoot.toString());

clearInvocations(span);
ApmTraceUtils.addTagsToTrace(null, jobId, jobRoot);
ApmTraceUtils.addTagsToTrace(null, null, jobId, jobRoot);
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, CONNECTION_ID_KEY), connectionID.toString());
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, ATTEMPT_NUMBER_KEY), attemptNumber.toString());
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ID_KEY), jobId);
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ROOT_KEY), jobRoot.toString());

clearInvocations(span);
ApmTraceUtils.addTagsToTrace(connectionID, jobId, null);
ApmTraceUtils.addTagsToTrace(connectionID, null, jobId, null);
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, CONNECTION_ID_KEY), connectionID.toString());
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, ATTEMPT_NUMBER_KEY), attemptNumber.toString());
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ID_KEY), jobId);
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ROOT_KEY), jobRoot.toString());

clearInvocations(span);
ApmTraceUtils.addTagsToTrace((UUID) null, null, null);
ApmTraceUtils.addTagsToTrace(null, attemptNumber, jobId, null);
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, ATTEMPT_NUMBER_KEY), attemptNumber.toString());
verify(span, times(1)).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ID_KEY), jobId);
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, CONNECTION_ID_KEY), jobRoot.toString());
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ROOT_KEY), jobRoot.toString());

clearInvocations(span);
ApmTraceUtils.addTagsToTrace((UUID) null, null, null, null);
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, CONNECTION_ID_KEY), connectionID.toString());
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, ATTEMPT_NUMBER_KEY), attemptNumber.toString());
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ID_KEY), jobId);
verify(span, never()).setTag(String.format(TAG_FORMAT, TAG_PREFIX, JOB_ROOT_KEY), jobRoot.toString());
}
Expand Down

0 comments on commit 94e8350

Please sign in to comment.