Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: apache/incubator-uniffle
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 9e9721f10cd2240c9dfc6d50ee67f49a073e4608
Choose a base ref
..
head repository: apache/incubator-uniffle
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 85b1f01e8ac409b4dac59361928b911248ce3973
Choose a head ref
Showing with 15,258 additions and 2,747 deletions.
  1. +2 −2 bin/start-coordinator.sh
  2. +2 −2 bin/start-dashboard.sh
  3. +2 −2 bin/start-shuffle-server.sh
  4. +12 −0 client-mr/core/pom.xml
  5. +10 −5 client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
  6. +29 −5 client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBuffer.java
  7. +39 −10 client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
  8. +10 −0 client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
  9. +36 −0 client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MRMetricsReporter.java
  10. +249 −0 client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RMRssShuffle.java
  11. +22 −1 client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
  12. +80 −0 client-mr/core/src/main/java/org/apache/uniffle/client/shuffle/MRCombiner.java
  13. +365 −0 client-mr/core/src/main/java/org/apache/uniffle/client/shuffle/RecordCollector.java
  14. +244 −26 client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
  15. +16 −3 client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java
  16. +20 −8 client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
  17. +287 −0 client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RMRssShuffleTest.java
  18. +109 −0 client-mr/core/src/test/java/org/apache/uniffle/client/shuffle/MRCombinerTest.java
  19. +102 −0 client-mr/core/src/test/java/org/apache/uniffle/client/shuffle/RecordCollectorTest.java
  20. +6 −3 client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
  21. +5 −5 client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
  22. +59 −53 client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
  23. +7 −0 client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
  24. +11 −15 client-spark/spark2-shaded/pom.xml
  25. +35 −51 client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
  26. +5 −1 client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
  27. +6 −5 client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
  28. +11 −15 client-spark/spark3-shaded/pom.xml
  29. +33 −48 client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
  30. +6 −1 client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
  31. +4 −1 client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
  32. +23 −11 client-tez/pom.xml
  33. +36 −0 client-tez/src/main/java/org/apache/tez/common/GetShuffleServerRequest.java
  34. +10 −0 client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
  35. +10 −7 client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
  36. +8 −1 client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
  37. +33 −3 client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
  38. +14 −2 client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
  39. +273 −0 ...-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RMRssShuffle.java
  40. +84 −0 ...main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RMRssShuffleScheduler.java
  41. +21 −15 ...c/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
  42. +29 −5 client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBuffer.java
  43. +36 −4 client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
  44. +8 −1 client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
  45. +4 −1 client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
  46. +320 −0 client-tez/src/main/java/org/apache/tez/runtime/library/input/RMRssOrderedGroupedKVInput.java
  47. +25 −2 client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
  48. +442 −0 .../src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RMRssShuffleTest.java
  49. +153 −12 ...t-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
  50. +97 −2 client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferTest.java
  51. +481 −0 client-tez/src/test/java/org/apache/tez/runtime/library/input/RMRssOrderedGroupedKVInputTest.java
  52. +6 −0 client/pom.xml
  53. +24 −5 client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
  54. +161 −139 client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
  55. +69 −0 client/src/main/java/org/apache/uniffle/client/record/Record.java
  56. +110 −0 client/src/main/java/org/apache/uniffle/client/record/RecordBlob.java
  57. +112 −0 client/src/main/java/org/apache/uniffle/client/record/RecordBuffer.java
  58. +8 −7 ...ger/Recordable.java → client/src/main/java/org/apache/uniffle/client/record/RecordCollection.java
  59. +22 −0 client/src/main/java/org/apache/uniffle/client/record/metrics/MetricsReporter.java
  60. +67 −0 client/src/main/java/org/apache/uniffle/client/record/reader/BufferedSegment.java
  61. +29 −0 client/src/main/java/org/apache/uniffle/client/record/reader/KeyValueReader.java
  62. +29 −0 client/src/main/java/org/apache/uniffle/client/record/reader/KeyValuesReader.java
  63. +682 −0 client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java
  64. +38 −0 client/src/main/java/org/apache/uniffle/client/record/writer/Combiner.java
  65. +5 −0 client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
  66. +100 −0 client/src/test/java/org/apache/uniffle/client/record/reader/BufferedSegmentTest.java
  67. +211 −0 client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
  68. +179 −0 client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java
  69. +334 −0 client/src/test/java/org/apache/uniffle/client/record/reader/RMRecordsReaderTest.java
  70. +164 −0 client/src/test/java/org/apache/uniffle/client/record/writer/RecordCollectionTest.java
  71. +123 −0 client/src/test/java/org/apache/uniffle/client/record/writer/SumByKeyCombiner.java
  72. +65 −0 common/src/main/java/org/apache/uniffle/common/PartitionInfo.java
  73. +37 −9 common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
  74. +189 −0 common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
  75. +15 −1 common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
  76. +24 −14 common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
  77. +19 −10 common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
  78. +29 −0 common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
  79. +43 −0 common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
  80. +4 −0 common/src/main/java/org/apache/uniffle/common/config/RssConf.java
  81. +51 −0 common/src/main/java/org/apache/uniffle/common/executor/MeasurableRejectedExecutionHandler.java
  82. +381 −0 common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
  83. +54 −36 common/src/main/java/org/apache/uniffle/common/merger/Merger.java
  84. +4 −0 common/src/main/java/org/apache/uniffle/common/merger/Segment.java
  85. +13 −60 common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
  86. +77 −0 common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
  87. +8 −2 common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
  88. +29 −5 common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
  89. +7 −3 common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
  90. +4 −0 common/src/main/java/org/apache/uniffle/common/netty/client/TransportConf.java
  91. +111 −0 common/src/main/java/org/apache/uniffle/common/netty/protocol/GetSortedShuffleDataRequest.java
  92. +93 −0 common/src/main/java/org/apache/uniffle/common/netty/protocol/GetSortedShuffleDataResponse.java
  93. +11 −1 common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
  94. +15 −1 common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
  95. +9 −4 common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java
  96. +12 −3 common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java
  97. +11 −3 common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
  98. +69 −0 common/src/main/java/org/apache/uniffle/common/serializer/BufferSerInputStream.java
  99. +2 −0 common/src/main/java/org/apache/uniffle/common/serializer/DeserializationStream.java
  100. +65 −0 common/src/main/java/org/apache/uniffle/common/serializer/DynBufferSerOutputStream.java
  101. +47 −63 ...java/org/apache/uniffle/common/serializer/{PartialInputStreamImpl.java → FileSerInputStream.java}
  102. +60 −0 common/src/main/java/org/apache/uniffle/common/serializer/FileSerOutputStream.java
  103. +0 −165 common/src/main/java/org/apache/uniffle/common/serializer/SeekableInMemoryByteChannel.java
  104. +62 −0 common/src/main/java/org/apache/uniffle/common/serializer/SerInputStream.java
  105. +11 −10 ...c/main/java/org/apache/uniffle/common/serializer/{PartialInputStream.java → SerOutputStream.java}
  106. +2 −0 common/src/main/java/org/apache/uniffle/common/serializer/SerializationStream.java
  107. +3 −3 common/src/main/java/org/apache/uniffle/common/serializer/SerializerInstance.java
  108. +36 −0 common/src/main/java/org/apache/uniffle/common/serializer/WrappedByteArrayOutputStream.java
  109. +116 −0 .../java/org/apache/uniffle/common/serializer/writable/BufferedRawWritableDeserializationStream.java
  110. +84 −0 ...in/java/org/apache/uniffle/common/serializer/writable/BufferedRawWritableSerializationStream.java
  111. +13 −4 ...src/main/java/org/apache/uniffle/common/serializer/writable/RawWritableDeserializationStream.java
  112. +17 −17 ...n/src/main/java/org/apache/uniffle/common/serializer/writable/RawWritableSerializationStream.java
  113. +12 −3 ...on/src/main/java/org/apache/uniffle/common/serializer/writable/WritableDeserializationStream.java
  114. +16 −12 common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializationStream.java
  115. +15 −6 common/src/main/java/org/apache/uniffle/common/serializer/writable/WritableSerializerInstance.java
  116. +2 −2 common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
  117. +4 −0 common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java
  118. +2 −0 common/src/main/java/org/apache/uniffle/common/util/Constants.java
  119. +16 −0 common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
  120. +22 −0 common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
  121. +7 −1 common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
  122. +69 −0 common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
  123. +23 −11 ...ctDeletionStrategy.java → common/src/main/java/org/apache/uniffle/common/web/resource/ConfVO.java
  124. +2 −0 common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
  125. +145 −0 common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
  126. +3 −3 common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
  127. +157 −0 common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
  128. +52 −46 common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
  129. +35 −0 common/src/test/java/org/apache/uniffle/common/netty/TransportFrameDecoderTest.java
  130. +49 −0 common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
  131. +125 −255 common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
  132. +52 −55 ...rg/apache/uniffle/common/serializer/{PartialInputStreamTest.java → SerInputOutputStreamTest.java}
  133. +53 −35 common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
  134. +82 −148 common/src/test/java/org/apache/uniffle/common/serializer/WritableSerializerTest.java
  135. +13 −0 common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
  136. +11 −0 common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java
  137. +1 −1 common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java
  138. +30 −13 coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
  139. +6 −1 coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
  140. +15 −2 coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
  141. +3 −0 coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
  142. +37 −4 coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ApplicationResource.java
  143. +6 −0 coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/CoordinatorServerResource.java
  144. +7 −0 coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/AppInfoVO.java
  145. +2 −1 coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
  146. +1 −1 coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
  147. +131 −1 dashboard/src/main/webapp/src/mock/nodelistpage.js
  148. +1 −1 dashboard/src/main/webapp/src/mock/shuffleserverpage.js
  149. +39 −9 dashboard/src/main/webapp/src/pages/ApplicationPage.vue
  150. +1 −1 dashboard/src/main/webapp/src/pages/serverstatus/NodeListPage.vue
  151. +8 −0 deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go
  152. +26 −0 deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go
  153. +16 −0 deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml
  154. +23 −4 deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
  155. +45 −0 deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
  156. BIN docs/asset/rss_remote_merge_architecture.png
  157. +1 −0 docs/client_guide/client_guide.md
  158. +128 −0 docs/remote_merge_guide.md
  159. +4 −0 docs/server_guide.md
  160. +2 −0 integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
  161. +22 −15 integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
  162. +956 −0 ...ration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
  163. +963 −0 .../src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
  164. +8 −7 integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
  165. +8 −6 integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
  166. +8 −6 integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
  167. +60 −15 integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
  168. +106 −0 integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
  169. +8 −4 integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
  170. +8 −4 integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
  171. +129 −0 integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
  172. +6 −0 integration-test/tez/src/test/java/org/apache/uniffle/test/TezCartesianProductTest.java
  173. +9 −0 integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
  174. +6 −0 integration-test/tez/src/test/java/org/apache/uniffle/test/TezJoinIntegrationTestBase.java
  175. +6 −0 integration-test/tez/src/test/java/org/apache/uniffle/test/TezOrderedWordCountTest.java
  176. +6 −0 integration-test/tez/src/test/java/org/apache/uniffle/test/TezSimpleSessionExampleTest.java
  177. +6 −0 integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountTest.java
  178. +1 −1 internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
  179. +8 −0 internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
  180. +15 −0 internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
  181. +7 −3 internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
  182. +286 −0 ...rnal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
  183. +138 −2 internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
  184. +65 −0 internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
  185. +25 −5 internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
  186. +18 −2 internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
  187. +62 −0 internal-client/src/main/java/org/apache/uniffle/client/request/RssGetSortedShuffleDataRequest.java
  188. +16 −4 internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
  189. +10 −1 internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
  190. +52 −0 internal-client/src/main/java/org/apache/uniffle/client/request/RssStartSortMergeRequest.java
  191. +63 −0 ...rnal-client/src/main/java/org/apache/uniffle/client/response/RssGetSortedShuffleDataResponse.java
  192. +27 −0 internal-client/src/main/java/org/apache/uniffle/client/response/RssStartSortMergeResponse.java
  193. +49 −0 proto/src/main/proto/Rss.proto
  194. +23 −21 server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
  195. +4 −5 server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
  196. +20 −35 server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
  197. +44 −8 server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
  198. +80 −0 server/src/main/java/org/apache/uniffle/server/ShuffleDetailInfo.java
  199. +24 −11 server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
  200. +44 −9 server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
  201. +5 −0 server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
  202. +44 −0 server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
  203. +317 −29 server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
  204. +23 −11 server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
  205. +16 −0 server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
  206. +74 −14 server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
  207. +67 −10 server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
  208. +28 −8 server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
  209. +12 −4 server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
  210. +63 −9 server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
  211. +57 −27 server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
  212. +63 −21 server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
  213. +0 −14 server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
  214. +1 −6 server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
  215. +149 −39 server/src/main/java/org/apache/uniffle/server/merge/BlockFlushFileReader.java
  216. +4 −1 server/src/main/java/org/apache/uniffle/server/merge/DefaultMergeEventHandler.java
  217. +1 −1 server/src/main/java/org/apache/uniffle/server/merge/MergeEventHandler.java
  218. +87 −31 server/src/main/java/org/apache/uniffle/server/merge/MergedResult.java
  219. +156 −116 server/src/main/java/org/apache/uniffle/server/merge/Partition.java
  220. +3 −8 server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
  221. +70 −34 server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
  222. +168 −27 server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
  223. +0 −111 server/src/main/java/org/apache/uniffle/server/storage/HadoopDeletionStrategy.java
  224. +18 −3 server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
  225. +0 −96 server/src/main/java/org/apache/uniffle/server/storage/LocalDeletionStrategy.java
  226. +41 −6 server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
  227. +6 −13 server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
  228. +1 −1 server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
  229. +6 −0 server/src/main/java/org/apache/uniffle/server/web/resource/ServerResource.java
  230. +11 −6 server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
  231. +2 −2 server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
  232. +3 −2 server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
  233. +2 −1 server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
  234. +29 −7 server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
  235. +11 −9 server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
  236. +63 −27 server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
  237. +20 −7 server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
  238. +68 −64 server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java
  239. +74 −61 server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java
  240. +56 −17 server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
  241. +2 −3 server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
  242. +27 −14 storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
  243. +2 −0 storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java
  244. +1 −0 storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
  245. +2 −2 storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
  246. +0 −76 storage/src/main/java/org/apache/uniffle/storage/handler/AsynchronousDeleteEvent.java
  247. +0 −5 storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
  248. +2 −2 storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
  249. +36 −0 storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandlerWrapper.java
  250. +12 −4 storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
  251. +0 −53 storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
  252. +61 −12 storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
  253. +0 −40 storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
  254. +43 −7 storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
  255. +8 −1 storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
  256. +17 −4 storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
  257. +5 −2 storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
  258. +12 −0 storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
  259. +2 −2 storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
  260. +1 −1 storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
  261. +2 −2 storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
  262. +2 −1 ...ge/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
