一、数据同步思路分析
1.1.数据同步问题分析
elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elastic search与mysql之间的数据同步。
问题:在微服务中,负责酒店管理(操作mysql)的业务与负责酒店搜索(操作elastic search)的业务可能在两个不同的微服务上,数据同步该如何实现呢?
MySQL中的数据并把不是一成不变的,MySQL变了,ES也需要变不仅仅ES有了数据同步问题,数据库双写的情况,Red is与MySQL也有数据同步问题这些问题如何解决呢?如果现在项目是一个单体的项目,所有的业务都写在一个项目当中,那就非常好办前我再写新增修改删除的业务时同时把ES索引库也更新了。但是我们的项目是微服务,不同的业务在不同的微服务上
1.2.数据同步解决方案
1.2.1.方案一:同步调用
同步调用存在问题:业务会出现耦合(MySQL完成CRUD操作的时候,需要调用hotel-demo中更新索引库的接口,这样会出现业务的耦合,但是实现起来比较简单),影响性能
1.2.2.方案二:异步通知(推荐)
通过使用MQ解决了上面同步调用的问题,提升了性能,但是使用引入了MQ中间件,复杂性上升
1.2.3.方案三:监听binlog
MysQL里面有binloga默认是关闭的,一且开启,每当MysQL做增删改操作,都会将响应的操作记录在binlog当中,只要数据变化了,binlog就会变化,我们可以利用类似canal的中间件监听binlog,一且发生变化,就会通知对应的微服务,在这个方案中它完全依赖于这个中间件,监听MySQL跟酒店管理服务没有任何关系,完全解除了服务之间的耦合了,这种方案可降低耦合,但是他要开启MySQL的binlog,对MySQL来讲它的压力就增加了
二、实现elastic search与数据库数据同步
案例:利用MQ实现mysql与elasticsearch数据同步
利用hotel-admin项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。步骤:
- 导入hotel-admin项目,启动并测试酒店数据的CRUD
- 声明exchange、queue、Routing Key
- 在hotel-admin中的增、删、改业务中完成消息发送
- 在hotel-demo中完成消息监听,并更新elastic search中数据
- 启动并测试数据同步功能
实现jia如下:
2.1.打开操作酒店管理的微服务完成增删改
调整数据库连接,然后重启项目,测试增、删、改就可以
2.2.声明队列和交换机
在hotel-admin(elasticsearch)模块声明exchange、queue、RoutingKey
2.2.1.引入RabbitMQ队列
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2.2.在配置文件中配置RabbitMQ的地址:
这里主要是application.yaml中配置连接信息
2.2.3.定义常量类
声明队列交换机、队列、RoutingKey:在constants包下定义一个常量类:
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 新增和修改的队列: 新增和修改用一个队列,传递过去id,如果id存在就是修改,不存在就是删除
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增和删除的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert.update";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
2.2.4.创建交换机、队列、然后进行绑定
在config包下创建MqConfig 基于Bean创建队列、交换机然后绑定,代码如下:
@Configuration
public class MqConfig {
/**
* 创建交换机:TopicExchange
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true,false);
}
/**
* 创建新增、修改队列
* @return
*/
@Bean
public Queue insertAndUpdateQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
/**
* 创建删除队列
* @return
*/
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}
/**
* 将新增、修改的队列和交换机进行绑定
* @param insertAndUpdateQueue 修改、新增的队列
* @param topicExchange 交换机
* @return
*/
@Bean
public Binding insertAndUpdateQueueBinding(Queue insertAndUpdateQueue, TopicExchange topicExchange){
return BindingBuilder.bind(insertAndUpdateQueue).to(topicExchange).with(MqConstants.HOTEL_INSERT_KEY);
}
/**
* 将删除的队列和交换机进行绑定
* @param deleteQueue 删除的队列
* @param topicExchange 交换机
* @return
*/
@Bean
public Binding deleteQueueBinding(Queue deleteQueue, TopicExchange topicExchange){
return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstants.HOTEL_DELETE_KEY);
}
}
2.3.发送mq消息
这里发送消息要利用生产者也就是hotel-admin模块完成在新增、修改和删除操作后,发送消息到代理的操作,如下:
2.3.1.导入RabbitMQ依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.3.2.复制在之前模块创建的常量类
在hotel-admin模块中加入上面写的常量类:
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 新增和修改的队列: 新增和修改用一个队列,传递过去id,如果id存在就是修改,不存在就是删除
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增和删除的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert.update";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
2.3.3.配置RabbitMQ的地址:
2.3.4.编写消息发送的代码
在消息管理模块(消息的发送者),当新增一条消息后,需告诉Hotel-demo(消息的接受者)说消息新增了,调用RabbitTemplate发送这条消息:第一个参数交换机、第二个队列、第三个内容,发送的消息为这个对象的id,不发送这个对象,来减少MQ内存的消耗。
修改HotelController 中对于酒店信息增、删、改的方法添加往rabbitMQ发送消息的代码如下:
@RestController
@RequestMapping("hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id){
return hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
){
Page<Hotel> result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
/**
* 新增
* @param hotel
*/
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
//指定发送的交换机和routing_key 将消息发送到路由
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
/**
* 修改
* @param hotel
*/
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
//指定发送的交换机和routing_key 将消息发送到路由
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
/**
* 删除
* @param id
*/
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
//指定发送的交换机和routing_key 将消息发送到路由
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
}
2.4.生产者-监听MQ中消息
在hotel-demo中完成消息监听,并更新elasticsearch中数据
2.4.1.在创建方法监听队列中的信息
在MQ包下创建HotelServiceListener,代码如下:
@Component
public class HotelServiceListener {
@Autowired
private IHotelService hotelService;
/**
* 监听新增和修改的队列
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpDate(Long id){
hotelService.insertById(id);
}
/**
* 监听删除的队列
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
2.4.2.在service层准备接口
上面在监听的时候,需要去调用hotelService.deleteById(id)和 hotelService.insertById(id);需要再service层创建,在IHotelService中创建添加接口
void insertById(Long id);
void deleteById(Long id);
2.4.2.实现接口
在HotelService中实现上面的两个接口,如下:
@Override
public void insertById(Long id) {
try {
//根据ID查询酒店数据
Hotel hotel = getById(id);
//转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
//准备request对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
//准备JSON文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
//发送请求
client.index(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteById(Long id) {
try {
//准备request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
//发送请求
client.delete(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
2.5.测试数据同步
重启消息的接收方(Hotel-demo),发送方(Hotel-admin),查看MQ:出现了上面设置的对列、交换机、绑定关系
打开发送方(Hotel-admin)的页面,尝试修改数据
然后在接收方(Hotel-demo)的搜索功能中查看商品价格,已经发送了变化
删除数据之前,可以利用vue插件将数据复制出来,后期在添加进去即可,如下:
删除后,后续还需要,也可以添加回来,如下:
效果如下: