Kafka集群部署的讨论

只有单台机器构成的 Kafka 伪集群只能用于日常测试之用,根本无法满足实际的线上生产需求。而真正的线上环境需要仔细地考量各种因素,结合自身的业务需求而制定。下面我就分别从操作系统、磁盘、磁盘容量和带宽等方面来讨论一下。

操作系统

首先我们先看看要把 Kafka 安装到什么操作系统上。

目前常见的操作系统有 3 种:

  1. Linux
  2. Windows
  3. macOS。

如果考虑操作系统与 Kafka 的适配性,Linux 系统显然要比其他两个特别是 Windows 系统更加适合部署 Kafka。主要是在下面这三个方面上,Linux 的表现更胜一筹。

  • I/O 模型的使用
    • Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 上的I/O模型的实现机制和Windows是不同的。部署在 Linux 上比起部署在Windows上能够获得更高效的 I/O 性能。
  • 数据网络传输效率
    • 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。
  • 社区支持度
    • 相比Windows系统,社区会优先解决Linux系统上发现的Bug。

总结:Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证,千万不要应用于生产环境。

磁盘

磁盘对 Kafka 性能的影响最重要。在对 Kafka 集群进行磁盘规划时经常面对的问题是,我应该选择普通的机械磁盘还是固态硬盘?前者成本低且容量大,但易损坏;后者性能优势大,不过单价高。

Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘最大的劣势,即随机读写操作慢。从这一点上来说,使用 SSD 似乎并没有太大的性能优势,毕竟从性价比上来说,机械磁盘物美价廉,而它因易损坏而造成的可靠性差等缺陷,又由 Kafka 在软件层面提供机制来保证,故使用普通机械磁盘是很划算的。

关于磁盘选择另一个经常讨论的话题就是到底是否应该使用磁盘阵列(RAID)。使用 RAID 的两个主要优势在于:

  • 提供冗余的磁盘存储空间
  • 提供负载均衡

不过就 Kafka 而言,一方面 Kafka 自己实现了冗余机制来提供高可靠性;另一方面通过分区的概念,Kafka 也能在软件层面自行实现负载均衡。

综合以上的考量,建议:

  • 追求性价比的公司可以不搭建 RAID,使用普通磁盘组成存储空间即可。

  • 使用机械磁盘完全能够胜任 Kafka 线上环境。

磁盘容量

Kafka 集群到底需要多大的存储空间?Kafka 需要将消息保存在底层的磁盘上,这些消息默认会被保存一段时间然后自动被删除。虽然这段时间是可以配置的,但你应该如何结合自身业务场景和存储需求来规划 Kafka 集群的存储容量呢?

我举一个简的例子来说明该如何思考这个问题。假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB,那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗?

我们来计算一下:

每天, 1 亿条 1KB 大小的消息保存两份且留存两周的时间,那么每天所需空间大小就等于

100000000 * 1KB * 2 / 1000 / 1000 = 200GB

一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空间,因此总的存储容量就是

200GB x (1 + 10%) = 220GB

既然要保存两周,那么整体容量即为:

220GB * 14 ≈ 3TB

Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是:

3TB * 0.75 = 2.25TB

总之在规划磁盘容量时你需要考虑下面这几个元素:

  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启用压缩

带宽

对于 Kafka 这种通过网络大量进行数据传输的框架而言,带宽特别容易成为瓶颈。带宽也主要有两种:

  1. 1Gbps 的千兆网络

  2. 10Gbps 的万兆网络

特别是千兆网络应该是一般公司网络的标准配置。其实真正要规划的是所需的 Kafka 服务器的数量。假设你公司的机房环境是千兆网络,即 1Gbps。

现在你有个业务,其业务目标 是在 1 小时内处理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?

计算一下:

由于带宽是 1Gbps,即每秒处理 1Gb 的数据。

真实环境中每台 Kafka 服务器都是安装在专属的机器上。通常情况下可以按照Kafka 会用到 70% 的带宽资源来计算,因为根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比较合理的值,也就是说:单台 Kafka 服务器最多也就能使用大约 700Mb(兆比特) 的带宽资源。

这只是它能使用的最大带宽资源,通常要再额外预留出 2/3 的资源,即:

单台服务器使用带宽 700Mb * 1/3 ≈ 240Mbps

好了,有了 240Mbps,我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根据这个目标,我们每秒需要处理 (1024*1024/60/60)MB*8 = 2330Mb 的数据,除以 240,约等于 10 台服务器。如果消息还需要额外复制两份,那么总的服务器台数还要乘以 3,即 30 台。

