#bugfix add numServersQueried and numServersResponded to MSQE response #14806
base: master
Changes from 6 commits
ed7fca7
e8b6233
1a84ec2
0c3d03c
99de6c9
6055448
de63dde
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import com.fasterxml.jackson.databind.node.JsonNodeFactory; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
@@ -64,6 +65,7 @@ | |
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; | ||
import org.apache.pinot.query.planner.physical.DispatchableSubPlan; | ||
import org.apache.pinot.query.planner.plannode.PlanNode; | ||
import org.apache.pinot.query.routing.QueryServerInstance; | ||
import org.apache.pinot.query.routing.WorkerManager; | ||
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder; | ||
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; | ||
|
@@ -199,8 +201,15 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO | |
} | ||
|
||
DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan(); | ||
Set<String> tableNames = queryPlanResult.getTableNames(); | ||
|
||
Set<QueryServerInstance> servers = dispatchableSubPlan.getQueryStageList().stream() | ||
.map(DispatchablePlanFragment::getServerInstances) | ||
.reduce(new HashSet<QueryServerInstance>(), (all, some) -> { | ||
all.addAll(some); | ||
return all; | ||
}); | ||
|
||
Set<String> tableNames = queryPlanResult.getTableNames(); | ||
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_STAGE_QUERIES_GLOBAL, 1); | ||
for (String tableName : tableNames) { | ||
_brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.MULTI_STAGE_QUERIES, 1); | ||
|
@@ -277,8 +286,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO | |
BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); | ||
brokerResponse.setResultTable(queryResults.getResultTable()); | ||
brokerResponse.setTablesQueried(tableNames); | ||
// TODO: Add servers queried/responded stats | ||
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs()); | ||
// MSE cannot finish if a single queried server did not respond, so we can use the same count for | ||
// both the queried and responded stats. Minus one prevents the broker to be included in the count | ||
// (it will always be included because of the root of the query plan) | ||
brokerResponse.setNumServersQueried(servers.size() - 1); | ||
brokerResponse.setNumServersResponded(servers.size() - 1); | ||
Is this the happy path? If something broke, the counts will differ, and we still want the correct metric to be emitted. So I am not sure why we are explicitly setting them to be the same. They should be set regardless of whether the happy path or an alternate path was taken.

We can't know how many servers respond. This is the best we can do without actually changing the protocol. The reason is that the broker sends stages to the servers it chooses, and then a subset of these servers returns the actual data. Ideally, we could include the involved servers in the stats, but the stats do not support set or list types, so we cannot send the different involved segments. TL;DR: we cannot know how many servers responded in case of an error.

Hmm. I think there is an opportunity to make this significantly better. I am sure we will be doing more complex observability for MSE. So, the sooner we fix the debt, the better.
||
|
||
// Attach unavailable segments | ||
int numUnavailableSegments = 0; | ||
|
nit: This is correct, but unnecessarily complex to read.
What about something like:
Other options (maybe a bit more expensive, but easier to read):
or
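The snippets attached to the comment above did not survive in this copy of the thread. As a rough illustration only, and not the reviewer's actual suggestions, stream-based alternatives to the reduce call might look like the sketch below. Fragment is a hypothetical stand-in for DispatchablePlanFragment, and servers are modeled as plain strings instead of QueryServerInstance.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for DispatchablePlanFragment; only the accessor used here is modeled.
class Fragment {
  private final Set<String> _serverInstances;

  Fragment(Set<String> serverInstances) {
    _serverInstances = serverInstances;
  }

  Set<String> getServerInstances() {
    return _serverInstances;
  }
}

class ServerSetStreamSketch {
  // Option A: flatten every fragment's servers into one stream and collect them into a set.
  static Set<String> viaFlatMap(List<Fragment> stages) {
    return stages.stream()
        .flatMap(fragment -> fragment.getServerInstances().stream())
        .collect(Collectors.toSet());
  }

  // Option B: create the set up front and let forEach add each fragment's servers to it.
  static Set<String> viaForEach(List<Fragment> stages) {
    Set<String> servers = new HashSet<>();
    stages.stream()
        .map(Fragment::getServerInstances)
        .forEach(servers::addAll);
    return servers;
  }

  public static void main(String[] args) {
    List<Fragment> stages = List.of(
        new Fragment(Set.of("broker", "server-1")),
        new Fragment(Set.of("server-1", "server-2")));
    System.out.println(viaFlatMap(stages).size());
    System.out.println(viaForEach(stages).size());
  }
}
```

Both variants avoid mutating the identity value passed to reduce, which the version in the diff does and which is only safe on a sequential stream.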
I'm not a big fan of the functional API (I don't really know what's happening behind the scenes, and it has higher overhead), so I'd suggest just using the old-fashioned way of creating a set and calling addAll()
+1. I don't find this coding pattern readable.
I think any of the 3 versions I suggested is easy to read (probably more than a for loop)
I'm gonna betray Gonzalo here 😇
I'm more into using a conventional loop with addAll, too. Using streams here feels like overkill for what the code intends to do. We go through the whole array anyway, so I don't see the point of relying on streams.
Actually, that was my first draft and changed it afterwards trying to show off. It obviously didn't go as intended, so conventional loop it is: de63ddeead
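The referenced commit is not reproduced in this thread. A minimal sketch of what the conventional loop with addAll could look like follows, with the same broker adjustment as the `servers.size() - 1` in the diff. StageFragment and the helper names are hypothetical stand-ins, and servers are modeled as plain strings instead of QueryServerInstance.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for DispatchablePlanFragment; only the accessor used here is modeled.
class StageFragment {
  private final Set<String> _serverInstances;

  StageFragment(Set<String> serverInstances) {
    _serverInstances = serverInstances;
  }

  Set<String> getServerInstances() {
    return _serverInstances;
  }
}

class ServerSetLoopSketch {
  // Plain loop: the set deduplicates servers that appear in more than one stage.
  static Set<String> collectServers(List<StageFragment> stages) {
    Set<String> servers = new HashSet<>();
    for (StageFragment stage : stages) {
      servers.addAll(stage.getServerInstances());
    }
    return servers;
  }

  // Mirrors the diff's "size() - 1". Assumes the broker is always in the set
  // because it hosts the root stage of the query plan.
  static int numServersExcludingBroker(Set<String> servers) {
    return servers.size() - 1;
  }

  public static void main(String[] args) {
    List<StageFragment> stages = List.of(
        new StageFragment(Set.of("broker")),
        new StageFragment(Set.of("server-1", "server-2")),
        new StageFragment(Set.of("server-2")));
    System.out.println(numServersExcludingBroker(collectServers(stages)));
  }
}
```

The subtraction is only meaningful under the stated assumption. If the set were ever empty, this sketch would report -1.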