2. SpringBoot 整合 Canal 优化
  TEZNKK3IfmPf 2023年11月13日 33 0

一般来说,一个项目里面会监听多个表的变化。

application.yml 配置

# Canal client settings.
# The nesting matches the @Value placeholders read by DBDataChangeMsgHandler:
# canal.server.ip, canal.port, canal.destination, canal.username,
# canal.password, canal.schemaName and canal.consumer.start.
canal:
  server:
    ip: 127.0.0.1
  port: 11111
  destination: example
  username: canal
  password: canal
  schemaName: springboot
  consumer:
    start: true

对应的目录:

2. SpringBoot 整合 Canal 优化

实体 DBDataChange

package top.yueshushu.learn.consumer.dto;

import lombok.Data;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * One data-change event for a single table, as assembled by
 * {@code DBDataChangeMsgHandler} from a Canal entry.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}.
 *
 * @author yjl
 * @date 2022-04-29
 */
@Data
public class DBDataChange implements Serializable {

    private static final long serialVersionUID = -1L;

    /**
     * Database (schema) name.
     */
    private String schemaName;

    /**
     * Name of the monitored table.
     */
    private String tableName;

    /**
     * Event type: INSERT, UPDATE or DELETE.
     */
    private String eventType;

    /**
     * Operation time.
     */
    private long operatorTime;

    /**
     * Execution time.
     */
    private long executeTime;

    /**
     * Time at which this object was built.
     */
    private long buildTime;

    /**
     * The SQL statement that was executed.
     */
    private String sql;

    /**
     * Affected rows. A single DML statement may touch several rows, so this is a list
     * with one map per row; each map is keyed by column name.
     */
    private List<Map<String, Detail>> dataList;

    /**
     * Appends the column details of one affected row, creating the backing list on first use.
     *
     * @param detailMap column name to detail mapping for one row
     */
    public void addDetail(Map<String, Detail> detailMap) {
        if (null == dataList) {
            this.dataList = new ArrayList<>();
        }
        this.dataList.add(detailMap);
    }

    /**
     * Replaces the whole list of affected rows.
     *
     * @param dataList the new row list
     */
    public void setDataList(List<Map<String, Detail>> dataList) {
        this.dataList = dataList;
    }

    /**
     * Before/after state of a single column in a single row.
     *
     * <p>The fields are private; all access goes through the accessors Lombok generates.
     */
    @Data
    public static class Detail implements Serializable {

        private static final long serialVersionUID = 1L;

        /**
         * Column name.
         */
        private String columnName;

        /**
         * Whether the column was updated.
         */
        private boolean updated;

        /**
         * Value before the change.
         */
        private Object before;

        /**
         * Value after the change.
         */
        private Object after;
    }
}

JdbcTypeUtil 类型转换工具类

package top.yueshushu.learn.consumer.util;

import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.*;

/**
 * Helpers for converting between JDBC type codes ({@link Types}) and Java values.
 *
 * @author yjl 2022-04-29
 * @version 1.0.0
 */
public class JdbcTypeUtil {

    private static final Logger logger = LoggerFactory.getLogger(JdbcTypeUtil.class);

    /**
     * Reads one column from a result set.
     *
     * @param rs         result set positioned on the row to read
     * @param columnName name of the column to read
     * @param jdbcType   JDBC type code of the column, see {@link Types}
     * @return the value via {@code getByte} for BIT/BOOLEAN columns, otherwise via {@code getObject}
     * @throws SQLException if the underlying read fails
     */
    public static Object getRSData(ResultSet rs, String columnName, int jdbcType) throws SQLException {
        if (jdbcType == Types.BIT || jdbcType == Types.BOOLEAN) {
            return rs.getByte(columnName);
        } else {
            return rs.getObject(columnName);
        }
    }

