Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit e342fae

Browse files
committed
[Join] Inline and parallelize with TBB in getAllTableColumnFragments.
This commit refactors and simplifies the method `getAllTableColumnFragments`. It also adds some parallelization. Partially resolves: #574 Signed-off-by: Dmitrii Makarenko <[email protected]>
1 parent 4f37224 commit e342fae

File tree

11 files changed

+199
-106
lines changed

11 files changed

+199
-106
lines changed

omniscidb/DataMgr/BufferMgr/Buffer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,10 @@ class Buffer : public AbstractBuffer {
135135
}
136136

137137
inline int unPin() override {
138-
std::lock_guard<std::mutex> pin_lock(pin_mutex_);
138+
std::unique_lock<std::mutex> pin_lock(pin_mutex_);
139139
int res = (--pin_count_);
140140
if (!res && delete_on_unpin_) {
141+
pin_lock.unlock();
141142
delete this;
142143
}
143144
return res;

omniscidb/DataMgr/BufferMgr/BufferMgr.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,12 @@ void BufferMgr::clear() {
102102
// pinned. Here we delete unpinned buffers and mark pinned buffers
103103
// for removal to have them deleted when unpinned.
104104
for (auto& buf : chunk_index_) {
105-
if (buf.second->buffer) {
106-
buf.second->buffer->deleteWhenUnpinned();
107-
buf.second->buffer = nullptr;
105+
// Iterator can be empty, check Buffer c-tors
106+
if (buf.second == BufferList::iterator()) {
107+
if (buf.second->buffer) {
108+
buf.second->buffer->deleteWhenUnpinned();
109+
buf.second->buffer = nullptr;
110+
}
108111
}
109112
}
110113

@@ -623,6 +626,7 @@ void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
623626
chunk_index_lock.unlock();
624627
std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
625628
if (seg_it->buffer) {
629+
CHECK_EQ(seg_it->buffer->getPinCount(), 0);
626630
delete seg_it->buffer; // Delete Buffer for segment
627631
seg_it->buffer = 0;
628632
}
@@ -831,6 +835,10 @@ void BufferMgr::free(AbstractBuffer* buffer) {
831835
if (casted_buffer == 0) {
832836
LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
833837
}
838+
if (casted_buffer->seg_it_ == BufferList::iterator()) {
839+
delete buffer;
840+
}
841+
834842
deleteBuffer(casted_buffer->seg_it_->chunk_key);
835843
}
836844

omniscidb/QueryEngine/ColumnFetcher.cpp

Lines changed: 94 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,16 @@
1616

1717
#include "QueryEngine/ColumnFetcher.h"
1818

19-
#include <memory>
20-
2119
#include "DataMgr/ArrayNoneEncoder.h"
2220
#include "QueryEngine/ErrorHandling.h"
2321
#include "QueryEngine/Execute.h"
2422
#include "Shared/Intervals.h"
2523
#include "Shared/likely.h"
2624
#include "Shared/sqltypes.h"
2725

26+
#include <tbb/parallel_for.h>
27+
#include <memory>
28+
2829
namespace {
2930

3031
std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) {
@@ -239,6 +240,11 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
239240
int db_id = col_info->db_id;
240241
int table_id = col_info->table_id;
241242
int col_id = col_info->column_id;
243+
if (col_info->type->isString() || col_info->type->isArray()) {
244+
throw std::runtime_error(
245+
"Array type passed to getAllTableColumnFragments. Should be handled in "
246+
"linearization.");
247+
}
242248
const auto fragments_it = all_tables_fragments.find({db_id, table_id});
243249
CHECK(fragments_it != all_tables_fragments.end());
244250
const auto fragments = fragments_it->second;
@@ -248,7 +254,6 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
248254
const InputDescriptor table_desc(db_id, table_id, int(0));
249255
{
250256
std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
251-
252257
auto col_token = data_provider_->getZeroCopyColumnData(*col_info);
253258
if (col_token != nullptr) {
254259
size_t num_rows = col_token->getSize() / col_token->getType()->size();
@@ -262,44 +267,95 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
262267
}
263268

264269
auto column_it = columnarized_scan_table_cache_.find({table_id, col_id});
265-
if (column_it == columnarized_scan_table_cache_.end()) {
266-
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
267-
if (executor_->getConfig()
268-
.exec.interrupt.enable_non_kernel_time_query_interrupt &&
269-
executor_->checkNonKernelTimeInterrupted()) {
270-
throw QueryExecutionError(Executor::ERR_INTERRUPTED);
271-
}
272-
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
273-
std::list<ChunkIter> chunk_iter_holder;
274-
const auto& fragment = (*fragments)[frag_id];
275-
if (fragment.isEmptyPhysicalFragment()) {
276-
continue;
277-
}
278-
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
279-
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
280-
auto col_buffer = getOneTableColumnFragment(col_info,
281-
static_cast<int>(frag_id),
282-
all_tables_fragments,
283-
chunk_holder,
284-
chunk_iter_holder,
285-
Data_Namespace::CPU_LEVEL,
286-
int(0),
287-
device_allocator);
288-
column_frags.push_back(
289-
std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
290-
col_buffer,
291-
fragment.getNumTuples(),
292-
chunk_meta_it->second->type(),
293-
thread_idx));
270+
if (column_it != columnarized_scan_table_cache_.end()) {
271+
table_column = column_it->second.get();
272+
return ColumnFetcher::transferColumnIfNeeded(
273+
table_column, 0, memory_level, device_id, device_allocator);
274+
}
275+
276+
size_t total_row_count = 0;
277+
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
278+
if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt &&
279+
executor_->checkNonKernelTimeInterrupted()) {
280+
throw QueryExecutionError(Executor::ERR_INTERRUPTED);
294281
}
295-
auto merged_results =
296-
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
282+
const auto& fragment = (*fragments)[frag_id];
283+
const auto rows_in_frag = fragment.getNumTuples();
284+
total_row_count += rows_in_frag;
285+
}
286+
287+
const auto type_width = col_info->type->size();
288+
auto write_ptr =
289+
executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
290+
std::vector<std::pair<int8_t*, size_t>> write_ptrs;
291+
std::vector<size_t> valid_fragments;
292+
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
293+
const auto& fragment = (*fragments)[frag_id];
294+
if (fragment.isEmptyPhysicalFragment()) {
295+
continue;
296+
}
297+
CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size());
298+
write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width});
299+
write_ptr += fragment.getNumTuples() * type_width;
300+
valid_fragments.push_back(frag_id);
301+
}
302+
303+
if (write_ptrs.empty()) {
304+
std::unique_ptr<ColumnarResults> merged_results(nullptr);
305+
297306
table_column = merged_results.get();
298307
columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
299308
std::move(merged_results));
300-
} else {
301-
table_column = column_it->second.get();
309+
310+
return ColumnFetcher::transferColumnIfNeeded(
311+
table_column, 0, memory_level, device_id, device_allocator);
302312
}
313+
314+
size_t valid_frag_count = valid_fragments.size();
315+
tbb::parallel_for(
316+
tbb::blocked_range<size_t>(0, valid_frag_count),
317+
[&](const tbb::blocked_range<size_t>& frag_ids) {
318+
for (size_t v_frag_id = frag_ids.begin(); v_frag_id < frag_ids.end();
319+
++v_frag_id) {
320+
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
321+
std::list<ChunkIter> chunk_iter_holder;
322+
size_t frag_id = valid_fragments[v_frag_id];
323+
const auto& fragment = (*fragments)[frag_id];
324+
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
325+
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
326+
std::shared_ptr<Chunk_NS::Chunk> chunk;
327+
{
328+
ChunkKey chunk_key{
329+
db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
330+
chunk = data_provider_->getChunk(col_info,
331+
chunk_key,
332+
Data_Namespace::CPU_LEVEL,
333+
0,
334+
chunk_meta_it->second->numBytes(),
335+
chunk_meta_it->second->numElements());
336+
std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
337+
chunk_holder.push_back(chunk);
338+
}
339+
auto ab = chunk->getBuffer();
340+
CHECK(ab->getMemoryPtr());
341+
int8_t* col_buffer =
342+
ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
343+
memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
344+
}
345+
});
346+
347+
std::vector<int8_t*> raw_write_ptrs;
348+
raw_write_ptrs.reserve(frag_count);
349+
for (size_t i = 0; i < frag_count; i++) {
350+
raw_write_ptrs.emplace_back(write_ptrs[i].first);
351+
}
352+
353+
std::unique_ptr<ColumnarResults> merged_results(new ColumnarResults(
354+
std::move(raw_write_ptrs), total_row_count, col_info->type, thread_idx));
355+
356+
table_column = merged_results.get();
357+
columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
358+
std::move(merged_results));
303359
}
304360
return ColumnFetcher::transferColumnIfNeeded(
305361
table_column, 0, memory_level, device_id, device_allocator);
@@ -469,8 +525,10 @@ const int8_t* ColumnFetcher::linearizeColumnFragments(
469525
}
470526
}
471527
CHECK(res.first); // check merged data buffer
528+
res.first->deleteWhenUnpinned();
472529
if (!type->isFixedLenArray()) {
473530
CHECK(res.second); // check merged index buffer
531+
res.second->deleteWhenUnpinned();
474532
}
475533
auto merged_data_buffer = res.first;
476534
auto merged_index_buffer = res.second;
@@ -1009,34 +1067,3 @@ ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged_data_buf,
10091067
merged_chunk_iter.elem_type_size = chunk_iter.elem_type_size;
10101068
return merged_chunk_iter;
10111069
}
1012-
1013-
void ColumnFetcher::freeLinearizedBuf() {
1014-
std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1015-
auto buffer_provider = executor_->getBufferProvider();
1016-
1017-
if (!linearized_data_buf_cache_.empty()) {
1018-
for (auto& kv : linearized_data_buf_cache_) {
1019-
for (auto& kv2 : kv.second) {
1020-
buffer_provider->free(kv2.second);
1021-
}
1022-
}
1023-
}
1024-
1025-
if (!linearized_idx_buf_cache_.empty()) {
1026-
for (auto& kv : linearized_idx_buf_cache_) {
1027-
for (auto& kv2 : kv.second) {
1028-
buffer_provider->free(kv2.second);
1029-
}
1030-
}
1031-
}
1032-
}
1033-
1034-
void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf() {
1035-
std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
1036-
auto buffer_provider = executor_->getBufferProvider();
1037-
if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
1038-
for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
1039-
buffer_provider->free(kv.second);
1040-
}
1041-
}
1042-
}

