Skip to content

[SPARK-57487][SQL] Support distributed map join for medium-sized tables via SQL hint#56542

Open
yugan95 wants to merge 3 commits into
apache:masterfrom
yugan95:mapjoin-0615
Open

[SPARK-57487][SQL] Support distributed map join for medium-sized tables via SQL hint#56542
yugan95 wants to merge 3 commits into
apache:masterfrom
yugan95:mapjoin-0615

Conversation

@yugan95

@yugan95 yugan95 commented Jun 16, 2026

Copy link
Copy Markdown

What changes were proposed in this pull request?

This PR introduces Distributed Map Join (DMJ), a new join strategy that avoids full probe-side shuffle by building a distributed hash table service for the build side. The probe side performs batched RPC lookups instead of shuffling.

How it works:

  1. Build side is hash-partitioned into a fixed number of shards and materialized as HashedRelation on executors, with configurable replica placement for fault tolerance.
  2. A bloom filter is constructed over the build-side keys for probe-side pre-filtering.
  3. Probe-side tasks buffer keys into batches, send async RPC lookups to the shard-holding executors, and stream matched rows back — no shuffle required on the probe side.

The strategy is triggered explicitly via SQL hint:

SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ *
FROM probe_table JOIN build_table ON probe_table.key = build_table.key

Key components:

Component Module Description
ShardManager / ShardManagerMaster core Shard lifecycle, location tracking, replica placement
ShardLookupService core Netty-based RPC layer for shard data transfer (pluggable via spark.shard.service)
ShardExchangeExec sql/core Physical exchange operator that partitions build side into shards
DistributedMapJoinExec sql/core Physical join operator with async batched RPC lookups
ShardQueryStageExec sql/core AQE integration
ShardDistribution / ShardPartitioning sql/catalyst Distribution and partitioning for shard exchange
BufferedShardRowMap sql/core Off-heap key-value buffer for batching probe-side lookups
ResolveHints (extended) sql/catalyst Hint parsing for DISTMAPJOIN

Supported join types: Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.

Why are the changes needed?

Broadcast hash join is limited by driver memory and the executor broadcast threshold — build-side tables between ~200MB and 2GB often fall back to expensive shuffle joins, causing massive data movement on the probe side.

Production benchmark (16k cores, 120TB memory cluster, DRA enabled):

Probe side Build side Strategy Wall time
Before ~5 PB ~2 GB Shuffle hash join (shuffle write + join) ~5 hours
After ~5 PB ~2 GB Distributed map join (join only) ~2 hours

~60% wall-time reduction by eliminating the probe-side shuffle entirely.

This is similar in spirit to Hive's map join but distributed — no single-node bottleneck.

Does this PR introduce any user-facing change?

Yes. Adds a new SQL hint DISTMAPJOIN and the following configs:

Config Default Description
spark.shard.enabled false Enable shard service infrastructure at launch
spark.sql.execution.distributedMapJoin.maxInFlightNum 8 Max concurrent RPC lookup batches per task
spark.sql.execution.distributedMapJoin.maxBatchSize 1024 Max probe-side keys per RPC batch
spark.sql.execution.distributedMapJoin.bloomFilterCapacity 5242880 Expected distinct keys per shard for bloom filter sizing
spark.sql.execution.distributedMapJoin.exchangeTimeout 30min Timeout for building shard data

How was this patch tested?

  • DistributedMapJoinSuite: end-to-end tests on local-cluster[2,1,512] covering inner join, left/right outer join, left semi join, left anti join, with and without join conditions, null key handling, and multi-column join keys.
  • DistributedMapJoinSuiteAE: same test coverage with AQE enabled.
  • ContextCleanerSuite: extended to cover shard-set cleanup.
  • CachedTableSuite: extended with shardSetCleaned listener stub.

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary

I reviewed the exact current head, 50a75f47be60f89210c1dc2d8974ab63a8298ad1, and then re-validated each candidate finding from the reachable Spark runtime path before posting this review. The overall idea is promising for very large fact-table joins, but the current implementation has correctness, security, fault-tolerance, and resource-lifecycle blockers that need to be addressed before it is safe to merge.