4 changes: 2 additions & 2 deletions bin/start-coordinator.sh
Original file line number Diff line number Diff line change
@@ -68,11 +68,11 @@ COORDINATOR_BASE_JVM_ARGS=${COORDINATOR_BASE_JVM_ARGS:-" -server \
-Xms${COORDINATOR_XMX_SIZE} \
-XX:+PrintCommandLineFlags"}

DEFAULT_GC_ARGS=" -XX:+UseG1GC \
DEFAULT_GC_ARGS=${COORDINATOR_DEFAULT_GC_ARGS:-" -XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:ParallelGCThreads=20 \
-XX:ConcGCThreads=5 \
-XX:InitiatingHeapOccupancyPercent=45"
-XX:InitiatingHeapOccupancyPercent=45"}

GC_LOG_ARGS_LEGACY=" -XX:+PrintGC \
-XX:+PrintAdaptiveSizePolicy \
4 changes: 2 additions & 2 deletions bin/start-dashboard.sh
Original file line number Diff line number Diff line change
@@ -54,11 +54,11 @@ DASHBOARD_BASE_JVM_ARGS=${DASHBOARD_BASE_JVM_ARGS:-" -server \
-Xms${DASHBOARD_XMX_SIZE} \
-XX:+PrintCommandLineFlags"}

