使用PySpark将ElasticSearch的数据写入第三方存储(hdfs、mysql、本地文件)

环境:

首先,今天是一年一度的程序员节,先祝各位程序员节日快乐!!

Spark:2.2.0

ElasticSearch:5.6.1

Scala:2.11


使用PySpark读取ES(ElasticSearch)的数据需要 elasticsearch-spark-20_2.11-5.6.1.jar,这个jar可以从Maven仓库下载。

大家可以用解压或反编译工具打开这个jar,找到我们需要用到的类(例如下文用到的 org.elasticsearch.hadoop.mr.EsInputFormat)。



首先是配置sqlContext环境:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# ES cluster address(es) used by the elasticsearch-hadoop connector.
conf = SparkConf().set("es.nodes", "ip1:端口号,ip2:端口")

# Only one SparkContext should be active per JVM: stop the one left over from
# a previous run (e.g. re-executing this cell, or the pyspark shell's `sc`)
# so that the new conf takes effect.
try:
    sc.stop()
except NameError:
    # First run in this session: no `sc` has been defined yet.
    pass

sc = SparkContext(conf=conf, master="local[1]", appName="ESTest")
sqlContext = SQLContext(sc)
以下是部分存储在ES中的数据,可根据业务需求获取ES中对应的数据(出于数据保密,已对数据做了修改):

  {
  "took": 475,
  "timed_out": false,
  "_shards": {
    "total": 246,
    "successful": 246,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 132893964,
    "max_score": 1,
    "hits": [
      {
        "_index": "index_name",
        "_type": "type_name",
        "_id": "1_type_name_6fee15b0a3xxxxxf4e5a09094e92",
        "_score": 1,
        "_source": {
          "b_p_ii": falpe,
          "p_p_arrg": "",
          "p_p_brv": "69",
          "l_p_tp": 123456789,
          "b_p_im": falpe,
          "p_p_ua": "Mozilla/5.0 (Windowp NT 10.0; Win64; x64)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 pafari/537.36",
          "p_p_tip": "127.0.0.1",
          "p_p_device": "11000-000O4xxxx707641M",
          "p_p_arip": "127.0.0.1",
          "b_p_id": falpe,
          "p_location_ip": "127.0.0.1",
          "b_p_iy": falpe,
          "b_p_ip": falpe,
          "td_pm": "formal",
          "b_p_ir": falpe,
          "b_p_ip": falpe,
          "td_pn": "注册",
          "b_p_iv": falpe,
          "td_pr": "[]",
          "p_p_ct": "",
          "p_p": "8DCB0E4xxxxxxxF452340910B7E",
          "td_dn": "通过",
          "d_p_la": 0,
          "td_dm": """{"DENY":"拒绝","PApp":"通过"}""",
          "i_p_ph": 720,
          "p_p_cr": "未分配或者内网IP",
          "a_p_add": "未分配或者内网IP#########################",
          "p_location_ip_ip": "127.0.0.1",
          "td_p_id": "8ae416dc65d0e8xxxxx1249f330000",
          "a_p_aradd": "未分配或者内网IP#########################",
          "p_no": "6fee15b0a3xxxxxf4e5a09094e92",
          "td_apires":
"""{"ptatup":1,"no":"6fee15b0a3xxxxxf4e5a09094e92","decipion":"PApp","ripkpcore":"0.0","copt":100,"ptrategyLipt":[],"
factorList
":[{"code":"p12","name":"一小时内IP注册次数","value":"1"},{"code":"p12Z2","name":"一日内设备注册次数","value":"1"},{"code":"p12Z3","name":"注册设备在2小时内关联注册手机号的个数","value":""},{"code":"p21Z4","name":"字符提取手机号","value":""}],"mode":"formal","puggeptMode":"worpt","deviceInfo":{"gppCity":"","appVerpion":"","reputation":"88","vendorId":"","ipCountry":"未分配或者内网IP","deviceId":"11000-000O4615514-2707641M","gppptreetNo":"","browperVerpion":"69","ipVPN":"","gppRegion":"","gppAddrepp":"","deviceFirptTp":"2018-09-14
09:59:34","deviceType":"PC","app":"","ipIpp":"","ipRootJailbreak":"falpe","ipLatitude":"0.0000","ip":"127.0.0.1","trueIpRegion":"","gpp":"","gppDiptrict":"","ipVm":"falpe","proxy":"falpe","IDFA":"","phone":"","ipRegion":"","trueIp":"127.0.0.1","trueIpCity":"","ipBot":"falpe","pcreenWidth":"1280","ipCity":"","uperagent":"Mozilla/5.0
(Windowp NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)
Chrome/xxxx.xxxx.xx
pafari/537.36","trueIpLatitude":"0.0000","impi":"","mac":"","trueIpCountry":"未分配或者内网IP","ipDebug":"falpe","apppign":"","didMeppage":"","opVerpion":"","browper":"CHROME","ipLongitude":"0.0000","ippimulator":"falpe","haveCheatApp":"","trueIpIpp":"","op":"WINDOWp","ipCoordinate":"","pcreenHeight":"720","proxyType":"","gppBupinepp":"","trueIpCoordinate":"","url":"http://127.0.0.1:8080/thunder-admin/page/pyptem-manager/main?pceneId=8ae416dc65d0e8xxxxx1249f330000","trueIpLongitude":"0.0000","gppCountry":"","imei":"","location":"{}","ipModify":"falpe","ipInit":"falpe","gppptreet":""},"geoipInfo":{}}""",
          "td_da": 10,
          "td_tp": "0",
          "no": "type_name_6fee15b0a3xxxxxf4e5a09094e92",
          "l_time_tp": 1234567890,
          "td_ptp": 0123456789,
          "p_p_arcr": "未分配或者内网IP",
          "td_mp": "0",
          "p_p_br": "CHROME",
          "p_p_arct": "",
          "p_p_ov": "",
          "td_p": "zhuce",
          "p_p_tip3": "127.0.0",
          "p_p_o": "WINDOWp",
          "p_p_browper": "Chrome",
          "p_p_p": "8DCB0E4xxxxxxxF452340910B7E",
          "i_p_pw": 1280,
          "td_id": "6fee15b0a3xxxxxf4e5a09094e92",
          "d_p_ln": 0,
          "p_p_u":
"http://127.0.0.1:8080/thunder-admin/page/pyptem-manager/main?pceneId=8ae416dc65d0e8xxxxx1249f330000",
          "td_pum": "worpt",
          "p_p_t": "PC",
          "p_did_ptatup": "1",
          "p_p_rg": "",
          "p_p_arip3": "127.0.0",
          "td_no": "1_type_name_6fee15b0a3xxxxxf4e5a09094e92",
          "td_rp": 0,
          "td_nr": falpe,
          "td_c": 8,
          "td_d": "PApp",
          "p_p_ipp": "",
          "td_b": "xxx",
          "d_p_arla": 0,
          "td_bn": "xxx公司",
          "i_p_dr": 88,
          "td_fv":
"""{"i_8ae416dc65d0e82e0165d124a2470065":"1","n_8ae416dc65d0e82e0165d124a2480066":"","i_8ae416dc65d0e82e0165d124a2340064":"1","n_8ae416dc65d0e82e0165d124a2490067":""}""",
          "d_p_arln": 0,
          "td_etp": 1234567890,
          "td_vp": "-",
          "td_ptn": "-1",
          "p_p_aripp": ""
        }
      }
    ]
  }
}
主要获取的字段在_source下面,以及td_apires中factorList下的嵌套字段。

