Kafka分布式消息系统

什么是Kafka

在Spark生态体系中,Kafka占有非常重要的位置。Kafka是一个使用Scala语言编写的基于ZooKeeper的高吞吐量低延迟的分布式发布与订阅消息系统,它可以实时处理大量消息数据以满足各种需求。比如基于Hadoop的批处理系统,低延迟的实时系统等。即便使用非常普通的硬件,Kafka每秒也可以处理数百万条消息,其延迟最低只有几毫秒。
在实际开发中,Kafka常常作为Spark Streaming的实时数据源,Spark Streaming从Kafka中读取实时消息进行处理,保证了数据的可靠性与实时性。二者是实时消息处理系统的重要组成部分。
那么Kafka到底是什么?简单来说,Kafka是消息中间件的一种。

举个生产者与消费者的例子:生产者生产鸡蛋,消费者消费鸡蛋。

假设消费者消费鸡蛋的时候噎住了(系统宕机了),而生产者还在生产鸡蛋,那么新生产的鸡蛋就丢失了;

再比如,生产者1秒钟生产100个鸡蛋(大交易量的情况),而消费者1秒钟只能消费50个鸡蛋,那过不了多长时间,消费者就吃不消了(消息堵塞,最终导致系统超时),导致鸡蛋又丢失了。

这个时候我们放个篮子在生产者与消费者中间,生产者生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,这个篮子就相当于“Kafka”;鸡蛋则相当于Kafka中的消息(Message);篮子相当于存放消息的消息队列,也就是Kafka集群;

当篮子满了,鸡蛋放不下了,这时再加几个篮子,就是Kafka集群扩容。

Kafka中的一些基本概念:

消息(Message)

Kafka的数据单元被称为消息, 也称为事件(Event)。可以把消息看成是数据库里的一行数据或一条记录。为了提高效率,消息可以分组传输,每一组消息就是一个批次,分成批次传输可以减少网络开销。

服务器节点(Broker)

Kafka集群包含一个或多个服务器节点,一个独立的服务器节点被称为Broker。

主题(Topic)

每条发布到Kafka集群的消息都有一个类别,这个类别被称为主题。在物理上,不同主题的消息分开存储;在逻辑上,一个主题的消息虽然保存于一个或多个Broker上,但用户只需指定消息的主题即可生产或消费消息而不必关心消息存于何处。

分区(Partition)

为了使Kafka的吞吐率可以水平扩展,物理上把主题分成一个或多个分区。创建主题时可指定分区数量。

生产者(Producer)

负责发布消息到Kafka的Broker,实际上属于Broker的一种客户端。生产者负责选择哪些消息应该分配到哪个主题内的哪个分区。默认生产者会把消息均匀的分布到特定主题的所有分区上,但在某些情况下,生产者会将消息直接写到指定的分区。

消费者(Consumer)

从Kafka的Broker上读取消息的客户端。读取消息时需要指定读取的主题,通常消费者会订阅一个或多个主题,并按照消息生成的顺序读取他们。

不同的主题好比不同的高速公路,分区好比某条高速公路上的车道,消息就是车道上运行的车辆。如果车流量大,则拓宽车道,反之,则减少车道;而消费者就好比高速公路上的收费站,开放的收费站越多,则车辆通过速度越快。

Kafka架构

Kafka的消息传递流程如图所示。生产者将消息发送给Kafka集群,同时Kafka集群将消息转发给消费者。
一个典型的Kafka集群中包含若干生产者(数据可以是Web前端产生的页面内容或者服务器日志等)、若干Broker、若干消费者(可以是Hadoop集群、实时监控程序、数据仓库或其它服务)以及一个ZooKeeper集群。ZooKeeper用于管理和协调Broker。当Kafka系统中新增了Broker或者某个Broker故障失效时,ZooKeeper将通知生产者和消费者。生产者和消费者据此开始与其它Broker协调工作。生产者使用Push模式将消息发送到Broker,而消费者使用Pull模式从Broker订阅并消费消息。

