3434import java .util .concurrent .atomic .AtomicReference ;
3535import java .util .concurrent .locks .Lock ;
3636import java .util .concurrent .locks .ReentrantLock ;
37+ import java .util .function .Supplier ;
3738
3839public class WorkflowMutableInstance implements WorkflowInstance {
3940
@@ -47,6 +48,8 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4748 protected AtomicReference <CompletableFuture <WorkflowModel >> futureRef = new AtomicReference <>();
4849 protected Instant completedAt ;
4950
51+ protected final Map <String , Object > additionalObjects = new ConcurrentHashMap <String , Object >();
52+
5053 private Lock statusLock = new ReentrantLock ();
5154 private Map <CompletableFuture <TaskContext >, TaskContext > suspended ;
5255
@@ -84,14 +87,18 @@ protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnabl
8487 .inputFilter ()
8588 .map (f -> f .apply (workflowContext , null , input ))
8689 .orElse (input ))
87- .whenComplete (this ::whenFailed )
90+ .whenComplete (this ::whenCompleted )
8891 .thenApply (this ::whenSuccess );
8992 futureRef .set (future );
9093 return future ;
9194 }
9295
93- private void whenFailed (WorkflowModel result , Throwable ex ) {
96+ private void whenCompleted (WorkflowModel result , Throwable ex ) {
9497 completedAt = Instant .now ();
98+ additionalObjects .values ().stream ()
99+ .filter (AutoCloseable .class ::isInstance )
100+ .map (AutoCloseable .class ::cast )
101+ .forEach (WorkflowUtils ::safeClose );
95102 if (ex != null ) {
96103 handleException (ex instanceof CompletionException ? ex = ex .getCause () : ex );
97104 }
@@ -278,5 +285,9 @@ public boolean cancel() {
278285 }
279286 }
280287
288+ public <T > T additionalObject (String key , Supplier <T > supplier ) {
289+ return (T ) additionalObjects .computeIfAbsent (key , k -> supplier .get ());
290+ }
291+
281292 public void restoreContext (WorkflowContext workflow , TaskContext context ) {}
282293}
0 commit comments