Prior state and problem

Spark currently has a gap between broadcast joins and shuffle joins. A broadcast hash join avoids shuffling the large probe side, but requires the complete build side to fit on every executor. Once the build side becomes too large or risky to broadcast, Spark generally falls back to a shuffle-based join and pays the cost of repartitioning the large probe table.

This PR targets the useful middle ground where the probe side is enormous and the build side is medium-sized: too large to broadcast everywhere, but small enough to partition into a limited number of remotely queryable hash-table shards.

Design approach

The patch adds a new DISTMAPJOIN hint and a shard service integrated with Spark Core and AQE. The build side is hash-partitioned into HashedRelation shards, optionally replicated, and accompanied by a merged Bloom filter. Probe tasks keep their original partitioning, batch lookup keys by shard, and use asynchronous Netty RPCs to fetch matching build rows.

This replaces the probe-side shuffle with a new distributed-state and RPC subsystem. That tradeoff can be valuable, but it also means authentication, ordering metadata, task cancellation, replica durability, response bounds, cleanup, and executor-loss behavior become correctness contracts of the SQL operator.

Correctness / compatibility analysis

I found the following introduced issues:

  1. [P1] Clear the inherited output ordering. DistributedMapJoinExec.scala:118 preserves the probe partitioning but inherits HashJoin.outputOrdering, which claims the probe ordering is preserved. It is not: a Bloom-negative row is emitted immediately while an earlier positive row remains buffered, and completed lookup batches are consumed in callback order. For example, a sorted left-outer probe can emit a later missing key before an earlier matching key. EnsureRequirements, RemoveRedundantSorts, window operators, and sort-merge joins trust this metadata and can omit a required sort, producing incorrect results. This operator should report no ordering unless responses are explicitly resequenced.

  2. [P1] Authenticate the shard lookup transport. NettyShardLookupService.scala:68 constructs a client factory without AuthClientBootstrap and deliberately starts the server with an empty bootstrap list at lines 78-84. Unlike NettyBlockTransferService, it is not given a SecurityManager, does not apply the RPC SSL options, and NettyShardRpcServer never validates its appId. Therefore spark.authenticate=true does not protect this executor-facing port: any peer that can reach it can issue lookup requests and retrieve shard rows or consume executor resources.

  3. [P1] Always create a shard exchange for a one-shard build. In EnsureRequirements.scala:70, a build child with SinglePartition already satisfies ShardDistribution(requiredNumPartitions = 1), so the branch that creates ShardExchangeExec at line 128 is skipped. DistributedMapJoinExec.resolveShardSetRef later receives the original build plan and throws IllegalStateException because no shard set exists. shard_count=1 is accepted by the hint parser and is not covered by the new tests.

  4. [P1] Close outstanding lookups before freeing task pages. DistributedMapJoinExec.scala:416 registers only map.free() as its task-completion cleanup. Response ownership is transferred to the callback and released only when BatchMatchReader is fully exhausted, while callbacks continue enqueueing responses after submission. A normal LIMIT, take, cancellation, or lookup exception can abandon the current reader plus queued and future responses. This leaks pooled response buffers; additionally, Future { batch.wrapKeysBuffer() } may still serialize a request from Tungsten pages after the completion listener has freed those pages. The iterator needs task-scoped cancellation/draining and must release every batch and response before freeing the backing pages.

  5. [P1] Do not complete the exchange before requested replicas are installed. ShardExchangeExec.scala:153 requests replicas, but both ShardManagerMaster.installReplicaSet and ShardManagerMasterEndpoint.scala:219 discard their RPC futures. The exchange nevertheless marks its promise successful at line 174. If the primary executor is lost before a requested replica finishes and reports its location, removeShardManager removes the only holder and can no longer seed another copy. Probe-task retries continue using the already-materialized shard stage and cannot recover it. Thus even replica_count=2 does not currently establish the advertised durability before probe execution begins.

  6. [P1] Exclude the driver from replica placement outside local mode. The driver initializes and registers a normal shard manager, while ShardManagerMasterEndpoint.scala:198 selects replica candidates from every registered manager. The supplied isLocal flag is unused. Consequently, ordinary replica placement can copy and deserialize gigabyte-scale shards onto the driver and make it serve probe traffic. With two executors and replica_count=3, the driver necessarily receives every shard; with more executors it can still be selected by the scoring function. This creates a direct driver-OOM/application-loss path and contradicts the stated executor-based placement model.

  7. [P1] Gate distributed map join on hashJoinSupported. SparkStrategies.scala:292 does not apply the hashJoinSupported check used by broadcast and shuffled hash joins. RewriteCollationJoin handles direct attribute equality, but non-binary-stable expression keys remain reachable—for example, equality on nested STRING COLLATE UTF8_LCASE fields produces GetStructField keys rather than AttributeReference keys. A hinted join on those keys constructs DistributedMapJoinExec and trips the assertions in HashJoin; without those assertions, the Bloom filter and raw UnsafeRow lookup would implement binary rather than collation equality. The hint should be rejected with the existing warning/fallback behavior.

  8. [P2] Bound or stream lookup responses. HashedRelationAdapter.scala:67 allocates one pooled direct ByteBuf and appends every matching build row for every key before returning anything. maxBatchSize limits probe-key count, not response cardinality. A single hot key can make one response approach the whole shard, and the 16 server workers can materialize several such buffers concurrently. This should use chunked/streamed responses or enforce a strict byte bound with backpressure.

  9. [P2] Clean shard data on both failed and successful exchange lifecycles. ShardExchangeExec.scala:115 allocates the set before tasks install MEMORY_AND_DISK blocks, but registers cleanup only after collection and Bloom merging succeed at line 175; the catch block only fails the promise. A failed build therefore leaves partially installed shards for the lifetime of the application and dynamic allocation treats their executors as cached. On successful cleanup, ShardManager.unpersist removes BlockManager data but never removes shardSetInfo, shardSetLocations, or per-manager _shards entries from ShardManagerMasterEndpoint. Long-lived applications accumulate permanent driver metadata, and later executor removal can try to replicate already-deleted shards.

  10. [P2] Enforce the in-flight limit for tail batches and validate it as positive. EOF calls flushLookup over every per-shard tail batch, and flushLookup submits them all before returning. With 100 shards this can launch 100 RPCs per task despite maxInFlightNum=8. Separately, SQLConf.scala:7491 accepts zero or negative values; then line 515 polls an empty queue forever before the first request is submitted. Submission should be throttled centrally and the config should require a value greater than zero.

  11. [P2] Release the per-attempt request retain when connection creation fails. ShardManager.scala:293 retains the request once per location attempt and relies on Netty outbound ownership to release it. If clientFactory.createClient throws before sendManagedRpc, the catch reports failure but does not release that retained reference. Retries add further retains, while the final chain releases only the original base reference. Stale locations after executor loss therefore leak one pooled request buffer per failed pre-send attempt.

