Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
[SQL-DS-CACHE-124] [POAE7-1133] Fix q44 case (#151)
Browse files Browse the repository at this point in the history
* when agg result is more than default batch size, will return via multiple times.

* remove logs
  • Loading branch information
jikunshang authored Jun 22, 2021
1 parent f5769dc commit 6123c15
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 48 deletions.
93 changes: 61 additions & 32 deletions oap-ape/ape-native/src/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
initRowGroupReaders();

// this reader have read all rows
if (totalRowsRead >= totalRows) {
if (totalRowsRead >= totalRows && dumpAggCursor == 0) {
return -1;
}
checkEndOfRowGroup();
Expand All @@ -184,34 +184,62 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead;
rowsRet = doFilter(rowsToRead, buffersPtr, nullsPtr);
} else {
results.resize(aggExprs.size());
for (int i = 0; i < aggExprs.size(); i++) {
std::vector<uint8_t> nullVector(1);
results[i].nullVector = std::make_shared<std::vector<uint8_t>>(nullVector);
}
while (totalRowsRead < totalRows && !checkEndOfRowGroup() &&
// TODO: refactor. A quick work around to avoid group num exceed batch size.
map.size() < (batchSize / 4)) {
int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr);
totalRowsRead += rowsToRead;
ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead;

int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr);
ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter;

int tmp = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
// if the last batch are empty after filter, it will return 0 regard less of the
// group num
if (tmp != 0) rowsRet = tmp;
}
if (dumpAggCursor == 0) { // will read a whole RowGroup and do agg
results.resize(aggExprs.size());
for (int i = 0; i < aggExprs.size(); i++) {
std::vector<uint8_t> nullVector(1);
results[i].nullVector = std::make_shared<std::vector<uint8_t>>(nullVector);
}
while (totalRowsRead < totalRows && !checkEndOfRowGroup()) {
int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr);
totalRowsRead += rowsToRead;
ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead;

int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr);
ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter;

int tmp =
doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
// if the last batch are empty after filter, it will return 0 regard less of the
// group num
if (tmp != 0) rowsRet = tmp;
}
int rowsDump = rowsRet;
if (rowsRet > batchSize) {
rowsDump = batchSize;
dumpAggCursor = batchSize;
}

if (aggExprs.size()) {
dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, buffersPtr_,
nullsPtr_);
if (aggExprs.size()) {
dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results,
buffersPtr_, nullsPtr_, 0, rowsDump);
}
if (rowsRet <=
batchSize) { // return all result in one call, so clear buffers here.
map.clear();
keys.clear();
results.clear();
}
rowsRet = rowsDump;
} else { // this row group aggregation result is more than default batch size, we
// will return them via mutilple call
rowsRet = ((keys.size() - dumpAggCursor) > batchSize)
? batchSize
: ((keys.size() - dumpAggCursor));
if (aggExprs.size()) {
dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results,
buffersPtr_, nullsPtr_, dumpAggCursor, rowsRet);
}
if ((keys.size() - dumpAggCursor) <=
batchSize) { // the last batch, let's clear buffers
map.clear();
keys.clear();
results.clear();
dumpAggCursor = 0;
} else {
dumpAggCursor += batchSize;
}
}
map.clear();
keys.clear();
results.clear();
}

ARROW_LOG(DEBUG) << "ret rows " << rowsRet;
Expand Down Expand Up @@ -403,20 +431,21 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector<Key>& keys
int Reader::dumpBufferAfterAgg(int groupBySize, int aggExprsSize,
const std::vector<Key>& keys,
const std::vector<DecimalVector>& results,
int64_t* oriBufferPtr, int64_t* oriNullsPtr) {
int64_t* oriBufferPtr, int64_t* oriNullsPtr,
int32_t offset, int32_t length) {
// dump buffers
for (int i = 0; i < groupBySize; i++) {
std::shared_ptr<AttributeReferenceExpression> groupByExpr =
std::static_pointer_cast<AttributeReferenceExpression>(groupByExprs[i]);
int typeIndex = groupByExpr->columnIndex;
DumpUtils::dumpGroupByKeyToJavaBuffer(keys, (uint8_t*)(oriBufferPtr[i]),
(uint8_t*)(oriNullsPtr[i]), i,
typeVector[typeIndex]);
typeVector[typeIndex], offset, length);
}

for (int i = groupBySize; i < aggExprs.size(); i++) {
DumpUtils::dumpToJavaBuffer((uint8_t*)(oriBufferPtr[i]), (uint8_t*)(oriNullsPtr[i]),
results[i]);
results[i], offset, length);
}

return 0;
Expand Down Expand Up @@ -457,7 +486,7 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector<int64_t>& buffersPtr
return initRequiredColumnCount + filterBufferCount + aggBufferCount;
}

bool Reader::hasNext() { return columnReaders[0]->HasNext(); }
bool Reader::hasNext() { return dumpAggCursor > 0 || columnReaders[0]->HasNext(); }

