安装netcat
Netcat 是一款简单的Unix工具,简称 nc,安全界叫它瑞士军刀, 使用UDP和TCP协议。 它是一个可靠的容易被其他程序所启用的后台操作工具,同时它也被用作网络的测试工具或黑客工具。 使用它你可以轻易的建立任何连接。内建有很多实用的工具。
$> sudo yum install nmap-ncat.x86_64
nc的一些用法:
端口测试
检测主机上8080端口服务是否开放
telnet 192.168.1.2 8080
或者
nc -vz 192.168.1.2 8080
z表示不发送数据,v表示显示额外信息
nc 命令后面的 8080 可以写成一个范围进行扫描:
nc -v -v -w3 -z 192.168.1.2 8080-8083
两次 -v 是让它报告更详细的内容,-w3 是设置扫描超时时间为 3 秒。
传输测试
A 主机上监听了 8080 端口
nc -l -p 8080
然后在 B 主机上连接过去:
nc 192.168.1.2 8080
两边就可以会话了,随便输入点什么按回车,另外一边应该会显示出来.
netcat source
配置
[hello.conf]
#声明三种组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#定义source信息
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
#定义sink信息
a1.sinks.k1.type=logger
#定义channel信息
a1.channels.c1.type=memory
#绑定在一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
运行
- 启动flume agent
$> bin/flume-ng agent -f conf/hello.conf -n a1 -Dflume.root.logger=INFO,console - 启动nc的客户端
$> nc localhost 8888 $nc> hello world - 在Flume的终端输出
hello world.
exec source
实时日志收集,实时收集日志。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /home/centos/test.txt
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
spooldir 源
监控一个文件夹,静态文件, 批量收集。
收集完之后,会重命名文件成新文件.COMPLETED.
配置文件
[spooldir_r.conf]
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/centos/spool
a1.sources.r1.fileHeader=true
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
创建目录
$>mkdir ~/spool
启动flume
$>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console
seq source
生成事件序列的源, 一般用于测试.
[seq]
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type=seq
a1.sources.r1.totalEvents=1000
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
[运行]
$>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console
Stress Source
用于压力测试的源.
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
TailDir Source
Taildir Source目前只是个预览版本,还不能运行在windows系统上。
Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。
Taildir Source是可靠的,即使发生文件滚动也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。
Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。
文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。
Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。
文件滚动(file rotate)就是我们常见的log4j等日志框架或者系统会自动丢弃日志文件中时间久远的日志,一般按照日志文件大小或时间来自动分割或丢弃的机制。
练习
使用Flume监听整个目录的实时追加文件,并上传至HDFS
#步骤一:agent Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#步骤二:source
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json -- 指定position_file 的位置(记录每次上传后的偏移量,实现断点续传的关键)
a1.sources.r1.filegroups = f1 f2 -- 监控的文件目录集合
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*file.* -- 定义监控的文件目录1
a1.sources.r1.filegroups.f2 = /opt/module/flume/files/.*log.* -- 定义监控的文件目录2
#步骤三: channel selector
a1.sources.r1.selector.type = replicating
#步骤四: channel
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#步骤五: sinkprocessor,默认配置defaultsinkprocessor
#步骤六: sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume/upload3/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0
#步骤七:连接source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Views: 325
