Spark SQL 结构化数据处理引擎

什么是Spark SQL

Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如json、parquet、avro、csv格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。

Spark SQL的主要特点:

  1. 将SQL查询与Spark应用程序无缝组合
    Spark SQL允许使用SQL在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce;而Spark SQL底层使用的是Spark RDD。例如以下代码,在Spark应用程序中嵌入SQL语句:

    results = spark.sql( "SELECT * FROM people")
  2. 以相同的方式连接到多种数据源
    Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。例如以下代码,读取HDFS中的JSON文件,然后将该文件的内容创建为临时视图,最后与其他表根据指定的字段关联查询:

    //读取JSON文件
    val userScoreDF = spark.read.json("hdfs://centos01:9000/people.json")
    //创建临时视图user_score
    userScoreDF.createTempView("user_score")
    //根据name关联查询
    val resDF=spark.sql("SELECT i.age,i.name,c.score FROM user_info i " +
                    "JOIN user_score c ON i.name=c.name")
  3. 在现有的数据仓库上运行SQL或HiveQL查询
    Spark SQL支持HiveQL语法以及Hive SerDes和UDF(用户自定义函数),允许访问现有的Hive仓库。

DataFrame和Dataset

DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如:Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。
DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息),因此看起来更像是一张数据库表。

file

Spark SQL基本使用

Spark Shell启动时除了默认创建一个名为“sc”的SparkContext的实例外,还创建了一个名为“spark”的SparkSession实例,该“spark”变量也可以在Spark Shell中直接使用。
SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。
例如,在HDFS中有一个文件/input/person.txt,文件内容:

1,zhangsan,25
2,lisi,22
3,wangwu,30

现需要使用Spark SQL将该文件中的数据按照年龄降序排列,步骤:

  1. 加载数据为Dataset
    调用SparkSession的API read.textFile()可以读取指定路径中的文件内容,并加载为一个Dataset:

    $ spark-shell --master yarn
    scala> val d1=spark.read.textFile("hdfs://centos01:9000/input/person.txt")
    d1: org.apache.spark.sql.Dataset[String] = [value: string]

    从变量d1的类型可以看出,textFile()方法将读取的数据转为了Dataset。除了textFile()方法读取文本内容外,还可以使用csv()、jdbc()、json()等方法读取csv文件、jdbc数据源、json文件等数据。调用Dataset中的show()方法可以输出Dataset中的数据内容。查看d1中的数据内容:

    scala> d1.show()
    +-------------+
    |        value|
    +-------------+
    |1,zhangsan,25|
    |2,lisi,22|
    |3,wangwu,30|
    +-------------+

    从上述内容可以看出,Dataset将文件中的每一行看做一个元素,并且所有元素组成了一列,列名默认为“value”。

    如果Spark报错ClassNotFoundException: Class com.hadoop.compression.lzo.Lzo说明Hadoop支持Lzo压缩但是Spark没有配置支持压缩相关的类库,可以修改spark-defaults.conf并添加两行:
    spark.driver.extraClassPath /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar spark.executor.extraClassPath /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar

  2. 给Dataset添加元数据信息
    定义一个样例类Person,用于存放数据描述信息(Schema):

    scala> case class Person(id:Int,name:String,age:Int)

    导入SparkSession的隐式转换,以便后续可以使用Dataset的算子:

    scala> import spark.implicits._

    调用Dataset的map()算子将每一个元素拆分并存入Person类中:

    scala> val personDataset=d1.map(line=>{
         | val fields = line.split(",")
         | val id = fields(0).toInt
         | val name = fields(1)
         | val age = fields(2).toInt
         | Person(id, name, age)
         | })

    此时查看personDataset中的数据内容,personDataset中的数据类似于一张关系型数据库的表:

    scala> personDataset.show()
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 25|
    |  2|    lisi| 22|
    |  3|  wangwu| 30|
    +---+--------+---+
  3. 将Dataset转为DataFrame
    Spark SQL查询的是DataFrame中的数据,因此需要将存有元数据信息的Dataset转为DataFrame。
    调用Dataset的toDF()方法,将存有元数据的Dataset转为DataFrame,代码:

    scala> val pdf = personDataset.toDF()
    pdf: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
  4. 执行SQL查询
    在DataFrame上创建一个临时视图“v_person”,代码:

    scala> pdf.createTempView("v_person")

    使用SparkSession对象执行SQL查询,代码:

    scala> val result = spark.sql("select * from v_person order by age desc")
    result: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

    调用show()方法输出结果数据,代码:

    scala> result.show()
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  3|  wangwu| 30|
    |  1|zhangsan| 25|
    |  2|    lisi | 22|
    +---+--------+---+

    可以看到,结果数据已按照age字段降序排列。

  5. 指定导出文件格式

    # 导出json格式文件
    scala> result.write.format("json").save("/output/json")
    # 导出scv格式文件
    scala> result.write.format("csv").save("/output/csv")
    # 导出orc格式文件
    scala> result.write.format("orc").save("/output/orc")
    # 导出parquet格式文件
    scala> result.write.format("parquet").save("/output/parquet")

    查看导出的文件(有几个文件说明有几个分区):

    $ hadoop fs -cat /output/json/part-*
    {"id":3,"name":"wangwu","age":30}
    {"id":1,"name":"zhangsan","age":25}
    {"id":2,"name":"lisi","age":22}
    
    $ hadoop fs -cat /output/csv/part-*
    3,wangwu,30
    1,zhangsan,25
    2,lisi,22
    
    $ hadoop fs -cat /output/orc/part-*
    ORC...(二进制乱码)
    
    [hadoop@hadoop100 conf]$ hadoop fs -cat /output/parquet/part-*
    PAR1...(二进制乱码)
  6. 分区设置
    Spark中使用SparkSql进行shuffle操作,默认分区数是200个;参数配置是--conf spark.sql.shuffle.partitions, 如果想要修改SparkSQL执行shuffle操作时分区数:

    1. 配置 spark.sql.shuffle.partitions,适用场景spark.sql()合并分区

      spark.conf.set("spark.sql.shuffle.partitions", 5) #后面的数字是你希望的分区数

      这样配置后,通过spark.sql()执行后写出的数据分区数就是你要求的个数,如这里5。

    2. 配置 coalesce(n),适用场景spark写出数据到指定路径下合并分区,不会引起shuffle

      df = spark.sql(sql_string).coalesce(1) #合并分区数
      df.write.format("csv")
      .mode("overwrite")
      .option("sep", ",")
      .option("header", True)
      .save(hdfs_path)
    3. 配置repartition(n), 重新分区,会引发shuffle

      df = spark.sql(sql_string).repartition(1) #重新分区,会引发全局shuffle
      df.write.format("csv")
      .mode("overwrite")
      .option("sep", ",")
      .option("header", True)
      .save(hdfs_path)
  7. 分区分桶导出

    1. 并行写出之 partitionBy() 指定分区列 , 会根据分区列创建子文件夹,并行写出数据
      df.write.mode("overwrite")
      .partitionBy("day")
      .save("/tmp/partitioned-files.parquet")
    2. 并行写出之 repartition() ,一般spark中有几个分区就会有几个并行的IO写出
      df.repartition(5)
      .write.format("csv")
      .save("/tmp/multiple.csv")
    3. 分桶写出,好处是后续读入的时候数据就不会做shuffle了,因为相同分桶的数据会被划分到同一个物理分区中
      csvFile.write.format("parquet")
      .mode("overwrite")
      .bucketBy(5, "gmv") #第一个参数:分成几个桶,第二个参数:按哪列进行分桶
      .saveAsTable("bucketedFiles")

      Spark IO相关API请参考:Spark官方API文档Input and Output

Spark SQL数据源

Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用相关转换算子进行操作,也可以用于创建临时视图。将DataFrame注册为临时视图可以对其中的数据使用SQL查询。
Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入到指定的数据源。

1、默认数据源
默认情况下,load()方法和save()方法只支持Parquet格式的文件,也可以在配置文件中通过参数spark.sql.sources.default对默认文件格式进行更改。
Spark SQL可以很容易的读取Parquet文件并将其数据转为DataFrame数据集。例如,读取HDFS中的文件/users.parquet,并将其中的name列与favorite_color列写入HDFS的/result目录,代码:

val spark = SparkSession.builder() //创建或得到SparkSession
  .appName("SparkSQLDataSource")
  .master("local[*]")
  .getOrCreate()
//加载parquet格式的文件,返回一个DataFrame集合
val usersDF = spark.read.load("hdfs://centos01:9000/users.parquet")
usersDF.show()
// +------+--------------+----------------+
// |  name|favorite_color|favorite_numbers|
// +------+--------------+----------------+
// |Alyssa|            null|  [3, 9, 15, 20]|
// |   Ben|              red|                []|
// +------+--------------+----------------+
 //查询DataFrame中的name列和favorite_color列,并写入HDFS
usersDF.select("name","favorite_color")
  .write.save("hdfs://centos01:9000/result")

除了使用select()方法查询外,也可以使用SparkSession对象的sql()方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。

//创建临时视图
usersDF.createTempView("t_user")
//执行SQL查询,并将结果写入到HDFS
spark.sql("SELECT name,favorite_color FROM t_user")
  .write.save("hdfs://centos01:9000/result")

2、手动指定数据源

使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(json,parquet,jdbc,orc,libsvm,csv,text)。例如,手动指定csv格式的数据源:

val peopleDFCsv=spark.read.format("csv").load("hdfs://centos01:9000/people.csv")

在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数:

val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.69:3306/spark_db")
  .option("driver","com.mysql.jdbc.Driver")
  .option("dbtable", "student")
  .option("user", "root")
  .option("password", "123456")
  .load()

3、数据写入模式
在写入数据的同时,可以使用mode()方法指定如何处理已经存在的数据,该方法的参数是一个枚举类SaveMode,其取值解析如下:

  • SaveMode.ErrorIfExists:默认值。当向数据源写入一个DataFrame时,如果数据已经存在,则会抛出异常。
  • SaveMode.Append:当向数据源写入一个DataFrame时,如果数据或表已经存在,则会在原有的基础上进行追加。
  • SaveMode.Overwrite:当向数据源写入一个DataFrame时,如果数据或表已经存在,则会将其覆盖(包括数据或表的Schema)。
  • SaveMode.Ignore:当向数据源写入一个DataFrame时,如果数据或表已经存在,则不会写入内容,类似SQL中的“CREATE TABLE IF NOT EXISTS”。

例如,HDFS中有一个JSON格式的文件/people.json,内容:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

现需要查询该文件中的name列,并将结果写入HDFS的/result目录中,若该目录存在则将其覆盖,代码:

val peopleDF = spark.read.format("json").load("hdfs://centos01:9000/people.json")
peopleDF.select("name")
  .write.mode(SaveMode.Overwrite).format("json")
  .save("hdfs://centos01:9000/result")

4、分区自动推断
表分区是Hive等系统中常用的优化查询效率的方法(Spark SQL的表分区与Hive的表分区类似)。在分区表中,数据通常存储在不同的分区目录中,分区目录通常以“分区列名=值”的格式进行命名。例如,以people作为表名,gendercountry作为分区列,存储数据的目录结构如下:

path
└── to
    └── people
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

对于所有内置的数据源(包括Text/CSV/JSON/ORC/Parquet),Spark SQL都能够根据目录名自动发现和推断分区信息。分区示例:

  1. 在本地(或HDFS)新建以下三个目录及文件,其中的目录people代表表名,gendercountry代表分区列,people.json存储实际人口数据:

    D:\people\gender=male\country=CN\people.json
    D:\people\gender=male\country=US\people.json
    D:\people\gender=female\country=CN\people.json

    三个people.json文件的数据分别如下:

    {"name":"zhangsan","age":32}
    {"name":"lisi", "age":30}
    {"name":"wangwu", "age":19}
    {"name":"Michael"}
    {"name":"Jack", "age":20}
    {"name":"Justin", "age":18}
    {"name":"xiaohong","age":17}
    {"name":"xiaohua", "age":22}
    {"name":"huanhuan", "age":16}
  2. 执行以下代码,读取表people的数据并显示:

    val usersDF = spark.read.format("json").load("D:\\people") //读取表数据为一个DataFrame
    usersDF.printSchema() //输出Schema信息
    usersDF.show() //输出表数据

    控制台输出的Schema信息如下:

    root
    |-- age: long (nullable = true)
    |-- name: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- country: string (nullable = true)

    控制台输出的表数据如下:

    +----+--------+------+-------+
    | age|    name|gender|country|
    +----+--------+------+-------+
    |  17|xiaohong|female|     CN|
    |  22| xiaohua|female|     CN|
    |  16|huanhuan|female|     CN|
    |  32|zhangsan|  male|     CN|
    |  30|     lisi|  male|     CN|
    |  19|   wangwu|  male|     CN|
    |null| Michael|  male|     US|
    |  20|     Jack|  male|     US|
    |  18|   Justin|  male|     US|
    +----+--------+------+-------+

从控制台输出的Schema信息和表数据可以看出,Spark SQL在读取数据时,自动推断出了两个分区列gendercountry,并将该两列的值添加到了DataFrame中。

Parquet文件

Apache Parquet是Hadoop生态系统中任何项目都可以使用的列式存储格式,不受数据处理框架、数据模型和编程语言的影响。Spark SQL支持对Parquet文件的读写,并且可以自动保存源数据的Schema。当写入Parquet文件时,为了提高兼容性,所有列都会自动转换为“可为空”状态。
加载和写入Parquet文件时,除了可以使用load()方法和save()方法外,还可以直接使用Spark SQL内置的parquet()方法,例如以下代码:

//读取Parquet文件为一个DataFrame
val usersDF = spark.read.parquet("hdfs://centos01:9000/users.parquet")
//将DataFrame相关数据保存为Parquet文件,包括Schema信息
usersDF.select("name","favorite_color")
  .write.parquet("hdfs://centos01:9000/result")

JSON数据集

Spark SQL可以自动推断JSON文件的Schema,并将其加载为DataFrame。在加载和写入JSON文件时,除了可以使用load()方法和save()方法外,还可以直接使用Spark SQL内置的json()方法。该方法不仅可以读写JSON文件,还可以将Dataset[String]类型的数据集转为DataFrame。

需要注意的是,要想成功的将一个JSON文件加载为DataFrame,JSON文件的每一行必须包含一个独立有效的JSON对象,而不能将一个JSON对象分散在多行。例如以下JSON内容可以被成功加载:

{"name":"zhangsan","age":32}
{"name":"lisi", "age":30}
{"name":"wangwu", "age":19}

使用json()方法加载JSON数据的例子如下代码所示:

//创建或得到SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLDataSource")
.config("spark.sql.parquet.mergeSchema",true)
.master("local[*]")
.getOrCreate()

/****1. 创建用户基本信息表*****/
import spark.implicits._
//创建用户信息Dataset集合
val arr=Array(
 "{'name':'zhangsan','age':20}",
 "{'name':'lisi','age':18}"
)
val userInfo: Dataset[String] = spark.createDataset(arr)
//将Dataset[String]转为DataFrame
val userInfoDF = spark.read.json(userInfo)
//创建临时视图user_info
userInfoDF.createTempView("user_info")
//显示数据
userInfoDF.show()
// +---+--------+
// |age|    name|
// +---+--------+
// | 20|zhangsan|
// | 18|    lisi|
// +---+--------+

/****2. 创建用户成绩表*****/
//读取JSON文件
val userScoreDF = spark.read.json("D:\\people\\people.json")
//创建临时视图user_score
userScoreDF.createTempView("user_score")
userScoreDF.show()
// +--------+-----+
// |    name|score|
// +--------+-----+
// |zhangsan|   98|
// |    lisi|   88|
// |  wangwu|   95|
//      +--------+-----+
/****3. 根据name字段关联查询*****/
val resDF=spark.sql("SELECT i.age,i.name,c.score FROM user_info i " +
                      "JOIN user_score c ON i.name=c.name") 
resDF.show()
// +---+--------+-----+
// |age|    name|score|
// +---+--------+-----+
// | 20|zhangsan|   98|
// | 18|    lisi|   88|
// +---+--------+-----+

Hive 表

Spark SQL还支持读取和写入存储在Apache Hive中的数据。然而,由于Hive有大量依赖项,这些依赖项不包括在默认的Spark发行版中,如果在classpath上配置了这些Hive依赖项,Spark将自动加载它们。需要注意的是,这些Hive依赖项必须出现在所有Worker节点上,因为它们需要访问Hive序列化和反序列化库(SerDes),以便访问存储在Hive中的数据。
使用Spark SQL读取和写入Hive数据:
1、创建SparkSession对象
创建一个SparkSession对象,并开启Hive支持,代码:

val spark = SparkSession
  .builder()
  .appName("Spark Hive Demo")
  .enableHiveSupport()//开启Hive支持
  .getOrCreate()

2、创建Hive表
创建一张Hive表students,并指定字段分隔符为制表符“\t”,代码:

spark.sql("CREATE TABLE IF NOT EXISTS students (name STRING, age INT) " +
  "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'")

3、导入本地数据到Hive表
本地文件/home/hadoop/students.txt的内容如下(字段之间以制表符“\t”分隔):

zhangsan    20
lisi    25
wangwu  19

将本地文件/home/hadoop/students.txt中的数据导入到表students中,代码:

spark.sql("LOAD DATA LOCAL INPATH '/home/hadoop/students.txt'  INTO TABLE students")

4、查询表数据
查询表students的数据并显示到控制台,代码:

spark.sql("SELECT * FROM students").show()

显示结果:

+--------+---+
|    name|age|
+--------+---+
|zhangsan| 20|
|     lisi| 25|
|   wangwu| 19|
+--------+---+

5、创建表的同时指定存储格式
创建一个Hive表hive_records,数据存储格式为Parquet(默认为普通文本格式),代码:

spark.sql("CREATE TABLE hive_records(key STRING, value INT) STORED AS PARQUET")

6、将DataFrame写入Hive表
使用saveAsTable()方法可以将一个DataFrame写入到指定的Hive表中。例如,加载students表的数据并转为DataFrame,然后将DataFrame写入Hive表hive_records中,代码:

//加载students表的数据为DataFrame
val studentsDF = spark.table("students")
//将DataFrame以覆盖的方式写入表hive_records中
studentsDF.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
//查询hive_records表数据并显示到控制
spark.sql("SELECT * FROM hive_records").show()

Spark SQL应用程序写完后,需要提交到Spark集群中运行。若以Hive为数据源,提交之前需要做好Hive数据仓库、元数据库等的配置。

JDBC

Spark SQL还可以使用JDBC API从其他关系型数据库读取数据,返回的结果仍然是一个DataFrame,可以很容易地在Spark SQL中处理,或者与其他数据源进行连接查询。在使用JDBC连接数据库时可以指定相应的连接属性,常用的连接属性如表。

file

使用JDBC API对MySQL表student和表score进行关联查询,代码:

val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.69:3306/spark_db")
  .option("driver","com.mysql.jdbc.Driver")
  .option("dbtable", "(select st.name,sc.score from student st,score sc " +
    "where st.id=sc.id) t")
  .option("user", "root")
  .option("password", "123456")
  .load()

上述代码中,dbtable属性的值是一个子查询,相当于SQL查询中的FROM关键字后的一部分。除了上述查询方式外,使用query属性编写完整SQL语句进行查询也能达到同样的效果,代码:

val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.234:3306/spark_db")
  .option("driver","com.mysql.jdbc.Driver")
  .option("query", "select st.name,sc.score from student st,score sc " +
    "where st.id=sc.id")
  .option("user", "root")
  .option("password", "123456")
  .load()

Spark SQL内置函数

Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions中。这些函数主要分为10类:UDF函数、聚合函数、日期函数、排序函数、非聚合函数、数学函数、混杂函数、窗口函数、字符串函数、集合函数,大部分函数与Hive中相同。
使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL语句中使用。例如,以编程的方式使用lower()函数将用户姓名转为小写,代码如下:

//显示DataFrame数据(df指DataFrame对象)
df.show()
// +--------+
// |    name|
// +--------+
// |ZhangSan|
// |    LiSi|
// |  WangWu|
// +--------+
//使用lower()函数将某列转为小写
import org.apache.spark.sql.functions._
df.select(lower(col("name")).as("name")).show()
// +--------+
// |    name|
// +--------+
// |zhangsan|
// |    lisi|
// |  wangwu|
// +--------+

Spark SQL自定义函数

当Spark SQL提供的内置函数不能满足查询需求时,用户也可以根据自己的业务编写自定义函数
(User Defined Functions,UDF),然后在Spark SQL中调用。
Spark SQL提供了一些常用的聚合函数,如count()countDistinct()avg()max()min()等。此外,用户也可以根据自己的业务编写自定义聚合函数(User Defined Aggregate Functions,UDAF)。
UDF主要是针对单个输入,返回单个输出;而UDAF则可以针对多个输入进行聚合计算返回单个输出,功能更加强大。要编写UDAF,需要新建一个类,继承抽象类UserDefinedAggregateFunction,并实现其中未实现的方法。

Spark SQL开窗函数

row_number()开窗函数是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始),根据行号可以方便的对每一组数据取前N行(分组取TOPN)。row_number()函数的使用格式如下:

row_number() over (partition by 列名 order by 列名 desc) 行号列别名

格式说明:

  • partition by:按照某一列进行分组。
  • order by:分组后按照某一列进行组内排序。
  • desc:降序,默认升序。

Views: 138