This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

[SQL-DS-CACHE-124] [POAE7-1133] Fix q44 case #151

Merged 2 commits on Jun 22, 2021
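
This change fixes the q44 case by letting Reader::readBatch return a row group's aggregation result across multiple calls when the number of groups exceeds the batch size. A new Reader member, dumpAggCursor, records how many grouped rows have already been dumped to the Java buffers; while it is non-zero, hasNext() keeps returning true and checkEndOfRowGroup() is suppressed, so the pending groups are drained before the next row group is read.

A minimal, self-contained sketch of the pagination scheme (an illustration with simplified types, not the actual APE code):

#include <algorithm>
#include <cstdint>
#include <vector>

// Hands out values accumulated for a whole row group in batch-sized windows,
// with a cursor remembering where the next call resumes -- mirroring the role
// of Reader::dumpAggCursor.
struct AggPager {
  std::vector<int64_t> values;  // stands in for the per-group keys/results
  int32_t cursor = 0;           // 0 means no partially-dumped result pending

  // Copies the next window of at most batchSize values into out and returns
  // its size; resets the cursor once the last window has been handed out.
  int32_t next(int32_t batchSize, std::vector<int64_t>& out) {
    const int32_t remaining = static_cast<int32_t>(values.size()) - cursor;
    const int32_t n = std::min(batchSize, remaining);
    out.assign(values.begin() + cursor, values.begin() + cursor + n);
    cursor = (n == remaining) ? 0 : cursor + n;
    return n;
  }

  bool hasNext() const { return cursor > 0; }
};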
93 changes: 61 additions & 32 deletions oap-ape/ape-native/src/reader.cc
@@ -157,7 +157,7 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
initRowGroupReaders();

// this reader has read all rows
if (totalRowsRead >= totalRows) {
if (totalRowsRead >= totalRows && dumpAggCursor == 0) {
return -1;
}
checkEndOfRowGroup();
@@ -184,34 +184,62 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead;
rowsRet = doFilter(rowsToRead, buffersPtr, nullsPtr);
} else {
results.resize(aggExprs.size());
for (int i = 0; i < aggExprs.size(); i++) {
std::vector<uint8_t> nullVector(1);
results[i].nullVector = std::make_shared<std::vector<uint8_t>>(nullVector);
}
while (totalRowsRead < totalRows && !checkEndOfRowGroup() &&
// TODO: refactor. A quick workaround to avoid the group num exceeding the batch size.
map.size() < (batchSize / 4)) {
int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr);
totalRowsRead += rowsToRead;
ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead;

int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr);
ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter;

int tmp = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
// if the last batch is empty after filter, it will return 0 regardless of the
// group num
if (tmp != 0) rowsRet = tmp;
}
if (dumpAggCursor == 0) { // will read a whole RowGroup and do agg
results.resize(aggExprs.size());
for (int i = 0; i < aggExprs.size(); i++) {
std::vector<uint8_t> nullVector(1);
results[i].nullVector = std::make_shared<std::vector<uint8_t>>(nullVector);
}
while (totalRowsRead < totalRows && !checkEndOfRowGroup()) {
int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr);
totalRowsRead += rowsToRead;
ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead;

int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr);
ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter;

int tmp =
doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
// if the last batch is empty after filter, it will return 0 regardless of the
// group num
if (tmp != 0) rowsRet = tmp;
}
int rowsDump = rowsRet;
if (rowsRet > batchSize) {
rowsDump = batchSize;
dumpAggCursor = batchSize;
}

if (aggExprs.size()) {
dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, buffersPtr_,
nullsPtr_);
if (aggExprs.size()) {
dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results,
buffersPtr_, nullsPtr_, 0, rowsDump);
}
if (rowsRet <=
batchSize) {  // returned all results in one call, so clear buffers here.
map.clear();
keys.clear();
results.clear();
}
rowsRet = rowsDump;
} else {  // this row group's aggregation result exceeds the default batch size;
// we will return it via multiple calls
rowsRet = ((keys.size() - dumpAggCursor) > batchSize)
? batchSize
: ((keys.size() - dumpAggCursor));
if (aggExprs.size()) {
dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results,
buffersPtr_, nullsPtr_, dumpAggCursor, rowsRet);
}
if ((keys.size() - dumpAggCursor) <=
batchSize) { // the last batch, let's clear buffers
map.clear();
keys.clear();
results.clear();
dumpAggCursor = 0;
} else {
dumpAggCursor += batchSize;
}
}
map.clear();
keys.clear();
results.clear();
}

ARROW_LOG(DEBUG) << "ret rows " << rowsRet;
@@ -403,20 +431,21 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector<Key>& keys
int Reader::dumpBufferAfterAgg(int groupBySize, int aggExprsSize,
const std::vector<Key>& keys,
const std::vector<DecimalVector>& results,
int64_t* oriBufferPtr, int64_t* oriNullsPtr) {
int64_t* oriBufferPtr, int64_t* oriNullsPtr,
int32_t offset, int32_t length) {
// dump buffers
for (int i = 0; i < groupBySize; i++) {
std::shared_ptr<AttributeReferenceExpression> groupByExpr =
std::static_pointer_cast<AttributeReferenceExpression>(groupByExprs[i]);
int typeIndex = groupByExpr->columnIndex;
DumpUtils::dumpGroupByKeyToJavaBuffer(keys, (uint8_t*)(oriBufferPtr[i]),
(uint8_t*)(oriNullsPtr[i]), i,
typeVector[typeIndex]);
typeVector[typeIndex], offset, length);
}

for (int i = groupBySize; i < aggExprs.size(); i++) {
DumpUtils::dumpToJavaBuffer((uint8_t*)(oriBufferPtr[i]), (uint8_t*)(oriNullsPtr[i]),
results[i]);
results[i], offset, length);
}

return 0;
@@ -457,7 +486,7 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector<int64_t>& buffersPtr
return initRequiredColumnCount + filterBufferCount + aggBufferCount;
}

bool Reader::hasNext() { return columnReaders[0]->HasNext(); }
bool Reader::hasNext() { return dumpAggCursor > 0 || columnReaders[0]->HasNext(); }

bool Reader::skipNextRowGroup() {
if (totalRowGroupsRead == totalRowGroups) {
@@ -531,7 +560,7 @@ void Reader::initRowGroupReaders() {
}

bool Reader::checkEndOfRowGroup() {
if (totalRowsRead != totalRowsLoadedSoFar) return false;
if (totalRowsRead != totalRowsLoadedSoFar || dumpAggCursor != 0) return false;
// if a splitFile contains rowGroup [2,5], currentRowGroup is 2
// rowGroupReaders index starts from 0
ARROW_LOG(DEBUG) << "totalRowsLoadedSoFar: " << totalRowsLoadedSoFar;
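
Taken together, the readBatch, hasNext, and checkEndOfRowGroup changes above keep the pagination invisible to callers. A hedged sketch of a caller loop (drainReader and the pre-allocated buffersPtr/nullsPtr arrays are assumptions for illustration, not part of this PR):

// Hypothetical caller loop: because hasNext() now also returns true while
// dumpAggCursor > 0, looping on hasNext() receives an oversized aggregation
// result in batch-sized windows across successive readBatch() calls.
void drainReader(ape::Reader& reader, int32_t batchSize, int64_t* buffersPtr,
                 int64_t* nullsPtr) {
  while (reader.hasNext()) {
    int rowsRet = reader.readBatch(batchSize, buffersPtr, nullsPtr);
    if (rowsRet < 0) break;  // every row consumed, no pending agg windows
    // hand rowsRet rows in buffersPtr/nullsPtr to the downstream consumer
  }
}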
4 changes: 3 additions & 1 deletion oap-ape/ape-native/src/reader.h
@@ -98,7 +98,7 @@ class Reader {

int dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector<Key>& keys,
const std::vector<DecimalVector>& results, int64_t* oriBufferPtr,
int64_t* oriNullsPtr);
int64_t* oriNullsPtr, int32_t offset, int32_t length);

arrow::Result<std::shared_ptr<arrow::fs::HadoopFileSystem>> fsResult;
arrow::fs::HdfsOptions* options;
@@ -162,5 +162,7 @@ class Reader {
std::vector<DecimalVector> results = std::vector<DecimalVector>();
std::vector<Key> keys = std::vector<Key>();
ApeHashMap map;

int32_t dumpAggCursor = 0;
};
} // namespace ape
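
As a concrete walk-through of the new cursor state (numbers chosen for illustration): with keys.size() == 10000 and batchSize == 4096, the first readBatch call dumps rows [0, 4096) and sets dumpAggCursor = 4096; the second dumps [4096, 8192) and advances the cursor to 8192; the third dumps the remaining 1808 rows, clears map/keys/results, and resets dumpAggCursor to 0 so checkEndOfRowGroup() can move on to the next row group.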
32 changes: 17 additions & 15 deletions oap-ape/ape-native/src/utils/DumpUtils.h
@@ -29,38 +29,37 @@ class DumpUtils {
public:
static void dumpGroupByKeyToJavaBuffer(const std::vector<Key>& keys,
uint8_t* bufferAddr, uint8_t* nullAddr,
const int index,
const parquet::Type::type pType) {
const int index, const parquet::Type::type pType,
int32_t offset, int32_t len) {
*(nullAddr + index) = 1;
int len = keys.size();
switch (pType) {
case parquet::Type::INT32: {
for (int i = 0; i < len; i++) {
*((int32_t*)(bufferAddr) + i) = std::get<0>(keys[i][index]);
*((int32_t*)(bufferAddr) + i) = std::get<0>(keys[offset + i][index]);
}
break;
}
case parquet::Type::INT64: {
for (int i = 0; i < len; i++) {
*((int64_t*)(bufferAddr) + i) = std::get<1>(keys[i][index]);
*((int64_t*)(bufferAddr) + i) = std::get<1>(keys[offset + i][index]);
}
break;
}
case parquet::Type::FLOAT: {
for (int i = 0; i < len; i++) {
*((float*)(bufferAddr) + i) = std::get<2>(keys[i][index]);
*((float*)(bufferAddr) + i) = std::get<2>(keys[offset + i][index]);
}
break;
}
case parquet::Type::DOUBLE: {
for (int i = 0; i < len; i++) {
*((double*)(bufferAddr) + i) = std::get<3>(keys[i][index]);
*((double*)(bufferAddr) + i) = std::get<3>(keys[offset + i][index]);
}
break;
}
case parquet::Type::BYTE_ARRAY: {
for (int i = 0; i < len; i++) {
*((parquet::ByteArray*)(bufferAddr) + i) = std::get<4>(keys[i][index]);
*((parquet::ByteArray*)(bufferAddr) + i) = std::get<4>(keys[offset + i][index]);
}
break;
}
@@ -72,16 +71,18 @@
}

static void dumpToJavaBuffer(uint8_t* bufferAddr, uint8_t* nullAddr,
const DecimalVector& result) {
for (int i = 0; i < result.data.size(); i++) {
const DecimalVector& result, int32_t offset, int32_t len) {
for (int i = 0; i < len; i++) {
*(nullAddr + i) = result.nullVector->at(i);
switch (result.type) {
case ResultType::IntType: {
*((int32_t*)bufferAddr + i) = static_cast<int32_t>(result.data[i].low_bits());
*((int32_t*)bufferAddr + i) =
static_cast<int32_t>(result.data[offset + i].low_bits());
break;
}
case ResultType::LongType: {
*((int64_t*)bufferAddr + i) = static_cast<int64_t>(result.data[i].low_bits());
*((int64_t*)bufferAddr + i) =
static_cast<int64_t>(result.data[offset + i].low_bits());
break;
}
case ResultType::FloatType: {
@@ -91,16 +92,17 @@
case ResultType::DoubleType: {
// TODO: this only handles the UnscaledValue case; if the data type is Double,
// this will not work
arrow::Decimal128 tmp(result.data[i]);
arrow::Decimal128 tmp(result.data[offset + i]);
*((double*)bufferAddr + i) = tmp.ToDouble(0);
break;
}
case ResultType::Decimal64Type: {
*((int64_t*)bufferAddr + i) = static_cast<int64_t>(result.data[i].low_bits());
*((int64_t*)bufferAddr + i) =
static_cast<int64_t>(result.data[offset + i].low_bits());
break;
}
case ResultType::Decimal128Type: {
decimalToBytes(result.data[i], result.precision,
decimalToBytes(result.data[offset + i], result.precision,
(uint8_t*)(bufferAddr + i * 16));
break;
}
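
The new offset/length parameters turn each dump into a windowed copy over the accumulated keys and results instead of always starting from index 0. A minimal sketch of the same windowing with simplified types (an illustration, not the actual DumpUtils code):

#include <cstdint>
#include <vector>

// Copies len values starting at offset from src into a caller-provided buffer,
// mirroring how dumpGroupByKeyToJavaBuffer now reads keys[offset + i] while
// always writing the destination from position 0.
void dumpWindow(const std::vector<int64_t>& src, int32_t offset, int32_t len,
                int64_t* bufferAddr) {
  for (int32_t i = 0; i < len; i++) {
    bufferAddr[i] = src[offset + i];
  }
}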