Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 6cb14ee

Browse files
authored
[Join] InitHashTable optimisation (#663)
This commit reworks `fill_hash_join_buff_bucketized_cpu` to use TBB and utilize the CPU properly. Partially resolves: #574. Signed-off-by: Dmitrii Makarenko <[email protected]>
1 parent 54dbc06 commit 6cb14ee

File tree

7 files changed

+373
-97
lines changed

7 files changed

+373
-97
lines changed

omniscidb/QueryEngine/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,7 @@ set(query_engine_source_files
6161
JoinHashTable/HashTable.cpp
6262
JoinHashTable/PerfectJoinHashTable.cpp
6363
JoinHashTable/Runtime/HashJoinRuntime.cpp
64+
JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp
6465
L0Kernel.cpp
6566
LogicalIR.cpp
6667
LLVMFunctionAttributesUtil.cpp

omniscidb/QueryEngine/ColumnFetcher.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -146,8 +146,8 @@ JoinColumn ColumnFetcher::makeJoinColumn(
146146
data_provider,
147147
column_cache);
148148
if (col_buff != nullptr) {
149+
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count, num_elems};
149150
num_elems += elem_count;
150-
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
151151
} else {
152152
continue;
153153
}

omniscidb/QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h

Lines changed: 27 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
#pragma once
1818

1919
#include "QueryEngine/JoinHashTable/PerfectHashTable.h"
20+
#include "QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.h"
2021

2122
#include "Shared/scope.h"
2223

@@ -166,8 +167,6 @@ class PerfectJoinHashTableBuilder {
166167
0);
167168

168169
auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
169-
const int thread_count = cpu_threads();
170-
std::vector<std::thread> init_cpu_buff_threads;
171170

172171
{
173172
auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
@@ -176,54 +175,36 @@ class PerfectJoinHashTableBuilder {
176175
hash_join_invalid_val);
177176
}
178177
const bool for_semi_join = for_semi_anti_join(join_type);
179-
std::atomic<int> err{0};
180178
{
181179
auto timer_fill =
182-
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
183-
for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
184-
init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
185-
&join_column,
186-
str_proxy_translation_map,
187-
thread_idx,
188-
thread_count,
189-
type,
190-
&err,
191-
&col_range,
192-
&is_bitwise_eq,
193-
&for_semi_join,
194-
cpu_hash_table_buff,
195-
hash_entry_info] {
196-
int partial_err = fill_hash_join_buff_bucketized(
197-
cpu_hash_table_buff,
198-
hash_join_invalid_val,
199-
for_semi_join,
200-
join_column,
201-
{static_cast<size_t>(type->size()),
202-
col_range.getIntMin(),
203-
col_range.getIntMax(),
204-
inline_fixed_encoding_null_value(type),
205-
is_bitwise_eq,
206-
col_range.getIntMax() + 1,
207-
get_join_column_type_kind(type)},
208-
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
209-
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
210-
: 0, // 0 is dummy value
211-
thread_idx,
212-
thread_count,
213-
hash_entry_info.bucket_normalization);
214-
int zero{0};
215-
err.compare_exchange_strong(zero, partial_err);
216-
});
217-
}
218-
for (auto& t : init_cpu_buff_threads) {
219-
t.join();
180+
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized_cpu");
181+
182+
{
183+
JoinColumnTypeInfo type_info{static_cast<size_t>(type->size()),
184+
col_range.getIntMin(),
185+
col_range.getIntMax(),
186+
inline_fixed_encoding_null_value(type),
187+
is_bitwise_eq,
188+
col_range.getIntMax() + 1,
189+
get_join_column_type_kind(type)};
190+
191+
int error = fill_hash_join_buff_bucketized_cpu(
192+
cpu_hash_table_buff,
193+
hash_join_invalid_val,
194+
for_semi_join,
195+
join_column,
196+
type_info,
197+
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
198+
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
199+
: 0, // 0 is dummy value
200+
hash_entry_info.bucket_normalization);
201+
if (error) {
202+
// Too many hash entries, need to retry with a 1:many table
203+
hash_table_ = nullptr; // clear the hash table buffer
204+
throw NeedsOneToManyHash();
205+
}
220206
}
221207
}
222-
if (err) {
223-
// Too many hash entries, need to retry with a 1:many table
224-
hash_table_ = nullptr; // clear the hash table buffer
225-
throw NeedsOneToManyHash();
226-
}
227208
}
228209