bool Reader::skipNextRowGroup() {
if (totalRowGroupsRead == totalRowGroups) {
Expand Down Expand Up @@ -531,7 +560,7 @@ void Reader::initRowGroupReaders() {
}

bool Reader::checkEndOfRowGroup() {
if (totalRowsRead != totalRowsLoadedSoFar) return false;
if (totalRowsRead != totalRowsLoadedSoFar || dumpAggCursor != 0) return false;
// if a splitFile contains rowGroup [2,5], currentRowGroup is 2
// rowGroupReaders index starts from 0
ARROW_LOG(DEBUG) << "totalRowsLoadedSoFar: " << totalRowsLoadedSoFar;
Expand Down
4 changes: 3 additions & 1 deletion oap-ape/ape-native/src/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Reader {

int dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector<Key>& keys,
const std::vector<DecimalVector>& results, int64_t* oriBufferPtr,
int64_t* oriNullsPtr);
int64_t* oriNullsPtr, int32_t offset, int32_t length);

arrow::Result<std::shared_ptr<arrow::fs::HadoopFileSystem>> fsResult;
arrow::fs::HdfsOptions* options;
Expand Down Expand Up @@ -162,5 +162,7 @@ class Reader {
std::vector<DecimalVector> results = std::vector<DecimalVector>();
std::vector<Key> keys = std::vector<Key>();
ApeHashMap map;

int32_t dumpAggCursor = 0;
};
} // namespace ape
32 changes: 17 additions & 15 deletions oap-ape/ape-native/src/utils/DumpUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,37 @@ class DumpUtils {
public:
static void dumpGroupByKeyToJavaBuffer(const std::vector<Key>& keys,
uint8_t* bufferAddr, uint8_t* nullAddr,
const int index,
const parquet::Type::type pType) {
const int index, const parquet::Type::type pType,
int32_t offset, int32_t len) {
*(nullAddr + index) = 1;
int len = keys.size();
switch (pType) {
case parquet::Type::INT32: {
for (int i = 0; i < len; i++) {
*((int32_t*)(bufferAddr) + i) = std::get<0>(keys[i][index]);
*((int32_t*)(bufferAddr) + i) = std::get<0>(keys[offset + i][index]);
}
break;
}
case parquet::Type::INT64: {
for (int i = 0; i < len; i++) {
*((int64_t*)(bufferAddr) + i) = std::get<1>(keys[i][index]);
*((int64_t*)(bufferAddr) + i) = std::get<1>(keys[offset + i][index]);
}
break;
}
case parquet::Type::FLOAT: {
for (int i = 0; i < len; i++) {
*((float*)(bufferAddr) + i) = std::get<2>(keys[i][index]);
*((float*)(bufferAddr) + i) = std::get<2>(keys[offset + i][index]);
}
break;
}
case parquet::Type::DOUBLE: {
for (int i = 0; i < len; i++) {
*((double*)(bufferAddr) + i) = std::get<3>(keys[i][index]);
*((double*)(bufferAddr) + i) = std::get<3>(keys[offset + i][index]);
}
break;
}
case parquet::Type::BYTE_ARRAY: {
for (int i = 0; i < len; i++) {
*((parquet::ByteArray*)(bufferAddr) + i) = std::get<4>(keys[i][index]);
*((parquet::ByteArray*)(bufferAddr) + i) = std::get<4>(keys[offset + i][index]);
}
break;
}
Expand All @@ -72,16 +71,18 @@ class DumpUtils {
}

static void dumpToJavaBuffer(uint8_t* bufferAddr, uint8_t* nullAddr,
const DecimalVector& result) {
for (int i = 0; i < result.data.size(); i++) {
const DecimalVector& result, int32_t offset, int32_t len) {
for (int i = 0; i < len; i++) {
*(nullAddr + i) = result.nullVector->at(i);
switch (result.type) {
case ResultType::IntType: {
*((int32_t*)bufferAddr + i) = static_cast<int32_t>(result.data[i].low_bits());
*((int32_t*)bufferAddr + i) =
static_cast<int32_t>(result.data[offset + i].low_bits());
break;
}
case ResultType::LongType: {
*((int64_t*)bufferAddr + i) = static_cast<int64_t>(result.data[i].low_bits());
*((int64_t*)bufferAddr + i) =
static_cast<int64_t>(result.data[offset + i].low_bits());
break;
}
case ResultType::FloatType: {
Expand All @@ -91,16 +92,17 @@ class DumpUtils {
case ResultType::DoubleType: {
// TODO: this is just for UnscaledValue case, if the data type is Double, this
// will not work
arrow::Decimal128 tmp(result.data[i]);
arrow::Decimal128 tmp(result.data[offset + i]);
*((double*)bufferAddr + i) = tmp.ToDouble(0);
break;
}
case ResultType::Decimal64Type: {
*((int64_t*)bufferAddr + i) = static_cast<int64_t>(result.data[i].low_bits());
*((int64_t*)bufferAddr + i) =
static_cast<int64_t>(result.data[offset + i].low_bits());
break;
}
case ResultType::Decimal128Type: {
decimalToBytes(result.data[i], result.precision,
decimalToBytes(result.data[offset + i], result.precision,
(uint8_t*)(bufferAddr + i * 16));
break;
}
Expand Down

0 comments on commit 6123c15

Please sign in to comment.