Spark RDD 实战案例

IDEA 创建基于Maven的Spark项目

三台虚拟机搭建的集群
启动集成如下:

------------ hadoop100 ------------
26369 Master
2514 QuorumPeerMain
2898 Kafka
29011 SparkSubmit
30643 HRegionServer
3493 JobHistoryServer
3573 NodeManager
3065 NameNode
30476 HMaster
31036 Jps
28927 SparkSubmit
------------ hadoop101 ------------
5153 QuorumPeerMain
5537 Kafka
5905 ResourceManager
6018 NodeManager
104355 Jps
84258 Worker
104039 HRegionServer
5645 DataNode
104140 HMaster
------------ hadoop102 ------------
71683 Worker
5509 Kafka
104804 Jps
5752 SecondaryNameNode
5897 NodeManager
5643 DataNode
5116 QuorumPeerMain
104542 HRegionServer

本地环境由于需要打包放到虚拟机运行,因此scala版本需要和虚拟机中编译spark所用的scala的版本一致。如何知道虚拟机中spark是用的什么版本scala编译的呢?可以进入虚拟机的spark-shell查看:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.8
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281)

可见2.4.8并不是官网所说的使用scala2.12,实际上是2.11

接下来使用IDEA创建一个MAVEN项目,使用scala模板
file
配置项目名称
file

配置有效的maven环境
file

修改pom文件:

  1. 设置scala的版本,我虚拟机里的Spark是用的scala2.11版本编译的,这里能找到最接近的可用的版本就是2.11.8。
  2. 添加maven-scala-plugin依赖,使用的是2.11的版本。
  3. 添加spark-core_2.11, 版本为2.4.8,和虚拟机安装的Spark版本保持一致
  4. 此外因为下面的示例还要操作hbase,因为又引入了hadoop和hbase相关的一些依赖,其中hadoop-common依赖排除了jackson-databind是因为出现了依赖冲突的问题。
  5. 为了演示官方示例(蒙特卡洛法求Pi值),引入了spark-sql依赖

最终完整的pom.xml文件内容如下:


    4.0.0
    cn.delucia
    SparkProject
    1.0-SNAPSHOT
    2008
    
        2.11.8
        2.4.8
        3.1.4
        2.2.3
        1.8
        1.8
        UTF-8
    

    
        
            scala-tools.org
            Scala-Tools Maven2 Repository
            http://scala-tools.org/repo-releases
        
    
    
        
            scala-tools.org
            Scala-Tools Maven2 Repository
            http://scala-tools.org/repo-releases
        
    

    
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            
                
                    com.fasterxml.jackson.core
                    jackson-databind
                
            
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        
        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-server
            ${hbase.version}
            
            
                
                    org.glassfish
                    javax.el
                
            
        
        
            org.apache.hbase
            hbase-mapreduce
            ${hbase.version}
        
        
            org.apache.spark
            spark-core_2.11
            ${spark.version}
        
        
            org.apache.spark
            spark-sql_2.11
            ${spark.version}
        
        
            org.scala-lang
            scala-library
            ${scala.version}
        
        
            mysql
            mysql-connector-java
            5.1.41
        
        
            junit
            junit
            4.4
            test
        
        
            org.specs
            specs
            1.2.5
            test
        
        
        
            org.scala-tools
            maven-scala-plugin
            2.15.2
        
    

    
        src/main/scala
        src/test/scala
        
            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        
                            compile
                            testCompile
                        
                    
                
                
                    ${scala.version}
                    
                        -target:jvm-1.8
                    
                
            
            
                org.apache.maven.plugins
                maven-eclipse-plugin
                2.5.1
                
                    true
                    
                        ch.epfl.lamp.sdt.core.scalabuilder
                    
                    
                        ch.epfl.lamp.sdt.core.scalanature
                    
                    
                        org.eclipse.jdt.launching.JRE_CONTAINER
                        ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
                    
                
            
        
    
    
        
            
                org.scala-tools
                maven-scala-plugin
                
                    ${scala.version}
                
            
        
    

项目右键选择 maven->Reimport 等待依赖下载完成。
你会发现test包下面有一些自动生成的源文件有错误(依赖版本问题),这里直接把报错的文件删除,只保留一个AppTest.scala文件,可以点击测试以下环境,main包下面自动生成的文件也是直接删除即可。:

file

给项目根目录下创建data和output文件夹,最终项目结构:

file

实现单词计数

在data下面创建文本文件words.txt,内容如下

hello hadoop
hello java
scala

