SpringBoot整合Netty

前言

Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高并发协议服务器和客户端。

Netty主要基于Java NIO实现,提供了异步和事件驱动的网络编程工具,简化了TCP和UDP服务器的编程。

Netty广泛应用于分布式系统、实时通信、游戏开发等领域,例如,知名的Elasticsearch和Dubbo框架内部都采用了Netty。

Netty吸收了多种协议的实现经验,经过精心设计,能够在保证易于开发的同时,确保应用的性能、稳定性和伸缩性。

Netty的优势

  1. 高性能:Netty 采用异步的、事件驱动的模型,基于 Java NIO 提供了非阻塞的 I/O 操作,能够处理大量的并发连接。它使用了高效的线程模型和内存管理策略,以实现更好的性能和吞吐量。
  2. 简单易用:Netty 提供了简洁、一致的 API,使开发者能够更快地构建和维护网络应用程序。它的设计注重可读性和可维护性,提供了丰富的功能组件和工具,简化了网络编程的复杂性。
  3. 异步和事件驱动:Netty 基于事件驱动的模型,通过注册感兴趣的事件和回调机制来处理请求和响应。它支持异步的、非阻塞的 I/O 操作,可以处理大量并发连接,提高了系统的响应能力和吞吐量。
  4. 灵活性和可扩展性:Netty 提供了灵活的、可配置的组件和扩展点,使开发者可以根据应用程序的需求进行定制和扩展。它支持多种协议和传输方式,包括 TCPUDPHTTP 等,可以适应不同的应用场景和需求。
  5. 完善的协议支持:Netty 提供了丰富的协议编解码器和处理器,包括常用的网络协议(如 HTTPWebSocketSMTP ),使开发者可以更方便地处理和解析不同的协议数据。
  6. 成熟和广泛应用:Netty 是一个成熟的框架,已经被广泛应用于许多大型互联网公司和开源项目中。它在性能、稳定性和可靠性方面经过了大量实践和验证,被广泛认可。

使用场景

Netty 是一个异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络 IO 程序。

以下是一些常见的使用场景:

  1. 服务器与客户端之间的通信:Netty 可以用来开发客户端和服务器,或者服务器与服务器之间的通信。
  2. 远程服务调用:RPC 框架(如 Dubbo)的基础通信组件,可以使用 Netty 进行数据的传输。
  3. 实时通信:Netty 可以用来实现即时通信系统,例如聊天室、在线游戏等。
  4. 数据传输:Netty 可以用于数据的传输,如 Hadoop 的数据传输、大数据传输等。
  5. HTTP 服务器:Netty 可以用来开发 HTTP 服务器,处理 HTTP 请求。
  6. 框架与工具:Netty 提供了一些可以直接使用的工具和框架,如 Netty-SocketIO 可以用来开发 Web 应用。

Netty的核心组件

  1. Bootstrap、ServerBootstrap(启动器):客户端和服务端的启动引导程序
  2. Channel(通道):数据传送的通道,Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 等。常见的 Channel 类型有以下几个:
    • NioServerSocketChannel 异步 TCP 服务端。
    • NioSocketChannel 异步 TCP 客户端。
    • OioServerSocketChannel 同步 TCP 服务端。
    • OioSocketChannel 同步 TCP 客户端。
    • NioDatagramChannel 异步 UDP 连接。
    • OioDatagramChannel 同步 UDP 连接。
  3. ChannelPipeline(通道管道):为 ChannelHandler 链提供了容器,当 channel 创建时,就会被自动分配到它专属的 ChannelPipeline,这个关联是永久性的。
  4. ChannelHandler(通道处理器):Channel中有相关事件发生的时候会触发执行,充当了所有处理入站和出站数据的逻辑容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。ChannelHandler分为ChannelInboundHandler和ChannelOutboundHandler。ChannelInboundHandler负责读取数据时处理数据。ChannelOutboundHandler负责在写入数据时处理数据。(开发人员主要是对这个进行定制开发)
  5. EventLoopGroup、EventLoop(事件循环器):Io异步执行的任务队列和线程池,主要是配合 Channel workGroup线程组负责处理 I/O 操作以及业务逻辑,bossGroup线程组用来处理接收客户端的请求连接。一个EventLoopGroup包含多个EventLoop;每一个EventLoop包含1个selector和1个事件循环线程
  6. ChannelFuture():Io任务执行以后未来的返回结果。Netty 框架中所有的 I/O 操作都为异步的,因此我们需要 ChannelFuture 的 addListener()注册一个 ChannelFutureListener 监听事件,当操作执行成功或者失败时,监听就会自动触发返回结果。
  7. ByteBuf: 字节缓冲区,是Netty处理字节的核心类,支持引用计数和池化。
  8. Codec: 编解码器,用于转换消息从POJO到ByteBuf或者反过来。
  9. ChannelHandlerContext:ChannelHandler相关联的上下文信息对象,封装了pipeline和ChannelHandler。

流程图

