Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds IKStreamBase/join with arity 5 to configure left and right serdes and possibly name the join #363

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

### Unreleased
- Adds IKStreamBase/join with arity 5 to configure left and right serdes and possibly name the join

### [0.9.12] - [2023-12-05]
- Support for Foreign Key joins [#365](https://github.com/FundingCircle/jackdaw/pull/365) (Issue [#364])
Expand Down
2 changes: 2 additions & 0 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
share the same key or a foreign key using an inner join."
([kstream-or-ktable ktable value-joiner-fn]
(p/join kstream-or-ktable ktable value-joiner-fn))
([kstream-or-ktable ktable value-joiner-fn this-topic-config other-topic-config]
(p/join kstream-or-ktable ktable value-joiner-fn this-topic-config other-topic-config))
([kstream-or-ktable ktable foreign-key-extractor-fn value-joiner-fn]
(p/join kstream-or-ktable ktable foreign-key-extractor-fn value-joiner-fn)))

Expand Down
6 changes: 6 additions & 0 deletions src/jackdaw/streams/configured.clj
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@
config
(join kstream ktable value-joiner-fn)))

(join
[_ ktable value-joiner-fn topic-config other-topic-config]
(configured-kstream
config
(join kstream ktable value-joiner-fn topic-config other-topic-config)))

(left-join
[_ ktable value-joiner-fn]
(configured-kstream
Expand Down
22 changes: 20 additions & 2 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@
key-serde (.withKeySerde key-serde)
value-serde (.withValueSerde value-serde)))

(defn ->joined
"Builds a Joined object that represents optional params that can be
passed to join and left-join operations."
([key-serde value-serde other-value-serde]
(Joined/with key-serde value-serde other-value-serde))
([key-serde value-serde other-value-serde joined-name]
(Joined/with key-serde value-serde other-value-serde joined-name)))

(defn suppress-config->suppressed
[{:keys [max-records max-bytes until-time-limit-ms]}]
(let [config (cond
Expand Down Expand Up @@ -154,6 +162,16 @@
^KTable (ktable* ktable)
^ValueJoiner (value-joiner value-joiner-fn))))

(join
[_ ktable value-joiner-fn
{key-serde :key-serde this-value-serde :value-serde joined-name :name}
{other-value-serde :value-serde}]
(clj-kstream
(.join ^KStream kstream
^KTable (ktable* ktable)
^ValueJoiner (value-joiner value-joiner-fn)
^Joined (->joined key-serde this-value-serde other-value-serde joined-name))))

(left-join
[_ ktable value-joiner-fn]
(clj-kstream
Expand All @@ -163,13 +181,13 @@

(left-join
[_ ktable value-joiner-fn
{key-serde :key-serde this-value-serde :value-serde}
{key-serde :key-serde this-value-serde :value-serde joined-name :name}
{other-value-serde :value-serde}]
(clj-kstream
(.leftJoin kstream
^KTable (ktable* ktable)
^ValueJoiner (value-joiner value-joiner-fn)
(Joined/with key-serde this-value-serde other-value-serde))))
^Joined (->joined key-serde this-value-serde other-value-serde joined-name))))

(peek
[_ peek-fn]
Expand Down
1 change: 1 addition & 0 deletions src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"Methods common to KStream and KTable."
(join
[kstream-or-ktable ktable value-joiner-fn]
[kstream-or-ktable ktable value-joiner-fn this-topic-config other-topic-config]
[kstream-or-ktable ktable foreign-key-extractor-fn value-joiner-fn]
"Combines the values of the KStream-or-KTable with the values of the
KTable that share the same key using an inner join. For foreign key
Expand Down
14 changes: 11 additions & 3 deletions src/jackdaw/streams/specs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@

(s/def ::topic-configs (s/coll-of ::topic-config))

(s/def ::name string?)
(s/def ::join-config
(s/keys :req-un [::key-serde
::value-serde]
:opt-un [::name]))

(s/def ::kstreams (s/coll-of kstream?))
(s/def ::kstream-or-ktable (s/or :kstream kstream? :ktable ktable?))

Expand Down Expand Up @@ -103,7 +109,9 @@
(s/fdef k/join
:args (s/or :pk-join (s/cat :kstream-or-ktable ::kstream-or-ktable
:ktable ktable?
:value-joiner-fn ifn?)
:value-joiner-fn ifn?
:this-topic-config (s/? ::join-config)
:other-topic-config (s/? ::join-config))
:fk-join (s/cat :kstream-or-ktable ::kstream-or-ktable
:ktable ktable?
:fk-extractor-fn ifn?
Expand All @@ -114,8 +122,8 @@
:args (s/cat :kstream-or-ktable ::kstream-or-ktable
:ktable ktable?
:value-joiner-fn ifn?
:this-topic-config (s/? ::topic-config)
:other-topic-config (s/? ::topic-config))
:this-topic-config (s/? ::join-config)
:other-topic-config (s/? ::join-config))
:ret ::kstream-or-ktable)

(s/fdef k/for-each!
Expand Down
9 changes: 7 additions & 2 deletions test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,17 @@
(testing "join"
(let [topic-a (mock/topic "table-a")
topic-b (mock/topic "table-b")
topic-c (mock/topic "topic-c")]
topic-c (mock/topic "topic-c")
join-config {:key-serde (Serdes/Long)
:value-serde (Serdes/Long)
:name "a-b-join"}]

(with-open [driver (mock/build-driver (fn [builder]
(let [left (k/kstream builder topic-a)
right (k/ktable builder topic-b)]
(-> (k/join left right +)
(-> (k/join left right +
join-config
topic-b)
(k/to topic-c)))))]
(let [publish-left (partial mock/publish driver topic-a)
publish-right (partial mock/publish driver topic-b)]
Expand Down