This repository has been archived by the owner on Jan 3, 2023. It is now read-only.
[SQL-DS-CACHE-124] [POAE7-1133] Fix failed query in TPCDS (#125)
* fix issue where APE still pushes down aggregation when aggPushdown is set to false.

* fix q54

* fix issues: 1) SparkOptimizer adds an UnscaledValue node for some aggregate cases (sum/average) and may return a double type rather than long/decimal, so add a workaround in DumpUtils. 2) fix a null-handling issue in the Count node.

* remove logs

* remove log
jikunshang authored Jun 4, 2021
1 parent b56be36 commit 5fc7fc8
Showing 8 changed files with 41 additions and 17 deletions.
@@ -128,10 +128,13 @@ private static JsonNode constructTree(Expression expr, JsonNode rootNode) {
       ((ObjectNode) tmpNode).put("dataType", tmpExpr.dataType().toString());
       ((ObjectNode) tmpNode).put("value", tmpExpr.value().toString());
       return tmpNode;
+    } else if (expr instanceof UnscaledValue) {
+      // TODO: cast to Long?
+      return constructTree(exprs.get(0), tmpNode);

     } else {
       //TODO: will include other type?
-      throw new UnsupportedOperationException("should not reach here.");
+      throw new UnsupportedOperationException("should not reach here. Expr: " + expr.toString());

     }

@@ -151,7 +151,9 @@ object AggUtils {
       case filter: Filter =>
         filter.child match {
           case l@LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) =>
-            if (!fsRelation.resultExpr.isEmpty) {
+
+            if (!fsRelation.resultExpr.isEmpty && fsRelation.resultExpr.get.size != 0) {
+
              // check whether all filters can be pushed down here; if not, we will NOT
              // do agg push down (don't skip the partial agg)
              val filterExpressions = splitConjunctivePredicates(filter.condition)
@@ -546,6 +546,7 @@ object DataSourceStrategy {
    */
   protected[sql] def translateAggregate(groupingExpressions: Seq[Expression],
       aggregateExpressions: Seq[AggregateExpression]): String = {
+    if (groupingExpressions.size > 0 && aggregateExpressions.size == 0) return ""
     AggregateConvertor.toJsonString(
       scala.collection.JavaConverters.seqAsJavaList(groupingExpressions),
       scala.collection.JavaConverters.seqAsJavaList(aggregateExpressions))
@@ -147,9 +147,15 @@ object FileSourceStrategy extends Strategy with Logging {
         // but will keep agg info in fsRelation. Ideally, following filter/projection
         // will apply FileSourceStrategy very soon.
         if (SparkSession.getActiveSession.get.conf.get(APE_AGGREGATION_PUSHDOWN_ENABLED, false)) {
-          fsRelation.groupExpr = Some(groupingExpr)
-          fsRelation.resultExpr = Some(aggExpr
-            .map(expr => expr.asInstanceOf[AggregateExpression]))
+
+          var withoutDistinct = true
+          aggExpr.map(expr => if (expr.isDistinct) withoutDistinct = false)
+
+          if (withoutDistinct) {
+            fsRelation.groupExpr = Some(groupingExpr)
+            fsRelation.resultExpr = Some(aggExpr)
+          }
+
         }
         Seq()
@@ -241,16 +247,23 @@
       val partialResultExpressions =
         groupingAttributes ++
           partialAggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
-      val aggregateExpression =
+
+      val aggregateExpression = {
         if (afterScanFilters.isEmpty) DataSourceStrategy.translateAggregate(gExpression, aggExpressions)
         else ""
+      }
       logInfo("agg pd info: " + gExpression.mkString +
         " " + aggExpressions.mkString)
+
       // set to None to avoid influencing later nodes/plans.
       fsRelation.groupExpr = None
       fsRelation.resultExpr = None
+      val onlyGroup = (aggExpressions.size == 0) && (gExpression.size > 0)
+
       val outAttributes: Seq[Attribute] =
-        if (!partialResultExpressions.isEmpty && afterScanFilters.isEmpty) partialResultExpressions
-        else outputAttributes
+        if (!onlyGroup && !partialResultExpressions.isEmpty && afterScanFilters.isEmpty) {
+          partialResultExpressions
+        } else outputAttributes
+
       val schema = outAttributes.toStructType
       logInfo(s"Output Data Schema after agg pd: ${schema.simpleString}")
8 changes: 4 additions & 4 deletions oap-ape/ape-native/src/reader.cc
@@ -193,7 +193,10 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
     int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr);
     ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter;

-    rowsRet = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
+    int tmp = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
+    // if the last batch is empty after filtering, doAggregation returns 0
+    // regardless of the group count
+    if (tmp != 0) rowsRet = tmp;
   }

   if (aggExprs.size()) {
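A note on the readBatch change above: aggregation state accumulates in map/results across batches, so the row count to report is the running number of groups, and a final batch that comes out of the filter empty must not clobber it. A minimal sketch of that pattern, with a hypothetical aggregateBatch standing in for doAggregation (the toy grouping is not APE's real logic):

#include <cstdio>
#include <set>
#include <vector>

// Hypothetical stand-in for Reader::doAggregation: folds one batch into the
// running group set and returns the group count, or 0 for an empty batch.
static int aggregateBatch(const std::vector<int>& batchKeys, std::set<int>& groups) {
  if (batchKeys.empty()) return 0;  // nothing survived the filter
  for (int key : batchKeys) groups.insert(key);
  return static_cast<int>(groups.size());
}

int main() {
  // Three batches of group keys; the last one is empty after filtering.
  std::vector<std::vector<int>> batches = {{1, 2, 2}, {2, 3}, {}};
  std::set<int> groups;
  int rowsRet = 0;
  for (const auto& batch : batches) {
    int tmp = aggregateBatch(batch, groups);
    if (tmp != 0) rowsRet = tmp;  // the fix: keep the previous count when 0
  }
  std::printf("groups to return: %d\n", rowsRet);  // prints 3, not 0
  return 0;
}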
@@ -370,9 +373,6 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector<Key>& keys
       std::dynamic_pointer_cast<RootAggExpression>(agg)->getResult(
           results[i], keys.size(), indexes);
     } else {
-      if (results[i].nullVector->size() == 0) {
-        results[i].nullVector->resize(1);
-      }
       std::dynamic_pointer_cast<RootAggExpression>(agg)->getResult(results[i]);
     }
   } else {
9 changes: 5 additions & 4 deletions oap-ape/ape-native/src/utils/AggExpression.cc
@@ -71,9 +71,10 @@ int AggExpression::ExecuteWithParam(int batchSize,

 void Count::getResultInternal(DecimalVector& result) {
   result.type = ResultType::LongType;
-  if (result.nullVector->size() == 0) {
-    result.nullVector->resize(1);
-    result.nullVector->at(0) = 1;
+  result.nullVector->resize(1);
+  result.nullVector->at(0) = 1;
+  if (result.data.size() == 0) {
+    result.data.push_back(arrow::BasicDecimal128(0));
   }
   if (typeid(*child) == typeid(LiteralExpression)) {  // for count(*) or count(1)
     result.data[0] += arrow::BasicDecimal128(batchSize_);
@@ -84,7 +85,7 @@ void Count::getResultInternal(DecimalVector& result) {
       auto tmp = DecimalVector();
       child->getResult(tmp);
       for (int i = 0; i < tmp.data.size(); i++) {
-        if (tmp.nullVector->at(i)) result.data[0] += 1;
+        if (tmp.nullVector->at(i)) result.data[0] += arrow::BasicDecimal128(1);
       }
     }
   }
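Reduced to its essentials, the Count change above does two things: it always marks the result non-null, and it ensures the Decimal128 accumulator exists before result.data[0] is touched (the "count node null issue" the commit message mentions). A self-contained sketch of that semantics, with a plain struct standing in for APE's DecimalVector (assumed here to use 1 = non-null):

#include <arrow/util/basic_decimal.h>
#include <arrow/util/decimal.h>
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

// Minimal stand-in for APE's DecimalVector: one accumulator plus null flags.
struct CountResult {
  std::vector<arrow::BasicDecimal128> data;
  std::shared_ptr<std::vector<uint8_t>> nullVector =
      std::make_shared<std::vector<uint8_t>>();
};

// Mirrors the fixed Count::getResultInternal: a count is never null, and the
// accumulator is zero-initialized on first use.
void countNonNull(const std::vector<uint8_t>& inputNotNull, CountResult& result) {
  result.nullVector->resize(1);
  result.nullVector->at(0) = 1;                        // count itself is never null
  if (result.data.empty()) {
    result.data.push_back(arrow::BasicDecimal128(0));  // zero-init accumulator
  }
  for (uint8_t notNull : inputNotNull) {
    if (notNull) result.data[0] += arrow::BasicDecimal128(1);
  }
}

int main() {
  CountResult result;
  countNonNull({1, 0, 1, 1}, result);  // one null among four rows
  std::cout << arrow::Decimal128(result.data[0]).ToIntegerString() << "\n";  // 3
  return 0;
}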
1 change: 1 addition & 0 deletions oap-ape/ape-native/src/utils/ApeDecimal.h
@@ -19,6 +19,7 @@

 #include <arrow/type.h>
 #include <arrow/util/basic_decimal.h>
+#include <arrow/util/decimal.h>
 #include <cstdint>

 namespace ape {
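For context on the new include: <arrow/util/basic_decimal.h> declares only the arithmetic type arrow::BasicDecimal128, while <arrow/util/decimal.h> declares arrow::Decimal128, the subclass that adds conversion helpers such as ToDouble and ToIntegerString, which the DumpUtils change below relies on. A quick illustration:

#include <arrow/util/basic_decimal.h>
#include <arrow/util/decimal.h>
#include <iostream>

int main() {
  arrow::BasicDecimal128 unscaled(12345);  // arithmetic-only type
  arrow::Decimal128 dec(unscaled);         // adds conversions on top
  std::cout << dec.ToDouble(2) << "\n";    // 12345 at scale 2 -> 123.45
  return 0;
}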
5 changes: 4 additions & 1 deletion oap-ape/ape-native/src/utils/DumpUtils.h
@@ -89,7 +89,10 @@ class DumpUtils {
         break;
       }
       case ResultType::DoubleType: {
-        // TODO: convert
+        // TODO: this only covers the UnscaledValue case; if the data type is
+        // really Double, this will not work
+        arrow::Decimal128 tmp(result.data[i]);
+        *((double*)bufferAddr + i) = tmp.ToDouble(0);
         break;
       }
       case ResultType::Decimal64Type: {
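Sketched standalone, the DoubleType branch above handles the UnscaledValue rewrite: the aggregated value arrives as a 128-bit unscaled integer even though, per the commit message, Spark expects a double, so it is converted through arrow::Decimal128::ToDouble at scale 0 and written into the output buffer. A minimal sketch, with a plain double vector standing in for the JNI dump buffer:

#include <arrow/util/basic_decimal.h>
#include <arrow/util/decimal.h>
#include <iostream>
#include <vector>

int main() {
  // Pretend these came from an aggregate over UnscaledValue(col); scale is 0
  // because the values are already unscaled longs.
  std::vector<arrow::BasicDecimal128> data = {arrow::BasicDecimal128(42),
                                              arrow::BasicDecimal128(-7)};
  std::vector<double> buffer(data.size());  // stands in for bufferAddr

  for (size_t i = 0; i < data.size(); i++) {
    arrow::Decimal128 tmp(data[i]);
    buffer[i] = tmp.ToDouble(0);  // same conversion as the new branch
  }
  std::cout << buffer[0] << " " << buffer[1] << "\n";  // 42 -7
  return 0;
}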
