SpringBoot Netty socket使用
  Y8XIq1u6ceQW 2023年11月02日 26 0

SpringBoot Netty socket使用

Netty是由JBOSS提供的一个java开源框架,现为Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

这里springBoot整合起来使用测试,性能怎么的不怎么了解,至少能用

maven引用依赖

<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.42.Final</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>

配置

这里测试看看启动两个socket server,所以两个端口

nettysocket:
    port:  32768
    port2: 32769

Netty 服务

MySoketListener

package com.ld.test.socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

@Component
//@WebListener
public class MySoketListener implements ServletContextListener {
    private static final Logger log = LoggerFactory.getLogger(MySoketListener.class);

    @Value("${nettysocket.port}")
    private Integer port;

    @Value("${nettysocket.port2}")
    private Integer port2;

    private MyTestNettyServer myTestNettyServer;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        log.info("1.启动时,开启监听========================");
        if (myTestNettyServer == null) {
            log.info("2.启动时,MyTestNettyServer为null,启动Netty socket服务========================");

            log.info("=====MyTestNettyServer 端口为:" + port);           

            Thread thread = new Thread(new MyTestNettyServer(port));
            thread.start();

            //启动了别一个服务后 也是能共用ServerHandler.ChannelGroup进行统一对多个端进行转发消息
            //这里注意:不能使用在MyTestNettyServer 使用static 静态端口号,会被覆盖的
            //可以再启动一个服务
            log.info("=====MyTestNettyServer 端口为:" + port2);
            Thread thread2 = new Thread(new MyTestNettyServer(port2));
            thread2.start();

        }
    }

    // 应用关闭时,此方法被调用
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        log.info("23========================");
    }

}

MyTestNettyServer

package com.ld.test.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyTestNettyServer implements Runnable {
    /**
     * 异常输出
     */
    private static final Logger log = LoggerFactory.getLogger(MyTestNettyServer.class);
    private static Integer DEFAULT_PORT = 58765;
    private Integer port = DEFAULT_PORT;
    private String serverName = "";


    //默认
    public MyTestNettyServer() {
        port = DEFAULT_PORT;
        serverName = "srv(" + port + ")";
    }

    public MyTestNettyServer(Integer port) {
        this.port = port;
        serverName = "srv(" + this.port + ")";
    }

    /**
     * soket监听
     */
    // public static void soketListener() {
    public void soketListener() {
        log.info(serverName + "当前SOCKET NettyServer 端口为: port=" + port);
        Integer portNow = port;
        EventLoopGroup bossExecutors = new NioEventLoopGroup();
        EventLoopGroup workExecutors = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossExecutors, workExecutors)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 65535, 65535))
                    .childHandler(new ServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(portNow).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("NettyServer 端口为:" + portNow + " 启动出现异常, 异常内容为:" + e.getMessage() + "========================");

        } finally {
            log.error("NettyServer 服务关闭========================");
            bossExecutors.shutdownGracefully();
            workExecutors.shutdownGracefully();
        }

    }

    /**
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        log.info("多线程启动Netty sserver========================");
        // CallNettyServer.soketListener();
        this.soketListener();
    }


}

ServerHandler

package com.ld.test.socket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


public class ServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * 日志输出
     */
    private static final Logger log = LoggerFactory.getLogger(MyTestNettyServer.class);
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // private ConcurrentMap<String,T> socketConnectionMap = new ConcurrentHashMap<>();

    //存放 key=channel 及value=DF
    private static ConcurrentMap<String, String> channelMap = new ConcurrentHashMap<>();


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("====");
        System.out.println("channelMap客户端:" + JSON.toJSONString(channelMap));

        System.out.println("====");
        System.out.println(msg);
        System.out.println("====");


    }

    //给http 控制器调用的 转发消息的方法,通过 channelGroup缓存的Channel进行转发
    public static void sendMessage(String msg) throws Exception {
        JSONObject retJson = new JSONObject();
        retJson.put("code", "9999");
        retJson.put("message", "-");
        try {
            System.out.println("===================");
            System.out.println("socet sendMessage收到报文");
            System.out.println(msg);
            System.out.println("===================");

        } catch (Exception e) {
            System.out.println("转换信息出错,收到内容:" + msg);
            e.printStackTrace();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "channelActive");

        //统一给所有客户端发送一个上线通知
        //  channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经上线channelActive"+"\n");

        channelGroup.add(channel);
        System.out.println("当前socket连接数 channelGroup.size():" + channelGroup.size());

        //
        // channelMap.put(channel.id().asLongText(),"13001");

    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "-----channelInactive");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "handlerAdded");
        // channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经上线"+"\n");
        channelGroup.add(channel);
        //channelMap.put(channel.id().asLongText(),"1001");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        Channel channel = ctx.channel();

        System.out.println(channel.remoteAddress() + "handlerRemoved");

        channelMap.remove(channel.id().asLongText());

        // channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "已经下线"+"\n");
        System.out.println("当前socket连接数channelGroup.size():" + channelGroup.size());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


ServerInitializer

package com.ld.test.socket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ServerHandler());
    }
}

SpringBoot服务

package com.ld.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
@SpringBootApplication
@ServletComponentScan
public class NettySocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(NettySocketApplication.class, args);
    }
}

启动应用

SpringBoot Netty socket使用_Netty

测试

package com.ld.test.test;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.Socket;
import java.util.UUID;

@Slf4j
public class SocketClient1 {
    private static Logger log = LoggerFactory.getLogger(SocketClient1.class);

    public static void main(String[] args) throws IOException {
        // String host = "192.168.2.156";
        // int port = 8068;

        String host = "127.0.0.1";
        int port = 32768;


        //与服务端建立连接
        Socket socket = new Socket(host, port);
        socket.setOOBInline(true);

        //建立连接后获取输出流
        DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
        OutputStreamWriter outSW = new OutputStreamWriter(outputStream, "UTF-8");
        BufferedWriter bw = new BufferedWriter(outSW);


        DataInputStream inputStream = new DataInputStream(socket.getInputStream());
        InputStreamReader inSR = new InputStreamReader(inputStream, "UTF-8");
        BufferedReader br = new BufferedReader(inSR);


        String uuid = UUID.randomUUID().toString();
        uuid = "用户1" + "\r\n";
        log.info("uuid: {}", uuid);
        outputStream.write(uuid.getBytes());
        while (true) {
            //接收消息循环
        }

    }
}

SpringBoot Netty socket使用_Netty_02

SpringBoot Netty socket使用_SpringBoot_03

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
Y8XIq1u6ceQW