coalesce与repartition 解决小文件问题
repartition(numPartitions: Int)
返回numPartitions分区个数的新RDD(或DataFrame)。
可以增加或减少此RDD中的并行性级别,内部使用shuffle来重新分配数据。
如果要减少partition数量,可考虑使用`coalesce`,这可以避免执行shuffle。
coalesce(numPartitions: Int, shuffle: Boolean = false)
1)返回一个新的RDD,该RDD被缩减为`numPartitions`分区。
2)这导致窄依赖,例如, 如果从1000个分区转到100个分区,则不会有shuffle,而是100个新分区中的每一个都将声明当前分区的10个分区。
3)如果您正在进行剧烈的合并,例如将numPartitions从1000减少为1,这将会导致计算发生在非
常少的节点上(例如numPartitions = 1的情况下为一个节点)。
4)为了避免这种情况,可以传递shuffle = true,或者直接使用repartition。
这将添加一个shuffle步骤,意味着当前的上游分区将并行执行(无论当前的分区是什么)。
5)注意:随着shuffle = true,实际上可以合并到更大数量的分区。 如果你有少量的分区,比如
100,那么这很有用,可能有几个分区异常大。 调用coalesce(1000,shuffle = true)将导致
使用hash partitioner分发数据到1000个分区。
解决小文件问题
数据采集阶段: 配置合理的flume参数等
数据清洗: 使用coalesce或repatition设置合理的分区数
使用hbase保存数据
合并小文件程序
2 解决小文件问题-使用coalesce或repatition
解决小文件问题
val df = sqlContext.createDataFrame(rowRdd, struct)
val newDF = df.coalesce(1)
将newDF导入到hive表或者使用DataFrame的数据源方式写数据
2 解决小文件问题-合并小文件
1. 将小文件目录(srcDataPath)下的文件移动到临时目录/mergePath/${mergeTime}/src
2. 计算临时目录(/mergePath/${mergeTime}/src)的大小。 根据大小确定分区的数。1024M /128M = 8
3. 使用coalesce或者repartition, 传入分区数。
将临时目录数据写入临时的数据目录(/mergePath/${mergeTime}/data)
4. 将临时数据目录文件move到文件目录(srcDataPath)
5. 删除临时目录(/mergePath/${mergeTime}/src)
${mergeTime} 是变量, 用于标识每次合并的唯一标识
见代码案例实现
2 shuffle阶段产生小文件的原因
spark.sql.shuffle.partitions
调整Shuffle的并行度, 即task的数量。
shuffle的每个partition对应一个task, task数越多, 效率越高。
Spark默认在执行聚合(即shuffle)时,默认有200个分区。 这是由conf变
量“spark.sql.shuffle.partitions”定义的。 这就是使用DataFrame或者
spark整合Hive,产生shuffle后会产生大量小文件的原因。
热门工具 换一换