/**   Be careful1: Spark saveAsTextFile It's a folder, So here's how to get the files in the folder and integrate them into one file; Be careful2:
In nomain In the case of function The implementation class must implements Serializable, Otherwise, an error will be reported and cannot be serialized */ import
org.apache.spark.api.java.JavaRDD; import pass.common.response.Response; import
pass.computation.action.response.ActionResponse; import java.io.*; import
java.util.ArrayList; import java.util.List; public class SaveFileAction { /** *
@paramfile * @return Successful return: withpart List of files at the beginning */ public static List<String>
getFileList(File file) { List<String> result = new ArrayList<String>(); if
(!file.isDirectory()) { System.out.println(file.getAbsolutePath());
result.add(file.getAbsolutePath()); } else { File[] directoryList =
file.listFiles(new FileFilter() { public boolean accept(File file) { if
(file.isFile() && file.getName().indexOf("part") == 0) { return true; } else {
return false; } } }); for (int i = 0; i < directoryList.length; i++) {
result.add(directoryList[i].getPath()); } } return result; } /** * Delete single file * *
@paramsPath Deleted filespath * @return Delete successful returntrue, Otherwise returnfalse */ public boolean
deleteFile(String sPath) { boolean flag = false; File file = new File(sPath);
// Delete if the path is file and not empty if (file.isFile() && file.exists()) { file.delete(); flag =
true; } return flag; } public boolean deleteDirectory(String sPath) {
// IfsPath Does not end with a file separator, Automatically add file separator if (!sPath.endsWith(File.separator)) { sPath =
sPath + File.separator; } File dirFile = new File(sPath);
// Ifdir The corresponding file does not exist, Or not a directory, Quit if (!dirFile.exists() || !dirFile.isDirectory()) {
return false; } boolean flag = true; // Delete all files under folder( Include subdirectories) File[] files =
dirFile.listFiles(); for (int i = 0; i < files.length; i++) { // Delete child file if
(files[i].isFile()) { flag = deleteFile(files[i].getAbsolutePath()); if (!flag)
break; } // delete a sub dir else { flag = deleteDirectory(files[i].getAbsolutePath()); if
(!flag)break; } } if (!flag) return false; // remove the current directory if (dirFile.delete()) {
return true; } else { return false; } } /** * @param saveRDD * @param fileType
"TXT","CSV" * @param filePath * @return * @throws IOException */ public
ActionResponsesaveFile(JavaRDD saveRDD, String fileType, String filePath) {
ActionResponse response =new ActionResponse(); response.setTaskStatus(Response.
FAILURE); if (null == saveRDD) { response.setErrorMsg(" Distributed memory dataset cannot be empty!"); return
response; } if (null == fileType || fileType.length() == 0) {
response.setErrorMsg(" File format cannot be empty!"); return response; } if (null == filePath ||
filePath.length() ==0) { response.setErrorMsg(" File cannot be empty!"); return response; }
// Write inhdfs if (filePath.contains("hdfs://")) { saveRDD.saveAsTextFile(filePath);
response.setTaskStatus(Response.SUCCESS); response.setErrorMsg(null); return
response; } else { if (new File(filePath).exists()) { deleteDirectory(filePath);
if(deleteDirectory(filePath)) { System.out.println(" Delete existing folders!" + filePath); }
else{ System.out.println(" Can not delete!!!" + filePath); } }
saveRDD.saveAsTextFile(filePath); // Get topart File contents at the beginning File f = new File(filePath);
List<String> list2 =new ArrayList<String>(); list2 = getFileList(f); String str
=""; try { for (String l : list2) { BufferedReader br = new BufferedReader(new
FileReader(new File(l))); String s = ""; while ((s = br.readLine()) != null) {
str += s +"\r\n"; // plus"\r\n" Line wrapping is only possible when writing the following file } br.close(); } String outPath =
filePath +"." + fileType.toLowerCase(); if (new File(outPath).exists()) {
deleteFile(outPath); if (deleteFile(outPath)) { System.out.println(" Delete existing files!"
); } else { System.out.println(" Can not delete!!!" + outPath); } } // Write in outPath File
fileText =new File(outPath); FileWriter fileWriter = new FileWriter(fileText);
fileWriter.write(str); fileWriter.close(); } catch (IOException e) {
response.setTaskStatus(Response.FAILURE); response.setErrorMsg(" fail to write to file");
returnresponse; } response.setTaskStatus(Response.SUCCESS);
response.setErrorMsg(null); return response; } } }  public class SaveAsFileTest
implements Serializable {

    /**
     * Saves a small in-memory RDD to a local path as a single .txt file.
     * NOTE(review): hard-coded Windows paths — these tests only run on the
     * author's machine; confirm before reusing.
     */
    @Test
    public void saveTXTFile() throws IOException {
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List list1 = Arrays.asList("1", "2", "3", "a", "4", "c");
        String fileType1 = "TXT";
        String filePath1 = "C:\\Users\\Administrator\\Desktop\\t2";
        String filePath2 = "hdfs://master:9000/hbase/test";
        SaveFileAction saveFileAction = new SaveFileAction();
        JavaRDD saveRDD1 = sc.parallelize(list1);
        Response response = saveFileAction.saveFile(saveRDD1, fileType1, filePath1);
        System.out.println("message: " + response.getErrorMsg());
        sc.close();
    }

    /**
     * Reads a local CSV file into an RDD and re-saves it as a single .csv file.
     */
    @Test
    public void saveCSVFile() throws IOException {
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String fileType2 = "CSV";
        String filePath1 = "C:\\Users\\Administrator\\Desktop\\t2";
        String filePath2 = "hdfs://master:9000/hbase/test";
        SaveFileAction saveFileAction = new SaveFileAction();
        // Bug fix: the original declared "JavaRDD saveRDD1 = sc.parallelize(list1);"
        // here, but list1 is not defined in this method (compile error) and
        // saveRDD1 was never used — removed.
        JavaRDD saveRDD2 = sc.textFile("C:\\Users\\Administrator\\Desktop\\im.csv");
        Response response = saveFileAction.saveFile(saveRDD2, fileType2, filePath1);
        System.out.println("message: " + response.getErrorMsg());
        sc.close();
    }
}