Skip to content

Commit b1dcdf4

Browse files
committed
update
1 parent 9ded35e commit b1dcdf4

File tree

14 files changed

+38
-105
lines changed

14 files changed

+38
-105
lines changed

worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ public PartitionDataWriter(
7878
writerContext.setPartitionDataWriter(this);
7979
writerContext.setDeviceMonitor(deviceMonitor);
8080
tierWriterProxy = new TierWriterProxy(writerContext, storageManager, conf, partitionType);
81-
tierWriterProxy.registerToDeviceMonitor();
8281
}
8382

8483
public DiskFileInfo getDiskFileInfo() {
@@ -132,7 +131,7 @@ public StorageInfo getStorageInfo() {
132131
return tierWriterProxy.getCurrentStorageInfo();
133132
}
134133

135-
public synchronized long close() {
134+
public long close() {
136135
return tierWriterProxy.close();
137136
}
138137

@@ -146,7 +145,7 @@ public void evict(boolean checkClose) throws IOException {
146145
tierWriterProxy.evict(checkClose);
147146
}
148147

149-
public synchronized void destroy(IOException ioException) {
148+
public void destroy(IOException ioException) {
150149
tierWriterProxy.destroy(ioException);
151150
}
152151

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource
3737

3838
trait DeviceMonitor {
3939
def startCheck() {}
40-
def registerFileWriter(fileWriter: PartitionDataWriter): Unit = {}
40+
def registerFileWriter(fileWriter: PartitionDataWriter, filePath: String): Unit = {}
4141
def unregisterFileWriter(fileWriter: PartitionDataWriter): Unit = {}
4242
// Only local flush needs device monitor.
4343
def registerFlusher(flusher: LocalFlusher): Unit = {}
@@ -170,8 +170,8 @@ class LocalDeviceMonitor(
170170
TimeUnit.MILLISECONDS)
171171
}
172172

173-
override def registerFileWriter(fileWriter: PartitionDataWriter): Unit = {
174-
val mountPoint = DeviceInfo.getMountPoint(fileWriter.getFilePath, diskInfos)
173+
override def registerFileWriter(fileWriter: PartitionDataWriter, filePath: String): Unit = {
174+
val mountPoint = DeviceInfo.getMountPoint(filePath, diskInfos)
175175
observedDevices.get(diskInfos.get(mountPoint).deviceInfo).addObserver(fileWriter)
176176
}
177177

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
477477
s3Writers.put(fileInfo.getFilePath, writer)
478478
return
479479
}
480-
deviceMonitor.registerFileWriter(writer)
480+
deviceMonitor.registerFileWriter(writer, fileInfo.getFilePath)
481481
workingDirWriters.computeIfAbsent(workingDir, workingDirWriterListFunc).put(
482482
fileInfo.getFilePath,
483483
writer)

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala

+1-7
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
3838
partitionDataWriterContext: PartitionDataWriterContext,
3939
partitionType: PartitionType,
4040
numPendingWrites: AtomicInteger,
41-
notifier: FlushNotifier,
42-
flushLock: AnyRef): TierWriterBase = {
41+
notifier: FlushNotifier): TierWriterBase = {
4342
evictFileOrder.foreach { order =>
4443
val orderList = order.get(celebornFile.storageType.name())
4544
if (orderList != null) {
@@ -48,7 +47,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
4847
partitionType,
4948
numPendingWrites,
5049
notifier,
51-
flushLock,
5250
orderList)
5351
}
5452
}
@@ -61,7 +59,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
6159
partitionType: PartitionType,
6260
numPendingWrites: AtomicInteger,
6361
notifier: FlushNotifier,
64-
flushLock: AnyRef,
6562
order: Option[List[String]] = createFileOrder): TierWriterBase = {
6663
logDebug(
6764
s"create file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
@@ -107,7 +104,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
107104
metaHandler,
108105
numPendingWrites,
109106
notifier,
110-
flushLock,
111107
source,
112108
memoryFileInfo,
113109
storageInfoType,
@@ -135,7 +131,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
135131
metaHandler,
136132
numPendingWrites,
137133
notifier,
138-
flushLock,
139134
flusher,
140135
source,
141136
diskFileInfo,
@@ -148,7 +143,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
148143
metaHandler,
149144
numPendingWrites,
150145
notifier,
151-
flushLock,
152146
flusher,
153147
source,
154148
diskFileInfo,

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala

+5-16
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ abstract class TierWriterBase(
4646
val metaHandler: PartitionMetaHandler,
4747
val numPendingWrites: AtomicInteger,
4848
val notifier: FlushNotifier,
49-
val flushLock: AnyRef,
5049
val fileInfo: FileInfo,
5150
val source: AbstractSource,
5251
val storageType: StorageInfo.Type,
@@ -60,6 +59,7 @@ abstract class TierWriterBase(
6059
var writerCloseTimeoutMs: Long = conf.workerWriterCloseTimeoutMs
6160
var flusherBufferSize = 0L
6261
private val chunkSize: Long = conf.shuffleChunkSize
62+
val flushLock: AnyRef = new AnyRef
6363

6464
@volatile var closed: Boolean = false
6565
@volatile private var destroyed: Boolean = false
@@ -257,16 +257,13 @@ abstract class TierWriterBase(
257257

258258
def getFlusher(): Flusher
259259

260-
def registerToDeviceMonitor(): Unit = {}
261-
262260
}
263261

264262
class MemoryTierWriter(
265263
conf: CelebornConf,
266264
metaHandler: PartitionMetaHandler,
267265
numPendingWriters: AtomicInteger,
268266
notifier: FlushNotifier,
269-
flushLock: AnyRef,
270267
source: AbstractSource,
271268
fileInfo: MemoryFileInfo,
272269
storageType: StorageInfo.Type,
@@ -277,7 +274,6 @@ class MemoryTierWriter(
277274
metaHandler,
278275
numPendingWriters,
279276
notifier,
280-
flushLock,
281277
fileInfo,
282278
source,
283279
storageType,
@@ -360,7 +356,6 @@ class LocalTierWriter(
360356
metaHandler: PartitionMetaHandler,
361357
numPendingWrites: AtomicInteger,
362358
notifier: FlushNotifier,
363-
flushLock: AnyRef,
364359
flusher: Flusher,
365360
source: AbstractSource,
366361
diskFileInfo: DiskFileInfo,
@@ -372,7 +367,6 @@ class LocalTierWriter(
372367
metaHandler,
373368
numPendingWrites,
374369
notifier,
375-
flushLock,
376370
diskFileInfo,
377371
source,
378372
storageType,
@@ -387,6 +381,10 @@ class LocalTierWriter(
387381
partitionDataWriterContext.getUserIdentifier)
388382
else
389383
null
384+
storageManager.registerDiskFilePartitionWriter(
385+
partitionDataWriterContext.getPartitionDataWriter,
386+
partitionDataWriterContext.getWorkingDir,
387+
fileInfo.asInstanceOf[DiskFileInfo])
390388

391389
private lazy val channel: FileChannel =
392390
FileChannelUtils.createWritableFileChannel(diskFileInfo.getFilePath)
@@ -474,21 +472,13 @@ class LocalTierWriter(
474472
def getFlusher(): Flusher = {
475473
flusher
476474
}
477-
478-
override def registerToDeviceMonitor(): Unit = {
479-
storageManager.registerDiskFilePartitionWriter(
480-
partitionDataWriterContext.getPartitionDataWriter,
481-
partitionDataWriterContext.getWorkingDir,
482-
fileInfo.asInstanceOf[DiskFileInfo])
483-
}
484475
}
485476

486477
class DfsTierWriter(
487478
conf: CelebornConf,
488479
metaHandler: PartitionMetaHandler,
489480
numPendingWrites: AtomicInteger,
490481
notifier: FlushNotifier,
491-
flushLock: AnyRef,
492482
flusher: Flusher,
493483
source: AbstractSource,
494484
hdfsFileInfo: DiskFileInfo,
@@ -500,7 +490,6 @@ class DfsTierWriter(
500490
metaHandler,
501491
numPendingWrites,
502492
notifier,
503-
flushLock,
504493
hdfsFileInfo,
505494
source,
506495
storageType,

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterProxy.scala

+8-15
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,15 @@ class TierWriterProxy(
3737
val notifier = new FlushNotifier
3838
val numPendingWrites = new AtomicInteger
3939
@volatile var currentTierWriter: TierWriterBase = _
40-
val flushLock = new AnyRef
4140

4241
currentTierWriter =
4342
storageManager.storagePolicy.createFileWriter(
4443
partitionDataWriterContext,
4544
partitionType,
4645
numPendingWrites,
47-
notifier,
48-
flushLock)
46+
notifier)
4947

50-
def write(buf: ByteBuf): Unit = this.flushLock.synchronized {
48+
def write(buf: ByteBuf): Unit = this.synchronized {
5149
if (currentTierWriter.needEvict()) {
5250
evict(false)
5351
}
@@ -56,7 +54,7 @@ class TierWriterProxy(
5654

5755
// evict and flush method need to be in a same synchronized block
5856
// because memory manager may want to evict a file under memory pressure
59-
def evict(checkClose: Boolean): Unit = this.flushLock.synchronized {
57+
def evict(checkClose: Boolean): Unit = this.synchronized {
6058
// close and evict might be invoked concurrently
6159
// do not evict committed files from memory manager
6260
// evict memory file info if worker is shutdown gracefully
@@ -71,21 +69,20 @@ class TierWriterProxy(
7169
partitionDataWriterContext,
7270
partitionType,
7371
numPendingWrites,
74-
notifier,
75-
flushLock)
72+
notifier)
7673
currentTierWriter.evict(nFile)
7774
currentTierWriter = nFile
7875
}
7976

80-
def flush(finalFlush: Boolean): Unit = this.flushLock.synchronized {
77+
def flush(finalFlush: Boolean): Unit = this.synchronized {
8178
currentTierWriter.flush(finalFlush)
8279
}
8380

8481
def getCurrentFileInfo(): FileInfo = {
8582
currentTierWriter.fileInfo
8683
}
8784

88-
def needHardSplitForMemoryFile(): Boolean = {
85+
def needHardSplitForMemoryFile(): Boolean = this.synchronized {
8986
if (!currentTierWriter.isInstanceOf[MemoryTierWriter]) {
9087
return false
9188
}
@@ -126,11 +123,11 @@ class TierWriterProxy(
126123
storageInfo
127124
}
128125

129-
def destroy(ioException: IOException): Unit = {
126+
def destroy(ioException: IOException): Unit = this.synchronized {
130127
currentTierWriter.destroy(ioException)
131128
}
132129

133-
def close(): Long = {
130+
def close(): Long = this.synchronized {
134131
currentTierWriter.close()
135132
}
136133

@@ -157,8 +154,4 @@ class TierWriterProxy(
157154
def getFlusher(): Flusher = {
158155
currentTierWriter.getFlusher()
159156
}
160-
161-
def registerToDeviceMonitor(): Unit = {
162-
currentTierWriter.registerToDeviceMonitor()
163-
}
164157
}

worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java

+4-30
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
import org.apache.celeborn.service.deploy.worker.WorkerSource;
4242

4343
public class PartitionDataWriterSuiteUtils {
44-
private static Object flushLock = new Object();
45-
4644
public static File getTemporaryFile(File tempDir) throws IOException {
4745
String filename = UUID.randomUUID().toString();
4846
File temporaryFile = new File(tempDir, filename);
@@ -98,7 +96,6 @@ public static StorageManager prepareDiskFileTestEnvironment(
9896
finalMetaHandler,
9997
numPendingWriters,
10098
flushNotifier,
101-
flushLock,
10299
flusher,
103100
source,
104101
fileInfo,
@@ -107,12 +104,7 @@ public static StorageManager prepareDiskFileTestEnvironment(
107104
storageManager))
108105
.when(storagePolicy)
109106
.createFileWriter(
110-
Mockito.any(),
111-
Mockito.any(),
112-
Mockito.any(),
113-
Mockito.any(),
114-
Mockito.any(),
115-
Mockito.any());
107+
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
116108

117109
return storageManager;
118110
}
@@ -151,20 +143,14 @@ public static StorageManager prepareMemoryFileTestEnvironment(
151143
celebornConf.shuffleRangeReadFilterEnabled(), memoryFileInfo),
152144
numPendingWriters,
153145
flushNotifier,
154-
flushLock,
155146
source,
156147
memoryFileInfo,
157148
StorageInfo.Type.MEMORY,
158149
writerContext,
159150
storageManager))
160151
.when(storagePolicy)
161152
.createFileWriter(
162-
Mockito.any(),
163-
Mockito.any(),
164-
Mockito.any(),
165-
Mockito.any(),
166-
Mockito.any(),
167-
Mockito.any());
153+
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
168154

169155
return storageManager;
170156
}
@@ -232,20 +218,14 @@ public static StorageManager prepareMemoryEvictEnvironment(
232218
celebornConf.shuffleRangeReadFilterEnabled(), memoryFileInfo),
233219
numPendingWriters,
234220
flushNotifier,
235-
flushLock,
236221
source,
237222
memoryFileInfo,
238223
StorageInfo.Type.MEMORY,
239224
writerContext,
240225
storageManager))
241226
.when(storagePolicy)
242227
.createFileWriter(
243-
Mockito.any(),
244-
Mockito.any(),
245-
Mockito.any(),
246-
Mockito.any(),
247-
Mockito.any(),
248-
Mockito.any());
228+
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
249229

250230
Mockito.doAnswer(
251231
invocation ->
@@ -255,7 +235,6 @@ public static StorageManager prepareMemoryEvictEnvironment(
255235
celebornConf.shuffleRangeReadFilterEnabled(), fileInfo),
256236
numPendingWriters,
257237
flushNotifier,
258-
flushLock,
259238
flusher,
260239
source,
261240
fileInfo,
@@ -264,12 +243,7 @@ public static StorageManager prepareMemoryEvictEnvironment(
264243
storageManager))
265244
.when(storagePolicy)
266245
.getEvictedFileWriter(
267-
Mockito.any(),
268-
Mockito.any(),
269-
Mockito.any(),
270-
Mockito.any(),
271-
Mockito.any(),
272-
Mockito.any());
246+
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
273247

274248
return storageManager;
275249
}

0 commit comments

Comments
 (0)