Skip to content

Commit 6d7f85b

Browse files
committed
Implements passing Headers in WorkflowOptions & WorkflowClientInterceptor.
Drafted for discussion for PR temporalio#382
1 parent b3fad1d commit 6d7f85b

File tree

6 files changed

+222
-2
lines changed

6 files changed

+222
-2
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public static WorkflowOptions merge(
6767
.setCronSchedule(OptionsUtils.merge(cronAnnotation, o.getCronSchedule(), String.class))
6868
.setMemo(o.getMemo())
6969
.setSearchAttributes(o.getSearchAttributes())
70+
.setHeaders(o.getHeaders())
7071
.setContextPropagators(o.getContextPropagators())
7172
.validateBuildWithDefaults();
7273
}
@@ -93,6 +94,8 @@ public static final class Builder {
9394

9495
private Map<String, Object> searchAttributes;
9596

97+
private Map<String, Object> headers;
98+
9699
private List<ContextPropagator> contextPropagators;
97100

98101
private Builder() {}
@@ -111,6 +114,7 @@ private Builder(WorkflowOptions options) {
111114
this.cronSchedule = options.cronSchedule;
112115
this.memo = options.memo;
113116
this.searchAttributes = options.searchAttributes;
117+
this.headers = options.headers;
114118
this.contextPropagators = options.contextPropagators;
115119
}
116120

@@ -217,6 +221,11 @@ public Builder setContextPropagators(List<ContextPropagator> contextPropagators)
217221
return this;
218222
}
219223