omniscidb/QueryEngine/ColumnFetcher.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,6 @@ class ColumnFetcher {
9393
DeviceAllocator* device_allocator,
9494
const size_t thread_idx) const;
9595

96-
void freeTemporaryCpuLinearizedIdxBuf();
97-
void freeLinearizedBuf();
98-
9996
DataProvider* getDataProvider() const { return data_provider_; }
10097

10198
private:

omniscidb/QueryEngine/Execute.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2186,10 +2186,6 @@ hdk::ResultSetTable Executor::executeWorkUnitImpl(
21862186
do {
21872187
SharedKernelContext shared_context(query_infos);
21882188
ColumnFetcher column_fetcher(this, data_provider, column_cache);
2189-
ScopeGuard scope_guard = [&column_fetcher] {
2190-
column_fetcher.freeLinearizedBuf();
2191-
column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
2192-
};
21932189

21942190
if (ra_exe_unit.isShuffle()) {
21952191
allocateShuffleBuffers(query_infos, ra_exe_unit, row_set_mem_owner, shared_context);

omniscidb/QueryEngine/JoinHashTable/Runtime/JoinColumnIterator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ struct JoinColumnIterator {
7373
DEVICE FORCE_INLINE JoinColumnIterator& operator++() {
7474
index += step;
7575
index_inside_chunk += step;
76+
// this loop is made to find index_of_chunk by total index of element
7677
while (chunk_data &&
7778
index_inside_chunk >= join_chunk_array[index_of_chunk].num_elems) {
7879
index_inside_chunk -= join_chunk_array[index_of_chunk].num_elems;

omniscidb/ResultSetRegistry/ColumnarResults.cpp

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -122,28 +122,6 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
122122
}
123123
}
124124

125-
ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
126-
const int8_t* one_col_buffer,
127-
const size_t num_rows,
128-
const hdk::ir::Type* target_type,
129-
const size_t thread_idx)
130-
: column_buffers_(1)
131-
, num_rows_(num_rows)
132-
, target_types_{target_type}
133-
, parallel_conversion_(false)
134-
, direct_columnar_conversion_(false)
135-
, thread_idx_(thread_idx) {
136-
auto timer = DEBUG_TIMER(__func__);
137-
138-
if (target_type->isVarLen()) {
139-
throw ColumnarConversionNotSupported();
140-
}
141-
const auto buf_size = num_rows * target_type->size();
142-
column_buffers_[0] =
143-
reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
144-
memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
145-
}
146-
147125
ColumnarResults::ColumnarResults(const std::vector<int8_t*> one_col_buffer,
148126
const size_t num_rows,
149127
const hdk::ir::Type* target_type,

omniscidb/ResultSetRegistry/ColumnarResults.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,6 @@ class ColumnarResults {
6767
const Config& config,
6868
const bool is_parallel_execution_enforced = false);
6969

70-
ColumnarResults(const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
71-
const int8_t* one_col_buffer,
72-
const size_t num_rows,
73-
const hdk::ir::Type* target_type,
74-
const size_t thread_idx);
75-
7670
ColumnarResults(const std::vector<int8_t*> one_col_buffer,
7771
const size_t num_rows,
7872
const hdk::ir::Type* target_type,

omniscidb/Tests/ArrowBasedExecuteTest.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9268,6 +9268,15 @@ TEST_F(Select, Joins_Fixed_Size_Array_Multi_Frag) {
92689268
v<int64_t>(run_simple_agg(
92699269
"SELECT COUNT(1) FROM mf_t_arr r1, mf_t_arr r2 WHERE r1.t2[1] = r2.t2[1]",
92709270
dt)));
9271+
// {
9272+
// std::ostringstream oss_l;
9273+
// oss_l << "SELECT * FROM mf_d_arr t1, mf_d_arr t2 WHERE ";
9274+
// auto part = oss_l.str();
9275+
// auto q_{part + "t1.c2[1] = t2.c2[1];"};
9276+
// const auto rows = run_multiple_agg(q_, dt);
9277+
// LOG(ERROR) << rows->contentToString(true);
9278+
// LOG(ERROR) << "should be done.";
9279+
// }
92719280
}
92729281

92739282
dropTable("mf_f_arr");

0 commit comments

Comments
 (0)