安装netcat
Netcat 是一款简单的Unix工具,简称 nc,安全界叫它瑞士军刀, 使用UDP和TCP协议。 它是一个可靠的容易被其他程序所启用的后台操作工具,同时它也被用作网络的测试工具或黑客工具。 使用它你可以轻易的建立任何连接。内建有很多实用的工具。
1 |
$> sudo yum install nmap-ncat.x86_64 |
nc的一些用法:
端口测试
检测主机上8080端口服务是否开放
1 |
telnet 192.168.1.2 8080 |
或者
1 |
nc -vz 192.168.1.2 8080 |
z表示不发送数据,v表示显示额外信息
nc 命令后面的 8080 可以写成一个范围进行扫描:
1 |
nc -v -v -w3 -z 192.168.1.2 8080-8083 |
两次 -v 是让它报告更详细的内容,-w3 是设置扫描超时时间为 3 秒。
传输测试
A 主机上监听了 8080 端口
1 |
nc -l -p 8080 |
然后在 B 主机上连接过去:
1 |
nc 192.168.1.2 8080 |
两边就可以会话了,随便输入点什么按回车,另外一边应该会显示出来.
netcat source
配置
[hello.conf]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#声明三种组件 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #定义source信息 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 #定义sink信息 a1.sinks.k1.type=logger #定义channel信息 a1.channels.c1.type=memory #绑定在一起 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 |
运行
- 启动flume agent
$> bin/flume-ng agent -f conf/hello.conf -n a1 -Dflume.root.logger=INFO,console
- 启动nc的客户端
$> nc localhost 8888 $nc> hello world
- 在Flume的终端输出
hello world.
exec source
实时日志收集,实时收集日志。
1 2 3 4 5 6 7 8 9 10 11 12 |
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=exec a1.sources.r1.command=tail -F /home/centos/test.txt a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 |
spooldir 源
监控一个文件夹,静态文件, 批量收集。
收集完之后,会重命名文件成新文件.COMPLETED.
配置文件
[spooldir_r.conf]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/home/centos/spool a1.sources.r1.fileHeader=true a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 |
创建目录
1 |
$>mkdir ~/spool |
启动flume
1 |
$>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console |
seq source
生成事件序列的源, 一般用于测试.
[seq]
1 2 3 4 5 6 7 8 9 10 11 12 13 |
a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type=seq a1.sources.r1.totalEvents=1000 a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 |
[运行]
1 |
$>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console |
Stress Source
用于压力测试的源.
1 2 3 4 5 6 |
a1.sources = stresssource-1 a1.channels = memoryChannel-1 a1.sources.stresssource-1.type = org.apache.flume.source.StressSource a1.sources.stresssource-1.size = 10240 a1.sources.stresssource-1.maxTotalEvents = 1000000 a1.sources.stresssource-1.channels = memoryChannel-1 |
TailDir Source
Taildir Source目前只是个预览版本,还不能运行在windows系统上。
Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。
Taildir Source是可靠的,即使发生文件滚动也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。
Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。
文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。
Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。
文件滚动(file rotate)就是我们常见的log4j等日志框架或者系统会自动丢弃日志文件中时间久远的日志,一般按照日志文件大小或时间来自动分割或丢弃的机制。
练习
使用Flume监听整个目录的实时追加文件,并上传至HDFS
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
#步骤一:agent Name a1.sources = r1 a1.sinks = k1 a1.channels = c1 #步骤二:source # Describe/configure the source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json -- 指定position_file 的位置(记录每次上传后的偏移量,实现断点续传的关键) a1.sources.r1.filegroups = f1 f2 -- 监控的文件目录集合 a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*file.* -- 定义监控的文件目录1 a1.sources.r1.filegroups.f2 = /opt/module/flume/files/.*log.* -- 定义监控的文件目录2 #步骤三: channel selector a1.sources.r1.selector.type = replicating #步骤四: channel # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #步骤五: sinkprocessor,默认配置defaultsinkprocessor #步骤六: sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume/upload3/%Y%m%d/%H #上传文件的前缀 a1.sinks.k1.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a1.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a1.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a1.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a1.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a1.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a1.sinks.k1.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M a1.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a1.sinks.k1.hdfs.rollCount = 0 #步骤七:连接source、channel、sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
Views: 325