diff --git a/settings.gradle b/settings.gradle index 9357a1f15..8c5fc445c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,3 +4,4 @@ include 'temporal-sdk' include 'temporal-testing' include 'temporal-testing-junit4' include 'temporal-testing-junit5' +include 'temporal-opentracing' diff --git a/temporal-opentracing/LICENSE.txt b/temporal-opentracing/LICENSE.txt new file mode 100644 index 000000000..7bbf24f9f --- /dev/null +++ b/temporal-opentracing/LICENSE.txt @@ -0,0 +1,18 @@ +Temporal Java SDK + +Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + +Copyright (c) 2017 Uber Technologies, Inc. All Rights Reserved + +AWS Simple Workflow Flow Library +Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this +file except in compliance with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under +the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. diff --git a/temporal-opentracing/README.md b/temporal-opentracing/README.md new file mode 100644 index 000000000..5e7b973bc --- /dev/null +++ b/temporal-opentracing/README.md @@ -0,0 +1,41 @@ +# Temporal [OpenTracing](https://opentracing.io/) support module + +This module provides a set of Interceptors that adds support for OpenTracing Span Context propagation to Temporal. + +## Usage + +You want to register two interceptors - one on the Temporal client side, another on the worker side: + +1. Client configuration: + ```java + WorkflowClientOptions.newBuilder() + //... + .setInterceptors(new OpenTracingClientInterceptor()) + .build(); + ``` +2. Worker configuration: + ```java + WorkerFactoryOptions.newBuilder() + //... + .setWorkerInterceptors(new OpenTracingWorkerInterceptor()) + .build(); + ``` + +## [OpenTelemetry](https://opentelemetry.io/) + +OpenTracing has been merged into OpenTelemetry and nowadays OpenTelemetry should be a preferred solution. +There is still plenty of OpenTracing usage everywhere and there is an official OpenTracing -> OpenTelemetry bridge, +but no OpenTelemetry -> OpenTracing bridges. + +To give the best coverage in the simplest way, this module is implemented based on OpenTracing for now. +OpenTelemetry users are advised to use the +[OpenTracing -> OpenTelemetry bridge](https://github.com/open-telemetry/opentelemetry-java/tree/main/opentracing-shim) +to hook their OpenTelemetry setup and make it available for OpenTracing API: + +```java + io.opentracing.Tracer tracer = OpenTracingShim.createTracerShim(); + //or io.opentracing.Tracer tracer = OpenTracingShim.createTracerShim(openTelemetry); + GlobalTracer.registerIfAbsent(tracer); +``` + + diff --git a/temporal-opentracing/build.gradle b/temporal-opentracing/build.gradle new file mode 100644 index 000000000..a4a043e81 --- /dev/null +++ b/temporal-opentracing/build.gradle @@ -0,0 +1,178 @@ +// Run 'gradle checkUpdates' to find out which dependencies have newer versions +plugins { + id 'java-library' + id 'net.ltgt.errorprone' version '1.3.0' + id 'net.minecrell.licenser' version '0.4.1' + id 'com.palantir.git-version' version '0.12.3' + id 'maven-publish' + id 'signing' + id 'de.marcphilipp.nexus-publish' version '0.4.0' + id 'name.remal.check-updates' version '1.2.2' +} + +apply plugin: 'maven-publish' +apply plugin: 'de.marcphilipp.nexus-publish' +apply plugin: 'java' + +if (hasProperty('signing.keyId')) { + apply plugin: 'signing' + signing { + sign configurations.archives + } +} + +group = 'io.temporal' +version = getVersionName() +archivesBaseName = "temporal-opentracing" + +description = '''Temporal Java SDK OpenTracing Support Module''' + +java { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 + withJavadocJar() + withSourcesJar() +} + +ext { + opentracingVersion = '0.33.0' +} + +dependencies { + errorproneJavac('com.google.errorprone:javac:9+181-r4173-1') + errorprone('com.google.errorprone:error_prone_core:2.5.1') + + api project(':temporal-sdk') + api group: 'io.opentracing', name: 'opentracing-api', version: "$opentracingVersion" + + implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' + implementation group: 'io.opentracing', name: 'opentracing-util', version: "$opentracingVersion" + + testImplementation project(":temporal-testing") + testImplementation project(':temporal-testing-junit4') + testImplementation group: 'junit', name: 'junit', version: '4.13.2' + testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.8.0' + testImplementation group: 'io.opentracing', name: 'opentracing-mock', version: "$opentracingVersion" +} + +license { + header rootProject.file('license-header.txt') + exclude '**/*.puml' +} + +compileJava { + dependsOn 'googleJavaFormat' + options.encoding = 'UTF-8' + options.compilerArgs << '-Xlint:none' << '-Xlint:deprecation' << '-Werror' +} + +compileTestJava { + options.encoding = 'UTF-8' + options.compilerArgs << '-Xlint:none' << '-Xlint:deprecation' << '-Werror' +} + +if (JavaVersion.current().isJava8Compatible()) { + allprojects { + tasks.withType(Javadoc) { + options.addStringOption('Xdoclint:none', '-quiet') + } + } +} + +javadoc { + options.encoding = 'UTF-8' + if (JavaVersion.current().isJava9Compatible()) { + options.addBooleanOption('html5', true) + } +} + +task sourceJar(type: Jar) { + from sourceSets.main.allSource + classifier "sources" +} + +test { + dependsOn 'checkLicenseMain' + testLogging { + events 'passed', 'skipped', 'failed' + exceptionFormat 'full' + // Uncomment the following line if you want to see test logs in gradlew run. + showStandardStreams true + } + forkEvery = 1 + maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 +} + +publishing { + publications { + mavenJava(MavenPublication) { + from components.java + versionMapping { + usage('java-api') { + fromResolutionOf('runtimeClasspath') + } + usage('java-runtime') { + fromResolutionResult() + } + } + pom { + name = 'Temporal Java SDK OpenTracing Support Module' + packaging = 'jar' + // optionally artifactId can be defined here + description = 'Contains a set of classes that adds OpenTracing support to Temporal' + url = 'https://github.com/temporalio/temporal-java-sdk' + + scm { + connection = 'scm:git@github.com:temporalio/temporal-java-sdk.git' + developerConnection = 'scm:git@github.com:temporalio/temporal-java-sdk.git' + url = 'https://github.com/temporalio/temporal-java-sdk.git' + } + + licenses { + license { + name = 'The Apache License, Version 2.0' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + + developers { + developer { + id = 'mfateev' + name = 'Maxim Fateev' + email = 'maxim@temporal.io' + } + developer { + id = 'samarabbas' + name = 'Samar Abbas' + email = 'samar@temporal.io' + } + } + } + } + + } + + signing { + sign publishing.publications.mavenJava + } + + // Uncomment to test local publishing and comment nexusPublishing +// repositories { +// maven { +// def releasesRepoUrl = "$System.env.HOME/repos/releases" +// def snapshotsRepoUrl = "$System.env.HOME/repos/snapshots" +// url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl +// } +// } + +} + +nexusPublishing { + repositories { + sonatype { + username = project.hasProperty('ossrhUsername') ? project.property('ossrhUsername') : '' + password = project.hasProperty('ossrhPassword') ? project.property('ossrhPassword') : '' + } + } +} diff --git a/temporal-opentracing/license-header.txt b/temporal-opentracing/license-header.txt new file mode 100644 index 000000000..6331aec9f --- /dev/null +++ b/temporal-opentracing/license-header.txt @@ -0,0 +1,16 @@ + Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + + Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Modifications copyright (C) 2017 Uber Technologies, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"). You may not + use this file except in compliance with the License. A copy of the License is + located at + + http://aws.amazon.com/apache2.0 + + or in the "license" file accompanying this file. This file is distributed on + an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + express or implied. See the License for the specific language governing + permissions and limitations under the License. diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingClientInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingClientInterceptor.java new file mode 100644 index 000000000..c26f0db4f --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingClientInterceptor.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; +import io.temporal.common.interceptors.WorkflowClientInterceptorBase; +import io.temporal.opentracing.internal.OpenTracingWorkflowClientCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; + +public class OpenTracingClientInterceptor extends WorkflowClientInterceptorBase { + private final OpenTracingOptions options; + + public OpenTracingClientInterceptor() { + this(OpenTracingOptions.getDefaultInstance()); + } + + public OpenTracingClientInterceptor(OpenTracingOptions options) { + this.options = options; + } + + @Override + public WorkflowClientCallsInterceptor workflowClientCallsInterceptor( + WorkflowClientCallsInterceptor next) { + return new OpenTracingWorkflowClientCallsInterceptor(next, options, new SpanFactory(options)); + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingOptions.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingOptions.java new file mode 100644 index 000000000..402420d39 --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingOptions.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import com.google.common.base.MoreObjects; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + +public class OpenTracingOptions { + private static final OpenTracingOptions DEFAULT_INSTANCE = + OpenTracingOptions.newBuilder().build(); + + private final Tracer tracer; + private final Map customSpanOperationNamePrefixes; + + public static OpenTracingOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private OpenTracingOptions( + Tracer tracer, Map customSpanOperationNamePrefixes) { + if (tracer == null) throw new IllegalArgumentException("tracer shouldn't be null"); + this.tracer = tracer; + this.customSpanOperationNamePrefixes = customSpanOperationNamePrefixes; + } + + @Nonnull + public String getSpanOperationNamePrefix(SpanOperationType spanOperationType) { + return customSpanOperationNamePrefixes.getOrDefault( + spanOperationType, spanOperationType.getDefaultPrefix()); + } + + @Nonnull + public Tracer getTracer() { + return tracer; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + private Tracer tracer; + private final Map customSpanOperationNamePrefixes = new HashMap<>(); + + private Builder() {} + + public void setTracer(Tracer tracer) { + this.tracer = tracer; + } + + public Builder setSpanOperationNamePrefix( + SpanOperationType spanOperationType, String customPrefix) { + this.customSpanOperationNamePrefixes.put(spanOperationType, customPrefix); + return this; + } + + public OpenTracingOptions build() { + return new OpenTracingOptions( + MoreObjects.firstNonNull(tracer, GlobalTracer.get()), customSpanOperationNamePrefixes); + } + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java new file mode 100644 index 000000000..6c504af4b --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import io.temporal.common.interceptors.*; +import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor; +import io.temporal.opentracing.internal.OpenTracingWorkflowInboundCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; + +public class OpenTracingWorkerInterceptor implements WorkerInterceptor { + private final OpenTracingOptions options; + + public OpenTracingWorkerInterceptor() { + this(OpenTracingOptions.getDefaultInstance()); + } + + public OpenTracingWorkerInterceptor(OpenTracingOptions options) { + this.options = options; + } + + @Override + public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) { + return new OpenTracingWorkflowInboundCallsInterceptor(next, options, new SpanFactory(options)); + } + + @Override + public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { + return new OpenTracingActivityInboundCallsInterceptor(next, options, new SpanFactory(options)); + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java new file mode 100644 index 000000000..f123815ed --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +public enum SpanOperationType { + START_WORKFLOW("StartWorkflow"), + SIGNAL_WITH_START_WORKFLOW("SignalWithStartWorkflow"), + RUN_WORKFLOW("RunWorkflow"), + START_CHILD_WORKFLOW("StartChildWorkflow"), + START_ACTIVITY("StartActivity"), + RUN_ACTIVITY("RunActivity"); + + private final String defaultPrefix; + + SpanOperationType(String defaultPrefix) { + this.defaultPrefix = defaultPrefix; + } + + public String getDefaultPrefix() { + return defaultPrefix; + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardLogNames.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardLogNames.java new file mode 100644 index 000000000..f3647d1ae --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardLogNames.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +public class StandardLogNames { + public static final String FAILURE_MESSAGE = "failureMessage"; + public static final String FAILURE_CAUSE = "failureCause"; +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java new file mode 100644 index 000000000..7a3c93eb4 --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +public class StandardTagNames { + public static final String WORKFLOW_ID = "workflowId"; + public static final String RUN_ID = "runId"; + public static final String PARENT_WORKFLOW_ID = "parentWorkflowId"; + public static final String PARENT_RUN_ID = "parentRunId"; + public static final String FAILED = "failed"; +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java new file mode 100644 index 000000000..5f1b1a0e4 --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing.internal; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.activity.ActivityInfo; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase; +import io.temporal.opentracing.OpenTracingOptions; + +public class OpenTracingActivityInboundCallsInterceptor + extends ActivityInboundCallsInterceptorBase { + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + + public OpenTracingActivityInboundCallsInterceptor( + ActivityInboundCallsInterceptor next, OpenTracingOptions options, SpanFactory spanFactory) { + super(next); + this.options = options; + this.spanFactory = spanFactory; + } + + private ActivityExecutionContext activityExecutionContext; + + @Override + public void init(ActivityExecutionContext context) { + // Workflow Interceptors have access to Workflow.getInfo methods, + // but Activity Interceptors don't have access to Activity.getExecutionContext().getInfo() + // This is inconsistent and should be addressed, but this is a workaround. + this.activityExecutionContext = context; + super.init(context); + } + + @Override + public ActivityOutput execute(ActivityInput input) { + Tracer tracer = options.getTracer(); + SpanContext rootSpanContext = + OpenTracingContextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); + ActivityInfo activityInfo = activityExecutionContext.getInfo(); + Span activityRunSpan = + spanFactory + .createActivityRunSpan( + tracer, + activityInfo.getActivityType(), + System.currentTimeMillis(), + activityInfo.getWorkflowId(), + activityInfo.getRunId(), + rootSpanContext) + .start(); + try (Scope scope = tracer.scopeManager().activate(activityRunSpan)) { + return super.execute(input); + } catch (Throwable t) { + spanFactory.logFail(activityRunSpan, t); + throw t; + } finally { + activityRunSpan.finish(); + } + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingContextAccessor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingContextAccessor.java new file mode 100644 index 000000000..b63f9a4da --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingContextAccessor.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing.internal; + +import com.google.common.reflect.TypeToken; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.propagation.*; +import io.temporal.api.common.v1.Payload; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.Header; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +class OpenTracingContextAccessor { + private static final String TRACER_HEADER_KEY = "_tracer-data"; + private static final Type HASH_MAP_STRING_STRING_TYPE = + new TypeToken>() {}.getType(); + + public static void writeSpanContextToHeader( + SpanContext spanContext, Header header, Tracer tracer) { + Map serializedSpanContext = serializeSpanContextToMap(spanContext, tracer); + Optional payload = DataConverter.getDefaultInstance().toPayload(serializedSpanContext); + header.getValues().put(TRACER_HEADER_KEY, payload.get()); + } + + private static Map serializeSpanContextToMap( + SpanContext spanContext, Tracer tracer) { + Map serialized = new HashMap<>(); + tracer.inject( + spanContext, Format.Builtin.TEXT_MAP_INJECT, new TextMapInjectAdapter(serialized)); + return serialized; + } + + public static SpanContext readSpanContextFromHeader(Header header, Tracer tracer) { + Payload payload = header.getValues().get(TRACER_HEADER_KEY); + if (payload == null) { + return null; + } + Map serializedSpanContext = + DataConverter.getDefaultInstance() + .fromPayload(payload, HashMap.class, HASH_MAP_STRING_STRING_TYPE); + return deserializeSpanContextFromMap(serializedSpanContext, tracer); + } + + private static SpanContext deserializeSpanContextFromMap( + Map serializedSpanContext, Tracer tracer) { + return tracer.extract( + Format.Builtin.TEXT_MAP_EXTRACT, new TextMapExtractAdapter(serializedSpanContext)); + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java new file mode 100644 index 000000000..dd6e3095e --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing.internal; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase; +import io.temporal.opentracing.OpenTracingOptions; +import io.temporal.opentracing.SpanOperationType; + +public class OpenTracingWorkflowClientCallsInterceptor extends WorkflowClientCallsInterceptorBase { + + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + + public OpenTracingWorkflowClientCallsInterceptor( + WorkflowClientCallsInterceptor next, OpenTracingOptions options, SpanFactory spanFactory) { + super(next); + this.options = options; + this.spanFactory = spanFactory; + } + + @Override + public WorkflowStartOutput start(WorkflowStartInput input) { + Span span = createAndPassWorkflowStartSpan(input, SpanOperationType.START_WORKFLOW); + try { + return super.start(input); + } finally { + span.finish(); + } + } + + @Override + public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) { + Span span = + createAndPassWorkflowStartSpan( + input.getWorkflowStartInput(), SpanOperationType.SIGNAL_WITH_START_WORKFLOW); + try { + return super.signalWithStart(input); + } finally { + span.finish(); + } + } + + private Span createAndPassWorkflowStartSpan( + WorkflowStartInput input, SpanOperationType operationType) { + Tracer tracer = options.getTracer(); + Span span = createWorkflowStartSpanBuilder(tracer, input, operationType).start(); + OpenTracingContextAccessor.writeSpanContextToHeader(span.context(), input.getHeader(), tracer); + return span; + } + + private Tracer.SpanBuilder createWorkflowStartSpanBuilder( + Tracer tracer, WorkflowStartInput input, SpanOperationType operationType) { + return spanFactory.createWorkflowStartSpan( + tracer, + operationType, + input.getWorkflowType(), + System.currentTimeMillis(), + input.getWorkflowId()); + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java new file mode 100644 index 000000000..52a828ed2 --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing.internal; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkflowInboundCallsInterceptorBase; +import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; +import io.temporal.opentracing.OpenTracingOptions; +import io.temporal.workflow.Workflow; + +public class OpenTracingWorkflowInboundCallsInterceptor + extends WorkflowInboundCallsInterceptorBase { + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + + public OpenTracingWorkflowInboundCallsInterceptor( + WorkflowInboundCallsInterceptor next, OpenTracingOptions options, SpanFactory spanFactory) { + super(next); + this.options = options; + this.spanFactory = spanFactory; + } + + @Override + public void init(WorkflowOutboundCallsInterceptor outboundCalls) { + super.init( + new OpenTracingWorkflowOutboundCallsInterceptor(outboundCalls, options, spanFactory)); + } + + @Override + public WorkflowOutput execute(WorkflowInput input) { + Tracer tracer = options.getTracer(); + SpanContext rootSpanContext = + OpenTracingContextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); + Span workflowRunSpan = + spanFactory + .createWorkflowRunSpan( + tracer, + Workflow.getInfo().getWorkflowType(), + Workflow.currentTimeMillis(), + Workflow.getInfo().getWorkflowId(), + Workflow.getInfo().getRunId(), + rootSpanContext) + .start(); + try (Scope scope = tracer.scopeManager().activate(workflowRunSpan)) { + return super.execute(input); + } catch (Throwable t) { + spanFactory.logFail(workflowRunSpan, t); + throw t; + } finally { + workflowRunSpan.finish(); + } + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java new file mode 100644 index 000000000..3be9d12a7 --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing.internal; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.temporal.common.interceptors.Header; +import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; +import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptorBase; +import io.temporal.opentracing.OpenTracingOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInfo; + +public class OpenTracingWorkflowOutboundCallsInterceptor + extends WorkflowOutboundCallsInterceptorBase { + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + + public OpenTracingWorkflowOutboundCallsInterceptor( + WorkflowOutboundCallsInterceptor next, OpenTracingOptions options, SpanFactory spanFactory) { + super(next); + this.options = options; + this.spanFactory = spanFactory; + } + + @Override + public ActivityOutput executeActivity(ActivityInput input) { + if (!Workflow.isReplaying()) { + Span span = createAndPassActivityStartSpan(input.getActivityName(), input.getHeader()); + try { + return super.executeActivity(input); + } finally { + span.finish(); + } + } else { + return super.executeActivity(input); + } + } + + @Override + public LocalActivityOutput executeLocalActivity(LocalActivityInput input) { + if (!Workflow.isReplaying()) { + Span span = createAndPassActivityStartSpan(input.getActivityName(), input.getHeader()); + try { + return super.executeLocalActivity(input); + } finally { + span.finish(); + } + } else { + return super.executeLocalActivity(input); + } + } + + @Override + public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput input) { + if (!Workflow.isReplaying()) { + Span span = createAndPassChildWorkflowStartSpan(input); + try { + return super.executeChildWorkflow(input); + } finally { + span.finish(); + } + } else { + return super.executeChildWorkflow(input); + } + } + + private Span createAndPassActivityStartSpan(String activityName, Header header) { + Tracer tracer = options.getTracer(); + Span span = createActivityStartSpanBuilder(tracer, activityName).start(); + OpenTracingContextAccessor.writeSpanContextToHeader(span.context(), header, tracer); + return span; + } + + private Tracer.SpanBuilder createActivityStartSpanBuilder(Tracer tracer, String activityName) { + WorkflowInfo workflowInfo = Workflow.getInfo(); + return spanFactory.createActivityStartSpan( + tracer, + activityName, + Workflow.currentTimeMillis(), + workflowInfo.getWorkflowId(), + workflowInfo.getRunId()); + } + + private Span createAndPassChildWorkflowStartSpan(ChildWorkflowInput input) { + Tracer tracer = options.getTracer(); + Span span = createChildWorkflowStartSpanBuilder(tracer, input).start(); + OpenTracingContextAccessor.writeSpanContextToHeader(span.context(), input.getHeader(), tracer); + return span; + } + + private Tracer.SpanBuilder createChildWorkflowStartSpanBuilder( + Tracer tracer, ChildWorkflowInput input) { + WorkflowInfo parentWorkflowInfo = Workflow.getInfo(); + return spanFactory.createChildWorkflowStartSpan( + tracer, + input.getWorkflowType(), + System.currentTimeMillis(), + input.getWorkflowId(), + parentWorkflowInfo.getWorkflowId(), + parentWorkflowInfo.getRunId()); + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java new file mode 100644 index 000000000..121906865 --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -0,0 +1,175 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing.internal; + +import com.google.common.base.MoreObjects; +import com.uber.m3.util.ImmutableMap; +import io.opentracing.References; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.temporal.opentracing.OpenTracingOptions; +import io.temporal.opentracing.SpanOperationType; +import io.temporal.opentracing.StandardLogNames; +import io.temporal.opentracing.StandardTagNames; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +public class SpanFactory { + // Inspired by convention used in JAX-RS2 OpenTracing implementation: + // https://github.com/opentracing-contrib/java-jaxrs/blob/dcbfda6/opentracing-jaxrs2/src/main/java/io/opentracing/contrib/jaxrs2/server/OperationNameProvider.java#L46 + private static final String PREFIX_DELIMITER = ":"; + + private final OpenTracingOptions options; + + public SpanFactory(OpenTracingOptions options) { + this.options = options; + } + + public Tracer.SpanBuilder createWorkflowStartSpan( + Tracer tracer, + SpanOperationType operationType, + String workflowType, + long startTimeMs, + String workflowId) { + Map tags = ImmutableMap.of(StandardTagNames.WORKFLOW_ID, workflowId); + String operationName = + options.getSpanOperationNamePrefix(operationType) + PREFIX_DELIMITER + workflowType; + return createSpan(tracer, startTimeMs, operationName, tags, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createChildWorkflowStartSpan( + Tracer tracer, + String childWorkflowType, + long startTimeMs, + String workflowId, + String parentWorkflowId, + String parentRunId) { + Map tags = + ImmutableMap.of( + StandardTagNames.WORKFLOW_ID, workflowId, + StandardTagNames.PARENT_WORKFLOW_ID, parentWorkflowId, + StandardTagNames.PARENT_RUN_ID, parentRunId); + String operationName = + options.getSpanOperationNamePrefix(SpanOperationType.START_CHILD_WORKFLOW) + + PREFIX_DELIMITER + + childWorkflowType; + return createSpan(tracer, startTimeMs, operationName, tags, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowRunSpan( + Tracer tracer, + String workflowType, + long startTimeMs, + String workflowId, + String runId, + SpanContext workflowStartSpanContext) { + Map tags = + ImmutableMap.of( + StandardTagNames.WORKFLOW_ID, workflowId, + StandardTagNames.RUN_ID, runId); + String operationName = + options.getSpanOperationNamePrefix(SpanOperationType.RUN_WORKFLOW) + + PREFIX_DELIMITER + + workflowType; + return createSpan( + tracer, + startTimeMs, + operationName, + tags, + workflowStartSpanContext, + References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createActivityStartSpan( + Tracer tracer, String activityType, long startTimeMs, String workflowId, String runId) { + Map tags = + ImmutableMap.of( + StandardTagNames.WORKFLOW_ID, workflowId, + StandardTagNames.RUN_ID, runId); + String operationName = + options.getSpanOperationNamePrefix(SpanOperationType.START_ACTIVITY) + + PREFIX_DELIMITER + + activityType; + return createSpan(tracer, startTimeMs, operationName, tags, null, References.CHILD_OF); + } + + public Tracer.SpanBuilder createActivityRunSpan( + Tracer tracer, + String activityType, + long startTimeMs, + String workflowId, + String runId, + SpanContext activityStartSpanContext) { + Map tags = + ImmutableMap.of( + StandardTagNames.WORKFLOW_ID, workflowId, + StandardTagNames.RUN_ID, runId); + String operationName = + options.getSpanOperationNamePrefix(SpanOperationType.RUN_ACTIVITY) + + PREFIX_DELIMITER + + activityType; + return createSpan( + tracer, startTimeMs, operationName, tags, activityStartSpanContext, References.CHILD_OF); + } + + public void logFail(Span toSpan, Throwable failReason) { + toSpan.setTag(StandardTagNames.FAILED, true); + Map logPayload = + ImmutableMap.of( + StandardLogNames.FAILURE_MESSAGE, + failReason.getMessage(), + StandardLogNames.FAILURE_CAUSE, + failReason); + toSpan.log(System.currentTimeMillis(), logPayload); + } + + private static Tracer.SpanBuilder createSpan( + Tracer tracer, + long startTimeMs, + String operationName, + Map tags, + @Nullable SpanContext parentSpanContext, + @Nullable String parentReferenceType) { + SpanContext parent; + + Span activeSpan = tracer.activeSpan(); + if (activeSpan != null) { + // we prefer an actual opentracing active span if it exists + parent = activeSpan.context(); + } else { + // next we try to use the parent span context from parameters + parent = parentSpanContext; + } + + long startTimeMc = TimeUnit.MILLISECONDS.toMicros(startTimeMs); + Tracer.SpanBuilder spanBuilder = + tracer.buildSpan(operationName).withStartTimestamp(startTimeMc); + + if (parent != null) { + spanBuilder.addReference( + MoreObjects.firstNonNull(parentReferenceType, References.FOLLOWS_FROM), parent); + } + + tags.forEach(spanBuilder::withTag); + return spanBuilder; + } +} diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/ActivityFailureTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/ActivityFailureTest.java new file mode 100644 index 000000000..b189fad0d --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/ActivityFailureTest.java @@ -0,0 +1,177 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import static org.junit.Assert.*; + +import io.opentracing.Scope; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.GlobalTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.RetryOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.TestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class ActivityFailureTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setInterceptors(new OpenTracingClientInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new OpenTracingWorkerInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkflowTypes(WorkflowImpl.class) + .setActivityImplementations(new FailingActivityImpl()) + .build(); + + @Before + public void setUp() { + GlobalTracer.registerIfAbsent(mockTracer); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @ActivityInterface + public interface TestActivity { + @ActivityMethod + String activity(String input); + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String workflow(String input); + } + + private static final AtomicInteger failureCounter = new AtomicInteger(1); + + public static class FailingActivityImpl implements TestActivity { + @Override + public String activity(String input) { + int counter = failureCounter.getAndDecrement(); + if (counter > 0) { + throw ApplicationFailure.newFailure("fail", "fail"); + } else { + return "bar"; + } + } + } + + public static class WorkflowImpl implements TestWorkflow { + private final TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(2).build()) + .validateAndBuildWithDefaults()); + + @Override + public String workflow(String input) { + return activity.activity(input); + } + } + + /* + * We are checking that spans structure looks like this: + * ClientFunction + * | + * child + * v + * StartWorkflow:TestWorkflow -follow> RunWorkflow:TestWorkflow + * | + * child + * v + * StartActivity:Activity -follow> RunActivity:Activity(failed), RunActivity:Activity + */ + @Test + public void testActivityFailureSpanStructure() { + MockSpan span = mockTracer.buildSpan("ClientFunction").start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + try (Scope scope = mockTracer.scopeManager().activate(span)) { + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + assertEquals("bar", workflow.workflow("input")); + } finally { + span.finish(); + } + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + + MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction"); + + MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0); + assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); + assertEquals("StartWorkflow:TestWorkflow", workflowStartSpan.operationName()); + + MockSpan workflowRunSpan = spansHelper.getByParentSpan(workflowStartSpan).get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + + MockSpan activityStartSpan = spansHelper.getByParentSpan(workflowRunSpan).get(0); + assertEquals(workflowRunSpan.context().spanId(), activityStartSpan.parentId()); + assertEquals("StartActivity:Activity", activityStartSpan.operationName()); + + List activityRunSpans = spansHelper.getByParentSpan(activityStartSpan); + + MockSpan activityFailRunSpan = activityRunSpans.get(0); + assertEquals(activityStartSpan.context().spanId(), activityFailRunSpan.parentId()); + assertEquals("RunActivity:Activity", activityFailRunSpan.operationName()); + assertEquals(true, activityFailRunSpan.tags().get(StandardTagNames.FAILED)); + + MockSpan activitySuccessfulRunSpan = activityRunSpans.get(1); + assertEquals(activityStartSpan.context().spanId(), activitySuccessfulRunSpan.parentId()); + assertEquals("RunActivity:Activity", activitySuccessfulRunSpan.operationName()); + } +} diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/NoClientSpanTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/NoClientSpanTest.java new file mode 100644 index 000000000..2466c6069 --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/NoClientSpanTest.java @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; + +import io.opentracing.mock.MockTracer; +import io.opentracing.util.GlobalTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.testing.TestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class NoClientSpanTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setInterceptors(new OpenTracingClientInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new OpenTracingWorkerInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkflowTypes(WorkflowImpl.class) + .setActivityImplementations(new ActivityImpl()) + .build(); + + @Before + public void setUp() { + GlobalTracer.registerIfAbsent(mockTracer); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @ActivityInterface + public interface TestActivity { + @ActivityMethod + String activity(String input); + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String workflow(String input); + } + + public static class ActivityImpl implements TestActivity { + @Override + public String activity(String input) { + return "bar"; + } + } + + public static class WorkflowImpl implements TestWorkflow { + private TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .validateAndBuildWithDefaults()); + + @Override + public String workflow(String input) { + return activity.activity(input); + } + } + + @Test + public void testNothingFailsIfNoClientSpan() { + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + assertEquals("bar", workflow.workflow("input")); + } +} diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/OpenTracingSpansHelper.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/OpenTracingSpansHelper.java new file mode 100644 index 000000000..8d0a179b3 --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/OpenTracingSpansHelper.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import io.opentracing.mock.MockSpan; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class OpenTracingSpansHelper { + private final List mockSpans; + private final Map spanIdToSpan; + + public OpenTracingSpansHelper(List mockSpans) { + this.mockSpans = mockSpans; + this.spanIdToSpan = + mockSpans.stream().collect(Collectors.toMap(span -> span.context().spanId(), span -> span)); + } + + public MockSpan getSpan(long spanId) { + return spanIdToSpan.get(spanId); + } + + public MockSpan getSpanByOperationName(String operationName) { + return mockSpans.stream() + .filter(span -> span.operationName().equals(operationName)) + .findFirst() + .orElse(null); + } + + public List getByParentSpan(MockSpan parentSpan) { + return mockSpans.stream() + .filter(span -> span.parentId() == parentSpan.context().spanId()) + .collect(Collectors.toList()); + } +} diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/SpanContextPropagationTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/SpanContextPropagationTest.java new file mode 100644 index 000000000..77bf7d3c3 --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/SpanContextPropagationTest.java @@ -0,0 +1,156 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.GlobalTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.TestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.*; +import java.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class SpanContextPropagationTest { + + private static final String BAGGAGE_ITEM_KEY = "baggage-item-key"; + + private MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setInterceptors(new OpenTracingClientInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new OpenTracingWorkerInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkflowTypes(WorkflowImpl.class) + .setActivityImplementations(new OpenTracingAwareActivityImpl()) + .build(); + + @Before + public void setUp() { + GlobalTracer.registerIfAbsent(mockTracer); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @ActivityInterface + public interface TestActivity { + @ActivityMethod + String activity1(String input); + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String workflow1(String input); + } + + public static class OpenTracingAwareActivityImpl implements TestActivity { + @Override + public String activity1(String input) { + Tracer tracer = GlobalTracer.get(); + Span activeSpan = tracer.scopeManager().activeSpan(); + + if ("fail".equals(input)) { + throw ApplicationFailure.newFailure("fail", "fail"); + } else if ("foo".equals(input)) { + return "bar"; + } else { + return activeSpan.getBaggageItem(BAGGAGE_ITEM_KEY); + } + } + } + + public static class WorkflowImpl implements TestWorkflow { + private TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .validateAndBuildWithDefaults()); + + @Override + public String workflow1(String input) { + Tracer tracer = GlobalTracer.get(); + Span activeSpan = tracer.scopeManager().activeSpan(); + + MockSpan mockSpan = (MockSpan) activeSpan; + assertNotNull(activeSpan); + assertNotEquals(0, mockSpan.parentId()); + + return activity.activity1(input); + } + } + + /** + * This test checks that all elements of workflow are connected with the same root span. We set + * baggage item on the top level in a client span and use the baggage item inside the activity. + * This way we ensure that the baggage item has been propagated all the way from top to bottom. + */ + @Test + public void testBaggageItemPropagation() { + Span span = mockTracer.buildSpan("clientFunction").start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + try (Scope scope = mockTracer.scopeManager().activate(span)) { + Span activeSpan = mockTracer.scopeManager().activeSpan(); + final String BAGGAGE_ITEM_VALUE = "baggage-item-key"; + activeSpan.setBaggageItem(BAGGAGE_ITEM_KEY, BAGGAGE_ITEM_VALUE); + + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + assertEquals(BAGGAGE_ITEM_VALUE, workflow.workflow1("input")); + } finally { + span.finish(); + } + } +} diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/WorkflowReplayTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/WorkflowReplayTest.java new file mode 100644 index 000000000..e02ce10d3 --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/WorkflowReplayTest.java @@ -0,0 +1,221 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; + +import io.opentracing.Scope; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.GlobalTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.RetryOptions; +import io.temporal.testing.TestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowReplayTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setInterceptors(new OpenTracingClientInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new OpenTracingWorkerInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkflowTypes(WorkflowImpl.class) + .setActivityImplementations(new ActivityImpl()) + .build(); + + @Before + public void setUp() { + GlobalTracer.registerIfAbsent(mockTracer); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @ActivityInterface + public interface TestActivity1 { + @ActivityMethod + String activity1(String input); + } + + @ActivityInterface + public interface TestActivity2 { + @ActivityMethod + String activity2(String input); + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String workflow1(String input); + } + + public static class ActivityImpl implements TestActivity1, TestActivity2 { + @Override + public String activity1(String input) { + return "bar"; + } + + @Override + public String activity2(String input) { + return "bar"; + } + } + + private static final AtomicInteger failureCounter = new AtomicInteger(1); + + public static class WorkflowImpl implements TestWorkflow { + private final TestActivity1 activity1 = + Workflow.newActivityStub( + TestActivity1.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .validateAndBuildWithDefaults()); + + private final TestActivity2 activity2 = + Workflow.newActivityStub( + TestActivity2.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .validateAndBuildWithDefaults()); + + @Override + public String workflow1(String input) { + activity1.activity1(input); + + if (failureCounter.getAndDecrement() > 0) { + throw new OutOfMemoryError(); + } + + return activity2.activity2(input); + } + } + + /* + * We are checking that spans structure looks like this: + * ClientFunction + * | + * child + * v + * StartWorkflow:TestWorkflow + * | + * follow + * | + * l__-> RunWorkflow:TestWorkflow, (Error in workflow; RunWorkflow:TestWorkflow + * | thread is dead; replay needed), | + * child child + * v v + * StartActivity:Activity1 -follow> RunActivity:Activity1 StartActivity:Activity2 -follow> RunActivity:Activity2 + */ + @Test + public void testWorkflowReplaySpanStructure() { + MockSpan span = mockTracer.buildSpan("ClientFunction").start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + try (Scope scope = mockTracer.scopeManager().activate(span)) { + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(1)) + .setBackoffCoefficient(1.0) + .setMaximumAttempts(2) + .build()) + .setWorkflowExecutionTimeout(Duration.ofMinutes(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + assertEquals("bar", workflow.workflow1("input")); + } finally { + span.finish(); + } + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + + MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction"); + + MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0); + assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); + assertEquals("StartWorkflow:TestWorkflow", workflowStartSpan.operationName()); + + List workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(2, workflowRunSpans.size()); + + MockSpan workflowFirstRunSpan = workflowRunSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowFirstRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowFirstRunSpan.operationName()); + assertEquals(true, workflowFirstRunSpan.tags().get(StandardTagNames.FAILED)); + + List workflowFirstRunChildren = spansHelper.getByParentSpan(workflowFirstRunSpan); + assertEquals(1, workflowFirstRunChildren.size()); + + MockSpan activity1StartSpan = workflowFirstRunChildren.get(0); + assertEquals(workflowFirstRunSpan.context().spanId(), activity1StartSpan.parentId()); + assertEquals("StartActivity:Activity1", activity1StartSpan.operationName()); + + MockSpan activity1RunSpan = spansHelper.getByParentSpan(activity1StartSpan).get(0); + assertEquals(activity1StartSpan.context().spanId(), activity1RunSpan.parentId()); + assertEquals("RunActivity:Activity1", activity1RunSpan.operationName()); + + MockSpan workflowSecondRunSpan = workflowRunSpans.get(1); + assertEquals(workflowStartSpan.context().spanId(), workflowSecondRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowSecondRunSpan.operationName()); + + List workflowSecondRunChildren = spansHelper.getByParentSpan(workflowSecondRunSpan); + assertEquals( + "First Activity shouldn't be here if we replay", 1, workflowSecondRunChildren.size()); + + MockSpan activity2StartSpan = workflowSecondRunChildren.get(0); + assertEquals(workflowSecondRunSpan.context().spanId(), activity2StartSpan.parentId()); + assertEquals("StartActivity:Activity2", activity2StartSpan.operationName()); + + MockSpan activity2RunSpan = spansHelper.getByParentSpan(activity2StartSpan).get(0); + assertEquals(activity2StartSpan.context().spanId(), activity2RunSpan.parentId()); + assertEquals("RunActivity:Activity2", activity2RunSpan.operationName()); + } +} diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/WorkflowRetryTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/WorkflowRetryTest.java new file mode 100644 index 000000000..79d42d01c --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/WorkflowRetryTest.java @@ -0,0 +1,229 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; + +import io.opentracing.Scope; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.GlobalTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.RetryOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.TestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowRetryTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setInterceptors(new OpenTracingClientInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new OpenTracingWorkerInterceptor()) + .validateAndBuildWithDefaults()) + .setWorkflowTypes(WorkflowImpl.class) + .setActivityImplementations(new ActivityImpl()) + .build(); + + @Before + public void setUp() { + GlobalTracer.registerIfAbsent(mockTracer); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @ActivityInterface + public interface TestActivity1 { + @ActivityMethod + String activity1(String input); + } + + @ActivityInterface + public interface TestActivity2 { + @ActivityMethod + String activity2(String input); + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String workflow1(String input); + } + + public static class ActivityImpl implements TestActivity1, TestActivity2 { + @Override + public String activity1(String input) { + return "bar"; + } + + @Override + public String activity2(String input) { + return "bar"; + } + } + + private static final AtomicInteger failureCounter = new AtomicInteger(1); + + public static class WorkflowImpl implements TestWorkflow { + private final TestActivity1 activity1 = + Workflow.newActivityStub( + TestActivity1.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .validateAndBuildWithDefaults()); + + private final TestActivity2 activity2 = + Workflow.newActivityStub( + TestActivity2.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .validateAndBuildWithDefaults()); + + @Override + public String workflow1(String input) { + activity1.activity1(input); + + if (failureCounter.getAndDecrement() > 0) { + throw ApplicationFailure.newFailure("fail", "fail"); + } + + return activity2.activity2(input); + } + } + + /* + * We are checking that spans structure looks like this: + * ClientFunction + * | + * child + * v + * StartWorkflow:TestWorkflow + * | + * follow + * | + * l__-> RunWorkflow:TestWorkflow, (failure in workflow, retry needed), RunWorkflow:TestWorkflow + * | | + * child child + * v v + * StartActivity:Activity1 -follow> RunActivity:Activity1 StartActivity:Activity1 -follow> RunActivity:Activity1, StartActivity:Activity2 -follow> RunActivity:Activity2 + */ + @Test + public void testWorkflowRetrySpanStructure() { + MockSpan span = mockTracer.buildSpan("ClientFunction").start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + try (Scope scope = mockTracer.scopeManager().activate(span)) { + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(1)) + .setBackoffCoefficient(1.0) + .setMaximumAttempts(2) + .build()) + .setWorkflowExecutionTimeout(Duration.ofMinutes(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + assertEquals("bar", workflow.workflow1("input")); + } finally { + span.finish(); + } + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + + MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction"); + + MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0); + assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); + assertEquals("StartWorkflow:TestWorkflow", workflowStartSpan.operationName()); + + List workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(2, workflowRunSpans.size()); + + MockSpan workflowFirstRunSpan = workflowRunSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowFirstRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowFirstRunSpan.operationName()); + assertEquals(true, workflowFirstRunSpan.tags().get(StandardTagNames.FAILED)); + + List workflowFirstRunChildren = spansHelper.getByParentSpan(workflowFirstRunSpan); + assertEquals(1, workflowFirstRunChildren.size()); + + MockSpan activity1StartSpan = workflowFirstRunChildren.get(0); + assertEquals(workflowFirstRunSpan.context().spanId(), activity1StartSpan.parentId()); + assertEquals("StartActivity:Activity1", activity1StartSpan.operationName()); + + MockSpan activity1RunSpan = spansHelper.getByParentSpan(activity1StartSpan).get(0); + assertEquals(activity1StartSpan.context().spanId(), activity1RunSpan.parentId()); + assertEquals("RunActivity:Activity1", activity1RunSpan.operationName()); + + MockSpan workflowSecondRunSpan = workflowRunSpans.get(1); + assertEquals(workflowStartSpan.context().spanId(), workflowSecondRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowSecondRunSpan.operationName()); + + List workflowSecondRunChildren = spansHelper.getByParentSpan(workflowSecondRunSpan); + assertEquals(2, workflowSecondRunChildren.size()); + + activity1StartSpan = workflowSecondRunChildren.get(0); + assertEquals(workflowSecondRunSpan.context().spanId(), activity1StartSpan.parentId()); + assertEquals("StartActivity:Activity1", activity1StartSpan.operationName()); + + activity1RunSpan = spansHelper.getByParentSpan(activity1StartSpan).get(0); + assertEquals(activity1StartSpan.context().spanId(), activity1RunSpan.parentId()); + assertEquals("RunActivity:Activity1", activity1RunSpan.operationName()); + + MockSpan activity2StartSpan = workflowSecondRunChildren.get(1); + assertEquals(workflowSecondRunSpan.context().spanId(), activity2StartSpan.parentId()); + assertEquals("StartActivity:Activity2", activity2StartSpan.operationName()); + + MockSpan activity2RunSpan = spansHelper.getByParentSpan(activity2StartSpan).get(0); + assertEquals(activity2StartSpan.context().spanId(), activity2RunSpan.parentId()); + assertEquals("RunActivity:Activity2", activity2RunSpan.operationName()); + } +}