Flume Sinks 类型有很多,这里只挑出一些我们常用的Sink.
- HDFS Sink
- Hive Sink
- Logger Sink
- Avro Sink
- HBase Sinks
- Kafka Sink
- HTTP Sink
- File Roll Sink
- NULL sink
- Custom SInk
HDFS Sink
这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。
使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。
注意,%[localhost], %[IP] 和 %[FQDN]这三个转义符实际上都是用java的API来获取的,在一些网络环境下可能会获取失败。
正在打开的文件会在名称末尾加上“.tmp”的后缀。文件关闭后,会自动删除此扩展名。这样容易排除目录中的那些已完成的文件。 必需的参数已用 粗体 标明。
属性名 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 连接的 channel |
type | – | 组件类型,这个是: hdfs |
hdfs.path | – | HDFS目录路径(例如:hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Flume在HDFS文件夹下创建新文件的固定前缀 |
hdfs.fileSuffix | – | Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置) |
hdfs.inUsePrefix | – | Flume正在写入的临时文件前缀,默认没有 |
hdfs.inUseSuffix | .tmp | Flume正在写入的临时文件后缀 |
hdfs.emptyInUseSuffix | false | 如果设置为 false 上面的 hdfs.inUseSuffix 参数在写入文件时会生效,并且写入完成后会在目标文件上移除 hdfs.inUseSuffix 配置的后缀。如果设置为 true 则上面的 hdfs.inUseSuffix 参数会被忽略,写文件时不会带任何后缀 |
hdfs.rollInterval | 30 | 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 |
hdfs.rollSize | 1024 | 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 |
hdfs.rollCount | 10 | 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) |
hdfs.idleTimeout | 0 | 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒 |
hdfs.batchSize | 100 | 向 HDFS 写入内容时每次批量操作的 Event 数量 |
hdfs.codeC | – | 压缩算法。可选值:gzip 、 bzip2 、 lzo 、 lzop 、 `snappy |
hdfs.fileType | SequenceFile | 文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数 |
hdfs.maxOpenFiles | 5000 | 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭 |
hdfs.minBlockReplicas | – | 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。 |
hdfs.writeFormat | Writable | 文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text ,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。 |
hdfs.threadsPoolSize | 10 | 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等) |
hdfs.rollTimerPoolSize | 1 | 每个HDFS Sink实例调度定时文件滚动的线程数 |
hdfs.kerberosPrincipal | – | 用于安全访问 HDFS 的 Kerberos 用户主体 |
hdfs.kerberosKeytab | – | 用于安全访问 HDFS 的 Kerberos keytab 文件 |
hdfs.proxyUser | 代理名 | |
hdfs.round | false | 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符) |
hdfs.roundValue | 1 | 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30 |
hdfs.roundUnit | second | 向下舍入的单位,可选值: second 、 minute 、 hour |
hdfs.timeZone | Local Time | 解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai |
hdfs.useLocalTimeStamp | false | 使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳) |
hdfs.closeTries | 0 | 开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;如果设置为0,Sink会一直尝试重命名文件直到成功为止;关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。 |
hdfs.retryInterval | 180 | 连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。 |
serializer | TEXT | Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。 |
serializer.* | 根据上面 serializer 配置的类型来根据需要添加序列化器的参数 |
废弃的一些参数:
属性名 | 默认值 | 解释 |
---|---|---|
hdfs.callTimeout | 10000 | 允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒) |
1 2 3 4 5 6 7 8 9 |
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute |
上面的例子中时间戳会向前一个整10分钟取整。比如,一个 Event 的 header 中带的时间戳是11:54:34 AM, June 12, 2012,它会保存的 HDFS 路径就是/flume/events/2012-06-12/1150/00。
Hive Sink
此Sink将包含分隔文本或JSON数据的 Event 直接流式传输到 Hive表或分区上。 Event 使用 Hive事务进行写入, 一旦将一组 Event 提交给Hive,它们就会立即显示给Hive查询。 即将写入的目标分区既可以预先自己创建,也可以选择让 Flume 创建它们,如果没有的话。 写入的 Event 数据中的字段将映射到 Hive表中的相应列。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 连接的 channel |
type | – | 组件类型,这个是: hive |
hive.metastore | – | Hive metastore URI (eg thrift://a.b.com:9083 ) |
hive.database | – | Hive 数据库名 |
hive.table | – | Hive表名 |
hive.partition | – | 逗号分隔的要写入的分区信息。 比如hive表的分区是(continent: string, country: string, time : string), 那么“Asia,India,2014-02-26-01-21”就表示数据会写入到continent=Asia,country=India,time=2014-02-26-01-21这个分区。 |
hive.txnsPerBatchAsk | 100 | Hive从Flume等客户端接收数据流会使用多次事务来操作,而不是只开启一个事务。这个参数指定处理每次请求所开启的事务数量。来自同一个批次中所有事务中的数据最终都在一个文件中。 Flume会向每个事务中写入 batchSize 个 Event,这个参数和 batchSize 一起控制着每个文件的大小,请注意,Hive最终会将这些文件压缩成一个更大的文件。 |
heartBeatInterval | 240 | 发送到 Hive 的连续心跳检测间隔(秒),以防止未使用的事务过期。设置为0表示禁用心跳。 |
autoCreatePartitions | true | Flume 会自动创建必要的 Hive分区以进行流式传输 |
batchSize | 15000 | 写入一个 Hive事务中最大的 Event 数量 |
maxOpenConnections | 500 | 允许打开的最大连接数。如果超过此数量,则关闭最近最少使用的连接。 |
callTimeout | 10000 | Hive、HDFS I/O操作的超时时间(毫秒),比如:开启事务、写数据、提交事务、取消事务。 |
serializer | 序列化器负责解析 Event 中的字段并把它们映射到 Hive表中的列,选择哪种序列化器取决于 Event 中的数据格式,支持的序列化器有:DELIMITED 和 JSON | |
round | false | 是否启用时间戳舍入机制 |
roundUnit | minute | 舍入值的单位,可选值:second 、 minute 、 hour |
roundValue | 1 | 舍入到小于当前时间的最高倍数(使用 roundUnit 配置的单位) 例子1:roundUnit=second,roundValue=10,则14:31:18这个时间戳会被舍入到14:31:10; 例子2:roundUnit=second,roundValue=30,则14:31:18这个时间戳会被舍入到14:31:00,14:31:42这个时间戳会被舍入到14:31:30; |
timeZone | Local Time | 应用于解析分区中转义序列的时区名称,比如:America/Los_Angeles、Asia/Shanghai、Asia/Tokyo等 |
useLocalTimeStamp | false | 替换转义序列时是否使用本地时间戳(否则使用Event header中的timestamp ) |
org.apache.hive.hcatalog.data.JsonSerDe
,但独立于 Hive表的 Serde
。 此序列化程序需要安装 HCatalog。DELIMITED: 处理简单的分隔文本 Event。 内部使用 LazySimpleSerde,但独立于 Hive表的 Serde。属性 | 默认值 | 解释 |
---|---|---|
serializer.delimiter | , | (类型:字符串)传入数据中的字段分隔符。 要使用特殊字符,请用双引号括起来,例如“\t” |
serializer.fieldnames | – | 从输入字段到Hive表中的列的映射。 指定为Hive表列名称的逗号分隔列表(无空格),按顺序标识输入字段。 要跳过字段,请保留未指定的列名称。 例如, ‘time,,ip,message’表示输入映射到hive表中的 time,ip 和 message 列的第1,第3和第4个字段。 |
serializer.serdeSeparator | Ctrl-A | (类型:字符)自定义底层序列化器的分隔符。如果 serializer.fieldnames 中的字段与 Hive表列的顺序相同,则 serializer.delimiter 与 serializer.serdeSeparator 相同, 并且 serializer.fieldnames 中的字段数小于或等于表的字段数量,可以提高效率,因为传入 Event 正文中的字段不需要重新排序以匹配 Hive表列的顺序。 对于’\t’这样的特殊字符使用单引号,要确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将本参数也设置为相同的字符。 |
为了避免出现找不到类的异常,首先需添加依赖的jar包:
- 将hive/lib下面hive-hcatalog-core-3.1.2.jar拷贝或者软链接到flume/lib下
$ cp /opt/pkg/hive/lib/hive-hcatalog-core-3.1.2.jar /opt/pkg/flume/lib/
- 接下来我们还需要这个依赖:hive-hcatalog-streaming-3.1.2.jar。这个hive目录是找不到的,需要单独从mvnrepository网站搜索下载,下载地址:https://mvnrepository.com/artifact/org.apache.hive.hcatalog/hive-hcatalog-streaming/3.1.2,下载后移动到flume/lib下即可。
使用Hive Sink 还有以下前提条件:
- 需要开启事务支持
- Hive表必须分区分桶
- Hive表必须是Acid表(开启事务支持)
进入beeline临时开启事务支持(也可以修改配置文件永久开启,但不建议)
1 2 3 4 5 6 7 |
SET hive.support.concurrency = true; SET hive.enforce.bucketing = true; SET hive.exec.dynamic.partition.mode = nonstrict; SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; SET hive.compactor.initiator.on = true; SET hive.compactor.worker.threads = 1; |
创建Hive表如下:
1 2 3 4 5 6 7 8 9 10 11 12 |
create database logsdb; use logsdb; create table weblogs ( id int , msg string ) partitioned by (continent string, contry string) clustered by (id) into 5 buckets stored as orc tblproperties( 'transactional'='true', 'transactional_properties'='default' ); |
Flume Agnet 配置范例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
$ vi conf/hello-hive.conf #声明三种组件 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #定义source信息 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 #定义sink信息 a1.sinks.k1.type = hive a1.sinks.k1.hive.metastore = thrift://hadoop100:9083 a1.sinks.k1.hive.database = logsdb a1.sinks.k1.hive.table = weblogs a1.sinks.k1.hive.partition = asia,India a1.sinks.k1.useLocalTimeStamp = false a1.sinks.k1.batchSize = 50 a1.sinks.k1.round = true a1.sinks.k1.roundValue = 10 a1.sinks.k1.roundUnit = minute a1.sinks.k1.serializer = DELIMITED a1.sinks.k1.serializer.delimiter = "\t" a1.sinks.k1.serializer.serdeSeparator = '\t' a1.sinks.k1.serializer.fieldnames =id,msg #定义channel信息 a1.channels.c1.type=memory #绑定在一起 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 |
启动Flume Agent
1 |
flume]$ flume-ng agent -c conf/ -f conf/hello-hive.conf -n a1 -Dflume.root.logger=INFO,console |
使用nc客户端发送数据
1 2 3 4 5 |
[hadoop@hadoop100 ~]$ nc localhost 8888 1001 hello OK 1002 world OK |
查看hive表中是否有数据出现
1 2 3 4 5 6 7 8 9 |
0: jdbc:hive2://localhost:10000> select * from weblogs; +-------------+--------------+--------------------+-----------------+ | weblogs.id | weblogs.msg | weblogs.continent | weblogs.contry | +-------------+--------------+--------------------+-----------------+ | 1002 | world | asia | India | | 1001 | hello | asia | India | +-------------+--------------+--------------------+-----------------+ 2 rows selected (5.394 seconds) |
如果对于行级更新删除需求比较频繁的,可以考虑使用事务表,但平常的hive表并不建议使用事务表。因为事务表的限制很多,加上由于hive表的特性,也很难满足高并发的场景。另外,如果事务表太多,并且存在大量的更新操作,metastore后台启动的合并线程会定期的提交MapReduce Job,也会一定程度上增重集群的负担。
Logger Sink
使用INFO级别把Event内容输出到日志中,一般用来测试、调试使用。这个 Sink 是唯一一个不需要额外配置就能把 Event 的原始内容输出的Sink,参照 输出原始数据到日志 。
提示
在 输出原始数据到日志 一节中说过,通常在Flume的运行日志里面输出数据流中的原始的数据内容是非常不可取的,所以 Flume 的组件默认都不会这么做。但是总有特殊的情况想要把 Event 内容打印出来,就可以借助这个Logger Sink了。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: logger |
maxBytesToLog | 16 | Event body 输出到日志的最大字节数,超出的部分会被丢弃 |
配置范例:
1 2 3 4 |
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
Avro Sink
这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为Avro Event发送到指定的主机/端口上。Event 从 channel 中批量获取,数量根据配置的 batch-size 而定。 必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: avro . |
hostname | – | 监听的服务器名(hostname)或者 IP |
port | – | 监听的端口 |
batch-size | 100 | 每次批量发送的 Event 数 |
connect-timeout | 20000 | 第一次连接请求(握手)的超时时间,单位:毫秒 |
request-timeout | 20000 | 请求超时时间,单位:毫秒 |
reset-connection-interval | none | 重置连接到下一跳之前的时间量(秒)。 这将强制 Avro Sink 重新连接到下一跳。 这将允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。 |
compression-type | none | 压缩类型。可选值: none 、 deflate 。压缩类型必须与上一级Avro Source 配置的一致 |
compression-level | 6 | Event的压缩级别 0:不压缩,1-9:进行压缩,数字越大,压缩率越高 |
ssl | false | 设置为 true 表示开启SSL 下面的 truststore 、 truststore-password 、 truststore-type 就是开启SSL后使用的参数,并且可以指定是否信任所有证书( trust-all-certs ) |
trust-all-certs | false | 如果设置为true, 不会检查远程服务器(Avro Source)的SSL服务器证书。不要在生产环境开启这个配置,因为它使攻击者更容易执行中间人攻击并在加密的连接上进行“监听”。 |
truststore | – | 自定义 Java truststore文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用全局的keystore配置,如果全局的keystore也未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为 Oracle JRE 中的“jssecacerts”或“cacerts”)。 |
truststore-password | – | 上面配置的truststore的密码,如果未配置,将使用全局的truststore配置(如果配置了的话) |
truststore-type | JKS | Java truststore的类型。可以配成 JKS 或者其他支持的 Java truststore类型,如果未配置,将使用全局的SSL配置(如果配置了的话) |
exclude-protocols | SSLv3 | 要排除的以空格分隔的 SSL/TLS 协议列表。 SSLv3 协议不管是否配置都会被排除掉。 |
maxIoWorkers | 2 * 机器上可用的处理器核心数量 | I/O工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的。 |
配置范例:
1 2 3 4 5 6 |
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 10.10.10.10 a1.sinks.k1.port = 4545 |
利用AvroSource和AvroSink实现跃点Agent
配置范例 – Agent #a1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
#a1 a1.sources = r1 a1.sinks= k1 a1.channels = c1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 a1.sinks.k1.type = avro a1.sinks.k1.hostname=localhost a1.sinks.k1.port=9999 a1.channels.c1.type=memory a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
配置范例 – Agent #a2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#a2 a2.sources = r2 a2.sinks= k2 a2.channels = c2 a2.sources.r2.type=avro a2.sources.r2.bind=localhost a2.sources.r2.port=9999 a2.sinks.k2.type = logger a2.channels.c2.type=memory a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 |
启动Agent #a2
1 |
$ flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a2 -Dflume.root.logger=INFO,console |
验证Agent #a2
1 |
$ netstat -anop | grep 9999 |
启动Agent #a1
1 |
$ flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a1 |
验证Agent #a1
1 |
$ netstat -anop | grep 8888 |
HBase2Sink
提示
这是Flume 1.9新增的Sink。
HBase2Sink 是HBaseSink的HBase 2版本。
所提供的功能和配置参数与HBaseSink相同
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: hbase2 |
table | – | 要写入的 Hbase 表名 |
columnFamily | – | 要写入的 Hbase 列族 |
zookeeperQuorum | – | Zookeeper 节点(host:port格式,多个用逗号分隔),hbase-site.xml 中属性 hbase.zookeeper.quorum 的值 |
znodeParent | /hbase | ZooKeeper 中 HBase 的 Root ZNode 路径,hbase-site.xml 中 zookeeper.znode.parent 的值 |
batchSize | 100 | 每个事务写入的Event数量 |
coalesceIncrements | false | 每次提交时,Sink是否合并多个 increment 到一个cell。如果有限数量的 cell 有多个 increment ,这样可能会提供更好的性能 |
serializer | org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer | 默认的列 increment column = “iCol”, payload column = “pCol” |
serializer.* | – | 序列化器的一些属性 |
kerberosPrincipal | – | 以安全方式访问 HBase 的 Kerberos 用户主体 |
kerberosKeytab | – | 以安全方式访问 HBase 的 Kerberos keytab 文件目录 |
配置范例:
1 2 3 4 5 6 7 |
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hbase2 a1.sinks.k1.table = foo_table a1.sinks.k1.columnFamily = bar_cf a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer a1.sinks.k1.channel = c1 |
Kafka Sink
这个 Sink 可以把数据发送到 Kafka topic上。目的就是将 Flume 与 Kafka 集成,以便基于拉的处理系统可以处理来自各种 Flume Source 的数据。
目前支持Kafka 0.10.1.0以上版本,最高已经在Kafka 2.0.1版本上完成了测试,这已经是Flume 1.9发行时候的最高的Kafka版本了。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | – | Kafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔 |
kafka.topic | default-flume-topic | 用于发布消息的 Kafka topic 名称 。如果这个参数配置了值,消息就会被发布到这个 topic 上。如果Event header中包含叫做“topic”的属性, Event 就会被发布到 header 中指定的 topic 上,而不会发布到 kafka.topic 指定的 topic 上。支持任意的 header 属性动态替换, 比如%{lyf}就会被 Event header 中叫做“lyf”的属性值替换(如果使用了这种动态替换,建议将 Kafka 的 auto.create.topics.enable 属性设置为 true )。 |
flumeBatchSize | 100 | 一批中要处理的消息数。设置较大的值可以提高吞吐量,但是会增加延迟。 |
kafka.producer.acks | 1 | 在考虑成功写入之前,要有多少个副本必须确认消息。可选值, 0 :(从不等待确认); 1 :只等待leader确认; -1 :等待所有副本确认。 设置为-1可以避免某些情况 leader 实例失败的情况下丢失数据。 |
useFlumeEventFormat | false | 默认情况下,会直接将 Event body 的字节数组作为消息内容直接发送到 Kafka topic 。如果设置为true,会以 Flume Avro 二进制格式进行读取。 与 Kafka Source 上的同名参数或者 Kafka channel 的 parseAsFlumeEvent 参数相关联,这样以对象的形式处理能使生成端发送过来的 Event header 信息得以保留。 |
defaultPartitionId | – | 指定所有 Event 将要发送到的 Kafka 分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被 Kafka 生产者的分发程序分发,包括 key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发 |
partitionIdHeader | – | 设置后,Sink将使用 Event header 中使用此属性的值命名的字段的值,并将消息发送到 topic 的指定分区。 如果该值表示无效分区,则将抛出 EventDeliveryException。 如果存在标头值,则此设置将覆盖 defaultPartitionId 。假如这个参数设置为“lyf”,这个 Sink 就会读取 Event header 中的 lyf 属性的值,用该值作为分区ID |
allowTopicOverride | true | 如果设置为 true ,会读取 Event header 中的名为 topicHeader 的的属性值,用它作为目标 topic。 |
topicHeader | topic | 与上面的 allowTopicOverride 一起使用,allowTopicOverride 会用当前参数配置的名字从 Event header 获取该属性的值,来作为目标 topic 名称 |
kafka.producer.security.protocol | PLAINTEXT | 设置使用哪种安全协议写入 Kafka。可选值:SASL_PLAINTEXT 、 SASL_SSL 和 SSL , 有关安全设置的其他信息,请参见下文。 |
more producer security props | 如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者增加安全相关的参数配置 |
|
Other Kafka Producer Properties | – | 其他一些 Kafka 生产者配置参数。任何 Kafka 支持的生产者参数都可以使用。唯一的要求是使用“kafka.producer.”这个前缀来配置参数,比如:kafka.producer.linger.ms |
注解
Kafka Sink使用 Event header 中的 topic 和其他关键属性将 Event 发送到 Kafka。 如果 header 中存在 topic,则会将Event发送到该特定 topic,从而覆盖为Sink配置的 topic。 如果 header 中存在指定分区相关的参数,则Kafka将使用相关参数发送到指定分区。 header中特定参数相同的 Event 将被发送到同一分区。 如果为空,则将 Event 会被发送到随机分区。 Kafka Sink 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。
弃用的一些参数:
属性 | 默认值 | 解释 |
---|---|---|
brokerList | – | 改用 kafka.bootstrap.servers |
topic | default-flume-topic | 改用 kafka.topic |
batchSize | 100 | 改用 kafka.flumeBatchSize |
requiredAcks | 1 | 改用 kafka.producer.acks |
下面给出 Kafka Sink 的配置示例。Kafka 生产者的属性都是以 kafka.producer 为前缀。Kafka 生产者的属性不限于下面示例的几个。此外,可以在此处包含您的自定义属性,并通过作为方法参数传入的Flume Context对象在预处理器中访问它们。
1 2 3 4 5 6 7 8 |
a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy |
HTTP Sink
HTTP Sink 从 channel 中获取 Event,然后再向远程 HTTP 接口 POST 发送请求,Event 内容作为 POST 的正文发送。
错误处理取决于目标服务器返回的HTTP响应代码。 Sink的 退避 和 就绪 状态是可配置的,事务提交/回滚结果以及Event是否发送成功在内部指标计数器中也是可配置的。
状态代码不可读的服务器返回的任何格式错误的 HTTP 响应都将产生 退避 信号,并且不会从 channel 中消耗该Event。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: http . |
endpoint | – | 将要 POST 提交数据接口的绝对地址 |
connectTimeout | 5000 | 连接超时(毫秒) |
requestTimeout | 5000 | 一次请求操作的最大超时时间(毫秒) |
contentTypeHeader | text/plain | HTTP请求的Content-Type请求头 |
acceptHeader | text/plain | HTTP请求的Accept 请求头 |
defaultBackoff | true | 是否默认启用退避机制,如果配置的 backoff.CODE 没有匹配到某个 http 状态码,默认就会使用这个参数值来决定是否退避 |
defaultRollback | true | 是否默认启用回滚机制,如果配置的 rollback.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否回滚 |
defaultIncrementMetrics | false | 是否默认进行统计计数,如果配置的 incrementMetrics.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否参与计数 |
backoff.CODE | – | 配置某个 http 状态码是否启用退避机制(支持200这种精确匹配和2XX一组状态码匹配模式) |
rollback.CODE | – | 配置某个 http 状态码是否启用回滚机制(支持200这种精确匹配和2XX一组状态码匹配模式) |
incrementMetrics.CODE | – | 配置某个 http 状态码是否参与计数(支持200这种精确匹配和2XX一组状态码匹配模式) |
注意 backoff, rollback 和 incrementMetrics 的 code 配置通常都是用具体的HTTP状态码,如果2xx和200这两种配置同时存在,则200的状态码会被精确匹配,其余200~299(除了200以外)之间的状态码会被2xx匹配。
提示
Flume里面好多组件都有这个退避机制,其实就是下一级目标没有按照预期执行的时候,会执行一个延迟操作。比如向HTTP接口提交数据发生了错误触发了退避机制生效,系统等待30秒再执行后续的提交操作, 如果再次发生错误则等待的时间会翻倍,直到达到系统设置的最大等待上限。通常在重试成功后退避就会被重置,下次遇到错误重新开始计算等待的时间。
任何空的或者为 null 的 Event 不会被提交到HTTP接口上。
配置范例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = http a1.sinks.k1.channel = c1 a1.sinks.k1.endpoint = http://localhost:8080/someuri a1.sinks.k1.connectTimeout = 2000 a1.sinks.k1.requestTimeout = 2000 a1.sinks.k1.acceptHeader = application/json a1.sinks.k1.contentTypeHeader = application/json a1.sinks.k1.defaultBackoff = true a1.sinks.k1.defaultRollback = true a1.sinks.k1.defaultIncrementMetrics = false a1.sinks.k1.backoff.4XX = false a1.sinks.k1.rollback.4XX = false a1.sinks.k1.incrementMetrics.4XX = true a1.sinks.k1.backoff.200 = false a1.sinks.k1.rollback.200 = false a1.sinks.k1.incrementMetrics.200 = true |
File Roll Sink
把 Event 存储到本地文件系统。 必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: file_roll . |
sink.directory | – | Event 将要保存的目录 |
sink.pathManager | DEFAULT | 配置使用哪个路径管理器,这个管理器的作用是按照规则生成新的存储文件名称,可选值有: default 、 rolltime 。default规则:prefix+当前毫秒值+“-”+文件序号+“.”+extension;rolltime规则:prefix+yyyyMMddHHmmss+“-”+文件序号+“.”+extension;注:prefix 和 extension 如果没有配置则不会附带 |
sink.pathManager.extension | – | 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的扩展名 |
sink.pathManager.prefix | – | 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的文件名的固定前缀 |
sink.rollInterval | 30 | 表示每隔30秒创建一个新文件进行存储。如果设置为0,表示所有 Event 都会写到一个文件中。 |
sink.serializer | TEXT | 配置 Event 序列化器,可选值有:text 、 header_and_text 、 avro_event 或者自定义实现了 EventSerializer.Builder 接口的序列化器的全限定类名.。 text 只会把 Event 的 body 的文本内容序列化; header_and_text 会把 header 和 body 内容都序列化。 |
sink.batchSize | 100 | 每次事务批处理的 Event 数 |
配置范例:
1 2 3 4 5 |
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume |
Null Sink
丢弃所有从 channel 读取到的 Event。 必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: null . |
batchSize | 100 | 每次批处理的 Event 数量 |
配置范例:
1 2 3 4 |
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = null a1.sinks.k1.channel = c1 |
Custom Sink
你可以自己写一个 Sink 接口的实现类。启动 Flume 时候必须把你自定义 Sink 所依赖的其他类配置进 classpath 内。custom source 在写配置文件的 type 时候填你的全限定类名。 必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channe |
type | – | 组件类型,这个填你自定义class的全限定类名 |
配置范例:
1 2 3 4 |
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = org.example.MySink a1.sinks.k1.channel = c1 |
Views: 140