本地模式运行

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark RDD 单词计数程序
  * Program Args: data\words.txt output\wc
  * 注意:需提前删除output\wc文件夹
  */
object WordCountLocal {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val conf = new SparkConf()
    //设置应用程序名称,可以在Spark WebUI中显示
    conf.setAppName("WordCountLocal")

    //设置集群Master节点访问地址
    //conf.setMaster("spark://hadoop100:7077"); //远程-提交到集群(standalone模式)
    conf.setMaster("local[2]")  // 本地模式 - 2个CPU核心

    //创建SparkContext对象,该对象是提交Spark应用程序的入口
    val sc = new SparkContext(conf);

    //读取指定路径(取程序执行时传入的第一个参数)中的文件内容,生成一个RDD集合
    val linesRDD:RDD[String] = sc.textFile(args(0))
    //将RDD数据按照空格进行切分并合并为一个新的RDD
    val wordsRDD:RDD[String] = linesRDD.flatMap(_.split(" "))
    //将RDD中的每个单词和数字1放到一个元组里,即(word,1)
    val paresRDD:RDD[(String, Int)] = wordsRDD.map((_,1))
    //对单词根据key进行聚合,对相同的key进行value的累加
    val wordCountsRDD:RDD[(String, Int)] = paresRDD.reduceByKey(_+_)
    //按照单词数量降序排列
    val wordCountsSortRDD:RDD[(String, Int)] = wordCountsRDD.sortBy(_._2,false)
    //保存结果到指定的路径(取程序执行时传入的第二个参数)
    wordCountsSortRDD.saveAsTextFile(args(1))
    //停止SparkContext,结束该任务
    sc.stop()
  }
}

直接点击运行,由于是两个核心,结果有两个文件生成
file

上传到集群运行 - Spark on YARN 模式

前提 集群已经安装好Hadoop集群并运行DFS和YARN
Spark已经配置好Spark on YARN的相关设置

编写程序

package cn.delucia.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
 Spark RDD单词计数程序 - 打jar包上传到集群 Spark on YARN 模式
*/
object WordCountCluster {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("WordCountCluster")
    conf.setMaster("spark://hadoop100:7077")
    val sc = new SparkContext(conf)
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .saveAsTextFile(args(1))

    sc.stop()
  }
}

使用maven生命周期插件clean然后install打成jar包上传到集群的/opt/data目录下,将jar包改名位WordCountCluster.jar,进入spark的安装目录执行命令:

bin/spark-submit
--master yarn
--class cn.delucia.spark.rdd.WordCountCluster /opt/data/WordCountCluster.jar hdfs://hadoop100:8020/tmp/words.txt hdfs://hadoop100:8020/tmp/output/wc_cluster

即可把任务提交到YARN集群运行,可以去output目录可以查看结果
file

有两个文件生成说明有两个分区,因为设置了2个核心

求平均成绩

data下创建score.txt

Andy,98
Jack,87
Bill,99
Andy,78
Jack,85
Bill,86
Andy,90
Jack,88
Bill,76
Andy,58
Jack,67
Bill,79

编写程序:

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 求成绩平均分
  */
object AverageScoreLocal {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      //设置应用程序名称,可以在Spark WebUI中显示
      conf.setAppName("AverageScore")
      //设置集群Master节点访问地址
      conf.setMaster("local")

      val sc = new SparkContext(conf)
      //1. 加载数据
      val linesRDD: RDD[String] = sc.textFile("data/avg_score.txt")
      //2. 将RDD中的元素转为(key,value)形式,便于后面进行聚合
      val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => {
         val name = line.split("\t")(0)//姓名
         val score = line.split("\t")(1).toInt//成绩
         (name, score)
      })
      //3. 根据姓名进行分组,形成新的RDD
      val groupedRDD: RDD[(String, Iterable[Int])] = tupleRDD.groupByKey()
      //4. 迭代计算RDD中每个学生的平均分
      val resultRDD: RDD[(String, Int)] = groupedRDD.map(line => {
         val name = line._1//姓名
         val iteratorScore: Iterator[Int] = line._2.iterator//成绩迭代器
         var sum = 0//总分
         var count = 0//科目数量

         //迭代累加所有科目成绩
         while (iteratorScore.hasNext) {
            val score = iteratorScore.next()
            sum += score
            count += 1
         }
         //计算平均分
         val averageScore = sum / count
         (name, averageScore)//返回(姓名,平均分)形式的元组
      })
      //保存结果
      resultRDD.saveAsTextFile("output/avg_score")
   }
}

