1、Maxwell 简介
Maxwell 是一个能实时读取 MySQL 二进制日志文件binlog,并生成 Json格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。
Maxwell主要提供了下列功能
-
- 支持
SELECT * FROM table
的方式进行全量数据初始化。
- 支持
-
- 支持在主库发生failover后,自动恢复binlog位置,实现断点续传。
-
- 可以对数据进行分区,解决数据倾斜问题,发送到Kafka的数据支持库、表、列等级别的数据分区。
-
- 工作方式是伪装为slave接收binlog events,然后根据schema信息拼装,可以接受ddl、xid、row等event。
2、Mysql Binlog介绍
2.1 Binlog 简介
MySQL中一般有以下几种日志
日志类型 | 写入日志的信息 |
---|---|
错误日志 | 记录在启动,运行或停止mysqld时遇到的问题 |
通用查询日志 | 记录建立的客户端连接和执行的语句 |
二进制日志 binlog | 记录更改数据的语句 |
中继日志 | 从服务器 复制 主服务器接收的数据更改 |
慢查询日志 | 记录所有执行时间超过 long_query_time 秒的所有查询或不使用索引的查询 |
DDL日志(元数据日志) | 元数据操作由DDL语句执行 |
在默认情况下,系统仅仅打开错误日志,关闭了其他所有日志,以达到尽可能减少IO损耗提高系统性能的目的,但是在一般稍微重要一点的实际应用场景中,都至少需要打开二进制日志,因为这是MySQL很多存储引擎进行增量备份的基础,也是MySQL实现复制的基本条件
接下来主要介绍二进制日志 binlog。
MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDL
和 DML
语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复。
Binlog日志的两个最重要的使用场景
- MySQL主从复制
- MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。
- 数据恢复
- 通过使用 mysqlbinlog工具来使恢复数据。
2.2 Binlog 的日志格式
记录在二进制日志中的事件的格式取决于二进制记录格式。支持三种格式类型:
- Statement:基于SQL语句的复制(statement-based replication, SBR)
- Row:基于行的复制(row-based replication, RBR)
- Mixed:混合模式复制(mixed-based replication, MBR)
Statement
- 每一条会修改数据的sql都会记录在binlog中。
- 优点
- 不需要记录每一行的变化,减少了binlog日志量,节约了IO, 提高了性能。
- 缺点
- 在进行数据同步的过程中有可能出现数据不一致。
- 比如 update tt set create_date=now(),如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
Row
- 它不记录sql语句上下文相关信息,仅保存哪条记录被修改。
- 优点
- 保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,它只记录执行后的效果。
- 缺点
- 每行数据的修改都会记录,最明显的就是update语句,导致更新多少条数据就会产生多少事件,占用较大空间。
Mixed
- 从5.1.8版本开始,MySQL提供了Mixed格式,实际上就是Statement与Row的结合。
- 在Mixed模式下,一般的复制使用Statement模式保存binlog,对于Statement模式无法复制的操作使用Row模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式(因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为Row模式)。
- 优点
- 节省空间,同时兼顾了一定的一致性。
- 缺点
- 还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。
3、Mysql 实时数据同步方案对比
-
mysql 数据实时同步可以通过解析mysql的 binlog 的方式来实现,解析binlog可以有多种方式,可以通过canal,或者maxwell等各种方式实现。以下是各种抽取方式的对比介绍。
-
其中
canal
由 Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费canal解析到的数据。 -
Maxwell相对于canal的优势是使用简单,Maxwell比Canal更加轻量级,它直接将数据变更输出为json字符串,不需要再编写客户端。对于缺乏基础建设,短时间内需要快速迭代的项目和公司比较合适。
-
另外
Maxwell
有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap
功能,可以直接引导出完整的历史数据用于初始化,非常好用。
4、开启Mysql的Binlog
-
1、服务器当中安装mysql(省略)
- 注意:mysql的版本尽量不要太低,也不要太高,最好使用5.6及以上版本。
-
2、添加mysql普通用户
maxwell
-
为mysql添加一个普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户。
-
进入mysql客户端,然后执行以下命令,进行授权
1mysql -uroot -p123456- 执行sql语句
12345678910--校验级别最低,只校验密码长度mysql> set global validate_password_policy=LOW;mysql> set global validate_password_length=6;--创建maxwell库(启动时候会自动创建,不需手动创建)和用户mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';--刷新权限mysql> flush privileges;maxwell会自动在MySQL中创建名为maxwell的数据库作为元数据保存使用。
-
-
3、修改配置文件
/etc/my.cnf
- 执行命令 sudo vim /etc/my.cnf, 添加或修改以下三行配置
12345678#binlog日志名称前缀log-bin= /var/lib/mysql/mysql-bin#binlog日志格式binlog-format=ROW#唯一标识,这个值的区间是:1到(2^32)-1server_id=1 -
4、重启mysql服务
- 执行如下命令
1sudo service mysqld restart -
5、验证binlog是否配置成功
- 进入mysql客户端,并执行以下命令进行验证
12mysql -uroot -p123456mysql> show variables like '%log_bin%'; -
6、查看binlog日志文件生成
- 进入
/var/lib/mysql
目录,查看binlog日志文件.
- 进入
5、Maxwell安装部署
-
1、下载对应版本的安装包
- 地址:https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz
- 安装包名称:
maxwell-1.21.1.tar.gz
-
2、上传服务器
-
3、解压安装包到指定目录
1tar -zxvf maxwell-1.21.1.tar.gz -C /kkb/install/ -
4、修改maxwell配置文件
- 进入到安装目录
/kkb/install/maxwell-1.21.1
进行如下操作
123cd /kkb/install/maxwell-1.21.1cp config.properties.example config.propertiesvim config.properties- 配置文件
config.properties
内容如下:
1234567891011# choose where to produce data toproducer=kafka# list of kafka brokerskafka.bootstrap.servers=node01:9092,node02:9092,node03:9092# mysql login infohost=node03port=3306user=maxwellpassword=123456# kafka topic to write tokafka_topic=maxwell- 注意:一定要保证使用
maxwell
用户和123456
密码能够连接上mysql数据库。
- 进入到安装目录
6、kafka介绍和使用
6.1 Kafka简介
Kafka是最初由Linkedin公司开发,它是一个分布式、可分区、多副本,基于zookeeper协调的分布式日志系统;常见可以用于web/nginx日志、访问日志,消息服务等等。Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。主要应用场景是:日志收集系统和消息系统。
Kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。Kafka就是一种发布-订阅模式。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。
- 消息(Message)
- 是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
- 消息队列(Message Queue)
- 一种应用间的通信方式,消息发送后可以立即返回,通过消息系统来确保信息的可靠传递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。
6.2 Kafka特性
-
高吞吐、低延迟
1kafka 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。 -
高伸缩性
1每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。 -
持久性、可靠性
1Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失。 -
容错性
1允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作。 -
高并发
1支持数千个客户端同时读写。
6.3 Kafka集群架构
-
producer
1消息生产者,发布消息到Kafka集群的终端或服务。 -
broker
1Kafka集群中包含的服务器,一个borker就表示kafka集群中的一个节点。 -
topic
12每条发布到Kafka集群的消息属于的类别,即Kafka是面向 topic 的。更通俗的说Topic就像一个消息队列,生产者可以向其写入消息,消费者可以从中读取消息,一个Topic支持多个生产者或消费者同时订阅它,所以其扩展性很好。 -
partition
1每个 topic 包含一个或多个partition。Kafka分配的单位是partition。 -
replica
1partition的副本,保障 partition 的高可用。 -
consumer
1从Kafka集群中消费消息的终端或服务。 -
consumer group
1每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。 -
leader
1每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 producer 和 consumer 只跟 leader 交互。 -
follower
1Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。 -
controller
12知道大家有没有思考过一个问题,就是Kafka集群中某个broker宕机之后,是谁负责感知到他的宕机,以及负责进行Leader Partition的选举?如果你在Kafka集群里新加入了一些机器,此时谁来负责把集群里的数据进行负载均衡的迁移?包括你的Kafka集群的各种元数据,比如说每台机器上有哪些partition,谁是leader,谁是follower,是谁来管理的?如果你要删除一个topic,那么背后的各种partition如何删除,是谁来控制?还有就是比如Kafka集群扩容加入一个新的broker,是谁负责监听这个broker的加入?如果某个broker崩溃了,是谁负责监听这个broker崩溃?这里就需要一个Kafka集群的总控组件,Controller。他负责管理整个Kafka集群范围内的各种东西。 -
zookeeper
12(1) Kafka 通过 zookeeper 来存储集群的meta元数据信息。(2)一旦controller所在broker宕机了,此时临时节点消失,集群里其他broker会一直监听这个临时节点,发现临时节点消失了,就争抢再次创建临时节点,保证有一台新的broker会成为controller角色。 -
offset
- 偏移量
12345消费者在对应分区上已经消费的消息数(位置),offset保存的地方跟kafka版本有一定的关系。kafka0.8 版本之前offset保存在zookeeper上。kafka0.8 版本之后offset保存在kafka集群上。它是把消费者消费topic的位置通过kafka集群内部有一个默认的topic,名称叫 __consumer_offsets,它默认有50个分区。
6.4 Kafka集群安装部署
-
1、下载安装包(http://kafka.apache.org)
1kafka_2.11-1.1.0.tgz -
2、规划安装目录
1/kkb/install -
3、上传安装包到服务器中
1通过FTP工具上传安装包到node01服务器上 -
4、解压安装包到指定规划目录
1tar -zxvf kafka_2.11-1.1.0.tgz -C /kkb/install -
5、重命名解压目录
1mv kafka_2.11-1.1.0 kafka -
6、修改配置文件
-
在node01上修改
-
进入到kafka安装目录下有一个
config
目录- vi server.properties
12345678910#指定kafka对应的broker id ,唯一broker.id=0#指定数据存放的目录log.dirs=/kkb/install/kafka/kafka-logs#指定zk地址zookeeper.connect=node01:2181,node02:2181,node03:2181#指定是否可以删除topic ,默认是false 表示不可以删除delete.topic.enable=true#指定broker主机名host.name=node01 -
配置kafka环境变量
- sudo vi /etc/profile
12export KAFKA_HOME=/kkb/install/kafkaexport PATH=$PATH:$KAFKA_HOME/bin
-
-
6、分发kafka安装目录到其他节点
1234scp -r kafka node02:/kkb/installscp -r kafka node03:/kkb/installscp /etc/profile node02:/etcscp /etc/profile node03:/etc -
7、修改node02和node03上的配置
-
node02
-
vi server.properties
12345678910#指定kafka对应的broker id ,唯一broker.id=1#指定数据存放的目录log.dirs=/kkb/install/kafka/kafka-logs#指定zk地址zookeeper.connect=node01:2181,node02:2181,node03:2181#指定是否可以删除topic ,默认是false 表示不可以删除delete.topic.enable=true#指定broker主机名host.name=node02 -
node03
-
vi server.properties
12345678910#指定kafka对应的broker id ,唯一broker.id=2#指定数据存放的目录log.dirs=/kkb/install/kafka/kafka-logs#指定zk地址zookeeper.connect=node01:2181,node02:2181,node03:2181#指定是否可以删除topic ,默认是false 表示不可以删除delete.topic.enable=true#指定broker主机名host.name=node03
-
-
8、让每台节点的kafka环境变量生效
- 在每台服务器执行命令
1source /etc/profile
6.5 kafka集群启动和停止
-
1、启动kafka集群
- 先启动zookeeper集群,然后在所有节点如下执行脚本
1nohup kafka-server-start.sh /kkb/install/kafka/config/server.properties >/dev/null 2>&1 & -
2、停止kafka集群
- 所有节点执行关闭kafka脚本
1kafka-server-stop.sh
6.6 kafka命令行的管理使用
-
1、创建topic
- 使用
kafka-topics.sh
脚本
1kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181 - 使用
-
2、查询所有的topic
- 使用
kafka-topics.sh
脚本
1kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 - 使用
-
3、查看topic的描述信息
- 使用
kafka-topics.sh
脚本
1kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181 - 使用
-
4、删除topic
- 使用
kafka-topics.sh
脚本
1kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181 - 使用
-
5、模拟生产者写入数据到topic中
- 使用
kafka-console-producer.sh
脚本
1kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test - 使用
-
6、模拟消费者拉取topic中的数据
- 使用
kafka-console-consumer.sh
脚本
1kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning或者(推荐)
1kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning - 使用
7、Maxwell实时采集mysql表数据到kafka
-
1、启动kafka集群和zookeeper集群
- 启动zookeeper集群
12#每台节点执行脚本nohup zkServer.sh start >/dev/null 2>&1 &- 启动kafka集群
12nohup /kkb/install/kafka/bin/kafka-server-start.sh /kkb/install/kafka/config/server.properties > /dev/null 2>&1 & -
2、创建topic
1kafka-topics.sh --create --topic maxwell --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181(如果虚拟机磁盘容量有效,可以将分区数和复制因子都设置为1.)
-
3、启动maxwell服务
1/kkb/install/maxwell-1.21.1/bin/maxwell -
4、插入数据并进行测试
-
向mysql表中插入一条数据,并开启kafka的消费者,查看kafka是否能够接收到数据。
-
向mysql当中创建数据库和数据库表并插入数据
12345678910111213141516171819202122CREATE DATABASE /*!32312 IF NOT EXISTS*/<code>test_db</code> /*!40100 DEFAULT CHARACTER SET utf8 */;USE <code>test_db</code>;/*Table structure for table <code>user</code> */DROP TABLE IF EXISTS <code>user</code>;CREATE TABLE <code>user</code> (id</code> varchar(10) NOT NULL,name</code> varchar(10) DEFAULT NULL,age</code> int(11) DEFAULT NULL,PRIMARY KEY (<code>id</code>)) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*Data for the table <code>user</code> */#插入数据insert into <code>user</code>(<code>id</code>,<code>name</code>,<code>age</code>) values ('1','xiaokai',20);#修改数据update <code>user</code> set age= 30 where id='1';#删除数据delete from <code>user</code> where id='1';
-
-
5、启动kafka的自带控制台消费者
- 启动Kafka消费者监听maxwell主题
1kafka-console-consumer.sh --topic maxwell --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning- 等待一段事件,观察maxwell主题是否有消息发送过来
12345{"database":"test_db","table":"user","type":"insert","ts":1621244407,"xid":985,"commit":true,"data":{"id":"1","name":"xiaokai","age":20}}{"database":"test_db","table":"user","type":"update","ts":1621244413,"xid":999,"commit":true,"data":{"id":"1","name":"xiaokai","age":30},"old":{"age":20}}{"database":"test_db","table":"user","type":"delete","ts":1621244419,"xid":1013,"commit":true,"data":{"id":"1","name":"xiaokai","age":30}}-
json数据字段说明
-
database
- 数据库名称
-
table
- 表名称
-
type
- 操作类型
- 包括 insert/update/delete 等
-
ts
- 操作时间戳
-
xid
- 事务id
-
commit
- 同一个xid代表同一个事务,事务的最后一条语句会有commit
-
data
- 最新的数据,修改后的数据
-
old
- 旧数据,修改前的数据
Views: 11