Spark Streaming 实时流处理引擎

01 什么是Spark Streaming

Spark Streaming是Spark Core API(Spark RDD)的扩展,支持对实时数据流进行可伸缩、高吞吐量、容错处理。数据可以从Kafka、Flume、Kinesis或TCP Socket等多种来源获取,并且可以使用复杂的算法处理数据,这些算法由map()、reduce()、join()和window()等高级函数表示。处理后的数据可以推送到文件系统、数据库等存储系统。事实上,可以将Spark的机器学习和图形处理算法应用于数据流。

image-20220501224946569

Spark Streaming的主要优点如下:

易于使用。Spark Streaming提供了很多高级操作算子,允许以编写批处理作业的方式编写流式作业。它支持Java、Scala和Python语言。

易于与Spark体系整合。通过在Spark Core上运行Spark Streaming,可以在Spark Streaming中使用与Spark RDD相同的代码进行批处理,构建强大的交互应用程序,而不仅仅是数据分析。

02 Spark Streaming工作原理

Spark Streaming接收实时输入的数据流,并将数据流以时间片(秒级)为单位拆分成批次,然后将每个批次交给Spark引擎(Spark Core)进行处理,最终生成以批次组成的结果数据流。

image-20220501225106219

Spark Streaming提供了一种高级抽象,称为DStream(Discretized Stream)。DStream表示一个连续不断的数据流,它可以从Kafka、Flume和Kinesis等数据源的输入数据流创建,也可以通过对其他DStream应用高级函数(例如map()、reduce()、join()和window())进行转换创建。在内部,对输入数据流拆分成的每个批次实际上是一个RDD,一个DStream则由多个RDD组成,相当于一个RDD序列

image-20220501225117153

DStream中的每个RDD都包含来自特定时间间隔的数据。

image-20220501225241853

应用于DStream上的任何操作实际上都是对底层RDD的操作。例如,对一个DStream应用flatMap()算子操作,实际上是对DStream中每个时间段的RDD都执行一次flatMap()算子,生成对应时间段的新RDD,所有的新RDD组成了一个新Dstream。

image-20220501225257562

03 输入DStream和Receiver

输入DStream表示从数据源接收的输入数据流,每个输入DStream(除了文件数据流之外)都与一个Receiver对象相关联,该对象接收来自数据源的数据并将其存储在Spark的内存中进行处理。

如果希望在Spark Streaming应用程序中并行接收多个数据流,可以创建多个输入DStream,同时将创建多个Receiver,接收多个数据流。但需要注意的是,一个Spark Streaming应用程序的Executor是一个长时间运行的任务,它会占用分配给Spark Streaming应用程序的一个CPU内核(占用Spark Streaming应用程序所在节点的一个CPU内核),因此Spark Streaming应用程序需要分配足够的内核(如果在本地运行,则是线程)来处理接收到的数据,并运行Receiver。

在本地运行Spark Streaming应用程序时,不要使用“local”或“local[1]”作为主URL。这两种方式都意味着只有一个线程将用于本地运行任务。如果正在使用基于Receiver的输入DStream(例如Socket、Kafka、Flume等),那么将使用单线程运行Receiver,导致没有多余的线程来处理接收到的数据(Spark Streaming至少需要两个线程,一个线程用于运行Receiver接收数据,一个线程用于处理接收到的数据)。因此,在本地运行时,应该使用“local[n]”作为主URL,其中n>Receiver的数量(若Spark Streaming应用程序只创建了一个DStream,则只有一个Receiver,n的最小值为2)。

每个Spark应用程序都有各自独立的一个或多个Executor进程负责执行任务。将Spark Streaming应用程序发布到集群上运行时,每个Executor进程所分配的CPU内核数量必须大于Receiver的数量,因为1个Receiver独占1个CPU内核,还需要至少1个CPU内核进行数据的处理,这样才能保证至少两个线程同时进行(一个线程用于运行Receiver接收数据,一个线程用于处理接收到的数据)。否则系统将接收数据,但无法进行处理。若Spark Streaming应用程序只创建了一个DStream,则只有一个Receiver,Executor所分配的CPU内核数量的最小值为2。

04 第一个Spark Streaming程序

假设需要监听TCP Socket端口的数据,实时计算接收到的文本数据中的单词数,步骤如下:

  1. 导入相应类

导入Spark Streaming所需的类和StreamingContext中的隐式转换,代码如下:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
  1. 创建StreamingContext

StreamingContext是所有数据流操作的上下文,在进行数据流操作之前需要先创建该对象。例如,创建一个本地StreamingContext对象,使用两个执行线程,批处理间隔为1秒(每隔1秒获取一次数据,生成一个RDD),代码如下:

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("NetworkWordCount")

//按照时间间隔为1秒钟切分数据流
val ssc = new StreamingContext(conf, Seconds(1))
  1. 创建DStream

使用StreamingContext可以创建一个输入DStream,它表示来自TCP源的流数据。例如,从主机名为localhost、端口为9999的TCP源获取数据,代码如下:

val lines = ssc.socketTextStream("localhost", 9999)

上述代码中的lines是一个输入DStream,表示从服务器接收的数据流。lines中的每条记录都是一行文本。

  1. 操作DStream

DStream创建成功后,可以对DStream应用算子操作,生成新的DStream,类似对RDD的操作。例如,按空格字符将每一行文本分割为单词,代码如下:

val words = lines.flatMap(_.split(" "))

在本例中,lines的每一行将被分成多个单词,单词组成的数据流则为一个新的DStream,使用words表示。接下来需要统计单词数量,代码如下:

import org.apache.spark.streaming.StreamingContext._ //计算每一批次中的每一个单词数量
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _) 
wordCounts.print() //默认打印前10个元素(DStream中的RDD)到控制台
  1. 启动Spark Streaming

在DStream的创建与转换代码编写完毕后,需要启动Spark Streaming才能真正的开始计算,因此需要在最后添加以下代码:

//开始计算
ssc.start()      
//等待计算结束
ssc.awaitTermination() 

到此,一个简单的单词计数例子就完成了。

05 Spark Streaming数据源

Spark Streaming提供了两种内置的数据源支持:基本数据源和高级数据源。基本数据源是指文件系统、Socket连接等;高级数据源是指Kafka、Flume、Kinesis等数据源。

  1. 文件流

对于从任何与HDFS API兼容的文件系统上的文件中读取数据,可以通过以下方式创建DStream:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
  1. Socket流

通过监听Socket端口接收数据,例如以下代码,从本地的9999端口接收数据:

//创建一个本地StreamingContext对象,使用两个执行线程,批处理间隔为1秒
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))
//连接localhost:9999获取数据,转为DStream
val lines = ssc.socketTextStream("localhost", 9999)
  1. RDD队列流

使用streamingContext.queueStream(queueOfRDDs)可以基于RDD队列创建DStream。推入队列的每个RDD将被视为DStream中的一批数据,并像流一样进行处理。这种方式常用于测试Spark Streaming应用程序。

下面以Kafka数据源为例,介绍高级数据源的使用。

首先需要在Maven工程中引入Spark Streaming的API依赖库:



  org.apache.spark
  spark-core_2.11
  2.4.0




  org.apache.spark
  spark-streaming_2.11
  2.4.0

然后引入Spark Streaming针对Kafka的第三方依赖库(针对Kafka 0.10版本):


  org.apache.spark
   spark-streaming-kafka-0-10_2.11
  2.4.0

引入所需库后,可以在Spark Streaming应用程序中使用以下格式代码创建输入DStream:

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext, 
   [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

06 DStream操作

与RDD类似,许多普通RDD上可用的操作算子DStream也支持。使用这些算子可以修改输入DStream中的数据,进而创建一个新的DStream。对DStream的操作主要有三种:无状态操作、状态操作、窗口操作。

1、无状态操作

无状态操作指的是,每次都只计算当前时间批次的内容,处理结果不依赖于之前批次的数据,例如每次只计算最近1秒钟时间批次产生的数据。常用的DStream无状态操作算子如表所示。

image-20220501230342715

2、状态操作

状态操作是指,需要把当前时间批次和历史时间批次的数据进行累加计算,即当前时间批次的处理需要使用之前批次的数据或中间结果。使用updateStateByKey()算子可以保留key的状态,并持续不断地用新状态更新之前的状态。使用该算子可以返回一个新的“有状态的”DStream,其中通过对每个key的前一个状态和新状态应用给定的函数来更新每个key的当前状态。

例如,对数据流中的实时单词进行计数,每当接收到新的单词,需要将当前单词数量累加到之前批次的结果中。这里单词的数量就是状态,对单词数量的更新就是状态的更新。定义状态更新函数,实现按批次累加单词数量的代码如下:

/**
 * 定义状态更新函数,按批次累加单词数量
 * @param values 当前批次某个单词的出现次数,相当于Seq(1,1,1)
 * @param state 某个单词上一批次累加的结果,因为可能没有值,所以用Option类型
 */
val updateFunc=(values:Seq[Int],state:Option[Int])=>{

  //累加当前批次某个单词的数量
  val currentCount=values.foldLeft(0)(+) 

  //获取上一批次某个单词的数量,默认值0
  val previousCount= state.getOrElse(0)

  //求和。使用Some表示一定有值,不为None
  Some(currentCount+previousCount)
}

将updateFunc函数作为参数传入updateStateByKey()算子即可对DStream中的单词按批次累加,代码如下:

//更新状态,按批次累加
val result:DStream[(String,Int)]= wordCounts.updateStateByKey(updateFunc)
//默认打印DStream中每个RDD中的前10个元素到控制台
result.print()

3、窗口操作

Spark Streaming提供了窗口计算,允许在滑动窗口(某个时间段内的数据)上进行操作。当窗口在DStream上滑动时,位于窗口内的RDD就会被组合起来,并对其进行操作。

假设批处理时间间隔为1秒,现需要每隔2秒对过去3秒的数据进行计算,此时就需要使用滑动窗口计算,计算过程如图所示(相当于一个窗口在DStream上滑动)。

image-20220501231103471

任何窗口计算都需要指定以下两个参数:

窗口长度:窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数。

滑动时间间隔:前一个窗口滑动到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数。

Spark针对窗口计算提供了相应的算子。例如,每隔10秒计算最后30秒的单词数量,代码如下:

val windowedWordCounts = pairsDStream.reduceByKeyAndWindow(
  (a: Int, b: Int) => (a + b),
  Seconds(30),
  Seconds(10)
)

一些常用的窗口操作算子如表。

image-20220501231232615

4、输出操作

输出操作允许将DStream的数据输出到外部系统,如数据库或文件系统。输出操作触发所有DStream转换操作的实际执行,类似于RDD的行动算子。Spark Streaming定义的输出操作如表。

image-20220501231333840

foreachRDD(func)是一个功能强大的算子,它允许将数据发送到外部系统。理解如何正确有效地使用这个算子非常重要。

5、缓存及持久化

与RDD类似,DStream也允许将流数据持久化到内存中。也就是说,在DStream上使用persist()方法可以将该DStream的每个RDD持久化到内存中。这在DStream中的数据需要被计算多次(例如,对同一数据进行多次操作)时非常有用。对于基于窗口的操作,如reduceByWindow()、reduceByKeyAndWindow(),以及基于状态的操作,如updateStateByKey(),这些都默认开启了persist()。因此,基于窗口操作生成的DStream将自动持久化到内存中,而不需要手动调用persist()。

对于通过网络接收的输入流(如Kafka、Flume、Socket等),默认的持久化存储级别被设置为将数据复制到两个节点,以便容错。

6、检查点

Spark Streaming应用程序必须全天候运行,因此与应用程序逻辑无关的故障(例如,系统故障、JVM崩溃等)不应该对其产生影响。为此,Spark Streaming需要对足够的数据设置检查点,存储到容错系统中,使其能够从故障中恢复。Spark Streaming有两种类型的检查点:元数据检查点、数据检查点。

元数据检查点:元数据主要指配置信息、DStream操作、未完成的批次。

数据检查点:将生成的RDD保存到可靠的存储系统(例如HDFS)中。为了避免系统恢复时间的无限增长,将有状态转换的中间RDD定期存储到可靠系统中,以切断依赖链。

Views: 152

Spark SQL 结构化数据处理引擎

什么是Spark SQL

Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如json、parquet、avro、csv格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。

Spark SQL的主要特点:

  1. 将SQL查询与Spark应用程序无缝组合
    Spark SQL允许使用SQL在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce;而Spark SQL底层使用的是Spark RDD。例如以下代码,在Spark应用程序中嵌入SQL语句:

    results = spark.sql( "SELECT * FROM people")
  2. 以相同的方式连接到多种数据源
    Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。例如以下代码,读取HDFS中的JSON文件,然后将该文件的内容创建为临时视图,最后与其他表根据指定的字段关联查询:

    //读取JSON文件
    val userScoreDF = spark.read.json("hdfs://centos01:9000/people.json")
    //创建临时视图user_score
    userScoreDF.createTempView("user_score")
    //根据name关联查询
    val resDF=spark.sql("SELECT i.age,i.name,c.score FROM user_info i " +
                    "JOIN user_score c ON i.name=c.name")
  3. 在现有的数据仓库上运行SQL或HiveQL查询
    Spark SQL支持HiveQL语法以及Hive SerDes和UDF(用户自定义函数),允许访问现有的Hive仓库。

DataFrame和Dataset

DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如:Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。
DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息),因此看起来更像是一张数据库表。

