数据同步
  VxbKCNpUI3P6 2023年11月30日 21 0

一、数据同步思路分析

1.1.数据同步问题分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elastic search与mysql之间的数据同步。

问题:在微服务中,负责酒店管理(操作mysql)的业务与负责酒店搜索(操作elastic search)的业务可能在两个不同的微服务上,数据同步该如何实现呢?

MySQL中的数据并把不是一成不变的,MySQL变了,ES也需要变不仅仅ES有了数据同步问题,数据库双写的情况,Red is与MySQL也有数据同步问题这些问题如何解决呢?如果现在项目是一个单体的项目,所有的业务都写在一个项目当中,那就非常好办前我再写新增修改删除的业务时同时把ES索引库也更新了。但是我们的项目是微服务,不同的业务在不同的微服务上

1.2.数据同步解决方案

1.2.1.方案一:同步调用

数据同步_数据

同步调用存在问题:业务会出现耦合(MySQL完成CRUD操作的时候,需要调用hotel-demo中更新索引库的接口,这样会出现业务的耦合,但是实现起来比较简单),影响性能

1.2.2.方案二:异步通知(推荐)

数据同步_微服务_02

通过使用MQ解决了上面同步调用的问题,提升了性能,但是使用引入了MQ中间件,复杂性上升

1.2.3.方案三:监听binlog

数据同步_数据同步_03

MysQL里面有binloga默认是关闭的,一且开启,每当MysQL做增删改操作,都会将响应的操作记录在binlog当中,只要数据变化了,binlog就会变化,我们可以利用类似canal的中间件监听binlog,一且发生变化,就会通知对应的微服务,在这个方案中它完全依赖于这个中间件,监听MySQL跟酒店管理服务没有任何关系,完全解除了服务之间的耦合了,这种方案可降低耦合,但是他要开启MySQL的binlog,对MySQL来讲它的压力就增加了

二、实现elastic search与数据库数据同步

案例:利用MQ实现mysql与elasticsearch数据同步

利用hotel-admin项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。步骤:

  1. 导入hotel-admin项目,启动并测试酒店数据的CRUD
  2. 声明exchange、queue、Routing Key
  3. 在hotel-admin中的增、删、改业务中完成消息发送
  4. 在hotel-demo中完成消息监听,并更新elastic search中数据
  5. 启动并测试数据同步功能

实现jia如下:

数据同步_微服务_04

2.1.打开操作酒店管理的微服务完成增删改

调整数据库连接,然后重启项目,测试增、删、改就可以

数据同步_微服务_05

2.2.声明队列和交换机

在hotel-admin(elasticsearch)模块声明exchange、queue、RoutingKey

2.2.1.引入RabbitMQ队列

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.2.2.在配置文件中配置RabbitMQ的地址:

这里主要是application.yaml中配置连接信息

数据同步_数据_06

2.2.3.定义常量类

声明队列交换机、队列、RoutingKey:在constants包下定义一个常量类:

public class MqConstants {
    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";

