Maxwell 数据库数据实时采集

1、Maxwell 简介

Maxwell 是一个能实时读取 MySQL 二进制日志文件binlog,并生成 Json格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。

Maxwell主要提供了下列功能

    1. 支持SELECT * FROM table的方式进行全量数据初始化。
    1. 支持在主库发生failover后,自动恢复binlog位置,实现断点续传。
    1. 可以对数据进行分区,解决数据倾斜问题,发送到Kafka的数据支持库、表、列等级别的数据分区。
    1. 工作方式是伪装为slave接收binlog events,然后根据schema信息拼装,可以接受ddl、xid、row等event。

2、Mysql Binlog介绍

2.1 Binlog 简介

MySQL中一般有以下几种日志

日志类型 写入日志的信息
错误日志 记录在启动,运行或停止mysqld时遇到的问题
通用查询日志 记录建立的客户端连接和执行的语句
二进制日志 binlog 记录更改数据的语句
中继日志 从服务器 复制 主服务器接收的数据更改
慢查询日志 记录所有执行时间超过 long_query_time 秒的所有查询或不使用索引的查询
DDL日志(元数据日志) 元数据操作由DDL语句执行

在默认情况下,系统仅仅打开错误日志,关闭了其他所有日志,以达到尽可能减少IO损耗提高系统性能的目的,但是在一般稍微重要一点的实际应用场景中,都至少需要打开二进制日志,因为这是MySQL很多存储引擎进行增量备份的基础,也是MySQL实现复制的基本条件

接下来主要介绍二进制日志 binlog。

MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDLDML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复

Binlog日志的两个最重要的使用场景

  • MySQL主从复制
    • MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。
  • 数据恢复
    • 通过使用 mysqlbinlog工具来使恢复数据。

2.2 Binlog 的日志格式

记录在二进制日志中的事件的格式取决于二进制记录格式。支持三种格式类型:

  • Statement:基于SQL语句的复制(statement-based replication, SBR)
  • Row:基于行的复制(row-based replication, RBR)
  • Mixed:混合模式复制(mixed-based replication, MBR)

Statement

  • 每一条会修改数据的sql都会记录在binlog中。
  • 优点
    • 不需要记录每一行的变化,减少了binlog日志量,节约了IO, 提高了性能。
  • 缺点
    • 在进行数据同步的过程中有可能出现数据不一致。
    • 比如 update tt set create_date=now(),如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。

Row

  • 它不记录sql语句上下文相关信息,仅保存哪条记录被修改。
  • 优点
    • 保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,它只记录执行后的效果。
  • 缺点
    • 每行数据的修改都会记录,最明显的就是update语句,导致更新多少条数据就会产生多少事件,占用较大空间。

Mixed

  • 从5.1.8版本开始,MySQL提供了Mixed格式,实际上就是Statement与Row的结合。
  • 在Mixed模式下,一般的复制使用Statement模式保存binlog,对于Statement模式无法复制的操作使用Row模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式(因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为Row模式)。
  • 优点
    • 节省空间,同时兼顾了一定的一致性。
  • 缺点
    • 还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。

3、Mysql 实时数据同步方案对比

  • mysql 数据实时同步可以通过解析mysql的 binlog 的方式来实现,解析binlog可以有多种方式,可以通过canal,或者maxwell等各种方式实现。以下是各种抽取方式的对比介绍。

    mysql实时同步方案对比

  • 其中canal 由 Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费canal解析到的数据。

  • Maxwell相对于canal的优势是使用简单,Maxwell比Canal更加轻量级,它直接将数据变更输出为json字符串,不需要再编写客户端。对于缺乏基础建设,短时间内需要快速迭代的项目和公司比较合适。

  • 另外Maxwell 有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。

