Spark SQL 结构化数据处理引擎

什么是Spark SQL

Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如json、parquet、avro、csv格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。

Spark SQL的主要特点:

  1. 将SQL查询与Spark应用程序无缝组合
    Spark SQL允许使用SQL在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce;而Spark SQL底层使用的是Spark RDD。例如以下代码,在Spark应用程序中嵌入SQL语句:
  2. 以相同的方式连接到多种数据源
    Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。例如以下代码,读取HDFS中的JSON文件,然后将该文件的内容创建为临时视图,最后与其他表根据指定的字段关联查询:
  3. 在现有的数据仓库上运行SQL或HiveQL查询
    Spark SQL支持HiveQL语法以及Hive SerDes和UDF(用户自定义函数),允许访问现有的Hive仓库。

DataFrame和Dataset

DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如:Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。
DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息),因此看起来更像是一张数据库表。

file

Spark SQL基本使用

Spark Shell启动时除了默认创建一个名为“sc”的SparkContext的实例外,还创建了一个名为“spark”的SparkSession实例,该“spark”变量也可以在Spark Shell中直接使用。
SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。
例如,在HDFS中有一个文件/input/person.txt,文件内容:

现需要使用Spark SQL将该文件中的数据按照年龄降序排列,步骤:

  1. 加载数据为Dataset
    调用SparkSession的API read.textFile()可以读取指定路径中的文件内容,并加载为一个Dataset:

    从变量d1的类型可以看出,textFile()方法将读取的数据转为了Dataset。除了textFile()方法读取文本内容外,还可以使用csv()、jdbc()、json()等方法读取csv文件、jdbc数据源、json文件等数据。调用Dataset中的show()方法可以输出Dataset中的数据内容。查看d1中的数据内容:

    从上述内容可以看出,Dataset将文件中的每一行看做一个元素,并且所有元素组成了一列,列名默认为“value”。

    如果Spark报错ClassNotFoundException: Class com.hadoop.compression.lzo.Lzo说明Hadoop支持Lzo压缩但是Spark没有配置支持压缩相关的类库,可以修改spark-defaults.conf并添加两行:
    spark.driver.extraClassPath /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar spark.executor.extraClassPath /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar

  2. 给Dataset添加元数据信息
    定义一个样例类Person,用于存放数据描述信息(Schema):

    导入SparkSession的隐式转换,以便后续可以使用Dataset的算子:

    调用Dataset的map()算子将每一个元素拆分并存入Person类中:

    此时查看personDataset中的数据内容,personDataset中的数据类似于一张关系型数据库的表:

  3. 将Dataset转为DataFrame
    Spark SQL查询的是DataFrame中的数据,因此需要将存有元数据信息的Dataset转为DataFrame。
    调用Dataset的toDF()方法,将存有元数据的Dataset转为DataFrame,代码:

  4. 执行SQL查询
    在DataFrame上创建一个临时视图“v_person”,代码:

    使用SparkSession对象执行SQL查询,代码:

    调用show()方法输出结果数据,代码:

    可以看到,结果数据已按照age字段降序排列。

  5. 指定导出文件格式

    查看导出的文件(有几个文件说明有几个分区):

  6. 分区设置
    Spark中使用SparkSql进行shuffle操作,默认分区数是200个;参数配置是--conf spark.sql.shuffle.partitions, 如果想要修改SparkSQL执行shuffle操作时分区数:

    1. 配置 spark.sql.shuffle.partitions,适用场景spark.sql()合并分区

      这样配置后,通过spark.sql()执行后写出的数据分区数就是你要求的个数,如这里5。

    2. 配置 coalesce(n),适用场景spark写出数据到指定路径下合并分区,不会引起shuffle

    3. 配置repartition(n), 重新分区,会引发shuffle

  7. 分区分桶导出

    1. 并行写出之 partitionBy() 指定分区列 , 会根据分区列创建子文件夹,并行写出数据
    2. 并行写出之 repartition() ,一般spark中有几个分区就会有几个并行的IO写出
    3. 分桶写出,好处是后续读入的时候数据就不会做shuffle了,因为相同分桶的数据会被划分到同一个物理分区中

      Spark IO相关API请参考:Spark官方API文档Input and Output

Spark SQL数据源

Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用相关转换算子进行操作,也可以用于创建临时视图。将DataFrame注册为临时视图可以对其中的数据使用SQL查询。
Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入到指定的数据源。

1、默认数据源
默认情况下,load()方法和save()方法只支持Parquet格式的文件,也可以在配置文件中通过参数spark.sql.sources.default对默认文件格式进行更改。
Spark SQL可以很容易的读取Parquet文件并将其数据转为DataFrame数据集。例如,读取HDFS中的文件/users.parquet,并将其中的name列与favorite_color列写入HDFS的/result目录,代码:

除了使用select()方法查询外,也可以使用SparkSession对象的sql()方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。

2、手动指定数据源

使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(json,parquet,jdbc,orc,libsvm,csv,text)。例如,手动指定csv格式的数据源:

在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数:

3、数据写入模式
在写入数据的同时,可以使用mode()方法指定如何处理已经存在的数据,该方法的参数是一个枚举类SaveMode,其取值解析如下:

  • SaveMode.ErrorIfExists:默认值。当向数据源写入一个DataFrame时,如果数据已经存在,则会抛出异常。
  • SaveMode.Append:当向数据源写入一个DataFrame时,如果数据或表已经存在,则会在原有的基础上进行追加。
  • SaveMode.Overwrite:当向数据源写入一个DataFrame时,如果数据或表已经存在,则会将其覆盖(包括数据或表的Schema)。
  • SaveMode.Ignore:当向数据源写入一个DataFrame时,如果数据或表已经存在,则不会写入内容,类似SQL中的“CREATE TABLE IF NOT EXISTS”。

例如,HDFS中有一个JSON格式的文件/people.json,内容:

现需要查询该文件中的name列,并将结果写入HDFS的/result目录中,若该目录存在则将其覆盖,代码:

4、分区自动推断
表分区是Hive等系统中常用的优化查询效率的方法(Spark SQL的表分区与Hive的表分区类似)。在分区表中,数据通常存储在不同的分区目录中,分区目录通常以“分区列名=值”的格式进行命名。例如,以people作为表名,gendercountry作为分区列,存储数据的目录结构如下:

对于所有内置的数据源(包括Text/CSV/JSON/ORC/Parquet),Spark SQL都能够根据目录名自动发现和推断分区信息。分区示例:

  1. 在本地(或HDFS)新建以下三个目录及文件,其中的目录people代表表名,gendercountry代表分区列,people.json存储实际人口数据:

    三个people.json文件的数据分别如下:

  2. 执行以下代码,读取表people的数据并显示:

    控制台输出的Schema信息如下:

    控制台输出的表数据如下:

从控制台输出的Schema信息和表数据可以看出,Spark SQL在读取数据时,自动推断出了两个分区列gendercountry,并将该两列的值添加到了DataFrame中。

Parquet文件

Apache Parquet是Hadoop生态系统中任何项目都可以使用的列式存储格式,不受数据处理框架、数据模型和编程语言的影响。Spark SQL支持对Parquet文件的读写,并且可以自动保存源数据的Schema。当写入Parquet文件时,为了提高兼容性,所有列都会自动转换为“可为空”状态。
加载和写入Parquet文件时,除了可以使用load()方法和save()方法外,还可以直接使用Spark SQL内置的parquet()方法,例如以下代码:

JSON数据集

Spark SQL可以自动推断JSON文件的Schema,并将其加载为DataFrame。在加载和写入JSON文件时,除了可以使用load()方法和save()方法外,还可以直接使用Spark SQL内置的json()方法。该方法不仅可以读写JSON文件,还可以将Dataset[String]类型的数据集转为DataFrame。

