Skip to content

Commit d2cefd7

Browse files
authored
Merge branch 'develop' into bugfix/issue-716-udf-memory-limit
2 parents b57d4f5 + 7ab2ec9 commit d2cefd7

12 files changed

+218
-71
lines changed

src/Interpreters/Streaming/ChangelogQueryVisitor.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ namespace ErrorCodes
1717
{
1818
extern const int UNKNOWN_IDENTIFIER;
1919
extern const int NOT_IMPLEMENTED;
20+
extern const int INCORRECT_QUERY;
2021
}
2122

2223
namespace Streaming
@@ -70,8 +71,7 @@ void ChangelogQueryVisitorMatcher::visit(ASTSelectQuery & select_query, ASTPtr &
7071
{
7172
/// For changelog/changelog_kv, the `*` includes `_tp_delta`
7273
/// For versioned_kv, the `*` does not include `_tp_delta`
73-
bool asterisk_include_delta = isChangelogDataStream(tables_with_columns.front().output_data_stream_semantic)
74-
|| isChangelogKeyedStorage(tables_with_columns.front().output_data_stream_semantic);
74+
bool asterisk_include_delta = isChangelogDataStream(tables_with_columns.front().output_data_stream_semantic);
7575
addDeltaColumn(select_query, asterisk_include_delta);
7676
}
7777
}
@@ -131,10 +131,10 @@ void ChangelogQueryVisitorMatcher::addDeltaColumn(ASTSelectQuery & select_query,
131131
if (!found_delta_col)
132132
{
133133
if (is_subquery)
134-
/// Need add delta if _tp_delta is not present and the @p select_query is a subquery
134+
/// Need to add the delta column if `_tp_delta` is not present and \p select_query is a subquery (not the top-level select)
135135
select_expression_list->children.emplace_back(std::make_shared<ASTIdentifier>(ProtonConsts::RESERVED_DELTA_FLAG));
136-
else if (query_info.force_emit_changelog)
137-
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "The query with emit changelog explicitly requires a `_tp_delta` in select list");
136+
else
137+
throw Exception(ErrorCodes::INCORRECT_QUERY, "The query with changelog output requires selecting the `_tp_delta` column explicitly");
138138
}
139139

140140
if (add_new_required_result_columns)
@@ -143,3 +143,4 @@ void ChangelogQueryVisitorMatcher::addDeltaColumn(ASTSelectQuery & select_query,
143143

144144
}
145145
}
146+

src/Interpreters/Streaming/JoinStreamDescription.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void JoinStreamDescription::calculateColumnPositions(JoinStrictness strictness)
7575
if (table_with_columns.hasColumn(ProtonConsts::RESERVED_DELTA_FLAG))
7676
delta_column_position = calc_column_position(ProtonConsts::RESERVED_DELTA_FLAG);
7777

78-
assertValid();
78+
checkValid();
7979
}
8080

8181
const String & JoinStreamDescription::deltaColumnName() const
@@ -85,11 +85,13 @@ const String & JoinStreamDescription::deltaColumnName() const
8585
return input_header.getByPosition(*delta_column_position).name;
8686
}
8787

88-
void JoinStreamDescription::assertValid() const
88+
void JoinStreamDescription::checkValid() const
8989
{
90-
/// If it is a keyed data stream, we are expecting `delta` column or `primary key + version column`
90+
/// If it is a changelog data stream, we expect either the `delta` column or the `primary key + version column`
9191
/// to be present in the input
92-
assert(Streaming::isAppendDataStream(data_stream_semantic) || (hasDeltaColumn() || (hasPrimaryKey() && hasVersionColumn())));
92+
if (Streaming::isChangelogDataStream(data_stream_semantic) && (!hasDeltaColumn() && !(hasPrimaryKey() && hasVersionColumn())))
93+
throw Exception(ErrorCodes::LOGICAL_ERROR, "The changelog data stream requires '_tp_delta' column or 'primary key + version column' in the pipeline's input");
9394
}
9495
}
9596
}
97+

