Kafka快速起步指南。
1、获取KAFKA
下载 最新版本的Kafka并解压:
1 2 |
$ tar -xzf kafka_2.13-2.6.0.tgz $ cd kafka_2.13-2.6.0 |
2、准备Kafka环境
注意: 本地环境需要安装 Java 8+
运行下面命令开启Kafka中内置的Zookeeper服务
1 2 3 |
# 启动Kafka中内置的ZooKeeper服务 # 注意: ZooKeeper框架未来将从Kafka框架中分离出去 $ bin/zookeeper-server-start.sh config/zookeeper.properties |
打开另一个终端运行Kafka代理服务:
1 2 |
# 开启Kafka代理服务 $ bin/kafka-server-start.sh config/server.properties |
当所有服务都成功启动,一个简单的单机Kafka环境就准备好了.
3、创建一个消息主题
Kafka是一个分布式事件流平台,它允许您跨多台机器读、写、存储和处理消息(在官方文档中也称为事件 )。
示例事件有支付事务、来自移动电话的地理位置更新、发货订单、来自物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在 主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。
因此,在编写第一个事件之前,必须创建一个主题。打开另一个终端会话并运行:
1 |
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 |
所有Kafka的命令行工具都有额外的选项: 运行Kafka -topic .sh命令而不带任何参数来显示使用信息。例如,它还可以显示详细信息,如新主题的分区数量:
1 2 3 |
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs: Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0 |
4、向对应主题写入消息
Kafka客户端通过网络与Kafka代理服务通信以读写消息。一旦接收到这些消息,代理将以持久和容错的方式存储这些事件,您需要多长时间就存储多长时间——甚至永远存储。
运行控制台生成器客户端,在主题中写入一些事件。默认情况下,您输入的每一行都将导致向主题写入一个单独的消息。
1 2 3 |
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event |
您可以在任何时候使用Ctrl-C停止生产者(producer)客户端
5、读取消息
打开另一个终端会话并运行控制台消费者客户端(kafka-console-consumer)来读取您刚刚创建的事件:
1 2 3 |
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 This is my first event This is my second event |
您可以在任何时候使用Ctrl-C
停止客户端。
您可以自由试验: 例如,切换回您的生产者终端(上一步)来编写额外的事件,并查看事件如何立即显示在您的消费者终端中。
因为事件长期存储在Kafka中,所以它们可以被任意多的消费者读取。您可以通过打开另一个终端会话并再次运行之前的命令来轻松地验证这一点。
6、使用KAFKA CONNECT将数据导入/导出为消息流
您可能在关系数据库或传统消息传递系统等现有系统中拥有大量数据,以及许多已经使用这些系统的应用程序。Kafka Connect允许您不断地从外部系统摄取数据到Kafka,反之亦然。因此,与Kafka集成现有系统是非常容易的。为了使这个过程更容易,有数百个这样的连接器可用。
看一看Kafka Connect section部分,了解更多关于如何持续地导入/导出数据到Kafka和导出。
7、使用KAFKA STREAMS处理消息
一旦您的数据以消息的形式存储在Kafka中,您就可以使用 Kafka Streams 客户端的Java/Scala库处理数据。它允许您实现关键任务的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka主题中。
Kafka Streams将客户端编写和部署标准Java和Scala应用程序的简单性与Kafka服务器端集群技术的优势结合在一起,使这些应用程序具有高度的可伸缩性(highly scalable)、弹性(elastic)、容错性(fault-tolerant)和分布式(distributed)。这个库完全支持精确一次处理(exactly-once processing)的原语、有状态(stateful )操作和聚合(aggregation)、窗口(windowing)、连接(join)、基于事件时间的处理等等。
给你一个初步的体验,这里是如何实现流行的WordCount算法:
1 2 3 4 5 6 7 8 |
KStream<String, String> textLines = builder.stream("quickstart-events"); KTable<String, Long> wordCounts = textLines .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" "))) .groupBy((keyIgnored, word) -> word) .count(); wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long())); |
Kafka 流处理演示演示和应用程序部署教程演示了如何从头到尾编写和运行这样的流处理应用程序。
8、停止KAFKA
既然你已经完成了《快速起步》,那么你可以随时停止Kafka运行——或者继续练习。
-
使用
Ctrl-C
停止生产者和消费者客户端。 -
用
Ctrl-C
停止Kafka代理服务器。 -
最后,用
Ctrl-C
停止ZooKeeper服务器。
如果您还想删除您本地Kafka环境的任何数据,包括您在此过程中创建的任何事件,运行命令:
1 |
$ rm -rf /tmp/kafka-logs /tmp/zookeeper |
恭喜你!
您已经成功地完成了Apache Kafka快速启动。
为了解更多详情,我们建议以下步骤:
-
通过阅读简要介绍了解Kafka的高级使用技巧,它的主要概念,以及它和其他技术的比较。要更详细地了解Kafka,请查看文档。
-
浏览 Kafka用例,了解我们全球社区的其他用户是如何从Kafka中获得价值的。
-
加入Kafka的线下交流会,或者参加每年一次的Kafka峰会。
Views: 389