Skip to content

Commit 7a56aa8

Browse files
authored
support append-only left all join append-only (#739)
1 parent 273900f commit 7a56aa8

File tree

3 files changed

+124
-7
lines changed

3 files changed

+124
-7
lines changed

src/Interpreters/Streaming/HashJoin.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,7 @@ size_t insertFromBlockImpl(
785785
const HashJoin::SupportMatrix HashJoin::support_matrix = {
786786
/// <left_storage_semantic, join_kind, join_strictness, right_storage_semantic> - supported
787787
/// Append ...
788+
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Append}, true},
788789
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::ChangelogKV}, true},
789790
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::VersionedKV}, true},
790791
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Changelog}, true},
@@ -927,15 +928,15 @@ void HashJoin::init()
927928

928929
bidirectional_hash_join = !data_enrichment_join;
929930

930-
/// append-only inner join append-only on ... and date_diff_within(10s)
931+
/// append-only inner/left join append-only on ... and date_diff_within(10s)
931932
/// In case when emitChangeLog()
932933
if (streaming_strictness == Strictness::Range
933934
&& (left_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
934935
|| right_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
935-
|| streaming_kind != Kind::Inner))
936+
|| (streaming_kind != Kind::Inner && streaming_kind != Kind::Left)))
936937
throw Exception(
937938
ErrorCodes::NOT_IMPLEMENTED,
938-
"Only inner range join is supported and the left and right stream must be append-only streams in range join");
939+
"Only inner/left range join is supported and the left and right stream must be append-only streams in range join");
939940

940941
range_bidirectional_hash_join = bidirectional_hash_join && (streaming_strictness == Strictness::Range);
941942

src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -806,7 +806,7 @@ TEST(StreamingHashJoin, SimpleJoinTests)
806806
context);
807807

808808
/// Additional range between
809-
if (Streaming::isAppendStorage(left_data_stream_semantic) && kind == JoinKind::Inner && strictness == JoinStrictness::All
809+
if (Streaming::isAppendStorage(left_data_stream_semantic) && (kind == JoinKind::Inner || kind == JoinKind::Left) && strictness == JoinStrictness::All
810810
&& Streaming::isAppendStorage(right_data_stream_semantic))
811811
{
812812
commonTest(
@@ -826,6 +826,118 @@ TEST(StreamingHashJoin, SimpleJoinTests)
826826
}
827827
}
828828

829+
TEST(StreamingHashJoin, AppendLeftAllJoinAppend)
830+
{
831+
auto context = getContext().context;
832+
Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
833+
Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
834+
835+
/// stream(t1) left all join stream(t2) on t1.col_1 = t2.col_1
836+
commonTest(
837+
"left",
838+
"all",
839+
/*on_clause*/ "t1.col_1 = t2.col_1",
840+
left_header,
841+
Streaming::StorageSemantic::Append,
842+
/*left_primary_key_column_indexes*/ std::nullopt,
843+
right_header,
844+
Streaming::StorageSemantic::Append,
845+
/*right_primary_key_column_indexes*/ std::nullopt,
846+
/*to_join_steps*/
847+
{
848+
{
849+
/*to join pos*/ ToJoinStep::RIGHT,
850+
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')", context),
851+
/*expected join results*/ ExpectedJoinResults{},
852+
},
853+
{
854+
/*to join pos*/ ToJoinStep::LEFT,
855+
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
856+
/*expected join results*/
857+
ExpectedJoinResults{
858+
/// output header: col_1, col_2, t2.col_2
859+
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
860+
"(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
861+
},
862+
},
863+
},
864+
context);
865+
}
866+
867+
TEST(StreamingHashJoin, AppendLeftRangeJoinAppend)
868+
{
869+
auto context = getContext().context;
870+
Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
871+
Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
872+
873+
commonTest(
874+
"left",
875+
"all",
876+
/*on_clause*/ "t1.col_1 = t2.col_1 and date_diff_within(2s, t1.col_2, t2.col_2)",
877+
left_header,
878+
Streaming::StorageSemantic::Append,
879+
/*left_primary_key_column_indexes*/ std::nullopt,
880+
right_header,
881+
Streaming::StorageSemantic::Append,
882+
/*right_primary_key_column_indexes*/ std::nullopt,
883+
/*to_join_steps*/
884+
{
885+
{
886+
/*to join pos*/ ToJoinStep::RIGHT,
887+
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:01')", context),
888+
/*expected join results*/ ExpectedJoinResults{},
889+
},
890+
{
891+
/*to join pos*/ ToJoinStep::LEFT,
892+
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
893+
/*expected join results*/
894+
ExpectedJoinResults{
895+
/// output header: col_1, col_2, t2.col_2
896+
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
897+
"(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
898+
},
899+
},
900+
{
901+
/*to join pos*/ ToJoinStep::RIGHT,
902+
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:02')(1, '2023-1-1 00:00:03')(2, '2023-1-1 00:00:02')(2, '2023-1-1 00:00:03')", context),
903+
/*expected join results*/
904+
ExpectedJoinResults{
905+
/// output header: col_1, col_2, t2.col_2
906+
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
907+
"(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')",
908+
},
909+
},
910+
{
911+
/*to join pos*/ ToJoinStep::LEFT,
912+
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:02')", context),
913+
/*expected join results*/
914+
ExpectedJoinResults{
915+
/// output header: col_1, col_2, t2.col_2
916+
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
917+
"(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
918+
"(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
919+
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:00')"
920+
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:01')"
921+
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
922+
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:03')",
923+
},
924+
},
925+
{
926+
/*to join pos*/ ToJoinStep::LEFT,
927+
/*to join block*/ prepareBlockByHeader(left_header, "(2, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:02')(3, '2023-1-1 00:00:03')", context),
928+
/*expected join results*/
929+
ExpectedJoinResults{
930+
/// output header: col_1, col_2, t2.col_2
931+
.values = "(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
932+
"(2, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
933+
"(2, '2023-1-1 00:00:02', '2023-1-1 00:00:03')"
934+
"(3, '2023-1-1 00:00:03', '1970-1-1 00:00:00')",
935+
},
936+
},
937+
},
938+
context);
939+
}
940+
829941
TEST(StreamingHashJoin, AppendLeftAsofJoinAppend)
830942
{
831943
auto context = getContext().context;

tests/stream/test_stream_smoke/0009_stream_join_stream_case2.json

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,15 +203,19 @@
203203
{"client":"python", "query_type": "stream", "query_id":"1056", "wait":1, "terminate":"manual", "query":"select i, k, j, kk from test10_append_left_stream left join test10_append_right_stream on k == kk;"},
204204
{"client":"python", "query_type": "table", "depends_on":"1056", "wait":1, "query": "insert into test10_append_right_stream (j, kk) values (1, 'a') (1, 'b');"},
205205
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'a');"},
206-
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'b');"},
207-
{"client":"python", "query_type": "table", "kill":"1056", "kill_wait":3, "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'c');"}
206+
{"client":"python", "query_type": "table", "query": "insert into test10_append_left_stream (i, k) values (2, 'b');"},
207+
{"client":"python", "query_type": "table", "kill":"1056", "kill_wait":3, "query": "insert into test10_append_left_stream (i, k) values (2, 'c');"}
208208
]
209209
}
210210
],
211211
"expected_results": [
212212
{
213213
"query_id":"1056",
214-
"expected_results": "error_code:48"
214+
"expected_results":[
215+
[2, "a", 1, "a"],
216+
[2, "b", 1, "b"],
217+
[2, "c", 0, ""]
218+
]
215219
}
216220
]
217221
},

0 commit comments

Comments
 (0)