src/Interpreters/Streaming/JoinStreamDescription.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ struct JoinStreamDescription
102102
std::optional<size_t> delta_column_position;
103103

104104
private:
105-
void assertValid() const;
105+
void checkValid() const;
106106
};
107107

108108
using JoinStreamDescriptionPtr = std::shared_ptr<JoinStreamDescription>;

src/Interpreters/getTableExpressions.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ static NamesAndTypesList getColumnsFromTableExpression(
123123
aliases = columns.getAliases();
124124
virtuals = table->getVirtuals();
125125

126-
/// proton : starts. Calculate hash semantic
126+
/// proton : starts. Fill in \p output_data_stream_semantic if the caller provided it
127127
if (output_data_stream_semantic)
128128
{
129129
*output_data_stream_semantic = table->dataStreamSemantic();

src/Storages/StorageView.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ void StorageView::read(
138138
Streaming::rewriteSubquery(current_inner_query->as<ASTSelectWithUnionQuery &>(), query_info);
139139
/// proton: ends.
140140

141-
auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done);
141+
auto options = SelectQueryOptions(QueryProcessingStage::Complete, 1, true, query_info.settings_limit_offset_done);
142142
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, options, column_names);
143143
interpreter.addStorageLimits(*query_info.storage_limits);
144144
interpreter.buildQueryPlan(query_plan);
@@ -268,7 +268,7 @@ bool StorageView::isStreamingQuery(ContextPtr query_context) const
268268
auto select = getInMemoryMetadataPtr()->getSelectQuery().inner_query;
269269
auto local_ctx = Context::createCopy(query_context);
270270
local_ctx->setCollectRequiredColumns(false);
271-
return InterpreterSelectWithUnionQuery(select, local_ctx, SelectQueryOptions().analyze()).isStreamingQuery();
271+
return InterpreterSelectWithUnionQuery(select, local_ctx, SelectQueryOptions().subquery().analyze()).isStreamingQuery();
272272
}
273273

274274
Streaming::DataStreamSemanticEx StorageView::dataStreamSemantic() const
@@ -280,7 +280,7 @@ Streaming::DataStreamSemanticEx StorageView::dataStreamSemantic() const
280280
auto ctx = Context::createCopy(local_context);
281281
ctx->setCollectRequiredColumns(false);
282282

283-
data_stream_semantic = InterpreterSelectWithUnionQuery(select, ctx, SelectQueryOptions().analyze()).getDataStreamSemantic();
283+
data_stream_semantic = InterpreterSelectWithUnionQuery(select, ctx, SelectQueryOptions().subquery().analyze()).getDataStreamSemantic();
284284

285285
data_stream_semantic_resolved = true;
286286

tests/stream/test_stream_smoke/0013_changelog_stream5.yaml

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ tests:
861861
query_id: '1419'
862862
depends_on_stream: test14_subquery_5
863863
query_type: stream
864-
query: select lag(i, 2), k2 from test14_view_5;
864+
query: select lag(i, 2), k2, _tp_delta from test14_view_5;
865865

866866
- client: python
867867
query_type: table
@@ -904,14 +904,14 @@ tests:
904904
expected_results:
905905
- query_id: '1419'
906906
expected_results:
907-
- [0, 'a']
908-
- [0, 'a']
909-
- [4, 'a']
910-
- [4, 'b']
911-
- [2, 'a']
912-
- [5, 'a']
913-
- [2, 'b']
914-
- [2, 'b']
907+
- [0, 'a', 1]
908+
- [0, 'a', -1]
909+
- [4, 'a', 1]
910+
- [4, 'b', 1]
911+
- [2, 'a', -1]
912+
- [5, 'a', 1]
913+
- [2, 'b', -1]
914+
- [2, 'b', 1]
915915

