
#bugfix add numServersQueried and numServersResponded to MSQE response #14806

Open · wants to merge 7 commits into master

Conversation

@albertobastos (Contributor) commented Jan 14, 2025

closes #14624

@Jackie-Jiang added the observability and multi-stage (Related to the multi-stage query engine) labels on Jan 14, 2025
@codecov-commenter commented Jan 14, 2025

Codecov Report

Attention: Patch coverage is 0% with 8 lines in your changes missing coverage. Please review.

Project coverage is 63.77%. Comparing base (59551e4) to head (de63dde).
Report is 1592 commits behind head on master.

Files with missing lines                                 Patch %   Lines
...requesthandler/MultiStageBrokerRequestHandler.java      0.00%   7 Missing ⚠️
...ery/planner/physical/DispatchablePlanFragment.java      0.00%   1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14806      +/-   ##
============================================
+ Coverage     61.75%   63.77%   +2.01%     
- Complexity      207     1612    +1405     
============================================
  Files          2436     2708     +272     
  Lines        133233   151232   +17999     
  Branches      20636    23346    +2710     
============================================
+ Hits          82274    96443   +14169     
- Misses        44911    47553    +2642     
- Partials       6048     7236    +1188     
Flag                      Coverage Δ
custom-integration1       100.00% <ø> (+99.99%) ⬆️
integration               100.00% <ø> (+99.99%) ⬆️
integration1              100.00% <ø> (+99.99%) ⬆️
integration2                0.00% <ø> (ø)
java-11                    63.72% <0.00%> (+2.01%) ⬆️
java-21                    63.65% <0.00%> (+2.03%) ⬆️
skip-bytebuffers-false     63.76% <0.00%> (+2.01%) ⬆️
skip-bytebuffers-true      63.61% <0.00%> (+35.88%) ⬆️
temurin                    63.77% <0.00%> (+2.01%) ⬆️
unittests                  63.76% <0.00%> (+2.02%) ⬆️
unittests1                 56.34% <0.00%> (+9.45%) ⬆️
unittests2                 34.05% <0.00%> (+6.32%) ⬆️

Flags with carried forward coverage won't be shown.


Comment on lines 207 to 210
.reduce(new HashSet<QueryServerInstance>(), (all, some) -> {
all.addAll(some);
return all;
});
@gortiz (Contributor) commented Jan 14, 2025

nit: This is correct, but unnecessarily complex to read.

What about something like:

dispatchableSubPlan.getQueryStageList().stream()
        .map(DispatchablePlanFragment::getServerInstances)
        .reduce(new HashSet<>(), Sets::union)
        .size();

Other options (maybe a bit more expensive, but easier to read):

dispatchableSubPlan.getQueryStageList().stream()
        .flatMap(fragment -> fragment.getServerInstances().stream())
        .distinct()
        .count();

or

dispatchableSubPlan.getQueryStageList().stream()
        .flatMap(fragment -> fragment.getServerInstances().stream())
        .collect(Collectors.toSet())
        .size();

Contributor:

I'm not a big fan of the functional API (I don't really know what's happening behind the scenes, and it has higher overhead), so I'd suggest just using the old-fashioned way of creating a set and calling addAll()

Contributor:

+1. I don't find this coding pattern readable.

Contributor:

I think any of the 3 versions I suggested is easy to read (probably more so than a for loop)

@albertobastos (Contributor, Author):

I'm gonna betray Gonzalo here 😇

I'm more inclined to use a conventional loop with addAll() too. Using streams here feels like overkill for what the code intends to do: we go through the whole collection anyway, so I don't see the point of relying on streams.

Actually, that was my first draft, and I changed it afterwards trying to show off. It obviously didn't go as intended, so a conventional loop it is: de63ddeead

// both the queried and responded stats. Minus one prevents the broker from being included in the
// count (it is always part of the set because it executes the root of the query plan)
brokerResponse.setNumServersQueried(servers.size() - 1);
brokerResponse.setNumServersResponded(servers.size() - 1);
@siddharthteotia (Contributor) commented Jan 15, 2025

Is this the happy path?

If something broke, the count will be different, and we still want the correct metric to be emitted. So I am not sure why we are explicitly setting them to the same value. They should be set regardless of whether the happy or the alternate path was taken.

Contributor:

We can't know how many servers responded. This is the best we can do without actually changing the protocol.

The reason is that the broker sends stages to the servers it chooses, and then a subset of those servers returns the actual data. Ideally, we could include the involved servers in the stats, but the stats do not support set or list types, so we cannot send the set of involved servers.

TL;DR: we cannot know how many servers responded in case of an error.

Contributor:

Hmm. I think there is an opportunity to make this significantly better. I am sure we will be doing more complex observability work for MSE, so the sooner we fix the debt, the better.

@siddharthteotia commented Jan 16, 2025

I think a better approach would be to rethink how we propagate the execution state during the course of query execution. This would be beneficial for debugging and, in fact, for live analysis too.

Imagine a JOIN + aggregation query that runs for 10 seconds. As the data flows upstream, a tool could be built to look into the live progress of the query. Built on top of the vanilla explain plan, this tool would show the multi-stage plan, which executors are running which stage, and the progress of each stage at the operator level.

At any point in time (with some lag), we should be able to drill down into whether a server is waiting on input and, if input has been received, how many records were received from downstream, how many records were produced for upstream, and so on.

This will likely require changes to the protocol, as well as changes to how the moving state is measured and communicated.

I don't want to be anal about it, but in general there is no point in emitting a metric which we know may not always be correct to begin with.

@gortiz commented Jan 16, 2025

@siddharthteotia, although it sounds good, I don't have a clear picture of what your suggestion would look like. What seems clear is that significant modifications to the current system would be required. This feature deserves a design document, which we can use to discuss how to implement it and the advantages it could provide.

> I don't want to be anal about it, but in general there is no point in emitting a metric which we know may not always be correct to begin with.

That is never going to be the case, right? We are going to emit the metric if and only if we know it is the correct value. Specifically, whenever a query succeeds, the number of servers that intervened is known, and the value matches Alberto's code. When there is an error, brokerResponse.setNumServersResponded(servers.size() - 1) won't be called, so we are not going to send that value.

As said, AFAIK it is currently impossible to know how many servers intervened when a query fails, so this approach is a good (and partial) short-term solution. We are never going to send an incorrect metric, but in some cases where we don't know the correct value, the metric won't be included. That is an improvement over the current state, where we never set the metric.

@siddharthteotia (Contributor):

> We are never going to send an incorrect metric, but in some cases where we don't know the correct value, the metric won't be included. That is an improvement over the current state, where we never set the metric.

Perfect. Thank you

> Although it sounds good, I don't have a clear picture of what your suggestion would look like. What seems clear is that significant modifications to the current system would be required. This feature deserves a design document, which we can use to discuss how to implement it and the advantages it could provide.

I agree, it will be quite a bit of surgery. If we think this may be valuable in the long run, we can create an issue for now and then evaluate it via a design doc / PEP if and when it is picked up. It could be quite useful from a live query observability standpoint, but I have not yet thought about the possible overheads.

Labels: multi-stage (Related to the multi-stage query engine), observability
Development

Successfully merging this pull request may close these issues.

numServersQueried, responded is not set in BrokerResponse for MSQE