spark在处理的数据在内部是分partition的。

除非是在本地新建的list数组才需要使用parallelize。保存在hdfs中的文件,在使用spark处理的时候是默认分partition的。

我们可以使用getNumPartitions()获取当前rdd的partition的信息。

通过glom()函数能够获取到分partition的rdd信息


我们在处理数据的一般使用的map函数,同样也可以根据partition进行mapPartition处理,但是需要注意的是map处理的是每一行的数据,是item。而mapPartition是处理的是一个partition上的数据,所以它处理的是iterator。

coalesce我们可以用这个函数进行reduce操作,缩减分区数,注意是缩减分区数,不能增加分区数。

repartition 我们可以使用这个函数进行重新分区,指定我们想要的分区数,设置的分区数可以大于当前rdd的分区数,也可以小于当前rdd的分区数。

在coalesce和repartition的选择上遵循这样的原则,如果是减少分区数,优先使用coalesce,如果是增加分区数,使用repartition。

下面第一个例子:


#!/usr/bin/python # -*- coding: UTF-8 -*- from pyspark import SparkContext,
SparkConf def main(): logFile = "/user/root/spark/sparkstudy03.txt" master =
'yarn-client' appName = 'Simple App spark study03' conf =
SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
logData = sc.textFile(logFile) logData.cache() print("partitions is :%s" %
logData.getNumPartitions()) logList = logData.glom().collect() print("glom
result is :%s" % logList) logDataPartition4 = logData.repartition(4)
print("repartition num :%s", logDataPartition4.getNumPartitions())
print("repartition result :%s", logDataPartition4.glom().collect())
logListCoalesce = logDataPartition4.coalesce(3, True).glom().collect()
print("coalesce shuffle true result is :%s" % logListCoalesce) logListCoalesce2
= logDataPartition4.coalesce(3, False).glom().collect() print("coalesce shuffle
false result is :%s" % logListCoalesce2) logData.unpersist() if __name__ ==
'__main__': main()
 

原始数据
1 2 3 4 5 6 7
运行结果:
partitions is :2 glom result is :[[u'1', u'2', u'3', u'4'], [u'5', u'6',
u'7']] ('repartition num :%s', 4) ('repartition result :%s', [[u'2', u'6'],
[u'3', u'7'], [u'4'], [u'1', u'5']]) coalesce shuffle true result is :[[u'1'],
[u'2', u'3', u'5'], [u'6', u'7', u'4']] coalesce shuffle false result is
:[[u'2', u'6'], [u'3', u'7'], [u'4', u'1', u'5']]
 

 

 

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信