Maxwell教程
第一章、 Maxwell简介
1.1、Maxwell 概述
Maxwell 是由美国Zendesk公司开源,用Java编写的MySQL变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台。官网地址:http://maxwells-daemon.io/
1.2、Maxwell输出数据格式
1.2.1、mysql insert
mysql
insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem'
maxwell
maxwell: {
"database": "test",
"table": "maxwell",
"type": "insert",
"ts": 1449786310,
"xid": 940752,
"commit": true,
"data": { "id":1, "daemon": "Stanislaw Lem" }
}
1.2.2、update
mysql
update test.maxwell set daemon = 'firebus! firebus!' where id = 1;
maxwell
maxwell: {
"database": "test",
"table": "maxwell",
"type": "update",
"ts": 1449786341,
"xid": 940786,
"commit": true,
"data": {"id":1, "daemon": "Firebus! Firebus!"},
"old": {"daemon": "Stanislaw Lem"}
}
注:Maxwell输出的json字段说明
字段 |
说明 |
database |
变更数据所属的数据库 |
table |
表更数据所属的表 |
type |
数据变更类型 |
ts |
数据变更发生的时间戳 |
xid |
事务id |
commit |
事务提交标志,可用于重新组装事务 |
data |
对于insert类型,表示插入的数据;对于update类型,标识修改之后的数据;对于delete类型,表示删除的数据 |
old |
对于update类型,表示修改之前的数据,只包含变更字段 |
第二章、Maxwell原理
Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。
2.1、 MySQL二进制日志
二进制日志(Binlog)是MySQL服务端非常重要的一种日志,它会保存MySQL数据库的所有数据变更记录。Binlog的主要作用包括主从复制和数据恢复。Maxwell的工作原理和主从复制密切相关。
2.2、MySQL主从复制
MySQL的主从复制,就是用来建立一个和主数据库完全一样的数据库环境,这个数据库称为从数据库。
1)主从复制的应用场景如下:
(1)做数据库的热备:主数据库服务器故障后,可切换到从数据库继续工作。
(2)读写分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。
2)主从复制的工作原理如下:
(1)Master主库将数据变更记录,写到二进制日志(binary log)中
(2)Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)
(3)Slave从库读取并回放中继日志中的事件,将改变的数据同步到自己的数据库。
第三章、MaxWell的部署
注:Maxwell-1.30.0及以上版本不再支持JDK1.8。
步骤一、解压重命名
[root@master ~]#
tar -xzvf /chinaskills/maxwell-1.29.0.tar.gz -C /usr/local/src/
mv /usr/local/src/maxwell-1.29.0 /usr/local/src/maxwell
步骤二、配置环境变量
[root@master ~]#
vim /root/.bash_profile
配置内容:
export MAXWELL_HOME=/usr/local/src/maxwell
export PATH=$PATH:$MAXWELL_HOME/bin
加载环境变量
source /root/.bash_profile
步骤三、修改mysql配置
[root@master ~]#
vim /etc/my.cnf
配置内容:
# binlog类型,maxwell要求为row类型
binlog_format=row
# 数据库id
server_id=1
# 启动binlog,该参数的值会作为binlog的文件名
log-bin=master
#启用binlog的数据库
binlog-do-db=test
注:MySQL Binlog模式
Statement-based:基于语句,Binlog会记录所有写操作的SQL语句,包括insert、update、delete等。
优点: 节省空间
缺点: 有可能造成数据不一致,例如insert语句中包含now()函数。
Row-based:基于行,Binlog会记录每次写操作后被操作行记录的变化。
优点:保持数据的绝对一致性。
缺点:占用较大空间。
mixed:混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based。
Maxwell要求Binlog采用Row-based模式。
步骤四、重启mysql
[root@master ~]#
systemctl stop mysqld
systemctl start mysqld
步骤五、创建用户并赋予权限
[root@master ~]#
create user 'maxwell'@'%' identified by 'maxwell';
create user 'maxwell'@'localhost' identified by 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
步骤六、命令行运行
[root@master ~]#
maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=stdout
新建终端,写入表和数据
create table student(id int,name varchar(255));
insert into student(id, name) VALUES (1,'Su');
insert into student(id, name) VALUES (2,'Ming');
insert into student(id, name) VALUES (3,'Yang');
步骤七、配置config.properties
[root@master ~]#
cp /usr/local/src/maxwell/config.properties.example /usr/local/src/maxwell/config.properties
vim /usr/local/src/maxwell/config.properties
配置内容:
log_level=info
producer=stdout
# mysql login info
host=localhost
user=maxwell
password=maxwell
运行命令
maxwell --config /usr/local/src/maxwell/config.properties
后台运行可使用参数:–daemon
后台关闭
ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9
第四章、kafka采集maxwell的数据
注:将kafka服务启动,主题名称为test
步骤一、命令行启动
[root@master ~]#
maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test --daemon
步骤二、启动kafka 消费者
[root@master ~]#
kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning
步骤三、在mysql中插入数据
insert into student(id, name) VALUES (666,'kafka consumer');
步骤四、配置config.properties
[root@master ~]#
vim /usr/local/src/maxwell/config.properties
配置内容:
# 设置kafka主题
kafka_topic=test
# 设置生成者
producer=kafka
# 设置bootstrap服务
kafka.bootstrap.servers=localhost:9092
# 设置mysql主机
host=localhost
# 设置mysql用户名
user=maxwell
# 设置mysql密码
password=maxwell
步骤五、启动maxwell
[root@master ~]#
maxwell --config /usr/local/src/maxwell/config.properties --daemon
kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning
sql 新插入一条数据测试
insert into student(id, name) VALUES (123,'kafka config test');
使用filter可以将不需要更新的表过滤
第五章、Maxwell-bootstrap
[root@master ~]#
maxwell-bootstrap --database test --table student --config /usr/local/src/maxwell/config.properties
该方法默认sql:select * from table
支持where条件查询
不建议使用