IDEA 创建基于Maven的Spark项目
三台虚拟机搭建的集群
启动集成如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
------------ hadoop100 ------------ 26369 Master 2514 QuorumPeerMain 2898 Kafka 29011 SparkSubmit 30643 HRegionServer 3493 JobHistoryServer 3573 NodeManager 3065 NameNode 30476 HMaster 31036 Jps 28927 SparkSubmit ------------ hadoop101 ------------ 5153 QuorumPeerMain 5537 Kafka 5905 ResourceManager 6018 NodeManager 104355 Jps 84258 Worker 104039 HRegionServer 5645 DataNode 104140 HMaster ------------ hadoop102 ------------ 71683 Worker 5509 Kafka 104804 Jps 5752 SecondaryNameNode 5897 NodeManager 5643 DataNode 5116 QuorumPeerMain 104542 HRegionServer |
本地环境由于需要打包放到虚拟机运行,因此scala版本需要和虚拟机中编译spark所用的scala的版本一致。如何知道虚拟机中spark是用的什么版本scala编译的呢?可以进入虚拟机的spark-shell查看:
1 2 3 4 5 6 7 8 |
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.8 /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281) |
可见2.4.8并不是官网所说的使用scala2.12,实际上是2.11
接下来使用IDEA创建一个MAVEN项目,使用scala模板
配置项目名称
配置有效的maven环境
修改pom文件:
- 设置scala的版本,我虚拟机里的Spark是用的scala2.11版本编译的,这里能找到最接近的可用的版本就是2.11.8。
- 添加maven-scala-plugin依赖,使用的是2.11的版本。
- 添加spark-core_2.11, 版本为2.4.8,和虚拟机安装的Spark版本保持一致
- 此外因为下面的示例还要操作hbase,因为又引入了hadoop和hbase相关的一些依赖,其中hadoop-common依赖排除了jackson-databind是因为出现了依赖冲突的问题。
- 为了演示官方示例(蒙特卡洛法求Pi值),引入了spark-sql依赖
最终完整的pom.xml
文件内容如下:
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.delucia</groupId> <artifactId>SparkProject</artifactId> <version>1.0-SNAPSHOT</version> <inceptionYear>2008</inceptionYear> <properties> <scala.version>2.11.8</scala.version> <spark.version>2.4.8</spark.version> <hadoop.version>3.1.4</hadoop.version> <hbase.version>2.2.3</hbase.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> </properties> <repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> <!-- 解决打包错误:Failure to find org.glassfish:javax.el:pom:3.0.1-b08-SNAPSHOT --> <exclusions> <exclusion> <groupId>org.glassfish</groupId> <artifactId>javax.el</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>{scala.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.41</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.4</version> <scope>test</scope> </dependency> <dependency> <groupId>org.specs</groupId> <artifactId>specs</artifactId> <version>1.2.5</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.scala-tools/maven-scala-plugin --> <dependency> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.8</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <version>2.5.1</version> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> </plugins> </build> <reporting> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <configuration> <scalaVersion>${scala.version}</scalaVersion> </configuration> </plugin> </plugins> </reporting> </project> |
项目右键选择 maven->Reimport 等待依赖下载完成。
你会发现test包下面有一些自动生成的源文件有错误(依赖版本问题),这里直接把报错的文件删除,只保留一个AppTest.scala文件,可以点击测试以下环境,main包下面自动生成的文件也是直接删除即可。:
给项目根目录下创建data和output文件夹,最终项目结构:
实现单词计数
在data下面创建文本文件words.txt,内容如下
1 2 3 |
hello hadoop hello java scala |
本地模式运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package cn.delucia.spark.rdd import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Spark RDD 单词计数程序 * Program Args: data\words.txt output\wc * 注意:需提前删除output\wc文件夹 */ object WordCountLocal { def main(args: Array[String]): Unit = { //创建SparkConf对象 val conf = new SparkConf() //设置应用程序名称,可以在Spark WebUI中显示 conf.setAppName("WordCountLocal") //设置集群Master节点访问地址 //conf.setMaster("spark://hadoop100:7077"); //远程-提交到集群(standalone模式) conf.setMaster("local[2]") // 本地模式 - 2个CPU核心 //创建SparkContext对象,该对象是提交Spark应用程序的入口 val sc = new SparkContext(conf); //读取指定路径(取程序执行时传入的第一个参数)中的文件内容,生成一个RDD集合 val linesRDD:RDD[String] = sc.textFile(args(0)) //将RDD数据按照空格进行切分并合并为一个新的RDD val wordsRDD:RDD[String] = linesRDD.flatMap(_.split(" ")) //将RDD中的每个单词和数字1放到一个元组里,即(word,1) val paresRDD:RDD[(String, Int)] = wordsRDD.map((_,1)) //对单词根据key进行聚合,对相同的key进行value的累加 val wordCountsRDD:RDD[(String, Int)] = paresRDD.reduceByKey(_+_) //按照单词数量降序排列 val wordCountsSortRDD:RDD[(String, Int)] = wordCountsRDD.sortBy(_._2,false) //保存结果到指定的路径(取程序执行时传入的第二个参数) wordCountsSortRDD.saveAsTextFile(args(1)) //停止SparkContext,结束该任务 sc.stop() } } |
直接点击运行,由于是两个核心,结果有两个文件生成
上传到集群运行 – Spark on YARN 模式
前提 集群已经安装好Hadoop集群并运行DFS和YARN
Spark已经配置好Spark on YARN的相关设置
编写程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
package cn.delucia.spark.rdd import org.apache.spark.{SparkConf, SparkContext} /** Spark RDD单词计数程序 - 打jar包上传到集群 Spark on YARN 模式 */ object WordCountCluster { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("WordCountCluster") conf.setMaster("spark://hadoop100:7077") val sc = new SparkContext(conf) sc.textFile(args(0)) .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .sortBy(_._2, ascending = false) .saveAsTextFile(args(1)) sc.stop() } } |
使用maven生命周期插件clean然后install打成jar包上传到集群的/opt/data目录下,将jar包改名位WordCountCluster.jar,进入spark的安装目录执行命令:
1 2 3 |
bin/spark-submit --master yarn --class cn.delucia.spark.rdd.WordCountCluster /opt/data/WordCountCluster.jar hdfs://hadoop100:8020/tmp/words.txt hdfs://hadoop100:8020/tmp/output/wc_cluster |
即可把任务提交到YARN集群运行,可以去output目录可以查看结果
有两个文件生成说明有两个分区,因为设置了2个核心
求平均成绩
data下创建score.txt
1 2 3 4 5 6 7 8 9 10 11 12 |
Andy,98 Jack,87 Bill,99 Andy,78 Jack,85 Bill,86 Andy,90 Jack,88 Bill,76 Andy,58 Jack,67 Bill,79 |
编写程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
package cn.delucia.spark.rdd import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 求成绩平均分 */ object AverageScoreLocal { def main(args: Array[String]): Unit = { //创建SparkConf对象,存储应用程序的配置信息 val conf = new SparkConf() //设置应用程序名称,可以在Spark WebUI中显示 conf.setAppName("AverageScore") //设置集群Master节点访问地址 conf.setMaster("local") val sc = new SparkContext(conf) //1. 加载数据 val linesRDD: RDD[String] = sc.textFile("data/avg_score.txt") //2. 将RDD中的元素转为(key,value)形式,便于后面进行聚合 val tupleRDD: RDD[(String, Int)] = linesRDD.map(line => { val name = line.split("\t")(0)//姓名 val score = line.split("\t")(1).toInt//成绩 (name, score) }) //3. 根据姓名进行分组,形成新的RDD val groupedRDD: RDD[(String, Iterable[Int])] = tupleRDD.groupByKey() //4. 迭代计算RDD中每个学生的平均分 val resultRDD: RDD[(String, Int)] = groupedRDD.map(line => { val name = line._1//姓名 val iteratorScore: Iterator[Int] = line._2.iterator//成绩迭代器 var sum = 0//总分 var count = 0//科目数量 //迭代累加所有科目成绩 while (iteratorScore.hasNext) { val score = iteratorScore.next() sum += score count += 1 } //计算平均分 val averageScore = sum / count (name, averageScore)//返回(姓名,平均分)形式的元组 }) //保存结果 resultRDD.saveAsTextFile("output/avg_score") } } |
输出结果:
1 2 3 4 |
(BETA,45) (Bill,85) (Andy,81) (Jack,81) |
统计学生最好的三次成绩并倒序排列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
package cn.delucia.spark.rdd import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Spark分组取TopN程序 */ object GroupTopNLocal { def main(args: Array[String]): Unit = { //创建SparkConf对象,存储应用程序的配置信息 val conf = new SparkConf() //设置应用程序名称,可以在Spark WebUI中显示 conf.setAppName("RDDGroupTopN") //设置集群Master节点访问地址,此处为本地模式,使用尽可能多的核心 conf.setMaster("local[*]") val sc = new SparkContext(conf) //1. 加载本地数据 val linesRDD: RDD[String] = sc.textFile("data/score.txt") //2. 将RDD元素转为(String,Int)形式的元组 val tupleRDD:RDD[(String,Int)]=linesRDD.map(line=>{ val name=line.split(",")(0) val score=line.split(",")(1) (name,score.toInt) }) //3. 按照key(姓名)进行分组 val top3=tupleRDD.groupByKey().map(groupedData=>{ val name:String=groupedData._1 //每一组的成绩降序后取前3个 val scoreTop3:List[Int]=groupedData._2 .toList.sortWith(_>_).take(3) (name,scoreTop3)//返回元组 }) //当使用多核心时,分区排序完就各自并行输出结果了,导致输出内容互相干扰 //这里先调用count,由于count属于Action算子,会触发前面的计算完成 //这样等top3里面的所有分区都计算完毕再输出,可以让输出格式更好 top3.count //4. 循环打印分组结果 top3.foreach(tuple=>{ println("姓名:"+tuple._1) val tupleValue=tuple._2.iterator while (tupleValue.hasNext){ val value=tupleValue.next() println("成绩:"+value) } println("*******************") }) } } |
控制台输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
姓名:Andy 成绩:98 成绩:90 成绩:78 ******************* 姓名:Bill 成绩:99 成绩:86 成绩:79 ******************* 姓名:Jack 成绩:88 成绩:87 成绩:85 ******************* |
倒排索引统计每日新增用户
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package cn.delucia.spark.rdd import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Spark RDD统计每日新增用户 */ object DayNewUserLocal { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("DayNewUser") conf.setMaster("local[*]") val sc = new SparkContext(conf) //1. 构建测试数据 val tupleRDD:RDD[(String,String)] = sc.parallelize( Array( ("2020-01-01", "user1"), ("2020-01-01", "user2"), ("2020-01-01", "user3"), ("2020-01-02", "user1"), ("2020-01-02", "user2"), ("2020-01-02", "user4"), ("2020-01-03", "user2"), ("2020-01-03", "user5"), ("2020-01-03", "user6") ) ) //2. 倒排(互换RDD中元组的元素顺序) val tupleRDD2:RDD[(String,String)] = tupleRDD.map( line => (line._2, line._1) ) //3. 将倒排后的RDD按照key分组 val groupedRDD: RDD[(String, Iterable[String])] = tupleRDD2.groupByKey() //4. 取分组后的每个日期集合中的最小日期,并计数为1 (只有第一次登陆才算做当日新增用户) val dateRDD:RDD[(String,Int)] = groupedRDD.map( line => (line._2.min, 1) ) //5. 计算所有相同key(即日期)的数量 val resultMap: collection.Map[String, Long] = dateRDD.countByKey() //将结果Map循环打印到控制台 resultMap.foreach(println) } } |
结果:
1 2 3 |
(2020-01-01,3) (2020-01-02,1) (2020-01-03,2) |
自定义排序规则(二次排序)
先准备数组,data下创建文本文件
1 2 3 4 5 6 7 8 9 |
2 98 1 99 2 67 3 75 3 88 2 85 1 90 3 100 1 62 |
先根据第一列升序排列,如果相同再根据第二列降序排列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
package cn.delucia.spark.rdd import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 二次排序自定义key类 * 先根据名字升序排列,如果名字相同再根据成绩降序排列 * @param first 每一行的第一个字段 * @param second 每一行的第二个字段 */ class SecondSortKey(val first: Int, val second: Int) extends Ordered[SecondSortKey] with Serializable { override def toString: String = first + "->" + second; /** * 实现compare()方法 */ override def compare(that: SecondSortKey): Int = { //若第一个字段不相等,按照第一个字段升序排列 if (this.first - that.first != 0) { this.first - that.first } else { //否则按照第二个字段降序排列 that.second - this.second } } } /** * 二次排序运行主类 */ object SecondSort { def main(args: Array[String]): Unit = { //创建SparkConf对象 val conf = new SparkConf() //设置应用程序名称,可以在Spark WebUI中显示 conf.setAppName("SecondSort") //设置集群Master节点访问地址,此处为本地模式 conf.setMaster("local") //创建SparkContext对象,该对象是提交Spark应用程序的入口 val sc = new SparkContext(conf); //1. 读取指定路径的文件内容,生成一个RDD集合 val lines: RDD[String] = sc.textFile("data\\sort.txt") //2. 将RDD中的元素转为(SecondSortKey, String)形式的元组 val pair: RDD[(SecondSortKey, String)] = lines.map(line => ( new SecondSortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line) ) //3. 按照元组的key(SecondSortKey的实例)进行排序 val pairSort: RDD[(SecondSortKey, String)] = pair.sortByKey() //取排序后的元组中的第二个值(value值) //val result: RDD[String] = pairSort.map(line => line._2) //打印最终结果 pairSort.foreach(line => println(line)) } } |
控制台输出结果:
1 2 3 4 5 6 7 8 9 |
(1->99,1 99) (1->90,1 90) (1->62,1 62) (2->98,2 98) (2->85,2 85) (2->67,2 67) (3->100,3 100) (3->88,3 88) (3->75,3 75) |
读写HBase数据库
HBase是Spark应用程序经常打交道的一个数据源。下面使用Spark程序对其进行读写操作:
使用HBase API向HBase写入数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
package cn.delucia.spark.rdd import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.ConnectionFactory /** * 向HBase表写入数据 - hbase API */ object SparkWriteHBase { def main(args: Array[String]): Unit = { //创建SparkConf对象,存储应用程序的配置信息 val conf = new SparkConf() conf.setAppName("SparkWriteHBase") conf.setMaster("local[*]") //创建SparkContext对象 val sc = new SparkContext(conf) //1. 构建需要添加的数据RDD val initRDD = sc.makeRDD( Array( "003,王五,山东,23", "004,赵六,河北,20" ) ) //2. 循环RDD的每个分区 initRDD.foreachPartition(partition=> { //2.1 设置HBase配置信息 val hbaseConf = HBaseConfiguration.create() //设置ZooKeeper集群地址 hbaseConf.set("hbase.zookeeper.quorum","hadoop100") //设置ZooKeeper连接端口,默认2181 hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //创建数据库连接对象 val conn = ConnectionFactory.createConnection(hbaseConf) //指定表名 val tableName = TableName.valueOf("student") //获取需要添加数据的Table对象 val table = conn.getTable(tableName) //2.2 循环当前分区的每行数据 partition.foreach(line => { //分割每行数据,获取要添加的每个值 val arr = line.split(",") val rowkey = arr(0) val name = arr(1) val address = arr(2) val age = arr(3) //创建Put对象 val put = new Put(Bytes.toBytes(rowkey)) put.addColumn( Bytes.toBytes("info"),//列族名 Bytes.toBytes("name"),//列名 Bytes.toBytes(name)//列值 ) put.addColumn( Bytes.toBytes("info"),//列族名 Bytes.toBytes("address"),//列名 Bytes.toBytes(address)) //列值 put.addColumn( Bytes.toBytes("info"),//列族名 Bytes.toBytes("age"),//列名 Bytes.toBytes(age)) //列值 //执行添加 table.put(put) }) }) } } |
使用Spark API向HBase写入数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
package cn.delucia.spark.rdd import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 向HBase表写入数据 使用 Spark API(saveAsHadoopDataset方法) */ object SparkWriteHBase2 { def main(args: Array[String]): Unit = { //创建SparkConf对象,存储应用程序的配置信息 val conf = new SparkConf() conf.setAppName("SparkWriteHBase2") conf.setMaster("local[*]") //创建SparkContext对象 val sc = new SparkContext(conf) //1. 设置配置信息 //创建Hadoop JobConf对象 val jobConf = new JobConf() //设置ZooKeeper集群地址 jobConf.set("hbase.zookeeper.quorum","hadoop100") //设置ZooKeeper连接端口,默认2181 jobConf.set("hbase.zookeeper.property.clientPort", "2181") //指定输出格式 jobConf.setOutputFormat(classOf[TableOutputFormat]) //指定表名 jobConf.set(TableOutputFormat.OUTPUT_TABLE,"student") //2. 构建需要写入的RDD数据 val initRDD = sc.makeRDD( Array( "005,王五,山东,23", "006,赵六,河北,20" ) ) //将RDD转换为(ImmutableBytesWritable, Put)类型 val resultRDD: RDD[(ImmutableBytesWritable, Put)] = initRDD.map( _.split(",") ).map(arr => { val rowkey = arr(0) val name = arr(1)//姓名 val address = arr(2)//地址 val age = arr(3)//年龄 //创建Put对象 val put = new Put(Bytes.toBytes(rowkey)) put.addColumn( Bytes.toBytes("info"),//列族 Bytes.toBytes("name"),//列名 Bytes.toBytes(name)//列值 ) put.addColumn( Bytes.toBytes("info"),//列族 Bytes.toBytes("address"),//列名 Bytes.toBytes(address)) //列值 put.addColumn( Bytes.toBytes("info"),//列族 Bytes.toBytes("age"),//列名 Bytes.toBytes(age)) //列值 //拼接为元组返回 (new ImmutableBytesWritable, put) }) //3. 写入数据 resultRDD.saveAsHadoopDataset(jobConf) sc.stop() } } |
批量向Hbase写入数据
使用BulkLoader加载HFile数据到HBase:
参考阅读:
https://blog.cloudera.com/how-to-use-hbase-bulk-loading-and-why/
https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
package cn.delucia.spark.rdd import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client.ConnectionFactory import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles} import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Spark 批量写入数据到HBase(使用BulkLoader加载HFile数据到HBase) */ object SparkWriteHBase3 { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "hadoop") //创建SparkConf对象,存储应用程序的配置信息 val conf = new SparkConf() conf.setAppName("SparkWriteHBase3") conf.setMaster("local[*]") //创建SparkContext对象 val sc = new SparkContext(conf) //1. 设置HDFS和HBase配置信息 val hadoopConf = new Configuration() hadoopConf.set("fs.defaultFS", "hdfs://hadoop100:8020") val fileSystem = FileSystem.get(hadoopConf) val hbaseConf = HBaseConfiguration.create(hadoopConf) //设置ZooKeeper集群地址 hbaseConf.set("hbase.zookeeper.quorum", "hadoop100") //设置ZooKeeper连接端口,默认2181 hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //创建数据库连接对象 val conn = ConnectionFactory.createConnection(hbaseConf) //指定表名 val tableName = TableName.valueOf("student") //获取需要添加数据的Table对象 val table = conn.getTable(tableName) //获取操作数据库的Admin对象 val admin = conn.getAdmin() //2. 添加数据前的判断 //如果HBase表不存在,则创建一个新表 if (!admin.tableExists(tableName)) { val desc = new HTableDescriptor(tableName) //表名 val hcd = new HColumnDescriptor("info") //列族 desc.addFamily(hcd) admin.createTable(desc) //创建表 } //如果存放HFile文件的HDFS目录已经存在,则删除 if (fileSystem.exists(new Path("hdfs://hadoop100:8020/tmp/hbase"))) { fileSystem.delete(new Path("hdfs://hadoop100:8020/tmp/hbase"), true) } //3. 构建需要添加的RDD数据 //初始数据 val initRDD = sc.makeRDD( Array( "rowkey:007,name:王五", "rowkey:007,address:山东", "rowkey:007,age:23", "rowkey:008,name:赵六", "rowkey:008,address:河北", "rowkey:008,age:20" ) ) //数据转换 //转换为(ImmutableBytesWritable, KeyValue)类型的RDD val resultRDD: RDD[(ImmutableBytesWritable, KeyValue)] = initRDD.map( _.split(",") ).map(arr => { val rowkey = arr(0).split(":")(1) //rowkey val qualifier = arr(1).split(":")(0) //列名 val value = arr(1).split(":")(1) //列值 val kv = new KeyValue( Bytes.toBytes(rowkey), Bytes.toBytes("info"), Bytes.toBytes(qualifier), Bytes.toBytes(value) ) //构建(ImmutableBytesWritable, KeyValue)类型的元组返回 (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv) }) //4. 写入数据 //在HDFS中生成HFile文件 hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name",tableName.getNameAsString) resultRDD.saveAsNewAPIHadoopFile( "hdfs://hadoop100:8020/tmp/hbase", classOf[ImmutableBytesWritable], //对应RDD元素中的key classOf[KeyValue], //对应RDD元素中的value classOf[HFileOutputFormat2], hbaseConf ) //加载HFile文件到HBase val bulkLoader = new LoadIncrementalHFiles(hbaseConf) val regionLocator = conn.getRegionLocator(tableName) bulkLoader.doBulkLoad( new Path("hdfs://hadoop100:8020/tmp/hbase"), //HFile文件位置 admin, //操作HBase数据库的Admin对象 table, //目标Table对象(包含表名) regionLocator //RegionLocator对象,用于查看单个HBase表的区域位置信息 ) sc.stop() } } |
读取Hbase数据
前面学了三钟方法向HBase数据库插入数据,并且一共向Hbase的student表写入了6条记录,下面我们写一个程序从HBase中把它们读取出来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package cn.delucia.spark.rdd import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD /** * Spark读取HBase表数据 */ object SparkReadHBase { def main(args: Array[String]): Unit = { //创建SparkConf对象,存储应用程序的配置信息 val conf = new SparkConf() conf.setAppName("SparkReadHBase") conf.setMaster("local[*]") //创建SparkContext对象 val sc = new SparkContext(conf) //1. 设置HBase配置信息 val hbaseConf = HBaseConfiguration.create() //设置ZooKeeper集群地址 hbaseConf.set("hbase.zookeeper.quorum","hadoop100") //设置ZooKeeper连接端口,默认2181 hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //指定表名 hbaseConf.set(TableInputFormat.INPUT_TABLE, "student") //2. 读取HBase表数据并转化成RDD val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD( hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result] ) //3. 输出RDD中的数据到控制台 hbaseRDD.foreach{ case (_ ,result) => //获取行键 val key = Bytes.toString(result.getRow) //通过列族和列名获取列值 val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes)) val gender = Bytes.toString(result.getValue("info".getBytes,"address".getBytes)) val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes)) println("行键:"+key+"\t姓名:"+name+"\t地址:"+gender+"\t年龄:"+age) } } } |
输出
1 2 3 4 5 6 |
行键:003 姓名:王五 地址:山东 年龄:23 行键:004 姓名:赵六 地址:河北 年龄:20 行键:005 姓名:王五 地址:山东 年龄:23 行键:006 姓名:赵六 地址:河北 年龄:20 行键:007 姓名:王五 地址:山东 年龄:23 行键:008 姓名:赵六 地址:河北 年龄:20 |
解决数据倾斜问题
避免数据倾斜的办法还有很多,比如:
- 对数据进行预处理
- 过滤掉没有意义的数据
- 提高shuffle的并行度,增加分区数量
但这不能解决大量相同key导致的单个分区数据过多的问题
对于存在大量相同的key导致数据集中在某个分区的问题,这里给出一个解决办法:
添加随机前缀进行双重集合:
- 首先给数据添加随机前缀,进行分区内的局部聚合
- 然后去除随机前缀,进行全局聚合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
package cn.delucia.spark.rdd import org.apache.spark.{SparkConf, SparkContext} import scala.util.Random /** * Spark RDD解决数据倾斜案例 - 由于存在大量相同的key导致数据集中在某个分区 * 解决办法:添加随机前缀进行双重集合) * 1. 首先给数据添加随机前缀,进行分区内的局部聚合 * 2. 然后去除随机前缀,进行全局聚合 */ object DataLean { def main(args: Array[String]): Unit = { //创建Spark配置对象 val conf = new SparkConf(); conf.setAppName("DataLean") conf.setMaster("local[*]") //创建SparkContext对象 val sc = new SparkContext(conf) //1. 读取测试数据 val linesRDD = sc.textFile("data/data.txt") //2. 统计单词数量 linesRDD .flatMap(_.split(" ")) .map((_, 1)) .map(t => { val word = t._1 val random = Random.nextInt(100)//产生0~99的随机数 //单词加入随机数前缀,格式:(前缀_单词,数量) (random + "_" + word, 1) }) .reduceByKey(_ + _)//局部聚合 .map(t => { val word = t._1 val count = t._2 val w = word.split("_")(1)//去除前缀 //单词去除随机数前缀,格式:(单词,数量) (w, count) }) .reduceByKey(_ + _)//全局聚合 //输出结果到指定的HDFS目录 .saveAsTextFile("output/data_lean") } } |
Views: 78