file

file

主题与分区

Kafka通过主题对消息进行分类,一个主题可以分为多个分区,且每个分区可以存储于不同的Broker上,也就是说,一个主题可以横跨多个服务器。

对主题进行分区的好处是:允许主题消息规模超出一台服务器的文件大小上限。因为一个主题可以有多个分区,且可以存储在不同的服务器上,当一个分区的文件大小超出了所在服务器的文件大小上限时,可以动态添加其它分区,因此可以处理无限量的数据。

file

Kafka会为每个主题维护一个分区日志,记录各个分区的消息存放情况。消息以追加的方式写入到每个分区的尾部,然后以先入先出的顺序进行读取。由于一个主题包含多个分区,所以无法在整个主题范围内保证消息的顺序,但可以保证单个分区内消息的顺序。

当一条消息被发送到Broker时,会根据分区规则被存储到某个分区里。如果分区规则设置的合理,所有消息将被均匀的分配到不同的分区里,这样就实现了水平扩展。如果一个主题的消息都存放到一个文件中,则该文件所在的Broker的I/O将成为主题的性能瓶颈,而分区正好解决了这个问题。

分区中的每个记录都被分配了一个偏移量(offset),偏移量是一个连续递增的整数值,它唯一标识分区中的某个记录。而消费者只需保存该偏移量即可,当消费者客户端向Broker发起消息请求时需要携带偏移量。例如,消费者向Broker请求主题test的分区0中的偏移量从20开始的所有消息以及主题test的分区1中的偏移量从35开始的所有消。当消费者读取消息后,偏移量会线性递增。当然,消费者也可以按照任意顺序消费消息,比如读取已经消费过的历史消息(将偏移量重置到之前版本)。此外,消费者还可以指定从某个分区中一次最多返回多少条数据,防止一次返回数据太多而耗尽客户端的内存。

file

分区副本

在Kafka集群中,为了提高数据的可靠性,同一个分区可以复制多个副本分配到不同的Broker,这种方式类似于HDFS中的副本机制。如果其中一个Broker宕机,其它Broker可以接替宕机的Broker,不过生产者和消费者需要重新连接到新的Broker。

Kafka每个分区的副本都被分为两种类型:领导者副本和跟随者副本。领导者副本只有一个,其余的都是跟随者副本。所有生产者和消费者都向领导者副本发起请求,进行消息的写入与读取,而跟随者副本并不处理客户端的请求,它唯一的任务是从领导者副本复制消息,以保持与领导者副本数据及状态的一致。
如果领导者副本发生崩溃,会从其余的跟随者副本中选出一个作为新的领导者副本。

file

file

消费者组

消费者组(Consumer Group)实际上就是一组消费者的集合。每个消费者属于一个特定的消费者组(可为每个消费者指定组名称,消费者通过组名称对自己进行标识,若不指定组名称则属于默认的组)。
传统消息处理有两种模式:队列模式和发布订阅模式。队列模式是指消费者可以从一台服务器读取消息,并且每个消息只被其中一个消费者消费;发布订阅模式是指消息通过广播方式发送给所有消费者。而Kafka提供了消费者组模式,能够同时具备这两种(队列和发布订阅)模式的特点。
Kafka规定,同一消费者组内不允许多个消费者消费同一分区的消息;而不同的消费者组,可以同时消费同一分区的消息。也就是说,分区与同一个消费者组中的消费者的对应关系是多对一而不允许一对多。举个例子,如果同一个应用有100台机器,这100台机器属于同一个消费者组,则同一条消息在100台机器中只有一台能得到。如果另一个应用也需要同时消费同一个主题的消息,则需要新建一个消费者组并消费同一个主题的消息。我们已经知道,消息存储于分区中,消费者组与分区的关系如图。

file

Kafka集群环境搭建

Kafka依赖ZooKeeper集群,搭建Kafka集群之前,需要先搭建好ZooKeeper集群。ZooKeeper集群的搭建步骤此处不做过多讲解。本例依然使用三台服务器在CentOS7上搭建Kafka集群,三台服务器的主机和IP地址分别为:

