Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1945] Improvement(server): Remove PreAllocated buffer earlier by unregisterPureEvent #1970

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,16 @@ public void sendShuffleData(
ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime);
}
}
int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
int requireSize =
shuffleServer.getShuffleTaskManager().getRequireBufferSize(appId, requireBufferId);

StatusCode ret = StatusCode.SUCCESS;
String responseMessage = "OK";
if (req.getShuffleDataCount() > 0) {
ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
PreAllocatedBufferInfo info =
manager.getAndRemovePreAllocatedBuffer(appId, requireBufferId);
boolean isPreAllocated = info != null;
if (!isPreAllocated) {
String errorMsg =
Expand Down
112 changes: 82 additions & 30 deletions server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -114,7 +115,9 @@ public class ShuffleTaskManager {
private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
private final ShuffleBufferManager shuffleBufferManager;
private Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap();
// appId -> {requireBufferId -> PreAllocatedBufferInfo}
private Map<String, Map<Long, PreAllocatedBufferInfo>> appIdToRequireBufferIdsMap =
JavaUtils.newConcurrentMap();
private Thread clearResourceThread;
private BlockingQueue<PurgeEvent> expiredAppIdQueue = Queues.newLinkedBlockingQueue();
private final Cache<String, ReentrantReadWriteLock> appLocks;
Expand Down Expand Up @@ -319,17 +322,21 @@ public StatusCode cacheShuffleData(
return shuffleBufferManager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
}

public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(long requireBufferId) {
return requireBufferIds.remove(requireBufferId);
public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(String appId, long requireBufferId) {
Map<Long, PreAllocatedBufferInfo> requireBufferIdMap = appIdToRequireBufferIdsMap.get(appId);
if (requireBufferIdMap == null) {
return null;
}
return requireBufferIdMap.remove(requireBufferId);
}

public void releasePreAllocatedSize(long requireSize) {
shuffleBufferManager.releasePreAllocatedSize(requireSize);
}

@VisibleForTesting
void removeAndReleasePreAllocatedBuffer(long requireBufferId) {
PreAllocatedBufferInfo info = getAndRemovePreAllocatedBuffer(requireBufferId);
void removeAndReleasePreAllocatedBuffer(String appId, long requireBufferId) {
PreAllocatedBufferInfo info = getAndRemovePreAllocatedBuffer(appId, requireBufferId);
if (info != null) {
releasePreAllocatedSize(info.getRequireSize());
}
Expand Down Expand Up @@ -541,9 +548,18 @@ public long requireBuffer(
public long requireBuffer(String appId, int requireSize) {
if (shuffleBufferManager.requireMemory(requireSize, true)) {
long requireId = requireBufferId.incrementAndGet();
requireBufferIds.put(
requireId,
new PreAllocatedBufferInfo(appId, requireId, System.currentTimeMillis(), requireSize));
ReentrantReadWriteLock.WriteLock appLock = getAppWriteLock(appId);
try {
// preAllocatedBufferCheck will obtain lock and remove the empty appId
appLock.lock();
Map<Long, PreAllocatedBufferInfo> requireBufferMaps =
appIdToRequireBufferIdsMap.computeIfAbsent(appId, x -> JavaUtils.newConcurrentMap());
requireBufferMaps.put(
requireId,
new PreAllocatedBufferInfo(appId, requireId, System.currentTimeMillis(), requireSize));
} finally {
appLock.unlock();
}
return requireId;
} else {
LOG.warn("Failed to require buffer, require size: {}", requireSize);
Expand Down Expand Up @@ -829,6 +845,12 @@ public void removeResources(String appId, boolean checkAppExpired) {
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
Map<Long, PreAllocatedBufferInfo> requireBufferIdsMap = appIdToRequireBufferIdsMap.get(appId);
if (requireBufferIdsMap != null) {
for (PreAllocatedBufferInfo info : requireBufferIdsMap.values()) {
removeAndReleasePreAllocatedBuffer(appId, info.getRequireId());
}
}

String operationMsg = String.format("removing storage data for appId:%s", appId);
withTimeoutExecution(
Expand Down Expand Up @@ -896,34 +918,64 @@ public void refreshAppId(String appId) {
private void preAllocatedBufferCheck() {
try {
long current = System.currentTimeMillis();
List<Long> removeIds = Lists.newArrayList();
for (PreAllocatedBufferInfo info : requireBufferIds.values()) {
if (current - info.getTimestamp() > preAllocationExpired) {
removeIds.add(info.getRequireId());
for (Map.Entry<String, Map<Long, PreAllocatedBufferInfo>> entry :
appIdToRequireBufferIdsMap.entrySet()) {
String appId = entry.getKey();
if (MapUtils.isEmpty(entry.getValue())) {
ReentrantReadWriteLock.WriteLock appLock = getAppWriteLock(appId);
try {
appLock.lock();
// After double check, remove empty map related this appId from
// appIdToRequireBufferIdsMap
if (MapUtils.isEmpty(entry.getValue())) {
// Keep single point remove appId from appIdToRequireBufferIdsMap
appIdToRequireBufferIdsMap.remove(appId);
continue;
}
} finally {
appLock.unlock();
}
}
}
for (Long requireId : removeIds) {
PreAllocatedBufferInfo info = requireBufferIds.remove(requireId);
if (info != null) {
// move release memory code down to here as the requiredBuffer could be consumed during
// removing processing.
shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true);
LOG.warn(
"Remove expired preAllocatedBuffer[id={}] that required by app: {}",
requireId,
info.getAppId());
ShuffleServerMetrics.counterPreAllocatedBufferExpired.inc();
} else {
LOG.info("PreAllocatedBuffer[id={}] has already be used", requireId);
List<Long> toRemoveIds = Lists.newArrayList();
for (PreAllocatedBufferInfo info : entry.getValue().values()) {
if (current - info.getTimestamp() > preAllocationExpired) {
toRemoveIds.add(info.getRequireId());
}
}
List<Long> removedIds = Lists.newArrayList();
List<Long> usedIds = Lists.newArrayList();
for (Long requireId : toRemoveIds) {
PreAllocatedBufferInfo info = getAndRemovePreAllocatedBuffer(appId, requireId);
if (info != null) {
// move release memory code down to here as the requiredBuffer could be consumed during
// removing processing.
shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true);
removedIds.add(requireId);
ShuffleServerMetrics.counterPreAllocatedBufferExpired.inc();
} else {
usedIds.add(requireId);
}
if (removedIds.size() > 0) {
LOG.info(
"Remove expired preAllocatedBuffer[id={}] for app[{}], removedIds: {}, usedIds: {}",
requireId,
appId,
removedIds,
usedIds);
}
}
}
} catch (Exception e) {
LOG.warn("Error happened in preAllocatedBufferCheck", e);
}
}

public int getRequireBufferSize(long requireId) {
PreAllocatedBufferInfo pabi = requireBufferIds.get(requireId);
public int getRequireBufferSize(String appId, long requireId) {
Map<Long, PreAllocatedBufferInfo> requireBufferIdMap = appIdToRequireBufferIdsMap.get(appId);
if (requireBufferIdMap == null) {
return 0;
}
PreAllocatedBufferInfo pabi = requireBufferIdMap.get(requireId);
if (pabi == null) {
return 0;
}
Expand All @@ -940,8 +992,8 @@ public Set<String> getAppIds() {
}

@VisibleForTesting
Map<Long, PreAllocatedBufferInfo> getRequireBufferIds() {
return requireBufferIds;
Supplier<Map<Long, PreAllocatedBufferInfo>> getRequireBufferIdSizeByAppId(String appId) {
return () -> appIdToRequireBufferIdsMap.get(appId);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
// thread,
// otherwise we need to release the required size.
PreAllocatedBufferInfo info =
shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
shuffleTaskManager.getAndRemovePreAllocatedBuffer(appId, requireBufferId);
int requireSize = info == null ? 0 : info.getRequireSize();
int requireBlocksSize =
requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -365,33 +366,35 @@ public void writeProcessTest() throws Exception {
StringUtils.EMPTY);
final List<ShufflePartitionedBlock> expectedBlocks1 = Lists.newArrayList();
final List<ShufflePartitionedBlock> expectedBlocks2 = Lists.newArrayList();
final Map<Long, PreAllocatedBufferInfo> bufferIds = shuffleTaskManager.getRequireBufferIds();
// Since requireBuffer doesn't specify the appId, "EMPTY" is used instead.
final Supplier<Map<Long, PreAllocatedBufferInfo>> bufferIds =
shuffleTaskManager.getRequireBufferIdSizeByAppId("EMPTY");

shuffleTaskManager.requireBuffer(10);
shuffleTaskManager.requireBuffer(10);
shuffleTaskManager.requireBuffer(10);
assertEquals(3, bufferIds.size());
assertEquals(3, bufferIds.get().size());
// required buffer should be clear if it doesn't receive data after timeout
Thread.sleep(6000);
assertEquals(0, bufferIds.size());
assertEquals(0, bufferIds.get() == null ? 0 : bufferIds.get().size());

shuffleTaskManager.commitShuffle(appId, shuffleId);

// won't flush for partition 1-1
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData0.getBlockList()));
long bufferId = shuffleTaskManager.requireBuffer(35);
assertEquals(1, bufferIds.size());
PreAllocatedBufferInfo pabi = bufferIds.get(bufferId);
assertEquals(1, bufferIds.get().size());
PreAllocatedBufferInfo pabi = bufferIds.get().get(bufferId);
assertEquals(35, pabi.getRequireSize());
StatusCode sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData0.getBlockList());
// the required id won't be removed in shuffleTaskManager, it is removed in Grpc service
assertEquals(1, bufferIds.size());
assertEquals(1, bufferIds.get().size());
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.commitShuffle(appId, shuffleId);
// manually release the pre allocate buffer
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);

ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
assertEquals(
Expand All @@ -404,7 +407,7 @@ public void writeProcessTest() throws Exception {
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData1);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData1.getBlockList());
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1);

// won't flush for partition 1-1
Expand All @@ -421,7 +424,7 @@ public void writeProcessTest() throws Exception {
bufferId = shuffleTaskManager.requireBuffer(30);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData3);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData3.getBlockList());
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);
assertEquals(StatusCode.SUCCESS, sc);

// flush for partition 2-2
Expand All @@ -430,7 +433,7 @@ public void writeProcessTest() throws Exception {
bufferId = shuffleTaskManager.requireBuffer(35);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData4);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData4.getBlockList());
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);
assertEquals(StatusCode.SUCCESS, sc);

shuffleTaskManager.commitShuffle(appId, shuffleId);
Expand All @@ -444,7 +447,7 @@ public void writeProcessTest() throws Exception {
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData5);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);

// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2);
Expand All @@ -460,7 +463,7 @@ public void writeProcessTest() throws Exception {
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData7);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);

// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2 + 2);
Expand Down
Loading