916916
- id: 20
917917
tags:
@@ -1046,7 +1046,7 @@ tests:
10461046
query_id: '1421'
10471047
depends_on_stream: test14_subquery_5
10481048
query_type: stream
1049-
query: select lag(i, 2), k2 from test14_view2_5;
1049+
query: select lag(i, 2), k2, _tp_delta from test14_view2_5;
10501050

10511051
- client: python
10521052
query_type: table
@@ -1094,14 +1094,14 @@ tests:
10941094
expected_results:
10951095
- query_id: '1421'
10961096
expected_results:
1097-
- [0, 'a']
1098-
- [0, 'a']
1099-
- [4, 'a']
1100-
- [4, 'b']
1101-
- [2, 'a']
1102-
- [5, 'a']
1103-
- [2, 'b']
1104-
- [2, 'b']
1097+
- [0, 'a', 1]
1098+
- [0, 'a', -1]
1099+
- [4, 'a', 1]
1100+
- [4, 'b', 1]
1101+
- [2, 'a', -1]
1102+
- [5, 'a', 1]
1103+
- [2, 'b', -1]
1104+
- [2, 'b', 1]
11051105

11061106
- id: 22
11071107
tags:
@@ -1129,7 +1129,7 @@ tests:
11291129
with cte1 as (
11301130
with cte2 as (select i, k1, k2, _tp_delta from changelog(test14_subquery_5, k2))
11311131
select * from cte2
1132-
)select lag(i, 2), k2 from cte1;
1132+
)select lag(i, 2), k2, _tp_delta from cte1;
11331133
11341134
- client: python
11351135
query_type: table
@@ -1167,14 +1167,14 @@ tests:
11671167
expected_results:
11681168
- query_id: '1422'
11691169
expected_results:
1170-
- [0, 'a']
1171-
- [0, 'a']
1172-
- [4, 'a']
1173-
- [4, 'b']
1174-
- [2, 'a']
1175-
- [5, 'a']
1176-
- [2, 'b']
1177-
- [2, 'b']
1170+
- [0, 'a', 1]
1171+
- [0, 'a', -1]
1172+
- [4, 'a', 1]
1173+
- [4, 'b', 1]
1174+
- [2, 'a', -1]
1175+
- [5, 'a', 1]
1176+
- [2, 'b', -1]
1177+
- [2, 'b', 1]
11781178

11791179
- id: 23
11801180
tags:

tests/stream/test_stream_smoke/0013_changelog_stream6.yaml

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -743,7 +743,7 @@ tests:
743743
query_id: '1431'
744744
depends_on_stream: test14_substream_6
745745
query_type: stream
746-
query: select lag(val, 1), val, id, name from changelog(test14_substream_6, id) partition by id;
746+
query: select lag(val, 1), val, id, name, _tp_delta from changelog(test14_substream_6, id) partition by id;
747747

748748
- client: python
749749
query_type: table
@@ -796,19 +796,19 @@ tests:
796796
expected_results:
797797
- query_id: '1431'
798798
expected_results:
799-
- [0, 1, 2, 'a']
800-
- [0, 3, 3, 'b']
801-
- [0, 5, 1, 'c']
802-
- [1, 1, 2, 'a']
803-
- [1, 8, 2, 'a']
804-
- [3, 3, 3, 'b']
805-
- [3, 9, 3, 'b']
806-
- [8, 8, 2, 'a']
807-
- [8, 11, 2, 'b']
808-
- [5, 5, 1, 'c']
809-
- [5, 10, 1, 'c']
810-
- [11, 11, 2, 'b']
811-
- [11, 14, 2, 'b']
799+
- [0, 1, 2, 'a', 1]
800+
- [0, 3, 3, 'b', 1]
801+
- [0, 5, 1, 'c', 1]
802+
- [1, 1, 2, 'a', -1]
803+
- [1, 8, 2, 'a', 1]
804+
- [3, 3, 3, 'b', -1]
805+
- [3, 9, 3, 'b', 1]
806+
- [8, 8, 2, 'a', -1]
807+
- [8, 11, 2, 'b', 1]
808+
- [5, 5, 1, 'c', -1]
809+
- [5, 10, 1, 'c', 1]
810+
- [11, 11, 2, 'b', -1]
811+
- [11, 14, 2, 'b', 1]
812812

