Spark's typical application scenario is batch processing. A Spark job usually takes its input from a basic data source (a file system such as HDFS) or an advanced data source (Flume, Kafka), and writes its output to a file system, a database, and so on. This article walks through a demo program written in Java that reads data locally, processes it with Spark, and writes the result to Elasticsearch.

Code first.

Maven:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.3.0</version>
    </dependency>
</dependencies>
Code:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import com.unigroup.utils.BCConvert;
import scala.Tuple2;

public class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // Create the context with Spark SQL's SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("JavaWordCount")
                .master("local[2]") // local mode: local[n], where n is the number of threads and must be greater than 1
                // .master("spark://192.168.7.51:7077") // cluster mode: specify the master address
                .config("spark.es.nodes", "192.168.7.51") // Elasticsearch address
                .config("spark.es.port", "9200") // Elasticsearch port
                .getOrCreate();
        // Local file path. In cluster mode the file must exist at this path on every node; alternatively, use HDFS.
        JavaRDD<String> lines = spark.read().textFile("/path/to/test.txt").javaRDD();
        // Split each line into words
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        // Convert to a PairRDD
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        // Count the occurrences of each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
        // Full-width / half-width conversion
        JavaPairRDD<String, String> ones1 = words.mapToPair(s -> new Tuple2<>(s, BCConvert.bj2qj(s)));
        // Save the results to the file system
        // counts.saveAsTextFile("/Users/liubowen/sparkstream/out");
        // Collect the results to the driver and print them
        // List<Tuple2<String, Integer>> output = counts.collect();
        // for (Tuple2<?, ?> tuple : output) {
        //     System.out.println(tuple._1() + ": " + tuple._2());
        // }
        // Save the results to Elasticsearch
        // JavaEsSpark.saveJsonToEs(lines, "/spark/doc");
        // List list = ones.collect();
        // System.out.println(list);
        spark.stop();
    }
}
Reading data from the file system


Spark generally uses textFile() to read data from a file system. Note that when reading a local file in cluster mode, the data file must exist at the corresponding path on every node; HDFS or NFS can also be used.
textFile("/path/to/test.txt")
textFile("hdfs://hadoop:9000/test.txt")
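A minimal sketch of both cases, assuming a local[2] master and using the paths above (the class name and the final println are only for illustration, not from the original demo):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

public class ReadDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ReadDemo")
                .master("local[2]")
                .getOrCreate();

        // Local file: in cluster mode this path must exist on every worker node.
        JavaRDD<String> localLines = spark.read().textFile("/path/to/test.txt").javaRDD();

        // HDFS file: visible to all nodes, so it works in both local and cluster mode.
        JavaRDD<String> hdfsLines = spark.read().textFile("hdfs://hadoop:9000/test.txt").javaRDD();

        System.out.println(localLines.count() + " local lines, " + hdfsLines.count() + " HDFS lines");
        spark.stop();
    }
}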
Outputting results to Elasticsearch

Maven dependency: use the elasticsearch-hadoop package, and make sure its version matches your Elasticsearch version.
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.3.0</version>
</dependency>
Scala:
import org.elasticsearch.spark.rdd.EsSpark
Java:
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
The Elasticsearch connection needs to be specified; this can be done in the configuration file or in the program.

* Configuration file (spark-defaults.conf):
  spark.es.nodes   eshosts
  spark.es.port    9200
* In the program:
  SparkConf conf = new SparkConf().setMaster(master).setAppName("StreamingTest")
          // .set("spark.executor.memory", "1g")
          // .set("spark.dynamicAllocation.enabled", "false")
          .set("spark.es.nodes", "192.168.7.51")
          .set("spark.es.port", "9200");
Writing to Elasticsearch:
JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc");
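The line above uses the streaming variant on a DStream named out. A minimal batch-mode sketch of the same call (the class name and sample JSON documents are made up; the Elasticsearch address and the spark/doc index come from the demo above), using JavaEsSpark on a plain RDD:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsWriteDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("EsWriteDemo")
                .set("spark.es.nodes", "192.168.7.51")
                .set("spark.es.port", "9200");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Each element is already a JSON document, so saveJsonToEs can index it as-is.
        JavaRDD<String> docs = sc.parallelize(Arrays.asList(
                "{\"word\":\"hello\",\"count\":1}",
                "{\"word\":\"spark\",\"count\":2}"));

        // Write to the spark/doc index/type in Elasticsearch.
        JavaEsSpark.saveJsonToEs(docs, "spark/doc");

        sc.stop();
    }
}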
More: Spark column <https://blog.csdn.net/xiaoyu_bd/article/category/8044062>
——————————————————————————————————
Author: Peach blossom cherishes spring breeze
Please credit the source when reposting. Original address:
https://blog.csdn.net/xiaoyu_BD/article/details/82776974
If this article helped you, your support is my biggest motivation to keep writing. Thank you!