Skip to content

Commit

Permalink
Add Workflow unreachable metrics (#8671)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Sep 1, 2023
1 parent 94e8350 commit 32eabb9
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
Expand All @@ -22,17 +27,22 @@
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* Utility functions for connection manager workflows.
*/
@NoArgsConstructor
@Singleton
@Slf4j
public class ConnectionManagerUtils {

private final MetricClient metricClient;

public ConnectionManagerUtils() {
// TODO Inject it when MetricClient becomes injectable.
this.metricClient = MetricClientFactory.getMetricClient();
}

/**
* Send a cancellation to the workflow. It will swallow any exception and won't check if the
* workflow is already deleted when being cancel.
Expand Down Expand Up @@ -117,6 +127,8 @@ private <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final W
}
return connectionManagerWorkflow;
} catch (final UnreachableWorkflowException e) {
metricClient.count(OssMetricsRegistry.WORFLOW_UNREACHABLE, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
log.error(
String.format(
"Failed to retrieve ConnectionManagerWorkflow for connection %s. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.StreamDescriptor;
Expand Down Expand Up @@ -76,6 +81,7 @@ public class TemporalClient {
private final ConnectionManagerUtils connectionManagerUtils;
private final NotificationClient notificationClient;
private final StreamResetRecordsHelper streamResetRecordsHelper;
private final MetricClient metricClient;

public TemporalClient(@Named("workspaceRootTemporal") final Path workspaceRoot,
final WorkflowClient client,
Expand All @@ -91,6 +97,8 @@ public TemporalClient(@Named("workspaceRootTemporal") final Path workspaceRoot,
this.connectionManagerUtils = connectionManagerUtils;
this.notificationClient = notificationClient;
this.streamResetRecordsHelper = streamResetRecordsHelper;
// TODO Inject it when MetricClient becomes injectable.
this.metricClient = MetricClientFactory.getMetricClient();
}

private final Set<String> workflowNames = new HashSet<>();
Expand Down Expand Up @@ -567,6 +575,8 @@ public void update(final UUID connectionId) {
log.info("Connection {} is deleted, and therefore cannot be updated.", connectionId);
return;
} catch (final UnreachableWorkflowException e) {
metricClient.count(OssMetricsRegistry.WORFLOW_UNREACHABLE, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
log.error(
String.format("Failed to retrieve ConnectionManagerWorkflow for connection %s. Repairing state by creating new workflow.", connectionId),
e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ public enum OssMetricsRegistry implements MetricsRegistry {
WORKER_SOURCE_MESSAGE_READ(MetricEmittingApps.WORKER,
"worker_source_message_read",
"whenever a message is read from the source"),
WORFLOW_UNREACHABLE(MetricEmittingApps.WORKER,
"workflow_unreachable",
"whenever a workflow is unreachable"),
WORKFLOWS_HEALED(MetricEmittingApps.CRON,
"workflows_healed",
"number of workflow the self healing cron healed"),
Expand Down

0 comments on commit 32eabb9

Please sign in to comment.