    /**
     * Maps a JDBC type code to the Java class used to represent it.
     *
     * @param jdbcType JDBC type code, see {@link Types}
     * @return the Java class; {@code String.class} for any unlisted type
     */
    public static Class<?> jdbcType2javaType(int jdbcType) {
        switch (jdbcType) {
            // BIT and BOOLEAN deliberately share the TINYINT mapping. Note this is the
            // primitive byte type (Byte.TYPE), unlike the wrapper classes returned below.
            case Types.BIT:
            case Types.BOOLEAN:
            case Types.TINYINT:
                return Byte.TYPE;
            case Types.SMALLINT:
                return Short.class;
            case Types.INTEGER:
                return Integer.class;
            case Types.BIGINT:
                return Long.class;
            case Types.DECIMAL:
            case Types.NUMERIC:
                return BigDecimal.class;
            case Types.REAL:
                return Float.class;
            case Types.FLOAT:
            case Types.DOUBLE:
                return Double.class;
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
                return String.class;
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
            case Types.BLOB:
                return byte[].class;
            case Types.DATE:
                return Date.class;
            case Types.TIME:
                return Time.class;
            case Types.TIMESTAMP:
                return Timestamp.class;
            default:
                return String.class;
        }
    }

    /**
     * Converts the textual form of a column value into a typed Java object.
     *
     * @param columnName column name, used only for the error log
     * @param value      textual column value
     * @param sqlType    JDBC type code of the column, see {@link Types}
     * @param mysqlType  MySQL column type text (e.g. {@code "bigint(20) unsigned"}); may be null
     * @return {@code null} when {@code value} is null or empty, or when a DATE/TIMESTAMP value
     *         starts with {@code 0000-00-00}; the converted object on success; the original,
     *         unmodified {@code value} when conversion fails
     */
    public static Object typeConvert(String columnName, String value, int sqlType, String mysqlType) {
        if (value == null || value.equals("")) {
            return null;
        }

        try {
            Object res;
            switch (sqlType) {
                case Types.INTEGER:
                    res = Integer.parseInt(value);
                    break;
                case Types.SMALLINT:
                    res = Short.parseShort(value);
                    break;
                case Types.BIT:
                case Types.TINYINT:
                    res = Byte.parseByte(value);
                    break;
                case Types.BIGINT:
                    // Unsigned bigint can exceed Long.MAX_VALUE. A missing mysqlType is
                    // treated as signed instead of failing with a NullPointerException.
                    if (mysqlType != null && mysqlType.startsWith("bigint") && mysqlType.endsWith("unsigned")) {
                        res = new BigInteger(value);
                    } else {
                        res = Long.parseLong(value);
                    }
                    break;
                case Types.BOOLEAN:
                    res = !"0".equals(value);
                    break;
                case Types.DOUBLE:
                case Types.FLOAT:
                    res = Double.parseDouble(value);
                    break;
                case Types.REAL:
                    res = Float.parseFloat(value);
                    break;
                case Types.DECIMAL:
                case Types.NUMERIC:
                    res = new BigDecimal(value);
                    break;
                case Types.BINARY:
                case Types.VARBINARY:
                case Types.LONGVARBINARY:
                case Types.BLOB:
                    res = value.getBytes("ISO-8859-1");
                    break;
                case Types.DATE:
                    if (!value.startsWith("0000-00-00")) {
                        // Work on a copy so the failure path below still returns the raw input.
                        String isoDate = value.trim().replace(" ", "T");
                        res = new Date(new DateTime(isoDate).toDate().getTime());
                    } else {
                        res = null;
                    }
                    break;
                case Types.TIME:
                    res = new Time(new DateTime("T" + value).toDate().getTime());
                    break;
                case Types.TIMESTAMP:
                    if (!value.startsWith("0000-00-00")) {
                        String isoTimestamp = value.trim().replace(" ", "T");
                        res = new Timestamp(new DateTime(isoTimestamp).toDate().getTime());
                    } else {
                        res = null;
                    }
                    break;
                case Types.CLOB:
                default:
                    res = value;
            }
            return res;
        } catch (Exception e) {
            // Placeholders now line up with their arguments; the trailing exception is
            // logged with its stack trace by SLF4J.
            logger.error("column: {}, value: {}, failed convert type {} to {}",
                    columnName, value, sqlType, mysqlType, e);
            return value;
        }
    }
}

DBDataChangeConsumer