4、开启Mysql的Binlog

  • 1、服务器当中安装mysql(省略)

    • 注意:mysql的版本尽量不要太低,也不要太高,最好使用5.6及以上版本。
  • 2、添加mysql普通用户maxwell

    • 为mysql添加一个普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户。

    • 进入mysql客户端,然后执行以下命令,进行授权

    mysql -uroot -p123456
    • 执行sql语句
    --校验级别最低,只校验密码长度
    mysql> set global validate_password_policy=LOW;
    mysql> set global validate_password_length=6;
    
    --创建maxwell库(启动时候会自动创建,不需手动创建)和用户
    mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
    mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
    mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 
    --刷新权限
    mysql> flush privileges;

    maxwell会自动在MySQL中创建名为maxwell的数据库作为元数据保存使用。

  • 3、修改配置文件 /etc/my.cnf

    • 执行命令 sudo vim /etc/my.cnf, 添加或修改以下三行配置
    #binlog日志名称前缀
    log-bin= /var/lib/mysql/mysql-bin
    
    #binlog日志格式
    binlog-format=ROW
    
    #唯一标识,这个值的区间是:1到(2^32)-1
    server_id=1
  • 4、重启mysql服务

    • 执行如下命令
    sudo service mysqld restart
  • 5、验证binlog是否配置成功

    • 进入mysql客户端,并执行以下命令进行验证
    mysql -uroot -p123456
    mysql> show variables like '%log_bin%';

    image-20210517161330004

  • 6、查看binlog日志文件生成

    • 进入 /var/lib/mysql 目录,查看binlog日志文件.

    image-20210517162315281

5、Maxwell安装部署

  • 1、下载对应版本的安装包

  • 2、上传服务器

  • 3、解压安装包到指定目录

    tar -zxvf maxwell-1.21.1.tar.gz -C /kkb/install/
  • 4、修改maxwell配置文件

    • 进入到安装目录 /kkb/install/maxwell-1.21.1 进行如下操作
    cd /kkb/install/maxwell-1.21.1 
    cp config.properties.example config.properties
    vim config.properties
    • 配置文件config.properties 内容如下:
    # choose where to produce data to
    producer=kafka
    # list of kafka brokers
    kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
    # mysql login info
    host=node03
    port=3306
    user=maxwell
    password=123456
    # kafka topic to write to
    kafka_topic=maxwell
    • 注意:一定要保证使用maxwell 用户和 123456 密码能够连接上mysql数据库。

6、kafka介绍和使用

6.1 Kafka简介

​ Kafka是最初由Linkedin公司开发,它是一个分布式、可分区、多副本,基于zookeeper协调的分布式日志系统;常见可以用于web/nginx日志、访问日志,消息服务等等。Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。主要应用场景是:日志收集系统和消息系统

​ Kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。Kafka就是一种发布-订阅模式。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。

  • 消息(Message)
    • 是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
  • 消息队列(Message Queue)
    • 一种应用间的通信方式,消息发送后可以立即返回,通过消息系统来确保信息的可靠传递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。

6.2 Kafka特性

  • 高吞吐、低延迟

    kafka 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性

    每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性

    Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失。
  • 容错性

    允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作。
  • 高并发

    支持数千个客户端同时读写。

6.3 Kafka集群架构

kafka集群架构

  • producer

    消息生产者,发布消息到Kafka集群的终端或服务。
  • broker

    Kafka集群中包含的服务器,一个borker就表示kafka集群中的一个节点。
  • topic

    每条发布到Kafka集群的消息属于的类别,即Kafka是面向 topic 的。
    更通俗的说Topic就像一个消息队列,生产者可以向其写入消息,消费者可以从中读取消息,一个Topic支持多个生产者或消费者同时订阅它,所以其扩展性很好。
  • partition

    每个 topic 包含一个或多个partition。Kafka分配的单位是partition。
  • replica

    partition的副本,保障 partition 的高可用。
  • consumer

    从Kafka集群中消费消息的终端或服务。
  • consumer group

    每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  • leader

    每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 producer 和 consumer 只跟 leader 交互。
  • follower

    Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。
  • controller

    知道大家有没有思考过一个问题,就是Kafka集群中某个broker宕机之后,是谁负责感知到他的宕机,以及负责进行Leader Partition的选举?如果你在Kafka集群里新加入了一些机器,此时谁来负责把集群里的数据进行负载均衡的迁移?包括你的Kafka集群的各种元数据,比如说每台机器上有哪些partition,谁是leader,谁是follower,是谁来管理的?如果你要删除一个topic,那么背后的各种partition如何删除,是谁来控制?还有就是比如Kafka集群扩容加入一个新的broker,是谁负责监听这个broker的加入?如果某个broker崩溃了,是谁负责监听这个broker崩溃?这里就需要一个Kafka集群的总控组件,Controller。他负责管理整个Kafka集群范围内的各种东西。
    
  • zookeeper

    (1)   Kafka 通过 zookeeper 来存储集群的meta元数据信息。
    (2)一旦controller所在broker宕机了,此时临时节点消失,集群里其他broker会一直监听这个临时节点,发现临时节点消失了,就争抢再次创建临时节点,保证有一台新的broker会成为controller角色。
  • offset

    • 偏移量
    消费者在对应分区上已经消费的消息数(位置),offset保存的地方跟kafka版本有一定的关系。
    kafka0.8 版本之前offset保存在zookeeper上。
    kafka0.8 版本之后offset保存在kafka集群上。
    它是把消费者消费topic的位置通过kafka集群内部有一个默认的topic,
    名称叫 __consumer_offsets,它默认有50个分区。

