最近搭了个大模型对话界面,但传统的请求方式只能等大模型全部输出之后再传输至前端,导致使用者以为界面或接口卡住了。

另外由于是微服务无状态,使用WebSocket做长连接还得考虑重连等逻辑,而且引入了复杂度。SSE默认支持断线重连,基于HTTP协议,传输文本信息轻量简单。

在实践过程中也遇到了一些坑,请阅读下方代码注释。

JAVA

遇到问题:

  • Nginx缓存消息,导致消息无法流式输出。需要关闭Nginx缓存,直接推流。
  • new SseEmitter(),默认30秒断连,需要指定new SseEmitter(0L)即不限制消息推送时间。
  • SSE只能传输单行数据,且无法识别空格和换行,因此需要将空格和回车替换为占位符。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@ResponseBody
public SseEmitter handleSseRequest(HttpServletRequest request, HttpServletResponse response) {
// 关闭Nginx缓存,直接推流
response.setHeader("Cache-Control", "no-cache");
response.setHeader("X-Accel-Buffering", "no");
// 流式事件流响应
response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
// 默认30秒断连,改为0则不限制
SseEmitter emitter = new SseEmitter(0L);

// 每100毫秒推送一个消息,总共30个消息
Flowable.interval(100, TimeUnit.MILLISECONDS)
.take(30)
.map(i -> "SSE COUNTER: " + i + "&#92n&#92n")
.subscribe(
data -> {
try {
emitter.send(SseEmitter.event().data(data));
} catch (Exception e) {
emitter.completeWithError(e);
}
},
emitter::completeWithError,
emitter::complete
);
return emitter;
}

前端

遇到问题:

  • 由于EventSource只支持get方式,可以建立连接时通过字段传值。
  • SSE只能传输单行数据,且无法识别空格和换行,因此需要将空格和回车替换为占位符。
  • 建议累加接收数据,并持续解析全量数据为HTML,实现ChatGPT即时解析,提升使用体感。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 由于EventSource只支持get方式,可以建立连接时通过字段传值
let eventSource = new EventSource(target + "?content=" + encodeURIComponent(content));
let sseContent = ""
eventSource.onmessage = function(event) {
let data = event.data

try {
let ssedata = event.data
// SSE只能传输单行数据,且无法识别空格和换行,因此需要将空格和回车替换为占位符
ssedata = ssedata.replaceAll(" ", " ").replaceAll("&#92n", "\n")
// 累加接收数据
sseContent += ssedata
// 将Markdown数据持续解析为HTML,实现类似ChatGPT的返回实时解析,体感很好
document.getElementById("msg_" + msgidx).innerHTML = convertMarkdownToHtml(sseContent)
} catch(e) {
}

// 锁定div滚动条到界面底端
let objDiv = document.getElementsByClassName("lite-chatbox").item(0);
objDiv.scrollTop = objDiv.scrollHeight;
};

eventSource.onerror = function(event) {
// 直接关闭即可,如网路异常则自动重连
eventSource.close()
};