package top.yueshushu.learn.consumer;
import top.yueshushu.learn.consumer.dto.DBDataChange;

import java.util.List;

/**
* Consumer of the data changes of one database table.
* DBDataChangeMsgHandler looks up every bean of this type and registers it
* under the key "schemaName.tableName".
*/
public interface DBDataChangeConsumer {
/**
* Returns the name of the database (schema) this consumer listens to.
* The default implementation covers the single-database case.
* @return the schema name, "springboot" unless overridden
*/
default String getSchemaName() {
return "springboot";
}

/**
* Returns the name of the table this consumer listens to.
* Each implementation handles exactly one table.
* @return the table name
*/
String getTableName();

/**
* Handles the changes received for this consumer's table.
* @param dataChanges the changes grouped under this consumer's "schemaName.tableName" key
*/
void accept(List<DBDataChange> dataChanges);
}

DBDataChangeMsgHandler

package top.yueshushu.learn.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import top.yueshushu.learn.consumer.dto.DBDataChange;
import top.yueshushu.learn.consumer.util.JdbcTypeUtil;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Pulls change batches from a Canal server on a dedicated thread and routes them to the
 * {@link DBDataChangeConsumer} bean registered for the affected {@code schema.table}.
 *
 * @author yjl
 * @date 2022-04-29
 */
@Slf4j
@Component
public class DBDataChangeMsgHandler {

    @Value("${canal.server.ip:127.0.0.1}")
    private String canalServerIp;
    @Value("${canal.port:11111}")
    private int canalPort;
    @Value("${canal.destination:example}")
    private String destination;
    @Value("${canal.username:}")
    private String username;
    @Value("${canal.password:}")
    private String password;
    @Value("${canal.schemaName:springboot}")
    private String schemaName;
    @Value("${canal.consumer.start:false}")
    private boolean start;

    @Resource
    private ApplicationContext ctx;

    /**
     * Connection to the Canal server.
     */
    private CanalConnector connector;

    /**
     * Registered consumers keyed by {@code schemaName.tableName}.
     */
    private Map<String, DBDataChangeConsumer> consumerByKey;

    /**
     * Whether the pull loop should keep running; cleared on shutdown.
     */
    private volatile boolean run;

    /**
     * Starts listening at application start-up, unless the consumer is disabled by
     * configuration or no {@link DBDataChangeConsumer} bean is available.
     */
    @PostConstruct
    private void init() {
        log.info(">>>>>>开始进行初始化");
        if (!start) {
            log.info("System will not start canal consumer...");
            return;
        }

        consumerByKey = findConsumer();
        if (CollectionUtils.isEmpty(consumerByKey)) {
            log.info("no consumer. System won't listen on canal");
            return;
        }
        createConnector();
    }

    /**
     * Stops the pull loop and closes the connection at application shutdown.
     */
    @PreDestroy
    public void disconnect() {
        run = false;
        if (null != connector) {
            connector.disconnect();
        }
    }

    /**
     * Collects all {@link DBDataChangeConsumer} beans, keyed by {@code schemaName.tableName}.
     * The key set is later joined into the filter passed to {@code subscribe()}.
     *
     * @return consumers by key; consumers without a table name are skipped
     */
    private Map<String, DBDataChangeConsumer> findConsumer() {
        Map<String, DBDataChangeConsumer> consumers = ctx.getBeansOfType(DBDataChangeConsumer.class);
        Map<String, DBDataChangeConsumer> tempConsumerByKey = new HashMap<>(consumers.size(), 1.0f);
        consumers.values().forEach(consumer -> {
            // A consumer without a table name cannot be routed to.
            if (StringUtils.isEmpty(consumer.getTableName())) {
                log.warn("consumer.tableName is empty, ignore. consumer: " + consumer);
                return;
            }
            String schema = consumer.getSchemaName();
            // Parenthesised on purpose: without the parentheses the conditional swallowed
            // the concatenation, so an empty schema produced a key with no table name.
            String key = (StringUtils.isEmpty(schema) ? this.schemaName : schema) + "." + consumer.getTableName();
            DBDataChangeConsumer previousConsumer = tempConsumerByKey.put(key, consumer);
            if (null != previousConsumer) {
                log.warn("consumer for table is duplicated. only keep one. tableName:{}, ignore consumer:{}", previousConsumer.getTableName(), previousConsumer);
            }
        });
        return tempConsumerByKey;
    }

    /**
     * Connects to the Canal server, subscribes to the registered tables and starts the
     * pull thread.
     */
    private void createConnector() {
        connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalServerIp, canalPort), destination, username, password);
        connector.connect();

        // The filter is the comma-separated list of "schema.table" keys.
        String subscribe = String.join(",", consumerByKey.keySet());
        connector.subscribe(subscribe);
        // Roll back so that data fetched earlier but never acknowledged is delivered again.
        connector.rollback();

        // Set before the thread starts so a later disconnect() cannot be overwritten by it.
        run = true;
        Thread thread = new Thread(this::pullMsg, "canal-consumer-thread-01");
        thread.start();
    }

    /**
     * Pull loop: fetches a batch, converts it, dispatches it and acknowledges it. Runs until
     * {@link #disconnect()} is called or the thread is interrupted.
     */
    private void pullMsg() {
        int batchSize = 1000;
        while (run && !Thread.currentThread().isInterrupted()) {
            try {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing to process: back off for a second before polling again.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException interrupted) {
                        log.warn("线程中断异常");
                        Thread.currentThread().interrupt();
                        return;
                    }
                    continue;
                }

                List<DBDataChange> dataChanges = parseData(message.getEntries());
                if (!CollectionUtils.isEmpty(dataChanges)) {
                    // Low volume, non-critical: handled synchronously rather than on a pool.
                    dispatch(dataChanges);
                }
                // Acknowledge the batch.
                connector.ack(batchId);
            } catch (Exception e) {
                if (!run) {
                    // The connector was closed by disconnect(); a failure here is expected.
                    break;
                }
                log.error("pull msg error", e);
                if (e instanceof CanalClientException) {
                    log.warn("连接无效,进行重试");
                    reconnect();
                }
            }
        }
    }

    /**
     * Retries the connection once a minute until it succeeds, the handler is shut down, or
     * the thread is interrupted.
     */
    private void reconnect() {
        while (run) {
            try {
                Thread.sleep(1000L * 60L);
            } catch (InterruptedException interrupted) {
                log.warn("线程中断异常");
                Thread.currentThread().interrupt();
                return;
            }
            if (!run) {
                // Shut down while waiting; do not reopen the connection.
                return;
            }

            try {
                connector.disconnect();
                connector.connect();
                log.warn("canal reconnect success...");
                break;
            } catch (CanalClientException e) {
                log.error("reconnect exception", e);
            }
        }
    }

    /**
     * Groups the changes by {@code schemaName.tableName} and hands each group to its consumer.
     * A failure in one consumer is logged and does not affect the other groups.
     *
     * @param dataChanges changes parsed from one batch
     */
    private void dispatch(List<DBDataChange> dataChanges) {
        Map<String, List<DBDataChange>> dataChangesByKey =
                dataChanges.stream().collect(Collectors.groupingBy(dataChange -> dataChange.getSchemaName() + "." + dataChange.getTableName()));
        dataChangesByKey.forEach((key, values) -> {
            DBDataChangeConsumer dbDataChangeConsumer = consumerByKey.get(key);
            if (dbDataChangeConsumer != null) {
                try {
                    dbDataChangeConsumer.accept(values);
                } catch (Exception e) {
                    log.error("consume error. key: " + key + ", values: " + JSON.toJSONString(values), e);
                }
            } else {
                log.warn("no consumer. key: " + key);
            }
        });
    }

    /**
     * Converts Canal entries into {@link DBDataChange} objects. Transaction begin/end
     * entries and DDL statements are skipped.
     *
     * @param entryList entries of one batch
     * @return one {@link DBDataChange} per remaining entry
     */
    private static List<DBDataChange> parseData(List<Entry> entryList) {
        List<DBDataChange> dataChanges = new ArrayList<>(entryList.size());
        for (Entry entry : entryList) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
            }
            // Only data changes are of interest; DDL is ignored.
            if (rowChange.getIsDdl()) {
                continue;
            }

            EventType eventType = rowChange.getEventType();
            DBDataChange dbDataChange = new DBDataChange();
            dataChanges.add(dbDataChange);

            dbDataChange.setSchemaName(entry.getHeader().getSchemaName());
            dbDataChange.setTableName(entry.getHeader().getTableName());
            dbDataChange.setEventType(eventType.toString());
            dbDataChange.setExecuteTime(entry.getHeader().getExecuteTime());
            dbDataChange.setBuildTime(System.currentTimeMillis());
            dbDataChange.setSql(rowChange.getSql());

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // A delete only has the "before" image.
                    dbDataChange.addDetail(toBeforeChange(rowData.getBeforeColumnsList()));
                } else if (eventType == EventType.INSERT) {
                    // An insert only has the "after" image.
                    dbDataChange.addDetail(toAfterChange(rowData.getAfterColumnsList()));
                } else {
                    Map<String, DBDataChange.Detail> beforeDetailMap = toBeforeChange(rowData.getBeforeColumnsList());
                    Map<String, DBDataChange.Detail> afterDetailMap = toAfterChange(rowData.getAfterColumnsList());
                    // Fold the "before" value into the matching "after" detail. A column
                    // present only in the "before" image is added as it is.
                    beforeDetailMap.forEach((key, beforeDetail) ->
                            afterDetailMap.merge(key, beforeDetail, (afterDetail, mergedBefore) -> {
                                afterDetail.setBefore(mergedBefore.getBefore());
                                return afterDetail;
                            })
                    );
                    dbDataChange.addDetail(afterDetailMap);
                }
            }
        }

        return dataChanges;
    }

    /**
     * Builds the details of a row's "before" image (deletes, and updates before the change).
     *
     * @param columnsList columns of the row
     * @return details keyed by column name, with {@code before} populated
     */
    private static Map<String, DBDataChange.Detail> toBeforeChange(List<Column> columnsList) {
        Map<String, DBDataChange.Detail> detailMap = new HashMap<>(columnsList.size(), 1.0f);
        for (Column column : columnsList) {
            DBDataChange.Detail detail = buildDetail(column);
            detail.setBefore(JdbcTypeUtil.typeConvert(column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType()));
            detailMap.put(detail.getColumnName(), detail);
        }
        return detailMap;
    }

    /**
     * Builds the details of a row's "after" image (inserts, and updates after the change).
     *
     * @param columnsList columns of the row
     * @return details keyed by column name, with {@code after} populated
     */
    private static Map<String, DBDataChange.Detail> toAfterChange(List<Column> columnsList) {
        Map<String, DBDataChange.Detail> detailMap = new HashMap<>(columnsList.size(), 1.0f);
        for (Column column : columnsList) {
            DBDataChange.Detail detail = buildDetail(column);
            detail.setAfter(JdbcTypeUtil.typeConvert(column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType()));
            detailMap.put(detail.getColumnName(), detail);
        }
        return detailMap;
    }

    /**
     * Creates a detail carrying the column's name and its "updated" flag.
     *
     * @param column source column
     * @return a detail with neither {@code before} nor {@code after} set
     */
    private static DBDataChange.Detail buildDetail(Column column) {
        DBDataChange.Detail detail = new DBDataChange.Detail();
        detail.setColumnName(column.getName());
        detail.setUpdated(column.getUpdated());
        return detail;
    }

}

