MapReduce Project Code
- Create a Maven project
Create an input folder under the project root, and in it a text file named access.log to simulate nginx logs:
```
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
```
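For reference, a field-by-field reading of the sample line (this is an interpretation of the simulated format, not project documentation):

```
hadoop100,192.168.186.100      server hostname and client IP
- -                            remote user fields (empty)
[21/Apr/2021:01:36:19 +0800]   request timestamp
"GET /shop/detail.html?id=..." request line; the id parameter is what the job counts
200 4366                       HTTP status and response size in bytes
"-" "ApacheBench/2.3" "-"      referer, user agent (generated with ab), forwarded-for
```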
- Add the Maven dependencies:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>log</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Define the Hadoop version -->
        <hadoop.version>3.1.4</hadoop.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <!-- Add the CDH repository -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- Hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- JUnit 4 for tests -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
```
- Add a log4j.properties file under the resources directory, with the following content:
```properties
### Root logger ###
log4j.rootLogger = debug,console,fileAppender
### Console appender ###
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c:%L - %m%n
### File appender ###
log4j.appender.fileAppender = org.apache.log4j.FileAppender
log4j.appender.fileAppender.File = /tmp/logs/log.log
log4j.appender.fileAppender.Append = false
# Threshold takes a single level; everything at or above it is logged
log4j.appender.fileAppender.Threshold = DEBUG
log4j.appender.fileAppender.layout = org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
```
- Write the Mapper of the MR program: LogMapper.java
```java
package com.niit.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Regex that matches the id query parameter, e.g. "?id=4028f001..."
    String pattern = "\\?id=[0-9a-z]*";
    // Compile the pattern once per mapper instance
    Pattern r = Pattern.compile(pattern);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String data = value.toString();
        // Create a matcher for the current log line
        Matcher m = r.matcher(data);
        if (m.find()) {
            String idStr = m.group(0);
            // Strip the leading "?id=" (4 characters) to keep only the id
            String id = idStr.substring(4);
            context.write(new Text(id), new IntWritable(1));
        }
    }
}
```
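Before wiring the regex into the job, it can be sanity-checked standalone. A minimal sketch (the RegexCheck class is hypothetical, not part of the project) run against one of the sample lines:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Quick check of the id-extraction regex used by LogMapper
public class RegexCheck {
    public static void main(String[] args) {
        String line = "hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "
                + "\"GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0\" "
                + "200 4366 \"-\" \"ApacheBench/2.3\" \"-\"";
        Matcher m = Pattern.compile("\\?id=[0-9a-z]*").matcher(line);
        if (m.find()) {
            // substring(4) drops the leading "?id="
            System.out.println(m.group(0).substring(4));
            // prints: 4028f00176e1aa620176e6aca6890003
        }
    }
}
```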
- Write the Reducer of the MR program: LogReducer.java
```java
package com.niit.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper to get the visit count per id
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
```
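The framework groups all values emitted under one key before calling reduce; a plain-Java sketch (illustrative only, outside Hadoop) of the summation for a single id:

```java
import java.util.Arrays;
import java.util.List;

// Illustration: for one key, the shuffle delivers every 1 emitted by the
// mappers, and the reducer collapses them into a single count
public class ReduceSumDemo {
    public static void main(String[] args) {
        // Five mapper emissions for id 4028f00176e1aa620176e6aca6890003
        List<Integer> values = Arrays.asList(1, 1, 1, 1, 1);
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        System.out.println("4028f00176e1aa620176e6aca6890003\t" + sum); // prints ...\t5
    }
}
```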
- Write the Job driver of the MR program: LogJob.java
```java
package com.niit.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogJob {

    public static void main(String[] args) throws Exception {
        // Windows (local run)
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // Linux (cluster run)
        // if (args.length < 2) {
        //     System.out.println("Usage: hadoop jar Log.jar " + LogJob.class.getName() + " input output");
        //     System.exit(0);
        // }

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(LogJob.class);

        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileSystem fileSystem = FileSystem.get(configuration);
        // Windows
        Path inputPath = new Path("./input/access.log");
        Path outputPath = new Path("./output/");
        // Linux
        // Path inputPath = new Path(args[0]);
        // Path outputPath = new Path(args[1]);

        // Delete the output directory if it already exists, otherwise the job fails
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Windows
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Linux
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : -1);
    }
}
```
- Run the code locally and check whether the results are correct; the expected output is sketched below.
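Since the sample access.log above consists of five identical lines sharing one id, the local run should leave a tab-separated part-r-00000 in ./output/ along the lines of:

```
4028f00176e1aa620176e6aca6890003	5
```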
- Once the local test results are correct, modify the input and output portions of the driver class as follows:
```java
// Windows
// System.setProperty("HADOOP_USER_NAME", "hadoop");
// Linux
if (args.length < 2) {
    System.out.println("Usage: hadoop jar Log.jar " + LogJob.class.getName() + " input output");
    System.exit(0);
}

// Windows
// Path inputPath = new Path("./input/access.log");
// Path outputPath = new Path("./output/");
// Linux
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);

// Windows
// FileInputFormat.setInputPaths(job, inputPath);
// FileOutputFormat.setOutputPath(job, outputPath);
// Linux
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
```
- Build the jar and submit it to run on the cluster
Build the jar directly with the package phase of the Maven lifecycle, upload it to the /opt/data directory on the server, and rename it to Log.jar.
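One way to do this from the project root (the jar name follows from the pom's artifactId and version; the user and host are placeholders):

```bash
# Build with the Maven package phase; -DskipTests is optional
mvn clean package -DskipTests

# Upload to the server and rename; replace user/host with your own
scp target/log-1.0-SNAPSHOT.jar hadoop@hadoop100:/opt/data/Log.jar
```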
- For convenience, write a script /opt/bin/project/exec-mapred-task.sh to run the MR program:
```bash
#!/bin/bash
input_path=$(cat /tmp/project-mapred-input-path.txt)

# Run the MapReduce job; the jar name must match the uploaded Log.jar
dataformat=`date +%y-%m-%d`
echo "/opt/pkg/hadoop/bin/hadoop jar /opt/data/Log.jar com.niit.log.LogJob $input_path /output/result/$dataformat"
/opt/pkg/hadoop/bin/hadoop jar /opt/data/Log.jar com.niit.log.LogJob $input_path /output/result/$dataformat

# Pull the result file off HDFS and print it
/opt/pkg/hadoop/bin/hdfs dfs -cat /output/result/$dataformat/part-r-00000 > /tmp/project_mr_result.txt

echo "======== mapred task result ========"
echo $(cat /tmp/project_mr_result.txt)
```
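A usage sketch, assuming the input-path file the script reads has been populated (the HDFS path below is an example):

```bash
# Point the script at the HDFS input directory (example path)
echo "/access/log/21-04-21" > /tmp/project-mapred-input-path.txt

# Make the script executable and run it
chmod +x /opt/bin/project/exec-mapred-task.sh
/opt/bin/project/exec-mapred-task.sh
```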