SpringBoot整合Netty
前言
Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高并发协议服务器和客户端。
Netty主要基于Java NIO实现,提供了异步和事件驱动的网络编程工具,简化了TCP和UDP服务器的编程。
Netty广泛应用于分布式系统、实时通信、游戏开发等领域,例如,知名的Elasticsearch和Dubbo框架内部都采用了Netty。
Netty吸收了多种协议的实现经验,经过精心设计,能够在保证易于开发的同时,确保应用的性能、稳定性和伸缩性。
Netty的优势
- 高性能:Netty 采用异步的、事件驱动的模型,基于 Java NIO 提供了非阻塞的 I/O 操作,能够处理大量的并发连接。它使用了高效的线程模型和内存管理策略,以实现更好的性能和吞吐量。
- 简单易用:Netty 提供了简洁、一致的 API,使开发者能够更快地构建和维护网络应用程序。它的设计注重可读性和可维护性,提供了丰富的功能组件和工具,简化了网络编程的复杂性。
- 异步和事件驱动:Netty 基于事件驱动的模型,通过注册感兴趣的事件和回调机制来处理请求和响应。它支持异步的、非阻塞的 I/O 操作,可以处理大量并发连接,提高了系统的响应能力和吞吐量。
- 灵活性和可扩展性:Netty 提供了灵活的、可配置的组件和扩展点,使开发者可以根据应用程序的需求进行定制和扩展。它支持多种协议和传输方式,包括 TCP、UDP、HTTP 等,可以适应不同的应用场景和需求。
- 完善的协议支持:Netty 提供了丰富的协议编解码器和处理器,包括常用的网络协议(如 HTTP、WebSocket、SMTP 等),使开发者可以更方便地处理和解析不同的协议数据。
- 成熟和广泛应用:Netty 是一个成熟的框架,已经被广泛应用于许多大型互联网公司和开源项目中。它在性能、稳定性和可靠性方面经过了大量实践和验证,被广泛认可。
使用场景
Netty 是一个异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络 IO 程序。
以下是一些常见的使用场景:
- 服务器与客户端之间的通信:Netty 可以用来开发客户端和服务器,或者服务器与服务器之间的通信。
- 远程服务调用:RPC 框架(如 Dubbo)的基础通信组件,可以使用 Netty 进行数据的传输。
- 实时通信:Netty 可以用来实现即时通信系统,例如聊天室、在线游戏等。
- 数据传输:Netty 可以用于数据的传输,如 Hadoop 的数据传输、大数据传输等。
- HTTP 服务器:Netty 可以用来开发 HTTP 服务器,处理 HTTP 请求。
- 框架与工具:Netty 提供了一些可以直接使用的工具和框架,如 Netty-SocketIO 可以用来开发 Web 应用。
Netty的核心组件
- Bootstrap、ServerBootstrap(启动器):客户端和服务端的启动引导程序
- Channel(通道):数据传送的通道,Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 等。常见的 Channel 类型有以下几个:
- NioServerSocketChannel 异步 TCP 服务端。
- NioSocketChannel 异步 TCP 客户端。
- OioServerSocketChannel 同步 TCP 服务端。
- OioSocketChannel 同步 TCP 客户端。
- NioDatagramChannel 异步 UDP 连接。
- OioDatagramChannel 同步 UDP 连接。
- ChannelPipeline(通道管道):为 ChannelHandler 链提供了容器,当 channel 创建时,就会被自动分配到它专属的 ChannelPipeline,这个关联是永久性的。
- ChannelHandler(通道处理器):Channel中有相关事件发生的时候会触发执行,充当了所有处理入站和出站数据的逻辑容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。ChannelHandler分为ChannelInboundHandler和ChannelOutboundHandler。ChannelInboundHandler负责读取数据时处理数据。ChannelOutboundHandler负责在写入数据时处理数据。(开发人员主要是对这个进行定制开发)
- EventLoopGroup、EventLoop(事件循环器):Io异步执行的任务队列和线程池,主要是配合 Channel workGroup线程组负责处理 I/O 操作以及业务逻辑,bossGroup线程组用来处理接收客户端的请求连接。一个EventLoopGroup包含多个EventLoop;每一个EventLoop包含1个selector和1个事件循环线程
- ChannelFuture():Io任务执行以后未来的返回结果。Netty 框架中所有的 I/O 操作都为异步的,因此我们需要 ChannelFuture 的 addListener()注册一个 ChannelFutureListener 监听事件,当操作执行成功或者失败时,监听就会自动触发返回结果。
- ByteBuf: 字节缓冲区,是Netty处理字节的核心类,支持引用计数和池化。
- Codec: 编解码器,用于转换消息从POJO到ByteBuf或者反过来。
- ChannelHandlerContext:ChannelHandler相关联的上下文信息对象,封装了pipeline和ChannelHandler。
流程图
整合案例
- 创建一个spring boot工程
- pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.SpringBoot</groupId> <artifactId>SpringBoot</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <groupId>com.example.netty</groupId> <artifactId>SpringBoot_Netty</artifactId> <version>0.0.1-SNAPSHOT</version> <name>SpringBoot_Netty</name> <description>SpringBoot_Netty</description> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.6.13</spring-boot.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.100.Final</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>${spring-boot.version}</version> <configuration> <mainClass>com.example.netty.springboot_netty.SpringBootNettyApplication</mainClass> <skip>true</skip> </configuration> <executions> <execution> <id>repackage</id> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
- yaml文件
server: port: 8080 netty: port: 8081
- Netty服务端
- ChannelInitializer
package com.example.netty; import com.example.netty.handler.ChatHandler; import com.example.netty.handler.NettyServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.stream.ChunkedWriteHandler; /** * @Program: SpringBoot * @ClassName WsServerInitializer * @Author: liutao * @Description: * @Create: 2024-03-07 10:54 * @Version 1.0 **/ public class WsServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { //通过SocketChannel获取对应的pipeline管道 ChannelPipeline pipeline = channel.pipeline(); //websocket基于http协议,所以需要http编解码器 pipeline.addLast(new HttpServerCodec()); //添加对于读写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //对httpMessage进行聚合 pipeline.addLast(new HttpObjectAggregator(1024*64)); // ================= 上述是用于支持http协议的 ============== // ================= 下面是用于支持websocket的 ============== //websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址 //比如处理一些握手动作(ping,pong) pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); //自定义handler pipeline.addLast(new ChatHandler()); } }
- 核心 通道处理器
package com.example.netty.handler; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; /** * @Program: SpringBoot * @ClassName ChatHandler * @Author: liutao * @Description: * @Create: 2024-03-07 10:46 * @Version 1.0 **/ @Slf4j public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { /** * 用于记录和管理所有客户端的channel **/ private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * @MethodName: count * @description: 在线用户人数 **/ public static int count() { return clients.size(); } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("客户端的远程地址是:"+ ctx.channel().remoteAddress()); //客户端传递过来的消息 String content = msg.text(); log.info("接收到了客户端的消息是:" + content); //将客户端发送过来的消息刷到所有的channel中 for (Channel channel : clients) { channel.writeAndFlush(new TextWebSocketFrame("[服务器接收到了客户端的消息:]" + LocalDateTime.now() + ",消息为:" + content)); } } //客户端创建的时候触发,当客户端连接上服务端之后,就可以获取该channel,然后放到channelGroup中进行统一管理 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { clients.add(ctx.channel()); log.info("当前channelGroup中包含的channel的数量是:" + clients.size()); log.info("客户端连接,当前被添加的channel的短ID是:" + ctx.channel().id().asShortText()); } //客户端销毁的时候触发, @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //当handlerRemoved 被触发时候,channelGroup会自动移除对应的channel log.info("客户端断开,当前被移除的channel的短ID是:" + ctx.channel().id().asShortText()); clients.remove(ctx.channel()); ctx.close(); log.info("当前channelGroup中包含的channel的数量是:" + clients.size()); } /*** * @MethodName: exceptionCaught * @description: 发生异常关闭当前通道 * @Author: LiuTao * @Param: [ctx, cause] * @UpdateTime: 2024/3/7 14:29 * @Return: void * @Throw: **/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
- 绑定springboot应用启动时同时启用netty服务端
package com.example.netty.config; import com.example.netty.TcpServer; import com.example.netty.WebSocketServer; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; /** * @Program: SpringBoot * @ClassName NettyBootServerInitConfig * @Author: liutao * @Description: * @Create: 2024-03-07 10:42 * @Version 1.0 **/ @Component public class NettyBootServerInitConfig implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext().getParent() == null) { WebSocketServer.getInstance().start(); } } }
测试
- 启动服务
- 打开apipost新建websocket连接 并连接后端ws服务
- 发送消息给服务端
结尾
最后希望小伙伴 点赞 关注 分享三连!!
由于涛哥设备更换接下来会讲解 Mac电脑从零到一的配置搭建和设备设置调整和shell工具优化
阅读剩余
版权声明:
作者:涛哥
链接:https://ltbk.net/back/spring_family/spring-boot/article/1773.html
文章版权归作者所有,未经允许请勿转载。
作者:涛哥
链接:https://ltbk.net/back/spring_family/spring-boot/article/1773.html
文章版权归作者所有,未经允许请勿转载。
THE END