#bugfix add numServersQueried and numServersResponded to MSQE response #14806
Conversation
Codecov Report
Additional details and impacted files

@@ Coverage Diff @@
## master #14806 +/- ##
============================================
+ Coverage 61.75% 63.77% +2.01%
- Complexity 207 1612 +1405
============================================
Files 2436 2708 +272
Lines 133233 151232 +17999
Branches 20636 23346 +2710
============================================
+ Hits 82274 96443 +14169
- Misses 44911 47553 +2642
- Partials 6048 7236 +1188
Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
.reduce(new HashSet<QueryServerInstance>(), (all, some) -> {
  all.addAll(some);
  return all;
});
nit: This is correct, but unnecessarily complex to read.
What about something like:
dispatchableSubPlan.getQueryStageList().stream()
.map(DispatchablePlanFragment::getServerInstances)
.reduce(new HashSet<>(), Sets::union)
.size();
Other options (maybe a bit more expensive, but easier to read):
dispatchableSubPlan.getQueryStageList().stream()
.flatMap(fragment -> fragment.getServerInstances().stream())
.distinct()
.count();
or
dispatchableSubPlan.getQueryStageList().stream()
.flatMap(fragment -> fragment.getServerInstances().stream())
.collect(Collectors.toSet())
.size();
I'm not a big fan of the functional API (I don't really know what's happening behind the scenes, and it has higher overhead), so I'd suggest just using the old-fashioned way of creating a set and calling addAll().
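For reference, a minimal sketch of that old-fashioned version (assuming the same DispatchableSubPlan / DispatchablePlanFragment types used in the stream snippets above; illustrative only, not the actual change):

// Collect the distinct server instances across all stages with a plain loop.
Set<QueryServerInstance> servers = new HashSet<>();
for (DispatchablePlanFragment fragment : dispatchableSubPlan.getQueryStageList()) {
  servers.addAll(fragment.getServerInstances());
}
int numServers = servers.size();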
+1. I don't find this coding pattern readable.
I think any of the 3 versions I suggested is easy to read (probably more than a for loop)
I'm gonna betray Gonzalo here 😇
I'm more into using a conventional loop with addAll, too. Using streams here feels like overkill for what the code intends to do. We're going through the whole collection anyway, so I don't see the point of relying on streams.
Actually, that was my first draft; I changed it afterwards trying to show off. It obviously didn't go as intended, so conventional loop it is: de63ddeead
// both the queried and responded stats. Minus one prevents the broker from being included in the count
// (it will always be included because it runs the root of the query plan)
brokerResponse.setNumServersQueried(servers.size() - 1);
brokerResponse.setNumServersResponded(servers.size() - 1);
Is this the happy path?
If something broke, the count will be different, and we still want the correct metric to be emitted. So I am not sure why we are explicitly setting them to the same value. They should be set regardless of whether the happy or the alternate path was taken.
We can't know how many servers respond. This is the best we can do without actually changing the protocol.
The reason is that the broker sends stages to the servers it chooses, and then only a subset of these servers returns the actual data. Ideally, we could include the involved servers in the stats, but the stats do not support set or list types, so we cannot send the different involved segments.
TL;DR: we cannot know how many servers responded in case of an error.
Hmm. I think there is an opportunity to make this significantly better. I am sure we will be doing more complex observability for MSE, so the sooner we fix the debt, the better.
I think a better approach would be to rethink how we propagate the execution state during the course of query execution. This would be beneficial for debugging and, in fact, live analysis too. Imagine a JOIN + aggregation query that runs for 10 seconds. As the data flows upstream, a tool could be built to look into the live progress of the query. Built on top of the vanilla explain plan, this tool would show the multi-stage plan, which executors are running which stage, and the progress of each stage at the operator level. At any point in time (with some lag), we should be able to drill down into whether a server is waiting on input and, if input was received, how many records were received from downstream, how many records were produced for upstream, and so on. This will likely require changes to the protocol, as well as changes to how the moving state is measured and communicated. I don't want to be anal about it, but in general there is no point in emitting a metric which we know may not always be correct to begin with.
@siddharthteotia, although it sounds good, I don't have a clear picture of what your suggestion would look like. What seems clear is that significant modifications to the current system would be required. This feature deserves a design document, which we can use to discuss how to implement it and the advantages it could provide.
That is never going to be the case, right? We are going to emit the metric if and only if we know it is the correct value. Specifically, whenever a query succeeds, the number of servers that intervened is known, and the value matches Alberto's code. When there is an error, as said, AFAIK it is right now impossible to know how many servers intervened, so this approach is a good (and partial) short-term solution. We are never going to send an incorrect metric, but in some cases where we don't know the correct value, the metric won't be included. That is an improvement over the current state, where we never set the metric.
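As a rough sketch of that "emit if and only if known" policy (servers and brokerResponse as in the snippet above; queryFailed is an assumed flag for illustration, not the actual Pinot API):

// Only set the server counts when the query finished successfully, since that is
// the only case where the broker knows the true values.
if (!queryFailed) {
  // Minus one excludes the broker itself, which always appears as the root of the plan.
  int numServers = servers.size() - 1;
  brokerResponse.setNumServersQueried(numServers);
  brokerResponse.setNumServersResponded(numServers);
}
// On failure, leave the metrics unset rather than emit a possibly incorrect value.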
Perfect. Thank you
I agree. It will be quite a bit of surgery. If we think this may be valuable in the long run, then we can create an issue for now, I guess, and evaluate it via design / PEP if/when it is picked up. It could be quite useful from a live query observability standpoint, but I have not yet thought about the possible overheads.
closes #14624