Skip to content
Closed

test #57471

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions be/src/runtime_filter/runtime_filter_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ class RuntimeFilterMerger : public RuntimeFilter {

bool ready() const { return _rf_state == State::READY; }

void set_wrapper_state_and_ready_to_apply(RuntimeFilterWrapper::State state,
std::string reason = "") {
std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_rf_state == State::READY) {
return;
}
_wrapper->set_state(state, reason);
_rf_state = State::READY;
}

private:
RuntimeFilterMerger(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc)
: RuntimeFilter(desc), _rf_state(State::WAITING_FOR_PRODUCT) {}
Expand Down
15 changes: 13 additions & 2 deletions be/src/runtime_filter/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
int execution_timeout) {
DCHECK_GT(cnt_val.targetv2_info.size(), 0);

if (cnt_val.done) {
return Status::InternalError("Runtime filter has been sent",
cnt_val.merger->debug_string());
}
cnt_val.done = true;

butil::IOBuf request_attachment;

PPublishFilterRequestV2 apply_request;
Expand Down Expand Up @@ -424,15 +430,20 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
closure->response_.get(), closure.get());
closure.release();
}

cnt_val.done = true;
return st;
}

void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* query_ctx) {
std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
for (auto& [filter_id, ctx] : _filter_map) {
if (!ctx.done && !ctx.targetv2_info.empty()) {
{
std::lock_guard<std::mutex> l(ctx.mtx);
ctx.merger->set_wrapper_state_and_ready_to_apply(
RuntimeFilterWrapper::State::DISABLED,
"rf coordinator's query context released before runtime filter is ready to "
"apply");
}
auto st = _send_rf_to_target(ctx, std::weak_ptr<QueryContext> {}, 0,
UniqueId(query_ctx->query_id()).to_proto(),
query_ctx->execution_timeout());
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime_filter/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct GlobalMergeContext {
std::vector<TRuntimeFilterTargetParamsV2> targetv2_info;
std::unordered_set<UniqueId> arrive_id;
std::vector<PNetworkAddress> source_addrs;
bool done = false;
std::atomic<bool> done = false;
};

// owned by RuntimeState
Expand Down
Loading