具体的类实现 CanalUserImpl

需要监听新的表时,只需要新增一个类似的实现类即可

package top.yueshushu.learn.consumer.impl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.yueshushu.learn.consumer.DBDataChangeConsumer;
import top.yueshushu.learn.consumer.dto.DBDataChange;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Consumer for changes of the {@code user} table in the {@code springboot} schema.
 * Every handler currently only logs the change; business logic goes where marked.
 *
 * @author yjl
 */
@Component
@Slf4j
public class CanalUserImpl implements DBDataChangeConsumer {

    // Other service-layer beans can be injected here with @Resource.

    @Override
    public String getSchemaName() {
        return "springboot";
    }

    @Override
    public String getTableName() {
        return "user";
    }

    /**
     * Routes each change to the handler for its event type. Event types other than
     * UPDATE, DELETE and INSERT are ignored.
     *
     * @param dataChanges changes of the {@code user} table
     */
    @Override
    public void accept(List<DBDataChange> dataChanges) {
        log.info(" user 表 用户信息变动");
        for (DBDataChange dbDataChange : dataChanges) {
            String eventType = dbDataChange.getEventType().toUpperCase();
            switch (eventType) {
                case "UPDATE":
                    log.info("对表 {} 进行的处理操作是:{}", getTableName(), eventType);
                    updateHandler(dbDataChange);
                    break;
                case "DELETE":
                    log.info("对表 {} 进行的处理操作是:{}", getTableName(), eventType);
                    deleteHandler(dbDataChange);
                    break;
                case "INSERT":
                    log.info("对表 {} 进行的处理操作是:{}", getTableName(), eventType);
                    insertHandler(dbDataChange);
                    break;
                default:
                    // Not a row-level data change this consumer cares about.
                    break;
            }
        }
    }