输出结果:

(BETA,45)
(Bill,85)
(Andy,81)
(Jack,81)

统计学生最好的三次成绩并倒序排列

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Spark分组取TopN程序
  */
object GroupTopNLocal {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      //设置应用程序名称,可以在Spark WebUI中显示
      conf.setAppName("RDDGroupTopN")
      //设置集群Master节点访问地址,此处为本地模式,使用尽可能多的核心
      conf.setMaster("local[*]")

      val sc = new SparkContext(conf)
      //1. 加载本地数据
      val linesRDD: RDD[String] = sc.textFile("data/score.txt")

      //2. 将RDD元素转为(String,Int)形式的元组
      val tupleRDD:RDD[(String,Int)]=linesRDD.map(line=>{
         val name=line.split(",")(0)
         val score=line.split(",")(1)
         (name,score.toInt)
      })

      //3. 按照key(姓名)进行分组
      val top3=tupleRDD.groupByKey().map(groupedData=>{
         val name:String=groupedData._1
         //每一组的成绩降序后取前3个
         val scoreTop3:List[Int]=groupedData._2
           .toList.sortWith(_>_).take(3)
         (name,scoreTop3)//返回元组
      })

      //当使用多核心时,分区排序完就各自并行输出结果了,导致输出内容互相干扰
      //这里先调用count,由于count属于Action算子,会触发前面的计算完成
      //这样等top3里面的所有分区都计算完毕再输出,可以让输出格式更好
      top3.count

      //4. 循环打印分组结果
      top3.foreach(tuple=>{
         println("姓名:"+tuple._1)
         val tupleValue=tuple._2.iterator
         while (tupleValue.hasNext){
            val value=tupleValue.next()
            println("成绩:"+value)
         }
         println("*******************")
      })
   }
}

控制台输出:

姓名:Andy
成绩:98
成绩:90
成绩:78
*******************
姓名:Bill
成绩:99
成绩:86
成绩:79
*******************
姓名:Jack
成绩:88
成绩:87
成绩:85
*******************

倒排索引统计每日新增用户

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark RDD统计每日新增用户
  */
object DayNewUserLocal {
   def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
      conf.setAppName("DayNewUser")
      conf.setMaster("local[*]")

      val sc = new SparkContext(conf)
      //1. 构建测试数据
      val tupleRDD:RDD[(String,String)] = sc.parallelize(
         Array(
            ("2020-01-01", "user1"),
            ("2020-01-01", "user2"),
            ("2020-01-01", "user3"),
            ("2020-01-02", "user1"),
            ("2020-01-02", "user2"),
            ("2020-01-02", "user4"),
            ("2020-01-03", "user2"),
            ("2020-01-03", "user5"),
            ("2020-01-03", "user6")
         )
      )
      //2. 倒排(互换RDD中元组的元素顺序)
      val tupleRDD2:RDD[(String,String)] = tupleRDD.map(
      line => (line._2, line._1)
      )
      //3. 将倒排后的RDD按照key分组
      val groupedRDD: RDD[(String, Iterable[String])] = tupleRDD2.groupByKey()
      //4. 取分组后的每个日期集合中的最小日期,并计数为1  (只有第一次登陆才算做当日新增用户)
      val dateRDD:RDD[(String,Int)] = groupedRDD.map(
         line => (line._2.min, 1)
      )
      //5. 计算所有相同key(即日期)的数量
      val resultMap: collection.Map[String, Long] = dateRDD.countByKey()
      //将结果Map循环打印到控制台
      resultMap.foreach(println)
   }

}

结果:

(2020-01-01,3)
(2020-01-02,1)
(2020-01-03,2)

自定义排序规则(二次排序)

先准备数组,data下创建文本文件

2 98
1 99
2 67
3 75
3 88
2 85
1 90
3 100
1 62

先根据第一列升序排列,如果相同再根据第二列降序排列

package cn.delucia.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 二次排序自定义key类
 * 先根据名字升序排列,如果名字相同再根据成绩降序排列
 * @param first  每一行的第一个字段
 * @param second 每一行的第二个字段
 */
class SecondSortKey(val first: Int, val second: Int)
  extends Ordered[SecondSortKey] with Serializable {
  override def toString: String = first + "->" + second;

  /**
   * 实现compare()方法
   */
  override def compare(that: SecondSortKey): Int = {
    //若第一个字段不相等,按照第一个字段升序排列
    if (this.first - that.first != 0) {
      this.first - that.first
    } else { //否则按照第二个字段降序排列
      that.second - this.second
    }
  }
}