整合案例

  1. 创建一个spring boot工程
  2. pom.xml文件
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.SpringBoot</groupId>
            <artifactId>SpringBoot</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <groupId>com.example.netty</groupId>
        <artifactId>SpringBoot_Netty</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>SpringBoot_Netty</name>
        <description>SpringBoot_Netty</description>
        <properties>
            <java.version>1.8</java.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <spring-boot.version>2.6.13</spring-boot.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.100.Final</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-dependencies</artifactId>
                    <version>${spring-boot.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring-boot.version}</version>
                    <configuration>
                        <mainClass>com.example.netty.springboot_netty.SpringBootNettyApplication</mainClass>
                        <skip>true</skip>
                    </configuration>
                    <executions>
                        <execution>
                            <id>repackage</id>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
  3. yaml文件
    server:
      port: 8080
    netty:
      port: 8081
  4. Netty服务端
  5. ChannelInitializer
    package com.example.netty;
    
    import com.example.netty.handler.ChatHandler;
    import com.example.netty.handler.NettyServerHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * @Program: SpringBoot
     * @ClassName WsServerInitializer
     * @Author: liutao
     * @Description:
     * @Create: 2024-03-07 10:54
     * @Version 1.0
     **/
    
    
    public class WsServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            //通过SocketChannel获取对应的pipeline管道
            ChannelPipeline pipeline = channel.pipeline();
            //websocket基于http协议,所以需要http编解码器
            pipeline.addLast(new HttpServerCodec());
            //添加对于读写大数据流的支持
            pipeline.addLast(new ChunkedWriteHandler());
            //对httpMessage进行聚合
            pipeline.addLast(new HttpObjectAggregator(1024*64));
    
            // ================= 上述是用于支持http协议的 ==============
            // ================= 下面是用于支持websocket的 ==============
            //websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
            //比如处理一些握手动作(ping,pong)
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            //自定义handler
            pipeline.addLast(new ChatHandler());
        }
    }
    
  6. 核心 通道处理器
    package com.example.netty.handler;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import lombok.extern.slf4j.Slf4j;
    
    import java.time.LocalDateTime;
    
    /**
     * @Program: SpringBoot
     * @ClassName ChatHandler
     * @Author: liutao
     * @Description:
     * @Create: 2024-03-07 10:46
     * @Version 1.0
     **/
    
    @Slf4j
    public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        /**
         * 用于记录和管理所有客户端的channel
         **/
        private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        /**
         * @MethodName: count
         * @description: 在线用户人数
         **/
    
        public static int count() {
            return clients.size();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            log.info("客户端的远程地址是:"+ ctx.channel().remoteAddress());
            //客户端传递过来的消息
            String content = msg.text();
            log.info("接收到了客户端的消息是:" + content);
    
            //将客户端发送过来的消息刷到所有的channel中
            for (Channel channel : clients) {
                channel.writeAndFlush(new TextWebSocketFrame("[服务器接收到了客户端的消息:]" + LocalDateTime.now() + ",消息为:" + content));
            }
        }
    
        //客户端创建的时候触发,当客户端连接上服务端之后,就可以获取该channel,然后放到channelGroup中进行统一管理
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            clients.add(ctx.channel());
            log.info("当前channelGroup中包含的channel的数量是:" + clients.size());
            log.info("客户端连接,当前被添加的channel的短ID是:" + ctx.channel().id().asShortText());
        }
    
        //客户端销毁的时候触发,
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            //当handlerRemoved 被触发时候,channelGroup会自动移除对应的channel
            log.info("客户端断开,当前被移除的channel的短ID是:" + ctx.channel().id().asShortText());
            clients.remove(ctx.channel());
            ctx.close();
            log.info("当前channelGroup中包含的channel的数量是:" + clients.size());
        }
        /***
         * @MethodName: exceptionCaught
         * @description: 发生异常关闭当前通道
         * @Author: LiuTao
         * @Param: [ctx, cause]
         * @UpdateTime: 2024/3/7 14:29
         * @Return: void
         * @Throw:
         **/
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
        }
    }
    
  7. 绑定springboot应用启动时同时启用netty服务端
    package com.example.netty.config;
    
    import com.example.netty.TcpServer;
    import com.example.netty.WebSocketServer;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.stereotype.Component;
    
    /**
     * @Program: SpringBoot
     * @ClassName NettyBootServerInitConfig
     * @Author: liutao
     * @Description:
     * @Create: 2024-03-07 10:42
     * @Version 1.0
     **/
    
    @Component
    public class NettyBootServerInitConfig implements ApplicationListener<ContextRefreshedEvent> {
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (event.getApplicationContext().getParent() == null) {
                WebSocketServer.getInstance().start();
            }
        }
    
    
    }  

测试

  1. 启动服务
  2. 打开apipost新建websocket连接 并连接后端ws服务
  3. 发送消息给服务端

结尾

最后希望小伙伴 点赞 关注 分享三连!!

由于涛哥设备更换接下来会讲解 Mac电脑从零到一的配置搭建和设备设置调整和shell工具优化

阅读剩余
THE END