    /**
     * Handles an update: logs every column flagged as updated with its old and new value.
     *
     * @param dbDataChange the change event
     */
    private void updateHandler(DBDataChange dbDataChange) {
        List<Map<String, DBDataChange.Detail>> dataList = dbDataChange.getDataList();
        if (dataList == null) {
            // No row was attached to this event.
            return;
        }
        for (Map<String, DBDataChange.Detail> userChangeMap : dataList) {
            for (Map.Entry<String, DBDataChange.Detail> entry : userChangeMap.entrySet()) {
                String column = entry.getKey();
                DBDataChange.Detail value = entry.getValue();
                if (value != null && value.isUpdated()) {
                    log.info(">>>更新字段 {},更新前的值是:{},更新后的新值是:{}", column,
                            asText(value.getBefore()), asText(value.getAfter()));
                    // Business handling goes here.
                }
            }
        }
    }

    /**
     * Handles a delete: logs every column that had a value before the row was removed.
     *
     * @param dbDataChange the change event
     */
    private void deleteHandler(DBDataChange dbDataChange) {
        List<Map<String, DBDataChange.Detail>> dataList = dbDataChange.getDataList();
        if (dataList == null) {
            return;
        }
        for (Map<String, DBDataChange.Detail> userChangeMap : dataList) {
            for (Map.Entry<String, DBDataChange.Detail> entry : userChangeMap.entrySet()) {
                String column = entry.getKey();
                DBDataChange.Detail value = entry.getValue();
                if (value != null && value.getBefore() != null) {
                    log.info(">>>删除字段 {},删除前的值是:{}", column, value.getBefore().toString());
                    // Business handling goes here.
                }
            }
        }
    }

    /**
     * Handles an insert: logs every column that received a value.
     *
     * @param dbDataChange the change event
     */
    private void insertHandler(DBDataChange dbDataChange) {
        List<Map<String, DBDataChange.Detail>> dataList = dbDataChange.getDataList();
        if (dataList == null) {
            return;
        }
        for (Map<String, DBDataChange.Detail> userChangeMap : dataList) {
            for (Map.Entry<String, DBDataChange.Detail> entry : userChangeMap.entrySet()) {
                String column = entry.getKey();
                DBDataChange.Detail value = entry.getValue();
                if (value != null && value.getAfter() != null) {
                    log.info(">>>插入字段 {},插入的值是:{}", column, value.getAfter().toString());
                    // Business handling goes here.
                }
            }
        }
    }

    /**
     * Renders a column value for logging. A column's before/after value is null when the
     * column was NULL or empty, so this must accept null.
     *
     * @param columnValue the value, possibly null
     * @return {@code columnValue.toString()}, or null when the value is null
     */
    private static String asText(Object columnValue) {
        return Optional.ofNullable(columnValue).map(Object::toString).orElse(null);
    }
}

添加、更新、删除数据时,Canal 都能正常消费到对应的变更。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
TEZNKK3IfmPf