s07 - 流式输出
你发一条消息,等了五秒,屏幕上突然蹦出一大段文字。 体验很差。不是因为模型慢,而是因为你什么都看不到。 这一章要做的事情,是让文字一个一个蹦出来——像有人在实时打字一样。
这一章要解决什么问题
s06 的聊天界面已经能用了。你打字,发请求,模型回复,消息出现在屏幕上。
但有一个体验问题:从你发送消息到看到回复,中间有一段空白期。 模型在生成,但你什么都看不到。等模型生成完了,一大段文字一次性出现。
如果回复很短,三五个字,这没什么。但如果模型写了一段 200 字的回答,你要等好几秒才能看到第一个字。
这不是模型的问题——模型其实是一边生成一边输出的。问题出在你的代码:你用了 await response.json(),等整个响应完成才拿到结果。
这一章的目标:让模型的回复像打字一样一个一个出现。
为什么用 SSE
模型 API 支持一种模式:你发请求的时候加一个参数 stream: true,服务器就不会等生成完了再返回完整结果,而是每生成几个 token 就发一段数据过来。
这种"服务器主动往客户端推数据"的机制叫 Server-Sent Events(SSE)。
对比一下:
普通请求(s06 的做法)
客户端 → 服务器:给我回答
(等 5 秒)
客户端 ← 服务器:这是完整回答SSE 流式请求
客户端 → 服务器:给我回答(stream: true)
客户端 ← 服务器:你
客户端 ← 服务器:好
客户端 ← 服务器:,
客户端 ← 服务器:我
客户端 ← 服务器:是
客户端 ← 服务器:模型
客户端 ← 服务器:。
(连接保持打开,直到生成完毕)每收到一小段,前端就可以立刻显示。用户看到的就是"打字效果"。
代码
两个文件要改:后端 API 路由和前端页面。
后端:流式 API 路由
app/api/chat/route.ts:
import OpenAI from "openai";
const client = new OpenAI({
apiKey: process.env.DEEPSEEK_API_KEY,
baseURL: "https://api.deepseek.com",
});
export async function POST(req: Request) {
const { messages } = await req.json();
const stream = await client.chat.completions.create({
model: "deepseek-chat",
messages,
stream: true,
});
const encoder = new TextEncoder();
const readable = new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
const text = chunk.choices[0]?.delta?.content || "";
if (text) {
controller.enqueue(encoder.encode(`data: ${text}\n\n`));
}
}
controller.enqueue(encoder.encode("data: [DONE]\n\n"));
controller.close();
},
});
return new Response(readable, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}拆开看。
1. stream: true
const stream = await client.chat.completions.create({
model: "deepseek-chat",
messages,
stream: true,
});就加了一个参数。stream: true 告诉 DeepSeek:"不要等生成完再返回,生成一点就发一点。"
返回的 stream 不是一个完整的 response 对象,而是一个异步迭代器。你可以用 for await ... of 逐个拿到每个 chunk。
2. 每个 chunk 长什么样
for await (const chunk of stream) {
const text = chunk.choices[0]?.delta?.content || "";
// ...
}每个 chunk 是一个对象,结构大概是:
{
"choices": [{
"delta": { "content": "你" },
"finish_reason": null
}]
}注意字段名变了:普通请求用的是 message.content,流式请求用的是 delta.content。delta 的意思是"增量"——每次只给你新生成的那一点点。
3. 手动构建 SSE 响应
const encoder = new TextEncoder();
const readable = new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
const text = chunk.choices[0]?.delta?.content || "";
if (text) {
controller.enqueue(encoder.encode(`data: ${text}\n\n`));
}
}
controller.enqueue(encoder.encode("data: [DONE]\n\n"));
controller.close();
},
});SSE 的格式很简单:每条消息以 data: 开头,以 \n\n(两个换行)结尾。最后发一条 data: [DONE] 告诉客户端"生成完了"。
ReadableStream 是 Web 标准 API,用来创建一个可读的流。controller.enqueue() 往流里塞数据,controller.close() 关闭流。
TextEncoder 把字符串转成字节——流里传的是字节不是字符串。
前端:消费流
app/page.tsx:
"use client";
import { useState, useRef, useEffect } from "react";
type Message = {
role: "user" | "assistant";
content: string;
};
export default function ChatPage() {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState("");
const [isLoading, setIsLoading] = useState(false);
const bottomRef = useRef<HTMLDivElement>(null);
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages]);
async function handleSubmit(e: React.FormEvent) {
e.preventDefault();
if (!input.trim() || isLoading) return;
const userMessage: Message = { role: "user", content: input };
const newMessages = [...messages, userMessage];
setMessages(newMessages);
setInput("");
setIsLoading(true);
// 先加一条空的 assistant 消息,后续往里追加
setMessages([...newMessages, { role: "assistant", content: "" }]);
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages: newMessages }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let assistantText = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split("\n\n");
for (const line of lines) {
if (line.startsWith("data: ") && line !== "data: [DONE]") {
const text = line.slice(6);
assistantText += text;
setMessages([
...newMessages,
{ role: "assistant", content: assistantText },
]);
}
}
}
setIsLoading(false);
}
return (
<div className="flex flex-col h-screen max-w-2xl mx-auto p-4">
<h1 className="text-2xl font-bold mb-4">Build An Agent - s07</h1>
<div className="flex-1 overflow-y-auto space-y-4 mb-4">
{messages.map((msg, i) => (
<div
key={i}
className={`p-3 rounded-lg ${
msg.role === "user"
? "bg-blue-100 ml-12"
: "bg-gray-100 mr-12"
}`}
>
<p className="text-sm text-gray-500 mb-1">
{msg.role === "user" ? "你" : "模型"}
</p>
<p className="whitespace-pre-wrap">
{msg.content}
{isLoading &&
i === messages.length - 1 &&
msg.role === "assistant" && (
<span className="inline-block w-2 h-5 bg-gray-800 ml-0.5 animate-pulse" />
)}
</p>
</div>
))}
<div ref={bottomRef} />
</div>
<form onSubmit={handleSubmit} className="flex gap-2">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="输入消息..."
className="flex-1 border rounded-lg px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
disabled={isLoading}
/>
<button
type="submit"
disabled={isLoading}
className="bg-blue-500 text-white px-6 py-2 rounded-lg hover:bg-blue-600 disabled:opacity-50"
>
发送
</button>
</form>
</div>
);
}拆开看关键变化。
1. 先加一条空消息
setMessages([...newMessages, { role: "assistant", content: "" }]);发送请求之前,先往 messages 里加一条 content 为空的 assistant 消息。这样 UI 上会立刻出现一个"模型"的对话气泡,虽然内容是空的,但用户知道模型在响应了。
2. 读取流
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let assistantText = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split("\n\n");
for (const line of lines) {
if (line.startsWith("data: ") && line !== "data: [DONE]") {
const text = line.slice(6);
assistantText += text;
setMessages([
...newMessages,
{ role: "assistant", content: assistantText },
]);
}
}
}这段是核心。response.body 是一个 ReadableStream,.getReader() 拿到一个读取器。每次调 reader.read() 返回 { done, value }:
done为true表示流结束了value是一个Uint8Array(字节数组)
TextDecoder 把字节转回字符串。注意 { stream: true } 参数——这告诉 decoder "这段数据可能不完整,别急着报错"。因为一个 UTF-8 中文字符可能被拆到两个 chunk 里。
拿到字符串之后,按 \n\n 分割,解析出 data: 后面的文本,拼到 assistantText 里,然后更新 messages。每次更新都会触发 React 重新渲染,用户就看到文字在一点点出现。
3. 闪烁光标
{isLoading &&
i === messages.length - 1 &&
msg.role === "assistant" && (
<span className="inline-block w-2 h-5 bg-gray-800 ml-0.5 animate-pulse" />
)}流式输出的时候,最后一条 assistant 消息后面跟一个闪烁的竖线。animate-pulse 是 Tailwind 内置的动画,让元素透明度来回变化,模拟光标闪烁。流结束、isLoading 变成 false,光标消失。
跑起来
cd code/s07
npm install
npm run dev打开 http://localhost:3000,发一条消息。你应该能看到:
- 立刻出现一个空的"模型"气泡
- 文字一个一个出现,像有人在打字
- 生成完毕后光标消失
如果回复比较长(比如让它写一段代码),效果会更明显。
SSE 协议长什么样
你在浏览器 DevTools 的 Network 面板里能看到实际传输的数据。打开 Network 标签,发一条消息,找到 /api/chat 请求,点 EventStream 标签(Chrome)或 Response 标签(Firefox),你会看到:
data: 你
data: 好
data: ,
data: 我
data: 是
data: DeepSeek
data: 。
data: [DONE]就这么简单。每条消息一行,两个换行分隔。最后 [DONE] 结束。
这也是为什么 SSE 比 WebSocket 更适合这个场景——它就是纯文本协议,不需要额外的握手、连接管理。服务器往客户端推数据,一推到底。
试着改改
1. 打印每个 chunk 的原始数据
在 for await 循环里加一行:
for await (const chunk of stream) {
console.log(JSON.stringify(chunk));
// ...
}看看每个 chunk 的完整结构。你会发现第一个 chunk 的 delta.content 可能是空的——模型在"思考",还没开始输出文字。
2. 改变更新频率
现在的代码是每个 chunk 都触发一次 setMessages。如果 chunk 很多,React 会频繁重渲染。试试只在 assistantText 长度变化超过 5 个字符时才更新:
if (assistantText.length % 5 === 0) {
setMessages([...newMessages, { role: "assistant", content: assistantText }]);
}3. 加一个"停止生成"按钮
在 isLoading 为 true 时显示一个"停止"按钮,点击后调 reader.cancel() 中断流。这需要把 reader 提到 state 或 ref 里。
4. 看看 stream 结束时的最后一个 chunk
最后一个有意义的 chunk 的 finish_reason 会是 "stop" 而不是 null。你可以用这个来判断生成是否正常结束。
教学边界
这一章只做一件事:让模型的回复逐字出现。
不涉及:
- 多轮对话的历史管理(s06 已经做了)
- 工具调用的流式处理(tool_calls 的 delta 格式不同,后面章节处理)
- 错误重试或断线续传
- token 用量统计
- 打字速度控制(匀速 vs. 真实速度模拟)
一句话记住
流式输出的关键不是"快",而是"让用户看到过程"。stream: true 让模型一边生成一边推数据,前端用 ReadableStream 逐字拼接——用户看到的不再是等待,而是打字。