file

Spark SQL基本使用

Spark Shell启动时除了默认创建一个名为“sc”的SparkContext的实例外,还创建了一个名为“spark”的SparkSession实例,该“spark”变量也可以在Spark Shell中直接使用。
SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。
例如,在HDFS中有一个文件/input/person.txt,文件内容:

1,zhangsan,25
2,lisi,22
3,wangwu,30

现需要使用Spark SQL将该文件中的数据按照年龄降序排列,步骤:

  1. 加载数据为Dataset
    调用SparkSession的API read.textFile()可以读取指定路径中的文件内容,并加载为一个Dataset:

    $ spark-shell --master yarn
    scala> val d1=spark.read.textFile("hdfs://centos01:9000/input/person.txt")
    d1: org.apache.spark.sql.Dataset[String] = [value: string]

    从变量d1的类型可以看出,textFile()方法将读取的数据转为了Dataset。除了textFile()方法读取文本内容外,还可以使用csv()、jdbc()、json()等方法读取csv文件、jdbc数据源、json文件等数据。调用Dataset中的show()方法可以输出Dataset中的数据内容。查看d1中的数据内容:

    scala> d1.show()
    +-------------+
    |        value|
    +-------------+
    |1,zhangsan,25|
    |2,lisi,22|
    |3,wangwu,30|
    +-------------+

    从上述内容可以看出,Dataset将文件中的每一行看做一个元素,并且所有元素组成了一列,列名默认为“value”。

    如果Spark报错ClassNotFoundException: Class com.hadoop.compression.lzo.Lzo说明Hadoop支持Lzo压缩但是Spark没有配置支持压缩相关的类库,可以修改spark-defaults.conf并添加两行:
    spark.driver.extraClassPath /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar spark.executor.extraClassPath /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar

  2. 给Dataset添加元数据信息
    定义一个样例类Person,用于存放数据描述信息(Schema):

    scala> case class Person(id:Int,name:String,age:Int)

    导入SparkSession的隐式转换,以便后续可以使用Dataset的算子:

    scala> import spark.implicits._

    调用Dataset的map()算子将每一个元素拆分并存入Person类中:

    scala> val personDataset=d1.map(line=>{
         | val fields = line.split(",")
         | val id = fields(0).toInt
         | val name = fields(1)
         | val age = fields(2).toInt
         | Person(id, name, age)
         | })

    此时查看personDataset中的数据内容,personDataset中的数据类似于一张关系型数据库的表:

    scala> personDataset.show()
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 25|
    |  2|    lisi| 22|
    |  3|  wangwu| 30|
    +---+--------+---+
  3. 将Dataset转为DataFrame
    Spark SQL查询的是DataFrame中的数据,因此需要将存有元数据信息的Dataset转为DataFrame。
    调用Dataset的toDF()方法,将存有元数据的Dataset转为DataFrame,代码:

    scala> val pdf = personDataset.toDF()
    pdf: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
  4. 执行SQL查询
    在DataFrame上创建一个临时视图“v_person”,代码:

    scala> pdf.createTempView("v_person")

    使用SparkSession对象执行SQL查询,代码:

    scala> val result = spark.sql("select * from v_person order by age desc")
    result: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

    调用show()方法输出结果数据,代码:

    scala> result.show()
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  3|  wangwu| 30|
    |  1|zhangsan| 25|
    |  2|    lisi | 22|
    +---+--------+---+

    可以看到,结果数据已按照age字段降序排列。

  5. 指定导出文件格式

    # 导出json格式文件
    scala> result.write.format("json").save("/output/json")
    # 导出scv格式文件
    scala> result.write.format("csv").save("/output/csv")
    # 导出orc格式文件
    scala> result.write.format("orc").save("/output/orc")
    # 导出parquet格式文件
    scala> result.write.format("parquet").save("/output/parquet")

    查看导出的文件(有几个文件说明有几个分区):

    $ hadoop fs -cat /output/json/part-*
    {"id":3,"name":"wangwu","age":30}
    {"id":1,"name":"zhangsan","age":25}
    {"id":2,"name":"lisi","age":22}
    
    $ hadoop fs -cat /output/csv/part-*
    3,wangwu,30
    1,zhangsan,25
    2,lisi,22
    
    $ hadoop fs -cat /output/orc/part-*
    ORC...(二进制乱码)
    
    [hadoop@hadoop100 conf]$ hadoop fs -cat /output/parquet/part-*
    PAR1...(二进制乱码)
  6. 分区设置
    Spark中使用SparkSql进行shuffle操作,默认分区数是200个;参数配置是--conf spark.sql.shuffle.partitions, 如果想要修改SparkSQL执行shuffle操作时分区数:

    1. 配置 spark.sql.shuffle.partitions,适用场景spark.sql()合并分区

      spark.conf.set("spark.sql.shuffle.partitions", 5) #后面的数字是你希望的分区数

      这样配置后,通过spark.sql()执行后写出的数据分区数就是你要求的个数,如这里5。

    2. 配置 coalesce(n),适用场景spark写出数据到指定路径下合并分区,不会引起shuffle

      df = spark.sql(sql_string).coalesce(1) #合并分区数
      df.write.format("csv")
      .mode("overwrite")
      .option("sep", ",")
      .option("header", True)
      .save(hdfs_path)
    3. 配置repartition(n), 重新分区,会引发shuffle

      df = spark.sql(sql_string).repartition(1) #重新分区,会引发全局shuffle
      df.write.format("csv")
      .mode("overwrite")
      .option("sep", ",")
      .option("header", True)
      .save(hdfs_path)
  7. 分区分桶导出

    1. 并行写出之 partitionBy() 指定分区列 , 会根据分区列创建子文件夹,并行写出数据
      df.write.mode("overwrite")
      .partitionBy("day")
      .save("/tmp/partitioned-files.parquet")
    2. 并行写出之 repartition() ,一般spark中有几个分区就会有几个并行的IO写出
      df.repartition(5)
      .write.format("csv")
      .save("/tmp/multiple.csv")
    3. 分桶写出,好处是后续读入的时候数据就不会做shuffle了,因为相同分桶的数据会被划分到同一个物理分区中
      csvFile.write.format("parquet")
      .mode("overwrite")
      .bucketBy(5, "gmv") #第一个参数:分成几个桶,第二个参数:按哪列进行分桶
      .saveAsTable("bucketedFiles")

      Spark IO相关API请参考:Spark官方API文档Input and Output

Spark SQL数据源

Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用相关转换算子进行操作,也可以用于创建临时视图。将DataFrame注册为临时视图可以对其中的数据使用SQL查询。
Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入到指定的数据源。

1、默认数据源
默认情况下,load()方法和save()方法只支持Parquet格式的文件,也可以在配置文件中通过参数spark.sql.sources.default对默认文件格式进行更改。
Spark SQL可以很容易的读取Parquet文件并将其数据转为DataFrame数据集。例如,读取HDFS中的文件/users.parquet,并将其中的name列与favorite_color列写入HDFS的/result目录,代码:

val spark = SparkSession.builder() //创建或得到SparkSession
  .appName("SparkSQLDataSource")
  .master("local[*]")
  .getOrCreate()
//加载parquet格式的文件,返回一个DataFrame集合
val usersDF = spark.read.load("hdfs://centos01:9000/users.parquet")
usersDF.show()
// +------+--------------+----------------+
// |  name|favorite_color|favorite_numbers|
// +------+--------------+----------------+
// |Alyssa|            null|  [3, 9, 15, 20]|
// |   Ben|              red|                []|
// +------+--------------+----------------+
 //查询DataFrame中的name列和favorite_color列,并写入HDFS
usersDF.select("name","favorite_color")
  .write.save("hdfs://centos01:9000/result")

除了使用select()方法查询外,也可以使用SparkSession对象的sql()方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。

