spark实现往es写入数据 并且支持x-pack

废话不说 直接贴代码

1、pom文件
<!-- Spark SQL + Core (Scala 2.11 builds, Spark 2.2.0) and the
     elasticsearch-hadoop Spark connector matching ES 5.6.x. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>5.6.5</version>
</dependency>
2、scala代码
package com.test.demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

/**
 * Demo: write a small DataFrame to Elasticsearch via elasticsearch-spark,
 * including X-Pack (basic auth) credentials.
 *
 * BUG FIXED vs. the original: all `es.*` settings were applied to the
 * SparkConf AFTER `new SparkContext(sconf)` — Spark snapshots the conf when
 * the context is created, so every ES setting (nodes, port, auth, mapping.id)
 * was silently ignored. Settings are now applied before the session is built.
 * Also fixed the `es.nodes` value, which was missing a comma between the
 * first two hosts ("localhost1localhost2").
 */
object SparkToES {
  def main(args: Array[String]): Unit = {
    // Running in local mode; drop setMaster("local[5]") when submitting to a cluster.
    val sconf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[5]")
      // Comma-separated list of ES nodes to bootstrap from.
      .set("es.nodes", "localhost1,localhost2,localhost3")
      .set("es.port", "9200")
      .set("es.index.auto.create", "true")
      .set("es.write.operation", "index")
      // Use the "id" field of each document as the ES _id.
      .set("es.mapping.id", "id")
      // X-Pack basic-auth credentials; only needed when security is enabled.
      .set("es.net.http.auth.user", "username")
      .set("es.net.http.auth.pass", "password")

    val spark = SparkSession.builder().config(sconf).getOrCreate()

    // For convenience the demo data is a JSON string turned into a DataFrame;
    // replace with a real data source as needed.
    val json =
      """[{"id":"1","test1":"1","test2":"2","test3":"3"},{"id":"2","test1":"11","test2":"22","test3":"33"}]"""
    import spark.implicits._
    val dataset = spark.createDataset(json :: Nil)
    val df = spark.read.json(dataset)

    // First argument: the DataFrame; second: the target as "_index/_type".
    EsSparkSQL.saveToEs(df, "testsparkes/testsparkes")

    spark.stop()
  }
}

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信