如何通过SSE来实现ChatGPT的对话推送功能
SSE(Server-Sent Events)是一种基于 HTTP 的服务器消息推送技术。前端通过 Web API EventSource 与后端建立连接,后端可以通过这个连接向前端发送任意的字符串数据。SSE 响应的 MIME 类型是 text/event-stream。
SSE 跟 Websocket 差异
由于SSE基于HTTP协议,因此它可以在大多数现代浏览器中使用,并且无需进行额外的协议升级。
适用场景:SSE适合于需要服务器向客户端实时推送数据的场景,例如股票价格更新、新闻实时推送等。而WebSocket则适合于需要实时双向通信的场景,如聊天应用、多人在线协作编辑等。
代码示例
测试全部代码
核心代码
SseEmitter管理
package xyz.zhiwei.spring_life.sse;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* @author zhiweicoding.xyz
* @date 01/12/2024
* @email diaozhiwei2k@gmail.com
*/
public class SseSession {
private static Map<String, SseEmitter> sessionMap = new ConcurrentHashMap<>();
public static void add(String sessionKey,SseEmitter sseEmitter){
if(sessionMap.get(sessionKey) != null){
throw new SseException("
User exists!");
}
sessionMap.put(sessionKey, sseEmitter);
}
public static boolean exists(String sessionKey){
return sessionMap.get(sessionKey) != null;
}
public static boolean remove(String sessionKey){
SseEmitter sseEmitter = sessionMap.get(sessionKey);
if(sseEmitter != null){
sseEmitter.complete();
}
return false;
}
public static void onError(String sessionKey,Throwable throwable){
SseEmitter sseEmitter = sessionMap.get(sessionKey);
if(sseEmitter != null){
sseEmitter.completeWithError(throwable);
}
}
public static void send(String sessionKey,String content) throws IOException {
sessionMap.get(sessionKey).send(content);
}
}
方法调用
package xyz.zhiwei.spring_life.sse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author zhiweicoding.xyz
* @date 01/12/2024
* @email diaozhiwei2k@gmail.com
*/
@Service
@Slf4j
public class SseServerImpl implements SseServer {
@Override
public SseEmitter connect(String userId) {
if (SseSession.exists(userId)) {
SseSession.remove(userId);
}
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onError((err) -> {
log.error("type: SseSession Error, msg: {} session Id : {}", err.getMessage(), userId);
SseSession.onError(userId, err);
});
sseEmitter.onTimeout(() -> {
log.info("type: SseSession Timeout, session Id : {}", userId);
SseSession.remove(userId);
});
sseEmitter.onCompletion(() -> {
log.info("type: SseSession Completion, session Id : {}", userId);
SseSession.remove(userId);
});
SseSession.add(userId, sseEmitter);
//一个简单的定时
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
executorService.schedule(() -> {
try {
SseSession.send(userId, "heart");
} catch (IOException e) {
throw new RuntimeException(e);
}
}, 5, TimeUnit.SECONDS);
return sseEmitter;
}
@Override
public boolean send(String userId, String content) {
if (SseSession.exists(userId)) {
try {
SseSession.send(userId, content);
return true;
} catch (IOException exception) {
log.error("type: SseSession send Erorr:IOException, msg: {} session Id : {}", exception.getMessage(), userId);
}
} else {
throw new SseException("User Id " + userId + " not Found");
}
return false;
}
@Override
public boolean close(String userId) {
log.info("type: SseSession Close, session Id : {}", userId);
return SseSession.remove(userId);
}
}
js调用后端的服务
<!doctype html>
<html lang="en">
<head>
    <!-- Declare the encoding explicitly so pushed text renders consistently. -->
    <meta charset="utf-8">
    <title>sse demo</title>
</head>
<body>
<div>sse demo</div>
<!-- Connect form: the id entered here is used to subscribe to the event stream. -->
<!-- <input> is a void element, so it takes no closing tag. -->
your user id : <input type="text" id="connectionUserId" value="">
<button type="button" id="connectionBtn"> connection</button>
<!-- Chat form: send a message to another user, or close the current connection. -->
<div id="infoChat" class="">
    your user id : <span id="userId"></span> <br/>
    user id <input type="text" id="toUserId"> <br/>
    msg : <input type="text" id="message"> <br/>
    <button type="button" id="sendBtn"> send</button>
    <button type="button" id="closeBtn"> close</button>
</div>
<!-- Received events and status lines are appended here by the script. -->
<div id="result"></div>
</body>
</html>
<script>
var defaultUrl = 'http://localhost:8082/sse';
const Http = new XMLHttpRequest();
var userId;
var connectiionBtn = document.querySelector("#connectionBtn");
var sendBtn = document.querySelector("#sendBtn");
var closeBtn = document.querySelector('#closeBtn')
connectiionBtn.onclick = () => {
userId = document.getElementById("connectionUserId").value;
console.log(userId);
let source = new EventSource(`${defaultUrl}/subscribe/${userId}`);
source.onmessage = (event) => {
text = document.getElementById('result').innerText;
text += '\n' + event.data;
document.getElementById('result').innerText = text;
};
source.onopen = (event) => {
text = document.getElementById('result').innerText;
text += '\n subscribe success';
console.log(event);
document.getElementById('result').innerText = text;
document.getElementById('userId').innerText = userId;
};
source.onerror = (err) => {
console.log(err);
document.getElementById('result').innerText = err;
}
}
sendBtn.onclick = async () => {
let toUserId = document.getElementById("toUserId").value;
let msg = document.getElementById("message").value;
const data = {
userId: toUserId,
msg: msg
};
const response = await fetch(`${defaultUrl}/send/${userId}`, {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
response.text().then((data) => {
if (data != "Success") {
alert(data)
}
});
}
closeBtn.onclick = () => {
fetch(`${defaultUrl}/close/${userId}`).then((res) => {
alert('close success');
})
}
</script>
License:
CC BY 4.0