" />
文章

如何通过SSE来实现ChatpGPT的对话推送功能

SEE (Server-sent events) 是一种通过http推送消息连接技术。 前端通过 web api EventSource 来跟后端建立连接。 后端可以通过这个连接发送任意的字符串数据。 SEE 的 MIME 请求类型 是text/event-stream

SSE 跟 Websocket 差异

SSE

Websocket

单通道连接只支持接收来自后端的消息

双工通道,能收发消息

支持自动重连

不支持需要手动编码

由于SSE基于HTTP协议,因此它可以在大多数现代浏览器中使用,并且无需进行额外的协议升级。

适用场景:SSE适合于需要服务器向客户端实时推送数据的场景,例如股票价格更新、新闻实时推送等。而WebSocket则适合于需要实时双向通信的场景,如聊天应用、多人在线协作编辑等。

代码示例

测试全部代码

github代码示例

核心代码

  1. SseEmitter管理

package xyz.zhiwei.spring_life.sse;
​
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
​
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
​
/**
 * @author zhiweicoding.xyz
 * @date 01/12/2024
 * @email diaozhiwei2k@gmail.com
 */
public class SseSession {
​
​
    private static Map<String, SseEmitter> sessionMap = new ConcurrentHashMap<>();
​
    public static void add(String sessionKey,SseEmitter sseEmitter){
        if(sessionMap.get(sessionKey) != null){
            throw new SseException("
User exists!");
        }
        sessionMap.put(sessionKey, sseEmitter);
    }
​
    public static boolean exists(String sessionKey){
        return sessionMap.get(sessionKey) != null;
    }
​
    public static boolean remove(String sessionKey){
        SseEmitter sseEmitter = sessionMap.get(sessionKey);
        if(sseEmitter != null){
            sseEmitter.complete();
        }
        return false;
    }
​
    public static void onError(String sessionKey,Throwable throwable){
        SseEmitter sseEmitter = sessionMap.get(sessionKey);
        if(sseEmitter != null){
            sseEmitter.completeWithError(throwable);
        }
    }
​
    public static void send(String sessionKey,String content) throws IOException {
        sessionMap.get(sessionKey).send(content);
    }
​
}
  1. 方法调用

package xyz.zhiwei.spring_life.sse;
​
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
​
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
​
/**
 * @author zhiweicoding.xyz
 * @date 01/12/2024
 * @email diaozhiwei2k@gmail.com
 */
@Service
@Slf4j
public class SseServerImpl implements SseServer {
​
​
    @Override
    public SseEmitter connect(String userId) {
        if (SseSession.exists(userId)) {
            SseSession.remove(userId);
        }
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onError((err) -> {
            log.error("type: SseSession Error, msg: {} session Id : {}", err.getMessage(), userId);
            SseSession.onError(userId, err);
        });
​
        sseEmitter.onTimeout(() -> {
            log.info("type: SseSession Timeout, session Id : {}", userId);
            SseSession.remove(userId);
        });
​
        sseEmitter.onCompletion(() -> {
            log.info("type: SseSession Completion, session Id : {}", userId);
            SseSession.remove(userId);
        });
        SseSession.add(userId, sseEmitter);
​
        //一个简单的定时
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
        executorService.schedule(() -> {
            try {
                SseSession.send(userId, "heart");
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, 5, TimeUnit.SECONDS);
        return sseEmitter;
    }
​
    @Override
    public boolean send(String userId, String content) {
        if (SseSession.exists(userId)) {
            try {
                SseSession.send(userId, content);
                return true;
            } catch (IOException exception) {
                log.error("type: SseSession send Erorr:IOException, msg: {} session Id : {}", exception.getMessage(), userId);
            }
        } else {
            throw new SseException("User Id " + userId + " not Found");
        }
        return false;
    }
​
    @Override
    public boolean close(String userId) {
        log.info("type: SseSession Close, session Id : {}", userId);
        return SseSession.remove(userId);
    }
​
}
​
  1. js调用后端的服务

<!doctype html>
<html lang="en">
​
<head>
    <title>sse demo</title>
</head>
​
<body>
<div>sse demo</div>
​
your user id : <input type="text" id="connectionUserId" value=""></input>
<button type="button" id="connectionBtn"> connection</button>
​
<div id="infoChat" class="">
    your user id : <span id="userId"></span> <br/>
    user id <input type="text" id="toUserId"></input> <br/>
    msg : <input type="text" id="message"></input> <br/>
    <button type="button" id="sendBtn"> send</button>
    <button type="button" id="closeBtn"> close</button>
</div>
​
<div id="result"></div>
​
​
</body>
​
</html>
​
​
<script>
    var defaultUrl = 'http://localhost:8082/sse';
​
    const Http = new XMLHttpRequest();
​
    var userId;
​
    var connectiionBtn = document.querySelector("#connectionBtn");
​
    var sendBtn = document.querySelector("#sendBtn");
​
    var closeBtn = document.querySelector('#closeBtn')
​
    connectiionBtn.onclick = () => {
        userId = document.getElementById("connectionUserId").value;
        console.log(userId);
        let source = new EventSource(`${defaultUrl}/subscribe/${userId}`);
        source.onmessage = (event) => {
            text = document.getElementById('result').innerText;
            text += '\n' + event.data;
            document.getElementById('result').innerText = text;
        };
​
​
        source.onopen = (event) => {
            text = document.getElementById('result').innerText;
            text += '\n subscribe success';
            console.log(event);
            document.getElementById('result').innerText = text;
            document.getElementById('userId').innerText = userId;
        };
​
        source.onerror = (err) => {
            console.log(err);
            document.getElementById('result').innerText = err;
        }
    }
​
    sendBtn.onclick = async () => {
        let toUserId = document.getElementById("toUserId").value;
        let msg = document.getElementById("message").value;
        const data = {
            userId: toUserId,
            msg: msg
        };
​
        const response = await fetch(`${defaultUrl}/send/${userId}`, {
            method: 'POST',
            headers: {
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(data)
        });
​
        response.text().then((data) => {
            if (data != "Success") {
                alert(data)
            }
        });
​
    }
​
​
    closeBtn.onclick = () => {
        fetch(`${defaultUrl}/close/${userId}`).then((res) => {
            alert('close success');
        })
    }
​
​
</script>

License:  CC BY 4.0