@@ -2985,82 +2985,169 @@ FetchResult Executor::fetchChunks(
   std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
   std::vector<std::vector<int64_t>> all_num_rows;
   std::vector<std::vector<uint64_t>> all_frag_offsets;
-  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
-    std::vector<const int8_t*> frag_col_buffers(
-        plan_state_->global_to_local_col_ids_.size());
-    for (const auto& col_id : col_global_ids) {
-      if (interrupted_.load()) {
-        throw QueryExecutionError(ERR_INTERRUPTED);
-      }
-      CHECK(col_id);
-      if (col_id->isVirtual()) {
-        continue;
-      }
-      const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
-      CHECK(fragments_it != all_tables_fragments.end());
-      const auto fragments = fragments_it->second;
-      auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
-      CHECK(it != plan_state_->global_to_local_col_ids_.end());
-      CHECK_LT(static_cast<size_t>(it->second),
-               plan_state_->global_to_local_col_ids_.size());
-      const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
-      if (!fragments->size()) {
-        return {};
-      }
-      auto memory_level_for_column = memory_level;
-      if (plan_state_->columns_to_fetch_.find(*col_id) ==
-          plan_state_->columns_to_fetch_.end()) {
-        memory_level_for_column = Data_Namespace::CPU_LEVEL;
-      }
-      if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
-        // determine if we need special treatment to linearlize multi-frag table
-        // i.e., a column that is classified as varlen type, i.e., array
-        // for now, we can support more types in this way
-        if (needLinearizeAllFragments(
-                *col_id, ra_exe_unit, selected_fragments, memory_level)) {
-          bool for_lazy_fetch = false;
-          if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
-              plan_state_->columns_to_not_fetch_.end()) {
-            for_lazy_fetch = true;
-            VLOG(2) << "Try to linearize lazy fetch column (col_id: "
-                    << col_id->getColId() << ")";
+  if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL) {
+    std::mutex all_frag;
+    tbb::task_arena limitedArena(16);
+    limitedArena.execute([&]() {
+      tbb::parallel_for_each(
+          frag_ids_crossjoin.begin(),
+          frag_ids_crossjoin.end(),
+          [&](const std::vector<size_t>& selected_frag_ids) {
+            // for (const auto& selected_frag_ids : frag_ids_crossjoin) {
+            std::vector<const int8_t*> frag_col_buffers(
+                plan_state_->global_to_local_col_ids_.size());
+            for (const auto& col_id : col_global_ids) {
+              if (interrupted_.load()) {
+                throw QueryExecutionError(ERR_INTERRUPTED);
+              }
+              CHECK(col_id);
+              if (col_id->isVirtual()) {
+                continue;
+              }
+              const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
+              CHECK(fragments_it != all_tables_fragments.end());
+              const auto fragments = fragments_it->second;
+              auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
+              CHECK(it != plan_state_->global_to_local_col_ids_.end());
+              CHECK_LT(static_cast<size_t>(it->second),
+                       plan_state_->global_to_local_col_ids_.size());
+              const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
+              if (!fragments->size()) {
+                continue;
+              }
+              auto memory_level_for_column = memory_level;
+              if (plan_state_->columns_to_fetch_.find(*col_id) ==
+                  plan_state_->columns_to_fetch_.end()) {
+                memory_level_for_column = Data_Namespace::CPU_LEVEL;
+              }
+              if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
+                // determine if we need special treatment to linearlize multi-frag table
+                // i.e., a column that is classified as varlen type, i.e., array
+                // for now, we can support more types in this way
+                if (needLinearizeAllFragments(
+                        *col_id, ra_exe_unit, selected_fragments, memory_level)) {
+                  bool for_lazy_fetch = false;
+                  if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
+                      plan_state_->columns_to_not_fetch_.end()) {
+                    for_lazy_fetch = true;
+                    VLOG(2) << "Try to linearize lazy fetch column (col_id: "
+                            << col_id->getColId() << ")";
+                  }
+                  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
+                      col_id->getColInfo(),
+                      all_tables_fragments,
+                      chunks,
+                      chunk_iterators,
+                      for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
+                      for_lazy_fetch ? 0 : device_id,
+                      device_allocator,
+                      thread_idx);
+                } else {
+                  frag_col_buffers[it->second] =
+                      column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
+                                                                all_tables_fragments,
+                                                                memory_level_for_column,
+                                                                device_id,
+                                                                device_allocator,
+                                                                thread_idx);
+                }
+              } else {
+                frag_col_buffers[it->second] =
+                    column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
+                                                             frag_id,
+                                                             all_tables_fragments,
+                                                             chunks,
+                                                             chunk_iterators,
+                                                             memory_level_for_column,
+                                                             device_id,
+                                                             device_allocator);
+              }
+            }
+            all_frag.lock();
+            all_frag_col_buffers.push_back(frag_col_buffers);
+            all_frag.unlock();
+          });
+    });
+  } else {
+    for (const auto& selected_frag_ids : frag_ids_crossjoin) {
+      std::vector<const int8_t*> frag_col_buffers(
+          plan_state_->global_to_local_col_ids_.size());
+      for (const auto& col_id : col_global_ids) {
+        if (interrupted_.load()) {
+          throw QueryExecutionError(ERR_INTERRUPTED);
+        }
+        CHECK(col_id);
+        if (col_id->isVirtual()) {
+          continue;
+        }
+        const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
+        CHECK(fragments_it != all_tables_fragments.end());
+        const auto fragments = fragments_it->second;
+        auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
+        CHECK(it != plan_state_->global_to_local_col_ids_.end());
+        CHECK_LT(static_cast<size_t>(it->second),
+                 plan_state_->global_to_local_col_ids_.size());
+        const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
+        if (!fragments->size()) {
+          return {};
+        }
+        auto memory_level_for_column = memory_level;
+        if (plan_state_->columns_to_fetch_.find(*col_id) ==
+            plan_state_->columns_to_fetch_.end()) {
+          memory_level_for_column = Data_Namespace::CPU_LEVEL;
+        }
+        if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
+          // determine if we need special treatment to linearlize multi-frag table
+          // i.e., a column that is classified as varlen type, i.e., array
+          // for now, we can support more types in this way
+          if (needLinearizeAllFragments(
+                  *col_id, ra_exe_unit, selected_fragments, memory_level)) {
+            bool for_lazy_fetch = false;
+            if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
+                plan_state_->columns_to_not_fetch_.end()) {
+              for_lazy_fetch = true;
+              VLOG(2) << "Try to linearize lazy fetch column (col_id: "
+                      << col_id->getColId() << ")";
+            }
+            frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
+                col_id->getColInfo(),
+                all_tables_fragments,
+                chunks,
+                chunk_iterators,
+                for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
+                for_lazy_fetch ? 0 : device_id,
+                device_allocator,
+                thread_idx);
+          } else {
+            frag_col_buffers[it->second] =
+                column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
+                                                          all_tables_fragments,
+                                                          memory_level_for_column,
+                                                          device_id,
+                                                          device_allocator,
+                                                          thread_idx);
+          }
+        } else {
+          auto timer1 = DEBUG_TIMER("getOneTableColumnFragment");
+          frag_col_buffers[it->second] =
+              column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
+                                                       frag_id,
+                                                       all_tables_fragments,
+                                                       chunks,
+                                                       chunk_iterators,
+                                                       memory_level_for_column,
+                                                       device_id,
+                                                       device_allocator);
+          timer1.stop();
+        }
+      }
+      all_frag_col_buffers.push_back(frag_col_buffers);
     }
-          frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
-              col_id->getColInfo(),
-              all_tables_fragments,
-              chunks,
-              chunk_iterators,
-              for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
-              for_lazy_fetch ? 0 : device_id,
-              device_allocator,
-              thread_idx);
-        } else {
-          frag_col_buffers[it->second] =
-              column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
-                                                        all_tables_fragments,
-                                                        memory_level_for_column,
-                                                        device_id,
-                                                        device_allocator,
-                                                        thread_idx);
-        }
-      } else {
-        auto timer1 = DEBUG_TIMER("getOneTableColumnFragment");
-        frag_col_buffers[it->second] =
-            column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
-                                                     frag_id,
-                                                     all_tables_fragments,
-                                                     chunks,
-                                                     chunk_iterators,
-                                                     memory_level_for_column,
-                                                     device_id,
-                                                     device_allocator);
-        timer1.stop();
-      }
-    }
-    all_frag_col_buffers.push_back(frag_col_buffers);
   }
   std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
       ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
+  CHECK_EQ(all_num_rows.size(), all_frag_col_buffers.size());
+  CHECK_EQ(all_frag_offsets.size(), all_frag_col_buffers.size());
   return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
 }
 
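For reference, below is a minimal, self-contained sketch of the concurrency pattern the GPU branch relies on: a tbb::task_arena capped at 16 slots runs tbb::parallel_for_each over the fragment cross-join, and a std::mutex serializes appends to the shared result vector. This is an illustration, not the Executor code: the Fragment alias, the fetch_fragment helper, and the buffer types are made up for the example, and it uses std::lock_guard instead of explicit lock()/unlock() so the mutex is released even if the per-fragment work throws.

// Sketch only: task_arena + parallel_for_each + mutex-guarded result collection.
// Fragment, fetch_fragment(), and the buffer types are hypothetical stand-ins.
#include <tbb/parallel_for_each.h>
#include <tbb/task_arena.h>

#include <iostream>
#include <mutex>
#include <vector>

using Fragment = std::vector<size_t>;  // stand-in for one selected_frag_ids entry

// Hypothetical per-fragment work; in fetchChunks this is the column-fetch loop.
static std::vector<size_t> fetch_fragment(const Fragment& frag) {
  return std::vector<size_t>(frag.begin(), frag.end());
}

int main() {
  // Build a fake cross-join of fragment ids.
  std::vector<Fragment> frag_ids_crossjoin;
  for (size_t i = 0; i < 64; ++i) {
    frag_ids_crossjoin.push_back({i, i + 1});
  }

  std::vector<std::vector<size_t>> all_frag_col_buffers;
  std::mutex all_frag;  // guards all_frag_col_buffers

  tbb::task_arena limited_arena(16);  // cap worker threads, as in the patch
  limited_arena.execute([&] {
    tbb::parallel_for_each(
        frag_ids_crossjoin.begin(),
        frag_ids_crossjoin.end(),
        [&](const Fragment& selected_frag_ids) {
          auto frag_col_buffers = fetch_fragment(selected_frag_ids);
          // Results are appended in completion order, not input order.
          std::lock_guard<std::mutex> lock(all_frag);
          all_frag_col_buffers.push_back(std::move(frag_col_buffers));
        });
  });

  std::cout << "fetched " << all_frag_col_buffers.size() << " fragment sets\n";
  return 0;
}

Assuming oneTBB is installed, the sketch builds with g++ -std=c++17 sketch.cpp -ltbb. One design note on the pattern: because each task appends under the mutex, the buffers land in completion order rather than in frag_ids_crossjoin order; if downstream code pairs them positionally with per-fragment row counts or offsets, writing into a pre-sized vector by index preserves the original order and needs no mutex at all.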