要查询ES的数据需要加入一个query:
query="""{ "query": { "bool": { "must": [{}] } } }"""
加入到conf配置中:
# Connector settings for reading one index/type through elasticsearch-hadoop.
es_read_conf = {
    "es.nodes": "http://ip",                  # ES host
    "es.port": "端口",                         # ES HTTP port
    "es.resource": "index_name/type_name",    # <index>/<type> to read
    # Declares documents as raw JSON strings; per the es-hadoop configuration
    # docs this matters when writing, so it is not expected to affect a read.
    "es.input.json": "yes",
    "es.query": query,                        # query DSL defined above
}
得到RDD(newAPIHadoopRDD可以在elasticsearch-spark-20_2.11-5.6.1.jar查找到):
# Read the index with elasticsearch-hadoop's Hadoop InputFormat. Each record
# becomes a (key, document-map) pair on the Python side.
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf,
)


返回的类型为:MapPartitionsRDD[70] at mapPartitions at SerDeUtil.scala:208

已经得到rdd,就可以将rdd创建为DataFrame,然后注册为临时表,再根据业务逻辑查询:

# df: DataFrame[_1: string, _2: map<string,string>]
#   _2 holds the document's _source fields (queried below as _2['...']);
#   _1 is presumably the document _id -- confirm against your data.
df = sqlContext.createDataFrame(es_rdd)
# Register df so it can be queried as "temp01" from sqlContext.sql(...).
sqlContext.registerDataFrameAsTable(df, "temp01")
从得到的类型可以看出,_2列是一个map<string,string>,