Current CI independently confirms three additional merge blockers on this head:

  • MiMa reports that inserting shardManagerFactory into the public SparkEnv constructor removes the previous JVM signature.
  • SparkConfigBindingPolicySuite reports that spark.shard.enabled and all four new SQL configs lack mandatory binding policies.
  • ContextCleanerSuite fails because inserting shardSetIds before checkpointIds silently reinterprets the existing positional call at line 212 as shard set 1.

Key design decisions

  • Keeping the strategy explicit through a hint is appropriate for a first version because the cost boundary depends on build size, probe size, match rate, fanout, and RPC latency.
  • The merged Bloom filter is important: this strategy is most attractive when it can reject many probe keys without an RPC.
  • Shard replication is the right mechanism for long-running jobs, but it must be a completed, observable exchange invariant rather than best-effort background work.
  • A separate transport is reasonable only if it inherits Spark's authentication, encryption, buffer-ownership, cancellation, and metrics contracts.

Implementation sketch

The physical flow is: build-side hash shuffle -> per-shard HashedRelation materialization -> optional replica installation -> merged Bloom filter -> probe scan without repartitioning -> per-shard batched RPC lookup -> local join result assembly. AQE wraps the build exchange as ShardQueryStageExec and exchange reuse can share the materialized shard set.

