SpringBoot+WebSocket实现即时通讯(J2EE方式)

本文最后更新于2023.04.22-17:59,某些文章具有时效性,若有错误或已失效,请在下方留言或联系涛哥

什么是websocket?

 WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

为什么有了HTTP协议还要WebSocket

HTTP协议采用的是客户端(浏览器)轮询的方式,即客户端发送请求,服务端做出响应,为了获取最新的数据,需要不断的轮询发出HTTP请求,占用大量带宽。
WebSocket采用了一些特殊的报头,使得浏览器和服务器只需要通过“握手”建立一条连接通道后,此链接保持活跃状态,之后的客户端和服务器的通信都使用这个连接,解决了Web实时性的问题,相比于HTTP有一下好处:

  • 一个Web客户端只建立一个TCP连接
  • WebSocket服务端可以主动推送(push)数据到Web客户端
  • 有更加轻量级的头,减少了数据传输量

特点

  1. 建立在TCP协议只上,服务端比较容易实现
  2. 于HTTP协议有良好的兼容性,默认端口也是80和443,握手阶段使用HTTP协议,因此握手时不容易屏蔽,能通过各种HTTP代理服务器
  3. 数据格式轻量,通信高效且节省带宽
  4. 支持传输文本数据和二进制数据
  5. 没有同源限制,客户端可以与任意服务器通信
  6. 也支持加密传输,WS+SSL,URL形如wss://

技术

  • jdk8
  • maven
  • SpringBoot2.6.11
  • websocket
  • fastjosn

实现

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.11</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.websocket</groupId>
    <artifactId>springboot_websocket</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot_websocket</name>
    <description>springboot_websocket</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

websocket核心配置

package com.websocket.springboot_websocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @Program: springboot_websocket
 * @ClassName WebSocketConfig
 * @Author: liutao
 * @Description: websocket配置类
 * @Create: 2022-08-19 18:42
 * @Version 1.0
 **/
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

配置websocket服务

package com.websocket.springboot_websocket.websocket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @Program: springboot_websocket
 * @ClassName WebsocketServer
 * @Author: liutao
 * @Description: websocket服务
 * @Create: 2022-08-19 18:52
 * @Version 1.0
 **/
@Slf4j
@Component
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {
//    在线人数
    private static int onlineCount;
//    当前会话
    private Session session;
//    用户唯一标识
    private String userId;

    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();

    /**
     * concurrent包的线程安全set,用来存放每个客户端对应的MyWebSocket对象
     */
    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap();

    /**
     * 为了保存在线用户信息,在方法中新建一个list存储一下【实际项目依据复杂度,可以存储到数据库或者缓存】
     */
    private final static List<Session> SESSIONS = Collections.synchronizedList(new ArrayList<>());

 /**
  * @methodName: onOpen
  * @description: 建立连接
  * @Author LiuTao
  * @param  [session, userId]
  * @updateTime 2022/8/19 19:31
  * @return void
  * @throws
  **/
 @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        webSocketSet.add(this);
        SESSIONS.add(session);
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            webSocketMap.put(userId,this);
        } else {
            webSocketMap.put(userId,this);
            addOnlineCount();
        }
        log.info("[连接ID:{}] 建立连接, 当前连接数:{}", this.userId, getOnlineCount());
    }

   /**
    * @methodName: onClose
    * @description: 断开连接
    * @Author LiuTao
    * @param  []
    * @updateTime 2022/8/19 19:31
    * @return void
    * @throws
    **/
   @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            subOnlineCount();
        }
        log.info("[连接ID:{}] 断开连接, 当前连接数:{}", userId, getOnlineCount());
    }

    /**
     * @methodName: onError
     * @description: 发送错误
     * @Author LiuTao
     * @param  [session, error]
     * @updateTime 2022/8/19 19:32
     * @return void
     * @throws
     **/
    @OnError
    public void onError(Session session, Throwable error) {
        log.info("[连接ID:{}] 错误原因:{}", this.userId, error.getMessage());
        error.printStackTrace();
    }

   /**
    * @methodName: onMessage
    * @description: 收到消息
    * @Author LiuTao
    * @param  [message]
    * @updateTime 2022/8/19 19:32
    * @return void
    * @throws
    **/
   @OnMessage
    public void onMessage(String message) {
        log.info("[连接ID:{}] 收到消息:{}", this.userId, message);
    }

    /**
     * @methodName: sendMessage
     * @description: 发送消息
     * @Author LiuTao
     * @param  [message, userId]
     * @updateTime 2022/8/19 19:32
     * @return void
     * @throws
     **/
    public void sendMessage(String message,Long userId) {
        WebSocketServer webSocketServer = webSocketMap.get(String.valueOf(userId));
        if (webSocketServer!=null){
            log.info("【websocket消息】推送消息,[toUser]userId={},message={}", userId,message);
            try {
                webSocketServer.session.getBasicRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("[连接ID:{}] 发送消息失败, 消息:{}", this.userId, message, e);
            }
        }
    }

    /**
     * @methodName: sendMassMessage
     * @description: 群发消息
     * @Author LiuTao
     * @param  [message]
     * @updateTime 2022/8/19 19:33
     * @return void
     * @throws
     **/
    public void sendMassMessage(String message) {
        try {
            for (Session session : SESSIONS) {
                if (session.isOpen()) {
                    session.getBasicRemote().sendText(message);
                    log.info("[连接ID:{}] 发送消息:{}",session.getRequestParameterMap().get("userId"),message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取当前连接数
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 当前连接数加一
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * 当前连接数减一
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

}

web接口

package com.websocket.springboot_websocket.web;


import com.alibaba.fastjson.JSONObject;
import com.websocket.springboot_websocket.websocket.WebSocketServer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Program: springboot_websocket
 * @ClassName WebSocketController
 * @Author: liutao
 * @Description: websocket web层
 * @Create: 2022-08-19 19:01
 * @Version 1.0
 **/
@RestController
@RequestMapping("/ws")
public class WebSocketController {
    @Autowired
    private WebSocketServer webSocketServer;

    /**
     * 消息发送
     */
    @GetMapping("/send/{userId}/{msg}")
    public void send(@PathVariable String msg, @PathVariable String userId){
            webSocketServer.sendMessage(JSONObject.toJSONString(msg), Long.valueOf(String.valueOf(userId)));
    }

    /**
     * 群发消息测试(给当前连接用户发送)
     */
    @GetMapping("/sendMassMessage")
    public void sendMassMessage(){
        WebsocketResponse response = new WebsocketResponse();
        response.setTitle("群发主题");
        webSocketServer.sendMassMessage(JSONObject.toJSONString(response));
    }

    @Data
    @Accessors(chain = true)
    public static class WebsocketResponse {
        private String title;
        private String userId;
        private String userName;
        private int age;
    }

}

测试效果图

进入websocket在线调式工具 http://wstool.jackxiang.com/

先cmd - ipconfig 查看ipv4地址

打开连接1

ws://192.168.31.145:8080/websocket/1

打开连接2

ws://192.168.31.145:8080/websocket/2

向指定用户发送消息:http://localhost:8080/ws/send/1/测试发给1/http://localhost:8080/ws/send/2/测试发给2

群发消息:http://localhost:8080/ws/sendMassMessage

后台

源码下载

此处内容需要回复后并刷新才能查看

结尾

ok,到这里我们的webscoket学习就结束了,通过这个代码我们就可以实现简单的聊天和群聊实现数据的即时通讯

阅读剩余
THE END