Flume Sinks

Flume Sinks 类型有很多,这里只挑出一些我们常用的Sink.

  1. HDFS Sink
  2. Hive Sink
  3. Logger Sink
  4. Avro Sink
  5. HBase Sinks
  6. Kafka Sink
  7. HTTP Sink
  8. File Roll Sink
  9. NULL sink
  10. Custom SInk

HDFS Sink

这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。

使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。

注意,%[localhost], %[IP] 和 %[FQDN]这三个转义符实际上都是用java的API来获取的,在一些网络环境下可能会获取失败。

正在打开的文件会在名称末尾加上“.tmp”的后缀。文件关闭后,会自动删除此扩展名。这样容易排除目录中的那些已完成的文件。 必需的参数已用 粗体 标明。

属性名默认值解释
channel与 Sink 连接的 channel
type组件类型,这个是: hdfs
hdfs.pathHDFS目录路径(例如:hdfs://namenode/flume/webdata/)
hdfs.filePrefixFlumeDataFlume在HDFS文件夹下创建新文件的固定前缀
hdfs.fileSuffixFlume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)
hdfs.inUsePrefixFlume正在写入的临时文件前缀,默认没有
hdfs.inUseSuffix.tmpFlume正在写入的临时文件后缀
hdfs.emptyInUseSuffixfalse如果设置为 false 上面的 hdfs.inUseSuffix 参数在写入文件时会生效,并且写入完成后会在目标文件上移除 hdfs.inUseSuffix 配置的后缀。如果设置为 true 则上面的 hdfs.inUseSuffix 参数会被忽略,写文件时不会带任何后缀
hdfs.rollInterval30当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize1024当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount10当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout0关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize100向 HDFS 写入内容时每次批量操作的 Event 数量
hdfs.codeC压缩算法。可选值:gzip 、 bzip2 、 lzo 、 lzop 、 `snappy
hdfs.fileTypeSequenceFile文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
hdfs.maxOpenFiles5000允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
hdfs.minBlockReplicas指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。
hdfs.writeFormatWritable文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。
hdfs.threadsPoolSize10每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
hdfs.rollTimerPoolSize1每个HDFS Sink实例调度定时文件滚动的线程数
hdfs.kerberosPrincipal用于安全访问 HDFS 的 Kerberos 用户主体
hdfs.kerberosKeytab用于安全访问 HDFS 的 Kerberos keytab 文件
hdfs.proxyUser 代理名
hdfs.roundfalse是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
hdfs.roundValue1向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
hdfs.roundUnitsecond向下舍入的单位,可选值: second 、 minute 、 hour
hdfs.timeZoneLocal Time解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai
hdfs.useLocalTimeStampfalse使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)
hdfs.closeTries0开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;如果设置为0,Sink会一直尝试重命名文件直到成功为止;关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。
hdfs.retryInterval180连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。
serializerTEXTEvent 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。
serializer.* 根据上面 serializer 配置的类型来根据需要添加序列化器的参数

废弃的一些参数:

属性名默认值解释
hdfs.callTimeout10000允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)
配置范例:

上面的例子中时间戳会向前一个整10分钟取整。比如,一个 Event 的 header 中带的时间戳是11:54:34 AM, June 12, 2012,它会保存的 HDFS 路径就是/flume/events/2012-06-12/1150/00。

Hive Sink

此Sink将包含分隔文本或JSON数据的 Event 直接流式传输到 Hive表或分区上。 Event 使用 Hive事务进行写入, 一旦将一组 Event 提交给Hive,它们就会立即显示给Hive查询。 即将写入的目标分区既可以预先自己创建,也可以选择让 Flume 创建它们,如果没有的话。 写入的 Event 数据中的字段将映射到 Hive表中的相应列。

属性默认值解释
channel与 Sink 连接的 channel
type组件类型,这个是: hive
hive.metastoreHive metastore URI (eg thrift://a.b.com:9083 )
hive.databaseHive 数据库名
hive.tableHive表名
hive.partition逗号分隔的要写入的分区信息。 比如hive表的分区是(continent: string, country: string, time : string), 那么“Asia,India,2014-02-26-01-21”就表示数据会写入到continent=Asia,country=India,time=2014-02-26-01-21这个分区。
hive.txnsPerBatchAsk100Hive从Flume等客户端接收数据流会使用多次事务来操作,而不是只开启一个事务。这个参数指定处理每次请求所开启的事务数量。来自同一个批次中所有事务中的数据最终都在一个文件中。 Flume会向每个事务中写入 batchSize 个 Event,这个参数和 batchSize 一起控制着每个文件的大小,请注意,Hive最终会将这些文件压缩成一个更大的文件。
heartBeatInterval240发送到 Hive 的连续心跳检测间隔(秒),以防止未使用的事务过期。设置为0表示禁用心跳。
autoCreatePartitionstrueFlume 会自动创建必要的 Hive分区以进行流式传输
batchSize15000写入一个 Hive事务中最大的 Event 数量
maxOpenConnections500允许打开的最大连接数。如果超过此数量,则关闭最近最少使用的连接。
callTimeout10000Hive、HDFS I/O操作的超时时间(毫秒),比如:开启事务、写数据、提交事务、取消事务。
serializer 序列化器负责解析 Event 中的字段并把它们映射到 Hive表中的列,选择哪种序列化器取决于 Event 中的数据格式,支持的序列化器有:DELIMITED 和 JSON
roundfalse是否启用时间戳舍入机制
roundUnitminute舍入值的单位,可选值:second 、 minute 、 hour
roundValue1舍入到小于当前时间的最高倍数(使用 roundUnit 配置的单位) 例子1:roundUnit=second,roundValue=10,则14:31:18这个时间戳会被舍入到14:31:10; 例子2:roundUnit=second,roundValue=30,则14:31:18这个时间戳会被舍入到14:31:00,14:31:42这个时间戳会被舍入到14:31:30;
timeZoneLocal Time应用于解析分区中转义序列的时区名称,比如:America/Los_Angeles、Asia/Shanghai、Asia/Tokyo等
useLocalTimeStampfalse替换转义序列时是否使用本地时间戳(否则使用Event header中的timestamp )
下面介绍Hive Sink的两个序列化器:JSON :处理UTF8编码的 Json 格式(严格语法)Event,不需要配置。 JSON中的对象名称直接映射到Hive表中具有相同名称的列。 内部使用 org.apache.hive.hcatalog.data.JsonSerDe ,但独立于 Hive表的 Serde 。 此序列化程序需要安装 HCatalog。DELIMITED: 处理简单的分隔文本 Event。 内部使用 LazySimpleSerde,但独立于 Hive表的 Serde。

属性默认值解释
serializer.delimiter,(类型:字符串)传入数据中的字段分隔符。 要使用特殊字符,请用双引号括起来,例如“\t”
serializer.fieldnames从输入字段到Hive表中的列的映射。 指定为Hive表列名称的逗号分隔列表(无空格),按顺序标识输入字段。 要跳过字段,请保留未指定的列名称。 例如, ‘time,,ip,message’表示输入映射到hive表中的 time,ip 和 message 列的第1,第3和第4个字段。
serializer.serdeSeparatorCtrl-A(类型:字符)自定义底层序列化器的分隔符。如果 serializer.fieldnames 中的字段与 Hive表列的顺序相同,则 serializer.delimiter 与 serializer.serdeSeparator 相同, 并且 serializer.fieldnames 中的字段数小于或等于表的字段数量,可以提高效率,因为传入 Event 正文中的字段不需要重新排序以匹配 Hive表列的顺序。 对于’\t’这样的特殊字符使用单引号,要确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将本参数也设置为相同的字符。

为了避免出现找不到类的异常,首先需添加依赖的jar包:

  1. 将hive/lib下面hive-hcatalog-core-3.1.2.jar拷贝或者软链接到flume/lib下
    1. $ cp /opt/pkg/hive/lib/hive-hcatalog-core-3.1.2.jar /opt/pkg/flume/lib/
  2. 接下来我们还需要这个依赖:hive-hcatalog-streaming-3.1.2.jar。这个hive目录是找不到的,需要单独从mvnrepository网站搜索下载,下载地址:https://mvnrepository.com/artifact/org.apache.hive.hcatalog/hive-hcatalog-streaming/3.1.2,下载后移动到flume/lib下即可。

使用Hive Sink 还有以下前提条件:

  • 需要开启事务支持
  • Hive表必须分区分桶
  • Hive表必须是Acid表(开启事务支持)

进入beeline临时开启事务支持(也可以修改配置文件永久开启,但不建议)

创建Hive表如下:

Flume Agnet 配置范例:

启动Flume Agent

使用nc客户端发送数据

查看hive表中是否有数据出现

如果对于行级更新删除需求比较频繁的,可以考虑使用事务表,但平常的hive表并不建议使用事务表。因为事务表的限制很多,加上由于hive表的特性,也很难满足高并发的场景。另外,如果事务表太多,并且存在大量的更新操作,metastore后台启动的合并线程会定期的提交MapReduce Job,也会一定程度上增重集群的负担。

Logger Sink

使用INFO级别把Event内容输出到日志中,一般用来测试、调试使用。这个 Sink 是唯一一个不需要额外配置就能把 Event 的原始内容输出的Sink,参照 输出原始数据到日志 。

提示

在 输出原始数据到日志 一节中说过,通常在Flume的运行日志里面输出数据流中的原始的数据内容是非常不可取的,所以 Flume 的组件默认都不会这么做。但是总有特殊的情况想要把 Event 内容打印出来,就可以借助这个Logger Sink了。

必需的参数已用 粗体 标明。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: logger
maxBytesToLog16Event body 输出到日志的最大字节数,超出的部分会被丢弃

配置范例:

Avro Sink

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为Avro Event发送到指定的主机/端口上。Event 从 channel 中批量获取,数量根据配置的 batch-size 而定。 必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: avro.
hostname 监听的服务器名(hostname)或者 IP
port 监听的端口
batch-size 100 每次批量发送的 Event 数
connect-timeout 20000 第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout 20000 请求超时时间,单位:毫秒
reset-connection-interval none 重置连接到下一跳之前的时间量(秒)。 这将强制 Avro Sink 重新连接到下一跳。 这将允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。
compression-type none 压缩类型。可选值: none 、 deflate 。压缩类型必须与上一级Avro Source 配置的一致
compression-level 6 Event的压缩级别 0:不压缩,1-9:进行压缩,数字越大,压缩率越高
ssl false 设置为 true 表示开启SSL 下面的 truststore 、 truststore-password 、 truststore-type 就是开启SSL后使用的参数,并且可以指定是否信任所有证书( trust-all-certs )
trust-all-certs false 如果设置为true, 不会检查远程服务器(Avro Source)的SSL服务器证书。不要在生产环境开启这个配置,因为它使攻击者更容易执行中间人攻击并在加密的连接上进行“监听”。
truststore 自定义 Java truststore文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用全局的keystore配置,如果全局的keystore也未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为 Oracle JRE 中的“jssecacerts”或“cacerts”)。
truststore-password 上面配置的truststore的密码,如果未配置,将使用全局的truststore配置(如果配置了的话)
truststore-type JKS Java truststore的类型。可以配成 JKS 或者其他支持的 Java truststore类型,如果未配置,将使用全局的SSL配置(如果配置了的话)
exclude-protocols SSLv3 要排除的以空格分隔的 SSL/TLS 协议列表。 SSLv3 协议不管是否配置都会被排除掉。
maxIoWorkers 2 * 机器上可用的处理器核心数量 I/O工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的。

配置范例:

利用AvroSource和AvroSink实现跃点Agent

配置范例 – Agent #a1:

配置范例 – Agent #a2:

启动Agent #a2

验证Agent #a2

启动Agent #a1

验证Agent #a1

HBase2Sink

提示

这是Flume 1.9新增的Sink。

HBase2Sink 是HBaseSink的HBase 2版本。

所提供的功能和配置参数与HBaseSink相同

必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: hbase2
table 要写入的 Hbase 表名
columnFamily 要写入的 Hbase 列族
zookeeperQuorum Zookeeper 节点(host:port格式,多个用逗号分隔),hbase-site.xml 中属性 hbase.zookeeper.quorum 的值
znodeParent /hbase ZooKeeper 中 HBase 的 Root ZNode 路径,hbase-site.xml 中 zookeeper.znode.parent 的值
batchSize 100 每个事务写入的Event数量
coalesceIncrements false 每次提交时,Sink是否合并多个 increment 到一个cell。如果有限数量的 cell 有多个 increment ,这样可能会提供更好的性能
serializer org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer 默认的列 increment column = “iCol”, payload column = “pCol”
serializer.* 序列化器的一些属性
kerberosPrincipal 以安全方式访问 HBase 的 Kerberos 用户主体
kerberosKeytab 以安全方式访问 HBase 的 Kerberos keytab 文件目录

配置范例:

Kafka Sink

这个 Sink 可以把数据发送到 Kafka topic上。目的就是将 Flume 与 Kafka 集成,以便基于拉的处理系统可以处理来自各种 Flume Source 的数据。

目前支持Kafka 0.10.1.0以上版本,最高已经在Kafka 2.0.1版本上完成了测试,这已经是Flume 1.9发行时候的最高的Kafka版本了。

必需的参数已用 粗体 标明。

属性 默认值 解释
type 组件类型,这个是: org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers Kafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔
kafka.topic default-flume-topic 用于发布消息的 Kafka topic 名称 。如果这个参数配置了值,消息就会被发布到这个 topic 上。如果Event header中包含叫做“topic”的属性, Event 就会被发布到 header 中指定的 topic 上,而不会发布到 kafka.topic 指定的 topic 上。支持任意的 header 属性动态替换, 比如%{lyf}就会被 Event header 中叫做“lyf”的属性值替换(如果使用了这种动态替换,建议将 Kafka 的 auto.create.topics.enable 属性设置为 true )。
flumeBatchSize 100 一批中要处理的消息数。设置较大的值可以提高吞吐量,但是会增加延迟。
kafka.producer.acks 1 在考虑成功写入之前,要有多少个副本必须确认消息。可选值, 0 :(从不等待确认); 1 :只等待leader确认; -1 :等待所有副本确认。 设置为-1可以避免某些情况 leader 实例失败的情况下丢失数据。
useFlumeEventFormat false 默认情况下,会直接将 Event body 的字节数组作为消息内容直接发送到 Kafka topic 。如果设置为true,会以 Flume Avro 二进制格式进行读取。 与 Kafka Source 上的同名参数或者 Kafka channel 的 parseAsFlumeEvent 参数相关联,这样以对象的形式处理能使生成端发送过来的 Event header 信息得以保留。
defaultPartitionId 指定所有 Event 将要发送到的 Kafka 分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被 Kafka 生产者的分发程序分发,包括 key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发
partitionIdHeader 设置后,Sink将使用 Event header 中使用此属性的值命名的字段的值,并将消息发送到 topic 的指定分区。 如果该值表示无效分区,则将抛出 EventDeliveryException。 如果存在标头值,则此设置将覆盖 defaultPartitionId 。假如这个参数设置为“lyf”,这个 Sink 就会读取 Event header 中的 lyf 属性的值,用该值作为分区ID
allowTopicOverride true 如果设置为 true,会读取 Event header 中的名为 topicHeader 的的属性值,用它作为目标 topic。
topicHeader topic 与上面的 allowTopicOverride 一起使用,allowTopicOverride 会用当前参数配置的名字从 Event header 获取该属性的值,来作为目标 topic 名称
kafka.producer.security.protocol PLAINTEXT 设置使用哪种安全协议写入 Kafka。可选值:SASL_PLAINTEXT 、 SASL_SSL 和 SSL, 有关安全设置的其他信息,请参见下文。
more producer security props   如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者增加安全相关的参数配置
Other Kafka Producer Properties 其他一些 Kafka 生产者配置参数。任何 Kafka 支持的生产者参数都可以使用。唯一的要求是使用“kafka.producer.”这个前缀来配置参数,比如:kafka.producer.linger.ms

注解

Kafka Sink使用 Event header 中的 topic 和其他关键属性将 Event 发送到 Kafka。 如果 header 中存在 topic,则会将Event发送到该特定 topic,从而覆盖为Sink配置的 topic。 如果 header 中存在指定分区相关的参数,则Kafka将使用相关参数发送到指定分区。 header中特定参数相同的 Event 将被发送到同一分区。 如果为空,则将 Event 会被发送到随机分区。 Kafka Sink 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

弃用的一些参数:

属性 默认值 解释
brokerList 改用 kafka.bootstrap.servers
topic default-flume-topic 改用 kafka.topic
batchSize 100 改用 kafka.flumeBatchSize
requiredAcks 1 改用 kafka.producer.acks

下面给出 Kafka Sink 的配置示例。Kafka 生产者的属性都是以 kafka.producer 为前缀。Kafka 生产者的属性不限于下面示例的几个。此外,可以在此处包含您的自定义属性,并通过作为方法参数传入的Flume Context对象在预处理器中访问它们。

HTTP Sink

HTTP Sink 从 channel 中获取 Event,然后再向远程 HTTP 接口 POST 发送请求,Event 内容作为 POST 的正文发送。

错误处理取决于目标服务器返回的HTTP响应代码。 Sink的 退避 和 就绪 状态是可配置的,事务提交/回滚结果以及Event是否发送成功在内部指标计数器中也是可配置的。

状态代码不可读的服务器返回的任何格式错误的 HTTP 响应都将产生 退避 信号,并且不会从 channel 中消耗该Event。

必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: http.
endpoint 将要 POST 提交数据接口的绝对地址
connectTimeout 5000 连接超时(毫秒)
requestTimeout 5000 一次请求操作的最大超时时间(毫秒)
contentTypeHeader text/plain HTTP请求的Content-Type请求头
acceptHeader text/plain HTTP请求的Accept 请求头
defaultBackoff true 是否默认启用退避机制,如果配置的 backoff.CODE 没有匹配到某个 http 状态码,默认就会使用这个参数值来决定是否退避
defaultRollback true 是否默认启用回滚机制,如果配置的 rollback.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否回滚
defaultIncrementMetrics false 是否默认进行统计计数,如果配置的 incrementMetrics.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否参与计数
backoff.CODE 配置某个 http 状态码是否启用退避机制(支持200这种精确匹配和2XX一组状态码匹配模式)
rollback.CODE 配置某个 http 状态码是否启用回滚机制(支持200这种精确匹配和2XX一组状态码匹配模式)
incrementMetrics.CODE 配置某个 http 状态码是否参与计数(支持200这种精确匹配和2XX一组状态码匹配模式)

注意 backoff, rollback 和 incrementMetrics 的 code 配置通常都是用具体的HTTP状态码,如果2xx和200这两种配置同时存在,则200的状态码会被精确匹配,其余200~299(除了200以外)之间的状态码会被2xx匹配。

提示

Flume里面好多组件都有这个退避机制,其实就是下一级目标没有按照预期执行的时候,会执行一个延迟操作。比如向HTTP接口提交数据发生了错误触发了退避机制生效,系统等待30秒再执行后续的提交操作, 如果再次发生错误则等待的时间会翻倍,直到达到系统设置的最大等待上限。通常在重试成功后退避就会被重置,下次遇到错误重新开始计算等待的时间。

任何空的或者为 null 的 Event 不会被提交到HTTP接口上。

配置范例:

File Roll Sink

把 Event 存储到本地文件系统。 必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: file_roll.
sink.directory Event 将要保存的目录
sink.pathManager DEFAULT 配置使用哪个路径管理器,这个管理器的作用是按照规则生成新的存储文件名称,可选值有: default 、 rolltime。default规则:prefix+当前毫秒值+“-”+文件序号+“.”+extension;rolltime规则:prefix+yyyyMMddHHmmss+“-”+文件序号+“.”+extension;注:prefix 和 extension 如果没有配置则不会附带
sink.pathManager.extension 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的扩展名
sink.pathManager.prefix 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的文件名的固定前缀
sink.rollInterval 30 表示每隔30秒创建一个新文件进行存储。如果设置为0,表示所有 Event 都会写到一个文件中。
sink.serializer TEXT 配置 Event 序列化器,可选值有:text 、 header_and_text 、 avro_event 或者自定义实现了 EventSerializer.Builder 接口的序列化器的全限定类名.。 text 只会把 Event 的 body 的文本内容序列化; header_and_text 会把 header 和 body 内容都序列化。
sink.batchSize 100 每次事务批处理的 Event 数

配置范例:

Null Sink

丢弃所有从 channel 读取到的 Event。 必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: null.
batchSize 100 每次批处理的 Event 数量

配置范例:

Custom Sink

你可以自己写一个 Sink 接口的实现类。启动 Flume 时候必须把你自定义 Sink 所依赖的其他类配置进 classpath 内。custom source 在写配置文件的 type 时候填你的全限定类名。 必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channe
type 组件类型,这个填你自定义class的全限定类名

配置范例:

Views: 140