The happy-path join semantics receive useful coverage across join types and AQE modes. The missing coverage is concentrated in metadata contracts and adverse lifecycle paths: ordered probes, one-shard input, collated keys, authentication, early termination, executor loss during replica installation, skewed/high-fanout responses, and failed-exchange cleanup.

Behavioral changes worth calling out

  • Enabling the feature opens a new executor-facing network port with its own protocol and memory behavior.
  • Probe-side row order is not preserved even though the current physical metadata says it is.
  • Shard state is not shuffle output, so Spark's normal lost-shuffle recovery does not rebuild it automatically.
  • Dynamic allocation must retain live shard holders but release them promptly and completely after both success and failure.
  • The strategy can replace shuffle bytes with very high lookup QPS and response traffic; match selectivity and hot-key fanout are essential operational constraints.

Suggested improvements

I suggest addressing this in layers:

  1. Fix the deterministic correctness and security blockers: output ordering, one-shard planning, key support, and authenticated/encrypted transport.
  2. Make replica installation part of exchange completion, exclude the driver from placement, and define how a lost last replica invalidates/rebuilds the query stage.
  3. Give every request, response, batch, and task page an explicit cancellation-safe ownership lifecycle; add response byte limits or streaming.
  4. Add symmetric cleanup for failed and successful exchanges, including all master metadata, and centralize backpressure/config validation.
  5. Add targeted tests for every failure path above, then benchmark not only wall time but also RPC QPS/bytes, direct memory, skew behavior, and executor-loss recovery.

The optimization is valuable enough to pursue, but I do not think the current patch is safe to merge until these contracts are closed.

@HeartSaVioR

HeartSaVioR commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

@yugan95
First of all, welcome to Apache Spark community and thanks for your first contribution!

I'm not a PMC member nor a maintainer of SQL area (I mainly deal with Structured Streaming), but given the large scope of change across multiple modules with huge code diff while addressing a specific use case, I wonder we should step back and make a consensus about the direction in prior.

Apache Spark has a process for this - https://spark.apache.org/improvement-proposals.html

The main purpose is to build a consensus on the community that the improvement is something we want to adopt. The Heilmeier form isn't purposed to bring up detailed design, but high-level design is appreciated (this change obviously warrants it since new distributed data exchange with RPC is introduced). Also probably need a much clearer answer about "when" users will be benefited by this change, especially that this is "opt-in" than opt-out. 5TB vs 2GB example in the JIRA ticket doesn't feel like a very general case, or might need more data about the trade-off between the cost of eliminating shuffle vs retrieving data via remote RPC instead of pre-loading the whole shard after shuffle - simply saying, do we have an answer for this, if you were users which criteria warrants this feature to be enabled?

The process requires one PMC member to be a shepherd - if you don't have one to contact and co-work with this process, probably start with dev@ mailing list with empty shepherd, and I assume you can find volunteer as long as your proposal is on consensus to the shape of "good to go".

Thanks again!

@yugan95

yugan95 commented Jun 17, 2026

Copy link
Copy Markdown
Author

Thanks @sunchao for the thorough review! All issues except issue 8 are addressed in the latest push.

Issue 1 - outputOrdering — Overrode to Nil. Async RPC callbacks break probe ordering.

Issue 2 - Shard transport authNettyShardLookupService now takes SecurityManager, wires AuthServerBootstrap/AuthClientBootstrap when spark.authenticate=true, and passes getRpcSSLOptions() to SparkTransportConf. Mirrors NettyBlockTransferService.

Issue 3 - SinglePartition vs ShardDistribution(1) — Added case _: ShardDistribution => false to SinglePartition.satisfies0.

