重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
今天就跟大家聊聊有关Spark2.x中如何进行BlockManagerMaster源码剖析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
成都网站制作、网站建设的开发,更需要了解用户,从用户角度来建设网站,获得较好的用户体验。创新互联建站多年互联网经验,见的多,沟通容易、能帮助客户提出的运营建议。作为成都一家网络公司,打造的就是网站建设产品直销的概念。选择创新互联建站,不只是建站,我们把建站作为产品,不断的更新、完善,让每位来访用户感受到浩方产品的价值服务。
1.BlockManagerMaster创建
BlockManagerMaster要负责整个应用程序在运行期间block元数据的管理和维护,以及向从节点发送指令执行命令,它是在构造SparkEnv的时候创建的,Driver端是创建SparkContext的时候创建SparkEnv,SparkEnv中对应的初始化代码如下:
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver)
这里可以看到在构造blockManagerMaster时,会创建一个BlockManagerMasterEndpoint实例并注册到了rpcEnv中,Executor中的blockManager通过Driver端BlockManagerMasterEndpoint的引用BlockManagerMasterRef与blockManagerMaster进行通信。
2.BlockManagerMaster成员函数:
1).removeExecutor()函数,代码如下:
//向BlockManagerMasterEndpoint发送RemoveExecutor消息,移除挂掉的Exeutor //这个函数只会在driver端调用 def removeExecutor(execId: String) { tell(RemoveExecutor(execId)) logInfo("Removed " + execId + " successfully in removeExecutor") }
2).removeExecutorAsync()函数,代码如下:
// 跟1)作用差不多,移除挂掉的Executor,这里是非阻塞的异步方法 def removeExecutorAsync(execId: String) { driverEndpoint.ask[Boolean](RemoveExecutor(execId)) logInfo("Removal of executor " + execId + " requested") }
3).registerBlockManager()函数,代码如下:
//Executor端的BlockManager启动会,会向BlockManagerMaster进行注册// BlockManagerMaster会保存在master的blockManagerInfo中 def registerBlockManager( blockManagerId: BlockManagerId, maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { logInfo(s"Registering BlockManager $blockManagerId") val updatedId = driverEndpoint.askSync[BlockManagerId]( RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) logInfo(s"Registered BlockManager $updatedId") updatedId }
3).updateBlockInfo()函数,代码如下:
//更新block数据块信息 def updateBlockInfo( blockManagerId: BlockManagerId, blockId: BlockId, storageLevel: StorageLevel, memSize: Long, diskSize: Long): Boolean = { //向BlockManagerMasterEndpoint发送UpdateBlockInfo消息,并且返回结果 val res = driverEndpoint.askSync[Boolean]( UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)) logDebug(s"Updated info of block $blockId") res }
4).getLocations()函数,代码如下:
//获取block所在的BockManager节点信息,这里返回的是Seq集合,
//如果block的Replication>1 一个block块,可能会在多个blockmanager
//节点上存在
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
//向BlockManagerMasterEndpoint发送GetLocations消息
driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
}
5).getPeers()函数,代码如下:
//获取参数blockManagerId之外的其他BlockManagerId, //上面说了一个block,可能会在多个blockmanager节点上存在 def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { //向BlockManagerMasterEndpoint发送GetPeers消息 driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId)) }
6).getExecutorEndpointRef()函数,代码如下:
//这里就是获取BlockManagerMasterEndpoint的引用,与其进行通信 private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = { for ( blockManagerId <- blockManagerIdByExecutor.get(executorId); info <- blockManagerInfo.get(blockManagerId) ) yield { info.slaveEndpoint } }
7).getBlockStatus()函数,代码如下:
//获取一个Block的状态信息,位置,占用内存和磁盘大小def getBlockStatus( blockId: BlockId, askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = { val msg = GetBlockStatus(blockId, askSlaves) val response = driverEndpoint. askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg) val (blockManagerIds, futures) = response.unzip implicit val sameThread = ThreadUtils.sameThread val cbf = implicitly[ CanBuildFrom[Iterable[Future[Option[BlockStatus]]], Option[BlockStatus], Iterable[Option[BlockStatus]]]] val blockStatus = timeout.awaitResult( Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread)) if (blockStatus == null) { throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId) } blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) => status.map { s => (blockManagerId, s) } }.toMap }
BlockManagerMaster里面的各种函数处理其实都在 BlockManagerMasterEndpoint实例中,后面我们会详细剖析BlockManagerMasterEndpoint类的各个消息的具体处理流程。
看完上述内容,你们对Spark2.x中如何进行BlockManagerMaster源码剖析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。