6.4 Kafka集群安装部署

  • 1、下载安装包(http://kafka.apache.org

    kafka_2.11-1.1.0.tgz
  • 2、规划安装目录

    /kkb/install
  • 3、上传安装包到服务器中

    通过FTP工具上传安装包到node01服务器上
  • 4、解压安装包到指定规划目录

    tar -zxvf kafka_2.11-1.1.0.tgz -C /kkb/install
  • 5、重命名解压目录

    mv kafka_2.11-1.1.0 kafka
  • 6、修改配置文件

    • 在node01上修改

    • 进入到kafka安装目录下有一个config目录

      • vi server.properties
      #指定kafka对应的broker id ,唯一
      broker.id=0
      #指定数据存放的目录
      log.dirs=/kkb/install/kafka/kafka-logs
      #指定zk地址
      zookeeper.connect=node01:2181,node02:2181,node03:2181
      #指定是否可以删除topic ,默认是false 表示不可以删除
      delete.topic.enable=true
      #指定broker主机名
      host.name=node01
    • 配置kafka环境变量

      • sudo vi /etc/profile
      export KAFKA_HOME=/kkb/install/kafka
      export PATH=$PATH:$KAFKA_HOME/bin
  • 6、分发kafka安装目录到其他节点

    scp -r kafka node02:/kkb/install
    scp -r kafka node03:/kkb/install
    scp /etc/profile node02:/etc
    scp /etc/profile node03:/etc
  • 7、修改node02和node03上的配置

    • node02

    • vi server.properties

      #指定kafka对应的broker id ,唯一
      broker.id=1
      #指定数据存放的目录
      log.dirs=/kkb/install/kafka/kafka-logs
      #指定zk地址
      zookeeper.connect=node01:2181,node02:2181,node03:2181
      #指定是否可以删除topic ,默认是false 表示不可以删除
      delete.topic.enable=true
      #指定broker主机名
      host.name=node02
    • node03

    • vi server.properties

      #指定kafka对应的broker id ,唯一
      broker.id=2
      #指定数据存放的目录
      log.dirs=/kkb/install/kafka/kafka-logs
      #指定zk地址
      zookeeper.connect=node01:2181,node02:2181,node03:2181
      #指定是否可以删除topic ,默认是false 表示不可以删除
      delete.topic.enable=true
      #指定broker主机名
      host.name=node03
  • 8、让每台节点的kafka环境变量生效

    • 在每台服务器执行命令
    source /etc/profile

6.5 kafka集群启动和停止

  • 1、启动kafka集群

    • 先启动zookeeper集群,然后在所有节点如下执行脚本
    nohup kafka-server-start.sh /kkb/install/kafka/config/server.properties >/dev/null 2>&1 &
  • 2、停止kafka集群

    • 所有节点执行关闭kafka脚本
    kafka-server-stop.sh

6.6 kafka命令行的管理使用

  • 1、创建topic

    • 使用 kafka-topics.sh脚本
    kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181
  • 2、查询所有的topic

    • 使用 kafka-topics.sh脚本
    kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 
  • 3、查看topic的描述信息

    • 使用 kafka-topics.sh脚本
    kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181  
  • 4、删除topic

    • 使用 kafka-topics.sh脚本
    kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181 
  • 5、模拟生产者写入数据到topic中

    • 使用 kafka-console-producer.sh 脚本
    kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test 
  • 6、模拟消费者拉取topic中的数据

    • 使用 kafka-console-consumer.sh 脚本
    kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning

    或者(推荐)

    kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning

7、Maxwell实时采集mysql表数据到kafka

  • 1、启动kafka集群和zookeeper集群

    • 启动zookeeper集群
    #每台节点执行脚本
    nohup zkServer.sh start >/dev/null  2>&1 &
    • 启动kafka集群
    nohup /kkb/install/kafka/bin/kafka-server-start.sh /kkb/install/kafka/co
    nfig/server.properties > /dev/null 2>&1 &
  • 2、创建topic

    kafka-topics.sh --create --topic maxwell --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181

    (如果虚拟机磁盘容量有效,可以将分区数和复制因子都设置为1.)

  • 3、启动maxwell服务

    /kkb/install/maxwell-1.21.1/bin/maxwell
  • 4、插入数据并进行测试

    • 向mysql表中插入一条数据,并开启kafka的消费者,查看kafka是否能够接收到数据。

    • 向mysql当中创建数据库和数据库表并插入数据

      CREATE DATABASE /*!32312 IF NOT EXISTS*/test_db /*!40100 DEFAULT CHARACTER SET utf8 */;
      
      USE test_db;
      
      /*Table structure for table user */
      
      DROP TABLE IF EXISTS user;
      
      CREATE TABLE user (
      id varchar(10) NOT NULL,
      name varchar(10) DEFAULT NULL,
      age int(11) DEFAULT NULL,
      PRIMARY KEY (id)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
      
      /*Data for the table user */
      #插入数据
      insert  into user(id,name,age) values  ('1','xiaokai',20);
      #修改数据
      update user set age= 30 where id='1';
      #删除数据
      delete from user where id='1';
  • 5、启动kafka的自带控制台消费者

    • 启动Kafka消费者监听maxwell主题
    kafka-console-consumer.sh --topic maxwell --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning 
    • 等待一段事件,观察maxwell主题是否有消息发送过来
    {"database":"test_db","table":"user","type":"insert","ts":1621244407,"xid":985,"commit":true,"data":{"id":"1","name":"xiaokai","age":20}}
    
    {"database":"test_db","table":"user","type":"update","ts":1621244413,"xid":999,"commit":true,"data":{"id":"1","name":"xiaokai","age":30},"old":{"age":20}}
    
    {"database":"test_db","table":"user","type":"delete","ts":1621244419,"xid":1013,"commit":true,"data":{"id":"1","name":"xiaokai","age":30}}
    • json数据字段说明

    • database

      • 数据库名称
    • table

      • 表名称
    • type

      • 操作类型
      • 包括 insert/update/delete 等
    • ts

      • 操作时间戳
    • xid

      • 事务id
    • commit

      • 同一个xid代表同一个事务,事务的最后一条语句会有commit
    • data

      • 最新的数据,修改后的数据
    • old

      • 旧数据,修改前的数据

Views: 11