用这种方法评估线上环境的服务器台数是比较合理的,而且这个方法能够随着你业务需求的变化而动态调整。

小结

在一开始就应该思考好实际场景下业务所需的集群环境。在考量部署方案时需要通盘考虑,不能仅从单个维度上进行评估。

img

Kafka集群配置

配置并不单单指 Kafka 服务器端的配置,其中既有 Broker 端参数,也有主题(后面我用我们更熟悉的 Topic 表示)级别的参数、JVM 端参数和操作系统级别的参数。接下来将要介绍的所有参数都是那些要修改默认值的参数,因为它们的默认值不适合一般的生产环境。

Broker 端参数

目前 Kafka Broker 提供了近 200 个参数,下面我们按照大的用途类别一组一组地介绍它们,希望可以更有针对性,也更方便你记忆。

  1. 存储信息的重要参数

首先 Broker 是需要配置存储信息的,即 Broker 使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:

  • log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。
  • log.dir:注意这是单数,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。

这两个参数应该怎么设置呢?很简单,你只要设置log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:

  • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
  • 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。
  1. ZooKeeper 相关的设置

下面说说与 ZooKeeper 相关的设置。首先 ZooKeeper 是做什么的呢?它是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。

Kafka 与 ZooKeeper 相关的最重要的参数当属zookeeper.connect。这也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默认端口。

现在问题来了,如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概念,类似于别名。

如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。

  1. Broker 连接相关

第三组参数是与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参数:

  • listeners

    监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。

  • advertised.listeners

    和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。监听器从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092

    一旦你自己定义了协议名称,你必须还要指定listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议,比如指定listener.security.protocol.map=CONTROLLER:PLAINTEXT表示CONTROLLER这个自定义协议底层使用明文不加密传输数据。

  • host.name/port

    这两个是过期的参数了。如果主机名和端口号已经通过监听器配置,则可以忽略。

遇到类似host.name这个设置中到底使用 IP 地址还是主机名。统一的建议是:最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。 Broker 源代码中也使用的是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。

因此需要保证在hosts文件中建立主机名和IP之间的映射

  1. 关于 Topic 管理的

第四组参数是关于 Topic 管理的。我来讲讲下面这三个参数:

  • auto.create.topics.enable:是否允许自动创建 Topic。

    建议最好设置成 false,即不允许自动创建 Topic。在我们的线上环境里面有很多名字稀奇古怪的 Topic,我想大概都是因为该参数被设置成了 true 的缘故。

    你可能有这样的经历,要为名为 test 的 Topic 发送事件,但是不小心拼写错误了,把 test 写成了 tst,之后启动了生产者程序。恭喜你,一个名为 tst 的 Topic 就被自动创建了。

    所以我一直相信好的运维应该防止这种情形的发生,特别是对于那些大公司而言,每个部门被分配的 Topic 应该由运维严格把控,决不能允许自行创建任何 Topic。

  • unclean.leader.election.enable:是否允许 Unclean Leader 选举。

    Kafka每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。只有保存数据比较多的那些副本才有资格竞选Leader 副本。假如出现这种情况:那些保存数据比较多的副本都挂了时,且此参数设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。反之如果是 true,那么 Kafka 允许你从那些“跑得慢”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,一般默认值为false,为了防止万一可以显示指定为false。

  • auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。

    此参数对生产环境影响非常大。设置它的值为 true 表示允许 Kafka 定期满足某些条件时地对一些 Topic 分区进行 Leader 重选举,换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此建议在生产环境中把这个参数设置成 false。

  1. 关于数据留存方面的参数
  • log.retention.{hour|minutes|ms}

    这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低。通常情况下我们还是设置 hour 级别的多一些,比如log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。

  • log.retention.bytes

    这是指定 Broker 为消息保存的总磁盘容量大小。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,设置此参数可以避免个别“恶意”租户使用过多的磁盘空间。

  • message.max.bytes

    控制 Broker 能够接收的最大消息大小。默认的 1000012 太少了,还不到 1MB,在线上环境中设置一个比较大的值还是比较保险的做法。

Topic 级别参数

Kafka 也支持为不同的 Topic 设置不同的参数值。当前最新的 2.2 版本总共提供了大约 25 个 Topic 级别的参数,当然我们也不必全部了解它们的作用,这里我挑出了一些最关键的参数,你一定要把它们掌握清楚。

如果同时设置了 Topic 级别参数和全局 Broker 参数,到底听谁的呢?哪个说了算呢?

答案就是 Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值,这就是所谓的 Topic 级别参数。