Issue 4 - Task cancellation buffer leak — Added @volatile cancelled flag. Completion listener drains lookupQueue, releases all buffers, closes currentReader, then frees the map. Callbacks check cancelled and release on both success/failure paths.

Issue 5 - Replica installation syncinstallReplicaSet uses askSync with RpcTimeout from exchange timeout (default 30min). Endpoint collects replica futures via Future.sequence, replies only after all complete.

Issue 6 - Driver exclusion — Candidates filter out DRIVER_IDENTIFIER in non-local mode. Local mode keeps driver eligible as the only executor.

Issue 7 - hashJoinSupportedcreateDistributedMapJoin now guarded with if (hashJoinSupport), consistent with BHJ/SHJ.

Issue 8 - Response size bound — Will address as a follow-up. Adding chunked responses requires protocol changes that deserve a separate review.

Issue 9 - Exchange cleanup — Failed exchanges call unpersist in catch block. unpersist now also calls master.removeShardSet (new RemoveShardSet message) to clean shardSetInfo, shardSetLocations, and per-manager _shards.

Issue 10 - EOF flush throttlingflushLookup now enforces maxInFlightNum limit. Added checkValue(_ > 0) for both maxInFlightNum and maxBatchSize configs.

Issue 11 - Request retain leak — Added reqMsg.release() in fetchBatch catch block before onBatchFetchFailure.

CI fixes (binding policies, MiMa, ContextCleanerSuite, Java checkstyle) also included.

@yugan95

yugan95 commented Jun 17, 2026

Copy link
Copy Markdown
Author

@HeartSaVioR Thanks for the suggestion on the SPIP process.

A couple of clarifications on the scope and entry point:

This feature is double-gated — it requires spark.shard.enabled=true at application launch to start the shard infrastructure, and an explicit SQL hint (/*+ DISTMAPJOIN(table) */) per query to activate the join strategy. Both are off by default, so existing behavior is completely unaffected. The planner never considers it automatically. There is no cost-based selection. Users opt in per query, per table.

The use case is the gap between broadcast join and shuffle join: the build side is too large to broadcast (tens of GB) but the probe side is very large (PB-scale in our case — a minor correction to the JIRA ticket which mentions 5PB, not 5TB). Shuffling the probe side at that scale is expensive. Distributed map join avoids the probe-side shuffle by partitioning the build side into remotely queryable shards, with a Bloom filter to reduce RPC volume. This has been running stably in our production environment.

I'm happy to draft a SPIP proposal if the community feels that's the right process for this scope. Also glad to provide more detailed benchmarks on the trade-off between shuffle cost and RPC cost across different data profiles. Open to either path.

@HeartSaVioR

HeartSaVioR commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

That sounds to me even bigger scope than what I thought - should the shard infrastructure be also opt-in? What's the downside of making it by default? Is it a separate process than executor or inlined to the executor? If it's a separate process, then how the Spark infra launches them?

This obviously needs a design doc (there seems to be meaningful number of design decisions/options you might already went through while someone might want to revisit), and "Since it's opt-in, this doesn't impact others unless they opt-in" does not simply justify an addition of the feature. Again, I'm not an expert of this area, but I feel like folks in SQL area might be a same page that this warrants SPIP.

cc. @cloud-fan @dtenedor FYI

@yugan95

yugan95 commented Jun 17, 2026

Copy link
Copy Markdown
Author

@HeartSaVioR Thanks for the follow-up questions.

To clarify the shard infrastructure: it is fully inlined in the executor process — just an additional Netty server and a few RPC endpoints started inside SparkEnv. There is no separate process, no external service, and no additional deployment step. When spark.shard.enabled=false (the default), none of this code is initialized and there is zero overhead.

The reason it defaults to off is that the Netty server opens an additional port on each executor. Keeping it opt-in felt more appropriate rather than starting a new listener by default.

I understand the concern about scope and I'm open to drafting a SPIP. Will wait to hear from the SQL area maintainers as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants