您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

解密 SSE,像 ChatGPT 一样返回流式响应

时间:2023-11-20 14:31:10  来源:微信公众号  作者:古明地觉的编程教室

我们知道目前的 HTTP/1.1 采用的是标准的请求-响应模型,客户端主动发请求,服务端被动地返回响应。这种模型在客户端需要实时获取结果的场景下是不合适的,因为这意味着客户端需要不断地轮询,所以最好的做法是服务端生成结果之后,主动推送给客户端。

比如 ChatGPT,它在生成内容时,也是生成一部分,就主动向客户端推送一部分。而在这个过程中,客户端不需要做任何事情,只需等待 ChatGPT 服务端返回内容即可。

说到这儿,你肯定想到了 WebSocket,没错这是一种解决方案。但 WebSocket 太重了,它和 HTTP 都是基于 TCP 的应用层传输协议,只不过在握手的时候搭了 HTTP 的便车,利用 HTTP 本身的协议升级特性,伪装成 HTTP,这样就能绕过浏览器沙箱、网络防火墙等限制。

当完成握手之后,后续传输的数据就不再是 HTTP 报文,而是 WebSocket 格式的二进制帧。所以这两者完全是不同的协议,那有没有一种办法,我们仍然使用 HTTP 协议,同时还能让服务端主动推送数据呢?

答案是有的,也就是本文将要介绍的 SSE 技术,它的英文全称是 Server-Sent Events(服务端推送事件)。通过 SSE 可以让服务端即时推送数据到客户端,而不需要客户端轮询服务端以获取更新。

解密 SSE,像 ChatGPT 一样返回流式响应图片

到这你可能会问,那 WebSocket 和 SSE 有什么区别呢?

1)通信方式

WebSocket 提供全双工通信,服务端和客户端都可以在同一个连接上同时发送和接收数据。最重要的是,WebSocket 独立于 HTTP 协议,尽管它开始于一个 HTTP 握手。

SSE 仅提供服务端到客户端的单向通信,客户端不能通过 SSE 给服务端发信息。

2)协议和实现

WebSocket 使用自己的协议(ws:// 或 wss://),需要服务端和客户端都支持,并且协议比较复杂。

SSE 则是使用标准的 HTTP 协议,实现起来更简单,尤其是在服务端。

3)适用场景

WebSocket 适用于服务端和客户端之间双向实时通信的场景,如在线游戏、聊天应用等。

SSE 适用于服务端向客户端单向推送数据的场景,如消息通知、数据更新。并且 SSE 自动支持断线重连,而 WebSocket 则需要额外部署。

4)复杂性和资源使用

WebSocket 由于其双向通信的能力,通常比 SSE 更复杂,可能需要更多的资源来维护和管理连接。

SSE 因为其单向性和基于 HTTP 的特性,它可以利用现有的网络基础设施,如代理服务器、负载均衡器和防火墙等等,通常更容易实现和维护。

 

相信现在你已经明白 SSE 是做什么的了,它的目的就是让服务端能够主动推送数据给客户端。如果不需要和服务端动态交互,只是希望服务端在有数据的时候推过来,那么 WebSocket 就有些太重了,因为这意味着要替换 HTTP 协议,而使用 SSE 无疑是更好的选择。

SSE 是什么我们已经知道了,那它是怎么实现的呢?原理是什么呢?

1)建立连接

客户端发起一个标准的 HTTP 请求来开启 SSE 会话,这个请求的特殊之处在于它包含一个头字段。

Accept: text/event-stream

相当于客户端告诉服务端,期望接收 SSE 消息流。而服务端在看到该字段时,也知道这是一个 SSE 请求,于是立即向客户端返回响应头,注意:返回的只有响应头,里面会包含如下头字段。

Content-Type: text/event-stream

响应头返回之后标志着 SSE 连接成功建立,并且连接会保持开放状态,服务端后续可以随时通过此连接向客户端发送数据。此外当连接不小心断开时,客户端也会自动进行重连。

所以在普通的 HTTP 请求中,一旦服务端返回,那么请求结束了。虽然可以将 Connection 头字段设置为 keep-alive 保证连接不断开,但每次访问都包含了 HTTP 请求/响应的完整过程。

而在 SSE 中,服务端会保持一个开放的连接,只要有新数据可用,就会直接发送给客户端。所以服务端会将响应以流的形式发送给客户端,每次发送的消息都是响应流的一部分,而不是独立的 HTTP 响应。

因此 SSE 的服务端在发送数据时,并不遵循传统的一次请求,一次响应模式。它在建立连接之后会保持连接开放,并通过这个持续的连接流式地发送数据,这种方式就使得 SSE 非常适合实时数据推送的场景。

2)发送消息

客户端发送请求,服务端返回响应头之后,SSE 连接就建立成功了。此时客户端只需要躺平,安静地等待服务端的输出即可。所以现在的关键就在于服务端要返回什么格式的数据呢?很简单,一个基本的消息由以下几部分组成:

  • data:实际的消息数据;
  • id:可选,消息的唯一标识符,用于在连接重新建立时同步消息;
  • event:可选,定义事件类型,用于客户端区分消息的类型;
  • retry:可选,自动重连的时间(毫秒),如果连接中断,客户端在自动重新连接之前,需要等待多长时间;

注意:每个消息要以两个换行符(nn)结束,举个例子,我们发送一个 Hello World。

data: Hello Worldnn

也可以发送带有事件类型的消息:

event: userUpdate
data: {"username": "Serpen", "age": 18}nn

还是比较简单的,服务端可以保持连接并随时发送更多数据。然后客户端在收到时会进行处理,但不需要(也不能)对服务端作出任何回应,它只需要被动地接收来自服务端的数据即可。当服务端认为数据已经全部发送完毕、无需再发时,那么便可以主动断开连接。

关于 SSE 的原理我们就解释清楚了,下面来实际编程实现它,这里我们先使用原生的 asyncio 实现 SSE。

import asyncio
from asyncio import StreamReader, StreamWriter

class SSE:

    def __init__(self, host="0.0.0.0", port=9999):
        self.host = host
        self.port = port

    @staticmethod
    def parse_request_headers(data: bytes) -> dict:
        """
        此函数负责从原始字节流中解析出请求头
        """
        headers = data.split(b"rnrn")[0].split(b"rn")
        header_dict = {}
        for header in headers[1:]:
            key, val = header.decode("utf-8").split(":", 1)
            header_dict[key.lower()] = val.strip()
        return header_dict

    async def handler_requests(self,
                               reader: StreamReader,
                               writer: StreamWriter):
        """
        负责处理来自客户端的请求
        每来一个客户端连接,就会基于此函数创建一个协程
        并且自动传递两个参数:reader 和 writer
        reader.read  负责读取数据,等价于 socket.recv
        writer.write 负责发送数据,等价于 socket.send
        """
        # 获取客户端的请求报文,这里对请求方法、请求地址不做限制
        data = awAIt reader.readuntil(b"rnrn")
        # 解析出请求头
        request_headers = self.parse_request_headers(data)
        # 简单检测一下 accept 字段,如果不是建立 SSE,那么直接关闭连接
        if request_headers.get("accept") != "text/event-stream":
            writer.close()
            return await writer.wait_closed()
        # 如果是 SSE 连接,那么返回响应头
        response_header = (
            b"HTTP/1.1 200 OKrn"
            b"Content-Type: text/event-streamrn"
            b"Cache-Control: no-cachern"
            b"Connection: keep-alivern"
            b'Access-Control-Allow-Origin: *rn'
            b"rn"
        )
        writer.write(response_header)
        await writer.drain()

        # 然后便可以不断地向客户端返回数据了
        for _ in range(5):
            # 每隔 1 秒返回数据
            data = "data: 高老师总能分享出好东西rnrn".encode("utf-8")
            writer.write(data)
            await writer.drain()
            await asyncio.sleep(1)
        # 数据传输完毕
        writer.close()
        await writer.wait_closed()

    async def __create_server(self):
        # 创建服务,第一个参数是一个回调函数
        # 当连接过来的时候就会根据此函数创建一个协程
        # 后面是绑定的 ip 和 端口
        server = await asyncio.start_server(self.handler_requests,
                                            self.host,
                                            self.port)
        # 然后开启无限循环
        async with server:
            await server.serve_forever()

    def run_server(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__create_server())

if __name__ == '__main__':
    sse = SSE()
    sse.run_server()

服务端代码编写完毕,下面编写前端代码。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <style>
        #data {
            font-weight: bold;
            color: cadetblue;
            font-size: large;
        }
    </style>
</head>
<body>
    <h1>SSE Test</h1>
    <div id="data"></div>
    <script>
        document.addEventListener("DOMContentLoaded", function () {
            // 和服务端建立 SSE 连接
            var eventSource = new EventSource("http://localhost:9999");

            eventSource.onmessage = function (e) {
                // 将数据渲染在 <div id="data"></div> 的内部
                var data = e.data + "n";
                document.getElementById('data').innerText += data;
            };

            eventSource.onerror = function (e) {
                console.error('Error occurred:', e);
                eventSource.close();
            };
        });
    </script>
</body>
</html>

代码编写完毕,我们用浏览器打开 HTML 文件,便可看到如下效果。

解密 SSE,像 ChatGPT 一样返回流式响应

以上我们就简单实现了 SSE,当然为了加深印象,这里的后端是使用原生的 asyncio 编写的,但在工作中,我们会使用现成的 Web 框架,比如 FastAPI,Blacksheep 等等。

需要说明的是,虽然通过 SSE 技术可以实现类似 ChatGPT 的效果,但 ChatGPT 内部并没有用到 SSE,它内部是基于 HTTP 的分块传输实现的。因为 SSE 只能通过 GET 请求发出,并且无法自定义请求头。

如果想实现 ChatGPT 的效果,需要使用 HTTP 的分块传输。而像 FastAPI、BlackSheep 等框架提供的流式响应,便是基于 HTTP 的分块传输实现的,比如 FastAPI:

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

App = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回数据
        data = "data: 高老师总能分享出好东西rnrn".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.get("/")
async def sse():
    return StreamingResponse(event_generator(), 
                             media_type="text/event-stream")

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

首先之前的前端代码依旧可以正常访问,通过修改数据格式和 Content-Type 可以让其支持 SSE。但最正确的做法是直接访问 localhost:9999,效果如下:

解密 SSE,像 ChatGPT 一样返回流式响应图片

所以基于 StreamingResponse 可以实现 SSE,也可以直接访问。而直接访问的话,此时里面的 data: 和 rn 就是实体数据的一部分。并且这种方式和 ChatGPT 的工作机制是相似的,都使用了 HTTP 的分块传输,支持所有的请求方法,而 SSE 只支持 GET 请求。

BlackSheep 也是类似的,它同样也支持流式响应。

import asyncio
from blacksheep import Application, Response, StreamedContent
import uvicorn

app = Application()
app.use_cors(
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回数据
        data = "data: 高老师总能分享出好东西rnrn".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.router.get("/")
async def sse():
    return Response(
        200,
        cnotallow=StreamedContent(b"text/event-stream", event_generator),
    )

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

可以测试一下,效果是一样的。如果你不想实现 SSE,只是希望固定的数据以流的形式一点一点返回,那么记得将数据中多余的 data: 和 rn 给去掉,并最好修改 Content-Type 为合适的类型。

所以 SSE 一般用于需要服务端推数据,但数据不知道什么时候会过来,于是通过 SSE 保持连接开放。后续当服务端有数据了,直接通过连接发送给客户端即可。

而 FastAPI 和 BlackSheep 提供的流式响应更像是,返回的数据比较庞大,如果全部准备好再一次性返回,会让用户陷入长时间的等待,造成不好的体验。于是通过分块传输,准备好一部分就返回一部分。虽然整体时间没变,但可以让用户立刻获取到数据,从而提升用户体验。

比如 ChatGPT,当它回答的内容比较多的时候,那么整个过程耗费几十秒钟是常有的事情,假设 30 秒。相比让用户等待 30 秒,然后内容一下子刷出来,显然生成一部分返回一部分这种方式更让人喜欢。

因此使用 SSE 还是流式响应,则取决于你当前的业务。如果你返回的数据是确定的,只是准备的时间比较长,或者数据量比较大,那么推荐使用流式响应。

至于 SSE,在这些现成的 Web 框架里面,也可以通过流式响应来实现,只需要将 Content-Type 设置为 text/event-stream,并将数据加上前缀 data: 和后缀 rnrn。

但说实话,如果想实现 SSE,不建议通过流式响应来实现,而是使用专门的库。以 FastAPI 为例:

from sse_starlette.sse import EventSourceResponse

FastAPI 其实就是在 starlette 的基础上套了一层壳,通过安装 sse_starlette 可以让 FastAPI 更好地支持 SSE。

以上就是本文的内容,如果对你有帮助,就点个赞吧。



Tags:ChatGPT   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除。
▌相关推荐
解密 SSE,像 ChatGPT 一样返回流式响应
我们知道目前的 HTTP/1.1 采用的是标准的请求-响应模型,客户端主动发请求,服务端被动地返回响应。这种模型在客户端需要实时获取结果的场景下是不合适的,因为这意味着客户端需...【详细内容】
2023-11-20  Tags: ChatGPT  点击:(0)  评论:(0)  加入收藏
人工智能界大震动!OpenAI CEO奥特曼被董事会罢免,ChatGPT怎么办
11 月 18 日,OpenAI 官方宣布,CEO 和董事会成员萨姆&middot;奥特曼(Sam Altman)将离开公司,由 CTO Mira Murati 暂时接任 CEO。奥特曼是 OpenAI 的创始人之一,也是 ChatGPT 的主要...【详细内容】
2023-11-20  Tags: ChatGPT  点击:(2)  评论:(0)  加入收藏
OpenAI换血大震动始末:“ChatGPT之父”奥特曼,缘何被“扫地出门”?
谁也没料到,2023年以ChatGPT掀起人工智能技术革命的科技巨头OpenAI,先革职了自己的首席执行官(CEO)奥特曼,震惊整个科技圈。截至时代周报发稿前,奥特曼最新动态是对后续股权切割事...【详细内容】
2023-11-20  Tags: ChatGPT  点击:(4)  评论:(0)  加入收藏
ChatGPT之父突然被开除:“政变”是如何发生的,对AI世界意味着什么
“在一间明亮的会议室中,奥特曼站在房间的中央,他的表情显得有些沮丧。会议桌上散落着文件和文件夹,象征着变动的决定。身后的墙上挂着一块OpenAl的标志现在看起来有些孤独。房...【详细内容】
2023-11-18  Tags: ChatGPT  点击:(3)  评论:(0)  加入收藏
MIT学者独家撰文:ChatGPT的瓶颈与解药
| 甲子光年科技产业智库,作者|罗鸿胤,编辑|王博、苏霍伊*本文为麻省理工学院(MIT)学者罗鸿胤独家供稿,「甲子光年」经其授权后编辑发布。罗鸿胤是人工智能领域的青年科学家、MIT 计...【详细内容】
2023-11-17  Tags: ChatGPT  点击:(12)  评论:(0)  加入收藏
自然语言数据处理:ChatGPT与DataFocus的震撼力量
在大数据时代,自然语言处理(NLP)技术对于数据分析和商业智能应用具有重要意义。当我们将ChatGPT和DataFocus结合使用时,可以大大提高自然语言处理的效果,并为企业提供更高效、更...【详细内容】
2023-11-17  Tags: ChatGPT  点击:(5)  评论:(0)  加入收藏
号称“可穿在身上”的ChatGPT,爆火的Ai Pin是创新还是又一次概念炒作?
Ai Pin虽好,取代手机还早。一块没有屏幕、如同玩具一般大小的“小方块”,还没上市就被《时代》杂志评选为“2023年度发明”。美东时间11月9日,初创公司Humane发布了这款名为“A...【详细内容】
2023-11-13  Tags: ChatGPT  点击:(7)  评论:(0)  加入收藏
谷歌提出6条通用人工智能分级,ChatGPT只在初级
【导读】谷歌DeepMind创始人Shane Legg带领的研究团队发表了一篇关于AGI时间表的论文。他指出,LLM已经是AGI雏形,提出了6条定义AGI的标准。而且根据AI能力,他们提出了5个AGI的...【详细内容】
2023-11-13  Tags: ChatGPT  点击:(16)  评论:(0)  加入收藏
Bard与ChatGPT的较量,谁将成为智能语言模型的领跑者?
文章概要: ChatGPT的流行与缺陷: ChatGPT在过去一年中变得非常流行,但它在信息的准确性和相关性方面出现了下滑。 Bard的开发与特点: 虽然Google在开发Bard时看似落后,但他们专注...【详细内容】
2023-11-13  Tags: ChatGPT  点击:(13)  评论:(0)  加入收藏
值得关注的十款ChatGPT优秀插件
译者 | 陈峻审校 | 重楼您想从 ChatGPT 中获得更多收益吗?您是否考虑过使用 ChatGPT 插件?OpenAI的ChatGPT Plus付费订阅版本为其用户提供了针对数百款插件的访问。而这些插件...【详细内容】
2023-11-13  Tags: ChatGPT  点击:(7)  评论:(0)  加入收藏
▌简易百科推荐
解密 SSE,像 ChatGPT 一样返回流式响应
我们知道目前的 HTTP/1.1 采用的是标准的请求-响应模型,客户端主动发请求,服务端被动地返回响应。这种模型在客户端需要实时获取结果的场景下是不合适的,因为这意味着客户端需...【详细内容】
2023-11-20  古明地觉的编程教室  微信公众号  Tags:ChatGPT   点击:(0)  评论:(0)  加入收藏
RabbitMQ插件开发指南:定制化你的消息队列
RabbitMQ是一个功能强大的消息队列系统,它提供了灵活的插件机制,使用户能够定制化自己的消息队列。下面将为您介绍RabbitMQ插件开发的指南,让您能够根据自己的需求编写定制化的...【详细内容】
2023-11-20  编程技术汇  今日头条  Tags:RabbitMQ   点击:(4)  评论:(0)  加入收藏
读懂这一篇,集群节点不下线
问题一直在发生1. I&#39;m NotReady阿里云有自己的 Kubernetes 容器集群产品。随着 Kubernetes 集群出货量的剧增,线上用户零星的发现,集群会非常低概率地出现节点 NotReady...【详细内容】
2023-11-20    云原生运维圈  Tags:集群   点击:(2)  评论:(0)  加入收藏
GitHub:程序员正积极使用 AI 编程、JavaScript 语言依然最流行
IT之家 11 月 20 日消息,GitHub 发布了 2023 年度 Octoverse 开源状态报告,其中主要强调了 AI 在开发过程中的作用,并围绕云和 Git 的开源活动展开。官方介绍称,今年的三大趋势...【详细内容】
2023-11-20    IT之家  Tags:AI 编程   点击:(3)  评论:(0)  加入收藏
分片并不意味着分布式
Sharding(分片)是一种将数据和负载分布到多个独立的数据库实例的技术。这种方法通过将原始数据集分割为分片来利用水平可扩展性,然后将这些分片分布到多个数据库实例中。1*yg3P...【详细内容】
2023-11-20  小技术君  微信公众号  Tags:分布式   点击:(3)  评论:(0)  加入收藏
Angular怎么还没死
作者丨Mohit Pandey编译丨诺亚出品 | 51CTO技术栈(微信号:blog51cto)Angular已经完全改版了。“欢迎来到Angular的复兴,”Angular官方在X(twitter)上发布了这样一条帖子。图片正如...【详细内容】
2023-11-20    51CTO  Tags:Angular   点击:(3)  评论:(0)  加入收藏
Vite 5 正式发布,性能大幅提
Vite 5 现已发布,这是 Vite 发展道路上的又一个重要里程碑。新版本采用了 Rollup 4,大大提升了构建性能;此外还带来了一些新选项,可用于提高开发服务器的性能。公告指出,Vite 5...【详细内容】
2023-11-20    OSC开源社区  Tags:Vite   点击:(3)  评论:(0)  加入收藏
如何有效减少 AI 模型的数据中心能源消耗?
在让人工智能变得更好的竞赛中,麻省理工学院(MIT)林肯实验室正在开发降低功耗、高效训练和透明能源使用的方法。在 Google 上搜索航班时,您可能已经注意到,现在每个航班的碳排放...【详细内容】
2023-11-17  AI技术和商业思维  微信公众号  Tags:AI 模型   点击:(12)  评论:(0)  加入收藏
Nacos配置中心的Pull原理,附源码
在单体服务时代,关于配置信息,管理一套配置文件即可。而拆分成微服务之后,每一个系统都会有自己的配置,并且都各不相同,有些配置还需要动态改变,以达到动态降级、切流量、扩缩容等...【详细内容】
2023-11-17  哪吒编程  微信公众号  Tags:Nacos   点击:(7)  评论:(0)  加入收藏
如何在零信任世界中实现API安全性?
随着传统网络边界的不断变化,零信任架构(Zero Trust architecture, ZTA)已经从一个讨论话题,转变为许多企业组织积极推进采用的切实计划。然而,在许多企业所制定的ZTA应用计划中,...【详细内容】
2023-11-17  安全牛  微信公众号  Tags:API   点击:(8)  评论:(0)  加入收藏
站内最新
站内热门
站内头条