Flume综合案例之拦截器

Flume综合案例之静态拦截器使用

1. 案例场景

  • A、B两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log

  • 现在需要把A、B 机器中的access.log、nginx.log、web.log 采集汇总到C机器上然后统一收集到hdfs中。

  • 但是在hdfs中要求的目录为:

2. 场景分析

image-20200522145357477

3. 数据流程处理分析

image-20200522145429320

4. 实现

  • 服务器A对应的IP为 192.168.52.100

  • 服务器B对应的IP为 192.168.52.110

  • 服务器C对应的IP为 192.168.52.120

采集端配置文件开发
  • node01与node02服务器开发flume的配置文件

  • 内容如下

static interceptor 静态拦截器

服务端配置文件开发
  • 在node03上面开发flume配置文件

  • 内容如下

采集端文件生成脚本
  • 在node01与node02上面开发shell脚本,模拟数据生成

  • 内容如下

  • node01、node02给脚本添加可执行权限

顺序启动服务
  • node03启动flume实现数据收集

  • node01与node02启动flume实现数据监控

  • node01与node02启动生成文件脚本

  • 查看hdfs目录/source/logs

Flume综合案例之自定义拦截器使用

案例需求:

  • 在数据采集之后,通过Flume的拦截器,实现将无效的JSON格式的消息过滤掉。

实现步骤

第一步:创建maven java工程,导入jar包

第二步:自定义flume的拦截器

新建package com.niit.flume.interceptor

新建类及内部类,分别是JsonInterceptorMyBuilder

第三步:打包上传服务器
  • 将我们的拦截器打成jar包放到主机名为hadoop100的服务器上的flume安装目录下的lib目录下
第四步:开发flume的配置文件

开发flume的配置文件

  • 内容如下

记得将下边的i1.type根据自己的实际情况进行替换

第五步:运行FLume Agent

同时开启INFO日志级别

第六步:简单测试

通过netcat客户端发送数据

查看flume的agent的日志信息如下

Views: 106

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注