Skip to content

Commit 4611ab4

Browse files
committed
Add ':binding' support
Per metosin#54 in metosin/seippari, this commit could be considered a starting point for supporting bindings support in clojure. The initial implementation used the starting point @robert-stuttaford posted in the referenced issue. A tradeoff here, is that in order to support sync/async w/ the various runtimes, using (bound-fn*) seems to be necessary given the different thread pool implemenations. Counsequently, there is a a perf hit needing to push/pop thread local vars. It doesn't seem to be too much in the general case, but with manifold, it seems to a an order of magnitude slower, though still in the microseconds.
1 parent bfc76d4 commit 4611ab4

File tree

8 files changed

+187
-10
lines changed

8 files changed

+187
-10
lines changed

src/sieppari/async.cljc

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@
55
java.util.concurrent.CompletionException
66
java.util.function.Function)))
77

8+
#?(:clj
9+
(defn -forward-bindings [f]
10+
(fn [ctx]
11+
(with-bindings (or (:bindings ctx) {})
12+
((bound-fn* f) ctx)))))
13+
814
(defprotocol AsyncContext
915
(async? [t])
1016
(continue [t f])
@@ -21,20 +27,33 @@
2127
(extend-protocol AsyncContext
2228
Object
2329
(async? [_] false)
30+
; Given the implementation of enter/leave,
31+
; `continue` won't be called, and therefore,
32+
; the function call does not need to be bound
2433
(continue [t f] (f t))
2534
(await [t] t)))
2635

2736
#?(:cljs
2837
(extend-protocol AsyncContext
2938
default
3039
(async? [_] false)
40+
; Given the implementation of enter/leave,
41+
; `continue` won't be called, and therefore,
42+
; the function call does not need to be bound
3143
(continue [t f] (f t))))
3244

3345
#?(:clj
3446
(extend-protocol AsyncContext
3547
clojure.lang.IDeref
3648
(async? [_] true)
37-
(continue [c f] (future (f @c)))
49+
(continue [c f]
50+
(cond
51+
; Make sure there is
52+
(map? @c)
53+
(with-bindings (or (:bindings @c) {})
54+
(future ((bound-fn* f) @c)))
55+
:else
56+
(future (f @c))))
3857
(catch [c f] (future (let [c @c]
3958
(if (exception? c) (f c) c))))
4059
(await [c] @c)))
@@ -44,6 +63,8 @@
4463
CompletionStage
4564
(async? [_] true)
4665
(continue [this f]
66+
; Given the "context" is a completion stage, there isn't
67+
; a means
4768
(.thenApply ^CompletionStage this
4869
^Function (->FunctionWrapper f)))
4970