//创建临时视图
usersDF.createTempView("t_user")
//执行SQL查询,并将结果写入到HDFS
spark.sql("SELECT name,favorite_color FROM t_user")
  .write.save("hdfs://centos01:9000/result")

2、手动指定数据源

使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(json,parquet,jdbc,orc,libsvm,csv,text)。例如,手动指定csv格式的数据源:

val peopleDFCsv=spark.read.format("csv").load("hdfs://centos01:9000/people.csv")

在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数:

val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.69:3306/spark_db")
  .option("driver","com.mysql.jdbc.Driver")
  .option("dbtable", "student")
  .option("user", "root")
  .option("password", "123456")
  .load()

3、数据写入模式
在写入数据的同时,可以使用mode()方法指定如何处理已经存在的数据,该方法的参数是一个枚举类SaveMode,其取值解析如下:

  • SaveMode.ErrorIfExists:默认值。当向数据源写入一个DataFrame时,如果数据已经存在,则会抛出异常。
  • SaveMode.Append:当向数据源写入一个DataFrame时,如果数据或表已经存在,则会在原有的基础上进行追加。
  • SaveMode.Overwrite:当向数据源写入一个DataFrame时,如果数据或表已经存在,则会将其覆盖(包括数据或表的Schema)。
  • SaveMode.Ignore:当向数据源写入一个DataFrame时,如果数据或表已经存在,则不会写入内容,类似SQL中的“CREATE TABLE IF NOT EXISTS”。

例如,HDFS中有一个JSON格式的文件/people.json,内容:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

现需要查询该文件中的name列,并将结果写入HDFS的/result目录中,若该目录存在则将其覆盖,代码:

val peopleDF = spark.read.format("json").load("hdfs://centos01:9000/people.json")
peopleDF.select("name")
  .write.mode(SaveMode.Overwrite).format("json")
  .save("hdfs://centos01:9000/result")

4、分区自动推断
表分区是Hive等系统中常用的优化查询效率的方法(Spark SQL的表分区与Hive的表分区类似)。在分区表中,数据通常存储在不同的分区目录中,分区目录通常以“分区列名=值”的格式进行命名。例如,以people作为表名,gendercountry作为分区列,存储数据的目录结构如下:

path
└── to
    └── people
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

对于所有内置的数据源(包括Text/CSV/JSON/ORC/Parquet),Spark SQL都能够根据目录名自动发现和推断分区信息。分区示例:

  1. 在本地(或HDFS)新建以下三个目录及文件,其中的目录people代表表名,gendercountry代表分区列,people.json存储实际人口数据:

    D:\people\gender=male\country=CN\people.json
    D:\people\gender=male\country=US\people.json
    D:\people\gender=female\country=CN\people.json

    三个people.json文件的数据分别如下:

    {"name":"zhangsan","age":32}
    {"name":"lisi", "age":30}
    {"name":"wangwu", "age":19}
    {"name":"Michael"}
    {"name":"Jack", "age":20}
    {"name":"Justin", "age":18}
    {"name":"xiaohong","age":17}
    {"name":"xiaohua", "age":22}
    {"name":"huanhuan", "age":16}
  2. 执行以下代码,读取表people的数据并显示:

    val usersDF = spark.read.format("json").load("D:\\people") //读取表数据为一个DataFrame
    usersDF.printSchema() //输出Schema信息
    usersDF.show() //输出表数据

    控制台输出的Schema信息如下:

    root
    |-- age: long (nullable = true)
    |-- name: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- country: string (nullable = true)

    控制台输出的表数据如下:

    +----+--------+------+-------+
    | age|    name|gender|country|
    +----+--------+------+-------+
    |  17|xiaohong|female|     CN|
    |  22| xiaohua|female|     CN|
    |  16|huanhuan|female|     CN|
    |  32|zhangsan|  male|     CN|
    |  30|     lisi|  male|     CN|
    |  19|   wangwu|  male|     CN|
    |null| Michael|  male|     US|
    |  20|     Jack|  male|     US|
    |  18|   Justin|  male|     US|
    +----+--------+------+-------+

从控制台输出的Schema信息和表数据可以看出,Spark SQL在读取数据时,自动推断出了两个分区列gendercountry,并将该两列的值添加到了DataFrame中。

Parquet文件

Apache Parquet是Hadoop生态系统中任何项目都可以使用的列式存储格式,不受数据处理框架、数据模型和编程语言的影响。Spark SQL支持对Parquet文件的读写,并且可以自动保存源数据的Schema。当写入Parquet文件时,为了提高兼容性,所有列都会自动转换为“可为空”状态。
加载和写入Parquet文件时,除了可以使用load()方法和save()方法外,还可以直接使用Spark SQL内置的parquet()方法,例如以下代码:

//读取Parquet文件为一个DataFrame
val usersDF = spark.read.parquet("hdfs://centos01:9000/users.parquet")
//将DataFrame相关数据保存为Parquet文件,包括Schema信息
usersDF.select("name","favorite_color")
  .write.parquet("hdfs://centos01:9000/result")

JSON数据集

Spark SQL可以自动推断JSON文件的Schema,并将其加载为DataFrame。在加载和写入JSON文件时,除了可以使用load()方法和save()方法外,还可以直接使用Spark SQL内置的json()方法。该方法不仅可以读写JSON文件,还可以将Dataset[String]类型的数据集转为DataFrame。

需要注意的是,要想成功的将一个JSON文件加载为DataFrame,JSON文件的每一行必须包含一个独立有效的JSON对象,而不能将一个JSON对象分散在多行。例如以下JSON内容可以被成功加载:

{"name":"zhangsan","age":32}
{"name":"lisi", "age":30}
{"name":"wangwu", "age":19}

使用json()方法加载JSON数据的例子如下代码所示:

//创建或得到SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLDataSource")
.config("spark.sql.parquet.mergeSchema",true)
.master("local[*]")
.getOrCreate()

/****1. 创建用户基本信息表*****/
import spark.implicits._
//创建用户信息Dataset集合
val arr=Array(
 "{'name':'zhangsan','age':20}",
 "{'name':'lisi','age':18}"
)
val userInfo: Dataset[String] = spark.createDataset(arr)
//将Dataset[String]转为DataFrame
val userInfoDF = spark.read.json(userInfo)
//创建临时视图user_info
userInfoDF.createTempView("user_info")
//显示数据
userInfoDF.show()
// +---+--------+
// |age|    name|
// +---+--------+
// | 20|zhangsan|
// | 18|    lisi|
// +---+--------+

/****2. 创建用户成绩表*****/
//读取JSON文件
val userScoreDF = spark.read.json("D:\\people\\people.json")
//创建临时视图user_score
userScoreDF.createTempView("user_score")
userScoreDF.show()
// +--------+-----+
// |    name|score|
// +--------+-----+
// |zhangsan|   98|
// |    lisi|   88|
// |  wangwu|   95|
//      +--------+-----+
/****3. 根据name字段关联查询*****/
val resDF=spark.sql("SELECT i.age,i.name,c.score FROM user_info i " +
                      "JOIN user_score c ON i.name=c.name") 
resDF.show()
// +---+--------+-----+
// |age|    name|score|
// +---+--------+-----+
// | 20|zhangsan|   98|
// | 18|    lisi|   88|
// +---+--------+-----+

Hive 表

Spark SQL还支持读取和写入存储在Apache Hive中的数据。然而,由于Hive有大量依赖项,这些依赖项不包括在默认的Spark发行版中,如果在classpath上配置了这些Hive依赖项,Spark将自动加载它们。需要注意的是,这些Hive依赖项必须出现在所有Worker节点上,因为它们需要访问Hive序列化和反序列化库(SerDes),以便访问存储在Hive中的数据。
使用Spark SQL读取和写入Hive数据:
1、创建SparkSession对象
创建一个SparkSession对象,并开启Hive支持,代码:

val spark = SparkSession
  .builder()
  .appName("Spark Hive Demo")
  .enableHiveSupport()//开启Hive支持
  .getOrCreate()

2、创建Hive表
创建一张Hive表students,并指定字段分隔符为制表符“\t”,代码:

spark.sql("CREATE TABLE IF NOT EXISTS students (name STRING, age INT) " +
  "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'")

3、导入本地数据到Hive表
本地文件/home/hadoop/students.txt的内容如下(字段之间以制表符“\t”分隔):

zhangsan    20
lisi    25
wangwu  19

将本地文件/home/hadoop/students.txt中的数据导入到表students中,代码:

spark.sql("LOAD DATA LOCAL INPATH '/home/hadoop/students.txt'  INTO TABLE students")

4、查询表数据
查询表students的数据并显示到控制台,代码:

spark.sql("SELECT * FROM students").show()

显示结果:

+--------+---+
|    name|age|
+--------+---+
|zhangsan| 20|
|     lisi| 25|
|   wangwu| 19|
+--------+---+

5、创建表的同时指定存储格式
创建一个Hive表hive_records,数据存储格式为Parquet(默认为普通文本格式),代码:

spark.sql("CREATE TABLE hive_records(key STRING, value INT) STORED AS PARQUET")

6、将DataFrame写入Hive表
使用saveAsTable()方法可以将一个DataFrame写入到指定的Hive表中。例如,加载students表的数据并转为DataFrame,然后将DataFrame写入Hive表hive_records中,代码:

//加载students表的数据为DataFrame
val studentsDF = spark.table("students")
//将DataFrame以覆盖的方式写入表hive_records中
studentsDF.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
//查询hive_records表数据并显示到控制
spark.sql("SELECT * FROM hive_records").show()

Spark SQL应用程序写完后,需要提交到Spark集群中运行。若以Hive为数据源,提交之前需要做好Hive数据仓库、元数据库等的配置。

JDBC

Spark SQL还可以使用JDBC API从其他关系型数据库读取数据,返回的结果仍然是一个DataFrame,可以很容易地在Spark SQL中处理,或者与其他数据源进行连接查询。在使用JDBC连接数据库时可以指定相应的连接属性,常用的连接属性如表。

file

使用JDBC API对MySQL表student和表score进行关联查询,代码:

val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.69:3306/spark_db")
  .option("driver","com.mysql.jdbc.Driver")
  .option("dbtable", "(select st.name,sc.score from student st,score sc " +
    "where st.id=sc.id) t")
  .option("user", "root")
  .option("password", "123456")
  .load()

上述代码中,dbtable属性的值是一个子查询,相当于SQL查询中的FROM关键字后的一部分。除了上述查询方式外,使用query属性编写完整SQL语句进行查询也能达到同样的效果,代码:

val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.234:3306/spark_db")
  .option("driver","com.mysql.jdbc.Driver")
  .option("query", "select st.name,sc.score from student st,score sc " +
    "where st.id=sc.id")
  .option("user", "root")
  .option("password", "123456")
  .load()

Spark SQL内置函数

Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions中。这些函数主要分为10类:UDF函数、聚合函数、日期函数、排序函数、非聚合函数、数学函数、混杂函数、窗口函数、字符串函数、集合函数,大部分函数与Hive中相同。
使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL语句中使用。例如,以编程的方式使用lower()函数将用户姓名转为小写,代码如下:

//显示DataFrame数据(df指DataFrame对象)
df.show()
// +--------+
// |    name|
// +--------+
// |ZhangSan|
// |    LiSi|
// |  WangWu|
// +--------+
//使用lower()函数将某列转为小写
import org.apache.spark.sql.functions._
df.select(lower(col("name")).as("name")).show()
// +--------+
// |    name|
// +--------+
// |zhangsan|
// |    lisi|
// |  wangwu|
// +--------+

Spark SQL自定义函数

当Spark SQL提供的内置函数不能满足查询需求时,用户也可以根据自己的业务编写自定义函数
(User Defined Functions,UDF),然后在Spark SQL中调用。
Spark SQL提供了一些常用的聚合函数,如count()countDistinct()avg()max()min()等。此外,用户也可以根据自己的业务编写自定义聚合函数(User Defined Aggregate Functions,UDAF)。
UDF主要是针对单个输入,返回单个输出;而UDAF则可以针对多个输入进行聚合计算返回单个输出,功能更加强大。要编写UDAF,需要新建一个类,继承抽象类UserDefinedAggregateFunction,并实现其中未实现的方法。

Spark SQL开窗函数

row_number()开窗函数是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始),根据行号可以方便的对每一组数据取前N行(分组取TOPN)。row_number()函数的使用格式如下:

row_number() over (partition by 列名 order by 列名 desc) 行号列别名

格式说明:

  • partition by:按照某一列进行分组。
  • order by:分组后按照某一列进行组内排序。
  • desc:降序,默认升序。

Views: 138

Spark RDD 实战案例

IDEA 创建基于Maven的Spark项目

三台虚拟机搭建的集群
启动集成如下:

------------ hadoop100 ------------
26369 Master
2514 QuorumPeerMain
2898 Kafka
29011 SparkSubmit
30643 HRegionServer
3493 JobHistoryServer
3573 NodeManager
3065 NameNode
30476 HMaster
31036 Jps
28927 SparkSubmit
------------ hadoop101 ------------
5153 QuorumPeerMain
5537 Kafka
5905 ResourceManager
6018 NodeManager
104355 Jps
84258 Worker
104039 HRegionServer
5645 DataNode
104140 HMaster
------------ hadoop102 ------------
71683 Worker
5509 Kafka
104804 Jps
5752 SecondaryNameNode
5897 NodeManager
5643 DataNode
5116 QuorumPeerMain
104542 HRegionServer

本地环境由于需要打包放到虚拟机运行,因此scala版本需要和虚拟机中编译spark所用的scala的版本一致。如何知道虚拟机中spark是用的什么版本scala编译的呢?可以进入虚拟机的spark-shell查看:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.8
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281)

可见2.4.8并不是官网所说的使用scala2.12,实际上是2.11

接下来使用IDEA创建一个MAVEN项目,使用scala模板
file
配置项目名称
file

配置有效的maven环境
file

修改pom文件:

  1. 设置scala的版本,我虚拟机里的Spark是用的scala2.11版本编译的,这里能找到最接近的可用的版本就是2.11.8。
  2. 添加maven-scala-plugin依赖,使用的是2.11的版本。
  3. 添加spark-core_2.11, 版本为2.4.8,和虚拟机安装的Spark版本保持一致
  4. 此外因为下面的示例还要操作hbase,因为又引入了hadoop和hbase相关的一些依赖,其中hadoop-common依赖排除了jackson-databind是因为出现了依赖冲突的问题。
  5. 为了演示官方示例(蒙特卡洛法求Pi值),引入了spark-sql依赖

最终完整的pom.xml文件内容如下:


    4.0.0
    cn.delucia
    SparkProject
    1.0-SNAPSHOT
    2008
    
        2.11.8
        2.4.8
        3.1.4
        2.2.3
        1.8
        1.8
        UTF-8
    

    
        
            scala-tools.org
            Scala-Tools Maven2 Repository
            http://scala-tools.org/repo-releases
        
    
    
        
            scala-tools.org
            Scala-Tools Maven2 Repository
            http://scala-tools.org/repo-releases
        
    

    
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            
                
                    com.fasterxml.jackson.core
                    jackson-databind
                
            
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        
        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-server
            ${hbase.version}
            
            
                
                    org.glassfish
                    javax.el
                
            
        
        
            org.apache.hbase
            hbase-mapreduce
            ${hbase.version}
        
        
            org.apache.spark
            spark-core_2.11
            ${spark.version}
        
        
            org.apache.spark
            spark-sql_2.11
            ${spark.version}
        
        
            org.scala-lang
            scala-library
            ${scala.version}
        
        
            mysql
            mysql-connector-java
            5.1.41
        
        
            junit
            junit
            4.4
            test
        
        
            org.specs
            specs
            1.2.5
            test
        
        
        
            org.scala-tools
            maven-scala-plugin
            2.15.2
        
    

    
        src/main/scala
        src/test/scala
        
            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        
                            compile
                            testCompile
                        
                    
                
                
                    ${scala.version}
                    
                        -target:jvm-1.8
                    
                
            
            
                org.apache.maven.plugins
                maven-eclipse-plugin
                2.5.1
                
                    true
                    
                        ch.epfl.lamp.sdt.core.scalabuilder
                    
                    
                        ch.epfl.lamp.sdt.core.scalanature
                    
                    
                        org.eclipse.jdt.launching.JRE_CONTAINER
                        ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
                    
                
            
        
    
    
        
            
                org.scala-tools
                maven-scala-plugin
                
                    ${scala.version}
                
            
        
    

项目右键选择 maven->Reimport 等待依赖下载完成。
你会发现test包下面有一些自动生成的源文件有错误(依赖版本问题),这里直接把报错的文件删除,只保留一个AppTest.scala文件,可以点击测试以下环境,main包下面自动生成的文件也是直接删除即可。:

file

给项目根目录下创建data和output文件夹,最终项目结构:

file

实现单词计数

在data下面创建文本文件words.txt,内容如下

hello hadoop
hello java
scala

本地模式运行

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark RDD 单词计数程序
  * Program Args: data\words.txt output\wc
  * 注意:需提前删除output\wc文件夹
  */
object WordCountLocal {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val conf = new SparkConf()
    //设置应用程序名称,可以在Spark WebUI中显示
    conf.setAppName("WordCountLocal")

    //设置集群Master节点访问地址
    //conf.setMaster("spark://hadoop100:7077"); //远程-提交到集群(standalone模式)
    conf.setMaster("local[2]")  // 本地模式 - 2个CPU核心

    //创建SparkContext对象,该对象是提交Spark应用程序的入口
    val sc = new SparkContext(conf);

    //读取指定路径(取程序执行时传入的第一个参数)中的文件内容,生成一个RDD集合
    val linesRDD:RDD[String] = sc.textFile(args(0))
    //将RDD数据按照空格进行切分并合并为一个新的RDD
    val wordsRDD:RDD[String] = linesRDD.flatMap(_.split(" "))
    //将RDD中的每个单词和数字1放到一个元组里,即(word,1)
    val paresRDD:RDD[(String, Int)] = wordsRDD.map((_,1))
    //对单词根据key进行聚合,对相同的key进行value的累加
    val wordCountsRDD:RDD[(String, Int)] = paresRDD.reduceByKey(_+_)
    //按照单词数量降序排列
    val wordCountsSortRDD:RDD[(String, Int)] = wordCountsRDD.sortBy(_._2,false)
    //保存结果到指定的路径(取程序执行时传入的第二个参数)
    wordCountsSortRDD.saveAsTextFile(args(1))
    //停止SparkContext,结束该任务
    sc.stop()
  }
}

直接点击运行,由于是两个核心,结果有两个文件生成
file

上传到集群运行 - Spark on YARN 模式

前提 集群已经安装好Hadoop集群并运行DFS和YARN
Spark已经配置好Spark on YARN的相关设置

编写程序

package cn.delucia.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
 Spark RDD单词计数程序 - 打jar包上传到集群 Spark on YARN 模式
*/
object WordCountCluster {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("WordCountCluster")
    conf.setMaster("spark://hadoop100:7077")
    val sc = new SparkContext(conf)
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .saveAsTextFile(args(1))

    sc.stop()
  }
}

使用maven生命周期插件clean然后install打成jar包上传到集群的/opt/data目录下,将jar包改名位WordCountCluster.jar,进入spark的安装目录执行命令:

bin/spark-submit
--master yarn
--class cn.delucia.spark.rdd.WordCountCluster /opt/data/WordCountCluster.jar hdfs://hadoop100:8020/tmp/words.txt hdfs://hadoop100:8020/tmp/output/wc_cluster

即可把任务提交到YARN集群运行,可以去output目录可以查看结果
file

有两个文件生成说明有两个分区,因为设置了2个核心

求平均成绩

data下创建score.txt

Andy,98
Jack,87
Bill,99
Andy,78
Jack,85
Bill,86
Andy,90
Jack,88
Bill,76
Andy,58
Jack,67
Bill,79

编写程序:

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 求成绩平均分
  */
object AverageScoreLocal {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      //设置应用程序名称,可以在Spark WebUI中显示
      conf.setAppName("AverageScore")
      //设置集群Master节点访问地址
      conf.setMaster("local")

      val sc = new SparkContext(conf)
      //1. 加载数据
      val linesRDD: RDD[String] = sc.textFile("data/avg_score.txt")
      //2. 将RDD中的元素转为(key,value)形式,便于后面进行聚合
      val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => {
         val name = line.split("\t")(0)//姓名
         val score = line.split("\t")(1).toInt//成绩
         (name, score)
      })
      //3. 根据姓名进行分组,形成新的RDD
      val groupedRDD: RDD[(String, Iterable[Int])] = tupleRDD.groupByKey()
      //4. 迭代计算RDD中每个学生的平均分
      val resultRDD: RDD[(String, Int)] = groupedRDD.map(line => {
         val name = line._1//姓名
         val iteratorScore: Iterator[Int] = line._2.iterator//成绩迭代器
         var sum = 0//总分
         var count = 0//科目数量

         //迭代累加所有科目成绩
         while (iteratorScore.hasNext) {
            val score = iteratorScore.next()
            sum += score
            count += 1
         }
         //计算平均分
         val averageScore = sum / count
         (name, averageScore)//返回(姓名,平均分)形式的元组
      })
      //保存结果
      resultRDD.saveAsTextFile("output/avg_score")
   }
}

