Spark RDD 实战案例

IDEA 创建基于Maven的Spark项目

三台虚拟机搭建的集群
启动集成如下:

本地环境由于需要打包放到虚拟机运行,因此scala版本需要和虚拟机中编译spark所用的scala的版本一致。如何知道虚拟机中spark是用的什么版本scala编译的呢?可以进入虚拟机的spark-shell查看:

可见2.4.8并不是官网所说的使用scala2.12,实际上是2.11

接下来使用IDEA创建一个MAVEN项目,使用scala模板
file
配置项目名称
file

配置有效的maven环境
file

修改pom文件:

  1. 设置scala的版本,我虚拟机里的Spark是用的scala2.11版本编译的,这里能找到最接近的可用的版本就是2.11.8。
  2. 添加maven-scala-plugin依赖,使用的是2.11的版本。
  3. 添加spark-core_2.11, 版本为2.4.8,和虚拟机安装的Spark版本保持一致
  4. 此外因为下面的示例还要操作hbase,因为又引入了hadoop和hbase相关的一些依赖,其中hadoop-common依赖排除了jackson-databind是因为出现了依赖冲突的问题。
  5. 为了演示官方示例(蒙特卡洛法求Pi值),引入了spark-sql依赖

最终完整的pom.xml文件内容如下:

项目右键选择 maven->Reimport 等待依赖下载完成。
你会发现test包下面有一些自动生成的源文件有错误(依赖版本问题),这里直接把报错的文件删除,只保留一个AppTest.scala文件,可以点击测试以下环境,main包下面自动生成的文件也是直接删除即可。:

file

给项目根目录下创建data和output文件夹,最终项目结构:

file

实现单词计数

在data下面创建文本文件words.txt,内容如下

本地模式运行

直接点击运行,由于是两个核心,结果有两个文件生成
file

上传到集群运行 – Spark on YARN 模式

前提 集群已经安装好Hadoop集群并运行DFS和YARN
Spark已经配置好Spark on YARN的相关设置

编写程序

使用maven生命周期插件clean然后install打成jar包上传到集群的/opt/data目录下,将jar包改名位WordCountCluster.jar,进入spark的安装目录执行命令:

即可把任务提交到YARN集群运行,可以去output目录可以查看结果
file

有两个文件生成说明有两个分区,因为设置了2个核心

求平均成绩

data下创建score.txt

编写程序:

输出结果:

统计学生最好的三次成绩并倒序排列

控制台输出:

倒排索引统计每日新增用户

结果:

自定义排序规则(二次排序)

先准备数组,data下创建文本文件

先根据第一列升序排列,如果相同再根据第二列降序排列

控制台输出结果:

读写HBase数据库

HBase是Spark应用程序经常打交道的一个数据源。下面使用Spark程序对其进行读写操作:

使用HBase API向HBase写入数据

使用Spark API向HBase写入数据

批量向Hbase写入数据

使用BulkLoader加载HFile数据到HBase:
参考阅读:
https://blog.cloudera.com/how-to-use-hbase-bulk-loading-and-why/
https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/

读取Hbase数据

前面学了三钟方法向HBase数据库插入数据,并且一共向Hbase的student表写入了6条记录,下面我们写一个程序从HBase中把它们读取出来

输出

解决数据倾斜问题

避免数据倾斜的办法还有很多,比如:

  1. 对数据进行预处理
  2. 过滤掉没有意义的数据
  3. 提高shuffle的并行度,增加分区数量
    但这不能解决大量相同key导致的单个分区数据过多的问题

对于存在大量相同的key导致数据集中在某个分区的问题,这里给出一个解决办法:

添加随机前缀进行双重集合:

  1. 首先给数据添加随机前缀,进行分区内的局部聚合
  2. 然后去除随机前缀,进行全局聚合

Views: 78

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注