DEFAULT_GC_ARGS=" -XX:+UseG1GC \
DEFAULT_GC_ARGS=${DASHBOARD_DEFAULT_GC_ARGS:-" -XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:ParallelGCThreads=20 \
-XX:ConcGCThreads=5 \
-XX:InitiatingHeapOccupancyPercent=45"
-XX:InitiatingHeapOccupancyPercent=45"}

GC_LOG_ARGS_LEGACY=" -XX:+PrintGC \
-XX:+PrintAdaptiveSizePolicy \
4 changes: 2 additions & 2 deletions bin/start-shuffle-server.sh
Original file line number Diff line number Diff line change
@@ -104,13 +104,13 @@ SHUFFLE_SERVER_BASE_JVM_ARGS=${SHUFFLE_SERVER_BASE_JVM_ARGS:-" -server \
-XX:+UnlockExperimentalVMOptions \
-XX:+PrintCommandLineFlags"}

DEFAULT_GC_ARGS=" -XX:+UseG1GC \
DEFAULT_GC_ARGS=${SHUFFLE_SERVER_DEFAULT_GC_ARGS:-" -XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:ParallelGCThreads=20 \
-XX:ConcGCThreads=5 \
-XX:InitiatingHeapOccupancyPercent=60 \
-XX:G1HeapRegionSize=32m \
-XX:G1NewSizePercent=10"
-XX:G1NewSizePercent=10"}

