对于spark的典型应用场景为批处理,一般由基本数据源(文件系统如:hdfs)或者高级数据源(flume、kafka)作为spark的数据接入端。输出一样可以是文件系统或数据库等等。本文介绍一个用java写的demo程序,功能是从本地接收数据,经过spark处理之后输出到Elasticsearch。

先上代码:
maven
<dependency> <groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId> <version>2.3.1</version>
<scope>provided</scope> </dependency> <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId> <version>2.3.1</version>
<scope>provided</scope> </dependency> <dependency>
<groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version> <scope>provided</scope> </dependency> <dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId> <version>6.3.0</version>
</dependency> </dependencies>
代码
import org.apache.spark.SparkConf; import
org.apache.spark.api.java.JavaPairRDD; import
org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.SparkSession;
import com.unigroup.utils.BCConvert; import java.util.Arrays; import
java.util.List; import java.util.regex.Pattern; import
org.elasticsearch.spark.rdd.api.java.JavaEsSpark; public static void
main(String[] args) throws Exception { //利用sparksql创建Context SparkSession spark
= SparkSession.builder() .appName("JavaWordCount")
.master("local[2]")//如果是本地环境可以是local[n],n是线程数,必须大于1 //
.master("spark://192.168.7.51:7077")//如果是集群模式,需要指定master地址
.config("spark.es.nodes", "192.168.7.51")//指定es地址 .config("spark.es.port",
"9200")//指定es端口号 .getOrCreate();
//指定本地文件路径,如果spark是集群模式,需要每个节点上对应路径下都要有此文件。或者使用hdfs。 JavaRDD<String> lines =
spark.read().textFile("/path/to/test.txt").javaRDD(); //切分单词 JavaRDD<String>
words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
//转PairRDD JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new
Tuple2<>(s, 1)); //统计单词数 JavaPairRDD<String, Integer> counts =
ones.reduceByKey((i1, i2) -> i1 + i2); //全角转半角 JavaPairRDD<String, String>
ones1 = words.mapToPair(s -> new Tuple2<>(s, BCConvert.bj2qj(s))); //将结果保存到文件系统
// counts.saveAsTextFile("/Users/liubowen/sparkstream/out"); //返回结果 //
List<Tuple2<String, Integer>> output = counts.collect(); //循环结果 // for
(Tuple2<?,?> tuple : output) { // System.out.println(tuple._1() + ": " +
tuple._2()); // } //将结果保存到es //JavaEsSpark.saveJsonToEs(lines, "/spark/doc");
// List list = ones.collect(); // System.out.println(list); spark.stop(); }
<>读取文件系统数据


spark读取文件系统数据一般使用textFile(),注意这里如果是从本地文件中取数据,在集群模式下,必须保证每个节点对应路径下都要有数据文件。也可以使用hdfs或nfs。
textFile("/path/to/test.txt") textFile("hdfs://hadoop:9000/test.txt")
<>输出结果到Elasticsearch

maven依赖,使用elasticsearch-hadoop包,注意对应Elasticsearch版本
<dependency> <groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId> <version>6.3.0</version>
</dependency>
scala
import org.elasticsearch.spark.rdd.EsSpark;
java
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
需要指定Elasticsearch,可以写在配置文件里,也可以在程序里指定。

* 配置文件:spark-default.conf spark.es.nodes   eshosts spark.es.port   9200
* 写在程序里 SparkConf conf = new
SparkConf().setMaster(master).setAppName("StreamingTest") //
.set("spark.executor.memory", "1g") // .set("spark.dynamicAllocation.enabled",
"false") .set("spark.es.nodes", "192.168.7.51") .set("spark.es.port", "9200");
写入
JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc");
更多:Spark专栏 <https://blog.csdn.net/xiaoyu_bd/article/category/8044062>
——————————————————————————————————
作者:桃花惜春风
转载请标明出处,原文地址:
https://blog.csdn.net/xiaoyu_BD/article/details/82776974
<https://blog.csdn.net/xiaoyu_BD/article/details/82776974>
如果感觉本文对您有帮助,您的支持是我坚持写作最大的动力,谢谢!

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信