229210
void initOneToManyHashTableOnCpu(

omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp

Lines changed: 1 addition & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
#else
2929
#include "Logger/Logger.h"
3030

31+
#include "HashJoinRuntimeCpu.h"
3132
#include "QueryEngine/RuntimeFunctions.h"
3233
#include "Shared/likely.h"
3334
#include "StringDictionary/StringDictionary.h"
@@ -53,56 +54,6 @@
5354
#ifndef __CUDACC__
5455
namespace {
5556

56-
/**
57-
* Joins between two dictionary encoded string columns without a shared string dictionary
58-
* are computed by translating the inner dictionary to the outer dictionary while filling
59-
* the hash table. The translation works as follows:
60-
*
61-
* Given two tables t1 and t2, with t1 the outer table and t2 the inner table, and two
62-
* columns t1.x and t2.x, both dictionary encoded strings without a shared dictionary, we
63-
* read each value in t2.x and do a lookup in the dictionary for t1.x. If the lookup
64-
* returns a valid ID, we insert that ID into the hash table. Otherwise, we skip adding an
65-
* entry into the hash table for the inner column. We can also skip adding any entries
66-
* that are outside the range of the outer column.
67-
*
68-
* Consider a join of the form SELECT x, n FROM (SELECT x, COUNT(*) n FROM t1 GROUP BY x
69-
* HAVING n > 10), t2 WHERE t1.x = t2.x; Let the result of the subquery be t1_s.
70-
* Due to the HAVING clause, the range of all IDs in t1_s must be less than or equal to
71-
* the range of all IDs in t1. Suppose we have an element a in t2.x that is also in
72-
* t1_s.x. Then the ID of a must be within the range of t1_s. Therefore it is safe to
73-
* ignore any element ID that is not in the dictionary corresponding to t1_s.x or is
74-
* outside the range of column t1_s.
75-
*/
76-
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
77-
const int64_t min_elem,
78-
const int64_t max_elem,
79-
const void* sd_inner_proxy,
80-
const void* sd_outer_proxy) {
81-
CHECK(sd_outer_proxy);
82-
const auto sd_inner_dict_proxy =
83-
static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
84-
const auto sd_outer_dict_proxy =
85-
static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
86-
const auto elem_str = sd_inner_dict_proxy->getString(elem);
87-
const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
88-
if (outer_id > max_elem || outer_id < min_elem) {
89-
return StringDictionary::INVALID_STR_ID;
90-
}
91-
return outer_id;
92-
}
93-
94-
inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem,
95-
const int64_t min_inner_elem,
96-
const int64_t min_outer_elem,
97-
const int64_t max_outer_elem,
98-
const int32_t* inner_to_outer_translation_map) {
99-
const auto outer_id = inner_to_outer_translation_map[inner_elem - min_inner_elem];
100-
if (outer_id > max_outer_elem || outer_id < min_outer_elem) {
101-
return StringDictionary::INVALID_STR_ID;
102-
}
103-
return outer_id;
104-
}
105-
10657
#if defined(_MSC_VER)
10758
#define DEFAULT_TARGET_ATTRIBUTE
10859
#else

omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,7 @@ struct JoinChunk {
101101
const int8_t*
102102
col_buff; // actually from AbstractBuffer::getMemoryPtr() via Chunk_NS::Chunk
103103
size_t num_elems;
104+
size_t row_id;
104105
};
105106

106107
struct JoinColumn {

0 commit comments

Comments
 (0)