diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc index 473c46820..a1073e2f8 100644 --- a/oap-ape/ape-native/src/reader.cc +++ b/oap-ape/ape-native/src/reader.cc @@ -157,7 +157,7 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr initRowGroupReaders(); // this reader have read all rows - if (totalRowsRead >= totalRows) { + if (totalRowsRead >= totalRows && dumpAggCursor == 0) { return -1; } checkEndOfRowGroup(); @@ -184,34 +184,62 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; rowsRet = doFilter(rowsToRead, buffersPtr, nullsPtr); } else { - results.resize(aggExprs.size()); - for (int i = 0; i < aggExprs.size(); i++) { - std::vector nullVector(1); - results[i].nullVector = std::make_shared>(nullVector); - } - while (totalRowsRead < totalRows && !checkEndOfRowGroup() && - // TODO: refactor. A quick work around to avoid group num exceed batch size. - map.size() < (batchSize / 4)) { - int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); - totalRowsRead += rowsToRead; - ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; - - int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr); - ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter; - - int tmp = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr); - // if the last batch are empty after filter, it will return 0 regard less of the - // group num - if (tmp != 0) rowsRet = tmp; - } + if (dumpAggCursor == 0) { // will read a whole RowGroup and do agg + results.resize(aggExprs.size()); + for (int i = 0; i < aggExprs.size(); i++) { + std::vector nullVector(1); + results[i].nullVector = std::make_shared>(nullVector); + } + while (totalRowsRead < totalRows && !checkEndOfRowGroup()) { + int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); + totalRowsRead += rowsToRead; + ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; + + int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr); + ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter; + + int tmp = + doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr); + // if the last batch are empty after filter, it will return 0 regard less of the + // group num + if (tmp != 0) rowsRet = tmp; + } + int rowsDump = rowsRet; + if (rowsRet > batchSize) { + rowsDump = batchSize; + dumpAggCursor = batchSize; + } - if (aggExprs.size()) { - dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, buffersPtr_, - nullsPtr_); + if (aggExprs.size()) { + dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, + buffersPtr_, nullsPtr_, 0, rowsDump); + } + if (rowsRet <= + batchSize) { // return all result in one call, so clear buffers here. + map.clear(); + keys.clear(); + results.clear(); + } + rowsRet = rowsDump; + } else { // this row group aggregation result is more than default batch size, we + // will return them via mutilple call + rowsRet = ((keys.size() - dumpAggCursor) > batchSize) + ? batchSize + : ((keys.size() - dumpAggCursor)); + if (aggExprs.size()) { + dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, + buffersPtr_, nullsPtr_, dumpAggCursor, rowsRet); + } + if ((keys.size() - dumpAggCursor) <= + batchSize) { // the last batch, let's clear buffers + map.clear(); + keys.clear(); + results.clear(); + dumpAggCursor = 0; + } else { + dumpAggCursor += batchSize; + } } - map.clear(); - keys.clear(); - results.clear(); } ARROW_LOG(DEBUG) << "ret rows " << rowsRet; @@ -403,7 +431,8 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector& keys int Reader::dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector& keys, const std::vector& results, - int64_t* oriBufferPtr, int64_t* oriNullsPtr) { + int64_t* oriBufferPtr, int64_t* oriNullsPtr, + int32_t offset, int32_t length) { // dump buffers for (int i = 0; i < groupBySize; i++) { std::shared_ptr groupByExpr = @@ -411,12 +440,12 @@ int Reader::dumpBufferAfterAgg(int groupBySize, int aggExprsSize, int typeIndex = groupByExpr->columnIndex; DumpUtils::dumpGroupByKeyToJavaBuffer(keys, (uint8_t*)(oriBufferPtr[i]), (uint8_t*)(oriNullsPtr[i]), i, - typeVector[typeIndex]); + typeVector[typeIndex], offset, length); } for (int i = groupBySize; i < aggExprs.size(); i++) { DumpUtils::dumpToJavaBuffer((uint8_t*)(oriBufferPtr[i]), (uint8_t*)(oriNullsPtr[i]), - results[i]); + results[i], offset, length); } return 0; @@ -457,7 +486,7 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr return initRequiredColumnCount + filterBufferCount + aggBufferCount; } -bool Reader::hasNext() { return columnReaders[0]->HasNext(); } +bool Reader::hasNext() { return dumpAggCursor > 0 || columnReaders[0]->HasNext(); } bool Reader::skipNextRowGroup() { if (totalRowGroupsRead == totalRowGroups) { @@ -531,7 +560,7 @@ void Reader::initRowGroupReaders() { } bool Reader::checkEndOfRowGroup() { - if (totalRowsRead != totalRowsLoadedSoFar) return false; + if (totalRowsRead != totalRowsLoadedSoFar || dumpAggCursor != 0) return false; // if a splitFile contains rowGroup [2,5], currentRowGroup is 2 // rowGroupReaders index starts from 0 ARROW_LOG(DEBUG) << "totalRowsLoadedSoFar: " << totalRowsLoadedSoFar; diff --git a/oap-ape/ape-native/src/reader.h b/oap-ape/ape-native/src/reader.h index 3c5d3a58f..17930ac86 100644 --- a/oap-ape/ape-native/src/reader.h +++ b/oap-ape/ape-native/src/reader.h @@ -98,7 +98,7 @@ class Reader { int dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector& keys, const std::vector& results, int64_t* oriBufferPtr, - int64_t* oriNullsPtr); + int64_t* oriNullsPtr, int32_t offset, int32_t length); arrow::Result> fsResult; arrow::fs::HdfsOptions* options; @@ -162,5 +162,7 @@ class Reader { std::vector results = std::vector(); std::vector keys = std::vector(); ApeHashMap map; + + int32_t dumpAggCursor = 0; }; } // namespace ape diff --git a/oap-ape/ape-native/src/utils/DumpUtils.h b/oap-ape/ape-native/src/utils/DumpUtils.h index dc3da03b8..126f6cfa9 100644 --- a/oap-ape/ape-native/src/utils/DumpUtils.h +++ b/oap-ape/ape-native/src/utils/DumpUtils.h @@ -29,38 +29,37 @@ class DumpUtils { public: static void dumpGroupByKeyToJavaBuffer(const std::vector& keys, uint8_t* bufferAddr, uint8_t* nullAddr, - const int index, - const parquet::Type::type pType) { + const int index, const parquet::Type::type pType, + int32_t offset, int32_t len) { *(nullAddr + index) = 1; - int len = keys.size(); switch (pType) { case parquet::Type::INT32: { for (int i = 0; i < len; i++) { - *((int32_t*)(bufferAddr) + i) = std::get<0>(keys[i][index]); + *((int32_t*)(bufferAddr) + i) = std::get<0>(keys[offset + i][index]); } break; } case parquet::Type::INT64: { for (int i = 0; i < len; i++) { - *((int64_t*)(bufferAddr) + i) = std::get<1>(keys[i][index]); + *((int64_t*)(bufferAddr) + i) = std::get<1>(keys[offset + i][index]); } break; } case parquet::Type::FLOAT: { for (int i = 0; i < len; i++) { - *((float*)(bufferAddr) + i) = std::get<2>(keys[i][index]); + *((float*)(bufferAddr) + i) = std::get<2>(keys[offset + i][index]); } break; } case parquet::Type::DOUBLE: { for (int i = 0; i < len; i++) { - *((double*)(bufferAddr) + i) = std::get<3>(keys[i][index]); + *((double*)(bufferAddr) + i) = std::get<3>(keys[offset + i][index]); } break; } case parquet::Type::BYTE_ARRAY: { for (int i = 0; i < len; i++) { - *((parquet::ByteArray*)(bufferAddr) + i) = std::get<4>(keys[i][index]); + *((parquet::ByteArray*)(bufferAddr) + i) = std::get<4>(keys[offset + i][index]); } break; } @@ -72,16 +71,18 @@ class DumpUtils { } static void dumpToJavaBuffer(uint8_t* bufferAddr, uint8_t* nullAddr, - const DecimalVector& result) { - for (int i = 0; i < result.data.size(); i++) { + const DecimalVector& result, int32_t offset, int32_t len) { + for (int i = 0; i < len; i++) { *(nullAddr + i) = result.nullVector->at(i); switch (result.type) { case ResultType::IntType: { - *((int32_t*)bufferAddr + i) = static_cast(result.data[i].low_bits()); + *((int32_t*)bufferAddr + i) = + static_cast(result.data[offset + i].low_bits()); break; } case ResultType::LongType: { - *((int64_t*)bufferAddr + i) = static_cast(result.data[i].low_bits()); + *((int64_t*)bufferAddr + i) = + static_cast(result.data[offset + i].low_bits()); break; } case ResultType::FloatType: { @@ -91,16 +92,17 @@ class DumpUtils { case ResultType::DoubleType: { // TODO: this is just for UnscaledValue case, if the data type is Double, this // will not work - arrow::Decimal128 tmp(result.data[i]); + arrow::Decimal128 tmp(result.data[offset + i]); *((double*)bufferAddr + i) = tmp.ToDouble(0); break; } case ResultType::Decimal64Type: { - *((int64_t*)bufferAddr + i) = static_cast(result.data[i].low_bits()); + *((int64_t*)bufferAddr + i) = + static_cast(result.data[offset + i].low_bits()); break; } case ResultType::Decimal128Type: { - decimalToBytes(result.data[i], result.precision, + decimalToBytes(result.data[offset + i], result.precision, (uint8_t*)(bufferAddr + i * 16)); break; }