输出结果:

(BETA,45)
(Bill,85)
(Andy,81)
(Jack,81)

统计学生最好的三次成绩并倒序排列

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Spark分组取TopN程序
  */
object GroupTopNLocal {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      //设置应用程序名称,可以在Spark WebUI中显示
      conf.setAppName("RDDGroupTopN")
      //设置集群Master节点访问地址,此处为本地模式,使用尽可能多的核心
      conf.setMaster("local[*]")

      val sc = new SparkContext(conf)
      //1. 加载本地数据
      val linesRDD: RDD[String] = sc.textFile("data/score.txt")

      //2. 将RDD元素转为(String,Int)形式的元组
      val tupleRDD:RDD[(String,Int)]=linesRDD.map(line=>{
         val name=line.split(",")(0)
         val score=line.split(",")(1)
         (name,score.toInt)
      })

      //3. 按照key(姓名)进行分组
      val top3=tupleRDD.groupByKey().map(groupedData=>{
         val name:String=groupedData._1
         //每一组的成绩降序后取前3个
         val scoreTop3:List[Int]=groupedData._2
           .toList.sortWith(_>_).take(3)
         (name,scoreTop3)//返回元组
      })

      //当使用多核心时,分区排序完就各自并行输出结果了,导致输出内容互相干扰
      //这里先调用count,由于count属于Action算子,会触发前面的计算完成
      //这样等top3里面的所有分区都计算完毕再输出,可以让输出格式更好
      top3.count

      //4. 循环打印分组结果
      top3.foreach(tuple=>{
         println("姓名:"+tuple._1)
         val tupleValue=tuple._2.iterator
         while (tupleValue.hasNext){
            val value=tupleValue.next()
            println("成绩:"+value)
         }
         println("*******************")
      })
   }
}

控制台输出:

姓名:Andy
成绩:98
成绩:90
成绩:78
*******************
姓名:Bill
成绩:99
成绩:86
成绩:79
*******************
姓名:Jack
成绩:88
成绩:87
成绩:85
*******************

倒排索引统计每日新增用户

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark RDD统计每日新增用户
  */
object DayNewUserLocal {
   def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
      conf.setAppName("DayNewUser")
      conf.setMaster("local[*]")

      val sc = new SparkContext(conf)
      //1. 构建测试数据
      val tupleRDD:RDD[(String,String)] = sc.parallelize(
         Array(
            ("2020-01-01", "user1"),
            ("2020-01-01", "user2"),
            ("2020-01-01", "user3"),
            ("2020-01-02", "user1"),
            ("2020-01-02", "user2"),
            ("2020-01-02", "user4"),
            ("2020-01-03", "user2"),
            ("2020-01-03", "user5"),
            ("2020-01-03", "user6")
         )
      )
      //2. 倒排(互换RDD中元组的元素顺序)
      val tupleRDD2:RDD[(String,String)] = tupleRDD.map(
      line => (line._2, line._1)
      )
      //3. 将倒排后的RDD按照key分组
      val groupedRDD: RDD[(String, Iterable[String])] = tupleRDD2.groupByKey()
      //4. 取分组后的每个日期集合中的最小日期,并计数为1  (只有第一次登陆才算做当日新增用户)
      val dateRDD:RDD[(String,Int)] = groupedRDD.map(
         line => (line._2.min, 1)
      )
      //5. 计算所有相同key(即日期)的数量
      val resultMap: collection.Map[String, Long] = dateRDD.countByKey()
      //将结果Map循环打印到控制台
      resultMap.foreach(println)
   }

}

结果:

(2020-01-01,3)
(2020-01-02,1)
(2020-01-03,2)

自定义排序规则(二次排序)

先准备数组,data下创建文本文件

2 98
1 99
2 67
3 75
3 88
2 85
1 90
3 100
1 62

先根据第一列升序排列,如果相同再根据第二列降序排列

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 二次排序自定义key类
 * 先根据名字升序排列,如果名字相同再根据成绩降序排列
 * @param first  每一行的第一个字段
 * @param second 每一行的第二个字段
 */
class SecondSortKey(val first: Int, val second: Int)
  extends Ordered[SecondSortKey] with Serializable {
  override def toString: String = first + "->" + second;

  /**
   * 实现compare()方法
   */
  override def compare(that: SecondSortKey): Int = {
    //若第一个字段不相等,按照第一个字段升序排列
    if (this.first - that.first != 0) {
      this.first - that.first
    } else { //否则按照第二个字段降序排列
      that.second - this.second
    }
  }
}

/**
 * 二次排序运行主类
 */
object SecondSort {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val conf = new SparkConf()
    //设置应用程序名称,可以在Spark WebUI中显示
    conf.setAppName("SecondSort")
    //设置集群Master节点访问地址,此处为本地模式
    conf.setMaster("local")
    //创建SparkContext对象,该对象是提交Spark应用程序的入口
    val sc = new SparkContext(conf);

    //1. 读取指定路径的文件内容,生成一个RDD集合
    val lines: RDD[String] = sc.textFile("data\\sort.txt")
    //2. 将RDD中的元素转为(SecondSortKey, String)形式的元组
    val pair: RDD[(SecondSortKey, String)] = lines.map(line => (
      new SecondSortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt),
      line)
    )
    //3. 按照元组的key(SecondSortKey的实例)进行排序
    val pairSort: RDD[(SecondSortKey, String)] = pair.sortByKey()
    //取排序后的元组中的第二个值(value值)
    //val result: RDD[String] = pairSort.map(line => line._2)
    //打印最终结果
    pairSort.foreach(line => println(line))
  }
}

控制台输出结果:

(1->99,1 99)
(1->90,1 90)
(1->62,1 62)
(2->98,2 98)
(2->85,2 85)
(2->67,2 67)
(3->100,3 100)
(3->88,3 88)
(3->75,3 75)

读写HBase数据库

HBase是Spark应用程序经常打交道的一个数据源。下面使用Spark程序对其进行读写操作:

使用HBase API向HBase写入数据

package cn.delucia.spark.rdd

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.ConnectionFactory
/**
  * 向HBase表写入数据 - hbase API
  */
object SparkWriteHBase {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      conf.setAppName("SparkWriteHBase")
      conf.setMaster("local[*]")
      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 构建需要添加的数据RDD
      val initRDD = sc.makeRDD(
         Array(
            "003,王五,山东,23",
            "004,赵六,河北,20"
         )
      )

      //2. 循环RDD的每个分区
      initRDD.foreachPartition(partition=> {
         //2.1 设置HBase配置信息
         val hbaseConf = HBaseConfiguration.create()
         //设置ZooKeeper集群地址
         hbaseConf.set("hbase.zookeeper.quorum","hadoop100")
         //设置ZooKeeper连接端口,默认2181
         hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
         //创建数据库连接对象
         val conn = ConnectionFactory.createConnection(hbaseConf)
         //指定表名
         val tableName = TableName.valueOf("student")
         //获取需要添加数据的Table对象
         val table = conn.getTable(tableName)

         //2.2 循环当前分区的每行数据
         partition.foreach(line => {
            //分割每行数据,获取要添加的每个值
            val arr = line.split(",")
            val rowkey = arr(0)
            val name = arr(1)
            val address = arr(2)
            val age = arr(3)

            //创建Put对象
            val put = new Put(Bytes.toBytes(rowkey))
            put.addColumn(
               Bytes.toBytes("info"),//列族名
               Bytes.toBytes("name"),//列名
               Bytes.toBytes(name)//列值
            )
            put.addColumn(
               Bytes.toBytes("info"),//列族名
               Bytes.toBytes("address"),//列名
               Bytes.toBytes(address)) //列值
            put.addColumn(
               Bytes.toBytes("info"),//列族名
               Bytes.toBytes("age"),//列名
               Bytes.toBytes(age)) //列值

            //执行添加
            table.put(put)
         })
      })

   }
}

使用Spark API向HBase写入数据

package cn.delucia.spark.rdd

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * 向HBase表写入数据 使用 Spark API(saveAsHadoopDataset方法)
  */
object SparkWriteHBase2 {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      conf.setAppName("SparkWriteHBase2")
      conf.setMaster("local[*]")
      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 设置配置信息
      //创建Hadoop JobConf对象
      val jobConf = new JobConf()
      //设置ZooKeeper集群地址
      jobConf.set("hbase.zookeeper.quorum","hadoop100")
      //设置ZooKeeper连接端口,默认2181
      jobConf.set("hbase.zookeeper.property.clientPort", "2181")
      //指定输出格式
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      //指定表名
      jobConf.set(TableOutputFormat.OUTPUT_TABLE,"student")

      //2. 构建需要写入的RDD数据
      val initRDD = sc.makeRDD(
         Array(
            "005,王五,山东,23",
            "006,赵六,河北,20"
         )
      )

      //将RDD转换为(ImmutableBytesWritable, Put)类型
      val resultRDD: RDD[(ImmutableBytesWritable, Put)] = initRDD.map(
         _.split(",")
      ).map(arr => {
         val rowkey = arr(0)
         val name = arr(1)//姓名
         val address = arr(2)//地址
         val age = arr(3)//年龄

         //创建Put对象
         val put = new Put(Bytes.toBytes(rowkey))
         put.addColumn(
            Bytes.toBytes("info"),//列族
            Bytes.toBytes("name"),//列名
            Bytes.toBytes(name)//列值
         )
         put.addColumn(
            Bytes.toBytes("info"),//列族
            Bytes.toBytes("address"),//列名
            Bytes.toBytes(address)) //列值
         put.addColumn(
            Bytes.toBytes("info"),//列族
            Bytes.toBytes("age"),//列名
            Bytes.toBytes(age)) //列值

         //拼接为元组返回
         (new ImmutableBytesWritable, put)
      })

      //3. 写入数据
      resultRDD.saveAsHadoopDataset(jobConf)
      sc.stop()
   }
}

批量向Hbase写入数据

使用BulkLoader加载HFile数据到HBase:
参考阅读:
https://blog.cloudera.com/how-to-use-hbase-bulk-loading-and-why/
https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/

package cn.delucia.spark.rdd

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark 批量写入数据到HBase(使用BulkLoader加载HFile数据到HBase)
  */
