Flume Case Studies: Interceptors

Flume Case Study: Using the Static Interceptor

1. Scenario

  • Two log servers, A and B, produce logs in real time; the main log types are access.log, nginx.log, and web.log

  • The access.log, nginx.log, and web.log files on machines A and B need to be collected and aggregated on machine C, and then written to HDFS.

  • In HDFS, however, the required directory layout is:

/source/logs/access/20200210/**
/source/logs/nginx/20200210/**
/source/logs/web/20200210/**

2. Scenario Analysis

(figure: scenario analysis diagram)

3. Data Flow Analysis

(figure: data flow diagram)

4. Implementation

  • Server A corresponds to IP 192.168.52.100

  • Server B corresponds to IP 192.168.52.110

  • Server C corresponds to IP 192.168.52.120 (the configs below refer to these machines by the hostnames node01, node02, and node03; see the hosts sketch below)
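
For the hostnames to resolve, each machine needs name resolution; a minimal /etc/hosts sketch, assuming node01, node02, and node03 map to servers A, B, and C:

# append to /etc/hosts on all three machines (assumed hostname-to-IP mapping)
192.168.52.100  node01
192.168.52.110  node02
192.168.52.120  node03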

Collection-side configuration
  • Create the Flume configuration file on node01 and node02
cd /kkb/install/apache-flume-1.9.0-bin/conf/
vim exec_source_avro_sink.conf
  • The contents are as follows
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# set source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /kkb/install/taillogs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
## The static interceptor inserts a fixed key-value pair into each collected event's headers; the key here matches the %{type} placeholder used in the HDFS sink path of the agent on node03
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /kkb/install/taillogs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /kkb/install/taillogs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# set sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41415

# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

This is the static interceptor at work: it stamps a fixed key-value pair onto every event's headers, which downstream agents can use for routing.

Server-side configuration
  • Create the Flume configuration file on node03
cd /kkb/install/apache-flume-1.9.0-bin/conf/
vim avro_source_hdfs_sink.conf
  • The contents are as follows
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# define the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port = 41415

# define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# use the local time to resolve the escape sequences in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# do not roll files by event count
a1.sinks.k1.hdfs.rollCount = 0
# roll files by time interval (seconds)
a1.sinks.k1.hdfs.rollInterval = 30
# roll files by size (bytes, 10 MB here)
a1.sinks.k1.hdfs.rollSize = 10485760
# number of events written to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 10000
# number of threads for HDFS operations (create, write, etc.)
a1.sinks.k1.hdfs.threadsPoolSize = 10
# timeout for HDFS operations (milliseconds)
a1.sinks.k1.hdfs.callTimeout = 30000

# assemble source, channel, and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Log-generation script on the collection side
  • Create a shell script on node01 and node02 to simulate log generation
cd /kkb/install/shells
vim server.sh
  • The contents are as follows
#!/bin/bash
while true
do
  date >> /kkb/install/taillogs/access.log
  date >> /kkb/install/taillogs/web.log
  date >> /kkb/install/taillogs/nginx.log
  sleep 0.5
done
  • Make the script executable on node01 and node02
chmod u+x server.sh
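The exec sources and this script both assume the log directory already exists; create it once on node01 and node02:

# on node01 and node02: create the directory the logs are tailed from
mkdir -p /kkb/install/taillogs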
Start the services in order
  • Start Flume on node03 first, so it is ready to receive data
cd /kkb/install/apache-flume-1.9.0-bin/
bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -n a1 -Dflume.root.logger=DEBUG,console
  • Start Flume on node01 and node02 to collect the logs
cd /kkb/install/apache-flume-1.9.0-bin/
bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -n a1 -Dflume.root.logger=DEBUG,console
  • Run the log-generation script on node01 and node02
cd /kkb/install/shells
sh server.sh
  • Check the HDFS directory /source/logs (see the commands below)
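A quick way to verify from the command line (a sketch; the date partition reflects the day you actually run the pipeline, so 20200210 is only illustrative):

# list the collected files, grouped by log type and date
hdfs dfs -ls -R /source/logs
# sample a few records from one partition (path is illustrative)
hdfs dfs -cat /source/logs/access/20200210/* | head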

Flume Case Study: Using a Custom Interceptor

Requirements:

  • After data collection, use a Flume interceptor to filter out messages that are not valid JSON.

Implementation Steps

Step 1: Create a Maven Java project and add the dependencies

    
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Step 2: Write the custom Flume interceptor

Create the package com.niit.flume.interceptor

Create the class JsonInterceptor together with its inner class MyBuilder

package com.niit.flume.interceptor;

import org.apache.commons.codec.Charsets;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Author: deLucia
 * @Date: 2021/6/2
 * @Version: 1.0
 * @Description:
 */
public class JsonInterceptor implements Interceptor {
    private static final Logger LOG = LoggerFactory.getLogger(JsonInterceptor.class);
    private String tag = "";

    public JsonInterceptor(String tag) {
        this.tag = tag;
    }

    @Override
    public void initialize() {

    }

    /**
     * Filter out any event whose body is not JSON-formatted, i.e. not wrapped in { ... }
     * @param event the incoming event
     * @return the event with a tag header added, or null if the body is not valid JSON
     */
    @Override
    public Event intercept(Event event) {
        String line = new String(event.getBody(), Charsets.UTF_8);
        line = line.trim();
        if (StringUtils.isBlank(line)) {
            return null;
        }

        // keep only bodies that look like a JSON object: { ... }
        if (line.startsWith("{") && line.endsWith("}")) {

            Map<String, String> headers = event.getHeaders();
            headers.put("tag", tag);
            return event;

        } else {
            LOG.warn("Not Valid JSON: " + line);
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> out = new ArrayList<>();
        for (Event event : list) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {

    }
    /**
     * Acts as the factory class for the custom interceptor:
     * Flume creates the Interceptor instance through this Builder,
     * which is referenced by its class name in the agent configuration file.
     */
    public static class MyBuilder implements Interceptor.Builder {

        private String tag;

        @Override
        public void configure(Context context) {
            // read the interceptor's "tag" property from the Flume configuration file
            this.tag = context.getString("tag", "").trim();
        }

        /*
         * @see org.apache.flume.interceptor.Interceptor.Builder#build()
         */
        @Override
        public JsonInterceptor build() {
            return new JsonInterceptor(tag);
        }
    }
}
Step 3: Package the jar and upload it to the server
  • Package the interceptor into a jar and place it in the lib directory of the Flume installation on the host named hadoop100 (a sketch of this step follows)
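A minimal sketch of this step (the jar file name depends on your project's artifactId and version, so treat it as a placeholder):

# build the shaded jar from the project root
mvn clean package
# copy it into Flume's lib directory (Flume installed at /opt/pkg/flume, per below)
cp target/flume-interceptor-1.0-SNAPSHOT.jar /opt/pkg/flume/lib/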
Step 4: Develop the Flume configuration file

cd /opt/pkg/flume/conf/
vim nc-interceptor-logger.conf
  • The contents are as follows

Remember to replace the i1.type value below with your own fully qualified class name.

# declare the three components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# define the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
# define the interceptor on the source
a1.sources.r1.interceptors = i1
# use the fully qualified name of your interceptor class plus its Builder inner class
a1.sources.r1.interceptors.i1.type = com.niit.flume.interceptor.JsonInterceptor$MyBuilder
## the tag value will be written into each event's headers
a1.sources.r1.interceptors.i1.tag = json_event

# define the sink
a1.sinks.k1.type = logger

# define the channel
a1.channels.c1.type = memory

# bind source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Step 5: Run the Flume Agent

Run it from the Flume home directory with the INFO log level printed to the console:

[hadoop@hadoop100 flume]$ bin/flume-ng agent -c conf/ -f conf/nc-interceptor-logger.conf -n a1 -Dflume.root.logger=INFO,console
Step 6: A quick test

Send some data with a netcat client:

[hadoop@hadoop100 ~]$ nc localhost 8888
asd
OK
dsa
OK
{"
OK
{
OK
{"key":123}
OK

The Flume agent's log output looks like this:

2021-05-26 14:40:12,233 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2021-05-26 14:40:17,326 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:8888]
2021-05-26 14:40:20,952 (netcat-handler-0) [WARN - com.niit.flume.interceptor.JsonInterceptor.intercept(JsonInterceptor.java:48)] Invalid json format event !
2021-05-26 14:40:20,953 (netcat-handler-0) [WARN - com.niit.flume.interceptor.JsonInterceptor.intercept(JsonInterceptor.java:48)] Invalid json format event !
2021-05-26 14:40:24,109 (netcat-handler-0) [WARN - com.niit.flume.interceptor.JsonInterceptor.intercept(JsonInterceptor.java:48)] Invalid json format event !
2021-05-26 14:40:24,909 (netcat-handler-0) [WARN - com.niit.flume.interceptor.JsonInterceptor.intercept(JsonInterceptor.java:48)] Invalid json format event !
2021-05-26 14:40:36,895 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{tag=json_event} body: 7B 22 6B 65 79 22 3A 31 32 33 7D                {"key":123} }
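
The test can also be scripted instead of typed interactively; a quick sketch (note that the -q option, which closes the connection shortly after stdin ends, is not supported by every netcat variant):

# send one invalid line and one valid JSON line; expect one WARN and one delivered event
printf 'not json\n{"key":123}\n' | nc -q 1 localhost 8888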