224+
public Builder setHeaders(Map<String, Object> headers) {
225+
this.headers = headers;
226+
return this;
227+
}
228+
220229
public WorkflowOptions build() {
221230
return new WorkflowOptions(
222231
workflowId,
@@ -229,6 +238,7 @@ public WorkflowOptions build() {
229238
cronSchedule,
230239
memo,
231240
searchAttributes,
241+
headers,
232242
contextPropagators);
233243
}
234244

@@ -247,6 +257,7 @@ public WorkflowOptions validateBuildWithDefaults() {
247257
cronSchedule,
248258
memo,
249259
searchAttributes,
260+
headers,
250261
contextPropagators);
251262
}
252263
}
@@ -271,6 +282,8 @@ public WorkflowOptions validateBuildWithDefaults() {
271282

272283
private final Map<String, Object> searchAttributes;
273284

285+
private final Map<String, Object> headers;
286+
274287
private final List<ContextPropagator> contextPropagators;
275288

276289
private WorkflowOptions(
@@ -284,6 +297,7 @@ private WorkflowOptions(
284297
String cronSchedule,
285298
Map<String, Object> memo,
286299
Map<String, Object> searchAttributes,
300+
Map<String, Object> headers,
287301
List<ContextPropagator> contextPropagators) {
288302
this.workflowId = workflowId;
289303
this.workflowIdReusePolicy = workflowIdReusePolicy;
@@ -295,6 +309,7 @@ private WorkflowOptions(
295309
this.cronSchedule = cronSchedule;
296310
this.memo = memo;
297311
this.searchAttributes = searchAttributes;
312+
this.headers = headers;
298313
this.contextPropagators = contextPropagators;
299314
}
300315

@@ -338,6 +353,10 @@ public Map<String, Object> getSearchAttributes() {
338353
return searchAttributes;
339354
}
340355

356+
public Map<String, Object> getHeaders() {
357+
return headers;
358+
}
359+
341360
public List<ContextPropagator> getContextPropagators() {
342361
return contextPropagators;
343362
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,6 @@ <R> CompletableFuture<R> getResultAsync(
160160
void terminate(String reason, Object... details);
161161

162162
Optional<WorkflowOptions> getOptions();
163+
164+
void setOptions(WorkflowOptions options);
163165
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowStubImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class WorkflowStubImpl implements WorkflowStub {
8282
private final Optional<String> workflowType;
8383
private final Scope metricsScope;
8484
private final AtomicReference<WorkflowExecution> execution = new AtomicReference<>();
85-
private final Optional<WorkflowOptions> options;
85+
private Optional<WorkflowOptions> options;
8686
private final WorkflowClientOptions clientOptions;
8787

8888
WorkflowStubImpl(
@@ -235,7 +235,10 @@ private StartWorkflowExecutionRequest newStartWorkflowExecutionRequest(
235235
}
236236
if (o.getContextPropagators() != null && !o.getContextPropagators().isEmpty()) {
237237
Map<String, Payload> context = extractContextsAndConvertToBytes(o.getContextPropagators());
238-
request.setHeader(Header.newBuilder().putAllFields(context));
238+
request.setHeader(
239+
Header.newBuilder()
240+
.putAllFields(context)
241+
.putAllFields(convertMemoFromObjectToBytes(o.getHeaders())));
239242
}
240243
return request.build();
241244
}
@@ -536,6 +539,11 @@ public Optional<WorkflowOptions> getOptions() {
536539
return options;
537540
}
538541

542+
@Override
543+
public void setOptions(WorkflowOptions options) {
544+
this.options = Optional.of(options);
545+
}
546+
539547
private void checkStarted() {
540548
if (execution.get() == null || execution.get().getWorkflowId() == null) {
541549
throw new IllegalStateException("Null workflowId. Was workflow started?");
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.temporal.common.interceptors;
2+
3+
import io.temporal.client.WorkflowOptions;
4+
import io.temporal.client.WorkflowStub;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
8+
//This way is bad also because the header is passed when the stub is created, not when the execution is triggered.
9+
//It can be problematic, especially if our context is thread local and stubs are created and executed in different threads
10+
public class SampleOpenTracingLikeInterceptor extends WorkflowClientInterceptorBase {
11+
@Override
12+
public WorkflowStub newUntypedWorkflowStub(
13+
String workflowType, WorkflowOptions options, WorkflowStub next) {
14+
Map<String, Object> originalHeaders = options != null ? options.getHeaders() : null;
15+
Map<String, Object> newHeaders;
16+
17+
if (originalHeaders == null) {
18+
newHeaders = new HashMap<>();
19+
} else {
20+
// we want to copy it, because right now WorkflowOptions exposes the collection itself, so if
21+
// we modify it -
22+
// we will modify a collection inside WorkflowOptions that supposes to be immutable (because
23+
// it has a builder)
24+
newHeaders = new HashMap<>(originalHeaders);
25+
}
26+
newHeaders.put("opentracing", new Object());
27+
WorkflowOptions modifiedOptions =
28+
WorkflowOptions.newBuilder(options).setHeaders(newHeaders).build();
29+
// it's either
30+
// 1. setOption on stub,
31+
// or
32+
// 2. We hack supposedly immutable WorkflowOptions instance and directly modify the headers
33+
// collection.
34+
next.setOptions(modifiedOptions);
35+
return next;
36+
}
37+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package io.temporal.common.interceptors;
2+
3+
import io.temporal.api.common.v1.WorkflowExecution;
4+
import io.temporal.client.WorkflowOptions;
5+
import io.temporal.client.WorkflowStub;
6+
7+
import java.lang.reflect.Type;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.Optional;
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.TimeoutException;
14+
15+
/**
16+
* This is a "better" implementation, that will give right behavior.
17+
* Requires getOptions setOptions on stub to hack it right before the call.
18+
*/
19+
public class SampleOpenTracingLikeInterceptor2 extends WorkflowClientInterceptorBase {
20+
@Override
21+
public WorkflowStub newUntypedWorkflowStub(
22+
String workflowType, WorkflowOptions options, WorkflowStub next) {
23+
return new WorkflowStub() {
24+
private void hackHeaders() {
25+
//We shouldn't use headers that are passed as a parameter to newUntypedWorkflowStub, because
26+
//they can be outdated at a time of call because of the exposed setOptions
27+
WorkflowOptions options = next.getOptions().orElse(null);
28+
Map<String, Object> originalHeaders = options != null ? options.getHeaders() : null;
29+
Map<String, Object> newHeaders;
30+
31+
if (originalHeaders == null) {
32+
newHeaders = new HashMap<>();
33+
} else {
34+
// we want to copy it, because right now WorkflowOptions exposes the collection itself, so if
35+
// we modify it -
36+
// we will modify a collection inside WorkflowOptions that supposes to be immutable (because
37+
// it has a builder)
38+
newHeaders = new HashMap<>(originalHeaders);
39+
}
40+
newHeaders.put("opentracing", new Object());
41+
WorkflowOptions modifiedOptions =
42+
WorkflowOptions.newBuilder(options).setHeaders(newHeaders).build();
43+
// it's either
44+
// 1. setOption on stub,
45+
// or
46+
// 2. We hack supposedly immutable WorkflowOptions instance and directly modify the headers
47+
// collection.
48+
next.setOptions(modifiedOptions);
49+
}
50+
51+
@Override
52+
public WorkflowExecution start(Object... args) {
53+
hackHeaders();
54+
return next.start(args);
55+
}
56+
57+
@Override
58+
public WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs) {
59+
hackHeaders();
60+
return next.signalWithStart(signalName, signalArgs, startArgs);
61+
}
62+
63+
@Override
64+
public void signal(String signalName, Object... args) {
65+
next.signal(signalName, args);
66+
}
67+
68+
@Override
69+
public Optional<String> getWorkflowType() {
70+
return next.getWorkflowType();
71+
}
72+
73+
@Override
74+
public WorkflowExecution getExecution() {
75+
return next.getExecution();
76+
}
77+
78+
@Override
79+
public <R> R getResult(Class<R> resultClass, Type resultType) {
80+
return next.getResult(resultClass, resultType);
81+
}
82+
83+
@Override
84+
public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, Type resultType) {
85+
return next.getResultAsync(resultClass, resultType);
86+
}
87+
88+
@Override
89+
public <R> R getResult(Class<R> resultClass) {
90+
return next.getResult(resultClass);
91+
}
92+
93+
@Override
94+
public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass) {
95+
return next.getResultAsync(resultClass);
96+
}
97+
98+
@Override
99+
public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass, Type resultType) throws TimeoutException {
100+
return next.getResult(timeout, unit, resultClass, resultType);
101+
}
102+
103+
@Override
104+
public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass) throws TimeoutException {
105+
return next.getResult(timeout, unit, resultClass);
106+
}
107+
108+
@Override
109+
public <R> CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit, Class<R> resultClass, Type resultType) {
110+
return next.getResultAsync(timeout, unit, resultClass, resultType);
111+
}
112+
113+
@Override
114+
public <R> CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit, Class<R> resultClass) {
115+
return next.getResultAsync(timeout, unit, resultClass);
116+
}
117+
118+
@Override
119+
public <R> R query(String queryType, Class<R> resultClass, Object... args) {
120+
return next.query(queryType, resultClass, args);
121+
}
122+
123+
@Override
124+
public <R> R query(String queryType, Class<R> resultClass, Type resultType, Object... args) {
125+
return next.query(queryType, resultClass, resultType, args);
126+
}
127+
128+
@Override
129+
public void cancel() {
130+
next.cancel();
131+
}
132+
133+
@Override
134+
public void terminate(String reason, Object... details) {
135+
next.terminate(reason, details);
136+
}
137+
138+
@Override
139+
public Optional<WorkflowOptions> getOptions() {
140+
return next.getOptions();
141+
}
142+
143+
@Override
144+
public void setOptions(WorkflowOptions options) {
145+
next.setOptions(options);
146+
}
147+
};
148+
}
149+
}

temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,11 @@ public Optional<WorkflowOptions> getOptions() {
371371
return next.getOptions();
372372
}
373373

374+
@Override
375+
public void setOptions(WorkflowOptions options) {
376+
next.setOptions(options);
377+
}
378+
374379
/** Unlocks time skipping before blocking calls and locks back after completion. */
375380
private class TimeLockingFuture<R> extends CompletableFuture<R> {
376381

0 commit comments

Comments
 (0)