src/sieppari/async/core_async.cljc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
#?(:clj clojure.core.async.impl.protocols.Channel
1010
:cljs cljs.core.async.impl.channels/ManyToManyChannel)
1111
(async? [_] true)
12-
(continue [c f] (go (f (cca/<! c))))
12+
#?(:clj (continue [c f]
13+
(let [f' (sa/-forward-bindings f)]
14+
(go
15+
(f' (cca/<! c)))))
16+
:cljs (continue [c f] (go (f (cca/<! c)))))
1317
(catch [c f] (go (let [c (cca/<! c)]
1418
(if (exception? c) (f c) c))))
1519
#?(:clj (await [c] (<!! c))))

src/sieppari/async/manifold.clj

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,31 @@
22
(:require [sieppari.async :as sa]
33
[manifold.deferred :as d]))
44

5+
; chain'-, as is being used here, is chain'-/3
6+
; chain'-/3, at the time of writing, has an arglist of:
7+
; [d x f]
8+
; where:
9+
; - `d` is a non-realized manifold deferred value, or nil
10+
; to signal a deferred should be returned/provided
11+
; - `x` is either a deferred or a value. If it is a deferred,
12+
; then the deferred is recurively realized until a non-deferred value
13+
; is yeilded.
14+
; - `f` is a function applied to the unwrapped value `x`, before being either realized
15+
; into `d` or being returned as a sucess or error deferred, depending on the result
16+
; of `(f x)`.
517
(extend-protocol sa/AsyncContext
618
manifold.deferred.Deferred
719
(async? [_] true)
8-
(continue [d f] (d/chain'- nil d f))
9-
(catch [d f] (d/catch' d f))
20+
(continue [d f]
21+
(d/chain'- nil d (sa/-forward-bindings f)))
22+
(catch [d f]
23+
(d/catch' d (sa/-forward-bindings f)))
1024
(await [d] (deref d))
1125

1226
manifold.deferred.ErrorDeferred
1327
(async? [_] true)
14-
(continue [d f] (d/chain'- nil d f))
15-
(catch [d f] (d/catch' d f))
28+
(continue [d f]
29+
(d/chain'- nil d (sa/-forward-bindings f)))
30+
(catch [d f]
31+
(d/catch' d (sa/-forward-bindings f)))
1632
(await [d] (deref d)))

src/sieppari/core.cljc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,17 @@
1414
c/Context
1515
(context? [_] true))
1616

17+
1718
(defn- -try [ctx f]
1819
(if f
1920
(try
20-
(let [ctx* (f ctx)]
21+
(let [ctx* #?(:clj (with-bindings (or (:bindings ctx) {})
22+
; Given the various async
23+
; executors may exec on different threads,
24+
; the fn must be bound in order to preserve
25+
; bindings
26+
((bound-fn* f) ctx))
27+
:cljs (f ctx))]
2128
(if (a/async? ctx*)
2229
(a/catch ctx* (fn [e] (assoc ctx :error e)))
2330
ctx*))
@@ -79,7 +86,7 @@
7986
(callback result))))
8087

8188
(defn- remove-context-keys [ctx]
82-
(dissoc ctx :error :queue :stack))
89+
(dissoc ctx :bindings :error :queue :stack))
8390

8491
;;
8592
;; Public API:

test/clj/sieppari/core_async_test.clj

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,3 +246,36 @@
246246
[:error :c]
247247
[:error :b]
248248
[:leave :a]])))
249+
250+
(def ^:dynamic *boundv* 41)
251+
252+
(defn bindings-handler [_]
253+
(is (= 43 *boundv*))
254+
(go
255+
*boundv*))
256+
257+
(def bindings-chain
258+
[{:enter (fn [ctx]
259+
(go
260+
(assoc ctx
261+
:bindings
262+
{#'*boundv* 42})))
263+
:leave (fn [ctx]
264+
(go
265+
(is (= 42 *boundv*))
266+
ctx))}
267+
{:enter (fn [ctx]
268+
(is (= 42 *boundv*)
269+
"In interceptor failed")
270+
(go
271+
(update-in ctx [:bindings #'*boundv*] inc)))
272+
:leave (fn [ctx]
273+
(go
274+
(update-in ctx [:bindings #'*boundv*] dec)))}
275+
276+
bindings-handler])
277+
278+
(deftest async-bindings-test
279+
(fact "bindings are conveyed across interceptor chain"
280+
(sc/execute bindings-chain {}) => 43))
281+

test/clj/sieppari/core_execute_test.clj

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
[clojure.string :as str]))
77

88
;;
9-
;; Following tests use a test-chain that has some interceptors
10-
;; that fail on each stage function (enter, leave, error). The
9+
;; Following tests use a test-chain that has some interceptors that fail on each stage function (enter, leave, error). The
1110
;; idea is that the tests override the expected stage functions
1211
;; with test specific function. This ensures that no unexpected
1312
;; stage functions are called.
@@ -269,6 +268,26 @@
269268
[:leave :x]
270269
[:leave :a]]))
271270

271+
272+
(def ^:dynamic *boundv* 41)
273+
274+
(defn bindings-handler [_]
275+
(is (= 42 *boundv*))
276+
*boundv*)
277+
278+
(def bindings-chain
279+
[{:enter (fn [ctx] (assoc ctx
280+
:bindings
281+
{#'*boundv* 42}))}
282+
{:enter (fn [ctx]
283+
(is (= 42 *boundv*))
284+
ctx)}
285+
bindings-handler])
286+
287+
(deftest use-bindings-test
288+
(fact "bindings are conveyed across interceptor chain"
289+
(s/execute bindings-chain {}) => 42))
290+
272291
; TODO: figure out how enqueue should work? Should enqueue add interceptors just
273292
; before the handler?
274293
#_(deftest enqueue-interceptor-test

test/clj/sieppari/manifold_test.clj

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,3 +228,45 @@
228228
[:error :c]
229229
[:error :b]
230230
[:leave :a]])))
231+
232+
(def ^:dynamic *boundv* 41)
233+
234+
(defn bindings-handler [_]
235+
(is (= 43 *boundv*))
236+
(d/chain
237+
nil
238+
(fn [_]
239+
*boundv*)))
240+
241+
(def bindings-chain
242+
[{:enter (fn [ctx]
243+
(d/future
244+
(assoc ctx
245+
:bindings
246+
{#'*boundv* 42})))
247+
:leave (fn [ctx]
248+
(d/chain
249+
ctx
250+
(fn [ctx']
251+
(is (= 42 *boundv*))
252+
ctx')))}
253+
{:enter (fn [ctx]
254+
(is (= 42 *boundv*)
255+
"In interceptor failed")
256+
(d/chain
257+
ctx
258+
#(update-in
259+
%
260+
[:bindings #'*boundv*] inc)))
261+
:leave (fn [ctx]
262+
(d/chain
263+
ctx
264+
#(update-in
265+
%
266+
[:bindings #'*boundv*] dec)))}
267+
bindings-handler])
268+
269+
(deftest async-bindings-test
270+
(fact "bindings are conveyed across interceptor chain"
271+
(sc/execute bindings-chain {}) => 43))
272+

test/clj/sieppari/promesa_test.clj

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,3 +228,38 @@
228228
[:error :c]
229229
[:error :b]
230230
[:leave :a]])))
231+
232+
(def ^:dynamic *boundv* 41)
233+
234+
(defn bindings-handler [_]
235+
(is (= 43 *boundv*))
236+
(p/resolved
237+
*boundv*))
238+
239+
(def bindings-chain
240+
[{:enter (fn [ctx]
241+
(p/resolved
242+
(assoc ctx
243+
:bindings
244+
{#'*boundv* 42})))
245+
:leave (fn [ctx]
246+
(is (= 42 *boundv*))
247+
ctx)}
248+
{:enter (fn [ctx]
249+
(is (= 42 *boundv*))
250+
(-> ctx
251+
(update-in [:bindings #'*boundv*]
252+
inc)
253+
(p/resolved)))
254+
:leave (fn [ctx]
255+
(is (= 43 *boundv*))
256+
(-> ctx
257+
(update-in [:bindings #'*boundv*]
258+
dec)
259+
(p/resolved)))}
260+
bindings-handler])
261+
262+
(deftest async-bindings-test
263+
(fact "bindings are conveyed across interceptor chain"
264+
(sc/execute bindings-chain {}) => 43))
265+

0 commit comments

Comments
 (0)