MapReduce Project Code
- Create a Maven project
Create an input folder in the project root and add a text file named access.log to simulate Nginx logs:

```
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
```
- Add the Maven dependencies to pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>log</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Hadoop version -->
        <hadoop.version>3.1.4</hadoop.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <!-- Cloudera (CDH) repository -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- Hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- JUnit 4 test dependency -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
```

- Add a log4j.properties file under the resources directory (src/main/resources) with the following content:
```properties
### Root logger ###
log4j.rootLogger = debug,console,fileAppender

### Console appender ###
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c:%L - %m%n

### File appender ###
log4j.appender.fileAppender = org.apache.log4j.FileAppender
log4j.appender.fileAppender.File = /tmp/logs/log.log
log4j.appender.fileAppender.Append = false
# Threshold takes a single minimum level, not a list
log4j.appender.fileAppender.Threshold = DEBUG
log4j.appender.fileAppender.layout = org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
```
- Write the MR program's Mapper: LogMapper.java
```java
package com.niit.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Pattern matching the product id query parameter, e.g. "?id=4028f001..."
    String pattern = "\\?id=[0-9a-z]*";
    // Compile once; reused for every input line
    Pattern r = Pattern.compile(pattern);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String data = value.toString();
        // Search the current log line for the pattern
        Matcher m = r.matcher(data);
        if (m.find()) {
            String idStr = m.group(0);
            // Strip the leading "?id=" (4 characters)
            String id = idStr.substring(4);
            context.write(new Text(id), new IntWritable(1));
        }
    }
}
```
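Before wiring the Mapper into a job, the extraction logic can be sanity-checked in isolation. The following is a throwaway sketch (the class name `RegexCheck` is invented for this example) that runs the same pattern against one of the sample log lines above:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        // One of the sample access.log lines from above
        String line = "hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "
                + "\"GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0\" "
                + "200 4366 \"-\" \"ApacheBench/2.3\" \"-\"";
        Matcher m = Pattern.compile("\\?id=[0-9a-z]*").matcher(line);
        if (m.find()) {
            // Prints: 4028f00176e1aa620176e6aca6890003
            System.out.println(m.group(0).substring(4));
        }
    }
}
```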
- Write the MR program's Reducer: LogReducer.java
```java
package com.niit.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the per-occurrence counts emitted by the Mapper for this id
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
```
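Since the reduce step is a plain associative sum, the same class could optionally also be registered as a combiner via `job.setCombinerClass(LogReducer.class)` in the driver below, pre-aggregating counts on the map side to cut shuffle traffic. This is an optional tweak, not part of the original code.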
- Write the MR program's Job (driver): LogJob.java
```java
package com.niit.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogJob {
    public static void main(String[] args) throws Exception {
        // Windows (local run)
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // Linux (cluster run)
        // if (args.length < 2) {
        //     System.out.println("Usage: hadoop jar Log.jar " + LogJob.class.getName() + " input output");
        //     System.exit(0);
        // }

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(LogJob.class);

        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileSystem fileSystem = FileSystem.get(configuration);
        // Windows (local run)
        Path inputPath = new Path("./input/access.log");
        Path outputPath = new Path("./output/");
        // Linux (cluster run)
        // Path inputPath = new Path(args[0]);
        // Path outputPath = new Path(args[1]);

        // Remove a stale output directory so the job does not fail
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Windows (local run)
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Linux (cluster run)
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : -1);
    }
}
```

- Run the code locally and check whether the results are correct.
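With the five identical sample lines in access.log above, a correct local run should produce a single tab-separated record in ./output/part-r-00000: the extracted id with a count of 5.

```
4028f00176e1aa620176e6aca6890003	5
```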
- After the local test results check out, modify the input/output portions of the driver class as follows:
```java
// Windows (local run)
// System.setProperty("HADOOP_USER_NAME", "hadoop");
// Linux (cluster run)
if (args.length < 2) {
    System.out.println("Usage: hadoop jar Log.jar " + LogJob.class.getName() + " input output");
    System.exit(0);
}

// Windows (local run)
// Path inputPath = new Path("./input/access.log");
// Path outputPath = new Path("./output/");
// Linux (cluster run)
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);

// Windows (local run)
// FileInputFormat.setInputPaths(job, inputPath);
// FileOutputFormat.setOutputPath(job, outputPath);
// Linux (cluster run)
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
```

- Build the jar and submit it to the cluster
Build the jar with Maven's package lifecycle phase, upload it to the /opt/data directory on the server, and rename it to Log.jar.
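For reference, a rough sketch of those steps from the command line. The jar name follows from the pom.xml coordinates above (artifactId `log`, version `1.0-SNAPSHOT`), while the hostname `hadoop100` and user `hadoop` are assumptions borrowed from the sample data and the driver:

```bash
# Build the jar; Maven writes it to target/log-1.0-SNAPSHOT.jar
mvn clean package

# Upload to the server and rename (hostname and user are assumptions)
scp target/log-1.0-SNAPSHOT.jar hadoop@hadoop100:/opt/data/Log.jar
```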
- For convenience, write a script /opt/bin/project/exec-mapred-task.sh to run the MR job (note the jar path must match the Log.jar name used above):

```bash
#!/bin/bash

# The HDFS input path is read from a file written by an upstream step
input_path=$(cat /tmp/project-mapred-input-path.txt)

# Run the MapReduce job, writing results under a per-day output directory
dataformat=`date +%y-%m-%d`
echo "/opt/pkg/hadoop/bin/hadoop jar /opt/data/Log.jar com.niit.log.LogJob $input_path /output/result/$dataformat"
/opt/pkg/hadoop/bin/hadoop jar /opt/data/Log.jar com.niit.log.LogJob $input_path /output/result/$dataformat

# Dump the job output to a local file and print it
/opt/pkg/hadoop/bin/hdfs dfs -cat /output/result/$dataformat/part-r-00000 > /tmp/project_mr_result.txt
echo "======== mapred task result ========"
echo $(cat /tmp/project_mr_result.txt)
```
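A minimal usage sketch, assuming the upstream step that normally writes the input-path file has not run yet; the HDFS path below is only an example:

```bash
# Point the script at the HDFS input (example path, normally written upstream)
echo "/data/access.log" > /tmp/project-mapred-input-path.txt

# Make the script executable and run it
chmod +x /opt/bin/project/exec-mapred-task.sh
/opt/bin/project/exec-mapred-task.sh
```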
