[SPARK-57487][SQL] Support distributed map join for medium-sized tables via SQL hint #56542
yugan95 wants to merge 3 commits into
sunchao left a comment
Summary
I reviewed the exact current head, 50a75f47be60f89210c1dc2d8974ab63a8298ad1, and then re-validated each candidate finding from the reachable Spark runtime path before posting this review. The overall idea is promising for very large fact-table joins, but the current implementation has correctness, security, fault-tolerance, and resource-lifecycle blockers that need to be addressed before it is safe to merge.
Prior state and problem
Spark currently has a gap between broadcast joins and shuffle joins. A broadcast hash join avoids shuffling the large probe side, but requires the complete build side to fit on every executor. Once the build side becomes too large or risky to broadcast, Spark generally falls back to a shuffle-based join and pays the cost of repartitioning the large probe table.
This PR targets the useful middle ground where the probe side is enormous and the build side is medium-sized: too large to broadcast everywhere, but small enough to partition into a limited number of remotely queryable hash-table shards.
Design approach
The patch adds a new DISTMAPJOIN hint and a shard service integrated with Spark Core and AQE. The build side is hash-partitioned into HashedRelation shards, optionally replicated, and accompanied by a merged Bloom filter. Probe tasks keep their original partitioning, batch lookup keys by shard, and use asynchronous Netty RPCs to fetch matching build rows.
This replaces the probe-side shuffle with a new distributed-state and RPC subsystem. That tradeoff can be valuable, but it also means authentication, ordering metadata, task cancellation, replica durability, response bounds, cleanup, and executor-loss behavior become correctness contracts of the SQL operator.
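To make the probe-side contract concrete, here is a minimal sketch of per-shard key batching. It is not code from the patch: ShardBatcher, its submit callback, and the floorMod routing rule are hypothetical names and assumptions chosen for illustration, and the real operator works on UnsafeRow keys rather than Long values.

```scala
// Hypothetical helper: groups probe keys into per-shard batches and hands a
// batch to `submit` once it reaches `maxBatchSize`.
final class ShardBatcher(
    numShards: Int,
    maxBatchSize: Int,
    submit: (Int, Vector[Long]) => Unit) {
  require(numShards > 0, "numShards must be positive")
  require(maxBatchSize > 0, "maxBatchSize must be positive")

  private val pending = Array.fill(numShards)(Vector.newBuilder[Long])
  private val sizes = new Array[Int](numShards)

  // Assumed routing rule: non-negative modulo of the key's hash code.
  def shardOf(key: Long): Int =
    java.lang.Math.floorMod(java.lang.Long.hashCode(key), numShards)

  // Buffers one probe key and flushes its shard's batch when it is full.
  def add(key: Long): Unit = {
    val s = shardOf(key)
    pending(s) += key
    sizes(s) += 1
    if (sizes(s) == maxBatchSize) flush(s)
  }

  // Submits every non-empty tail batch, in shard order.
  def flushAll(): Unit = (0 until numShards).foreach(flush)

  private def flush(s: Int): Unit = if (sizes(s) > 0) {
    submit(s, pending(s).result())
    pending(s).clear()
    sizes(s) = 0
  }
}
```

Note that flushAll hands every tail batch to submit in one pass, so any in-flight limit has to be enforced inside submit or by a gate in front of it.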
Correctness / compatibility analysis
I found the following introduced issues:
- [P1] Clear the inherited output ordering. DistributedMapJoinExec.scala:118 preserves the probe partitioning but inherits HashJoin.outputOrdering, which claims the probe ordering is preserved. It is not: a Bloom-negative row is emitted immediately while an earlier positive row remains buffered, and completed lookup batches are consumed in callback order. For example, a sorted left-outer probe can emit a later missing key before an earlier matching key. EnsureRequirements, RemoveRedundantSorts, window operators, and sort-merge joins trust this metadata and can omit a required sort, producing incorrect results. This operator should report no ordering unless responses are explicitly resequenced.
- [P1] Authenticate the shard lookup transport. NettyShardLookupService.scala:68 constructs a client factory without AuthClientBootstrap and deliberately starts the server with an empty bootstrap list at lines 78-84. Unlike NettyBlockTransferService, it is not given a SecurityManager, does not apply the RPC SSL options, and NettyShardRpcServer never validates its appId. Therefore spark.authenticate=true does not protect this executor-facing port: any peer that can reach it can issue lookup requests and retrieve shard rows or consume executor resources.
- [P1] Always create a shard exchange for a one-shard build. In EnsureRequirements.scala:70, a build child with SinglePartition already satisfies ShardDistribution(requiredNumPartitions = 1), so the branch that creates ShardExchangeExec at line 128 is skipped. DistributedMapJoinExec.resolveShardSetRef later receives the original build plan and throws IllegalStateException because no shard set exists. shard_count=1 is accepted by the hint parser and is not covered by the new tests.
- [P1] Close outstanding lookups before freeing task pages. DistributedMapJoinExec.scala:416 registers only map.free() as its task-completion cleanup. Response ownership is transferred to the callback and released only when BatchMatchReader is fully exhausted, while callbacks continue enqueueing responses after submission. A normal LIMIT, take, cancellation, or lookup exception can abandon the current reader plus queued and future responses. This leaks pooled response buffers; additionally, Future { batch.wrapKeysBuffer() } may still serialize a request from Tungsten pages after the completion listener has freed those pages. The iterator needs task-scoped cancellation/draining and must release every batch and response before freeing the backing pages.
- [P1] Do not complete the exchange before requested replicas are installed. ShardExchangeExec.scala:153 requests replicas, but both ShardManagerMaster.installReplicaSet and ShardManagerMasterEndpoint.scala:219 discard their RPC futures. The exchange nevertheless marks its promise successful at line 174. If the primary executor is lost before a requested replica finishes and reports its location, removeShardManager removes the only holder and can no longer seed another copy. Probe-task retries continue using the already-materialized shard stage and cannot recover it. Thus even replica_count=2 does not currently establish the advertised durability before probe execution begins.
- [P1] Exclude the driver from replica placement outside local mode. The driver initializes and registers a normal shard manager, while ShardManagerMasterEndpoint.scala:198 selects replica candidates from every registered manager. The supplied isLocal flag is unused. Consequently, ordinary replica placement can copy and deserialize gigabyte-scale shards onto the driver and make it serve probe traffic. With two executors and replica_count=3, the driver necessarily receives every shard; with more executors it can still be selected by the scoring function. This creates a direct driver-OOM/application-loss path and contradicts the stated executor-based placement model.
- [P1] Gate distributed map join on hashJoinSupported. SparkStrategies.scala:292 does not apply the hashJoinSupported check used by broadcast and shuffled hash joins. RewriteCollationJoin handles direct attribute equality, but non-binary-stable expression keys remain reachable; for example, equality on nested STRING COLLATE UTF8_LCASE fields produces GetStructField keys rather than AttributeReference keys. A hinted join on those keys constructs DistributedMapJoinExec and trips the assertions in HashJoin; without those assertions, the Bloom filter and raw UnsafeRow lookup would implement binary rather than collation equality. The hint should be rejected with the existing warning/fallback behavior.
- [P2] Bound or stream lookup responses. HashedRelationAdapter.scala:67 allocates one pooled direct ByteBuf and appends every matching build row for every key before returning anything. maxBatchSize limits probe-key count, not response cardinality. A single hot key can make one response approach the whole shard, and the 16 server workers can materialize several such buffers concurrently. This should use chunked/streamed responses or enforce a strict byte bound with backpressure.
- [P2] Clean shard data on both failed and successful exchange lifecycles. ShardExchangeExec.scala:115 allocates the set before tasks install MEMORY_AND_DISK blocks, but registers cleanup only after collection and Bloom merging succeed at line 175; the catch block only fails the promise. A failed build therefore leaves partially installed shards for the lifetime of the application and dynamic allocation treats their executors as cached. On successful cleanup, ShardManager.unpersist removes BlockManager data but never removes shardSetInfo, shardSetLocations, or per-manager _shards entries from ShardManagerMasterEndpoint. Long-lived applications accumulate permanent driver metadata, and later executor removal can try to replicate already-deleted shards.
- [P2] Enforce the in-flight limit for tail batches and validate it as positive. EOF calls flushLookup over every per-shard tail batch, and flushLookup submits them all before returning. With 100 shards this can launch 100 RPCs per task despite maxInFlightNum=8. Separately, SQLConf.scala:7491 accepts zero or negative values; then line 515 polls an empty queue forever before the first request is submitted. Submission should be throttled centrally and the config should require a value greater than zero.
- [P2] Release the per-attempt request retain when connection creation fails. ShardManager.scala:293 retains the request once per location attempt and relies on Netty outbound ownership to release it. If clientFactory.createClient throws before sendManagedRpc, the catch reports failure but does not release that retained reference. Retries add further retains, while the final chain releases only the original base reference. Stale locations after executor loss therefore leak one pooled request buffer per failed pre-send attempt.
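The ordering hazard in the first finding can be reproduced with a toy model. The sketch below is a deliberate simplification, not the operator's code: OrderingDemo and emitOrder are hypothetical, keys stand in for rows, mayMatch stands in for the Bloom filter, and batches are flushed in submission order (callback-order completion would only add more reordering).

```scala
object OrderingDemo {
  // Toy left-outer probe: keys rejected by the filter are emitted at once,
  // while possible matches wait in a lookup batch until it is flushed.
  def emitOrder(
      sortedKeys: Seq[Int],
      mayMatch: Int => Boolean,
      batchSize: Int): Vector[Int] = {
    require(batchSize > 0, "batchSize must be positive")
    val out = Vector.newBuilder[Int]
    var batch = Vector.empty[Int]

    // Emits the buffered keys in the order they were buffered.
    def flush(): Unit = {
      out ++= batch
      batch = Vector.empty
    }

    sortedKeys.foreach { k =>
      if (mayMatch(k)) {
        batch = batch :+ k
        if (batch.size == batchSize) flush()
      } else {
        out += k // filter-negative row bypasses the batch entirely
      }
    }
    flush() // EOF: drain the tail batch
    out.result()
  }
}
```

With sorted input Seq(1, 2, 3, 4), odd keys treated as possible matches, and batchSize = 2, the result is Vector(2, 1, 3, 4): key 2 overtakes the buffered key 1, which is why the operator cannot advertise the probe ordering.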
Current CI independently confirms three additional merge blockers on this head:
- MiMa reports that inserting shardManagerFactory into the public SparkEnv constructor removes the previous JVM signature.
- SparkConfigBindingPolicySuite reports that spark.shard.enabled and all four new SQL configs lack mandatory binding policies.
- ContextCleanerSuite fails because inserting shardSetIds before checkpointIds silently reinterprets the existing positional call at line 212 as shard set 1.
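The ContextCleanerSuite failure is an instance of a general Scala hazard: inserting a defaulted parameter ahead of another parameter of the same type silently rebinds existing positional calls. The signatures below are hypothetical stand-ins for illustration, not the suite's actual helper.

```scala
object PositionalArgsDemo {
  // Hypothetical "before" signature: the second positional argument is checkpointIds.
  def before(rddIds: Seq[Int] = Nil, checkpointIds: Seq[Long] = Nil): String =
    s"rdds=${rddIds.size} checkpoints=${checkpointIds.size}"

  // Hypothetical "after" signature: shardSetIds inserted ahead of checkpointIds.
  // An unchanged call such as after(Nil, Seq(1L)) still compiles, but the
  // second argument now binds to shardSetIds.
  def after(
      rddIds: Seq[Int] = Nil,
      shardSetIds: Seq[Long] = Nil,
      checkpointIds: Seq[Long] = Nil): String =
    s"rdds=${rddIds.size} shardSets=${shardSetIds.size} checkpoints=${checkpointIds.size}"
}
```

Here before(Nil, Seq(1L)) counts one checkpoint, while the unchanged call after(Nil, Seq(1L)) counts one shard set and zero checkpoints. Appending the new parameter at the end, or using named arguments as in after(Nil, checkpointIds = Seq(1L)), avoids the rebinding.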
Key design decisions
- Keeping the strategy explicit through a hint is appropriate for a first version because the cost boundary depends on build size, probe size, match rate, fanout, and RPC latency.
- The merged Bloom filter is important: this strategy is most attractive when it can reject many probe keys without an RPC.
- Shard replication is the right mechanism for long-running jobs, but it must be a completed, observable exchange invariant rather than best-effort background work.
- A separate transport is reasonable only if it inherits Spark's authentication, encryption, buffer-ownership, cancellation, and metrics contracts.
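One way to read the replication point is that the exchange promise should be chained to the replica-install futures instead of being completed independently. A minimal sketch, assuming each install is exposed as a Future[Unit]; ReplicaBarrier and completeWhenInstalled are hypothetical names, not APIs from the patch:

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object ReplicaBarrier {
  // Completes `done` only after every replica-install future has succeeded.
  // A failed install fails the exchange instead of being silently dropped.
  def completeWhenInstalled[A](
      shardSet: A,
      installs: Seq[Future[Unit]],
      done: Promise[A])(implicit ec: ExecutionContext): Future[A] = {
    val allInstalled: Future[A] = Future.sequence(installs).map(_ => shardSet)
    done.completeWith(allInstalled)
    done.future
  }
}
```

The returned future stays incomplete while any install is pending, so probe execution that waits on it cannot start before the requested replicas exist.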
Implementation sketch
The physical flow is: build-side hash shuffle -> per-shard HashedRelation materialization -> optional replica installation -> merged Bloom filter -> probe scan without repartitioning -> per-shard batched RPC lookup -> local join result assembly. AQE wraps the build exchange as ShardQueryStageExec and exchange reuse can share the materialized shard set.
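The "merged Bloom filter" step in this flow amounts to OR-ing per-shard filters of identical shape and consulting the result before any RPC. The sketch below uses a toy filter to show the idea; ToyBloom is hypothetical and is not Spark's Bloom filter implementation, and the double-hashing scheme is an assumption made for brevity.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Toy Bloom filter over Long keys; a stand-in, not Spark's own sketch classes.
final class ToyBloom(val numBits: Int, val numHashes: Int) {
  require(numBits > 0 && numHashes > 0, "numBits and numHashes must be positive")
  private val bits = new BitSet(numBits)

  // Double hashing: position_i = (h1 + i * h2) mod numBits, kept non-negative.
  private def positions(key: Long): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(key.toString, 0x1234)
    val h2 = MurmurHash3.stringHash(key.toString, 0x5678)
    (0 until numHashes).map(i => java.lang.Math.floorMod(h1 + i * h2, numBits))
  }

  def put(key: Long): Unit = positions(key).foreach(i => bits.set(i))

  // False means "definitely absent"; true may be a false positive.
  def mightContain(key: Long): Boolean = positions(key).forall(i => bits.get(i))

  // Merging per-shard filters of identical shape is a bitwise OR.
  def merge(other: ToyBloom): ToyBloom = {
    require(other.numBits == numBits && other.numHashes == numHashes, "shape mismatch")
    val merged = new ToyBloom(numBits, numHashes)
    merged.bits.or(bits)
    merged.bits.or(other.bits)
    merged
  }
}
```

A probe key for which the merged filter's mightContain returns false can skip the lookup RPC entirely; a true result may still be a false positive and must be confirmed by the shard.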
The happy-path join semantics receive useful coverage across join types and AQE modes. The missing coverage is concentrated in metadata contracts and adverse lifecycle paths: ordered probes, one-shard input, collated keys, authentication, early termination, executor loss during replica installation, skewed/high-fanout responses, and failed-exchange cleanup.
Behavioral changes worth calling out
- Enabling the feature opens a new executor-facing network port with its own protocol and memory behavior.
- Probe-side row order is not preserved even though the current physical metadata says it is.
- Shard state is not shuffle output, so Spark's normal lost-shuffle recovery does not rebuild it automatically.
- Dynamic allocation must retain live shard holders but release them promptly and completely after both success and failure.
- The strategy can replace shuffle bytes with very high lookup QPS and response traffic; match selectivity and hot-key fanout are essential operational constraints.
Suggested improvements
I suggest addressing this in layers:
- Fix the deterministic correctness and security blockers: output ordering, one-shard planning, key support, and authenticated/encrypted transport.
- Make replica installation part of exchange completion, exclude the driver from placement, and define how a lost last replica invalidates/rebuilds the query stage.
- Give every request, response, batch, and task page an explicit cancellation-safe ownership lifecycle; add response byte limits or streaming.
- Add symmetric cleanup for failed and successful exchanges, including all master metadata, and centralize backpressure/config validation.
- Add targeted tests for every failure path above, then benchmark not only wall time but also RPC QPS/bytes, direct memory, skew behavior, and executor-loss recovery.
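For the backpressure item, a single gate that every submission passes through is probably the simplest shape. A minimal sketch using java.util.concurrent.Semaphore, assuming a hypothetical InFlightLimiter that is not part of the patch:

```scala
import java.util.concurrent.Semaphore

// Hypothetical central gate: every lookup, including EOF tail batches, takes a
// permit before it is sent and returns it when the response completes.
final class InFlightLimiter(maxInFlight: Int) {
  // Rejecting non-positive limits up front avoids waiting forever on a gate
  // that can never grant a permit.
  require(maxInFlight > 0, s"maxInFlight must be > 0, got $maxInFlight")
  private val permits = new Semaphore(maxInFlight)

  // Non-blocking attempt; returns false when the limit is already reached.
  def tryAcquire(): Boolean = permits.tryAcquire()

  // Blocks the caller until a permit is available.
  def acquire(): Unit = permits.acquire()

  // Must be called exactly once per acquired permit, on success or failure.
  def release(): Unit = permits.release()

  def inFlight: Int = maxInFlight - permits.availablePermits()
}
```

A submitter would call acquire() (or tryAcquire() and back off) before each RPC, including EOF tail batches, and release() from the completion callback on both success and failure.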
The optimization is valuable enough to pursue, but I do not think the current patch is safe to merge until these contracts are closed.
@yugan95 I'm not a PMC member or a maintainer of the SQL area (I mainly work on Structured Streaming), but given the large scope of this change, which spans multiple modules with a huge diff while addressing a specific use case, I wonder whether we should step back and build consensus on the direction first. Apache Spark has a process for this: https://spark.apache.org/improvement-proposals.html

The main purpose is to build consensus in the community that the improvement is something we want to adopt. The Heilmeier form isn't meant to capture detailed design, but a high-level design is appreciated (this change obviously warrants one, since it introduces a new distributed data exchange over RPC). We also probably need a much clearer answer about when users will benefit from this change, especially since it is opt-in rather than opt-out. The 5TB vs 2GB example in the JIRA ticket doesn't feel like a very general case, and we may need more data on the trade-off between the cost of eliminating the shuffle and the cost of retrieving data via remote RPC instead of pre-loading the whole shard after a shuffle. Simply put: if you were a user, which criteria would tell you that this feature should be enabled?

The process requires one PMC member to act as shepherd. If you don't have one to contact and work with, you could start on the dev@ mailing list with the shepherd left empty; I assume you can find a volunteer as long as there is consensus that the proposal is in "good to go" shape. Thanks again!
Thanks @sunchao for the thorough review! All issues except issue 8 are addressed in the latest push.
- Issue 1 - outputOrdering: Overrode to …
- Issue 2 - Shard transport auth: …
- Issue 3 - SinglePartition vs ShardDistribution(1): Added …
- Issue 4 - Task cancellation buffer leak: Added …
- Issue 5 - Replica installation sync: …
- Issue 6 - Driver exclusion: Candidates filter out …
- Issue 7 - hashJoinSupported: …
- Issue 8 - Response size bound: Will address as a follow-up. Adding chunked responses requires protocol changes that deserve a separate review.
- Issue 9 - Exchange cleanup: Failed exchanges call …
- Issue 10 - EOF flush throttling: …
- Issue 11 - Request retain leak: Added …
CI fixes (binding policies, MiMa, ContextCleanerSuite, Java checkstyle) also included.
@HeartSaVioR Thanks for the suggestion on the SPIP process. A couple of clarifications on the scope and entry point:

This feature is double-gated: it requires …

The use case is the gap between broadcast join and shuffle join: the build side is too large to broadcast (tens of GB) while the probe side is very large (PB-scale in our case; a minor correction to the JIRA ticket which mentions 5PB, not 5TB). Shuffling the probe side at that scale is expensive. Distributed map join avoids the probe-side shuffle by partitioning the build side into remotely queryable shards, with a Bloom filter to reduce RPC volume. This has been running stably in our production environment.

I'm happy to draft a SPIP proposal if the community feels that's the right process for this scope. I'm also glad to provide more detailed benchmarks on the trade-off between shuffle cost and RPC cost across different data profiles. Open to either path.
That sounds like an even bigger scope than I thought. Should the shard infrastructure also be opt-in? What's the downside of enabling it by default? Is it a separate process from the executor, or inlined in the executor? If it's a separate process, how does the Spark infra launch it? This obviously needs a design doc (there seem to be a meaningful number of design decisions and options that you may have already gone through and that others might want to revisit), and "since it's opt-in, it doesn't impact others unless they opt in" does not by itself justify adding the feature. Again, I'm not an expert in this area, but I feel folks in the SQL area may be on the same page that this warrants a SPIP. cc @cloud-fan @dtenedor FYI
@HeartSaVioR Thanks for the follow-up questions. To clarify the shard infrastructure: it is fully inlined in the executor process, just an additional Netty server and a few RPC endpoints started inside …

The reason it defaults to off is that the Netty server opens an additional port on each executor. Keeping it opt-in felt more appropriate than starting a new listener by default.

I understand the concern about scope and I'm open to drafting a SPIP. Will wait to hear from the SQL area maintainers as well.
What changes were proposed in this pull request?
This PR introduces Distributed Map Join (DMJ), a new join strategy that avoids full probe-side shuffle by building a distributed hash table service for the build side. The probe side performs batched RPC lookups instead of shuffling.
How it works:
HashedRelation on executors, with configurable replica placement for fault tolerance.
The strategy is triggered explicitly via SQL hint:
Key components:
- ShardManager / ShardManagerMaster
- ShardLookupService (… spark.shard.service)
- ShardExchangeExec
- DistributedMapJoinExec
- ShardQueryStageExec
- ShardDistribution / ShardPartitioning
- BufferedShardRowMap
- ResolveHints (extended): DISTMAPJOIN
Supported join types: Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.
Why are the changes needed?
Broadcast hash join is limited by driver memory and the executor broadcast threshold — build-side tables between ~200MB and 2GB often fall back to expensive shuffle joins, causing massive data movement on the probe side.
Production benchmark (16k cores, 120TB memory cluster, DRA enabled):
~60% wall-time reduction by eliminating the probe-side shuffle entirely.
This is similar in spirit to Hive's map join but distributed — no single-node bottleneck.
Does this PR introduce any user-facing change?
Yes. Adds a new SQL hint DISTMAPJOIN and the following configs (with their defaults):
- spark.shard.enabled: false
- spark.sql.execution.distributedMapJoin.maxInFlightNum: 8
- spark.sql.execution.distributedMapJoin.maxBatchSize: 1024
- spark.sql.execution.distributedMapJoin.bloomFilterCapacity: 5242880
- spark.sql.execution.distributedMapJoin.exchangeTimeout: 30min
How was this patch tested?
- DistributedMapJoinSuite: end-to-end tests on local-cluster[2,1,512] covering inner join, left/right outer join, left semi join, left anti join, with and without join conditions, null key handling, and multi-column join keys.
- DistributedMapJoinSuiteAE: same test coverage with AQE enabled.
- ContextCleanerSuite: extended to cover shard-set cleanup.
- CachedTableSuite: extended with shardSetCleaned listener stub.