举个例子说明一下,上一期我提到了消息数据的留存时间参数,在实际生产环境中,如果为所有 Topic 的数据都保存相当长的时间,这样做既不高效也无必要。更适当的做法是允许不同部门的 Topic 根据自身业务需要,设置自己的留存时间。如果只能设置全局 Broker 参数,那么势必要提取所有业务留存时间的最大值作为全局参数值,此时设置 Topic 级别参数把它覆盖,就是一个不错的选择。

下面我们依然按照用途分组的方式引出重要的 Topic 级别参数。

  1. 从保存消息方面来考量的话,下面这组参数是非常重要的:
  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
  1. 从能处理的消息大小这个角度来看的话,有一个参数是必须要设置的,即
  • max.message.bytes

    它决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。我知道目前在很多公司都把 Kafka 作为一个基础架构组件来运行,上面跑了很多的业务数据。如果在全局层面上,我们不好给出一个合适的最大消息值,那么不同业务部门能够自行设定这个 Topic 级别参数就显得非常必要了。在实际场景中,这种用法也确实是非常常见的。

怎么设置 Topic 级别参数

Topic 级别参数的设置就是这种情况,我们有两种方式可以设置:

  • 创建 Topic 时进行设置
  • 修改 Topic 时设置

我们先来看看如何在创建 Topic 时设置这些参数。我用上面提到的retention.msmax.message.bytes举例。设想你的部门需要将交易数据发送到 Kafka 进行处理,需要保存最近半年的交易数据,同时这些数据很大,通常都有几 MB,但一般不会超过 5MB。现在让我们用以下命令来创建 Topic:

我们只需要知道 Kafka 开放了kafka-topics命令供我们来创建 Topic 即可。对于上面这样一条命令,请注意结尾处的--config设置,我们就是在 config 后面指定了想要设置的 Topic 级别参数。

下面看看使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:

总体来说,你只能使用这么两种方式来设置 Topic 级别参数。我个人的建议是,你最好始终坚持使用第二种方式来设置,并且在未来,Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。

JVM 参数

Kafka 服务器端代码是用 Scala 语言编写的,但终归还是编译成 Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。

Kafka 自 2.0.0 版本开始,已经正式摒弃对 Java 7 的支持了,所以有条件的话至少使用 Java 8 吧。

说到 JVM 端设置,堆大小这个参数至关重要。虽然在后面我们还会讨论如何调优 Kafka 性能的问题,但现在我想无脑给出一个通用的建议:将你的 JVM 堆大小设置成 6GB 吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的 Heap Size 来跑 Kafka,说实话默认的 1GB 有点小,毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例,Heap Size 不能太小。

JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7,那么可以根据以下法则选择合适的垃圾回收器:

  • 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定

    -XX:+UseCurrentMarkSweepGC

  • 否则,使用吞吐量收集器。开启方法是指定

    -XX:+UseParallelGC

当然了,如果你已经在使用 Java 8 了,那么就用默认的 G1 收集器就好了。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等,所以使用 G1 就好了。

现在我们确定好了要设置的 JVM 参数,我们该如何为 Kafka 进行设置呢?有些奇怪的是,这个问题居然在 Kafka 官网没有被提及。其实设置的方法也很简单,你只需要设置下面这两个环境变量即可:

  • KAFKA_HEAP_OPTS:指定堆大小。
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。

比如你可以这样启动 Kafka Broker,即在启动 Kafka Broker 之前,先设置上这两个环境变量:

操作系统参数

最后我们来聊聊 Kafka 集群通常都需要设置哪些操作系统参数。通常情况下,Kafka 并不需要设置太多的 OS 参数,但有些因素最好还是关注一下,比如下面这几个:

  • 文件描述符限制
  • 文件系统类型
  • Swappiness
  • 提交时间

首先是ulimit -n。我觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。还记得电影《让子弹飞》里的对话吗:“你和钱,谁对我更重要?都不重要,没有你对我很重要!”。这个参数也有点这么个意思。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。

其次是文件系统类型的选择。这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。对了,最近有个 Kafka 使用 ZFS 的数据报告,貌似性能更加强劲,有条件的话不妨一试。

第三是 swap 的调优。网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,我个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。

最后是提交时间或者说是 Flush 落盘时间。向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

小结

这里分享了关于 Kafka 集群设置的各类配置,包括 Topic 级别参数、JVM 参数以及操作系统参数。希望这些最佳实践能够在你搭建 Kafka 集群时助你一臂之力,但切记配置因环境而异,一定要结合自身业务需要以及具体的测试来验证它们的有效性。

Views: 282

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注