111111#include < Processors/QueryPlan/Streaming/WatermarkStep.h>
112112#include < Processors/QueryPlan/Streaming/WatermarkStepWithSubstream.h>
113113#include < Processors/QueryPlan/Streaming/WindowStep.h>
114+ #include < Processors/Transforms/Streaming/AggregatingHelper.h>
114115#include < Processors/Transforms/Streaming/WatermarkStamper.h>
115116#include < Storages/Streaming/ProxyStream.h>
116117#include < Storages/Streaming/StorageStream.h>
@@ -3219,7 +3220,7 @@ void InterpreterSelectQuery::executeStreamingAggregation(
32193220 ErrorCodes::NOT_IMPLEMENTED,
32203221 " User defined aggregation function with emit strategy shouldn't be used together with other aggregation function" );
32213222
3222- if (windowType () != Streaming::WindowType::NONE )
3223+ if (windowType () != Streaming::WindowType::None )
32233224 throw Exception (
32243225 ErrorCodes::NOT_IMPLEMENTED,
32253226 " User defined aggregation function with emit strategy shouldn't be used together with streaming window" );
@@ -3240,6 +3241,9 @@ void InterpreterSelectQuery::executeStreamingAggregation(
32403241 auto tracking_updates_type = Streaming::TrackingUpdatesType::None;
32413242 if (data_stream_semantic_pair.isChangelogOutput ())
32423243 tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract;
3244+ // / TODO: A optimization for `emit on update`, we don't need to track updates and just directly convert each input (fast in and fast out)
3245+ else if (Streaming::AggregatingHelper::onlyEmitUpdates (emit_mode))
3246+ tracking_updates_type = Streaming::TrackingUpdatesType::Updates;
32433247
32443248 Streaming::Aggregator::Params params (
32453249 header_before_aggregation,
@@ -3278,10 +3282,10 @@ void InterpreterSelectQuery::executeStreamingAggregation(
32783282 // / 2) `shuffle by`: calculating light substream without substream ID (The data have been shuffled by `LightShufflingTransform`)
32793283 if (query_info.hasPartitionByKeys () || light_shuffled)
32803284 query_plan.addStep (std::make_unique<Streaming::AggregatingStepWithSubstream>(
3281- query_plan.getCurrentDataStream (), std::move (params), final , emit_version, data_stream_semantic_pair.isChangelogOutput ()));
3285+ query_plan.getCurrentDataStream (), std::move (params), final , emit_version, data_stream_semantic_pair.isChangelogOutput (), emit_mode ));
32823286 else
32833287 query_plan.addStep (std::make_unique<Streaming::AggregatingStep>(
3284- query_plan.getCurrentDataStream (), std::move (params), final , merge_threads, temporary_data_merge_threads, emit_version, data_stream_semantic_pair.isChangelogOutput ()));
3288+ query_plan.getCurrentDataStream (), std::move (params), final , merge_threads, temporary_data_merge_threads, emit_version, data_stream_semantic_pair.isChangelogOutput (), emit_mode ));
32853289}
32863290
32873291// / Resolve input / output data stream semantic.
@@ -3370,7 +3374,7 @@ Streaming::WindowType InterpreterSelectQuery::windowType() const
33703374 }
33713375 }
33723376
3373- return Streaming::WindowType::NONE ;
3377+ return Streaming::WindowType::None ;
33743378}
33753379
33763380bool InterpreterSelectQuery::hasStreamingGlobalAggregation () const
@@ -3498,12 +3502,14 @@ void InterpreterSelectQuery::buildShufflingQueryPlan(QueryPlan & query_plan)
34983502 query_plan.getCurrentDataStream (), std::move (substream_key_positions), shuffle_output_streams));
34993503}
35003504
3501- void InterpreterSelectQuery::buildWatermarkQueryPlan (QueryPlan & query_plan) const
3505+ void InterpreterSelectQuery::buildWatermarkQueryPlan (QueryPlan & query_plan)
35023506{
35033507 assert (isStreamingQuery ());
35043508 auto params = std::make_shared<Streaming::WatermarkStamperParams>(
35053509 query_info.query , query_info.syntax_analyzer_result , query_info.streaming_window_params );
35063510
3511+ emit_mode = params->mode ; // / saved it to be used for streaming aggregating step
3512+
35073513 bool skip_stamping_for_backfill_data = !context->getSettingsRef ().emit_during_backfill .value ;
35083514
35093515 if (query_info.hasPartitionByKeys ())
0 commit comments