spark架构原理-星辰平台

发表于 2022/04/19 20:56:31 2022/04/19
【摘要】 相比mapreduce僵化的map与reduce分阶段计算相比,spark的计算框架更加富有弹性和灵活性,运行性能更佳。 1 spark的计算阶段mapreduce一个应用一次只运行一个map和一个reducespark可根据应用的复杂度,分割成更多的计算阶段(stage),组成一个有向无环图dag,spark任务调度器可根据dag的依赖关系执行计算阶段逻辑回归机器学习性能spark比map...

相比mapreduce僵化的map与reduce分阶段计算相比,spark的计算框架更加富有弹性和灵活性,运行性能更佳。

1 spark的计算阶段

  • mapreduce一个应用一次只运行一个map和一个reduce
  • spark可根据应用的复杂度,分割成更多的计算阶段(stage),组成一个有向无环图dag,spark任务调度器可根据dag的依赖关系执行计算阶段

逻辑回归机器学习性能spark比mapreduce快100多倍。因为某些机器学习算法可能需要进行大量迭代计算,产生数万个计算阶段,这些计算阶段在一个应用中处理完成,而不是像mapreduce那样需要启动数万个应用,因此运行效率极高。

dag,有向无环图,不同阶段的依赖关系是有向的,计算过程只能沿依赖关系方向执行,被依赖的阶段执行完成前,依赖的阶段不能开始执行。该依赖关系不能有环形依赖,否则就死循环。

典型的spark运行dag的不同阶段:

整个应用被切分成3个阶段,阶段3需要依赖阶段1、2,阶段1、2互不依赖。spark执行调度时,先执行阶段1、2,完成后,再执行阶段3。对应spark伪代码:

rddb = rdda.groupby(key)
rddd = rddc.map(func)
rddf = rddd.union(rdde)
rddg = rddb.join(rddf)

所以spark作业调度执行的核心是dag,整个应用被切分成数个阶段,每个阶段的依赖关系也很清楚。根据每个阶段要处理的数据量生成任务集合(taskset),每个任务都分配一个任务进程去处理,spark就实现了大数据的分布式计算。

负责spark应用dag生成和管理的组件是dagscheduler,dagscheduler根据程序代码生成dag,然后将程序分发到分布式计算集群,按计算阶段的先后关系调度执行。

那么spark划分计算阶段的依据是什么呢?显然并不是rdd上的每个转换函数都会生成一个计算阶段,比如上面的例子有4个转换函数,但是只有3个阶段。

你可以再观察一下上面的dag图,关于计算阶段的划分从图上就能看出规律,当rdd之间的转换连接线呈现多对多交叉连接的时候,就会产生新的阶段。一个rdd代表一个数据集,图中每个rdd里面都包含多个小块,每个小块代表rdd的一个分片。

一个数据集中的多个数据分片需要进行分区传输,写入到另一个数据集的不同分片中,这种数据分区交叉传输的操作,我们在mapreduce的运行过程中也看到过。

是的,这就是shuffle过程,spark也需要通过shuffle将数据进行重新组合,相同key的数据放在一起,进行聚合、关联等操作,因而每次shuffle都产生新的计算阶段。这也是为什么计算阶段会有依赖关系,它需要的数据来源于前面一个或多个计算阶段产生的数据,必须等待前面的阶段执行完毕才能进行shuffle,并得到数据。

计算阶段划分的依据是shuffle,不是转换函数的类型,有的函数有时有shuffle,有时没有。如上图例子中rdd b和rdd f进行join,得到rdd g,这里的rdd f需要进行shuffle,rdd b不需要。

因为rdd b在前面一个阶段,阶段1的shuffle过程中,已进行了数据分区。分区数目和分区k不变,无需再shuffle:

  • 这种无需进行shuffle的依赖,在spark里称作窄依赖
  • 需要进行shuffle的依赖,被称作宽依赖

类似mapreduce,shuffle对spark也很重要,只有通过shuffle,相关数据才能互相计算。

既然都要shuffle,为何spark就更高效?

本质上看,spark算是一种mapreduce计算模型的不同实现。hadoop mapreduce简单粗暴根据shuffle将大数据计算分成map、reduce两阶段就完事。但spark更细,将前一个的reduce和后一个的map连接,当作一个阶段持续计算,形成一个更优雅、高效地计算模型,其本质依然是map、reduce。但这种多个计算阶段依赖执行的方案可有效减少对hdfs的访问,减少作业的调度执行次数,因此执行速度更快。

不同于hadoop mapreduce主要使用磁盘存储shuffle过程中的数据,spark优先使用内存进行数据存储,包括rdd数据。除非内存不够用,否则尽可能使用内存, 这也是spark性能比hadoop高的原因。

spark作业管理

spark里面的rdd函数有两种:

  • 转换函数,调用以后得到的还是一个rdd,rdd的计算逻辑主要通过转换函数完成
  • action函数,调用以后不再返回rdd。比如count()函数,返回rdd中数据的元素个数;saveastextfile(path),将rdd数据存储到path路径下。spark的dagscheduler在遇到shuffle的时候,会生成一个计算阶段,在遇到action函数的时候,会生成一个作业(job)

rdd里面的每个数据分片,spark都会创建一个计算任务去处理,所以一个计算阶段含多个计算任务(task)。

作业、计算阶段、任务的依赖和时间先后关系:

横轴时间,纵轴任务。两条粗黑线之间是一个作业,两条细线之间是一个计算阶段。一个作业至少包含一个计算阶段。水平方向红色的线是任务,每个阶段由很多个任务组成,这些任务组成一个任务集合。

dagscheduler根据代码生成dag图后,spark任务调度就以任务为单位进行分配,将任务分配到分布式集群的不同机器上执行。

spark执行流程

spark支持standalone、yarn、mesos、k8s等多种部署方案,原理类似,仅是不同组件的角色命名不同。

spark cluster components:

首先,spark应用程序启动在自己的jvm进程里(driver进程),启动后调用sparkcontext初始化执行配置和输入数据。sparkcontext启动dagscheduler构造执行的dag图,切分成最小的执行单位-计算任务。

然后,driver向cluster manager请求计算资源,用于dag的分布式计算。cluster manager收到请求后,将driver的主机地址等信息通知给集群的所有计算节点worker。

worker收到信息后,根据driver的主机地址,跟driver通信并注册,然后根据自己的空闲资源向driver通报自己可以领用的任务数。driver根据dag图开始向注册的worker分配任务。

worker收到任务后,启动executor进程执行任务。executor先检查自己是否有driver的执行代码,若无,从driver下载执行代码,通过java反射加载后开始执行。

总结

相比 mapreduce,spark的主要特性:

  • rdd编程模型更简单
  • dag切分的多阶段计算过程更快
  • 使用内存存储中间计算结果更高效

spark在2012年开始流行,那时内存容量提升和成本降低已经比mapreduce出现的十年前强了一个数量级,spark优先使用内存的条件已经成熟。

参考

【星辰平台的版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。