#bugfix add numServersQueried and numServersResponded to MSQE response #14806

Status: Open. Wants to merge 7 commits into base: master.
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -64,6 +65,7 @@
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
@@ -199,8 +201,15 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}

DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
Set<String> tableNames = queryPlanResult.getTableNames();

Set<QueryServerInstance> servers = dispatchableSubPlan.getQueryStageList().stream()
.map(DispatchablePlanFragment::getServerInstances)
.reduce(new HashSet<QueryServerInstance>(), (all, some) -> {
all.addAll(some);
return all;
});
@gortiz (Contributor) commented on Jan 14, 2025:

nit: This is correct, but unnecessarily complex to read.

What about something like:

dispatchableSubPlan.getQueryStageList().stream()
        .map(DispatchablePlanFragment::getServerInstances)
        .reduce(new HashSet<>(), Sets::union)
        .size();

Other options (maybe a bit more expensive, but easier to read):

dispatchableSubPlan.getQueryStageList().stream()
        .flatMap(fragment -> fragment.getServerInstances().stream())
        .distinct()
        .count();

or

dispatchableSubPlan.getQueryStageList().stream()
        .flatMap(fragment -> fragment.getServerInstances().stream())
        .collect(Collectors.toSet())
        .size();

A Contributor commented:

I'm not a big fan of the functional API (you don't really know what's happening behind the scenes, and it has higher overhead), so I'd suggest just using the old-fashioned way of creating a set and calling addAll().

A Contributor commented:

+1. I don't find this coding pattern readable.

A Contributor replied:

I think any of the 3 versions I suggested is easy to read (probably more than a for loop)

The PR author (Contributor) replied:

I'm gonna betray Gonzalo here 😇

I'm more into using a conventional loop with addAll, too. Using streams here feels a bit overkill for what the code intends to do. We're going through the whole array anyway, so I don't see the point of relying on streams.

Actually, that was my first draft, and I changed it afterwards while trying to show off. It obviously didn't go as intended, so a conventional loop it is: de63ddeead
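The loop-based version the author switched to is not shown in this thread. A minimal standalone sketch of the pattern the reviewers preferred (one mutable set, `addAll` per stage); `Fragment` and the server names below are hypothetical stand-ins for Pinot's `DispatchablePlanFragment`, not the actual commit:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class Main {
    // Hypothetical stand-in for Pinot's DispatchablePlanFragment, for illustration only
    record Fragment(Set<String> serverInstances) {
        Set<String> getServerInstances() {
            return serverInstances;
        }
    }

    public static void main(String[] args) {
        // Two stages sharing one server: the set deduplicates across stages
        List<Fragment> stages = List.of(
                new Fragment(Set.of("server1", "server2")),
                new Fragment(Set.of("server2", "server3")));

        // The "old-fashioned" way: create a set and call addAll() per stage
        Set<String> servers = new HashSet<>();
        for (Fragment fragment : stages) {
            servers.addAll(fragment.getServerInstances());
        }
        System.out.println(servers.size()); // 3 distinct servers
    }
}
```

Unlike the `reduce`-based version in the diff, this makes the mutation of a single accumulator set explicit, which is what the reviewers asked for.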


Set<String> tableNames = queryPlanResult.getTableNames();
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_STAGE_QUERIES_GLOBAL, 1);
for (String tableName : tableNames) {
_brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.MULTI_STAGE_QUERIES, 1);
@@ -277,8 +286,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults.getResultTable());
brokerResponse.setTablesQueried(tableNames);
// TODO: Add servers queried/responded stats
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());
// MSE cannot finish if a single queried server did not respond, so we can use the same count for
// both the queried and responded stats. Subtracting one prevents the broker from being included in
// the count (it is always included because it runs the root of the query plan)
brokerResponse.setNumServersQueried(servers.size() - 1);
brokerResponse.setNumServersResponded(servers.size() - 1);
@siddharthteotia (Contributor) commented on Jan 15, 2025:

Is this the happy path?

If something broke, the count will be different, and we still want the correct metric to be emitted. So I am not sure why we are explicitly setting them to be the same. They should be set regardless of whether the happy or alternate path was taken.

A Contributor replied:

We can't know how many servers responded. This is the best we can do without actually changing the protocol.

The reason is that the broker sends stages to the servers it chooses, and then a subset of these servers returns the actual data. Ideally, we could include the involved servers in the stats, but the stats do not support set or list types, so we cannot send the set of involved servers.

TL;DR: we cannot know how many servers responded in case of an error.

A Contributor replied:

Hmm. I think there is an opportunity to make this significantly better. I am sure we will be doing more complex observability for MSE. So, the sooner we fix the debt, the better.


// Attach unavailable segments
int numUnavailableSegments = 0;
@@ -1347,6 +1347,16 @@ public void testConcurrentQueries() {
executorService.shutdownNow();
}

@Test
public void testNumServersQueried() throws Exception {
String query = "select * from mytable limit 10";
JsonNode jsonNode = postQuery(query);
JsonNode numServersQueried = jsonNode.get("numServersQueried");
assertNotNull(numServersQueried);
assertTrue(numServersQueried.isInt());
assertTrue(numServersQueried.asInt() > 0);
}

private void checkQueryResultForDBTest(String column, String tableName)
throws Exception {
checkQueryResultForDBTest(column, tableName, null, null);
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.routing.QueryServerInstance;
@@ -112,4 +113,8 @@ public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer
_serverInstanceToWorkerIdMap.clear();
_serverInstanceToWorkerIdMap.putAll(serverInstanceToWorkerIdMap);
}

public Set<QueryServerInstance> getServerInstances() {
return _serverInstanceToWorkerIdMap.keySet();
}
}
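One design note on the new getServerInstances() accessor: java.util.Map.keySet() returns a live view backed by the map, not a copy, so callers see later mutations of _serverInstanceToWorkerIdMap and should copy the set if they need a stable snapshot. A minimal standalone sketch of this general Map property (the map and server names below are hypothetical, not Pinot code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class Main {
    public static void main(String[] args) {
        // Map.keySet() returns a view backed by the map:
        // later changes to the map are visible through the returned set.
        Map<String, Integer> serverToWorkerId = new HashMap<>();
        serverToWorkerId.put("server1", 0);

        Set<String> servers = serverToWorkerId.keySet();
        serverToWorkerId.put("server2", 1);
        System.out.println(servers.size()); // the view reflects the new entry
    }
}
```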