创建Zookeeper集群(3个)
前提是已经装好Java JDK8+并配置好环境变量。
建议Kafka集群使用专有的Zookeeper集群进行协调管理。
也可以使用Kafka内置的
bin/zookeeper命令启动集群, 默认配置是config/zookeeper.properties
创建3个zk配置文件
[hadoop@hadoop000 config] vi zookeeper-1(2|3).properties
修改配置文件内容如下
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/home/hadoop/tmp/kafka-zk-1(2|3)/data
dataLogDir=/home/hadoop/tmp/kafka-zk-1(2|3)/log
# the port at which the clients will connect
clientPort=2181(2|3)
# 将所有4字命令添加到白名单中
4lw.commands.whitelist=*
# 配置zookeeper服务器的 [主机名或主机ID]:[同步端口(唯一)]:[选举端口(唯一)]
server.1=localhost:2891:3891
server.2=localhost:2892:3892
server.3=localhost:2893:3893
根据配置创建对应的dataDir以及dataLogDir,并在dataDir下创建myid文件。
[hadoop@hadoop000 tmp]echo 1 > kafka-zk-1/data/myid
[hadoop@hadoop000 tmp]echo 2 > kafka-zk-2/data/myid
[hadoop@hadoop000 tmp]echo 3 > kafka-zk-3/data/myid
[hadoop@hadoop000 config] zookeeper-server-start.sh -daemon zookeeper-1.properties
[hadoop@hadoop000 config] zookeeper-server-start.sh -daemon zookeeper-2.properties
[hadoop@hadoop000 config]$ zookeeper-server-start.sh -daemon zookeeper-3.properties
修改Kafka集群配置
vi server-1(2|3).properties
# The id of the broker.
broker.id=1(2|3)
# The port of the broker.
# port=9092(3|4)
listeners=PLAINTEXT://hadoop000:9092(3|4)
# Log file directories - A comma separated list
log.dirs=/home/hadoop/tmp/kafka-1(2|3)/log
# Zookeeper connection string - A comma separated list
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka
启动Kafka代理服务(3个)
[hadoop@hadoop000 config]$ kafka-server-start.sh -daemon server-1.properties
[hadoop@hadoop000 config]$ kafka-server-start.sh -daemon server-2.properties
[hadoop@hadoop000 config]$ kafka-server-start.sh -daemon server-3.properties
创建消息主题(2分区,2副本)
由于我们有3台Kafka服务器,因此可以创建具有多分区以及多副本的主题
$ bin/kafka-topics.sh --bootstrap-server hadoop000:9092,hadoop000:9093,hadoop000:9094 --create --topic my-topic --partitions 2 --replication-factor 2
也可以使用--zookeeper选项进行连接, 如下所示:
$ kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183/kafka --create --topic my-topic --partitions 2 --replication-factor 2但是--zookeeper这个选项在较新版本中已经废弃, 建议使用--bootstrap来代替, 有更好的安全机制
列出主题详情
$ bin/kafka-topics.sh --bootstrap-server hadoop000:9092,hadoop000:9093,hadoop000:9094 --describe --topic my-topic
Topic: my-topic PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: my-topic Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: my-topic Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
创建消费者
可以同时监听多台Kafka服务器组成的集群
bin/kafka-console-consumer.sh --bootstrap-server hadoop000:9092,hadoop000:9093,hadoop000:9094 --topic my-topic --from-beginning
创建生产者
-
代理服务器列表可以指定多台Kafka服务器组成的集群
-
按顺序发出一些消息
kafka-console-producer.sh --broker-list hadoop000:9092,hadoop000:9093,hadoop000:9094 --topic my-topic 0 1 2 3 4 5消费者查看消息
[hadoop@hadoop000 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop000:9092,hadoop000:9093,hadoop000:9094 --topic my-topic --from-beginning
4
5
0
1
2
3
小问题:消费者客户端接收的消息顺序为什么这样?
因为只有消费者接受的消息都是来自单个分区的时候才能保证消息有序.
总结
以上安装方式虽然使用了三个zookeeper服务器和三个kafka broker,但是还是运行在一台机器上,因此只能算得上伪分布式,真正的分布式需要将这些服务分布在多台机器上的。
Views: 368