GC_LOG_ARGS_LEGACY=" -XX:+PrintGC \
-XX:+PrintAdaptiveSizePolicy \
12 changes: 12 additions & 0 deletions client-mr/core/pom.xml
Original file line number Diff line number Diff line change
@@ -123,6 +123,18 @@
<artifactId>hadoop${hadoop.short.version}-shim</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Original file line number Diff line number Diff line change
@@ -28,7 +28,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.mapreduce.RssMRUtils;
@@ -131,7 +130,6 @@ public void init(Context context) throws IOException, ClassNotFoundException {

Map<Integer, List<ShuffleServerInfo>> partitionToServers = createAssignmentMap(rssJobConf);

SerializationFactory serializationFactory = new SerializationFactory(mrJobConf);
long maxSegmentSize =
RssMRUtils.getLong(
rssJobConf,
@@ -148,13 +146,19 @@ public void init(Context context) throws IOException, ClassNotFoundException {
RssMRConfig.RSS_WRITER_BUFFER_SIZE,
RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
boolean isRemoteMergeEnable =
RssMRUtils.getBoolean(
rssJobConf,
RssMRConfig.RSS_REMOTE_MERGE_ENABLE,
RssMRConfig.RSS_REMOTE_MERGE_ENABLE_DEFAULT);
bufferManager =
new SortWriteBufferManager(
(long) (ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
taskAttemptId,
batch,
serializationFactory.getSerializer(keyClass),
serializationFactory.getSerializer(valClass),
keyClass,
valClass,
mrJobConf,
comparator,
memoryThreshold,
appId,
@@ -174,7 +178,8 @@ public void init(Context context) throws IOException, ClassNotFoundException {
sendThreshold,
maxBufferSize,
RssMRConfig.toRssConf(rssJobConf),
combinerRunner);
combinerRunner,
isRemoteMergeEnable);
}

private Map<Integer, List<ShuffleServerInfo>> createAssignmentMap(Configuration jobConf) {
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@

package org.apache.hadoop.mapred;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
@@ -31,6 +32,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.serializer.SerializerInstance;

public class SortWriteBuffer<K, V> extends OutputStream {

private static final Logger LOG = LoggerFactory.getLogger(SortWriteBuffer.class);
@@ -47,35 +50,56 @@ public class SortWriteBuffer<K, V> extends OutputStream {
private int currentOffset = 0;
private int currentIndex = 0;

private final boolean useUniffleSerializer;
private final SerializerInstance serializerInstance;
private DataOutputStream dataOutputStream;

public SortWriteBuffer(
int partitionId,
RawComparator<K> comparator,
long maxSegmentSize,
boolean useUniffleSerializer,
Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
Serializer<V> valueSerializer,
SerializerInstance serializerInstance) {
this.partitionId = partitionId;
this.comparator = comparator;
this.maxSegmentSize = maxSegmentSize;
this.useUniffleSerializer = useUniffleSerializer;
this.keySerializer = keySerializer;
this.valSerializer = valueSerializer;
this.serializerInstance = serializerInstance;
if (useUniffleSerializer) {
this.dataOutputStream = new DataOutputStream(this);
}
}

public int addRecord(K key, V value) throws IOException {
keySerializer.open(this);
valSerializer.open(this);
if (!useUniffleSerializer) {
keySerializer.open(this);
valSerializer.open(this);
}
int lastOffSet = currentOffset;
int lastIndex = currentIndex;
int lastDataLength = dataLength;
int keyIndex = lastIndex;
keySerializer.serialize(key);
if (useUniffleSerializer) {
serializerInstance.serialize(key, this.dataOutputStream);
} else {
keySerializer.serialize(key);
}
int keyLength = dataLength - lastDataLength;
int keyOffset = lastOffSet;
if (compact(lastIndex, lastOffSet, keyLength)) {
keyOffset = lastOffSet;
keyIndex = lastIndex;
}
lastDataLength = dataLength;
valSerializer.serialize(value);
if (useUniffleSerializer) {
serializerInstance.serialize(value, this.dataOutputStream);
} else {
valSerializer.serialize(value);
}
int valueLength = dataLength - lastDataLength;
records.add(new Record<K>(keyIndex, keyOffset, keyLength, valueLength));
return keyLength + valueLength;
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapreduce.RssMRUtils;
import org.slf4j.Logger;
@@ -50,6 +51,8 @@
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.serializer.SerializerFactory;
import org.apache.uniffle.common.serializer.SerializerInstance;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -74,8 +77,8 @@ public class SortWriteBufferManager<K, V> {
private final double sendThreshold;
private final ReentrantLock memoryLock = new ReentrantLock();
private final Condition full = memoryLock.newCondition();
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
private Serializer<K> keySerializer;
private Serializer<V> valSerializer;
private final RawComparator<K> comparator;
private final Set<Long> successBlockIds;
private final Set<Long> failedBlockIds;
@@ -99,13 +102,16 @@ public class SortWriteBufferManager<K, V> {
private final RssConf rssConf;
private final Optional<Codec> codec;
private final Task.CombinerRunner<K, V> combinerRunner;
private final boolean useUniffleSerializer;
private SerializerInstance serializerInstance;

public SortWriteBufferManager(
long maxMemSize,
long taskAttemptId,
int batch,
Serializer<K> keySerializer,
Serializer<V> valSerializer,
Class<K> keyClass,
Class<V> valClass,
JobConf mrJobConf,
RawComparator<K> comparator,
double memoryThreshold,
String appId,
@@ -125,12 +131,11 @@ public SortWriteBufferManager(
double sendThreshold,
long maxBufferSize,
RssConf rssConf,
Task.CombinerRunner<K, V> combinerRunner) {
Task.CombinerRunner<K, V> combinerRunner,
boolean useUniffleSerializer) {
this.maxMemSize = maxMemSize;
this.taskAttemptId = taskAttemptId;
this.batch = batch;
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
this.comparator = comparator;
this.memoryThreshold = memoryThreshold;
this.appId = appId;
@@ -152,6 +157,17 @@ public SortWriteBufferManager(
this.rssConf = rssConf;
this.codec = Codec.newInstance(rssConf);
this.combinerRunner = combinerRunner;
this.useUniffleSerializer = useUniffleSerializer;
if (useUniffleSerializer) {
SerializerFactory factory = new SerializerFactory(rssConf);
org.apache.uniffle.common.serializer.Serializer serializer = factory.getSerializer(keyClass);
this.serializerInstance = serializer.newInstance();
assert factory.getSerializer(valClass).getClass().equals(serializer.getClass());
} else {
SerializationFactory serializationFactory = new SerializationFactory(mrJobConf);
this.keySerializer = serializationFactory.getSerializer(keyClass);
this.valSerializer = serializationFactory.getSerializer(valClass);
}
}

// todo: Single Buffer should also have its size limit
@@ -178,7 +194,13 @@ public void addRecord(int partitionId, K key, V value) throws IOException, Inter
k -> {
SortWriteBuffer<K, V> sortWriterBuffer =
new SortWriteBuffer(
partitionId, comparator, maxSegmentSize, keySerializer, valSerializer);
partitionId,
comparator,
maxSegmentSize,
useUniffleSerializer,
keySerializer,
valSerializer,
serializerInstance);
waitSendBuffers.add(sortWriterBuffer);
return sortWriterBuffer;
});
@@ -274,7 +296,13 @@ public SortWriteBuffer<K, V> combineBuffer(SortWriteBuffer<K, V> buffer)

SortWriteBuffer<K, V> newBuffer =
new SortWriteBuffer<>(
buffer.getPartitionId(), comparator, maxSegmentSize, keySerializer, valSerializer);
buffer.getPartitionId(),
comparator,
maxSegmentSize,
useUniffleSerializer,
keySerializer,
valSerializer,
serializerInstance);

combineCollector.setWriter(newBuffer);
combinerRunner.combine(kvIterator, combineCollector);
@@ -384,7 +412,8 @@ ShuffleBlockInfo createShuffleBlock(SortWriteBuffer wb) {
int partitionId = wb.getPartitionId();
final int uncompressLength = data.length;
long start = System.currentTimeMillis();
final byte[] compressed = codec.map(c -> c.compress(data)).orElse(data);
final byte[] compressed =
useUniffleSerializer ? data : codec.map(c -> c.compress(data)).orElse(data);
final long crc32 = ChecksumUtils.getCrc32(compressed);
compressTime += System.currentTimeMillis() - start;
final long blockId =
Original file line number Diff line number Diff line change
@@ -196,6 +196,16 @@ public class RssMRConfig {
public static final String RSS_TEST_MODE_ENABLE =
MR_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE;

public static final String RSS_REMOTE_MERGE_ENABLE =
MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_ENABLE;
public static final boolean RSS_REMOTE_MERGE_ENABLE_DEFAULT = false;
public static final String RSS_MERGED_BLOCK_SZIE =
MR_CONFIG_PREFIX + RssClientConfig.RSS_MERGED_BLOCK_SZIE;
public static final int RSS_MERGED_BLOCK_SZIE_DEFAULT =
RssClientConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT;
public static final String RSS_REMOTE_MERGE_CLASS_LOADER =
MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_CLASS_LOADER;

public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.task.reduce;

import org.apache.hadoop.mapred.Reporter;

import org.apache.uniffle.client.record.metrics.MetricsReporter;

public class MRMetricsReporter implements MetricsReporter {

Reporter reporter;

public MRMetricsReporter(Reporter reporter) {
this.reporter = reporter;
}

@Override
public void incRecordsRead(long v) {
this.reporter.incrCounter(RMRssShuffle.Counter.INPUT_RECORDS_PROCESSED, v);
}
}
Loading