/**
 * 二次排序运行主类
 */
object SecondSort {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val conf = new SparkConf()
    //设置应用程序名称,可以在Spark WebUI中显示
    conf.setAppName("SecondSort")
    //设置集群Master节点访问地址,此处为本地模式
    conf.setMaster("local")
    //创建SparkContext对象,该对象是提交Spark应用程序的入口
    val sc = new SparkContext(conf);

    //1. 读取指定路径的文件内容,生成一个RDD集合
    val lines: RDD[String] = sc.textFile("data\\sort.txt")
    //2. 将RDD中的元素转为(SecondSortKey, String)形式的元组
    val pair: RDD[(SecondSortKey, String)] = lines.map(line => (
      new SecondSortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt),
      line)
    )
    //3. 按照元组的key(SecondSortKey的实例)进行排序
    val pairSort: RDD[(SecondSortKey, String)] = pair.sortByKey()
    //取排序后的元组中的第二个值(value值)
    //val result: RDD[String] = pairSort.map(line => line._2)
    //打印最终结果
    pairSort.foreach(line => println(line))
  }
}

控制台输出结果:

(1->99,1 99)
(1->90,1 90)
(1->62,1 62)
(2->98,2 98)
(2->85,2 85)
(2->67,2 67)
(3->100,3 100)
(3->88,3 88)
(3->75,3 75)

读写HBase数据库

HBase是Spark应用程序经常打交道的一个数据源。下面使用Spark程序对其进行读写操作:

使用HBase API向HBase写入数据

package cn.delucia.spark.rdd

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.ConnectionFactory
/**
  * 向HBase表写入数据 - hbase API
  */
object SparkWriteHBase {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      conf.setAppName("SparkWriteHBase")
      conf.setMaster("local[*]")
      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 构建需要添加的数据RDD
      val initRDD = sc.makeRDD(
         Array(
            "003,王五,山东,23",
            "004,赵六,河北,20"
         )
      )

      //2. 循环RDD的每个分区
      initRDD.foreachPartition(partition=> {
         //2.1 设置HBase配置信息
         val hbaseConf = HBaseConfiguration.create()
         //设置ZooKeeper集群地址
         hbaseConf.set("hbase.zookeeper.quorum","hadoop100")
         //设置ZooKeeper连接端口,默认2181
         hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
         //创建数据库连接对象
         val conn = ConnectionFactory.createConnection(hbaseConf)
         //指定表名
         val tableName = TableName.valueOf("student")
         //获取需要添加数据的Table对象
         val table = conn.getTable(tableName)

         //2.2 循环当前分区的每行数据
         partition.foreach(line => {
            //分割每行数据,获取要添加的每个值
            val arr = line.split(",")
            val rowkey = arr(0)
            val name = arr(1)
            val address = arr(2)
            val age = arr(3)

            //创建Put对象
            val put = new Put(Bytes.toBytes(rowkey))
            put.addColumn(
               Bytes.toBytes("info"),//列族名
               Bytes.toBytes("name"),//列名
               Bytes.toBytes(name)//列值
            )
            put.addColumn(
               Bytes.toBytes("info"),//列族名
               Bytes.toBytes("address"),//列名
               Bytes.toBytes(address)) //列值
            put.addColumn(
               Bytes.toBytes("info"),//列族名
               Bytes.toBytes("age"),//列名
               Bytes.toBytes(age)) //列值

            //执行添加
            table.put(put)
         })
      })

   }
}

使用Spark API向HBase写入数据

package cn.delucia.spark.rdd

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * 向HBase表写入数据 使用 Spark API(saveAsHadoopDataset方法)
  */
object SparkWriteHBase2 {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      conf.setAppName("SparkWriteHBase2")
      conf.setMaster("local[*]")
      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 设置配置信息
      //创建Hadoop JobConf对象
      val jobConf = new JobConf()
      //设置ZooKeeper集群地址
      jobConf.set("hbase.zookeeper.quorum","hadoop100")
      //设置ZooKeeper连接端口,默认2181
      jobConf.set("hbase.zookeeper.property.clientPort", "2181")
      //指定输出格式
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      //指定表名
      jobConf.set(TableOutputFormat.OUTPUT_TABLE,"student")

      //2. 构建需要写入的RDD数据
      val initRDD = sc.makeRDD(
         Array(
            "005,王五,山东,23",
            "006,赵六,河北,20"
         )
      )