需要注意的是,要想成功的将一个JSON文件加载为DataFrame,JSON文件的每一行必须包含一个独立有效的JSON对象,而不能将一个JSON对象分散在多行。例如以下JSON内容可以被成功加载:

使用json()方法加载JSON数据的例子如下代码所示:

Hive 表

Spark SQL还支持读取和写入存储在Apache Hive中的数据。然而,由于Hive有大量依赖项,这些依赖项不包括在默认的Spark发行版中,如果在classpath上配置了这些Hive依赖项,Spark将自动加载它们。需要注意的是,这些Hive依赖项必须出现在所有Worker节点上,因为它们需要访问Hive序列化和反序列化库(SerDes),以便访问存储在Hive中的数据。
使用Spark SQL读取和写入Hive数据:
1、创建SparkSession对象
创建一个SparkSession对象,并开启Hive支持,代码:

2、创建Hive表
创建一张Hive表students,并指定字段分隔符为制表符“\t”,代码:

3、导入本地数据到Hive表
本地文件/home/hadoop/students.txt的内容如下(字段之间以制表符“\t”分隔):

将本地文件/home/hadoop/students.txt中的数据导入到表students中,代码:

4、查询表数据
查询表students的数据并显示到控制台,代码:

显示结果:

5、创建表的同时指定存储格式
创建一个Hive表hive_records,数据存储格式为Parquet(默认为普通文本格式),代码:

6、将DataFrame写入Hive表
使用saveAsTable()方法可以将一个DataFrame写入到指定的Hive表中。例如,加载students表的数据并转为DataFrame,然后将DataFrame写入Hive表hive_records中,代码:

Spark SQL应用程序写完后,需要提交到Spark集群中运行。若以Hive为数据源,提交之前需要做好Hive数据仓库、元数据库等的配置。

JDBC

Spark SQL还可以使用JDBC API从其他关系型数据库读取数据,返回的结果仍然是一个DataFrame,可以很容易地在Spark SQL中处理,或者与其他数据源进行连接查询。在使用JDBC连接数据库时可以指定相应的连接属性,常用的连接属性如表。

file

使用JDBC API对MySQL表student和表score进行关联查询,代码:

上述代码中,dbtable属性的值是一个子查询,相当于SQL查询中的FROM关键字后的一部分。除了上述查询方式外,使用query属性编写完整SQL语句进行查询也能达到同样的效果,代码:

Spark SQL内置函数

Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions中。这些函数主要分为10类:UDF函数、聚合函数、日期函数、排序函数、非聚合函数、数学函数、混杂函数、窗口函数、字符串函数、集合函数,大部分函数与Hive中相同。
使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL语句中使用。例如,以编程的方式使用lower()函数将用户姓名转为小写,代码如下:

Spark SQL自定义函数

当Spark SQL提供的内置函数不能满足查询需求时,用户也可以根据自己的业务编写自定义函数
(User Defined Functions,UDF),然后在Spark SQL中调用。
Spark SQL提供了一些常用的聚合函数,如count()countDistinct()avg()max()min()等。此外,用户也可以根据自己的业务编写自定义聚合函数(User Defined Aggregate Functions,UDAF)。
UDF主要是针对单个输入,返回单个输出;而UDAF则可以针对多个输入进行聚合计算返回单个输出,功能更加强大。要编写UDAF,需要新建一个类,继承抽象类UserDefinedAggregateFunction,并实现其中未实现的方法。

Spark SQL开窗函数

row_number()开窗函数是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始),根据行号可以方便的对每一组数据取前N行(分组取TOPN)。row_number()函数的使用格式如下:

格式说明:

  • partition by:按照某一列进行分组。
  • order by:分组后按照某一列进行组内排序。
  • desc:降序,默认升序。

Views: 138

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注