本篇文章为大家展示了如何进行JobScheduler内幕实现和深度思考,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
创新互联专业为企业提供五华网站建设、五华做网站、五华网站设计、五华网站制作等企业网站建设、网页设计与制作、五华企业网站模板建站服务,十多年五华做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
DStream的foreachRDD方法,实例化ForEachDStream对象,并将用户定义的函数foreachFunc传入到该对象中。foreachRDD方法是输出操作,foreachFunc方法会作用到这个DStream中的每个RDD。
/** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. * @param foreachFuncforeachRDD function * @param displayInnerRDDOpsWhether the detailed callsites and scopes of the RDDs generated * in the `foreachFunc` to be displayed in the UI. If `false`, then * only the scopes and callsites of `foreachRDD` will override those * of the RDDs on the display. */ private defforeachRDD( foreachFunc: (RDD[T], Time) => Unit, displayInnerRDDOps: Boolean): Unit = { newForEachDStream(this, context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register() } |
ForEachDStream对象中重写了generateJob方法,调用父DStream的getOrCompute方法来生成RDD并封装Job,传入对该RDD的操作函数foreachFunc和time。dependencies方法定义为父DStream的集合。
/** * An internal DStream used to represent output operations like DStream.foreachRDD. * @param parent Parent DStream * @param foreachFunc Function to apply on each RDD generated by the parent DStream * @param displayInnerRDDOpsWhether the detailed callsites and scopes of the RDDs generated * by `foreachFunc` will be displayed in the UI; only the scope and * callsite of `DStream.foreachRDD` will be displayed. */ private[streaming] classForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit, displayInnerRDDOps: Boolean ) extendsDStream[Unit](parent.ssc) {
override defdependencies: List[DStream[_]] = List(parent)
override defslideDuration: Duration = parent.slideDuration
override defcompute(validTime: Time): Option[RDD[Unit]] = None
override defgenerateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match{ caseSome(rdd) => valjobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { foreachFunc(rdd, time) } Some(newJob(time, jobFunc)) caseNone => None } } } |
DStreamGraph的generateJobs方法中会调用outputStream的generateJob方法,就是调用ForEachDStream的generateJob方法。
defgenerateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time "+ time) valjobs = this.synchronized { outputStreams.flatMap { outputStream => valjobOption = outputStream.generateJob(time) jobOption.foreach(_.setCallSite(outputStream.creationSite)) jobOption } } logDebug("Generated "+ jobs.length + " jobs for time "+ time) jobs } |
DStream的generateJob定义如下,其子类中只有ForEachDStream重写了generateJob方法。
/** * Generate a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this * to generate their own jobs. */ private[streaming] defgenerateJob(time: Time): Option[Job] = { getOrCompute(time) match{ caseSome(rdd) => { valjobFunc = () => { valemptyFunc = { (iterator: Iterator[T]) => {} } context.sparkContext.runJob(rdd, emptyFunc) } Some(newJob(time, jobFunc)) } caseNone => None } } |
DStream的print方法内部还是调用foreachRDD来实现,传入了内部方法foreachFunc,来取出num+1个数后打印输出。
/** * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ defprint(num: Int): Unit = ssc.withScope { defforeachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) // scalastyle:off println println("-------------------------------------------") println("Time: "+ time) println("-------------------------------------------") firstNum.take(num).foreach(println) if(firstNum.length > num) println("...") println() // scalastyle:on println } } foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false) } |
总结:JobScheduler是SparkStreaming 所有Job调度的中心,内部有两个重要的成员:
JobGenerator负责Job的生成,ReceiverTracker负责记录输入的数据源信息。
JobScheduler的启动会导致ReceiverTracker和JobGenerator的启动。ReceiverTracker的启动导致运行在Executor端的Receiver启动并且接收数据,ReceiverTracker会记录Receiver接收到的数据meta信息。JobGenerator的启动导致每隔BatchDuration,就调用DStreamGraph生成RDD Graph,并生成Job。JobScheduler中的线程池来提交封装的JobSet对象(时间值,Job,数据源的meta)。Job中封装了业务逻辑,导致最后一个RDD的action被触发,被DAGScheduler真正调度在Spark集群上执行该Job。
上述内容就是如何进行JobScheduler内幕实现和深度思考,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。
分享名称:如何进行JobScheduler内幕实现和深度思考
新闻来源:
http://cqcxhl.cn/article/ispgjg.html