1、下载解压Kafka

从Apache官网http://kafka.apache.org下载Kafka的稳定版本kafka_2.11-2.0.0.tgz。
然后将Kafka安装包上传到centos01节点的/opt/softwares目录,并解压到目录/opt/modules下:

2、修改配置文件

修改Kafka安装目录下的config/server.properties文件。在分布式环境中建议至少修改以下配置项:

3、发送安装文件到其他节点
将centos01节点配置好的Kafka安装文件复制到centos02和centos03节点:

复制完成后,修改centos02节点的Kafka安装目录下的config/server.properties文件,修改内容如下:

同理,修改centos03节点的Kafka安装目录下的config/server.properties文件,修改内容如下:

4、启动ZooKeeper集群
分别在三个节点上执行以下命令,启动ZooKeeper集群(需进入ZooKeeper安装目录):

5、启动Kafka集群
分别在三个节点上执行以下命令,启动Kafka集群(需进入Kafka安装目录):

集群启动后,分别在各个节点上执行jps命令,查看启动的Java进程,若能输出如下进程信息,说明启动成功。

查看Kafka安装目录下的日志文件logs/server.log,确保运行稳定,没有抛出异常。至此,Kafka集群搭建完成。

Kafka命令行操作

1、创建主题
创建主题可以使用Kafka提供的命令工具kafka-topics.sh,此处我们创建一个名为topictest的主题,分区数为2,每个分区的副本数为2,命令如下(在Kafka集群的任意节点执行即可):

说明

  • –create:指定命令的动作是创建主题,使用该命令必须指定–topic参数。
  • –topic:所创建的主题名称。
  • –partitions:所创建主题的分区数。
  • –zookeeper:指定ZooKeeper集群的访问地址,这种方式已废弃
    • 建议改成–bootstrap-server的方式指定broker所在地址和端口,如有多个用逗号分隔。
  • –replication-factor:所创建主题的分区副本数,其值必须小于等于Kafka的节点数。
    • 如果只有一个节点,可以不指定

命令执行完毕后,若输出以下结果则表明创建主题成功:

2、查询主题
创建主题成功后,可以执行以下命令,查看当前Kafka集群中存在的所有主题:

也可以使用–describe参数查询某一个主题的详细信息。例如,查询主题topictest的详细信息,命令如下:

输出结果如下:

可以看到,该主题有2个分区,每个分区有2个副本。分区编号为0的副本分布在broker.id为2和3的Broker上,其中broker.id为2上的副本为领导者副本;分区编号为1的副本分布在broker.id为1和3的Broker上,其中broker.id为3上的副本为领导者副本。

3、创建生产者
Kafka生产者作为消息生产角色,可以使用Kafka自带的命令工具创建一个最简单的生产者。例如,在主题topictest上创建一个生产者,命令如下:

说明:

  • –broker-list:指定Kafka Broker的访问地址,只要能访问到其中一个即可连接成功,若想写多个则用逗号隔开。建议将所有的Broker都写上,如果只写其中一个,如果该Broker失效,连接将失败。注意此处的Broker访问端口为9092,Broker通过该端口接收生产者和消费者的请求,该端口在安装Kafka时已经指定。
  • –topic:指定生产者发送消息的主题名称。
    创建完成后,控制台进入等待键盘输入消息的状态。

4、创建消费者
新开启一个SSH连接窗口(可连接Kafka集群中的任何一个节点),在主题topictest上创建一个消费者,命令如下:

上述代码中,参数--bootstrap-server用于指定Kafka Broker访问地址。
消费者创建完成后,等待接收生产者的消息。此时在生产者控制台输入消息“hello kafka”后按回车(可以将文件或者标准输入的消息发送到Kafka集群中,默认一行作为一个消息)即可将消息发送到Kafka集群。

file

在消费者控制台,则可以看到输出相同的消息“hello kafka”:

file

谢谢!

Views: 259

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注