Mastering Spark 2.x: Executor-Side BlockManager Source Code Analysis
I. Overview
BlockManager is Spark's distributed block storage manager. The core idea is that each node manages its own memory space and disk space for block storage. BlockManagerMaster communicates with the BlockManagers on other nodes and coordinates block replication across nodes. BlockInfoManager manages block metadata and provides read/write locking. When a block cannot be obtained from the local BlockManager, it is fetched from a remote node.
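To make that division of labor concrete, here is a toy sketch of the lookup order the BlockManager follows: local memory first, then local disk, then remote peers. This is plain Scala, not Spark source; all names here are made up for illustration.

import scala.collection.mutable

// Toy model of the read path described above (hypothetical names, not Spark code).
object BlockLookupSketch {
  val localMemory = mutable.Map.empty[String, Array[Byte]]      // stands in for the MemoryStore
  val localDisk   = mutable.Map.empty[String, Array[Byte]]      // stands in for the DiskStore
  val remotePeers = Seq(mutable.Map.empty[String, Array[Byte]]) // stands in for peer BlockManagers

  def readBlock(blockId: String): Option[Array[Byte]] =
    localMemory.get(blockId)                                    // 1. try local memory
      .orElse(localDisk.get(blockId))                           // 2. then local disk
      .orElse(remotePeers.flatMap(_.get(blockId)).headOption)   // 3. finally fetch from a peer
}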
In the previous article we covered the driver-side BlockManagerMaster; here we walk through the executor-side BlockManager with its source code.
II. Executor-Side BlockManager Source Code Analysis
Let's go through a few of its core methods:
1. When an Executor starts up, it always instantiates a BlockManager. Let's first look at the initialize() function; the code is as follows:
def initialize(appId: String): Unit = {
  // Initialize BlockTransferService which, as mentioned before, handles cross-node data transfer
  blockTransferService.init(this)
  // Initialize the ShuffleClient used to read shuffle files on other executors
  shuffleClient.init(appId)
  // Initialize the block replication policy blockReplicationPolicy; it can be specified with
  // the spark.storage.replication.policy parameter and defaults to RandomBlockReplicationPolicy
  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }
  // Build this executor's BlockManagerId
  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
  // Register this BlockManager with the driver; the message is handled on the
  // BlockManagerMaster side
  val idFromMaster = master.registerBlockManager(
    id,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)
  blockManagerId = if (idFromMaster != null) idFromMaster else id
  // If the external shuffle service is enabled (it improves shuffle efficiency and is
  // controlled by spark.shuffle.service.enabled, default false), initialize the
  // corresponding shuffle server id here
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }
  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
  logInfo(s"Initialized BlockManager: $blockManagerId")
}
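For reference, the two configuration knobs that initialize() consults can be set like this. Both property names appear in the source above; the values are illustrative only.

import org.apache.spark.SparkConf

// Illustrative settings for the knobs initialize() reads (example values only).
val conf = new SparkConf()
  // block replication policy; RandomBlockReplicationPolicy is the default anyway
  .set("spark.storage.replication.policy",
    "org.apache.spark.storage.RandomBlockReplicationPolicy")
  // turn on the external shuffle service (default false), which makes initialize()
  // build shuffleServerId from externalShuffleServicePort and register with the service
  .set("spark.shuffle.service.enabled", "true")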
2. Re-registering the BlockManager with the driver; the code is as follows:
/**
 * Re-register with the master and report all blocks to it. This will be called by the heart beat
 * thread if our heartbeat to the block manager indicates that we were not registered.
 *
 * Note that this method must be called without any BlockInfo locks held.
 */
// Re-register this BlockManager with the driver
def reregister(): Unit = {
  // TODO: We might need to rate limit re-registering.
  logInfo(s"BlockManager $blockManagerId re-registering with master")
  master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
  // Report the latest blockStatus of every block to the driver
  reportAllBlocks()
}

/**
 * Re-register with the master sometime soon.
 */
private def asyncReregister(): Unit = {
  asyncReregisterLock.synchronized {
    if (asyncReregisterTask == null) {
      asyncReregisterTask = Future[Unit] {
        // This is a blocking action and should run in futureExecutionContext which is a cached
        // thread pool
        reregister()
        asyncReregisterLock.synchronized {
          asyncReregisterTask = null
        }
      }(futureExecutionContext)
    }
  }
}
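The interesting detail in asyncReregister() is the lock-guarded slot that ensures at most one re-registration is in flight at a time. Here is a minimal standalone sketch of that pattern; it is simplified and the names are mine, not Spark's.

import scala.concurrent.{ExecutionContext, Future}

// Simplified "at most one in-flight task" pattern, mirroring asyncReregister().
class SingleFlight(work: () => Unit)(implicit ec: ExecutionContext) {
  private val lock = new Object
  private var task: Future[Unit] = null

  def trigger(): Unit = lock.synchronized {
    if (task == null) {                    // only start if nothing is running
      task = Future {
        work()                             // e.g. reregister()
        lock.synchronized { task = null }  // clear the slot so a later trigger can run
      }
    }
  }
}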
3. The entry point for reading block data is the get() function; the code is as follows:
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  // Try locally first, via getLocalValues
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  // If not found locally, fetch remotely via getRemoteValues
  val remote = getRemoteValues[T](blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
3.1 Let's first look at the local read path, getLocalValues(); the code is as follows:
def getLocalValues(blockId: BlockId): Option[BlockResult] = {
  logDebug(s"Getting local block $blockId")
  // Acquire a read lock and fetch the block's metadata from blockInfoManager
  blockInfoManager.lockForReading(blockId) match {
    case None =>
      logDebug(s"Block $blockId was not found")
      None
    case Some(info) =>
      // StorageLevel: memory is considered first, then disk
      val level = info.level
      logDebug(s"Level for block $blockId is $level")
      val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
      // If the data is stored in memory, read it directly through memoryStore
      if (level.useMemory && memoryStore.contains(blockId)) {
        // Check whether the data is stored serialized
        val iter: Iterator[Any] = if (level.deserialized) {
          memoryStore.getValues(blockId).get
        } else {
          serializerManager.dataDeserializeStream(
            blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
        }
        // We need to capture the current taskId in case the iterator completion is triggered
        // from a different thread which does not have TaskContext set; see SPARK-18406 for
        // discussion.
        val ci = CompletionIterator[Any, Iterator[Any]](iter, {
          releaseLock(blockId, taskAttemptId)
        })
        Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
      // If the data is stored on disk, read it through diskStore
      } else if (level.useDisk && diskStore.contains(blockId)) {
        val diskData = diskStore.getBytes(blockId)
        val iterToReturn: Iterator[Any] = {
          // Same logic as above: check whether the data is serialized
          if (level.deserialized) {
            val diskValues = serializerManager.dataDeserializeStream(
              blockId,
              diskData.toInputStream())(info.classTag)
            // To speed up subsequent reads, the disk data may be cached in memory;
            // that is exactly what maybeCacheDiskValuesInMemory does
            maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
          } else {
            val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
              .map { _.toInputStream(dispose = false) }
              .getOrElse { diskData.toInputStream() }
            serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
          }
        }
        // Release the lock once the iterator has been consumed
        val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
          releaseLockAndDispose(blockId, diskData, taskAttemptId)
        })
        Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
      } else {
        handleLocalReadFailure(blockId)
      }
  }
}
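Both branches wrap the returned iterator in a CompletionIterator, so the read lock taken by lockForReading() is released only once the caller has fully consumed the data. A minimal sketch of that idea (simplified, not the Spark implementation):

// Run a cleanup callback exactly once when the wrapped iterator is exhausted,
// which is how getLocalValues() defers releaseLock() until the data is consumed.
class OnCompleteIterator[A](sub: Iterator[A], onComplete: () => Unit) extends Iterator[A] {
  private var done = false
  override def hasNext: Boolean = {
    val r = sub.hasNext
    if (!r && !done) { done = true; onComplete() } // e.g. release the block's read lock
    r
  }
  override def next(): A = sub.next()
}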
3.2 Now the remote read path, getRemoteValues(); the code is as follows:
private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val ct = implicitly[ClassTag[T]]
  // The key call is getRemoteBytes()
  getRemoteBytes(blockId).map { data =>
    val values =
      serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
    new BlockResult(values, DataReadMethod.Network, data.size)
  }
}
Here we go straight to the getRemoteBytes() function; the code is as follows:
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  logDebug(s"Getting remote block $blockId")
  require(blockId != null, "BlockId is null")
  var runningFailureCount = 0
  var totalFailureCount = 0

  // Because all the remote blocks are registered in driver, it is not necessary to ask
  // all the slave executors to get block status.
  // Ask the BlockManagerMaster for the block's status and locations; the message is
  // handled by BlockManagerMasterEndpoint, which returns the result
  val locationsAndStatus = master.getLocationsAndStatus(blockId)
  val blockSize = locationsAndStatus.map { b =>
    b.status.diskSize.max(b.status.memSize)
  }.getOrElse(0L)
  // The list of BlockManagers holding this block
  val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)

  // If the block is larger than the maxRemoteBlockToMem threshold,
  // the fetch goes through remoteBlockTempFileManager
  val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
    remoteBlockTempFileManager
  } else {
    null
  }

  // A block may live on multiple BlockManagers, so the locations are sorted here
  val locations = sortLocations(blockLocations)
  val maxFetchFailures = locations.size
  var locationIterator = locations.iterator
  // Loop over the candidate locations; break out as soon as a fetch succeeds
  while (locationIterator.hasNext) {
    val loc = locationIterator.next()
    logDebug(s"Getting remote block $blockId from $loc")
    val data = try {
      // blockTransferService performs the actual remote fetch. We won't follow the
      // deeper code here; it ultimately calls fetchBlocks() in NettyBlockTransferService
      // to pull the data from the remote node to the local one
      blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
    } catch {
      case NonFatal(e) =>
        runningFailureCount += 1
        totalFailureCount += 1

        if (totalFailureCount >= maxFetchFailures) {
          // Give up trying anymore locations. Either we've tried all of the original locations,
          // or we've refreshed the list of locations from the master, and have still
          // hit failures after trying locations from the refreshed list.
          logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
            s"Most recent failure cause:", e)
          return None
        }

        logWarning(s"Failed to fetch remote block $blockId " +
          s"from $loc (failed attempt $runningFailureCount)", e)

        // If there is a large number of executors then locations list can contain a
        // large number of stale entries causing a large number of retries that may
        // take a significant amount of time. To get rid of these stale entries
        // we refresh the block locations after a certain number of fetch failures
        if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
          locationIterator = sortLocations(master.getLocations(blockId)).iterator
          logDebug(s"Refreshed locations from the driver " +
            s"after ${runningFailureCount} fetch failures.")
          runningFailureCount = 0
        }

        // This location failed, so we retry fetch from a different one by returning null here
        null
    }

    if (data != null) {
      return Some(new ChunkedByteBuffer(data))
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}
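The maxRemoteBlockToMem threshold above is configuration-driven; if I recall correctly it maps to spark.maxRemoteBlockSizeFetchToMem in Spark 2.3+ (treat the property name as an assumption and verify against your version). Setting it caps how large a remote block may be before it is streamed to a temp file instead of being buffered in memory:

import org.apache.spark.SparkConf

// Assumed property name (Spark 2.3+): blocks larger than this are fetched through
// remoteBlockTempFileManager (to disk) rather than into an in-memory buffer.
val conf = new SparkConf()
  .set("spark.maxRemoteBlockSizeFetchToMem", (200L * 1024 * 1024).toString) // 200 MB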
4. The entry point for writing block data is the putBytes() function; the code is as follows:
def putBytes[T: ClassTag](
    blockId: BlockId,
    bytes: ChunkedByteBuffer,
    level: StorageLevel,
    tellMaster: Boolean = true): Boolean = {
  require(bytes != null, "Bytes is null")
  doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
}
4.1 Going straight to the doPutBytes() function; the code is as follows:
private def doPutBytes[T](
    blockId: BlockId,
    bytes: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Boolean = {
  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
    val startTimeMs = System.currentTimeMillis
    // Since we're storing bytes, initiate the replication before storing them locally.
    // This is faster as data is already serialized and ready to send.
    // If the replication level is greater than 1, the data also has to be written
    // to remote BlockManagers
    val replicationFuture = if (level.replication > 1) {
      Future {
        // This is a blocking action and should run in futureExecutionContext which is a cached
        // thread pool. The ByteBufferBlockData wrapper is not disposed of to avoid releasing
        // buffers that are owned by the caller.
        replicate(blockId, new ByteBufferBlockData(bytes, false), level, classTag)
      }(futureExecutionContext)
    } else {
      null
    }

    val size = bytes.size

    // If the storage level includes memory, write through memoryStore
    if (level.useMemory) {
      // Put it in memory first, even if it also has useDisk set to true;
      // We will drop it to disk later if the memory store can't hold it.
      // Check whether the data should be stored deserialized
      val putSucceeded = if (level.deserialized) {
        val values =
          serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
        memoryStore.putIteratorAsValues(blockId, values, classTag) match {
          case Right(_) => true
          case Left(iter) =>
            // If putting deserialized values in memory failed, we will put the bytes directly to
            // disk, so we don't need this iterator and can close it to free resources earlier.
            iter.close()
            false
        }
      } else {
        val memoryMode = level.memoryMode
        memoryStore.putBytes(blockId, size, memoryMode, () => {
          if (memoryMode == MemoryMode.OFF_HEAP &&
              bytes.chunks.exists(buffer => !buffer.isDirect)) {
            bytes.copy(Platform.allocateDirectBuffer)
          } else {
            bytes
          }
        })
      }
      if (!putSucceeded && level.useDisk) {
        logWarning(s"Persisting block $blockId to disk instead.")
        diskStore.putBytes(blockId, bytes)
      }
    // If the storage level is disk, write through diskStore
    } else if (level.useDisk) {
      diskStore.putBytes(blockId, bytes)
    }

    // Get the latest blockStatus and send a message to BlockManagerMaster to report the update
    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    if (blockWasSuccessfullyStored) {
      // Now that the block is in either the memory or disk store,
      // tell the master about it.
      info.size = size
      if (tellMaster && info.tellMaster) {
        reportBlockStatus(blockId, putBlockStatus)
      }
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
    }
    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    // If the replication level is greater than 1, wait for the remote nodes to finish
    if (level.replication > 1) {
      // Wait for asynchronous replication to finish
      try {
        ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for replication to finish", t)
      }
    }
    if (blockWasSuccessfullyStored) {
      None
    } else {
      Some(bytes)
    }
  }.isEmpty
}
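Which branches of doPutBytes() run is decided entirely by the StorageLevel chosen at the API layer. A quick usage example (assuming an existing SparkContext sc):

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK_SER_2 = serialized + useMemory + useDisk + replication = 2,
// so doPutBytes() tries memoryStore first, falls back to diskStore on failure,
// and also kicks off the replicate() future covered in 4.2 below.
val rdd = sc.parallelize(1 to 1000) // assumes an existing SparkContext `sc`
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
rdd.count() // materializes the RDD, triggering the put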
4.2 In doPutBytes() above (the replicationFuture branch), if a block has more than one replica, the replicate() function is called to copy it to remote BlockManagers; the code is as follows:
private def replicate(
    blockId: BlockId,
    data: BlockData,
    level: StorageLevel,
    classTag: ClassTag[_],
    existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
  // Maximum number of replication failures to tolerate, default 1,
  // controlled by spark.storage.maxReplicationFailures
  val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
  val tLevel = StorageLevel(
    useDisk = level.useDisk,
    useMemory = level.useMemory,
    useOffHeap = level.useOffHeap,
    deserialized = level.deserialized,
    replication = 1)
  // Number of additional replicas to create
  val numPeersToReplicateTo = level.replication - 1
  val startTime = System.nanoTime

  val peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
  val peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
  var numFailures = 0

  // Get all other BlockManager nodes in the cluster (the local one is excluded),
  // then drop any peers that already hold a replica
  val initialPeers = getPeers(false).filterNot(existingReplicas.contains)

  var peersForReplication = blockReplicationPolicy.prioritize(
    blockManagerId,
    initialPeers,
    peersReplicatedTo,
    blockId,
    numPeersToReplicateTo)

  // Loop, pushing the data to remote nodes one peer at a time
  while(numFailures <= maxReplicationFailures &&
    !peersForReplication.isEmpty &&
    peersReplicatedTo.size < numPeersToReplicateTo) {
    val peer = peersForReplication.head
    try {
      val onePeerStartTime = System.nanoTime
      logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
      // Again blockTransferService does the work, this time uploading the data
      // to another BlockManager node
      blockTransferService.uploadBlockSync(
        peer.host,
        peer.port,
        peer.executorId,
        blockId,
        new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false),
        tLevel,
        classTag)
      logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
        s" in ${(System.nanoTime - onePeerStartTime).toDouble / 1e6} ms")
      peersForReplication = peersForReplication.tail
      peersReplicatedTo += peer
    } catch {
      case NonFatal(e) =>
        logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
        peersFailedToReplicateTo += peer
        // we have a failed replication, so we get the list of peers again
        // we don't want peers we have already replicated to and the ones that
        // have failed previously
        val filteredPeers = getPeers(true).filter { p =>
          !peersFailedToReplicateTo.contains(p) && !peersReplicatedTo.contains(p)
        }

        numFailures += 1
        peersForReplication = blockReplicationPolicy.prioritize(
          blockManagerId,
          filteredPeers,
          peersReplicatedTo,
          blockId,
          numPeersToReplicateTo - peersReplicatedTo.size)
    }
  }
  logDebug(s"Replicating $blockId of ${data.size} bytes to " +
    s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
  if (peersReplicatedTo.size < numPeersToReplicateTo) {
    logWarning(s"Block $blockId replicated to only " +
      s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
  }
  logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
}
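Because replicate() delegates peer selection to blockReplicationPolicy.prioritize(), the strategy is pluggable via spark.storage.replication.policy, as we saw in initialize(). Below is a hedged sketch of a custom policy; the prioritize() signature is assumed from the call sites in replicate() above, so verify the trait's exact definition and visibility against your Spark version.

import scala.collection.mutable
import scala.util.Random
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockReplicationPolicy}

// Sketch only: prefer peers on other hosts, then same-host peers (signature assumed
// from the prioritize() call in replicate(); not a drop-in production policy).
class PreferOtherHostsPolicy extends BlockReplicationPolicy {
  override def prioritize(
      blockManagerId: BlockManagerId,
      peers: Seq[BlockManagerId],
      peersReplicatedTo: mutable.HashSet[BlockManagerId],
      blockId: BlockId,
      numReplicas: Int): List[BlockManagerId] = {
    val (otherHosts, sameHost) = peers.partition(_.host != blockManagerId.host)
    (Random.shuffle(otherHosts) ++ Random.shuffle(sameHost)).take(numReplicas).toList
  }
}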
We have only covered the core functions of BlockManager here; at heart it is simply local and remote reads and writes of block data. For the finer details, dig into the source yourself. Thanks for reading!
