spark原理解析
* 解析一:resilient distributed dataset (RDD)
resilient distributed dataset (RDD):弹性分布式数据集,有容错机制可并行执行。
分布式即体现在每个rdd分多个partition,每个partition在执行时为一个task,会被master分配到某一个worker执行器(Executor)的某一个core中。
弹性体现在流水线思想(pipe),即rdd方法分为transformations和actions方法,两者的官方解释为:RDDs support two
types of operations: transformations, which create a new dataset from an
existing one, and actions, which return a value to the driver program after
running a computation on the
dataset。transformations类方法在执行过程中,只会记录每个rdd的依赖,不会立即执行,在这个过程中,可以弹性的处理partition。当action类方法执行时,会按照依赖,每个rdd去父rdd中要数据。
* 解析二:窄依赖(完全依赖)和宽依赖(部分依赖)
transformations类方法的依赖分为窄依赖(完全依赖)和宽依赖(部分依赖),窄依赖可以理解为:每个父rdd的一个分区的数据只会给一个子rdd的一个分区(一个task的数据只会给流水线下游的一个task),子rdd的分区中数据来自一个多个父rdd的分区的数据;宽依赖肯定会有某些或全部父rdd的task数据给多个子rdd的task。
当宽依赖时,需要进行shuffle,此时,会按照shuffle切分成一个个stage。
整个job的过程是一个有向无环图(DAG),如下图,是rdd方法leftOuterJoin执行时的一个DAG,rdd
leftOuterJoin是宽依赖,因此要划分stage,并会发生shuffle;当触发action类方法如collect时会按照依赖往dirver拉数据时,会从rdd
leftOuterJoin的task中拿数据,从而自下而上,触发整个流水线作业。
Dependency NarrowDependency(窄依赖) OneToOneDependency RangeDependency
ShuffleDependency(宽依赖)
* 解析三:shuffleShuffleManager HashShuffleManager SortShuffleManager(默认)
目前默认的shuffle方式为:SortShuffleManager
由于一个worker上运行多个task,每个worker上生成的所有临时文件数是reduce的数量
具体reduceByKey的shuffle过程如下,在map端会进行shuffle写,会先写到缓存,然后写到磁盘;在reduce端会进行shuffle读,读取时会判断取远程读还是在本机读,读取时也会先写到缓存。
shuffle时,等map端的父stage写完后,reduce端才会去进行fetch,fetch的时候是边fetch边处理,不会等全部fetch完再处理。
另外一种方式,hashShuffle,每个worker上会生成map*reduce个磁盘文件,会增大磁盘io以及内存的压力。
shuffle涉及的设置如下:
1、shuffle方式(sort、hash) spark.shuffle.manager 2、spark.shuffle.file.buffer.kb
shuffle写入缓存的大小(默认32kb) 3、spark.reducer.maxMbInFlight
shuffle读(reduce端)缓存大小(默认48m) 4、spark.shuffle.compress shuffle写入磁盘是否压缩,默认true
5、spark.shuffle.io.maxRetries shuffle读通过netty fetches读时,失败的最大尝试次数,默认3
6、spark.shuffle.io.retryWait 5中每次等待几秒(默认5s)
7、spark.shuffle.service.index.cache.size
* 解析四:task数量
当transformation方法时,就确定了map和reduce的task数量。
一般一个worker启动一个Executor,默认每个Executor使用的core数(同一时间一个core只能运行一个task)为机器的所有核心数(即每个CPU的核数相加)
使用rdd方法创建一个rdd时,如果运行在cluster模式下,partition默认的数量为所有Executor的总core数。
reduce的partition的数量。由于reduce可能来自多个rdd,如果没有自己实现分区器(partition)时,使用的是默认的分区器,此时如果配置文件没有配置参数时,使用的是父rdd的最大分区数,源码如下:
*reduce分区的数量 *
先按rdd1、rdd2的分区数进行降序排列,此时按续遍历,如果发现有rdd自己定时了partitioner,就返回自己定义的;如果没有定义,去查询spark.default.parallelism,如果没有该配置,返回父rdd分区数最高的一个分区;上面rdd3的分区为3个,取最高的
* 默认是HashPartitioner 还有一个RangePartitioner def defaultPartitioner(rdd: RDD[_],
others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++
others).sortBy(_.partitions.size).reverse for (r <- bySize if
r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { return
r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism"))
{ new HashPartitioner(rdd.context.defaultParallelism) } else { new
HashPartitioner(bySize.head.partitions.size) } }
解析五:打包发布,使用资源管理yarn
程序如下:
object TestSparkCore { def main(args: Array[String]): Unit = { // 1. 创建spark
context val conf = new SparkConf() conf.setAppName("first") // 设置应用程序的名字 //
conf.setMaster("spark://focuson1:7077") // 等价于 --master 参数
conf.set("spark.shuffle.manager", "hash") // 修改shuffle的实现类 val sc = new
SparkContext(conf) test_reduceByKey(sc) sc.stop() }
打成jar包,传到focuson1上,执行下面语句。
spark-submit --master yarn-cluster --class com.bd.spark.core.TestSparkCore
my_first_app.jar --master yarn-cluster 使用yarn或spark://ip:7077 --class
执行的类的,有包要写上包 --conf 配置 如--conf spark.shuffle.manager=hash --driver-class-path
jar包路径,不会发布到全部worker --jars jar包路径,会发布到全部worker --application-arguments
传递给主方法的参数
*像conf、appname等程序的优先级大于spark-submit
spark core
* core之rdd方法
* rdd action/* * actions方法
//collect,从每个worker中拿数据,在driver中显示。driver可能会oom
//takeordered是升序从每个分区中(一个rdd有多个分区,每个分区是一个task)拿出i个数据,拿到driver进行比较,拿出i个数据
//top是降序,类似takeordered */ def test_reduce(sc: SparkContext) = { val rdd =
sc.makeRDD(List("hello world", "hello count", "world spark")) rdd.reduce((x, y)
=> (x + y)) } def test_countApprox(sc: SparkContext) = {
//在数据量特别大,不需要精确结果时,求一个近似值 val rdd3 = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd3.countApprox(1000l) } def test_saveAsTextFile(sc: SparkContext) = { val rdd
= sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd.saveAsTextFile("hdfs://focuson1:9000/spark") }
* rdd transformationdef test_flatMap(sc: SparkContext) = { val rdd =
sc.makeRDD(List("hello world", "hello count", "world spark"), 2) val rdd2 =
rdd.flatMap { x => x.split(" ") } println(rdd2.collect()) //res1: Array[String]
= Array(hello, world, hello, count, world, spark) } def test_union(sc:
SparkContext) = { //执行时,在一个stage内,分为三个rdd 求并积 分区是rdd1+rdd2 val rdd1 =
sc.parallelize(List(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(List(5, 6, 7, 8,
9, 10)) val rdd3 = rdd1 ++ rdd2 //res54: Array[Int] = Array(1, 2, 3, 4, 5, 5,
6, 7, 8, 9, 10) } def test_cartesian(sc: SparkContext) { //分区数为rdd1*rdd2 笛卡尔积
val rdd1 = sc.parallelize(List("dog", "cat", "tiger")) val rdd2 =
sc.parallelize(List(1, 2)) rdd1.cartesian(rdd2) //res5: Array[(String, Int)] =
Array((dog,1), (dog,2), (cat,1), (cat,2), (tiger,1), (tiger,2)) }
//mapPartitionsWithIndex def test_mapPartitionsWithIndex(sc: SparkContext) = {
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2) var list = List[String]()
// 分区0的所有数据+a , 分区1的数据 +b rdd.mapPartitionsWithIndex((i, iter) => { while
(iter.hasNext) { if (i == 0) list = list :+ (iter.next + "a") else { list =
list :+ (iter.next + "b") } } list.iterator }) } def test_zip(sc: SparkContext)
= { //两个rdd元素必须相等 val list = sc.makeRDD(List(43, 5, 2, 5, 6, 33)) val list2 =
sc.makeRDD(List("a", "b", "c", "d", "e", "f")) list.zip(list2).collect //res29:
Array[(Int, String)] = Array((43,a), (5,b), (2,c), (5,d), (6,e), (33,f)) } def
test_reparition(sc: SparkContext) = { val rdd3 = sc.parallelize(List(1, 2, 3,
4, 5, 6), 2) rdd3.coalesce(4) //默认是false,即分区由多变少,此时由2变为4不能成功,还是两个分区
rdd3.coalesce(4, true) //此时会成功 /** * def coalesce(numPartitions: Int, shuffle:
Boolean = false) * 默认是false,即分区由多变少,有多变少不会进行shuffle;true时会进行分区,此时会进行shuffle */
/* def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null):
RDD[T] = withScope { coalesce(numPartitions, shuffle = true) } */
rdd3.repartition(4) //相当于 rdd3.coalesce(4, true) } def test_reduceByKey(sc:
SparkContext) = { //reduceByKey就是把key值进行分组,然后每组内进行reduce val rdd =
sc.makeRDD(List(("hello", 1), ("hello", 1), ("hello", 1), ("world", 1),
("world", 1))) val rdd2 = rdd.reduceByKey { (x, y) => x + y } //res2:
Array[(String, Int)] = Array((hello,3), (world,2)) } def test_intersection(sc:
SparkContext) = { //取两个rdd的交集 val rdd1 = sc.parallelize(List("dog", "cat",
"tiger"), 2) val rdd2 = sc.parallelize(List("dog", "wolf", "pig"), 3) val rdd3
= rdd1.intersection(rdd2) //res23: Array[String] = Array(dog) } def
test_sortBy(sc: SparkContext) = { val list = sc.makeRDD(List(43, 5, 2, 5, 6,
33)) list.sortBy(x => x) //升序 list.sortBy(x => x, false) //降序 } def
test_aggregateByKey(sc: SparkContext) = { import scala.math._ val rdd =
sc.parallelize(List(("pig", 3), ("cat", 2), ("dog", 5), ("cat", 4), ("dog", 3),
("cat", 3), ("cat", 7), ("cat", 4)), 2) rdd.aggregateByKey(0)((x, y) => x + y,
(x, y) => x * y) /* partition:[0] (pig,3) (cat,2) (dog,5) (cat,4) partition:[1]
(dog,3) (cat,3) (cat,7) (cat,4)*/
//同一个分区内根据key进行分组,然后每组的value值进行第一个表达式的reduce操作 /* partition:[0] (pig,3) (cat,6)
(dog,5) partition:[1] (dog,3) (cat,14) (cat,7) */
//然后对各个分区的所有数据按key进行分区,然后按对value值进行reduce //res38: Array[(String, Int)] =
Array((dog,15), (pig,3), (cat,84))
//参数0(zeroValue)是指参与第一个表达式的运算,即每个分区内按分区之后每个组都有一个zeroValue值。如果rdd.aggregateByKey(100)((x,y)=>x+y,
(x,y)=>x*y) rdd.aggregateByKey(100)((x, y) => x + y, (x, y) => x * y) /*
partition:[0] (pig,3) (cat,2) (dog,5) (cat,4) partition:[1] (dog,3) (cat,3)
(cat,7) (cat,4)*/ //同一个分区内根据key进行分组,然后每组的value值进行第一个表达式的reduce操作 /*
partition:[0] (pig,103) (cat,106) (dog,105) partition:[1] (dog,103) (cat,114)
(cat,107) */ //然后对各个分区的所有数据按key进行分区,然后按对value值进行reduce //res40: Array[(String,
Int)] = Array((dog,10815), (pig,103), (cat,12084)) } def test_cogroup(sc:
SparkContext) = { val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1),
("cat", 3))) val rdd2 = sc.parallelize(List(("cat", 2), ("dog", 2)))
rdd1.cogroup(rdd2) //res49: Array[(String, (Iterable[Int], Iterable[Int]))] =
Array((dog,(CompactBuffer(1),CompactBuffer(2))), (cat,(CompactBuffer(1,
3),CompactBuffer(2)))) } def test_combineByKey(sc: SparkContext) = { } def
test_groupBykey(sc: SparkContext) = { val rdd1 = sc.parallelize(List(("cat",
1), ("dog", 1), ("cat", 3), ("dog", 2))) rdd1.groupByKey() //res46:
Array[(String, Iterable[Int])] = Array((dog,CompactBuffer(1, 2)),
(cat,CompactBuffer(1, 3)))
//Groupbykey会将所有的数据发给reducer,reducer压力会比较大,另外会比较占用网络带宽, 相比之下,reduceByKey,
会在mapper端首先进行运算,reducer的压力小,另外也可以节省网络带宽 } def test_join(sc: SparkContext) = {
val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3))) val rdd2 =
sc.parallelize(List(("cat", 2), ("dog", 2), ("tiger", 2))) rdd1.join(rdd2)
//Array((dog,(1,2)), (cat,(1,2)), (cat,(3,2))) 将两个rdd集合中key相同的元素连接在一起 没有tiger }
def test_leftOuterJoin(sc: SparkContext) = { val rdd1 =
sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3), ("wolf", 1))) val rdd2
= sc.parallelize(List(("cat", 2), ("dog", 2), ("tiger", 2))) val array =
rdd1.leftOuterJoin(rdd2) // 左外连接 //Array((wolf,(1,None)), (dog,(1,Some(2))),
(cat,(1,Some(2))), (cat,(3,Some(2)))) for ((k, v) <- array) { println("key:" +
k + " value:" + v._2.getOrElse(0)) } }
* cache
def test_cache(sc: SparkContext) = { val rdd = sc.parallelize(List()).cache()
//缓存 (不会压缩) /* /** Persist this RDD with the default storage level
(`MEMORY_ONLY`). */ def persist(): this.type =
persist(StorageLevel.MEMORY_ONLY) /** Persist this RDD with the default storage
level (`MEMORY_ONLY`). */ def cache(): this.type = persist() */ val rdd2 =
sc.parallelize(List()).persist() //该方法等于cache,默认是MEMORY_ONLY /*
spark.storage.memoryFraction = 0.6//这个意思表示0.6的内存作为缓存,其余的作为计算内存
StorageLevel.DISK_ONLY 只存到磁盘 StorageLevel.DISK_ONLY_2 在其他worker也缓存一份
StorageLevel.MEMORY_AND_DISK StorageLevel.MEMORY_AND_DISK2
StorageLevel.MEMORY_AND_DISK_SER//SER表是序列化压缩 StorageLevel.MEMORY_AND_DISK_SER2
StorageLevel.MEMORY_ONLY_SER StorageLevel.MEMORY_ONLY_SER2 StorageLevel.NONE
StorageLevel.OFF_HEAP//Similar to MEMORY_ONLY_SER, but store the data in
off-heap memory. This requires off-heap memory to be enabled. */ val rdd3 =
sc.parallelize(List()).persist(StorageLevel.OFF_HEAP) }
热门工具 换一换