@@ -678,7 +678,6 @@ bool HashBuild::finishHashBuild() {
 
   std::vector<HashBuild*> otherBuilds;
   otherBuilds.reserve(peers.size());
-  uint64_t numRows = table_->rows()->numRows();
   for (auto& peer : peers) {
     auto op = peer->findOperator(planNodeId());
     HashBuild* build = dynamic_cast<HashBuild*>(op);
@@ -696,13 +695,10 @@ bool HashBuild::finishHashBuild() {
           !build->stateCleared_,
           "Internal state for a peer is empty. It might have already "
           "been closed.");
-      numRows += build->table_->rows()->numRows();
     }
     otherBuilds.push_back(build);
   }
 
-  ensureTableFits(numRows);
-
   std::vector<std::unique_ptr<BaseHashTable>> otherTables;
   otherTables.reserve(peers.size());
   SpillPartitionSet spillPartitions;
@@ -723,20 +719,34 @@ bool HashBuild::finishHashBuild() {
       spiller->finishSpill(spillPartitions);
     }
   }
-  bool allowDuplicateRows = table_->rows()->nextOffset() != 0;
-  if (allowDuplicateRows) {
-    ensureNextRowVectorFits(numRows, otherBuilds);
-  }
 
   if (spiller_ != nullptr) {
     spiller_->finishSpill(spillPartitions);
     removeEmptyPartitions(spillPartitions);
   }
 
-  // TODO: re-enable parallel join build with spilling triggered after
-  // https://github.com/facebookincubator/velox/issues/3567 is fixed.
+  // TODO: Get an accurate signal from the hash table on whether parallel join
+  // build will be applied. Currently the hash table may still decide on its
+  // own not to trigger a parallel join build.
   const bool allowParallelJoinBuild =
       !otherTables.empty() && spillPartitions.empty();
+  ensureTableFits(otherBuilds, otherTables, allowParallelJoinBuild);
+
+  SCOPE_EXIT {
+    // Release the unused memory reservations now that the merged table build
+    // has finished. The scope guard ensures that the memory reserved for the
+    // other build operators is released even when an exception is thrown,
+    // which would otherwise leak the reservations: we cannot rely on the other
+    // operators' own cleanup because it might have already run by the time the
+    // exception is thrown.
+    pool()->release();
+    for (auto* build : otherBuilds) {
+      build->pool()->release();
+    }
+  };
+
+  // TODO: Re-enable parallel join build with spilling triggered after
+  // https://github.com/facebookincubator/velox/issues/3567 is fixed.
   CpuWallTiming timing;
   {
     CpuWallTimer cpuWallTimer{timing};
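For readers unfamiliar with the idiom used above: SCOPE_EXIT (a folly scope-guard macro) defers the release calls until the enclosing scope is left, whether that happens by normal return or by an exception unwinding the stack. Below is a minimal, self-contained sketch of the same idiom; ScopeGuard, FakePool, and buildMergedTable are hypothetical stand-ins for illustration, not Velox APIs.

#include <functional>
#include <utility>
#include <vector>

// Minimal scope guard: the callback runs in the destructor, so it executes on
// both normal return and exception unwinding.
class ScopeGuard {
 public:
  explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~ScopeGuard() {
    fn_();
  }
  ScopeGuard(const ScopeGuard&) = delete;
  ScopeGuard& operator=(const ScopeGuard&) = delete;

 private:
  std::function<void()> fn_;
};

// Hypothetical stand-in for a memory pool that can drop an unused reservation.
struct FakePool {
  void release() { /* return unused reserved bytes to the arbitrator */ }
};

void buildMergedTable(FakePool& myPool, std::vector<FakePool*>& peerPools) {
  ScopeGuard releaseGuard([&] {
    // Runs even if the merge below throws, so reservations made on behalf of
    // the peer operators are never leaked.
    myPool.release();
    for (auto* pool : peerPools) {
      pool->release();
    }
  });
  // ... merge the peer hash tables into one here; this step may throw ...
}

In the diff, the guard is declared before the table merge, so pool()->release() and the peers' release() run on every exit path from finishHashBuild() past that point.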
@@ -757,25 +767,19 @@ bool HashBuild::finishHashBuild() {
   if (spillEnabled()) {
     stateCleared_ = true;
   }
-
-  // Release the unused memory reservation since we have finished the merged
-  // table build.
-  pool()->release();
-  if (allowDuplicateRows) {
-    for (auto* build : otherBuilds) {
-      build->pool()->release();
-    }
-  }
   return true;
 }
 
-void HashBuild::ensureTableFits(uint64_t numRows) {
+void HashBuild::ensureTableFits(
+    const std::vector<HashBuild*>& otherBuilds,
+    const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
+    bool isParallelJoin) {
   // NOTE: we don't need memory reservation if all the partitions have been
   // spilled as nothing need to be built.
-  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled() ||
-      numRows == 0) {
+  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
     return;
   }
+  VELOX_CHECK_EQ(otherBuilds.size(), otherTables.size());
 
   // Test-only spill path.
   if (testingTriggerSpill(pool()->name())) {
@@ -784,58 +788,82 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
     return;
   }
 
-  // NOTE: reserve a bit more memory to consider the extra memory used for
-  // parallel table build operation.
-  const uint64_t bytesToReserve = table_->estimateHashTableSize(numRows) * 1.1;
+  TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this);
+
+  const auto dupRowOverheadBytes = sizeof(char*) + sizeof(NextRowVector);
+
+  uint64_t totalNumRows{0};
+  uint64_t lastBuildBytesToReserve{0};
+  bool allowDuplicateRows{false};
   {
-    Operator::ReclaimableSectionGuard guard(this);
-    TestValue::adjust(
-        "facebook::velox::exec::HashBuild::ensureTableFits", this);
-    if (pool()->maybeReserve(bytesToReserve)) {
-      return;
+    std::lock_guard<std::mutex> l(mutex_);
+    const auto numRows = table_->rows()->numRows();
+    totalNumRows += numRows;
+    allowDuplicateRows = table_->rows()->nextOffset() != 0;
+    if (allowDuplicateRows) {
+      lastBuildBytesToReserve += numRows * dupRowOverheadBytes;
     }
   }
 
-  LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
-               << " for memory pool " << pool()->name()
-               << ", usage: " << succinctBytes(pool()->usedBytes())
-               << ", reservation: " << succinctBytes(pool()->reservedBytes());
-}
+  for (auto i = 0; i < otherTables.size(); i++) {
+    auto& otherTable = otherTables[i];
+    VELOX_CHECK_NOT_NULL(otherTable);
+    auto& otherBuild = otherBuilds[i];
+    const auto& rowContainer = otherTable->rows();
+    int64_t numRows{0};
+    {
+      std::lock_guard<std::mutex> l(otherBuild->mutex_);
+      numRows = rowContainer->numRows();
+    }
+    if (numRows == 0) {
+      continue;
+    }
 
-void HashBuild::ensureNextRowVectorFits(
-    uint64_t numRows,
-    const std::vector<HashBuild*>& otherBuilds) {
-  if (!spillEnabled()) {
-    return;
+    totalNumRows += numRows;
+    if (!allowDuplicateRows) {
+      continue;
+    }
+
+    const auto dupRowBytesToReserve = numRows * dupRowOverheadBytes;
+    if (!isParallelJoin) {
+      lastBuildBytesToReserve += dupRowBytesToReserve;
+      continue;
+    }
+
+    Operator::ReclaimableSectionGuard guard(otherBuild);
+    auto* otherPool = otherBuild->pool();
+
+    // Reserve memory for the next-row-vector allocations in the other build
+    // operators if this is a parallel join build. Otherwise, all
+    // next-row-vectors are allocated from the last build operator.
+    if (!otherPool->maybeReserve(dupRowBytesToReserve)) {
+      LOG(WARNING)
+          << "Failed to reserve " << succinctBytes(dupRowBytesToReserve)
+          << " for duplicate row memory allocation from non-last memory pool "
+          << otherPool->name()
+          << ", usage: " << succinctBytes(otherPool->usedBytes())
+          << ", reservation: " << succinctBytes(otherPool->reservedBytes());
+    }
   }
 
-  TestValue::adjust(
-      "facebook::velox::exec::HashBuild::ensureNextRowVectorFits", this);
+  if (totalNumRows == 0) {
+    return;
+  }
 
-  // The memory allocation for next-row-vectors may stuck in
-  // 'SharedArbitrator::growCapacity' when memory arbitrating is also
-  // triggered. Reserve memory for next-row-vectors to prevent this issue.
-  auto bytesToReserve = numRows * (sizeof(char*) + sizeof(NextRowVector));
+  // NOTE: reserve a bit more memory to consider the extra memory used for
+  // parallel table build operation.
+  lastBuildBytesToReserve += table_->estimateHashTableSize(totalNumRows) * 1.1;
   {
     Operator::ReclaimableSectionGuard guard(this);
-    if (!pool()->maybeReserve(bytesToReserve)) {
-      LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
-                   << " for memory pool " << pool()->name()
-                   << ", usage: " << succinctBytes(pool()->usedBytes())
-                   << ", reservation: "
-                   << succinctBytes(pool()->reservedBytes());
-    }
-  }
-  for (auto* build : otherBuilds) {
-    Operator::ReclaimableSectionGuard guard(build);
-    if (!build->pool()->maybeReserve(bytesToReserve)) {
-      LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
-                   << " for memory pool " << build->pool()->name()
-                   << ", usage: " << succinctBytes(build->pool()->usedBytes())
-                   << ", reservation: "
-                   << succinctBytes(build->pool()->reservedBytes());
+    if (pool()->maybeReserve(lastBuildBytesToReserve)) {
+      return;
     }
   }
+
+  LOG(WARNING) << "Failed to reserve " << succinctBytes(lastBuildBytesToReserve)
+               << " for last build memory pool " << pool()->name()
+               << ", usage: " << succinctBytes(pool()->usedBytes())
+               << ", reservation: " << succinctBytes(pool()->reservedBytes());
 }
 
 void HashBuild::postHashBuildProcess() {
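To make the sizing logic above concrete: the last build operator reserves the estimated size of the merged hash table plus roughly 10% headroom for the parallel table build, and when the join allows duplicate rows each row additionally costs a pointer slot plus a NextRowVector. Under a parallel join build that per-row duplicate overhead is reserved from each peer operator's own pool; under a serial build it is folded into the last operator's reservation. The following is a simplified, standalone model of that arithmetic; BuildSide, ReservationPlan, planReservation, and the 64-byte stand-in for sizeof(NextRowVector) are assumptions for illustration, not the Velox implementation.

#include <cstdint>
#include <vector>

// Simplified model of the reservation sizing in the new ensureTableFits(). The
// real code reads row counts from RowContainers under each operator's mutex
// and calls BaseHashTable::estimateHashTableSize(); both are stubbed out here.
struct BuildSide {
  uint64_t numRows{0};
};

struct ReservationPlan {
  uint64_t lastBuildBytes{0}; // Reserved from the last build operator's pool.
  std::vector<uint64_t> peerBytes; // Reserved per peer pool (parallel build).
};

ReservationPlan planReservation(
    const BuildSide& lastBuild,
    const std::vector<BuildSide>& peers,
    bool allowDuplicateRows,
    bool parallelJoinBuild,
    uint64_t (*estimateHashTableSize)(uint64_t numRows)) {
  // Per duplicate-capable row: one pointer slot plus the next-row vector
  // header. 64 bytes is an assumed placeholder for sizeof(NextRowVector).
  constexpr uint64_t kDupRowOverhead = sizeof(char*) + 64;

  ReservationPlan plan;
  uint64_t totalRows = lastBuild.numRows;
  if (allowDuplicateRows) {
    plan.lastBuildBytes += lastBuild.numRows * kDupRowOverhead;
  }

  for (const auto& peer : peers) {
    if (peer.numRows == 0) {
      continue;
    }
    totalRows += peer.numRows;
    if (!allowDuplicateRows) {
      continue;
    }
    const uint64_t dupBytes = peer.numRows * kDupRowOverhead;
    if (parallelJoinBuild) {
      // Each peer allocates its own next-row vectors, so reserve in its pool.
      plan.peerBytes.push_back(dupBytes);
    } else {
      // Serial build: all next-row vectors come from the last build operator.
      plan.lastBuildBytes += dupBytes;
    }
  }

  // Estimated merged-table size plus ~10% headroom for parallel table build.
  plan.lastBuildBytes += estimateHashTableSize(totalRows) * 1.1;
  return plan;
}

Reserving up front this way mirrors the intent of the removed ensureNextRowVectorFits(): allocating next-row-vectors without a prior reservation could otherwise stall in memory arbitration while the merge is in progress.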