// BlockInfo: per-block metadata — storage level, class tag, whether the block must be reported
// to the master, block size, read-lock count and the task attempt holding the write lock.
// NOTE(review): this excerpt is garbled. Whitespace between tokens was lost (`classBlockInfo`,
// `defsize`, ...), lost newlines moved the constructor parameters into the preceding `//`
// comments, and closing braces are missing. Code tokens are untouched; only comments changed.
private[storage] classBlockInfo(
// Storage level of the block. vallevel:StorageLevel,
// Class tag (type) of the block described by this BlockInfo. valclassTag:ClassTag[_],
// Whether the block described by this BlockInfo must be reported to the master. valtellMaster:Boolean) {
/** * The size of the block (in bytes).*/defsize:Long= _size
defsize_=(s: Long):Unit= {
_size = s
private[this] var_size:Long=0/** * The number of times that this block has been locked for reading.*/defreaderCount:Int= _readerCount
defreaderCount_=(c: Int):Unit= {
_readerCount = c
private[this] var_readerCount:Int=0/** * A task attempt must obtain this BlockInfo's write lock before writing the block. * _writerTask stores the id of that task attempt (a task may run as several attempts, each with its own id). * * The task attempt id of the task which currently holds the write lock for this block, or * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.*/defwriterTask:Long= _writerTask
defwriterTask_=(t: Long):Unit= {
_writerTask = t
// _writerTask starts as BlockInfo.NO_WRITER (not write-locked); the checkInvariants
// definition was merged onto the same line by a lost newline.
private[this] var_writerTask:Long=BlockInfo.NO_WRITERprivatedefcheckInvariants():Unit= {
// A block's reader count must be non-negative:
assert(_readerCount >=0)
// A block is either locked for reading or for writing, but not for both at the same time:
assert(_readerCount ==0|| _writerTask ==BlockInfo.NO_WRITER)
// Read-lock acquisition. This is presumably the body of `lockForReading`, which other code in
// this excerpt calls as `lockForReading(blockId)` and `lockForReading(blockId, blocking = false)`.
// NOTE(review): the `def` line is missing. Because newlines were lost, the parameter list and the
// opening `synchronized {` sit inside the comments below.
// Visible logic: loop while `blocking`; return None for an unknown block; when no writer holds
// the block, increment its reader count. The statements that record the read lock, return the
// info and wait for a writer appear to be missing from this excerpt — confirm against upstream.
// blockId blockId: BlockId,
// Whether to block while a write lock is held. blocking: Boolean=true):Option[BlockInfo] =synchronized {
logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
do {
// Look up the BlockInfo for this block.
infos.get(blockId) match {
caseNone=>returnNonecaseSome(info) =>// No write lock held: increment the read count. if (info.writerTask ==BlockInfo.NO_WRITER) {
info.readerCount +=1// Add currentTaskAttemptId to the read-lock bookkeeping (statement missing from this excerpt).
logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
// If blocking is enabled, wait until the write lock is released, then read. if (blocking) {
} while (blocking)
// Write-lock acquisition, presumably `lockForWriting`. The `def` line is missing from this excerpt.
// Visible logic: loop while `blocking`; return None for an unknown block. When the block has
// neither a writer nor any readers, record the current task attempt as the writer, register it in
// writeLocksByTask and return the info.
// NOTE(review): the body of the `if (blocking)` branch (waiting for the holder to release) is
// missing here, as are closing braces.
blockId: BlockId,
blocking: Boolean=true):Option[BlockInfo] =synchronized {
logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
do {
infos.get(blockId) match {
caseNone=>returnNonecaseSome(info) =>// No write lock and no read locks are currently held. if (info.writerTask ==BlockInfo.NO_WRITER&& info.readerCount ==0) {
// Store the current task attempt id in _writerTask, marking the block as write-locked.
info.writerTask = currentTaskAttemptId
// Record the lock in the per-task write-lock map.
writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
// Return the info. returnSome(info)
// A write or read lock is held: if blocking, wait until the holder releases it. if (blocking) {
} while (blocking)
// Releases a lock on `blockId` held by `taskAttemptId` (defaults to the current task attempt).
// If the block has a writer, the writer is cleared and the per-task write-lock binding removed.
// Otherwise a read lock must be held: the reader count and the task's pin count for the block
// are decremented, asserting that the task does not release more often than it acquired.
// NOTE(review): garbled excerpt. Lost newlines moved some statements (e.g. the `taskId` and
// `info` definitions) into comments, and closing braces are missing.
defunlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] =None):Unit=synchronized {
// Use the given taskAttemptId, or currentTaskAttemptId when none is passed. valtaskId= taskAttemptId.getOrElse(currentTaskAttemptId)
logTrace(s"Task $taskId releasing lock for $blockId")
// Look up the current BlockInfo; an unknown block raises IllegalStateException. valinfo= get(blockId).getOrElse {
thrownewIllegalStateException(s"Block $blockId not found")
// A write lock is held. if (info.writerTask !=BlockInfo.NO_WRITER) {
// Mark the block as having no writer.
info.writerTask =BlockInfo.NO_WRITER// Remove the write lock from the per-task map.
writeLocksByTask.removeBinding(taskId, blockId)
} else {
assert(info.readerCount >0, s"Block $blockId is not locked for reading")
// Read locks are reentrant: decrement the count by one.
info.readerCount -=1// Get this task's multiset of read locks. valcountsForTask:ConcurrentHashMultiset[BlockId] = readLocksByTask(taskId)
// Remove one occurrence of this lock (the -1 presumably because `remove` returns the count before removal — confirm). valnewPinCountForTask:Int= countsForTask.remove(blockId, 1) -1
assert(newPinCountForTask >=0,
s"Task $taskId release lock on block $blockId more times than it acquired it")
// Wake up all waiting threads (the call itself is missing from this excerpt).
// Downgrades the current task attempt's write lock on `blockId` to a read lock.
// Requires that the current task attempt holds the write lock.
// NOTE(review): `get(blockId).get` fails if no BlockInfo is registered (assuming `get` returns an
// Option). The statement that releases the write lock, and anything after the read-lock
// acquisition, are missing from this excerpt.
defdowngradeLock(blockId: BlockId):Unit=synchronized {
logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
valinfo= get(blockId).get
require(info.writerTask == currentTaskAttemptId,
s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on"+s" block $blockId")
// Release the write lock.
// Acquire a read lock without blocking. vallockOutcome= lockForReading(blockId, blocking =false)
// Put path, presumably `lockNewBlockForWriting`. The `def` line is missing from this excerpt.
// It first tries to take a read lock on an existing block; if the block already exists, it keeps
// the read lock and returns false. Otherwise it registers `newBlockInfo` in `infos`.
// NOTE(review): the lines after `infos(blockId) = newBlockInfo` (presumably taking the write lock
// and returning a result) are not present; confirm against upstream.
blockId: BlockId,
newBlockInfo: BlockInfo):Boolean=synchronized {
logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
lockForReading(blockId) match {
// If the block already exists there is no need to take the write lock. caseSome(info) =>// Block already exists. This could happen if another thread races with us to compute// the same block. In this case, just keep the read lock and return.falsecaseNone=>// Block does not yet exist or is removed, so we are free to acquire the write lock
infos(blockId) = newBlockInfo
// Removes `blockId`; the current task attempt must hold its write lock. On success, clears the
// block's reader count and writer and removes the per-task write-lock binding.
// NOTE(review): this excerpt is missing several pieces: the lines that raise an exception with the
// two message strings below, the statement that removes the block's entry, and the wake-up call.
defremoveBlock(blockId: BlockId):Unit=synchronized {
logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
// Look up the BlockInfo.
infos.get(blockId) match {
caseSome(blockInfo) =>if (blockInfo.writerTask != currentTaskAttemptId) {
s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
} else {
// Remove the block's entry (NOTE(review): statement missing from this excerpt).
// Release the read and write locks.
blockInfo.readerCount =0
blockInfo.writerTask =BlockInfo.NO_WRITER
writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
// Wake up all blocked operations.
// DiskBlockManager (constructor and fields): maps file names to files under the configured local
// directories, spread over `subDirsPerLocalDir` hashed sub-directories per local directory.
// NOTE(review): garbled excerpt. Several field definitions below were merged into the preceding
// `//` comments by lost newlines.
private[spark] classDiskBlockManager(conf: SparkConf,
deleteFilesOnStop: Boolean) extendsLogging {
// Number of sub-directories per local directory. private[spark] valsubDirsPerLocalDir= conf.getInt("spark.diskStore.subDirectories", 64)
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this * directory, create multiple subdirectories that we will hash files into, in order to avoid * having really large inodes at the top level. */// Array of local directories, created at construction. private[spark] vallocalDirs:Array[File] = createLocalDirs(conf)
// No local directory could be created. if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
// The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content// of subDirs(i) is protected by the lock of subDirs(i)privatevalsubDirs=Array.fill(localDirs.length)(newArray[File](subDirsPerLocalDir))
// Register a shutdown hook. privatevalshutdownHook= addShutdownHook()
// Creates a directory (via Utils.createDirectory(rootDir, "blockmgr")) under each root directory
// returned by Utils.getConfiguredLocalDirs(conf). A root whose directory cannot be created
// (IOException) is logged and skipped.
// NOTE(review): the results produced by the flatMap for success/failure and the closing braces
// are missing from this excerpt.
privatedefcreateLocalDirs(conf: SparkConf):Array[File] = {
// Read the spark.local.dir configuration. Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>try {
vallocalDir=Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")
} catch {
casee: IOException=>
logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
// Returns the File for `filename`. The name's non-negative hash picks a local directory
// (hash % localDirs.length) and a sub-directory within it
// ((hash / localDirs.length) % subDirsPerLocalDir). The sub-directory is created lazily under
// the lock of subDirs(dirId) and cached in subDirs.
// Throws IOException if a missing sub-directory cannot be created.
// NOTE(review): garbled excerpt. The hash/dirId/subDirId/subDir/old definitions were merged into
// comments, and the `old != null` branch body and closing braces are missing.
defgetFile(filename: String):File= {
// Figure out which local directory it hashes to, and which subdirectory in that// Compute a non-negative hash of the file name. valhash=Utils.nonNegativeHash(filename)
// Pick the top-level directory: hash modulo the number of local directories. valdirId= hash % localDirs.length
// Compute the sub-directory index. valsubDirId= (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already existvalsubDir= subDirs(dirId).synchronized {
// Look up the cached sub-directory. valold= subDirs(dirId)(subDirId)
if (old !=null) {
} else {
// Sub-directory name: subDirId as a zero-padded (at least two digits) lowercase hex string.
valnewDir=newFile(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() &&!newDir.mkdir()) {
thrownewIOException(s"Failed to create local dir in $newDir.")
subDirs(dirId)(subDirId) = newDir
newFile(subDir, filename)
// Returns every file found in the sub-directories created so far. Each subDirs(i) array is read
// under its own lock, null (not yet created) entries are skipped, and a directory whose
// listFiles() returns null contributes no files.
// NOTE(review): the statement that copies `dir` inside `dir.synchronized` (see the comment below)
// is missing from this excerpt, as are closing braces.
defgetAllFiles():Seq[File] = {
// Get all the files inside the array of array of directories
subDirs.flatMap { dir =>
dir.synchronized {
// Copy the content of dir because it may be modified in other threads
}.filter(_ !=null).flatMap { dir =>valfiles= dir.listFiles()
if (files !=null) files elseSeq.empty
// MemoryPool: tracks a pool size in bytes. The concrete accessors and mutators below synchronize
// on `lock`; subclasses supply `memoryUsed`.
// NOTE(review): garbled excerpt. The class header sits inside the comment on the next line
// because of a lost newline; the body of `poolSize` and the closing braces are missing.
// lock: the object whose monitor makes this memory pool thread-safe. private[memory] abstractclassMemoryPool(lock: Object) {
/** * Size of the memory pool, in bytes.*/@GuardedBy("lock")
private[this] var_poolSize:Long=0/** * Returns the current size of the pool, in bytes.*/finaldefpoolSize:Long= lock.synchronized {
/** * Returns the amount of free memory in the pool, in bytes.*/finaldefmemoryFree:Long= lock.synchronized {
_poolSize - memoryUsed
/** * Expands the pool by `delta` bytes (delta must be non-negative).*/finaldefincrementPoolSize(delta: Long):Unit= lock.synchronized {
require(delta >=0)
_poolSize += delta
/** * Shrinks the pool by `delta` bytes; requires 0 <= delta <= pool size and that the pool stays at least as large as the memory in use.*/finaldefdecrementPoolSize(delta: Long):Unit= lock.synchronized {
require(delta >=0)
require(delta <= _poolSize)
require(_poolSize - delta >= memoryUsed)
_poolSize -= delta
/** * Returns the amount of used memory in this pool (in bytes).*/defmemoryUsed:Long
// StorageMemoryPool: a MemoryPool for storage memory in the given MemoryMode. It tracks
// `_memoryUsed` and, when more room is needed, asks the MemoryStore (set later via
// setMemoryStore) to evict blocks. Memory accounting operations synchronize on `lock`.
// NOTE(review): garbled excerpt. Whitespace between tokens and closing braces are missing, and
// some statements were merged into comments by lost newlines.
private[memory] classStorageMemoryPool(
lock: Object,
memoryMode: MemoryMode
) extendsMemoryPool(lock) withLogging {
/** * Name of this storage memory pool.*/private[this] valpoolName:String= memoryMode match {
caseMemoryMode.ON_HEAP=>"on-heap storage"caseMemoryMode.OFF_HEAP=>"off-heap storage"
private[this] var_memoryUsed:Long=0L/** * Amount of storage memory currently used by this pool, in bytes.*/overridedefmemoryUsed:Long= lock.synchronized {
/** * The MemoryStore used to evict blocks; unset (null) until setMemoryStore is called.*/privatevar_memoryStore:MemoryStore= _
// Throws IllegalStateException if setMemoryStore has not been called yet.
defmemoryStore:MemoryStore= {
if (_memoryStore ==null) {
thrownewIllegalStateException("memory store not initialized yet")
/** * Set the [[MemoryStore]] used by this manager to evict cached blocks. * This must be set after construction due to initialization ordering constraints.*/finaldefsetMemoryStore(store: MemoryStore):Unit= {
_memoryStore = store
/** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. * The amount to evict is the part of N that exceeds the current free memory. * * @return whether all N bytes were successfully granted.*/defacquireMemory(blockId: BlockId, numBytes: Long):Boolean= lock.synchronized {
valnumBytesToFree= math.max(0, numBytes - memoryFree)
acquireMemory(blockId, numBytes, numBytesToFree)
/** * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary. * * @param blockId the ID of the block we are acquiring storage memory for * @param numBytesToAcquire the size of this block * @param numBytesToFree the amount of space to be freed through evicting blocks * @return whether all N bytes were successfully granted.*/defacquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long):Boolean= lock.synchronized {
assert(numBytesToAcquire >=0)
assert(numBytesToFree >=0)
assert(memoryUsed <= poolSize)
// numBytesToFree > 0 means free memory is insufficient, so blocks must be evicted first. if (numBytesToFree >0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
// After eviction, check again whether enough free memory is available.// NOTE: If the memory store evicts blocks, then those evictions will synchronously call// back into this StorageMemoryPool in order to free memory. Therefore, these variables// should have been updated.valenoughMemory= numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
// Releases `size` bytes; releasing more than is in use logs a warning and resets usage to 0.
defreleaseMemory(size: Long):Unit= lock.synchronized {
if (size > _memoryUsed) {
logWarning(s"Attempted to release $size bytes of storage "+s"memory when we only have ${_memoryUsed} bytes")
_memoryUsed =0
} else {
_memoryUsed -= size
// Marks all storage memory in this pool as free.
defreleaseAllMemory():Unit= lock.synchronized {
_memoryUsed =0
/** * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes. * Note: this method doesn't actually reduce the pool size but relies on the caller to do so. * * @return number of bytes to be removed from the pool's capacity. * Used to free the requested amount of space so that the pool can be shrunk.*/deffreeSpaceToShrinkPool(spaceToFree: Long):Long= lock.synchronized {
// Part that can be covered by currently unused memory. valspaceFreedByReleasingUnusedMemory= math.min(spaceToFree, memoryFree)
// Part that still has to be freed. valremainingSpaceToFree= spaceToFree - spaceFreedByReleasingUnusedMemory
// More space is still needed. if (remainingSpaceToFree >0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:// Reclaim memory from other blocks. valspaceFreedByEviction=
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
// NOTE(review): the else-branch body is missing from this excerpt (presumably it returns spaceFreedByReleasingUnusedMemory).
// UnifiedMemoryManager: execution and storage share one region per memory mode. Storage may
// borrow free execution memory, and execution may reclaim memory from storage, evicting blocks
// once storage has grown beyond its storage region size.
// NOTE(review): heavily garbled excerpt. Missing pieces include the superclass constructor call,
// the tuple bodies of the `memoryMode match` expressions, the heads of some calls and several
// `def` lines; whitespace between tokens was also lost. Code is untouched; comments only.
private[spark] classUnifiedMemoryManagerprivate[memory] (
conf: SparkConf,
onHeapStorageRegionSize: Long,
numCores: Int)
maxHeapMemory - onHeapStorageRegionSize) {
/** * Checks that, on-heap and off-heap, the execution and storage pool sizes add up to the maximum memory.*/privatedefassertInvariants():Unit= {
assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory)
offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory)
/** * Maximum on-heap storage memory: all heap memory not currently used by execution.*/overridedefmaxOnHeapStorageMemory:Long=synchronized {
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
// Maximum off-heap storage memory: all off-heap memory not currently used by execution.
overridedefmaxOffHeapStorageMemory:Long=synchronized {
maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
/** * Try to acquire up to `numBytes` of execution memory for the current task and return the * number of bytes obtained, or 0 if none can be allocated. * * This call may block until there is enough free memory in some situations, to make sure each * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of * active tasks) before it is forced to spill. This can happen if the number of tasks increase * but an older task had a lot of memory already.*/overrideprivate[memory] defacquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode):Long=synchronized {
assert(numBytes >=0)
val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
caseMemoryMode.ON_HEAP=> (
caseMemoryMode.OFF_HEAP=> (
/** * Borrow memory from the storage region for use by execution. * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. * * When acquiring memory for a task, the execution pool may need to make multiple * attempts. Each attempt must be able to evict storage in case another task jumps in * and caches a large block between the attempts. This is called once per attempt.*/defmaybeGrowExecutionPool(extraMemoryNeeded: Long):Unit= {
if (extraMemoryNeeded >0) {
// There is not enough free memory in the execution pool, so try to reclaim memory from// storage. We can reclaim any free memory from the storage pool. If the storage pool// has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim// the memory that storage has borrowed from execution.valmemoryReclaimableFromStorage:Long= math.max(
storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage >0) {
// Only reclaim as much space as is necessary and available:valspaceToReclaim= storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
/** * The size the execution pool would have after evicting storage memory. * * The execution memory pool divides this quantity among the active tasks evenly to cap * the execution memory allocation for each task. It is important to keep this greater * than the execution pool size, which doesn't take into account potential memory that * could be freed by evicting storage. Otherwise we may hit SPARK-12155. * * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness * in execution memory allocation across tasks, Otherwise, a task may occupy more than * its fair share of execution memory, mistakenly thinking that other tasks can acquire * the portion of storage memory that cannot be evicted.*/defcomputeMaxExecutionPoolSize():Long= {
maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
// NOTE(review): the head of the call that receives these arguments is missing from this excerpt.
numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
// Storage-memory acquisition, presumably `acquireStorageMemory`, which the method further below
// calls with these same arguments; the `def` line is missing. It logs and presumably fails fast
// when the request exceeds the mode's maximum memory. When the storage pool's free memory is
// insufficient, it computes how much free execution memory to borrow, then acquires from the
// storage pool.
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode):Boolean=synchronized {
assert(numBytes >=0)
val (executionPool, storagePool, maxMemory) = memoryMode match {
caseMemoryMode.ON_HEAP=> (
caseMemoryMode.OFF_HEAP=> (
// The request exceeds the maximum memory limit. if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our "+s"memory limit ($maxMemory bytes)")
// The request exceeds the storage pool's free memory. if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from// the execution pool.// Try to borrow memory from execution.// FIXME: storage is short here, so we borrow from execution, but we do not check whether execution + storage free memory can satisfy the request; if it cannot, that is only noticed later inside storagePool,// FIXME: which then tries to evict other blocks. Why not try evicting other blocks right here instead of going further down the call chain? valmemoryBorrowedFromExecution=Math.min(executionPool.memoryFree,
numBytes - storagePool.memoryFree)
storagePool.acquireMemory(blockId, numBytes)
// Delegates to acquireStorageMemory with the same arguments. This is presumably
// `acquireUnrollMemory`; the `def` line is missing from this excerpt.
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode):Boolean=synchronized {
acquireStorageMemory(blockId, numBytes, memoryMode)
// Companion object: builds a UnifiedMemoryManager from SparkConf and computes the memory shared by
// execution and storage.
// NOTE(review): garbled excerpt. The RESERVED_SYSTEM_MEMORY_BYTES and apply definitions are inside
// the comment on the next line (lost newline); the line that constructs the manager and the
// closing braces are missing.
objectUnifiedMemoryManager {
// Set aside a fixed amount of memory for non-storage, non-execution purposes.// This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve// sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then// the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024defapply(conf: SparkConf, numCores: Int):UnifiedMemoryManager= {
valmaxMemory= getMaxMemory(conf)
maxHeapMemory = maxMemory,
onHeapStorageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
/** * Return the total amount of memory shared between execution and storage, in bytes: * (system memory - reserved memory) times spark.memory.fraction (default 0.6). * Throws IllegalArgumentException if system memory, or spark.executor.memory when set, * is below 1.5 times the reserved memory.*/privatedefgetMaxMemory(conf: SparkConf):Long= {
valsystemMemory= conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
valreservedMemory= conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0elseRESERVED_SYSTEM_MEMORY_BYTES)
valminSystemMemory= (reservedMemory *1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
thrownewIllegalArgumentException(s"System memory $systemMemory must "+s"be at least $minSystemMemory. Please increase heap size using the --driver-memory "+s"option or spark.driver.memory in Spark configuration.")
// SPARK-12759 Check executor memory to fail fast if memory is insufficientif (conf.contains("spark.executor.memory")) {
// Executor memory. valexecutorMemory= conf.getSizeAsBytes("spark.executor.memory")
if (executorMemory < minSystemMemory) {
thrownewIllegalArgumentException(s"Executor memory $executorMemory must be at least "+s"$minSystemMemory. Please increase executor memory using the "+s"--executor-memory option or spark.executor.memory in Spark configuration.")
valusableMemory= systemMemory - reservedMemory
valmemoryFraction= conf.getDouble("spark.memory.fraction", 0.6)
(usableMemory * memoryFraction).toLong
// Returns the local bytes of `blockId`, if present. Shuffle blocks are served through the shuffle
// block resolver without taking a lock. Other blocks are read under a read lock via
// doGetLocalBytes, yielding None when lockForReading returns None.
// NOTE(review): garbled excerpt. The ChunkedByteBuffer construction arguments and closing braces
// are missing, and some statements were merged into comments.
defgetLocalBytes(blockId: BlockId):Option[BlockData] = {
logDebug(s"Getting local block $blockId as bytes")
// As an optimization for map output fetches, if the block is for a shuffle, return it// without acquiring a lock; the disk store never deletes (recent) items so this should work// If the block is a shuffle block, get the shuffle block resolver. if (blockId.isShuffle) {
valshuffleBlockResolver= shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently// downstream code will throw an exception.// Wrap the data in a ChunkedByteBuffer. valbuf=newChunkedByteBuffer(
Some(newByteBufferBlockData(buf, true))
} else {
// Take a read lock, read the bytes locally, and turn the BlockInfo into the corresponding BlockData.
blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
// Shuffle-block lookup. It reads the (offset, nextOffset) pair for `blockId.reduceId` from the map
// output's index file; entries are 8-byte longs, so the pair starts at reduceId * 8. It then
// refers to the segment of the data file presumably starting at `offset` with length
// nextOffset - offset.
// NOTE(review): garbled excerpt. The `def` line is inside the comment on the next line. Missing
// from this excerpt: the stream `in` over `channel`, the head of the returned buffer
// construction, the body of `finally`, and closing braces.
// Shuffle blocks. overridedefgetBlockData(blockId: ShuffleBlockId):ManagedBuffer= {
// The block is actually going to be a range of a single map output file for this map, so// find out the consolidated file, then the offset within that from our index// Get the index file. valindexFile= getIndexFile(blockId.shuffleId, blockId.mapId)
// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code// which is incorrectly using our file descriptor then this code will fetch the wrong offsets// (which may cause a reducer to be sent a different reducer's data). The explicit position// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this// class of issue from re-occurring in the future which is why they are left here even though// SPARK-22982 is fixed.valchannel=Files.newByteChannel(indexFile.toPath)
channel.position(blockId.reduceId *8L)
try {
valoffset= in.readLong()
valnextOffset= in.readLong()
valactualPosition= channel.position()
valexpectedPosition= blockId.reduceId *8L+16if (actualPosition != expectedPosition) {
thrownewException(s"SPARK-22982: Incorrect channel position after index file reads: "+s"expected $expectedPosition but actual position was $actualPosition.")
// Get the data file.
getDataFile(blockId.shuffleId, blockId.mapId),
nextOffset - offset)
} finally {
// Reads the bytes of a non-shuffle block according to its storage level.
// - Deserialized levels: prefer the pre-serialized copy on disk, else serialize the in-memory values.
// - Serialized levels: prefer the in-memory bytes, else read from disk, possibly caching the bytes
//   in memory via maybeCacheDiskBytesInMemory.
// NOTE(review): garbled excerpt. The `def` line and the `level` definition are inside comments
// (lost newlines). Missing from this excerpt: the disk-read statement of the first branch, the
// "block not found" handling in both `else` branches, and closing braces.
// Non-shuffle blocks. privatedefdoGetLocalBytes(blockId: BlockId, info: BlockInfo):BlockData= {
// Get the storage level, which decides whether BlockData is read from diskStore or memoryStore. vallevel= info.level
logDebug(s"Level for block $blockId is $level")
// In order, try to read the serialized bytes from memory, then from disk, then fall back to// serializing in-memory objects, and, finally, throw an exception if the block does not exist.if (level.deserialized) {
// Try to avoid expensive serialization by reading a pre-serialized copy from disk:if (level.useDisk && diskStore.contains(blockId)) {
// Note: we purposely do not try to put the block back into memory here. Since this branch// handles deserialized blocks, this block may only be cached in memory as objects, not// serialized bytes. Because the caller only requested bytes, it doesn't make sense to// cache the block's deserialized objects since that caching may not have a payoff.
} elseif (level.useMemory && memoryStore.contains(blockId)) {
// The block was not found on disk, so serialize an in-memory copy:newByteBufferBlockData(serializerManager.dataSerializeWithExplicitClassTag(
blockId, memoryStore.getValues(blockId).get, info.classTag), true)
} else {
// Not found in memoryStore either.
} else { // storage level is serializedif (level.useMemory && memoryStore.contains(blockId)) {
newByteBufferBlockData(memoryStore.getBytes(blockId).get, false)
} elseif (level.useDisk && diskStore.contains(blockId)) {
valdiskData= diskStore.getBytes(blockId)
maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
.map(newByteBufferBlockData(_, false))
} else {
// Returns a ManagedBuffer for `blockId`. Non-shuffle blocks are read with getLocalBytes and
// wrapped in a BlockManagerManagedBuffer. If the block is not available locally, an empty
// BlockStatus is reported so the master stops listing this block manager as a location.
// NOTE(review): garbled excerpt. The shuffle branch body, the statements after
// reportBlockStatus, and closing braces are missing.
overridedefgetBlockData(blockId: BlockId):ManagedBuffer= {
// Shuffle block case. if (blockId.isShuffle) {
} else {
// Get the local block data.
getLocalBytes(blockId) match {
caseSome(blockData) =>newBlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
// Local BlockData not found. caseNone=>// If this block manager receives a request for a block that it doesn't have then it's// likely that the master has outdated block statuses for this block. Therefore, we send// an RPC so that this block is marked as being unavailable from this block manager.// Report the BlockStatus.
reportBlockStatus(blockId, BlockStatus.empty)
// Reports `status` for `blockId`. This is presumably the body of `reportBlockStatus`, which is
// called elsewhere in this excerpt as `reportBlockStatus(blockId, BlockStatus.empty)`; the `def`
// line is missing. If tryToReportBlockStatus fails, it logs that re-registration is needed.
// NOTE(review): the re-registration call that should follow the comment below, and the closing
// braces, are missing from this excerpt.
blockId: BlockId,
status: BlockStatus,
droppedMemorySize: Long=0L):Unit= {
// If reporting fails, the block manager has to re-register. valneedReregister=!tryToReportBlockStatus(blockId, status, droppedMemorySize)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
logDebug(s"Told master about block $blockId")
// Returns block ids — those in the block info map plus those found on disk — presumably
// restricted to the ones matching `filter`.
// NOTE(review): garbled excerpt. The left operand of `++` (the block-info ids), the filtering step
// and the closing braces are missing; the expression below does not compile as is.
defgetMatchingBlockIds(filter: BlockId=>Boolean):Seq[BlockId] = {
// The `toArray` is necessary here in order to force the list to be materialized so that we// don't try to serialize a lazy iterator when responding to client requests.// All block ids in the info map, plus all blocks on disk.
( ++ diskBlockManager.getAllBlocks())
// Removes all blocks of RDD `rddId`:
// 1. Finds the RDD's blocks in blockLocations and removes each block from every
//    BlockManagerInfo tracking it.
// 2. Sends RemoveRdd(rddId) to the block managers' slave endpoints. A failed ask (IOException)
//    is logged and counted as zero blocks removed.
// NOTE(review): garbled excerpt. Missing from this excerpt: the collection mapped to `futures`,
// the removal from blockLocations, the exception argument of logWarning, and the step combining
// the futures into the returned Future. Some statements were also merged into comments.
privatedefremoveRdd(rddId: Int):Future[Seq[Int]] = {
// First remove the metadata for the given RDD, and then asynchronously remove the blocks// from the slaves.// Find all blocks for the given RDD, remove the block from both blockLocations and// the blockManagerInfo that is tracking the blocks.valblocks= blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocks.foreach { blockId =>valbms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
// Ask the slaves to remove the RDD, and put the result in a sequence of Futures.// The dispatcher is used as an implicit argument into the Future sequence construction.valremoveMsg=RemoveRdd(rddId)
// For each block manager `bm`, send the RemoveRdd message (removeMsg) to its slave endpoint. valfutures= { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
casee: IOException=>
logWarning(s"Error trying to remove RDD $rddId from block manager ${bm.blockManagerId}",
0// zero blocks were removed
// slavecaseRemoveRdd(rddId) =>
doAsync[Int]("removing RDD "+ rddId, context) {
// 移除rdd