E-commerce Log Analysis Project – 03 Log Analysis (MapReduce)

MapReduce Project Code

  1. Create a Maven project
    Under the project root, create an input folder and, inside it, a text file access.log that simulates Nginx access logs:

    hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
    hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
    hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
    hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
    hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0" 200 4366 "-" "ApacheBench/2.3" "-"
  2. Add the Maven dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>log</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
        <!-- Hadoop version -->
            <hadoop.version>3.1.4</hadoop.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
    
        <!-- Add the Cloudera (CDH) repository -->
        <repositories>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
        <dependencies>
            <!-- Hadoop dependencies -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!-- JUnit 4 (test scope) -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    </project>
  3. Add a log4j.properties file under the resources directory, with the following content
    ### Root logger ###
    log4j.rootLogger = debug,console,fileAppender
    ### Console appender ###
    log4j.appender.console = org.apache.log4j.ConsoleAppender
    log4j.appender.console.Target = System.out
    log4j.appender.console.layout = org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c:%L - %m%n
    ### File appender ###
    log4j.appender.fileAppender = org.apache.log4j.FileAppender
    log4j.appender.fileAppender.File = /tmp/logs/log.log
    log4j.appender.fileAppender.Append = false
    log4j.appender.fileAppender.Threshold = DEBUG
    log4j.appender.fileAppender.layout = org.apache.log4j.PatternLayout
    log4j.appender.fileAppender.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
  4. Write the MR Mapper: LogMapper.java

    package com.niit.log;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class LogMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
        // Regex that extracts the product id from the request query string
        String pattern = "\\?id=[0-9a-z]*";
        // Compile the pattern once and reuse it for every record
        Pattern r = Pattern.compile(pattern);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String data = value.toString();
            // Match the pattern against the current log line
            Matcher m = r.matcher(data);
            if (m.find()) {
                String idStr = m.group(0);
                // Strip the leading "?id=" (4 characters), keeping only the id value
                String id = idStr.substring(4);
                context.write(new Text(id),new IntWritable(1));
            }
        }
    }
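
    The extraction logic is easy to verify on its own. Below is a minimal sketch of a JUnit 4 test (JUnit is already declared in the pom above) that runs the same regex against one of the sample log lines; the test class name is illustrative:

    package com.niit.log;
    
    import org.junit.Assert;
    import org.junit.Test;
    
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class LogMapperPatternTest {
    
        @Test
        public void extractsProductIdFromSampleLine() {
            String line = "hadoop100,192.168.186.100 - - [21/Apr/2021:01:36:19 +0800] "
                    + "\"GET /shop/detail.html?id=4028f00176e1aa620176e6aca6890003 HTTP/1.0\" "
                    + "200 4366 \"-\" \"ApacheBench/2.3\" \"-\"";
            // Same pattern as LogMapper: "?id=" followed by hex-like characters
            Matcher m = Pattern.compile("\\?id=[0-9a-z]*").matcher(line);
            Assert.assertTrue(m.find());
            // substring(4) drops the leading "?id=" prefix, exactly as LogMapper does
            Assert.assertEquals("4028f00176e1aa620176e6aca6890003", m.group(0).substring(4));
        }
    }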
  5. Write the MR Reducer: LogReducer.java

    package com.niit.log;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class LogReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the 1s emitted by the mapper for this product id
            int sum = 0;
            for (IntWritable v: values) {
                sum += v.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }
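
    Since the reduce step is a plain associative sum, the same class can optionally double as a combiner, pre-aggregating the 1s on the map side to shrink the shuffle. This is an optional tweak, not part of the original setup; to enable it, add one line to the driver after setReducerClass:

    job.setCombinerClass(LogReducer.class);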
  6. Write the MR Job (driver): LogJob.java

    package com.niit.log;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LogJob {
        public static void main(String[] args) throws Exception {
    
            // Windows
            System.setProperty("HADOOP_USER_NAME", "hadoop");
    
            // Linux
            // if (args.length < 2) {
        //     System.out.println("Usage: hadoop jar Log.jar " + LogJob.class.getName() + " input output");
            //     System.exit(0);
            // }
    
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            job.setJarByClass(LogJob.class);
    
            job.setMapperClass(LogMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setReducerClass(LogReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileSystem fileSystem = FileSystem.get(configuration);
    
            // Windows
            Path inputPath = new Path("./input/access.log");
            Path outputPath = new Path("./output/");
    
            // Linux
            // Path inputPath = new Path(args[0]);
            // Path outputPath = new Path(args[1]);
    
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
    
        // Windows
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
    
            // Linux
            // FileInputFormat.setInputPaths(job, new Path(args[0]));
            // FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            boolean completion = job.waitForCompletion(true);
            System.exit(completion ? 0 : -1);
        }
    }
  7. Run the code locally and check that the results are correct
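    With the five sample lines above (all requesting the same product id), a correct local run leaves a single line in ./output/part-r-00000, key and count separated by TextOutputFormat's default tab:

    4028f00176e1aa620176e6aca6890003	5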
  8. Once the local test results are correct, switch the driver's input/output handling from the local paths to command-line arguments, as follows:

    // Windows
    // System.setProperty("HADOOP_USER_NAME", "hadoop");
    
    // Linux
    if (args.length < 2) {
        System.out.println("Usage: hadoop jar Log.jar " + LogJob.class.getName() + " input output");
        System.exit(0);
    }
    
    // Windows
    // Path inputPath = new Path("./input/access.log");
    // Path outputPath = new Path("./output/");
    
    // Linux
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);
    
    // Windows
    // FileInputFormat.setInputPaths(job, inputPath);
    // FileOutputFormat.setOutputPath(job, outputPath);
    
    // Linux
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
  9. Build the jar and submit it to the cluster
    Build the jar with Maven's package lifecycle phase, upload it to the /opt/data directory on the server, and rename it Log.jar
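
    For example, from the project root (the jar name follows the pom above; the target host, user, and use of scp are assumptions based on this guide's environment, so adjust to yours):

    mvn clean package
    # Upload and rename in one step; hadoop100 is the host name seen in the sample logs
    scp target/log-1.0-SNAPSHOT.jar hadoop@hadoop100:/opt/data/Log.jar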
  10. For convenience, write a script /opt/bin/project/exec-mapred-task.sh to run the MR job

    #!/bin/bash
    
    input_path=$(cat /tmp/project-mapred-input-path.txt)
    
    # Run the MapReduce job
    dataformat=`date +%y-%m-%d`
    echo "/opt/pkg/hadoop/bin/hadoop jar /opt/data/Log.jar com.niit.log.LogJob $input_path /output/result/$dataformat"
    /opt/pkg/hadoop/bin/hadoop jar /opt/data/Log.jar com.niit.log.LogJob $input_path /output/result/$dataformat
    /opt/pkg/hadoop/bin/hdfs dfs -cat /output/result/$dataformat/part-r-00000 > /tmp/project_mr_result.txt
    
    echo "======== mapred task result ========"
    cat /tmp/project_mr_result.txt
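
    A sketch of the intended usage: the script reads the HDFS input path from /tmp/project-mapred-input-path.txt, so write that file first (the path below is illustrative):

    echo "/input/access.log" > /tmp/project-mapred-input-path.txt
    bash /opt/bin/project/exec-mapred-task.sh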
