private[storage] class BlockInfo(
    val level: StorageLevel,        // the block's storage level
    val classTag: ClassTag[_],      // the class tag of the block described by this BlockInfo
    val tellMaster: Boolean) {      // whether changes to this block must be reported to the master

  /** The size of the block (in bytes). */
  def size: Long = _size
  def size_=(s: Long): Unit = {
    _size = s
    checkInvariants()
  }
  private[this] var _size: Long = 0

  /** The number of times that this block has been locked for reading. */
  def readerCount: Int = _readerCount
  def readerCount_=(c: Int): Unit = {
    _readerCount = c
    checkInvariants()
  }
  private[this] var _readerCount: Int = 0

  /**
   * Before writing to a block, a task attempt must first acquire the write lock of the
   * corresponding BlockInfo; _writerTask records that attempt's id (a task may be attempted
   * several times during execution, and each attempt gets its own id).
   *
   * The task attempt id of the task which currently holds the write lock for this block, or
   * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
   * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
   */
  def writerTask: Long = _writerTask
  def writerTask_=(t: Long): Unit = {
    _writerTask = t
    checkInvariants()
  }
  private[this] var _writerTask: Long = BlockInfo.NO_WRITER

  private def checkInvariants(): Unit = {
    // A block's reader count must be non-negative:
    assert(_readerCount >= 0)
    // A block is either locked for reading or for writing, but not for both at the same time:
    assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
  }

  checkInvariants()
}
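As a quick illustration of checkInvariants, the toy sketch below mimics just the lock-related state (readerCount, writerTask, and a NO_WRITER sentinel whose value is assumed here); it is a simplified stand-in, not the real Spark class:

object BlockInfoInvariantDemo extends App {
  val NO_WRITER = -1L   // sentinel meaning "no write lock held" (value assumed for this sketch)

  // Simplified stand-in for BlockInfo: only the lock bookkeeping and the invariant check.
  class Info {
    var readerCount: Int = 0
    var writerTask: Long = NO_WRITER
    def check(): Unit = {
      assert(readerCount >= 0)
      // A block is never read-locked and write-locked at the same time
      assert(readerCount == 0 || writerTask == NO_WRITER)
    }
  }

  val info = new Info
  info.readerCount += 1; info.check()     // read lock acquired: fine
  info.readerCount -= 1; info.check()     // read lock released
  info.writerTask = 42L; info.check()     // write lock acquired: fine
  // info.readerCount += 1; info.check()  // would trip the invariant assertion
}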
def lockForReading(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {  // blocking: whether to wait while a write lock is held
  logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
  do {
    // Look up the block's BlockInfo
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        // If no write lock is held, increment the reader count
        if (info.writerTask == BlockInfo.NO_WRITER) {
          info.readerCount += 1
          // Record that currentTaskAttemptId holds a read lock on this block
          readLocksByTask(currentTaskAttemptId).add(blockId)
          logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
          return Some(info)
        }
    }
    // If blocking is enabled, wait until the writer finishes, then retry
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}
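The lock methods rely on the classic monitor pattern: retry inside a do/while loop, wait() on the manager's monitor when the lock is unavailable, and notifyAll() (in unlock, shown later) to wake the waiters. A minimal standalone sketch of that pattern, with illustrative names that are not Spark's:

class OneWriterLock {
  private var writer: Option[Long] = None

  def lock(owner: Long, blocking: Boolean = true): Boolean = synchronized {
    do {
      if (writer.isEmpty) { writer = Some(owner); return true }
      if (blocking) wait()            // sleep until some unlock() calls notifyAll()
    } while (blocking)
    false                             // non-blocking call and the lock was taken
  }

  def unlock(): Unit = synchronized {
    writer = None
    notifyAll()                       // wake every waiter so it can retry the loop
  }
}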
lockForWriting
def lockForWriting(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        // Only proceed if no write lock and no read locks are currently held
        if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
          // Record the current task attempt id as the writer, marking the block as write-locked
          info.writerTask = currentTaskAttemptId
          // Track the write lock in the per-task map
          writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
          logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
          return Some(info)
        }
    }
    // If a write lock or read locks are held, optionally block until they are released
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}
unlock
def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
  // Resolve the task attempt id; fall back to currentTaskAttemptId if none was passed in
  val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
  logTrace(s"Task $taskId releasing lock for $blockId")
  // Look up the block's BlockInfo
  val info = get(blockId).getOrElse {
    throw new IllegalStateException(s"Block $blockId not found")
  }
  if (info.writerTask != BlockInfo.NO_WRITER) {
    // A write lock is held: clear the writer and drop the binding from the write-lock map
    info.writerTask = BlockInfo.NO_WRITER
    writeLocksByTask.removeBinding(taskId, blockId)
  } else {
    assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
    // Read locks are reentrant, so just decrement the count
    info.readerCount -= 1
    // Remove one occurrence of this block from the task's read-lock multiset
    val countsForTask: ConcurrentHashMultiset[BlockId] = readLocksByTask(taskId)
    val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
    assert(newPinCountForTask >= 0,
      s"Task $taskId release lock on block $blockId more times than it acquired it")
  }
  // Wake up all threads waiting on this manager's monitor
  notifyAll()
}
downgradeLock
Lock downgrade: a write lock is turned into a read lock.
def downgradeLock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
  val info = get(blockId).get
  require(info.writerTask == currentTaskAttemptId,
    s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
      s" block $blockId")
  // Release the write lock ...
  unlock(blockId)
  // ... then immediately re-acquire a non-blocking read lock
  val lockOutcome = lockForReading(blockId, blocking = false)
  assert(lockOutcome.isDefined)
}
lockNewBlockForWriting
def lockNewBlockForWriting(
    blockId: BlockId,
    newBlockInfo: BlockInfo): Boolean = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
  lockForReading(blockId) match {
    case Some(info) =>
      // Block already exists, so there is no need to acquire the write lock. This could happen
      // if another thread races with us to compute the same block. In this case, just keep the
      // read lock and return.
      false
    case None =>
      // Block does not yet exist or has been removed, so we are free to acquire the write lock
      infos(blockId) = newBlockInfo
      lockForWriting(blockId)
      true
  }
}
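The "first writer computes, later callers just read the existing block" behaviour that lockNewBlockForWriting gives callers can be sketched, as an analogy only, with a plain ConcurrentHashMap (this is not the real locking code, just the same race resolved by a simpler primitive):

import java.util.concurrent.ConcurrentHashMap

object GetOrComputeDemo extends App {
  private val cache = new ConcurrentHashMap[String, String]()

  def getOrCompute(blockId: String)(compute: => String): String =
    cache.computeIfAbsent(blockId, _ => compute)   // only the winning caller runs `compute`

  println(getOrCompute("rdd_0_0") { println("computing..."); "block data" })  // computes
  println(getOrCompute("rdd_0_0") { println("computing..."); "ignored" })     // served from cache
}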
removeBlock
def removeBlock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
  // Look up the block's BlockInfo
  infos.get(blockId) match {
    case Some(blockInfo) =>
      if (blockInfo.writerTask != currentTaskAttemptId) {
        throw new IllegalStateException(
          s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
      } else {
        // Drop the block's metadata
        infos.remove(blockId)
        // Release its read and write locks
        blockInfo.readerCount = 0
        blockInfo.writerTask = BlockInfo.NO_WRITER
        writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
      }
    case None =>
      throw new IllegalArgumentException(
        s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
  }
  // Wake up all blocked waiters
  notifyAll()
}
DiskBlockManager
Maintains the logical mapping between blocks and the locations on disk where their data is written.
Key fields
private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
  extends Logging {

  // Number of subdirectories per local directory
  private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)

  /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
   * directory, create multiple subdirectories that we will hash files into, in order to avoid
   * having really large inodes at the top level. */
  private[spark] val localDirs: Array[File] = createLocalDirs(conf)
  if (localDirs.isEmpty) {
    // Creating the local directories failed; the executor cannot run without a disk store
    logError("Failed to create any local dir.")
    System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
  }

  // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
  // of subDirs(i) is protected by the lock of subDirs(i)
  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

  // Register a shutdown hook so directories are cleaned up on JVM exit
  private val shutdownHook = addShutdownHook()
Local directory layout
createLocalDirs
private def createLocalDirs(conf: SparkConf): Array[File] = {
  // Read spark.local.dir and create a "blockmgr" directory under each configured root
  Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
    try {
      val localDir = Utils.createDirectory(rootDir, "blockmgr")
      logInfo(s"Created local directory at $localDir")
      Some(localDir)
    } catch {
      case e: IOException =>
        logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
        None
    }
  }
}
def getFile(filename: String): File = {
  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(filename)
  // Pick the first-level directory by modulo
  val dirId = hash % localDirs.length
  // Pick the subdirectory within it
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the subdirectory if it doesn't already exist
  val subDir = subDirs(dirId).synchronized {
    val old = subDirs(dirId)(subDirId)
    if (old != null) {
      old
    } else {
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdir()) {
        throw new IOException(s"Failed to create local dir in $newDir.")
      }
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }

  new File(subDir, filename)
}
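A small standalone sketch of the directory hashing, using String.hashCode folded into a non-negative value in place of Utils.nonNegativeHash, and made-up directory counts:

object DiskLayoutDemo extends App {
  val localDirsCount = 2          // e.g. two entries in spark.local.dir (assumption)
  val subDirsPerLocalDir = 64     // default of spark.diskStore.subDirectories
  val filename = "shuffle_0_0_0.data"

  // Stand-in for Utils.nonNegativeHash: fold hashCode into a non-negative Int
  val hash = filename.hashCode & Integer.MAX_VALUE
  val dirId = hash % localDirsCount
  val subDirId = (hash / localDirsCount) % subDirsPerLocalDir

  // The file lands at <localDirs(dirId)>/<subDirId rendered as two hex digits>/<filename>
  println(s"dirId=$dirId, subDir=${"%02x".format(subDirId)}, file=$filename")
}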
def getAllFiles(): Seq[File] = {
  // Get all the files inside the array of array of directories
  subDirs.flatMap { dir =>
    dir.synchronized {
      // Copy the content of dir because it may be modified in other threads
      dir.clone()
    }
  }.filter(_ != null).flatMap { dir =>
    val files = dir.listFiles()
    if (files != null) files else Seq.empty
  }
}
// `lock` is the object whose monitor guards this pool, providing thread safety
private[memory] abstract class MemoryPool(lock: Object) {

  // Pool size, in bytes
  @GuardedBy("lock")
  private[this] var _poolSize: Long = 0

  /** Returns the current size of the pool, in bytes. */
  final def poolSize: Long = lock.synchronized {
    _poolSize
  }

  /** Returns the amount of free memory in the pool, in bytes. */
  final def memoryFree: Long = lock.synchronized {
    _poolSize - memoryUsed
  }

  /** Expands the pool by `delta` bytes. */
  final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }

  /** Shrinks the pool by `delta` bytes. */
  final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    require(delta <= _poolSize)
    require(_poolSize - delta >= memoryUsed)
    _poolSize -= delta
  }

  /** Returns the amount of used memory in this pool (in bytes). */
  def memoryUsed: Long
}
Memory model: a pool of poolSize bytes is split into memoryUsed and memoryFree (poolSize = memoryUsed + memoryFree).
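Since MemoryPool only tracks a size and delegates memoryUsed to its subclasses, a toy concrete pool is enough to see the bookkeeping in action (a simplified standalone re-implementation, not Spark's class):

class ToyPool(lock: Object) {
  private var _poolSize: Long = 0
  private var _used: Long = 0

  def poolSize: Long = lock.synchronized(_poolSize)
  def memoryUsed: Long = lock.synchronized(_used)
  def memoryFree: Long = lock.synchronized(_poolSize - _used)

  def incrementPoolSize(delta: Long): Unit = lock.synchronized { require(delta >= 0); _poolSize += delta }
  def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    // Can only shrink by memory that is not currently in use
    require(delta >= 0 && delta <= _poolSize && _poolSize - delta >= _used)
    _poolSize -= delta
  }
  def acquire(bytes: Long): Boolean = lock.synchronized {
    if (bytes <= memoryFree) { _used += bytes; true } else false
  }
}

object ToyPoolDemo extends App {
  val lock = new Object
  val pool = new ToyPool(lock)
  pool.incrementPoolSize(100)
  assert(pool.acquire(60) && pool.memoryFree == 40)
  pool.decrementPoolSize(40)          // only the free 40 bytes can be taken away
  assert(pool.poolSize == 60 && pool.memoryUsed == 60)
}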
StorageMemoryPool
A logical abstraction over the physical memory used for storage; managing storage memory at this logical level improves how efficiently the Spark storage subsystem uses memory.
private[memory] class StorageMemoryPool(
    lock: Object,
    memoryMode: MemoryMode
  ) extends MemoryPool(lock) with Logging {

  /** Name of this storage memory pool */
  private[this] val poolName: String = memoryMode match {
    case MemoryMode.ON_HEAP => "on-heap storage"
    case MemoryMode.OFF_HEAP => "off-heap storage"
  }

  @GuardedBy("lock")
  private[this] var _memoryUsed: Long = 0L

  /** Amount of memory currently in use, in bytes */
  override def memoryUsed: Long = lock.synchronized {
    _memoryUsed
  }

  private var _memoryStore: MemoryStore = _
  def memoryStore: MemoryStore = {
    if (_memoryStore == null) {
      throw new IllegalStateException("memory store not initialized yet")
    }
    _memoryStore
  }

  /**
   * Set the [[MemoryStore]] used by this manager to evict cached blocks.
   * This must be set after construction due to initialization ordering constraints.
   */
  final def setMemoryStore(store: MemoryStore): Unit = {
    _memoryStore = store
  }

  /**
   * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
   *
   * @return whether all N bytes were successfully granted.
   */
  def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
    val numBytesToFree = math.max(0, numBytes - memoryFree)
    acquireMemory(blockId, numBytes, numBytesToFree)
  }

  /**
   * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
   *
   * @param blockId the ID of the block we are acquiring storage memory for
   * @param numBytesToAcquire the size of this block
   * @param numBytesToFree the amount of space to be freed through evicting blocks
   * @return whether all N bytes were successfully granted.
   */
  def acquireMemory(
      blockId: BlockId,
      numBytesToAcquire: Long,
      numBytesToFree: Long): Boolean = lock.synchronized {
    assert(numBytesToAcquire >= 0)
    assert(numBytesToFree >= 0)
    assert(memoryUsed <= poolSize)
    // If numBytesToFree > 0, the free memory is insufficient, so evict blocks to make room
    if (numBytesToFree > 0) {
      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
    }
    // After evicting, check again whether enough memory is free for this request.
    // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
    // back into this StorageMemoryPool in order to free memory. Therefore, these variables
    // should have been updated.
    val enoughMemory = numBytesToAcquire <= memoryFree
    if (enoughMemory) {
      _memoryUsed += numBytesToAcquire
    }
    enoughMemory
  }

  def releaseMemory(size: Long): Unit = lock.synchronized {
    if (size > _memoryUsed) {
      logWarning(s"Attempted to release $size bytes of storage " +
        s"memory when we only have ${_memoryUsed} bytes")
      _memoryUsed = 0
    } else {
      _memoryUsed -= size
    }
  }

  def releaseAllMemory(): Unit = lock.synchronized {
    _memoryUsed = 0
  }

  /**
   * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
   * Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
   *
   * @return number of bytes to be removed from the pool's capacity.
   */
  def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
    // The amount we can give back just by handing over free (unused) memory
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
    // The remainder that must come from evicting cached blocks
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
    if (remainingSpaceToFree > 0) {
      // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
      val spaceFreedByEviction =
        memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
    } else {
      spaceFreedByReleasingUnusedMemory
    }
  }
}
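The key arithmetic in acquireMemory(blockId, numBytes) is numBytesToFree = max(0, numBytes - memoryFree): with 100 MB free and a 160 MB block, 60 MB of cached blocks must be evicted first. A sketch of that decision in plain Scala, with made-up MB figures:

object StorageAcquireMath extends App {
  val mb = 1024L * 1024L
  val memoryFree = 100 * mb          // assumed free storage memory
  val numBytes = 160 * mb            // assumed size of the block being cached

  // Same formula as StorageMemoryPool.acquireMemory(blockId, numBytes)
  val numBytesToFree = math.max(0L, numBytes - memoryFree)
  println(s"Need to evict ${numBytesToFree / mb} MB before caching")   // prints 60
  // If eviction frees at least that much, the subsequent check
  // `numBytesToAcquire <= memoryFree` succeeds and _memoryUsed grows by numBytes.
}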
private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize) {

  /** Check that the pool sizes still add up to the configured maximums */
  private def assertInvariants(): Unit = {
    assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory)
    assert(
      offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory)
  }

  assertInvariants()

  /** Maximum on-heap storage memory currently available */
  override def maxOnHeapStorageMemory: Long = synchronized {
    maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
  }

  override def maxOffHeapStorageMemory: Long = synchronized {
    maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
  }

  /**
   * Try to acquire up to `numBytes` of execution memory for the current task and return the
   * number of bytes obtained, or 0 if none can be allocated.
   *
   * This call may block until there is enough free memory in some situations, to make sure each
   * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
   * active tasks) before it is forced to spill. This can happen if the number of tasks increase
   * but an older task had a lot of memory already.
   */
  override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        onHeapStorageRegionSize,
        maxHeapMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        offHeapStorageMemory,
        maxOffHeapMemory)
    }

    /**
     * Borrow memory from the storage region for execution: grow the execution pool by evicting
     * cached blocks, thereby shrinking the storage pool.
     *
     * When acquiring memory for a task, the execution pool may need to make multiple
     * attempts. Each attempt must be able to evict storage in case another task jumps in
     * and caches a large block between the attempts. This is called once per attempt.
     */
    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
      if (extraMemoryNeeded > 0) {
        // There is not enough free memory in the execution pool, so try to reclaim memory from
        // storage. We can reclaim any free memory from the storage pool. If the storage pool
        // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
        // the memory that storage has borrowed from execution.
        val memoryReclaimableFromStorage: Long = math.max(
          storagePool.memoryFree,
          storagePool.poolSize - storageRegionSize)
        if (memoryReclaimableFromStorage > 0) {
          // Only reclaim as much space as is necessary and available:
          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
          storagePool.decrementPoolSize(spaceToReclaim)
          executionPool.incrementPoolSize(spaceToReclaim)
        }
      }
    }

    /**
     * The size the execution pool would have after evicting storage memory.
     *
     * The execution memory pool divides this quantity among the active tasks evenly to cap
     * the execution memory allocation for each task. It is important to keep this greater
     * than the execution pool size, which doesn't take into account potential memory that
     * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
     *
     * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
     * in execution memory allocation across tasks. Otherwise, a task may occupy more than
     * its fair share of execution memory, mistakenly thinking that other tasks can acquire
     * the portion of storage memory that cannot be evicted.
     */
    def computeMaxExecutionPoolSize(): Long = {
      maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
    }

    executionPool.acquireMemory(
      numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
  }
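maybeGrowExecutionPool can reclaim whichever is larger: the storage pool's free memory, or whatever storage has grown beyond its original region (poolSize - storageRegionSize). A worked example of that formula with made-up numbers:

object ExecutionBorrowMath extends App {
  val mb = 1024L * 1024L
  // Assumed state: the storage region is 500 MB, but the storage pool has grown to 700 MB,
  // of which 650 MB are occupied by cached blocks.
  val storageRegionSize = 500 * mb
  val storagePoolSize = 700 * mb
  val storageMemoryUsed = 650 * mb
  val storageMemoryFree = storagePoolSize - storageMemoryUsed        // 50 MB

  // Same formula as maybeGrowExecutionPool
  val memoryReclaimableFromStorage =
    math.max(storageMemoryFree, storagePoolSize - storageRegionSize) // max(50, 200) = 200 MB

  val extraMemoryNeeded = 120 * mb
  val spaceToReclaim = math.min(extraMemoryNeeded, memoryReclaimableFromStorage) // 120 MB
  println(s"Execution can grow by ${spaceToReclaim / mb} MB " +
    s"(free storage covers ${storageMemoryFree / mb} MB, the rest comes from evicting blocks)")
}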
  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        maxOnHeapStorageMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        maxOffHeapStorageMemory)
    }
    if (numBytes > maxMemory) {
      // Fail fast if the block simply won't fit
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxMemory bytes)")
      return false
    }
    if (numBytes > storagePool.memoryFree) {
      // There is not enough free memory in the storage pool, so try to borrow free memory from
      // the execution pool.
      // FIXME: only execution's *free* memory is borrowed here, and there is no check that
      // execution free + storage free can actually satisfy the request. If it cannot, the
      // shortfall is only discovered inside storagePool.acquireMemory, which then tries to
      // evict other blocks. Arguably the eviction attempt could happen right here, shortening
      // the call chain.
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
        numBytes - storagePool.memoryFree)
      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    storagePool.acquireMemory(blockId, numBytes)
  }

  override def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    acquireStorageMemory(blockId, numBytes, memoryMode)
  }
}
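Borrowing in the other direction is simpler: storage may only take execution's free memory and can never force execution to spill. With made-up numbers, this also shows the shortfall discussed in the FIXME above:

object StorageBorrowMath extends App {
  val mb = 1024L * 1024L
  val numBytes = 300 * mb            // block to cache (assumption)
  val storageFree = 100 * mb         // free storage memory (assumption)
  val executionFree = 150 * mb       // free execution memory (assumption)

  // Same formula as acquireStorageMemory: borrow only what execution has free
  val borrowed = math.min(executionFree, numBytes - storageFree)     // 150 MB
  val storageFreeAfterBorrow = storageFree + borrowed                // 250 MB
  // Still 50 MB short, so storagePool.acquireMemory must evict other cached blocks
  println(s"Borrowed ${borrowed / mb} MB; still short by ${(numBytes - storageFreeAfterBorrow) / mb} MB")
}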
object UnifiedMemoryManager {

  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
  // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

  /**
   * Return the total amount of memory shared between execution and storage, in bytes.
   */
  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }
}
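Plugging the defaults into getMaxMemory and apply reproduces the 434 MB figure mentioned in the comment on RESERVED_SYSTEM_MEMORY_BYTES. A worked example assuming a 1 GB heap; the 50/50 split is only the initial region sizing, since the unified manager lets the regions borrow from each other:

object UnifiedMemorySizing extends App {
  val mb = 1024L * 1024L
  val systemMemory = 1024 * mb                    // e.g. a 1 GB executor heap (assumption)
  val reservedMemory = 300 * mb                   // RESERVED_SYSTEM_MEMORY_BYTES
  val memoryFraction = 0.6                        // spark.memory.fraction default
  val storageFraction = 0.5                       // spark.memory.storageFraction default

  val usableMemory = systemMemory - reservedMemory                    // 724 MB
  val maxMemory = (usableMemory * memoryFraction).toLong              // ~434 MB shared by both regions
  val onHeapStorageRegionSize = (maxMemory * storageFraction).toLong  // ~217 MB initial storage region

  println(s"maxMemory=${maxMemory / mb} MB, storage region=${onHeapStorageRegionSize / mb} MB, " +
    s"execution region=${(maxMemory - onHeapStorageRegionSize) / mb} MB")
}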
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, return it
  // without acquiring a lock; the disk store never deletes (recent) items so this should work
  if (blockId.isShuffle) {
    // Shuffle blocks go through the shuffle block resolver
    val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
    // TODO: This should gracefully handle case where local block is not available. Currently
    // downstream code will throw an exception.
    // Wrap the resolved bytes in a ChunkedByteBuffer
    val buf = new ChunkedByteBuffer(
      shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
    Some(new ByteBufferBlockData(buf, true))
  } else {
    // Acquire a read lock, then resolve the BlockInfo into the corresponding BlockData
    blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
  }
}
// Resolving a shuffle block
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
  // The block is actually going to be a range of a single map output file for this map, so
  // find out the consolidated file, then the offset within that from our index
  val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

  // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
  // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
  // (which may cause a reducer to be sent a different reducer's data). The explicit position
  // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
  // class of issue from re-occurring in the future which is why they are left here even though
  // SPARK-22982 is fixed.
  val channel = Files.newByteChannel(indexFile.toPath)
  channel.position(blockId.reduceId * 8L)
  val in = new DataInputStream(Channels.newInputStream(channel))
  try {
    val offset = in.readLong()
    val nextOffset = in.readLong()
    val actualPosition = channel.position()
    val expectedPosition = blockId.reduceId * 8L + 16
    if (actualPosition != expectedPosition) {
      throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
        s"expected $expectedPosition but actual position was $actualPosition.")
    }
    new FileSegmentManagedBuffer(
      transportConf,
      // The corresponding data file
      getDataFile(blockId.shuffleId, blockId.mapId),
      offset,
      nextOffset - offset)
  } finally {
    in.close()
  }
}
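The index file is just a sequence of 8-byte longs: entry i is the byte offset where reducer i's partition begins in the data file, and entry i+1 marks its end, hence the reduceId * 8L seek followed by two readLong() calls. A minimal standalone sketch that writes such an index and reads back one segment (the offsets are made up):

import java.io.{DataInputStream, DataOutputStream, FileOutputStream}
import java.nio.channels.Channels
import java.nio.file.Files

object ShuffleIndexDemo extends App {
  // Build a tiny fake index: cumulative byte offsets 0, 100, 250, 400 for three partitions.
  val indexFile = Files.createTempFile("shuffle_demo", ".index").toFile
  val out = new DataOutputStream(new FileOutputStream(indexFile))
  Seq(0L, 100L, 250L, 400L).foreach(out.writeLong)
  out.close()

  // Read the segment for reduceId = 1, the same way the resolver above does.
  val reduceId = 1
  val channel = Files.newByteChannel(indexFile.toPath)
  channel.position(reduceId * 8L)
  val in = new DataInputStream(Channels.newInputStream(channel))
  val offset = in.readLong()        // 100: where reducer 1's data starts
  val nextOffset = in.readLong()    // 250: where reducer 2's data starts
  in.close()
  println(s"reducer $reduceId reads bytes [$offset, $nextOffset) => length ${nextOffset - offset}")
}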
// For non-shuffle blocks
private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): BlockData = {
  // The storage level decides whether the bytes come from the disk store or the memory store
  val level = info.level
  logDebug(s"Level for block $blockId is $level")
  // In order, try to read the serialized bytes from memory, then from disk, then fall back to
  // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
  if (level.deserialized) {
    // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
    if (level.useDisk && diskStore.contains(blockId)) {
      // Note: we purposely do not try to put the block back into memory here. Since this branch
      // handles deserialized blocks, this block may only be cached in memory as objects, not
      // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
      // cache the block's deserialized objects since that caching may not have a payoff.
      diskStore.getBytes(blockId)
    } else if (level.useMemory && memoryStore.contains(blockId)) {
      // The block was not found on disk, so serialize an in-memory copy:
      new ByteBufferBlockData(serializerManager.dataSerializeWithExplicitClassTag(
        blockId, memoryStore.getValues(blockId).get, info.classTag), true)
    } else {
      // The block is in neither the disk store nor the memory store
      handleLocalReadFailure(blockId)
    }
  } else { // storage level is serialized
    if (level.useMemory && memoryStore.contains(blockId)) {
      new ByteBufferBlockData(memoryStore.getBytes(blockId).get, false)
    } else if (level.useDisk && diskStore.contains(blockId)) {
      val diskData = diskStore.getBytes(blockId)
      maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
        .map(new ByteBufferBlockData(_, false))
        .getOrElse(diskData)
    } else {
      handleLocalReadFailure(blockId)
    }
  }
}
getBlockData
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {
    // Shuffle blocks are resolved by the shuffle block resolver
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    // Fetch the local block data
    getLocalBytes(blockId) match {
      case Some(blockData) =>
        new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
      case None =>
        // If this block manager receives a request for a block that it doesn't have then it's
        // likely that the master has outdated block statuses for this block. Therefore, we send
        // an RPC so that this block is marked as being unavailable from this block manager.
        reportBlockStatus(blockId, BlockStatus.empty)
        throw new BlockNotFoundException(blockId.toString)
    }
  }
}
// reportBlockStatus
private def reportBlockStatus(
    blockId: BlockId,
    status: BlockStatus,
    droppedMemorySize: Long = 0L): Unit = {
  // If reporting to the master fails, this block manager needs to re-register
  val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
  if (needReregister) {
    logInfo(s"Got told to re-register updating block $blockId")
    // Re-registering will report our new block for free.
    asyncReregister()
  }
  logDebug(s"Told master about block $blockId")
}
def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
  // The `toArray` is necessary here in order to force the list to be materialized so that we
  // don't try to serialize a lazy iterator when responding to client requests.
  // Combine the block ids tracked by blockInfoManager with every block found on disk
  (blockInfoManager.entries.map(_._1) ++ diskBlockManager.getAllBlocks())
    .filter(filter)
    .toArray
    .toSeq
}
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // First remove the metadata for the given RDD, and then asynchronously remove the blocks
  // from the slaves.

  // Find all blocks for the given RDD, remove the block from both blockLocations and
  // the blockManagerInfo that is tracking the blocks.
  val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocks.foreach { blockId =>
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
    bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
    blockLocations.remove(blockId)
  }

  // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
  // The dispatcher is used as an implicit argument into the Future sequence construction.
  val removeMsg = RemoveRdd(rddId)
  // Send the RemoveRdd message to each node's BlockManagerSlaveEndpoint.
  val futures = blockManagerInfo.values.map { bm =>
    bm.slaveEndpoint.ask[Int](removeMsg).recover {
      case e: IOException =>
        logWarning(s"Error trying to remove RDD $rddId from block manager ${bm.blockManagerId}",
          e)
        0 // zero blocks were removed
    }
  }.toSeq
  Future.sequence(futures)
}
// Handling on the slave endpoint side
case RemoveRdd(rddId) =>
  doAsync[Int]("removing RDD " + rddId, context) {
    // Remove all blocks that belong to this RDD
    blockManager.removeRdd(rddId)
  }