object SparkWriteHBase3 {
   def main(args: Array[String]): Unit = {
      System.setProperty("HADOOP_USER_NAME", "hadoop")
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      conf.setAppName("SparkWriteHBase3")
      conf.setMaster("local[*]")
      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 设置HDFS和HBase配置信息
      val hadoopConf = new Configuration()
      hadoopConf.set("fs.defaultFS", "hdfs://hadoop100:8020")
      val fileSystem = FileSystem.get(hadoopConf)
      val hbaseConf = HBaseConfiguration.create(hadoopConf)
      //设置ZooKeeper集群地址
      hbaseConf.set("hbase.zookeeper.quorum", "hadoop100")
      //设置ZooKeeper连接端口,默认2181
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      //创建数据库连接对象
      val conn = ConnectionFactory.createConnection(hbaseConf)
      //指定表名
      val tableName = TableName.valueOf("student")
      //获取需要添加数据的Table对象
      val table = conn.getTable(tableName)
      //获取操作数据库的Admin对象
      val admin = conn.getAdmin()

      //2. 添加数据前的判断
      //如果HBase表不存在,则创建一个新表
      if (!admin.tableExists(tableName)) {
         val desc = new HTableDescriptor(tableName)
         //表名
         val hcd = new HColumnDescriptor("info") //列族
         desc.addFamily(hcd)
         admin.createTable(desc) //创建表
      }
      //如果存放HFile文件的HDFS目录已经存在,则删除
      if (fileSystem.exists(new Path("hdfs://hadoop100:8020/tmp/hbase"))) {
         fileSystem.delete(new Path("hdfs://hadoop100:8020/tmp/hbase"), true)
      }

      //3. 构建需要添加的RDD数据
      //初始数据
      val initRDD = sc.makeRDD(
         Array(
            "rowkey:007,name:王五",
            "rowkey:007,address:山东",
            "rowkey:007,age:23",
            "rowkey:008,name:赵六",
            "rowkey:008,address:河北",
            "rowkey:008,age:20"
         )
      )
      //数据转换
      //转换为(ImmutableBytesWritable, KeyValue)类型的RDD
      val resultRDD: RDD[(ImmutableBytesWritable, KeyValue)] = initRDD.map(
         _.split(",")
      ).map(arr => {
         val rowkey = arr(0).split(":")(1)
         //rowkey
         val qualifier = arr(1).split(":")(0)
         //列名
         val value = arr(1).split(":")(1) //列值

         val kv = new KeyValue(
            Bytes.toBytes(rowkey),
            Bytes.toBytes("info"),
            Bytes.toBytes(qualifier),
            Bytes.toBytes(value)
         )
         //构建(ImmutableBytesWritable, KeyValue)类型的元组返回
         (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
      })

      //4. 写入数据
      //在HDFS中生成HFile文件
      hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name",tableName.getNameAsString)
      resultRDD.saveAsNewAPIHadoopFile(
         "hdfs://hadoop100:8020/tmp/hbase",
         classOf[ImmutableBytesWritable], //对应RDD元素中的key
         classOf[KeyValue], //对应RDD元素中的value
         classOf[HFileOutputFormat2],
         hbaseConf
      )
      //加载HFile文件到HBase
      val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
      val regionLocator = conn.getRegionLocator(tableName)
      bulkLoader.doBulkLoad(
         new Path("hdfs://hadoop100:8020/tmp/hbase"), //HFile文件位置
         admin, //操作HBase数据库的Admin对象
         table, //目标Table对象(包含表名)
         regionLocator //RegionLocator对象,用于查看单个HBase表的区域位置信息
      )
      sc.stop()
   }
}

读取Hbase数据

前面学了三钟方法向HBase数据库插入数据,并且一共向Hbase的student表写入了6条记录,下面我们写一个程序从HBase中把它们读取出来

package cn.delucia.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
/**
  * Spark读取HBase表数据
  */
object SparkReadHBase {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      conf.setAppName("SparkReadHBase")
      conf.setMaster("local[*]")
      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 设置HBase配置信息
      val hbaseConf = HBaseConfiguration.create()
      //设置ZooKeeper集群地址
      hbaseConf.set("hbase.zookeeper.quorum","hadoop100")
      //设置ZooKeeper连接端口,默认2181
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      //指定表名
      hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")

      //2. 读取HBase表数据并转化成RDD
      val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
         hbaseConf,
         classOf[TableInputFormat],
         classOf[ImmutableBytesWritable],
         classOf[Result]
      )

      //3. 输出RDD中的数据到控制台
      hbaseRDD.foreach{ case (_ ,result) =>
         //获取行键
         val key = Bytes.toString(result.getRow)
         //通过列族和列名获取列值
         val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
         val gender = Bytes.toString(result.getValue("info".getBytes,"address".getBytes))
         val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
         println("行键:"+key+"\t姓名:"+name+"\t地址:"+gender+"\t年龄:"+age)
      }
   }
}

输出

行键:003  姓名:王五   地址:山东   年龄:23
行键:004  姓名:赵六   地址:河北   年龄:20
行键:005  姓名:王五   地址:山东   年龄:23
行键:006  姓名:赵六   地址:河北   年龄:20
行键:007  姓名:王五   地址:山东   年龄:23
行键:008  姓名:赵六   地址:河北   年龄:20

解决数据倾斜问题

避免数据倾斜的办法还有很多,比如:

  1. 对数据进行预处理
  2. 过滤掉没有意义的数据
  3. 提高shuffle的并行度,增加分区数量
    但这不能解决大量相同key导致的单个分区数据过多的问题

对于存在大量相同的key导致数据集中在某个分区的问题,这里给出一个解决办法:

添加随机前缀进行双重集合:

  1. 首先给数据添加随机前缀,进行分区内的局部聚合
  2. 然后去除随机前缀,进行全局聚合
package cn.delucia.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

/**
  * Spark RDD解决数据倾斜案例 - 由于存在大量相同的key导致数据集中在某个分区
 *  解决办法:添加随机前缀进行双重集合)
  * 1. 首先给数据添加随机前缀,进行分区内的局部聚合
  * 2. 然后去除随机前缀,进行全局聚合
  */
object DataLean {
   def main(args: Array[String]): Unit = {
      //创建Spark配置对象
      val conf = new SparkConf();
      conf.setAppName("DataLean")
      conf.setMaster("local[*]")

      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 读取测试数据
      val linesRDD = sc.textFile("data/data.txt")
      //2. 统计单词数量
      linesRDD
        .flatMap(_.split(" "))
        .map((_, 1))
        .map(t => {
           val word = t._1
           val random = Random.nextInt(100)//产生0~99的随机数
           //单词加入随机数前缀,格式:(前缀_单词,数量)
           (random + "_" + word, 1)
        })
        .reduceByKey(_ + _)//局部聚合
        .map(t => {
         val word = t._1
         val count = t._2
         val w = word.split("_")(1)//去除前缀
         //单词去除随机数前缀,格式:(单词,数量)
         (w, count)
      })
        .reduceByKey(_ + _)//全局聚合
        //输出结果到指定的HDFS目录
        .saveAsTextFile("output/data_lean")
   }
}

Views: 79

Spark核心源码分析

01 Spark集群启动原理

Spark集群启动时,会在当前节点(脚本执行节点)上启动Master,在配置文件conf/slave中指定的每个节点上启动一个Worker。而Spark集群是通过脚本启动的。
查看启动脚本的内容:

$ cat sbin/start-all.sh

内容:

#!/usr/bin/env bash
#启动所有spark守护进程
#在此节点上启动Master
#在conf/slave中指定的每个节点上启动一个Worker
 #如果SPARK_HOME环境变量为空,则使用export设置
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "dirname "$0""/..; pwd)"
fi
#加载Spark配置
. "${SPARK_HOME}/sbin/spark-config.sh"
#启动Master
"${SPARK_HOME}/sbin"/start-master.sh
#启动Workers
"${SPARK_HOME}/sbin"/start-slaves.sh

从启动脚本的内容和注释可以看出,start-all.sh脚本主要做了四件事:

(1)检查并设置环境变量
(2)加载Spark配置
Spark配置的加载,使用了脚本文件sbin/spark-config.sh。
(3)启动Master进程
Master进程的启动,使用了脚本文件sbin/start-master.sh。
(4)启动Worker进程
Worker进程的启动,使用了脚本文件sbin/start-slaves.sh,该文件负责批量启动集群中各个节点上的Worker进程。

02 Spark应用程序提交原理

Spark应用程序的提交入口是spark-submit脚本。除此之外,也可以使用spark-shell脚本和spark-sql脚本进入交互式命令行执行Spark程序。
查看spark-shell脚本的内容:

$ cat bin/spark-shell

内容关键代码:

#!/usr/bin/env bash
......
function main() {
  if $cygwin; then
    stty -icanon min 1 -echo > /dev/null 2>&1
    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name
 "Spark shell" "$@"
    stty icanon echo > /dev/null 2>&1
  else
    export SPARK_SUBMIT_OPTS
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name
 "Spark shell" "$@"
  fi
}
......

从关键代码可以看出,spark-shell脚本调用了应用程序提交脚本spark-submit,并传入了org.apache.spark.repl.Main类和使用spark-shell命令时的所有参数(例如--master)。通过org.apache.spark.repl.Main类启动的spark-shell将进入REPL模式(交互式编程环境),一直等待用户的输入,除非手动退出。
查看spark-sql脚本的内容:

$ cat bin/spark-sql

代码如下:

#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "${SPARK_HOME}"/bin/spark-submit --class 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"

从上述代码可以看出,spark-sql脚本同样调用了应用程序提交脚本spark-submit。这说明,无论采用哪种方式提交应用程序,都间接的调用了spark-submit脚本。查看spark-submit脚本的内容,发现最后一行有如下代码:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@“

可以看出,最后使用exec命令执行了脚本bin/spark-class,并传入了org.apache.spark.deploy.SparkSubmit类和使用spark-submit命令时的所有参数。脚本bin/spark-class用于执行Spark应用程序并启动JVM进程,而这里执行的正是SparkSubmit类。

从上面的分析可以总结出,若通过spark-shell进行启动,将默认传入以下参数:

org.apache.spark.deploy.SparkSubmit
--class org.apache.spark.repl.Main
--name "Spark shell"

若通过spark-submit提交应用程序,将默认传入以下参数:

org.apache.spark.deploy.SparkSubmit

03 Spark作业工作原理

在学习Spark作业的工作原理时,通常会引入Hadoop MapReduce的工作原理作为入门比较,因为MapReduce与Spark的工作原理有很多相似之处。

1、MapReduce工作原理

MapReduce计算模型主要由三个阶段组成:Map阶段、Shuffle阶段、Reduce阶段。

file

(1)Map阶段

将输入的多个分片(Split)由Map任务以完全并行的方式处理。每个分片由一个Map任务来处理。默认情况下,输入分片的大小与HDFS中数据块(Block)的大小是相同的,即文件有多少个数据块就有多少个输入分片,也就会有多少个Map任务。从而可以调整HDFS数据块的大小来间接改变Map任务的数量。