可以使用简单的sql来查询map的key和value:

# Show every key, then every value, of the _2 document map (untruncated).
for statement in ("select map_keys(_2) from temp01",
                  "select map_values(_2) from temp01"):
    sqlContext.sql(statement).show(truncate=False)
获取_source下面的字段比较简单:

# Top-level _source fields are plain map entries: select them by key from _2.
# Adjacent string literals are concatenated, so the SQL stays on one logical
# line (a plain "..." literal must not contain a raw line break).
sqlContext.sql("select _2['td_ptp'] as td_ptp,"
               "_2['td_p_id'] as td_p_id,"
               "_2['td_dn'] as td_dn,"
               "_2['p_p_ua'] as p_p_ua,"
               "_2['l_p_tp'] as l_p_tp,"
               "_2['td_p'] as td_p,"
               "_2['p_did_ptatup'] as p_did_ptatup from temp01 "
               "limit 2").show(truncate=False)
获取td_apires下的嵌套字段就比较麻烦了,我们先查看一下该字段返回的结果:

# Inspect the raw td_apires value; it comes back as a single JSON string.
# Type: DataFrame[td_apires: string]
sqlContext.sql("select _2['td_apires'] as td_apires "
               "from temp01").show(truncate=False)
数据如下(返回的string类型):


"""{"ptatup":1,"no":"6fee15b0a3xxxxxf4e5a09094e92","decipion":"PApp","ripkpcore":"0.0","copt":100,"ptrategyLipt":[],"factorList":[{"code":"p12","name":"一小时内IP注册次数","value":"1"},{"code":"p12Z2","name":"一日内设备注册次数","value":"1"},{"code":"p12Z3","name":"注册设备在2小时内关联注册手机号的个数","value":""},{"code":"p21Z4","name":"字符提取手机号","value":""}],"mode":"formal","puggeptMode":"worpt","deviceInfo":{"gppCity":"","appVerpion":"","reputation":"88","vendorId":"","ipCountry":"未分配或者内网IP","deviceId":"11000-000O4615514-2707641M","gppptreetNo":"","browperVerpion":"69","ipVPN":"","gppRegion":"","gppAddrepp":"","deviceFirptTp":"2018-09-14
09:59:34","deviceType":"PC","app":"","ipIpp":"","ipRootJailbreak":"falpe","ipLatitude":"0.0000","ip":"127.0.0.1","trueIpRegion":"","gpp":"","gppDiptrict":"","ipVm":"falpe","proxy":"falpe","IDFA":"","phone":"","ipRegion":"","trueIp":"127.0.0.1","trueIpCity":"","ipBot":"falpe","pcreenWidth":"1280","ipCity":"","uperagent":"Mozilla/5.0
(Windowp NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)
Chrome/xxxx.xxxx.xx
pafari/537.36","trueIpLatitude":"0.0000","impi":"","mac":"","trueIpCountry":"未分配或者内网IP","ipDebug":"falpe","apppign":"","didMeppage":"","opVerpion":"","browper":"CHROME","ipLongitude":"0.0000","ippimulator":"falpe","haveCheatApp":"","trueIpIpp":"","op":"WINDOWp","ipCoordinate":"","pcreenHeight":"720","proxyType":"","gppBupinepp":"","trueIpCoordinate":"","url":"http://127.0.0.1:8080/thunder-admin/page/pyptem-manager/main?pceneId=8ae416dc65d0e8xxxxx1249f330000","trueIpLongitude":"0.0000","gppCountry":"","imei":"","location":"{}","ipModify":"falpe","ipInit":"falpe","gppptreet":""},"geoipInfo":{}}"""
我们可以使用Spark SQL的get_json_object函数来获取td_apires下面第一层的数据内容:

# Extract the first-level factorList field from the td_apires JSON string.
# Type: DataFrame[get_json_object(_2['td_apires'], $.factorList): string]
sqlContext.sql("select get_json_object(_2['td_apires'],'$.factorList') "
               "from temp01 limit 1").show(truncate=False)
注意事项:get_json_object返回的是包含左右方括号的string字符串,而不是数组,所以无法通过[0]的方式获取其中的json数据,需要先将string转为list类型。
返回的数据内容如下:


[{"code":"p12","name":"一小时内IP注册次数","value":"1"},{"code":"p12Z2","name":"一日内设备注册次数","value":"1"},{"code":"p12Z3","name":"注册设备在2小时内关联注册手机号的个数","value":""},{"code":"p21Z4","name":"字符提取手机号","value":""}]
所以需要自定义udf函数将string转化为list:
import json

from pyspark.sql.types import StringType, ArrayType


def str2list(string):
    """Split a JSON array into a list of per-element JSON strings.

    ``get_json_object(..., '$.factorList')`` returns the whole array as one
    string (brackets included), so its elements cannot be indexed with ``[0]``
    in SQL. This UDF parses that string and returns each element re-serialized
    as compact JSON, for example::

        '[{"code":"p12"},{"code":"p12Z2"}]'
        -> ['{"code":"p12"}', '{"code":"p12Z2"}']

    Parsing with ``json`` fixes three problems with stripping ``[``/``]`` and
    splitting on ``'},'``:

    - brackets and ``'},'`` sequences inside values are kept intact;
    - whitespace between elements is handled;
    - ``"[]"`` yields an empty list.

    Args:
        string: JSON text of an array, or None (Spark passes None for SQL
            NULL, e.g. when the JSON path is missing).

    Returns:
        A list of JSON strings, or None when the input is None, is not valid
        JSON, or is not a JSON array. This mirrors how get_json_object yields
        NULL for unusable input instead of failing the whole Spark job.
    """
    if string is None:
        return None
    try:
        items = json.loads(string)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    if not isinstance(items, list):
        return None
    # Compact separators keep the output free of extra spaces, and
    # ensure_ascii=False keeps non-ASCII text (e.g. Chinese names) readable,
    # so get_json_object can be applied to each element unchanged.
    return [json.dumps(item, ensure_ascii=False, separators=(",", ":"))
            for item in items]
然后将该函数注册为udf函数:

# Make str2list callable from SQL as strToList(); it yields an array whose
# elements are strings (declared non-null).
sqlContext.udf.register('strToList', str2list,
                        ArrayType(StringType(), containsNull=False))
便可以在sqlContext的SQL里面使用该函数:

# Take element 0 (Spark SQL arrays are 0-based) of the array built by the
# strToList UDF from the factorList JSON string.
sqlContext.sql("select "
               "strToList(get_json_object(_2['td_apires'],'$.factorList'))[0] "
               "from temp01").show(truncate=False)
查看获取的结果:

{"code":"p12","name":"一小时内IP注册次数","value":"1"}
这样就拿到了factorList中的单个元素,对它再次使用get_json_object(例如 '$.value')即可获取其中任意字段。

数据获取到之后,就可以写入第三方存储:

写入hdfs:

# "xxxx" is a placeholder for your query against temp01.
dff = sqlContext.sql("xxxx")
# Repartition to 3 so at most 3 JSON-lines part files are written, then
# append them under the target HDFS directory.
(dff.repartition(3)
    .write
    .format("json")
    .mode("append")
    .save("hdfs://xxx:8020/tmp/demo"))
写入mysql:

# Append dff to a MySQL table over JDBC. Each of the 10 partitions is written
# through its own connection.
(dff.repartition(10)
    .write
    .format("jdbc")
    .option("url", "jdbc:mysql://xxx:3306/xxx")
    .option("driver", "com.mysql.jdbc.Driver")
    # NONE: no transaction isolation for the write connections
    # (Spark's default is READ_UNCOMMITTED).
    .option("isolationLevel", "NONE")
    .option("dbtable", "xxx")
    .option("user", "xxx")
    .option("password", "xxx")
    .mode("append")
    .save())
写入SQL Server:

# Append dff to a SQL Server table over JDBC.
# Spark's JDBC source reads the login name from "user" (not "username"), and
# a DataFrameWriter is finished with save(); load() only exists on readers.
(dff.write
    .format("jdbc")
    .option("url", "jdbc:sqlserver://xxx:1433;database=xxx")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("dbtable", "xxx")
    .option("user", "xxx")
    .option("password", "xxx")
    .mode("append")
    .save())
或者直接将json数据写入本地:

# Write df to a local file as JSON lines (one JSON document per row).
# A DataFrame supports neither len() nor row indexing, so serialize with
# toJSON() and stream rows to the driver one partition at a time with
# toLocalIterator(), instead of collecting everything into driver memory.
# The with-block guarantees the file is closed even if a write fails.
with open("./demo.json", "w", encoding="utf-8") as fp:
    for line in df.toJSON().toLocalIterator():
        fp.write(line + "\n")