813813
- id: 32
814814
tags:

tests/stream/test_stream_smoke/0013_changelogstream_join_table5.json

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2949,7 +2949,7 @@
29492949
"query_id": "14199",
29502950
"wait": 5,
29512951
"terminate": "manual",
2952-
"query": "with t as (select * from test14_stream1_5 as a join table(test14_stream2_5) as b on a.j = b.j2)select i, j, k, i2, j2, k2, i3, j3, k3 from t join table(test14_stream3_5) as k on t.j = k.j3;"
2952+
"query": "with t as (select * from test14_stream1_5 as a join table(test14_stream2_5) as b on a.j = b.j2)select i, j, k, i2, j2, k2, i3, j3, k3, _tp_delta from t join table(test14_stream3_5) as k on t.j = k.j3;"
29532953
},
29542954
{
29552955
"client": "python",
@@ -2994,7 +2994,8 @@
29942994
"a",
29952995
"3",
29962996
"2",
2997-
"e"
2997+
"e",
2998+
1
29982999
],
29993000
[
30003001
"1",
@@ -3005,7 +3006,8 @@
30053006
"f",
30063007
"6",
30073008
"3",
3008-
"7"
3009+
"7",
3010+
1
30093011
],
30103012
[
30113013
"1",
@@ -3016,7 +3018,8 @@
30163018
"a",
30173019
"3",
30183020
"2",
3019-
"e"
3021+
"e",
3022+
-1
30203023
],
30213024
[
30223025
"1",
@@ -3027,7 +3030,8 @@
30273030
"a",
30283031
"3",
30293032
"2",
3030-
"e"
3033+
"e",
3034+
1
30313035
]
30323036
]
30333037
}
@@ -3140,7 +3144,7 @@
31403144
"query_id": "14200",
31413145
"wait": 5,
31423146
"terminate": "manual",
3143-
"query": "with t as (select * from test14_stream1_5 as a join table(test14_stream2_5) as b on a.j = b.j2)select i, j, k, i2, j2, k2, i3, j3, k3 from t left join table(test14_stream3_5) as k on t.j = k.j3;"
3147+
"query": "with t as (select * from test14_stream1_5 as a join table(test14_stream2_5) as b on a.j = b.j2)select i, j, k, i2, j2, k2, i3, j3, k3, _tp_delta from t left join table(test14_stream3_5) as k on t.j = k.j3;"
31443148
},
31453149
{
31463150
"client": "python",
@@ -3185,7 +3189,8 @@
31853189
"a",
31863190
"3",
31873191
"2",
3188-
"e"
3192+
"e",
3193+
1
31893194
],
31903195
[
31913196
"1",
@@ -3196,7 +3201,8 @@
31963201
"f",
31973202
"6",
31983203
"3",
3199-
"7"
3204+
"7",
3205+
1
32003206
],
32013207
[
32023208
"1",
@@ -3207,7 +3213,8 @@
32073213
"a",
32083214
"3",
32093215
"2",
3210-
"e"
3216+
"e",
3217+
-1
32113218
],
32123219
[
32133220
"1",
@@ -3218,7 +3225,8 @@
32183225
"a",
32193226
"3",
32203227
"2",
3221-
"e"
3228+
"e",
3229+
1
32223230
],
32233231
[
32243232
"1",
@@ -3229,7 +3237,8 @@
32293237
"b",
32303238
"0",
32313239
"0",
3232-
""
3240+
"",
3241+
1
32333242
]
32343243
]
32353244
}

