DAG原理(源码级)
推荐
在线提问>>
1. sparkContext创建DAGScheduler->创建EventProcessLoop->调用eventLoop.start()方法开启事件监听
2. action调用sparkContext.runJob->eventLoop监听到事件,调用handleJobSubmitted开始划分stage
3. 首先对触发job的finalRDD调用createResultStage方法,通过getOrCreateParentStages获取所有父stage列表,然后创建自己。 如:父(stage1,stage2),再创建自己stage3
4. getOrCreateParentStages内部会调用getShuffleDependencies获取所有直接宽依赖(从后往前推,窄依赖直接跳过) 在这个图中G的直接宽依赖是A和F,B因为是窄依赖所以跳过,所以最后B和G属于同一个stage
5. 接下来会循环宽依赖列表,分别调用getOrCreateShuffleMapStage: -- 如果某个RDD已经被划分过会直接返回stageID;
否则就执行getMissingAncestorShuffleDependencies方法,继续寻找该RDD的父宽依赖,窄依赖老规矩直接加入: -- 如果返回的宽依赖列表不为空,则继续执行4,5的流程直到为空为止;
-- 如果返回的宽依赖列表为空,则说明它没有父RDD或者没有宽依赖,此时可以直接调用createShuffleMapStage将该stage创建出来
6. 因此最终的划分结果是stage3(B,G)、stage2(C,D,E,F)、stage1(A)
7. 创建ResultStage,调用submitStage提交这个stage
8. submitStage会首先检查这个stage的父stage是否已经提交,如果没提交就开始递归调用submitStage提交父stage,最后再提交自己。
9. 每一个stage都是一个taskSet,每次提交都会提交一个taskSet给TaskScheduler