SparkSql中DataFrame与json、csv、mysql、hive数据的互操作

1.RDD转换成DataFrame

1.1 RDD to DataFrame

RDD转成DataFrame有2种方式,一种是通过隐式转换,一种是通过SparkSession来进行创建。

1.1.1 RDD隐式转换成DataFrame(推荐)
object RDDToDF { def main(args: Array[String]): Unit = { // 创建一个SparkSession
val session = SparkSession.builder().master("local[2]").appName("RDDToDF"
).getOrCreate()// 通过session获取SparkContext val sc = session.sparkContext //
将文件中的数据转换成RDD val rdd = sc.textFile(
"file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" "
)).map(x => Person(x(0), x(1).toLong)) rdd.foreach(println) //
导入隐式转换(****重要****) import session.implicits._ // 将rdd转换成DataFrame val df =
rdd.toDF() df.show()// 打印 DataFrame 的 Schema约束信息 df.printSchema() } } // 创建一个
case 类,Person封装RDD中的数据,需要给DataFrame形成一个Schema约束 case class Person(name: String,
age: Long)
输出结果:





1.1.2 通过SaprkSession创建的方式

需要通过SchemaType来进行约束
object RDDToDF2 { def main(args: Array[String]): Unit = { val session =
SparkSession.builder().master("local[2]").appName("RDDToDF2").getOrCreate() val
sc = session.sparkContext// 这里不使用case class类,使用spark 提供的 Row类 val rdd =
sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt"
).map(_.split(" ")).map(x => Row(x(0), x(1).trim)) val schemaString = "name age"
val splits = schemaString.split(" ") // 约束字段 val fields = schemaString.split(" "
).map(fieldName => StructField(fieldName, StringType, nullable =true)) // 约束类型
val schema = StructType(fields) // 将RDD转换成DataFrame val df =
session.createDataFrame(rdd, schema)// 显示DataFrame的数据 df.show() // 打印schema
df.printSchema() } }
输出结果:



1.2 DataFrame几个api介绍

*
show():将DataFrame的数据直接输出

*
printSchema():打印schema约束信息

*
select():选择字段

*
filter():对DataFrame字段进行过滤

*
groupby():对字段进行分组

*
count():求数量

DataFrame还有一些其他的api,作用和sql语句类似

eg:
df.select("name").show()
结果:


# 这里$"age"需要导入隐式转换才行 df.select("age").filter($"age" >=28).show()
结果:


df.groupBy("name").count().show()
结果:



2.json文件转DataFrame

2.1 json转DataFrame
// 创建session对象 val session = SparkSession.builder().appName("JsonToDF").master(
"local[2]").getOrCreate() // 通过session 读取 json数据,转换成DataFrame对象 val df = session
.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") //
展示数据 df.show()
结果:



2.2 DataFrame创建temp view,使用sql语句查询
object JsonToDF { def main(args: Array[String]): Unit = { // 创建session对象 val
session = SparkSession.builder().appName("JsonToDF").master("local[2]")
.getOrCreate() // 通过session 读取 json数据,转换成DataFrame对象 val df = session.read.json(
"file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") //
将DataFrame的数据,创建成一个临时的"表",表名:person,这个只能在当前的session域中有效 df
.createOrReplaceTempView("person") // 通过sql语句进行查询 session.sql("select * from
person").show() // 创建一个全局的temp“表”,这个可以跨session会话有效 df.createGlobalTempView(
"person") session.newSession().sql("select * from global_temp.person where
age>=28").show() session.stop() } }
结果:





3.csv文件转DataFrame

3.1 csv文件转DataFrame
val session = SparkSession.builder().appName("CsvToDF").master("local[2]")
.getOrCreate() //
通过指定格式,load数据,option:可以给文件格式添加参数,如是否有header。如果没有header,生成的DataFrame会默认生成如:_c1,_c2
//val df = session.read.format("csv").option("header", "true").load(
"file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv") // df封装的csv方法
val df = session.read.option("header", "true").csv(
"file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv") df.show()


3.2 保存DataFrame数据到文件
object CsvToDF { def main(args: Array[String]): Unit = { val session =
SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate() val
df = session.read.csv(
"file:///D:/workspacescala/sparkdemo/sparksqldemo/people.csv") // 导入隐式转换 import
session.implicits._ // 将df的数据保存到本地文件,这里因为csv没有头信息,没有对应的schema df.filter($"_c1" >
28).write.csv("d:/csvfile") } }
csv文件的储存和内容:





4.mysql数据与DataFrame的互操作

4.1 DataFrame将数据写入Mysql
object DFToMysql { def main(args: Array[String]): Unit = { val session =
SparkSession.builder().appName("DFToMysql").master("local[2]").getOrCreate()
val df = session.read.json(
"file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") df.show() //
df写入mysql的第一种方式 val p = new Properties() p.put("user", "root") p.put("password",
"root") // 将df数据写入mysql,这里mode的模式有4种:append:追加,overwrite:覆盖,error , ignore //
overwrite:如果表不存在,会自动创建,覆盖会将之间定义好字段类型,进行修改 df.write.mode("append").jdbc(
"jdbc:mysql://localhost:3306/db_people", "tb_people", p) //df.write.mode(
"overwrite").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p) //
df写入mysql的第二种方式 /*df.write.mode("append") .format("jdbc") .option("url",
"jdbc:mysql://localhost:3306/db_people") .option("dbtable", "tb_people")
.option("user", "root") .option("password", "root") .save()*/ } }
结果:



4.2 Mysql数据导入为DataFrame
object MySqlToDF { def main(args: Array[String]): Unit = { val session =
SparkSession.builder().master("local[2]").appName("MySqlToDF").getOrCreate() //
jdbc加载的第一种方式 val p = new Properties() p.put("user", "root") p.put("password",
"root") // 通过加载jdbc,获取DataFrame数据 val df = session.read.format("jdbc").jdbc(
"jdbc:mysql://localhost:3306/db_people", "tb_people", properties = p) df.show()
// 加载jdbc的第2中方式 /*val df2 = session.read.format("jdbc") .option("url",
"jdbc:mysql://localhost:3306/db_people") .option("dbtable", "tb_people")
.option("user", "root") .option("password", "root") .load() df2.show()*/ } }


5 Spark使用HQL查询hive中的数据

前提:需要 安装Hive <https://blog.csdn.net/qq_33689414/article/details/80063674>

5.1 将hive-site.xml放入resource目录



hive-site.xml内容
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>
jdbc:mysql://server01:3306/hive?createDatabaseIfNotExist=true</value> <
description>JDBC connect string for a JDBC metastore</description> </property> <
property> <name>javax.jdo.option.ConnectionDriverName</name> <value>
com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC
metastore</description> </property> <property> <name>
javax.jdo.option.ConnectionUserName</name> <value>root</value> <description>
username to use against metastore database</description> </property> <property>
<name>javax.jdo.option.ConnectionPassword</name> <value>root</value> <
description>password to use against metastore database</description> </property>
</configuration>
5.2 Spark操作Hive数据

Spark新的api实现,使用SparkSession实现
object HiveToDF { def main(args: Array[String]): Unit = { // 新版本 val session =
SparkSession.builder().appName("hivetosql").master("local"
).enableHiveSupport().getOrCreate() session.sqlContext.sql("use default")
session.sqlContext.sql("select * from student").show() } }
使用HiveContext实现
object HiveToDF { def main(args: Array[String]): Unit = { // 老版本 val conf =
new SparkConf().setAppName("HiveToDF").setMaster("local") val sc = new
SparkContext(conf) val hiveContext = new HiveContext(sc) hiveContext.sql("use
default") hiveContext.sql("select * from student").show() } }


补充:spark-shell直接操作Hive数据

*
将hive-site.xml文件放入$SPARK_HOME/conf目录下

*
启动spark-shell时指定mysql连接驱动位置
spark-shell \ --master spark://server01:7077 \ --executor-memory 1g \ --total
-executor-cores 2 \ --driver-class-path /hadoop/hive/lib/mysql-connector-java-
5.1.35-bin.jar


* 操作hive数据 # spark是SparkSession对象 spark.sqlContext.sql("select * from student"
).show()


或者
import org.apache.spark.sql.hive.HiveContext val hiveContext = new
HiveContext(sc) hiveContext.sql("select * from student").show()


补充2:Spark-sql直接操作Hive

*
将hive-site.xml文件放入$SPARK_HOME/conf目录下

*
启动spark-shell时指定mysql连接驱动位置
spark-sql \ --master spark://server01:7077 \ --executor-memory 1g \ --total
-executor-cores 2 \ --driver-class-path /hadoop/hive/lib/mysql-connector-java-
5.1.35-bin.jar


3.操作hive数据
show tables;

select * from person;

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信