Maxwell 数据库数据实时采集

1、Maxwell 简介

Maxwell 是一个能实时读取 MySQL 二进制日志文件binlog,并生成 Json格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。

Maxwell主要提供了下列功能

    1. 支持SELECT * FROM table的方式进行全量数据初始化。
    1. 支持在主库发生failover后,自动恢复binlog位置,实现断点续传。
    1. 可以对数据进行分区,解决数据倾斜问题,发送到Kafka的数据支持库、表、列等级别的数据分区。
    1. 工作方式是伪装为slave接收binlog events,然后根据schema信息拼装,可以接受ddl、xid、row等event。

2、Mysql Binlog介绍

2.1 Binlog 简介

MySQL中一般有以下几种日志

日志类型 写入日志的信息
错误日志 记录在启动,运行或停止mysqld时遇到的问题
通用查询日志 记录建立的客户端连接和执行的语句
二进制日志 binlog 记录更改数据的语句
中继日志 从服务器 复制 主服务器接收的数据更改
慢查询日志 记录所有执行时间超过 long_query_time 秒的所有查询或不使用索引的查询
DDL日志(元数据日志) 元数据操作由DDL语句执行

在默认情况下,系统仅仅打开错误日志,关闭了其他所有日志,以达到尽可能减少IO损耗提高系统性能的目的,但是在一般稍微重要一点的实际应用场景中,都至少需要打开二进制日志,因为这是MySQL很多存储引擎进行增量备份的基础,也是MySQL实现复制的基本条件

接下来主要介绍二进制日志 binlog。

MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDLDML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复

Binlog日志的两个最重要的使用场景

  • MySQL主从复制
    • MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。
  • 数据恢复
    • 通过使用 mysqlbinlog工具来使恢复数据。

2.2 Binlog 的日志格式

记录在二进制日志中的事件的格式取决于二进制记录格式。支持三种格式类型:

  • Statement:基于SQL语句的复制(statement-based replication, SBR)
  • Row:基于行的复制(row-based replication, RBR)
  • Mixed:混合模式复制(mixed-based replication, MBR)

Statement

  • 每一条会修改数据的sql都会记录在binlog中。
  • 优点
    • 不需要记录每一行的变化,减少了binlog日志量,节约了IO, 提高了性能。
  • 缺点
    • 在进行数据同步的过程中有可能出现数据不一致。
    • 比如 update tt set create_date=now(),如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。

Row

  • 它不记录sql语句上下文相关信息,仅保存哪条记录被修改。
  • 优点
    • 保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,它只记录执行后的效果。
  • 缺点
    • 每行数据的修改都会记录,最明显的就是update语句,导致更新多少条数据就会产生多少事件,占用较大空间。

Mixed

  • 从5.1.8版本开始,MySQL提供了Mixed格式,实际上就是Statement与Row的结合。
  • 在Mixed模式下,一般的复制使用Statement模式保存binlog,对于Statement模式无法复制的操作使用Row模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式(因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为Row模式)。
  • 优点
    • 节省空间,同时兼顾了一定的一致性。
  • 缺点
    • 还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。

3、Mysql 实时数据同步方案对比

  • mysql 数据实时同步可以通过解析mysql的 binlog 的方式来实现,解析binlog可以有多种方式,可以通过canal,或者maxwell等各种方式实现。以下是各种抽取方式的对比介绍。

    mysql实时同步方案对比

  • 其中canal 由 Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费canal解析到的数据。

  • Maxwell相对于canal的优势是使用简单,Maxwell比Canal更加轻量级,它直接将数据变更输出为json字符串,不需要再编写客户端。对于缺乏基础建设,短时间内需要快速迭代的项目和公司比较合适。

  • 另外Maxwell 有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。

4、开启Mysql的Binlog

  • 1、服务器当中安装mysql(省略)

    • 注意:mysql的版本尽量不要太低,也不要太高,最好使用5.6及以上版本。
  • 2、添加mysql普通用户maxwell

    • 为mysql添加一个普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户。

    • 进入mysql客户端,然后执行以下命令,进行授权

    • 执行sql语句

    maxwell会自动在MySQL中创建名为maxwell的数据库作为元数据保存使用。

  • 3、修改配置文件 /etc/my.cnf

    • 执行命令 sudo vim /etc/my.cnf, 添加或修改以下三行配置

  • 4、重启mysql服务

    • 执行如下命令

  • 5、验证binlog是否配置成功

    • 进入mysql客户端,并执行以下命令进行验证

    image-20210517161330004

  • 6、查看binlog日志文件生成

    • 进入 /var/lib/mysql 目录,查看binlog日志文件.

    image-20210517162315281

