/**  注意1: Spark saveAsTextFile 保存出来的是一个文件夹,所以才有以下获取文件夹里面的文件并整合成一个文件的操作; 注意2:
在没有main函数的情况下 实现类一定要 implements Serializable,不然会报错不能序列化 */ import
org.apache.spark.api.java.JavaRDD; import pass.common.response.Response; import
pass.computation.action.response.ActionResponse; import java.io.*; import
java.util.ArrayList; import java.util.List; public class SaveFileAction { /** *
@paramfile * @return 成功返回:以part开头的文件列表 */ public static List<String>
getFileList(File file) { List<String> result = new ArrayList<String>(); if
(!file.isDirectory()) { System.out.println(file.getAbsolutePath());
result.add(file.getAbsolutePath()); } else { File[] directoryList =
file.listFiles(new FileFilter() { public boolean accept(File file) { if
(file.isFile() && file.getName().indexOf("part") == 0) { return true; } else {
return false; } } }); for (int i = 0; i < directoryList.length; i++) {
result.add(directoryList[i].getPath()); } } return result; } /** * 删除单个文件 * *
@paramsPath 被删除文件path * @return 删除成功返回true,否则返回false */ public boolean
deleteFile(String sPath) { boolean flag = false; File file = new File(sPath);
// 路径为文件且不为空则进行删除 if (file.isFile() && file.exists()) { file.delete(); flag =
true; } return flag; } public boolean deleteDirectory(String sPath) {
//如果sPath不以文件分隔符结尾,自动添加文件分隔符 if (!sPath.endsWith(File.separator)) { sPath =
sPath + File.separator; } File dirFile = new File(sPath);
//如果dir对应的文件不存在,或者不是一个目录,则退出 if (!dirFile.exists() || !dirFile.isDirectory()) {
return false; } boolean flag = true; //删除文件夹下的所有文件(包括子目录) File[] files =
dirFile.listFiles(); for (int i = 0; i < files.length; i++) { //删除子文件 if
(files[i].isFile()) { flag = deleteFile(files[i].getAbsolutePath()); if (!flag)
break; } //删除子目录 else { flag = deleteDirectory(files[i].getAbsolutePath()); if
(!flag)break; } } if (!flag) return false; //删除当前目录 if (dirFile.delete()) {
return true; } else { return false; } } /** * @param saveRDD * @param fileType
"TXT"、"CSV" * @param filePath * @return * @throws IOException */ public
ActionResponsesaveFile(JavaRDD saveRDD, String fileType, String filePath) {
ActionResponse response =new ActionResponse(); response.setTaskStatus(Response.
FAILURE); if (null == saveRDD) { response.setErrorMsg("分布式内存数据集不能为空!"); return
response; } if (null == fileType || fileType.length() == 0) {
response.setErrorMsg("文件格式不能为空!"); return response; } if (null == filePath ||
filePath.length() ==0) { response.setErrorMsg("文件不能为空!"); return response; }
//写入hdfs if (filePath.contains("hdfs://")) { saveRDD.saveAsTextFile(filePath);
response.setTaskStatus(Response.SUCCESS); response.setErrorMsg(null); return
response; } else { if (new File(filePath).exists()) { deleteDirectory(filePath);
if(deleteDirectory(filePath)) { System.out.println("删除已经存在的文件夹!" + filePath); }
else{ System.out.println("删除不了!!!" + filePath); } }
saveRDD.saveAsTextFile(filePath); // 获取以part开头的文件内容 File f = new File(filePath);
List<String> list2 =new ArrayList<String>(); list2 = getFileList(f); String str
=""; try { for (String l : list2) { BufferedReader br = new BufferedReader(new
FileReader(new File(l))); String s = ""; while ((s = br.readLine()) != null) {
str += s +"\r\n"; //加"\r\n" 写入下面的文件的时候才能换行 } br.close(); } String outPath =
filePath +"." + fileType.toLowerCase(); if (new File(outPath).exists()) {
deleteFile(outPath); if (deleteFile(outPath)) { System.out.println("删除已经存在的文件!"
); } else { System.out.println("删除不了!!!" + outPath); } } //写入 outPath File
fileText =new File(outPath); FileWriter fileWriter = new FileWriter(fileText);
fileWriter.write(str); fileWriter.close(); } catch (IOException e) {
response.setTaskStatus(Response.FAILURE); response.setErrorMsg("写入文件失败");
returnresponse; } response.setTaskStatus(Response.SUCCESS);
response.setErrorMsg(null); return response; } } }  public class SaveAsFileTest
implementsSerializable{ @Test public void saveTXTFile() throws IOException {
SparkConf conf =new SparkConf() .setAppName("map") .setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf); List list1 = Arrays.asList("1",
"2","3","a","4","c"); String fileType1 = "TXT"; String filePath1 = "C:\\Users\\
Administrator\\Desktop\\t2"; String filePath2 = "hdfs://master:9000/hbase/test";
SaveFileAction saveFileAction =new SaveFileAction(); JavaRDD saveRDD1 =
sc.parallelize(list1); Response response = saveFileAction.saveFile(saveRDD1,
fileType1,filePath1); System.out.println("message: "+response.getErrorMsg());
sc.close(); } @Test public void saveCSVFile() throws IOException { SparkConf
conf =new SparkConf() .setAppName("map") .setMaster("local"); JavaSparkContext
sc =new JavaSparkContext(conf); String fileType2 = "CSV"; String filePath1 = "C:
\\Users\\Administrator\\Desktop\\t2"; String filePath2 =
"hdfs://master:9000/hbase/test"; SaveFileAction saveFileAction = new
SaveFileAction(); JavaRDD saveRDD1 = sc.parallelize(list1); JavaRDD saveRDD2 =
sc.textFile("C:\\Users\\Administrator\\Desktop\\im.csv"); Response response =
saveFileAction.saveFile(saveRDD2,fileType2,filePath1); System.out.println(
"message: "+response.getErrorMsg()); sc.close(); } }

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信