tests/stream/test_stream_smoke/0018_query_state8_join.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@
209209
{"client":"python", "query_type": "table", "query":"drop stream if exists test19_right_stream1"},
210210
{"client":"python", "query_type": "table", "exist":"test19_left_stream1", "exist_wait":2, "wait":1, "query":"create stream if not exists test19_left_stream1 (i int, s string, ts datetime) primary key i settings mode='versioned_kv'"},
211211
{"client":"python", "query_type": "table", "exist":"test19_right_stream1", "exist_wait":2, "wait":1, "query":"create stream if not exists test19_right_stream1 (ii int, ss string, tts datetime) primary key ii settings mode='versioned_kv'"},
212-
{"client":"python", "query_type": "stream", "query_id":"19207", "wait":1, "terminate":"manual", "query":"subscribe to select s, ss from test19_left_stream1 inner join test19_right_stream1 on i=ii settings checkpoint_interval=1"},
212+
{"client":"python", "query_type": "stream", "query_id":"19207", "wait":1, "terminate":"manual", "query":"subscribe to select s, ss, _tp_delta from test19_left_stream1 inner join test19_right_stream1 on i=ii settings checkpoint_interval=1"},
213213
{"client":"python", "query_type": "table", "depends_on":"19207", "wait":1, "query": "insert into test19_right_stream1(ii, ss, tts) values (1, 's1', '2022-05-23 15:45:10')"},
214214
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test19_right_stream1(ii, ss, tts) values (1, 's2', '2022-05-23 15:45:11')"},
215215
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test19_left_stream1(i, s, ts) values (1, 's', '2022-05-23 15:45:10')"},
@@ -222,7 +222,7 @@
222222
{
223223
"query_id":"19207",
224224
"expected_results":[
225-
["s", "s2"], ["s", "s2"], ["ss", "s2"]
225+
["s", "s2", 1], ["s", "s2", -1], ["ss", "s2", 1]
226226
]
227227
}
228228
]

tests/stream/test_stream_smoke/0018_query_state9_parallel_hash_join.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@
209209
{"client":"python", "query_type": "table", "query":"drop stream if exists test19_right_stream2"},
210210
{"client":"python", "query_type": "table", "exist":"test19_left_stream2", "exist_wait":2, "wait":1, "query":"create stream if not exists test19_left_stream2 (i int, s string, ts datetime) primary key i settings mode='versioned_kv'"},
211211
{"client":"python", "query_type": "table", "exist":"test19_right_stream2", "exist_wait":2, "wait":1, "query":"create stream if not exists test19_right_stream2 (ii int, ss string, tts datetime) primary key ii settings mode='versioned_kv'"},
212-
{"client":"python", "query_type": "stream", "depends_on_stream":"test19_right_stream2", "query_id":"19227", "wait":1, "terminate":"manual", "query":"subscribe to select s, ss from test19_left_stream2 inner join test19_right_stream2 on i=ii settings join_algorithm = 'parallel_hash', checkpoint_interval=1"},
212+
{"client":"python", "query_type": "stream", "depends_on_stream":"test19_right_stream2", "query_id":"19227", "wait":1, "terminate":"manual", "query":"subscribe to select s, ss, _tp_delta from test19_left_stream2 inner join test19_right_stream2 on i=ii settings join_algorithm = 'parallel_hash', checkpoint_interval=1"},
213213
{"client":"python", "query_type": "table", "depends_on":"19227", "wait":1, "query": "insert into test19_right_stream2(ii, ss, tts) values (1, 's1', '2022-05-23 15:45:10')"},
214214
{"client":"python", "query_type": "table", "query": "insert into test19_right_stream2(ii, ss, tts) values (1, 's2', '2022-05-23 15:45:11')"},
215215
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test19_left_stream2(i, s, ts) values (1, 's', '2022-05-23 15:45:10')"},
@@ -222,7 +222,7 @@
222222
{
223223
"query_id":"19227",
224224
"expected_results":[
225-
["s", "s2"], ["s", "s2"], ["ss", "s2"]
225+
["s", "s2", 1], ["s", "s2", -1], ["ss", "s2", 1]
226226
]
227227
}
228228
]

0 commit comments

Comments
 (0)