什么是Spark SQL
Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如json、parquet、avro、csv格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。
Spark SQL的主要特点:
- 将SQL查询与Spark应用程序无缝组合
Spark SQL允许使用SQL在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce;而Spark SQL底层使用的是Spark RDD。例如以下代码,在Spark应用程序中嵌入SQL语句:
1results = spark.sql( "SELECT * FROM people") - 以相同的方式连接到多种数据源
Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。例如以下代码,读取HDFS中的JSON文件,然后将该文件的内容创建为临时视图,最后与其他表根据指定的字段关联查询:
1234567//读取JSON文件val userScoreDF = spark.read.json("hdfs://centos01:9000/people.json")//创建临时视图user_scoreuserScoreDF.createTempView("user_score")//根据name关联查询val resDF=spark.sql("SELECT i.age,i.name,c.score FROM user_info i " +"JOIN user_score c ON i.name=c.name") - 在现有的数据仓库上运行SQL或HiveQL查询
Spark SQL支持HiveQL语法以及Hive SerDes和UDF(用户自定义函数),允许访问现有的Hive仓库。
DataFrame和Dataset
DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如:Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。
DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息),因此看起来更像是一张数据库表。
Spark SQL基本使用
Spark Shell启动时除了默认创建一个名为“sc”的SparkContext的实例外,还创建了一个名为“spark”的SparkSession实例,该“spark”变量也可以在Spark Shell中直接使用。
SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。
例如,在HDFS中有一个文件/input/person.txt,文件内容:
1 2 3 |
1,zhangsan,25 2,lisi,22 3,wangwu,30 |
现需要使用Spark SQL将该文件中的数据按照年龄降序排列,步骤:
-
加载数据为Dataset
调用SparkSession的API read.textFile()可以读取指定路径中的文件内容,并加载为一个Dataset:123$ spark-shell --master yarnscala> val d1=spark.read.textFile("hdfs://centos01:9000/input/person.txt")d1: org.apache.spark.sql.Dataset[String] = [value: string]从变量d1的类型可以看出,textFile()方法将读取的数据转为了Dataset。除了textFile()方法读取文本内容外,还可以使用csv()、jdbc()、json()等方法读取csv文件、jdbc数据源、json文件等数据。调用Dataset中的show()方法可以输出Dataset中的数据内容。查看d1中的数据内容:
12345678scala> d1.show()+-------------+| value|+-------------+|1,zhangsan,25||2,lisi,22||3,wangwu,30|+-------------+从上述内容可以看出,Dataset将文件中的每一行看做一个元素,并且所有元素组成了一列,列名默认为“value”。
如果Spark报错
ClassNotFoundException: Class com.hadoop.compression.lzo.Lzo
说明Hadoop支持Lzo压缩但是Spark没有配置支持压缩相关的类库,可以修改spark-defaults.conf
并添加两行:
spark.driver.extraClassPath /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar spark.executor.extraClassPath /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar
-
给Dataset添加元数据信息
定义一个样例类Person,用于存放数据描述信息(Schema):1scala> case class Person(id:Int,name:String,age:Int)导入SparkSession的隐式转换,以便后续可以使用Dataset的算子:
1scala> import spark.implicits._调用Dataset的map()算子将每一个元素拆分并存入Person类中:
1234567scala> val personDataset=d1.map(line=>{| val fields = line.split(",")| val id = fields(0).toInt| val name = fields(1)| val age = fields(2).toInt| Person(id, name, age)| })此时查看personDataset中的数据内容,personDataset中的数据类似于一张关系型数据库的表:
12345678scala> personDataset.show()+---+--------+---+| id| name|age|+---+--------+---+| 1|zhangsan| 25|| 2| lisi| 22|| 3| wangwu| 30|+---+--------+---+ -
将Dataset转为DataFrame
Spark SQL查询的是DataFrame中的数据,因此需要将存有元数据信息的Dataset转为DataFrame。
调用Dataset的toDF()方法,将存有元数据的Dataset转为DataFrame,代码:12scala> val pdf = personDataset.toDF()pdf: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field] -
执行SQL查询
在DataFrame上创建一个临时视图“v_person”,代码:1scala> pdf.createTempView("v_person")使用SparkSession对象执行SQL查询,代码:
12scala> val result = spark.sql("select * from v_person order by age desc")result: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]调用show()方法输出结果数据,代码:
12345678scala> result.show()+---+--------+---+| id| name|age|+---+--------+---+| 3| wangwu| 30|| 1|zhangsan| 25|| 2| lisi | 22|+---+--------+---+可以看到,结果数据已按照age字段降序排列。
-
指定导出文件格式
12345678# 导出json格式文件scala> result.write.format("json").save("/output/json")# 导出scv格式文件scala> result.write.format("csv").save("/output/csv")# 导出orc格式文件scala> result.write.format("orc").save("/output/orc")# 导出parquet格式文件scala> result.write.format("parquet").save("/output/parquet")查看导出的文件(有几个文件说明有几个分区):
123456789101112131415$ hadoop fs -cat /output/json/part-*{"id":3,"name":"wangwu","age":30}{"id":1,"name":"zhangsan","age":25}{"id":2,"name":"lisi","age":22}$ hadoop fs -cat /output/csv/part-*3,wangwu,301,zhangsan,252,lisi,22$ hadoop fs -cat /output/orc/part-*ORC...(二进制乱码)[hadoop@hadoop100 conf]$ hadoop fs -cat /output/parquet/part-*PAR1...(二进制乱码) -
分区设置
Spark中使用SparkSql进行shuffle操作,默认分区数是200
个;参数配置是--conf spark.sql.shuffle.partitions
, 如果想要修改SparkSQL执行shuffle操作时分区数:-
配置 spark.sql.shuffle.partitions,适用场景spark.sql()合并分区
1spark.conf.set("spark.sql.shuffle.partitions", 5) #后面的数字是你希望的分区数这样配置后,通过spark.sql()执行后写出的数据分区数就是你要求的个数,如这里5。
-
配置 coalesce(n),适用场景spark写出数据到指定路径下合并分区,不会引起shuffle
123456df = spark.sql(sql_string).coalesce(1) #合并分区数df.write.format("csv").mode("overwrite").option("sep", ",").option("header", True).save(hdfs_path) -
配置repartition(n), 重新分区,会引发shuffle
123456df = spark.sql(sql_string).repartition(1) #重新分区,会引发全局shuffledf.write.format("csv").mode("overwrite").option("sep", ",").option("header", True).save(hdfs_path)
-
-
分区分桶导出
- 并行写出之 partitionBy() 指定分区列 , 会根据分区列创建子文件夹,并行写出数据
123df.write.mode("overwrite").partitionBy("day").save("/tmp/partitioned-files.parquet") - 并行写出之 repartition() ,一般spark中有几个分区就会有几个并行的IO写出
123df.repartition(5).write.format("csv").save("/tmp/multiple.csv") - 分桶写出,好处是后续读入的时候数据就不会做shuffle了,因为相同分桶的数据会被划分到同一个物理分区中
1234csvFile.write.format("parquet").mode("overwrite").bucketBy(5, "gmv") #第一个参数:分成几个桶,第二个参数:按哪列进行分桶.saveAsTable("bucketedFiles")Spark IO相关API请参考:Spark官方API文档Input and Output
- 并行写出之 partitionBy() 指定分区列 , 会根据分区列创建子文件夹,并行写出数据
Spark SQL数据源
Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用相关转换算子进行操作,也可以用于创建临时视图。将DataFrame注册为临时视图可以对其中的数据使用SQL查询。
Spark SQL提供了两个常用的加载数据和写入数据的方法:load()
方法和save()
方法。load()
方法可以加载外部数据源为一个DataFrame,save()
方法可以将一个DataFrame写入到指定的数据源。
1、默认数据源
默认情况下,load()
方法和save()
方法只支持Parquet格式的文件,也可以在配置文件中通过参数spark.sql.sources.default
对默认文件格式进行更改。
Spark SQL可以很容易的读取Parquet文件并将其数据转为DataFrame数据集。例如,读取HDFS中的文件/users.parquet
,并将其中的name
列与favorite_color
列写入HDFS的/result
目录,代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
val spark = SparkSession.builder() //创建或得到SparkSession .appName("SparkSQLDataSource") .master("local[*]") .getOrCreate() //加载parquet格式的文件,返回一个DataFrame集合 val usersDF = spark.read.load("hdfs://centos01:9000/users.parquet") usersDF.show() // +------+--------------+----------------+ // | name|favorite_color|favorite_numbers| // +------+--------------+----------------+ // |Alyssa| null| [3, 9, 15, 20]| // | Ben| red| []| // +------+--------------+----------------+ //查询DataFrame中的name列和favorite_color列,并写入HDFS usersDF.select("name","favorite_color") .write.save("hdfs://centos01:9000/result") |
除了使用select()
方法查询外,也可以使用SparkSession
对象的sql()
方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。
1 2 3 4 5 |
//创建临时视图 usersDF.createTempView("t_user") //执行SQL查询,并将结果写入到HDFS spark.sql("SELECT name,favorite_color FROM t_user") .write.save("hdfs://centos01:9000/result") |
2、手动指定数据源
使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(json,parquet,jdbc,orc,libsvm,csv,text)。例如,手动指定csv格式的数据源:
1 |
val peopleDFCsv=spark.read.format("csv").load("hdfs://centos01:9000/people.csv") |
在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数:
1 2 3 4 5 6 7 |
val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.1.69:3306/spark_db") .option("driver","com.mysql.jdbc.Driver") .option("dbtable", "student") .option("user", "root") .option("password", "123456") .load() |
3、数据写入模式
在写入数据的同时,可以使用mode()
方法指定如何处理已经存在的数据,该方法的参数是一个枚举类SaveMode,其取值解析如下:
SaveMode.ErrorIfExists
:默认值。当向数据源写入一个DataFrame时,如果数据已经存在,则会抛出异常。SaveMode.Append
:当向数据源写入一个DataFrame时,如果数据或表已经存在,则会在原有的基础上进行追加。SaveMode.Overwrite
:当向数据源写入一个DataFrame时,如果数据或表已经存在,则会将其覆盖(包括数据或表的Schema)。SaveMode.Ignore
:当向数据源写入一个DataFrame时,如果数据或表已经存在,则不会写入内容,类似SQL中的“CREATE TABLE IF NOT EXISTS”。
例如,HDFS中有一个JSON格式的文件/people.json,内容:
1 2 3 |
{"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} |
现需要查询该文件中的name列,并将结果写入HDFS的/result目录中,若该目录存在则将其覆盖,代码:
1 2 3 4 |
val peopleDF = spark.read.format("json").load("hdfs://centos01:9000/people.json") peopleDF.select("name") .write.mode(SaveMode.Overwrite).format("json") .save("hdfs://centos01:9000/result") |
4、分区自动推断
表分区是Hive等系统中常用的优化查询效率的方法(Spark SQL的表分区与Hive的表分区类似)。在分区表中,数据通常存储在不同的分区目录中,分区目录通常以“分区列名=值”的格式进行命名。例如,以people
作为表名,gender
和country
作为分区列,存储数据的目录结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
path └── to └── people ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ... |
对于所有内置的数据源(包括Text/CSV/JSON/ORC/Parquet
),Spark SQL都能够根据目录名自动发现和推断分区信息。分区示例:
-
在本地(或HDFS)新建以下三个目录及文件,其中的目录
people
代表表名,gender
和country
代表分区列,people.json
存储实际人口数据:123D:\people\gender=male\country=CN\people.jsonD:\people\gender=male\country=US\people.jsonD:\people\gender=female\country=CN\people.json三个people.json文件的数据分别如下:
123{"name":"zhangsan","age":32}{"name":"lisi", "age":30}{"name":"wangwu", "age":19}123{"name":"Michael"}{"name":"Jack", "age":20}{"name":"Justin", "age":18}123{"name":"xiaohong","age":17}{"name":"xiaohua", "age":22}{"name":"huanhuan", "age":16} -
执行以下代码,读取表people的数据并显示:
123val usersDF = spark.read.format("json").load("D:\\people") //读取表数据为一个DataFrameusersDF.printSchema() //输出Schema信息usersDF.show() //输出表数据控制台输出的Schema信息如下:
12345root|-- age: long (nullable = true)|-- name: string (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)控制台输出的表数据如下:
12345678910111213+----+--------+------+-------+| age| name|gender|country|+----+--------+------+-------+| 17|xiaohong|female| CN|| 22| xiaohua|female| CN|| 16|huanhuan|female| CN|| 32|zhangsan| male| CN|| 30| lisi| male| CN|| 19| wangwu| male| CN||null| Michael| male| US|| 20| Jack| male| US|| 18| Justin| male| US|+----+--------+------+-------+
从控制台输出的Schema信息和表数据可以看出,Spark SQL在读取数据时,自动推断出了两个分区列gender
和country
,并将该两列的值添加到了DataFrame中。
Parquet文件
Apache Parquet是Hadoop生态系统中任何项目都可以使用的列式存储格式,不受数据处理框架、数据模型和编程语言的影响。Spark SQL支持对Parquet文件的读写,并且可以自动保存源数据的Schema。当写入Parquet文件时,为了提高兼容性,所有列都会自动转换为“可为空”状态。
加载和写入Parquet文件时,除了可以使用load()
方法和save()
方法外,还可以直接使用Spark SQL内置的parquet()
方法,例如以下代码:
1 2 3 4 5 |
//读取Parquet文件为一个DataFrame val usersDF = spark.read.parquet("hdfs://centos01:9000/users.parquet") //将DataFrame相关数据保存为Parquet文件,包括Schema信息 usersDF.select("name","favorite_color") .write.parquet("hdfs://centos01:9000/result") |
JSON数据集
Spark SQL可以自动推断JSON文件的Schema,并将其加载为DataFrame。在加载和写入JSON文件时,除了可以使用load()
方法和save()
方法外,还可以直接使用Spark SQL内置的json()
方法。该方法不仅可以读写JSON文件,还可以将Dataset[String]类型的数据集转为DataFrame。
需要注意的是,要想成功的将一个JSON文件加载为DataFrame,JSON文件的每一行必须包含一个独立有效的JSON对象,而不能将一个JSON对象分散在多行。例如以下JSON内容可以被成功加载:
1 2 3 |
{"name":"zhangsan","age":32} {"name":"lisi", "age":30} {"name":"wangwu", "age":19} |
使用json()方法加载JSON数据的例子如下代码所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
//创建或得到SparkSession val spark = SparkSession.builder() .appName("SparkSQLDataSource") .config("spark.sql.parquet.mergeSchema",true) .master("local[*]") .getOrCreate() /****1. 创建用户基本信息表*****/ import spark.implicits._ //创建用户信息Dataset集合 val arr=Array( "{'name':'zhangsan','age':20}", "{'name':'lisi','age':18}" ) val userInfo: Dataset[String] = spark.createDataset(arr) //将Dataset[String]转为DataFrame val userInfoDF = spark.read.json(userInfo) //创建临时视图user_info userInfoDF.createTempView("user_info") //显示数据 userInfoDF.show() // +---+--------+ // |age| name| // +---+--------+ // | 20|zhangsan| // | 18| lisi| // +---+--------+ /****2. 创建用户成绩表*****/ //读取JSON文件 val userScoreDF = spark.read.json("D:\\people\\people.json") //创建临时视图user_score userScoreDF.createTempView("user_score") userScoreDF.show() // +--------+-----+ // | name|score| // +--------+-----+ // |zhangsan| 98| // | lisi| 88| // | wangwu| 95| // +--------+-----+ /****3. 根据name字段关联查询*****/ val resDF=spark.sql("SELECT i.age,i.name,c.score FROM user_info i " + "JOIN user_score c ON i.name=c.name") resDF.show() // +---+--------+-----+ // |age| name|score| // +---+--------+-----+ // | 20|zhangsan| 98| // | 18| lisi| 88| // +---+--------+-----+ |
Hive 表
Spark SQL还支持读取和写入存储在Apache Hive中的数据。然而,由于Hive有大量依赖项,这些依赖项不包括在默认的Spark发行版中,如果在classpath上配置了这些Hive依赖项,Spark将自动加载它们。需要注意的是,这些Hive依赖项必须出现在所有Worker节点上,因为它们需要访问Hive序列化和反序列化库(SerDes),以便访问存储在Hive中的数据。
使用Spark SQL读取和写入Hive数据:
1、创建SparkSession对象
创建一个SparkSession对象,并开启Hive支持,代码:
1 2 3 4 5 |
val spark = SparkSession .builder() .appName("Spark Hive Demo") .enableHiveSupport()//开启Hive支持 .getOrCreate() |
2、创建Hive表
创建一张Hive表students,并指定字段分隔符为制表符“\t”,代码:
1 2 |
spark.sql("CREATE TABLE IF NOT EXISTS students (name STRING, age INT) " + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'") |
3、导入本地数据到Hive表
本地文件/home/hadoop/students.txt
的内容如下(字段之间以制表符“\t”分隔):
1 2 3 |
zhangsan 20 lisi 25 wangwu 19 |
将本地文件/home/hadoop/students.txt
中的数据导入到表students
中,代码:
1 |
spark.sql("LOAD DATA LOCAL INPATH '/home/hadoop/students.txt' INTO TABLE students") |
4、查询表数据
查询表students的数据并显示到控制台,代码:
1 |
spark.sql("SELECT * FROM students").show() |
显示结果:
1 2 3 4 5 6 7 |
+--------+---+ | name|age| +--------+---+ |zhangsan| 20| | lisi| 25| | wangwu| 19| +--------+---+ |
5、创建表的同时指定存储格式
创建一个Hive表hive_records,数据存储格式为Parquet(默认为普通文本格式),代码:
1 |
spark.sql("CREATE TABLE hive_records(key STRING, value INT) STORED AS PARQUET") |
6、将DataFrame写入Hive表
使用saveAsTable()
方法可以将一个DataFrame写入到指定的Hive表中。例如,加载students表的数据并转为DataFrame,然后将DataFrame写入Hive表hive_records
中,代码:
1 2 3 4 5 6 |
//加载students表的数据为DataFrame val studentsDF = spark.table("students") //将DataFrame以覆盖的方式写入表hive_records中 studentsDF.write.mode(SaveMode.Overwrite).saveAsTable("hive_records") //查询hive_records表数据并显示到控制 spark.sql("SELECT * FROM hive_records").show() |
Spark SQL应用程序写完后,需要提交到Spark集群中运行。若以Hive为数据源,提交之前需要做好Hive数据仓库、元数据库等的配置。
JDBC
Spark SQL还可以使用JDBC API从其他关系型数据库读取数据,返回的结果仍然是一个DataFrame,可以很容易地在Spark SQL中处理,或者与其他数据源进行连接查询。在使用JDBC连接数据库时可以指定相应的连接属性,常用的连接属性如表。
使用JDBC API对MySQL表student和表score进行关联查询,代码:
1 2 3 4 5 6 7 8 |
val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.1.69:3306/spark_db") .option("driver","com.mysql.jdbc.Driver") .option("dbtable", "(select st.name,sc.score from student st,score sc " + "where st.id=sc.id) t") .option("user", "root") .option("password", "123456") .load() |
上述代码中,dbtable属性的值是一个子查询,相当于SQL查询中的FROM关键字后的一部分。除了上述查询方式外,使用query属性编写完整SQL语句进行查询也能达到同样的效果,代码:
1 2 3 4 5 6 7 8 |
val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.1.234:3306/spark_db") .option("driver","com.mysql.jdbc.Driver") .option("query", "select st.name,sc.score from student st,score sc " + "where st.id=sc.id") .option("user", "root") .option("password", "123456") .load() |
Spark SQL内置函数
Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions
中。这些函数主要分为10类:UDF函数、聚合函数、日期函数、排序函数、非聚合函数、数学函数、混杂函数、窗口函数、字符串函数、集合函数,大部分函数与Hive中相同。
使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL语句中使用。例如,以编程的方式使用lower()
函数将用户姓名转为小写,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
//显示DataFrame数据(df指DataFrame对象) df.show() // +--------+ // | name| // +--------+ // |ZhangSan| // | LiSi| // | WangWu| // +--------+ //使用lower()函数将某列转为小写 import org.apache.spark.sql.functions._ df.select(lower(col("name")).as("name")).show() // +--------+ // | name| // +--------+ // |zhangsan| // | lisi| // | wangwu| // +--------+ |
Spark SQL自定义函数
当Spark SQL提供的内置函数不能满足查询需求时,用户也可以根据自己的业务编写自定义函数
(User Defined Functions,UDF),然后在Spark SQL中调用。
Spark SQL提供了一些常用的聚合函数,如count()
、countDistinct()
、avg()
、max()
、min()
等。此外,用户也可以根据自己的业务编写自定义聚合函数(User Defined Aggregate Functions,UDAF)。
UDF主要是针对单个输入,返回单个输出;而UDAF则可以针对多个输入进行聚合计算返回单个输出,功能更加强大。要编写UDAF,需要新建一个类,继承抽象类UserDefinedAggregateFunction,并实现其中未实现的方法。
Spark SQL开窗函数
row_number()开窗函数是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始),根据行号可以方便的对每一组数据取前N行(分组取TOPN)。row_number()函数的使用格式如下:
1 |
row_number() over (partition by 列名 order by 列名 desc) 行号列别名 |
格式说明:
partition by
:按照某一列进行分组。order by
:分组后按照某一列进行组内排序。desc
:降序,默认升序。
Views: 138