      //将RDD转换为(ImmutableBytesWritable, Put)类型
      val resultRDD: RDD[(ImmutableBytesWritable, Put)] = initRDD.map(
         _.split(",")
      ).map(arr => {
         val rowkey = arr(0)
         val name = arr(1)//姓名
         val address = arr(2)//地址
         val age = arr(3)//年龄

         //创建Put对象
         val put = new Put(Bytes.toBytes(rowkey))
         put.addColumn(
            Bytes.toBytes("info"),//列族
            Bytes.toBytes("name"),//列名
            Bytes.toBytes(name)//列值
         )
         put.addColumn(
            Bytes.toBytes("info"),//列族
            Bytes.toBytes("address"),//列名
            Bytes.toBytes(address)) //列值
         put.addColumn(
            Bytes.toBytes("info"),//列族
            Bytes.toBytes("age"),//列名
            Bytes.toBytes(age)) //列值

         //拼接为元组返回
         (new ImmutableBytesWritable, put)
      })

      //3. 写入数据
      resultRDD.saveAsHadoopDataset(jobConf)
      sc.stop()
   }
}

批量向Hbase写入数据

使用BulkLoader加载HFile数据到HBase:
参考阅读:
https://blog.cloudera.com/how-to-use-hbase-bulk-loading-and-why/
https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/

package cn.delucia.spark.rdd

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark 批量写入数据到HBase(使用BulkLoader加载HFile数据到HBase)
  */
object SparkWriteHBase3 {
   def main(args: Array[String]): Unit = {
      System.setProperty("HADOOP_USER_NAME", "hadoop")
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      conf.setAppName("SparkWriteHBase3")
      conf.setMaster("local[*]")
      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 设置HDFS和HBase配置信息
      val hadoopConf = new Configuration()
      hadoopConf.set("fs.defaultFS", "hdfs://hadoop100:8020")
      val fileSystem = FileSystem.get(hadoopConf)
      val hbaseConf = HBaseConfiguration.create(hadoopConf)
      //设置ZooKeeper集群地址
      hbaseConf.set("hbase.zookeeper.quorum", "hadoop100")
      //设置ZooKeeper连接端口,默认2181
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      //创建数据库连接对象
      val conn = ConnectionFactory.createConnection(hbaseConf)
      //指定表名
      val tableName = TableName.valueOf("student")
      //获取需要添加数据的Table对象
      val table = conn.getTable(tableName)
      //获取操作数据库的Admin对象
      val admin = conn.getAdmin()

      //2. 添加数据前的判断
      //如果HBase表不存在,则创建一个新表
      if (!admin.tableExists(tableName)) {
         val desc = new HTableDescriptor(tableName)
         //表名
         val hcd = new HColumnDescriptor("info") //列族
         desc.addFamily(hcd)
         admin.createTable(desc) //创建表
      }
      //如果存放HFile文件的HDFS目录已经存在,则删除
      if (fileSystem.exists(new Path("hdfs://hadoop100:8020/tmp/hbase"))) {
         fileSystem.delete(new Path("hdfs://hadoop100:8020/tmp/hbase"), true)
      }

      //3. 构建需要添加的RDD数据
      //初始数据
      val initRDD = sc.makeRDD(
         Array(
            "rowkey:007,name:王五",
            "rowkey:007,address:山东",
            "rowkey:007,age:23",
            "rowkey:008,name:赵六",
            "rowkey:008,address:河北",
            "rowkey:008,age:20"
         )
      )
      //数据转换
      //转换为(ImmutableBytesWritable, KeyValue)类型的RDD
      val resultRDD: RDD[(ImmutableBytesWritable, KeyValue)] = initRDD.map(
         _.split(",")
      ).map(arr => {
         val rowkey = arr(0).split(":")(1)
         //rowkey
         val qualifier = arr(1).split(":")(0)
         //列名
         val value = arr(1).split(":")(1) //列值

         val kv = new KeyValue(
            Bytes.toBytes(rowkey),
            Bytes.toBytes("info"),
            Bytes.toBytes(qualifier),
            Bytes.toBytes(value)
         )
         //构建(ImmutableBytesWritable, KeyValue)类型的元组返回
         (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
      })

      //4. 写入数据
      //在HDFS中生成HFile文件
      hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name",tableName.getNameAsString)
      resultRDD.saveAsNewAPIHadoopFile(
         "hdfs://hadoop100:8020/tmp/hbase",
         classOf[ImmutableBytesWritable], //对应RDD元素中的key
         classOf[KeyValue], //对应RDD元素中的value
         classOf[HFileOutputFormat2],
         hbaseConf
      )
      //加载HFile文件到HBase
      val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
      val regionLocator = conn.getRegionLocator(tableName)
      bulkLoader.doBulkLoad(
         new Path("hdfs://hadoop100:8020/tmp/hbase"), //HFile文件位置
         admin, //操作HBase数据库的Admin对象
         table, //目标Table对象(包含表名)
         regionLocator //RegionLocator对象,用于查看单个HBase表的区域位置信息
      )
      sc.stop()
   }
}

读取Hbase数据

前面学了三钟方法向HBase数据库插入数据,并且一共向Hbase的student表写入了6条记录,下面我们写一个程序从HBase中把它们读取出来

package cn.delucia.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
/**
  * Spark读取HBase表数据
  */
object SparkReadHBase {
   def main(args: Array[String]): Unit = {
      //创建SparkConf对象,存储应用程序的配置信息
      val conf = new SparkConf()
      conf.setAppName("SparkReadHBase")
      conf.setMaster("local[*]")
      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 设置HBase配置信息
      val hbaseConf = HBaseConfiguration.create()
      //设置ZooKeeper集群地址
      hbaseConf.set("hbase.zookeeper.quorum","hadoop100")
      //设置ZooKeeper连接端口,默认2181
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      //指定表名
      hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")

      //2. 读取HBase表数据并转化成RDD
      val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
         hbaseConf,
         classOf[TableInputFormat],
         classOf[ImmutableBytesWritable],
         classOf[Result]
      )