5、Maxwell安装部署

  • 1、下载对应版本的安装包

  • 2、上传服务器

  • 3、解压安装包到指定目录

  • 4、修改maxwell配置文件

    • 进入到安装目录 /kkb/install/maxwell-1.21.1 进行如下操作

    • 配置文件config.properties 内容如下:

    • 注意:一定要保证使用maxwell 用户和 123456 密码能够连接上mysql数据库。

6、kafka介绍和使用

6.1 Kafka简介

​ Kafka是最初由Linkedin公司开发,它是一个分布式、可分区、多副本,基于zookeeper协调的分布式日志系统;常见可以用于web/nginx日志、访问日志,消息服务等等。Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。主要应用场景是:日志收集系统和消息系统

​ Kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。Kafka就是一种发布-订阅模式。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。

  • 消息(Message)
    • 是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
  • 消息队列(Message Queue)
    • 一种应用间的通信方式,消息发送后可以立即返回,通过消息系统来确保信息的可靠传递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。

6.2 Kafka特性

  • 高吞吐、低延迟

  • 高伸缩性

  • 持久性、可靠性

  • 容错性

  • 高并发

6.3 Kafka集群架构

kafka集群架构

  • producer

  • broker

  • topic

  • partition

  • replica

  • consumer

  • consumer group

  • leader

  • follower

  • controller

  • zookeeper

  • offset

    • 偏移量

6.4 Kafka集群安装部署

  • 1、下载安装包(http://kafka.apache.org

  • 2、规划安装目录

  • 3、上传安装包到服务器中

  • 4、解压安装包到指定规划目录

  • 5、重命名解压目录

  • 6、修改配置文件

    • 在node01上修改

    • 进入到kafka安装目录下有一个config目录

      • vi server.properties

    • 配置kafka环境变量

      • sudo vi /etc/profile

  • 6、分发kafka安装目录到其他节点

  • 7、修改node02和node03上的配置

    • node02

    • vi server.properties

    • node03

    • vi server.properties

  • 8、让每台节点的kafka环境变量生效

    • 在每台服务器执行命令

6.5 kafka集群启动和停止

  • 1、启动kafka集群

    • 先启动zookeeper集群,然后在所有节点如下执行脚本

  • 2、停止kafka集群

    • 所有节点执行关闭kafka脚本

6.6 kafka命令行的管理使用

  • 1、创建topic

    • 使用 kafka-topics.sh脚本

  • 2、查询所有的topic

    • 使用 kafka-topics.sh脚本

  • 3、查看topic的描述信息

    • 使用 kafka-topics.sh脚本

  • 4、删除topic

    • 使用 kafka-topics.sh脚本

  • 5、模拟生产者写入数据到topic中

    • 使用 kafka-console-producer.sh 脚本

  • 6、模拟消费者拉取topic中的数据

    • 使用 kafka-console-consumer.sh 脚本

    或者(推荐)

7、Maxwell实时采集mysql表数据到kafka

  • 1、启动kafka集群和zookeeper集群

    • 启动zookeeper集群

    • 启动kafka集群

  • 2、创建topic

    (如果虚拟机磁盘容量有效,可以将分区数和复制因子都设置为1.)

  • 3、启动maxwell服务

  • 4、插入数据并进行测试

    • 向mysql表中插入一条数据,并开启kafka的消费者,查看kafka是否能够接收到数据。

    • 向mysql当中创建数据库和数据库表并插入数据

  • 5、启动kafka的自带控制台消费者

    • 启动Kafka消费者监听maxwell主题

    • 等待一段事件,观察maxwell主题是否有消息发送过来

    • json数据字段说明

    • database

      • 数据库名称
    • table

      • 表名称
    • type

      • 操作类型
      • 包括 insert/update/delete 等
    • ts

      • 操作时间戳
    • xid

      • 事务id
    • commit

      • 同一个xid代表同一个事务,事务的最后一条语句会有commit
    • data

      • 最新的数据,修改后的数据
    • old

      • 旧数据,修改前的数据

Views: 11

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注