每个Map任务对输入分片中的记录按照一定的规则解析成多个<key,value>对。默认将文件中的每一行文本内容解析成一个<key,value>对,key为每一行的起始位置,value为本行的文本内容。然后将解析出的所有<key,value>对分别输入到map()方法中进行处理(map()方法一次只处理一个<key,value>对)。map()方法将处理结果仍然是以<key,value>对的形式进行输出。

在数据溢写到磁盘之前,会对数据进行分区(Partition)。分区的数量与设置的Reduce任务的数量相同(默认Reduce任务的数量为1,可以在编写MapReduce程序时对其修改)。这样每个Reduce任务会处理一个分区的数据,可以防止有的Reduce任务分配的数据量太大,而有的Reduce任务分配的数据量太小,从而可以负载均衡,避免数据倾斜。数据分区的划分规则为:取<key,value>对中key的hashCode值,然后除以Reduce任务数量后取余数,余数则是分区编号,分区编号一致的<key,value>对则属于同一个分区。因此,key值相同的<key,value>对一定属于同一个分区,但是同一个分区中可能有多个key值不同的<key,value>对。由于默认Reduce任务的数量为1,而任何数字除以1的余数总是0,因此分区编号从0开始。

(2)Reduce阶段

Reduce阶段首先会对Map阶段的输出结果按照分区进行再一次合并,将同一分区的<key,value>对合并到一起,然后按照key对分区中的<key,value>对进行排序。
每个分区会将排序后的<key,value>对按照key进行分组,key相同的<key,value>对将合并为<key,value-list>对,最终每个分区形成多个<key,value-list>对。例如,key中存储的是用户ID,则同一个用户的<key,value>对会合并到一起。
排序并分组后的分区数据会输入到reduce()方法中进行处理,reduce()方法一次只能处理一个<key,value-list>对。
最后,reduce()方法将处理结果仍然以<key,value>对的形式通过context.write(key,value)进行输出。

(3)Shuffle阶段

Shuffle阶段所处的位置是Map任务输出后,Reduce任务接收前。主要是将Map任务的无规则输出形成一定的有规则数据,以便Reduce任务进行处理。
总结来说,MapReduce的工作原理主要是:通过Map任务读取HDFS中的数据块,这些数据块由Map任务以完全并行的方式处理;然后将Map任务的输出进行排序后输入到Reduce任务中;最后Reduce任务将计算的结果输出到HDFS文件系统中。

2、Spark工作原理

典型的Spark作业(Job)的工作流程如图:

file

(1)从数据源(本地文件、HDFS、HBase等)读取数据并创建RDD。
(2)对RDD进行一系列的转化操作。
(3)对最终RDD执行行动操作,开始一系列的计算,产生计算结果。
(4)将计算结果发送到Driver端,进行查看和输出。

04 Spark检查点原理

通过调用SparkContext的setCheckpointDir()方法指定检查点数据的存储路径,即把RDD数据放到什么位置,通常是放到HDFS中(如果在集群中运行,则必须是HDFS目录)。

总结来说,RDD检查点的运行流程如下:
(1)通过SparkContext的setCheckpointDir()方法设置RDD检查点数据的存储路径。
(2)调用RDD的checkpoint()方法将RDD标记为检查点。
(3)当在RDD上运行一个作业后,会立即触发RDDCheckpointData中的checkpoint()方法。
(4)在checkpoint()方法中又执行了doCheckpoint()方法。
(5)doCheckpoint()方法中执行了writeRDDToCheckpointDirectory()方法。
(6)writeRDDToCheckpointDirectory()方法内部通过调用runJob()方法运行一个作业,真正将RDD数据写入到检查点目录中,写入完成后返回一个ReliableCheckpointRDD实例。

Views: 87

Spark RDD弹性分布式数据集

01 什么是RDD

Spark提供了一种对数据的核心抽象,称为弹性分布式数据集(Resilient Distributed Dataset,简称RDD)。这个数据集的全部或部分可以缓存在内存中,并且可以在多次计算时重用。RDD其实就是一个分布在多个节点上的数据集合。

RDD的弹性主要是指:当内存不够时,数据可以持久化到磁盘,并且RDD具有高效的容错能力。分布式数据集是指:一个数据集存储在不同的节点上,每个节点存储数据集的一部分。

例如,将数据集(hello,world,scala,spark,love,spark,happy)存储在三个节点上,节点一存储(hello,world),节点二存储(scala,spark,love),节点三存储(spark,happy),这样对三个节点的数据可以并行计算,并且三个节点的数据共同组成了一个RDD。

file

分布式数据集类似于HDFS中的文件分块,不同的块存储在不同的节点上;而并行计算类似使用MapReduce读取HDFS中的数据并进行Map和Reduce操作。Spark则包含了这两种功能,并且计算更加灵活。

RDD的主要特征如下:

  • RDD是不可变的,但是可以将RDD转换成新的RDD进行操作
  • RDD是可分区的,一个RDD可以由很多分区组成(数据存在多个节点上),每个分区对应一个Task来执行
  • 对RDD的操作是作用在RDD的每个分区上
  • RDD拥有一系列对分区进行计算的函数,成为算子
  • RDD之间存在依赖关系,可以实现管道化,避免中间数据的存储

在编程时,可以把RDD看作是一个数据操作的基本单位,而不必关心数据的分布式特性,Spark会自动将RDD的数据分发到集群的各个节点。Spark中对数据的操作主要是对RDD的操作(创建、转化、求值)。

1、从对象集合创建RDD

Spark可以通过parallelize()或makeRDD()方法将一个对象集合转化为RDD。
例如,将一个List集合转化为RDD:

scala> val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]

scala> val rdd=sc.makeRDD(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1]

从返回信息可以看出,上述创建的RDD中存储的是Int类型数据。实际上RDD也是一个集合(并行集合),与常用的List集合不同的是,RDD集合的数据分布于多台机器上。

2、从外部存储创建RDD

Spark的textFile()方法可以读取本地文件系统或外部其它系统中的数据,并创建RDD。所不同的是,数据的来源路径不同。

首先在本地准备文件"/tmp/words.txt,内容如下:

hello hadoop
hello java
scala

进入Spark Shell读取本地数据(加载本地文件,必须以file:///开头):

//读取本地数据
scala> val rdd=sc.textFile("file:///tmp/words.txt")
rdd: org.apache.spark.rdd.RDD[String] = /tmp/words.txt MapPartitionsRDD[1]

scala> rdd.collect
res1: Array[String] = Array("hello hadoop ", "hello java ", "scala ")

异常问题

Spark Shell 调用 rdd.collect报错

spark Compression codec com.hadoop.compression.lzo.LzoCodec not found.

这是因为在hadoop中配置了编解码器lzo,所以当使用yarn模式时,spark自身没有lzo的jar包所以报错无法找到!
解决办法:
配置spark-default.conf文件!
修改spark-defaults.sh, 添加:

spark.jars /opt/pkg/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar
读取本地数据时报错路径不存在

相似问题

这个问题发生在集群方式提交本地文件的时候。

这是因为文件的读取是发生在Executor所在的节点(即Worker节点),只有将文件tmp/words.txt同步到所有Worker节点才能避免路径不存在的问题发生。

有意思的地方是,即使不同步到所有Worker节点,先调用rdd.first拉取第一条数据之后再调用rdd.collect就可以获取到数据了(中间也会出现文件找不到问题,但是仍可以获取到结果)。原因就是RDD的collect方法属于转化算子(算子的概念后面会讲)是有惰性的(不是一开始马上收集,需要等待动作算子(如first或者count)操作数据时才会收集)才会主动收集数据

解决办法就是:

  1. 改为Local模式运行 $ spark-shell --master local
  2. 把文件file:///tmp/words.txt同步到所有salve节点。
  3. 把文件先上传到HDFS,然后再读取HDFS上的文件

读取HDFS数据

先将数据上传到HDFS

$ hadoop fs -put /tmp/words.txt /tmp/

再读取HDFS数据

//读取HDFS数据
scala> val rdd=sc.textFile("hdfs://centos01:8082/tmp/words.txt")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://centos01:8082/tmp/words.txt MapPartitionsRDD[2]

scala> rdd.collect
res2: Array[String] = Array("hello hadoop ", "hello java ", "scala ")

02 RDD常用算子

RDD被创建后是只读的,不允许修改。Spark提供了丰富的用于操作RDD的方法,这些方法被称为算子。一个创建完成的RDD只支持两种算子:转化(Transformation)算子和行动(Action)算子。

1、转化算子

Spark中的转化算子主要时对RDD进行操作并返回新的RDD。

(1)map(func)

对rdd1应用map()算子,将rdd1中的每个元素加1并返回一个名为rdd2的新RDD:

scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6))
scala> val rdd2=rdd1.map(x => x+1)

(2)filter(func)

对源RDD的每个元素应用函数func进行过滤,并返回一个新的RDD。
例如,过滤出rdd1中大于3的所有元素,并输出结果:

scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6))
scala> val rdd2=rdd1.filter(_>3)
scala> rdd2.collect
res1: Array[Int] = Array(4, 5, 6)

上述代码中的下划线“_”代表rdd1中的每个元素。

(3)flatMap(func)

与map()算子类似,但是每个传入给函数func的RDD元素会返回0到多个元素,最终会将返回的所有元素合并到一个RDD。
例如,将集合List转为rdd1,然后调用rdd1的flatMap()算子将rdd1的每个元素按照空格进行分割成多个元素,最终合并所有元素到一个新的RDD。

scala> val rdd1=sc.parallelize(List("hadoop hello scala","spark hello"))
scala> val rdd2=rdd1.flatMap(_.split(" "))
scala> rdd2.collect
res3: Array[String] = Array(hadoop, hello, scala, spark, hello)

file

(4)reduceByKey()

reduceByKey()算子的作用对像是元素为(key,value)形式(Scala元组)的RDD,使用该算子可以将相同key的元素聚集到一起,最终把所有相同key的元素合并成为一个元素。该元素的key不变,value可以聚合成一个列表或者进行求和等操作。最终返回的RDD的元素类型和原有类型保持一致。
例如,有两个同学zhangsan和lisi,zhangsan的语文和数学成绩分别为98、78,lisi的语文和数学成绩分别为88、79,现需要分别求zhangsan和lisi的总成绩,代码如下:

scala> val list=List(("zhangsan",98),("zhangsan",78),("lisi",88),("lisi",79))
scala> val rdd1=sc.parallelize(list)
scala> val rdd2=rdd1.reduceByKey((x,y)=>x+y) //可以简化为rdd1.reduceByKey(_+_)
scala> rdd2.collect
res5: Array[(String, Int)] = Array((zhangsan,176), (lisi,167))