      //3. 输出RDD中的数据到控制台
      hbaseRDD.foreach{ case (_ ,result) =>
         //获取行键
         val key = Bytes.toString(result.getRow)
         //通过列族和列名获取列值
         val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
         val gender = Bytes.toString(result.getValue("info".getBytes,"address".getBytes))
         val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
         println("行键:"+key+"\t姓名:"+name+"\t地址:"+gender+"\t年龄:"+age)
      }
   }
}

输出

行键:003  姓名:王五   地址:山东   年龄:23
行键:004  姓名:赵六   地址:河北   年龄:20
行键:005  姓名:王五   地址:山东   年龄:23
行键:006  姓名:赵六   地址:河北   年龄:20
行键:007  姓名:王五   地址:山东   年龄:23
行键:008  姓名:赵六   地址:河北   年龄:20

解决数据倾斜问题

避免数据倾斜的办法还有很多,比如:

  1. 对数据进行预处理
  2. 过滤掉没有意义的数据
  3. 提高shuffle的并行度,增加分区数量
    但这不能解决大量相同key导致的单个分区数据过多的问题

对于存在大量相同的key导致数据集中在某个分区的问题,这里给出一个解决办法:

添加随机前缀进行双重集合:

  1. 首先给数据添加随机前缀,进行分区内的局部聚合
  2. 然后去除随机前缀,进行全局聚合
package cn.delucia.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

/**
  * Spark RDD解决数据倾斜案例 - 由于存在大量相同的key导致数据集中在某个分区
 *  解决办法:添加随机前缀进行双重集合)
  * 1. 首先给数据添加随机前缀,进行分区内的局部聚合
  * 2. 然后去除随机前缀,进行全局聚合
  */
object DataLean {
   def main(args: Array[String]): Unit = {
      //创建Spark配置对象
      val conf = new SparkConf();
      conf.setAppName("DataLean")
      conf.setMaster("local[*]")

      //创建SparkContext对象
      val sc = new SparkContext(conf)

      //1. 读取测试数据
      val linesRDD = sc.textFile("data/data.txt")
      //2. 统计单词数量
      linesRDD
        .flatMap(_.split(" "))
        .map((_, 1))
        .map(t => {
           val word = t._1
           val random = Random.nextInt(100)//产生0~99的随机数
           //单词加入随机数前缀,格式:(前缀_单词,数量)
           (random + "_" + word, 1)
        })
        .reduceByKey(_ + _)//局部聚合
        .map(t => {
         val word = t._1
         val count = t._2
         val w = word.split("_")(1)//去除前缀
         //单词去除随机数前缀,格式:(单词,数量)
         (w, count)
      })
        .reduceByKey(_ + _)//全局聚合
        //输出结果到指定的HDFS目录
        .saveAsTextFile("output/data_lean")
   }
}

Views: 79