    /**
     * 新增和修改的队列: 新增和修改用一个队列,传递过去id,如果id存在就是修改,不存在就是删除
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    /**
     * 删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    /**
     * 新增和删除的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert.update";

    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";

}

2.2.4.创建交换机、队列、然后进行绑定

在config包下创建MqConfig 基于Bean创建队列、交换机然后绑定,代码如下:

@Configuration
public class MqConfig {

    /**
     * 创建交换机:TopicExchange
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true,false);
    }

    /**
     * 创建新增、修改队列
     * @return
     */
    @Bean
    public Queue insertAndUpdateQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
    }

    /**
     * 创建删除队列
     * @return
     */
    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
    }

    /**
     * 将新增、修改的队列和交换机进行绑定
     * @param insertAndUpdateQueue 修改、新增的队列
     * @param topicExchange 交换机
     * @return
     */
    @Bean
    public Binding insertAndUpdateQueueBinding(Queue insertAndUpdateQueue, TopicExchange topicExchange){
        return BindingBuilder.bind(insertAndUpdateQueue).to(topicExchange).with(MqConstants.HOTEL_INSERT_KEY);
    }

    /**
     * 将删除的队列和交换机进行绑定
     * @param deleteQueue 删除的队列
     * @param topicExchange 交换机
     * @return
     */
    @Bean
    public Binding deleteQueueBinding(Queue deleteQueue, TopicExchange topicExchange){
        return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

2.3.发送mq消息

这里发送消息要利用生产者也就是hotel-admin模块完成在新增、修改和删除操作后,发送消息到代理的操作,如下:

2.3.1.导入RabbitMQ依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.3.2.复制在之前模块创建的常量类

在hotel-admin模块中加入上面写的常量类:

public class MqConstants {
    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";

    /**
     * 新增和修改的队列: 新增和修改用一个队列,传递过去id,如果id存在就是修改,不存在就是删除
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    /**
     * 删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    /**
     * 新增和删除的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert.update";

    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";

}

2.3.3.配置RabbitMQ的地址:

数据同步_数据同步_07

2.3.4.编写消息发送的代码

在消息管理模块(消息的发送者),当新增一条消息后,需告诉Hotel-demo(消息的接受者)说消息新增了,调用RabbitTemplate发送这条消息:第一个参数交换机、第二个队列、第三个内容,发送的消息为这个对象的id,不发送这个对象,来减少MQ内存的消耗。

修改HotelController 中对于酒店信息增、删、改的方法添加往rabbitMQ发送消息的代码如下:

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }

    /**
     * 新增
     * @param hotel
     */
    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        hotelService.save(hotel);
        //指定发送的交换机和routing_key 将消息发送到路由
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    /**
     * 修改
     * @param hotel
     */
    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);

        //指定发送的交换机和routing_key 将消息发送到路由
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    /**
     * 删除
     * @param id
     */
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        //指定发送的交换机和routing_key 将消息发送到路由
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
    }
}

2.4.生产者-监听MQ中消息

在hotel-demo中完成消息监听,并更新elasticsearch中数据

2.4.1.在创建方法监听队列中的信息

在MQ包下创建HotelServiceListener,代码如下:

@Component
public class HotelServiceListener {
    @Autowired
    private IHotelService hotelService;

    /**
     * 监听新增和修改的队列
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpDate(Long id){
        hotelService.insertById(id);
    }

    /**
     * 监听删除的队列
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id){
        hotelService.deleteById(id);
    }
}

2.4.2.在service层准备接口

上面在监听的时候,需要去调用hotelService.deleteById(id)和 hotelService.insertById(id);需要再service层创建,在IHotelService中创建添加接口

void insertById(Long id);

    void deleteById(Long id);

2.4.2.实现接口

在HotelService中实现上面的两个接口,如下:

@Override
    public void insertById(Long id) {
        try {
            //根据ID查询酒店数据
            Hotel hotel = getById(id);
            //转换为文档类型
            HotelDoc hotelDoc = new HotelDoc(hotel);

            //准备request对象
            IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());

            //准备JSON文档
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            //发送请求
            client.index(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteById(Long id) {
        try {
            //准备request
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            //发送请求
            client.delete(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

2.5.测试数据同步

重启消息的接收方(Hotel-demo),发送方(Hotel-admin),查看MQ:出现了上面设置的对列、交换机、绑定关系

数据同步_数据_08

打开发送方(Hotel-admin)的页面,尝试修改数据

数据同步_数据_09

然后在接收方(Hotel-demo)的搜索功能中查看商品价格,已经发送了变化

数据同步_微服务_10

删除数据之前,可以利用vue插件将数据复制出来,后期在添加进去即可,如下:

数据同步_微服务_11

删除后,后续还需要,也可以添加回来,如下:

数据同步_数据_12

效果如下:

数据同步_数据同步_13



【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月30日 0

暂无评论

推荐阅读
VxbKCNpUI3P6