file

(5)groupByKey()

groupByKey()算子的作用对像是元素为(key,value)形式(Scala元组)的RDD,使用该算子可以将相同key的元素聚集到一起,最终把所有相同key的元素合并成为一个元素。该元素的key不变,value则聚集到一个集合中。
仍然以上述求学生zhangsan和lisi的总成绩为例,使用groupByKey()算子的代码如下:

scala> val list=List(("zhangsan",98),("zhangsan",78),("lisi",88),("lisi",79))
scala> val rdd1=sc.parallelize(list)
scala> val rdd2=rdd1.groupByKey()
scala> rdd2.map(x => (x._1,x._2.sum)).collect
res0: Array[(String, Int)] = Array((zhangsan,176), (lisi,167))

file

(6)union()

union()算子将两个RDD合并为一个新的RDD,主要用于对不同的数据来源进行合并,两个RDD中的数据类型要保持一致。
例如以下代码,通过集合创建了两个RDD,然后将两个RDD合并成了一个RDD:

scala> val rdd1=sc.parallelize(Array(1,2,3))
scala> val rdd2=sc.parallelize(Array(4,5,6))
scala> val rdd3=rdd1.union(rdd2)
scala> rdd3.collect
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6) 

(7)sortBy()

sortBy()算子将RDD中的元素按照某个规则进行排序。该算子的第一个参数为排序函数,第二个参数是一个布尔值,指定升序(默认)或降序。若需要降序排列,需将第二个参数置为false。
例如,一个数组中存放了三个元组,将该数组转为RDD集合,然后对该RDD按照每个元素中的第二个值进行降序排列,代码如下:

scala> val rdd1=sc.parallelize(Array(("hadoop",12),("java",32),("spark",22)))
scala> val rdd2=rdd1.sortBy(x=>x._2,false)
scala> rdd2.collect
res2: Array[(String, Int)] = Array((java,32),(spark,22),(hadoop,12))

(8)sortByKey()

sortByKey()算子将(key,value)形式的RDD按照key进行排序。默认升序,若需降序排列,可以传入参数false,代码如下:
rdd.sortByKey(false)

(9)join ()

join()算子将两个(key,value)形式的RDD根据key进行连接操作,相当于数据库的内连接(inner join),只返回两个RDD都匹配的内容。例如,将rdd1和rdd2进行内连接,代码如下:

scala> val arr1= Array(("A","a1"),("B","b1"),("C","c1"),("D","d1"),("E","e1")
scala> val rdd1 = sc.parallelize(arr1))
scala> val arr2= Array(("A","A1"),("B","B1"),("C","C1"),("C","C2"),("C","C3"),("E","E1"))
scala> val rdd2 = sc.parallelize(arr2)
scala> rdd1.join(rdd2).collect
res0: Array[(String, (String, String))] = Array((B,(b1,B1)), (A,(a1,A1)), (C,(c1,C1)), (C,(c1,C2)), (C,(c1,C3)), (E,(e1,E1)))
scala> rdd2.join(rdd1).collect
res1: Array[(String, (String, String))] = Array((B,(B1,b1)), (A,(A1,a1)), (C,(C1,c1)), (C,(C2,c1)), (C,(C3,c1)), (E,(E1,e1)))

file

2、行动算子

Spark中的转化算子并不会马上进行运算,而是在遇到行动算子时才会执行相应的语句,触发Spark的任务调度。

file

2、行动算子

(1)reduce()

将数字1到100所组成的集合转为RDD,然后对该RDD进行reduce()算子计算,统计RDD中所有元素值的总和,代码如下:

scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.reduce(_+_)
res2: Int = 5050
scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.count
res3: Long = 100

(2)count()

统计RDD集合中元素的数量:

scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.reduce(_+_)
res2: Int = 5050
scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.count
res3: Long = 100

(3)countByKey()

例如,List集合中存储的是键值对形式的元组,使用该List集合创建一个RDD,然后对其进行countByKey()的计算:

scala> val rdd1 = sc.parallelize(List(("zhang",87),("zhang",79),("li",90)))
scala> rdd1.countByKey
res1: scala.collection.Map[String,Long] = Map(zhang -> 2, li -> 1) 

scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.take(5)
res4: Array[Int] = Array(1, 2, 3, 4, 5)

(4)take(n)

返回集合中前5个元素组成的数组:

scala> val rdd1 = sc.parallelize(List(("zhang",87),("zhang",79),("li",90)))
scala> rdd1.countByKey
res1: scala.collection.Map[String,Long] = Map(zhang -> 2, li -> 1) 

scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.take(5)
res4: Array[Int] = Array(1, 2, 3, 4, 5)

03 RDD的分区

RDD是一个大的数据集合,该集合被划分成多个子集合分布到了不同的节点上,而每一个子集合就称为分区(Partition)。因此也可以说,RDD是由若干个分区组成的。
RDD各个分区中的数据可以并行计算,因此分区的数量决定了并行计算的粒度。Spark会给每一个分区分配一个单独的Task任务对其进行计算,因此并行Task的数量是由分区的数量决定的。RDD分区的一个分区原则是使得分区的数量尽量等于集群中CPU核心数量。
file

04 RDD的依赖

Spark中,对RDD的每一次转化操作都会生成一个新的RDD,由于RDD的懒加载特性,新的RDD会依赖原有RDD,因此RDD之间存在类似流水线的前后依赖关系。这种依赖关系分为两种:窄依赖和宽依赖。

1、窄依赖

窄依赖是指父RDD的一个分区最多被子RDD的一个分区所用。也就是说,父RDD的分区与子RDD的分区的对应关系为一对一或多对一。例如map()、filter()、union()等操作都会产生窄依赖。
file

2、宽依赖

宽依赖是指,父RDD的一个分区被子RDD的多个分区所用。也就是说,父RDD的分区与子RDD的分区的对应关系为多对多。例如groupByKey()、reduceByKey()、sortByKey()等操作都会产生宽依赖。
file

Stage 划分

在Spark中,对每一个RDD的操作都会生成一个新的RDD,将这些RDD用带方向的直线连接起来(从父RDD连接到子RDD)会形成一个关于计算路径的有向无环图,称为DAG(Directed Acyclic Graph)。
file
Spark会根据DAG将整个计算划分为多个阶段,每个阶段称为一个Stage。每个Stage由多个Task任务并行进行计算,每个Task任务作用在一个分区上,一个Stage的总Task任务数量是由Stage中最后一个RDD的分区个数决定。

Stage的划分依据为是否有宽依赖,即是否有Shuffle。Spark调度器会从DAG图的末端向前进行递归划分,遇到Shuffle则进行划分,Shuffle之前的所有RDD组成一个Stage,整个DAG图为一个Stage。经典的单词计数执行流程的Stage划分:
file
比较复杂一点的Stage划分:
file

为说明要根据宽依赖划分Stage呢?这是因为窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join操作(此join非上文的join算子,而是指同步多个并行任务的barrier): 把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果子RDD的分区到 父RDD的分区是窄依赖,就可以实施优化把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个 fork/join并为一个,这将极大地提升性能。Spark把这个叫做 流水线(pipeline)优化。

05 RDD的持久化

Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。
Spark中最重要的功能之一是可以将某个RDD中的数据保存到内存或者磁盘中,每次需要对这个RDD进行算子操作时,可以直接从内存或磁盘中取出该RDD的持久化数据,而不需要从头计算才能得到这个RDD。
例如有多个RDD,它们的依赖关系如图。若RDD3没有持久化保存,则每次对RDD3进行操作时都需要从textFile()开始计算,将文件数据转化为RDD1,再转化为RDD2,最终才得到RDD3。
可以在RDD上使用persist()或cache()方法来标记要持久化的RDD(cache()方法实际上底层调用的是persist()方法)。

file

RDD的检查点

RDD的检查点机制(Checkpoint)相当于对RDD数据进行快照,可以将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS。当机器发生故障导致内存或磁盘中的RDD数据丢失时可以快速从快照中对指定的RDD进行恢复,而不需要根据RDD的依赖关系从头进行计算,大大提高了计算效率。与cache()或者persist()将RDD数据存放到内存或者磁盘中的不同有以下几点:

(1)cache()或者persist()是将数据存储于机器本地的内存或磁盘,当机器故障时无法进行数据恢复,而检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证了数据的可靠性。

(2)在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空,而检查点存储的数据不会受影响,将永久存在,除非手动将其移除。因此,检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用。

val sc = new SparkContext(conf);
//设置检查点数据存储路径
sc.setCheckpointDir("hdfs://centos01:8020/spark-ck")

06 共享变量

通常情况下,Spark应用程序运行的时候,Spark算子(例如map(func)或filter(func))中的函数func会被发送到远程的多个Worker节点上执行,如果一个算子中使用了某个外部变量,则该变量会拷贝到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立。当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销。因此,Spark提供了两种共享变量:广播变量和累加器。

广播变量

广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中,而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据。因此广播变量是只读的。
1、默认情况下变量的传递
例如,map()算子中使用了外部变量arr:

val arr=Array(1,2,3,4,5);
val lines:RDD[String] = sc.textFile("file:///tmp/data.txt")
val result = lines.map(line =>
   (line, arr)
)

变量传递流程如图。
file
2、使用广播变量时变量的传递
例如,使用广播变量将数组arr传递给了map()算子:

val arr=Array(1,2,3,4,5);
val broadcastVar = sc.broadcast(arr)
val result = lines.map(line =>
   (line, broadcastVar) // broadcastVar为广播变量
)

传递流程如图。

file

在分布式函数中可以通过Broadcast对象的value方法访问
广播变量的值:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

累加器

累加器提供了将Worker节点的值聚合到Driver的功能,可以用于实现计数和求和。
例如,对一个整型数组进行求和,若不使用累加器,以下代码的输出结果不正确:

var sum=0 //在Driver中声明
val rdd=sc.makeRDD(Array(1,2,3,4,5))
rdd.foreach(x=>
  //在Executor中执行
  sum+=x
)
println(sum)//输出0

使用累加器对数组进行求和:

//声明一个累加器,默认初始值为0(只能在Driver端定义)
val myacc=sc.longAccumulator("My Accumulator")
val rdd=sc.makeRDD(Array(1,2,3,4,5))
rdd.foreach(x=>
   myacc.add(x)//向累加器中添加值
)
println(myacc.value)//输出15(只能在Driver端读取)

注意,累加器只能在Driver端定义,在Executor端更新。Executor端不能读取累加器的值,需要在Driver端使用value属性读取。

Views: 75

Index