Netty使用CompletableFuture实现异步串行队列
  19qMgiCiiRfc 2023年12月23日 70 0

一、前言

CompletableFuture是JDK1.8提供的一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有。它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。

CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段执行结束后,可以回调你指定的下一阶段任务,而不是阻塞获取结果之后来处理结果。


二、常用方法

1.异步操作

方法

说明

runAsync(Runnable runnable)


接收Runnable实例,没有返回值

runAsync(Runnable runnable, Executor executor);

接收Runnable实例,指定线程池,没有返回值

supplyAsync(Supplier<U> supplier);

有返回值

supplyAsync(Supplier<U> supplier, Executor executor)

有返回值,指定线程池

2.依赖关系

方法

说明

thenApply()

把前面任务的执行结果,交给后面的Function

thenCompose()

thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例。

三、问题产生

我们在上一章的基础上继续学习。https://blog.51cto.com/u_13312531/8879299

我们修改上一章节的Netty初始化器,识别ffee开头、efcd结尾的数据。

package com.example.mynetty.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/12/19
 * @des
 */
@Component
public class NettyInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 基于分隔符的解码器  ffee开头 efcd结尾 提供分隔符解析报文 128表示单条消息的最大长度,解码器查找分隔符的时候,达到该长度没有找到会抛出异常。
        pipeline.addLast(
                new DelimiterBasedFrameDecoder(128, Unpooled.copiedBuffer(new byte[]{(byte) 0xff, (byte) 0xee}),
                        Unpooled.copiedBuffer(new byte[]{(byte) 0xef, (byte) 0xcd})));

        pipeline.addLast(new ByteArrayDecoder());
        pipeline.addLast(new ByteArrayEncoder());

        pipeline.addLast("handler", new MessageHandler());
    }
}

创建一个异步接收数据服务类。

package com.example.mynetty.service;

import cn.hutool.core.util.HexUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2023/12/19
 * @des 异步服务类
 */
@Service("taskService")
public class TaskService {

    @Async
    public void syncTask(byte[] data) {
        String s = HexUtil.encodeHexStr(data);
        System.out.println("接收的数据:" + s);
    }
}

由于我们使用了@Async,所以需要在启动类上开启异步注解。

package com.example.mynetty;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class MyNettyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyNettyApplication.class, args);
    }

}

创建一个获取Spring管理对象的工具类

package com.example.mynetty.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class BeanUtil implements ApplicationListener<ContextRefreshedEvent> {
    /**
     * 上下文对象实例
     */
    private static ApplicationContext ctx;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("ContextRefreshedEvent");
        ctx = contextRefreshedEvent.getApplicationContext();
    }

    //获取applicationContext
    public static ApplicationContext getApplicationContext() {
        return ctx;
    }


    //通过name,以及Clazz返回指定的Bean
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}

修改消息处理器,我们在答复后使用异步接收数据,数据用于后续数据库保存等操作。

package com.example.mynetty.netty;

import com.example.mynetty.service.TaskService;
import com.example.mynetty.utils.BeanUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

/**
 * @author qx
 * @date 2023/12/19
 * @des 处理客户端的消息
 */
@Slf4j
public class MessageHandler extends SimpleChannelInboundHandler<Object> {

    //CompletableFuture<Void> future = CompletableFuture.completedFuture(null);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
        byte[] result = (byte[]) o;
        //log.info("接收到消息:{}", Arrays.toString(result));
        // 答复
        ctx.writeAndFlush(result);

        // 开启异步任务接收数据
        TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
        taskService.syncTask(result);


        /*future = future.thenCompose(unused -> CompletableFuture.runAsync(()->{
            TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
            taskService.syncTask(result);
        }));*/
    }
}

测试:

我们创建了10条带顺序的数据,一起发送给Netty接收。

Netty使用CompletableFuture实现异步串行队列_Netty

我们发现网络调试工具是正常按顺序接收到数据的。

但是我们控制台的日志显示接收到的数据的顺序是混乱的。这种情况我们应该避免的,因为有些操作是按顺序执行下去的,我们获取数据的顺序混乱的话是得不到正确操作的。

接收的数据:
接收的数据:001e040227061f061f0a0a0a0a00000800000006000101016dbf
接收的数据:001e040227061f041f0a0a0a0a0000080000000400010101cd67
接收的数据:001e040227061f081f0a0a0a0a0000080000000800010101cfb2
接收的数据:
接收的数据:001e040227061f091f0a0a0a0a00000800000009000101019fde
接收的数据:
接收的数据:001e040227061f071f0a0a0a0a00000800000007000101013dd3
接收的数据:
接收的数据:001e040227061f051f0a0a0a0a00000800000005000101019d0b
接收的数据:
接收的数据:001e040227061f0c1f0a0a0a0a0000080000000c00010101ce01
接收的数据:
接收的数据:001e040227061f0b1f0a0a0a0a0000080000000b000101013f06
接收的数据:
接收的数据:001e040227061f0a1f0a0a0a0a0000080000000a000101016f6a
接收的数据:
接收的数据:
接收的数据:001e040227061f0d1f0a0a0a0a0000080000000d000101019e6d
接收的数据:

四、问题解决

我们使用CompletableFuture让我们的任务异步串行化,也就是任务编排下去。

我们修改消息处理器。

package com.example.mynetty.netty;

import com.example.mynetty.service.TaskService;
import com.example.mynetty.utils.BeanUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;

/**
 * @author qx
 * @date 2023/12/19
 * @des 处理客户端的消息
 */
@Slf4j
public class MessageHandler extends SimpleChannelInboundHandler<Object> {

    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
        byte[] result = (byte[]) o;
        //log.info("接收到消息:{}", Arrays.toString(result));
        // 答复
        ctx.writeAndFlush(result);


        // 异步串行化 runAsync不需要返回结果 thenCompose意思为一个任务执行完后再执行下一个任务
        future = future.thenCompose(unused -> CompletableFuture.runAsync(() -> {
            TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
            taskService.syncTask(result);
        }));
    }
}

去掉服务类中的异步。

package com.example.mynetty.service;

import cn.hutool.core.util.HexUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2023/12/19
 * @des 异步服务类
 */
@Service("taskService")
public class TaskService {

    public void syncTask(byte[] data) {
        // 使用hutool的工具方法解析字节数组数据为16进制数据
        String s = HexUtil.encodeHexStr(data);
        System.out.println("接收的数据:" + s);
    }
}

重启程序继续测试。

Netty使用CompletableFuture实现异步串行队列_CompletableFuture_02

我们查看控制台上的接收数据的日志显示的顺序也是和发送时的顺序一致。

Netty使用CompletableFuture实现异步串行队列_Netty_03

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年12月23日 0

暂无评论

19qMgiCiiRfc