Skip to content

s07 - 流式输出

你发一条消息,等了五秒,屏幕上突然蹦出一大段文字。 体验很差。不是因为模型慢,而是因为你什么都看不到。 这一章要做的事情,是让文字一个一个蹦出来——像有人在实时打字一样。

这一章要解决什么问题

s06 的聊天界面已经能用了。你打字,发请求,模型回复,消息出现在屏幕上。

但有一个体验问题:从你发送消息到看到回复,中间有一段空白期。 模型在生成,但你什么都看不到。等模型生成完了,一大段文字一次性出现。

如果回复很短,三五个字,这没什么。但如果模型写了一段 200 字的回答,你要等好几秒才能看到第一个字。

这不是模型的问题——模型其实是一边生成一边输出的。问题出在你的代码:你用了 await response.json(),等整个响应完成才拿到结果。

这一章的目标:让模型的回复像打字一样一个一个出现。

为什么用 SSE

模型 API 支持一种模式:你发请求的时候加一个参数 stream: true,服务器就不会等生成完了再返回完整结果,而是每生成几个 token 就发一段数据过来。

这种"服务器主动往客户端推数据"的机制叫 Server-Sent Events(SSE)

对比一下:

普通请求(s06 的做法)

客户端 → 服务器:给我回答
(等 5 秒)
客户端 ← 服务器:这是完整回答

SSE 流式请求

客户端 → 服务器:给我回答(stream: true)
客户端 ← 服务器:你
客户端 ← 服务器:好
客户端 ← 服务器:,
客户端 ← 服务器:我
客户端 ← 服务器:是
客户端 ← 服务器:模型
客户端 ← 服务器:。
(连接保持打开,直到生成完毕)

每收到一小段,前端就可以立刻显示。用户看到的就是"打字效果"。

代码

两个文件要改:后端 API 路由和前端页面。

后端:流式 API 路由

app/api/chat/route.ts

typescript
import OpenAI from "openai";

const client = new OpenAI({
  apiKey: process.env.DEEPSEEK_API_KEY,
  baseURL: "https://api.deepseek.com",
});

export async function POST(req: Request) {
  const { messages } = await req.json();

  const stream = await client.chat.completions.create({
    model: "deepseek-chat",
    messages,
    stream: true,
  });

  const encoder = new TextEncoder();

  const readable = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        const text = chunk.choices[0]?.delta?.content || "";
        if (text) {
          controller.enqueue(encoder.encode(`data: ${text}\n\n`));
        }
      }
      controller.enqueue(encoder.encode("data: [DONE]\n\n"));
      controller.close();
    },
  });

  return new Response(readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

拆开看。

1. stream: true

typescript
const stream = await client.chat.completions.create({
  model: "deepseek-chat",
  messages,
  stream: true,
});

就加了一个参数。stream: true 告诉 DeepSeek:"不要等生成完再返回,生成一点就发一点。"

返回的 stream 不是一个完整的 response 对象,而是一个异步迭代器。你可以用 for await ... of 逐个拿到每个 chunk。

2. 每个 chunk 长什么样

typescript
for await (const chunk of stream) {
  const text = chunk.choices[0]?.delta?.content || "";
  // ...
}

每个 chunk 是一个对象,结构大概是:

json
{
  "choices": [{
    "delta": { "content": "你" },
    "finish_reason": null
  }]
}

注意字段名变了:普通请求用的是 message.content,流式请求用的是 delta.contentdelta 的意思是"增量"——每次只给你新生成的那一点点。

3. 手动构建 SSE 响应

typescript
const encoder = new TextEncoder();

const readable = new ReadableStream({
  async start(controller) {
    for await (const chunk of stream) {
      const text = chunk.choices[0]?.delta?.content || "";
      if (text) {
        controller.enqueue(encoder.encode(`data: ${text}\n\n`));
      }
    }
    controller.enqueue(encoder.encode("data: [DONE]\n\n"));
    controller.close();
  },
});

SSE 的格式很简单:每条消息以 data: 开头,以 \n\n(两个换行)结尾。最后发一条 data: [DONE] 告诉客户端"生成完了"。

ReadableStream 是 Web 标准 API,用来创建一个可读的流。controller.enqueue() 往流里塞数据,controller.close() 关闭流。

TextEncoder 把字符串转成字节——流里传的是字节不是字符串。

前端:消费流

app/page.tsx

typescript
"use client";

import { useState, useRef, useEffect } from "react";

type Message = {
  role: "user" | "assistant";
  content: string;
};

export default function ChatPage() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState("");
  const [isLoading, setIsLoading] = useState(false);
  const bottomRef = useRef<HTMLDivElement>(null);

  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: "smooth" });
  }, [messages]);

  async function handleSubmit(e: React.FormEvent) {
    e.preventDefault();
    if (!input.trim() || isLoading) return;

    const userMessage: Message = { role: "user", content: input };
    const newMessages = [...messages, userMessage];
    setMessages(newMessages);
    setInput("");
    setIsLoading(true);

    // 先加一条空的 assistant 消息,后续往里追加
    setMessages([...newMessages, { role: "assistant", content: "" }]);

    const response = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ messages: newMessages }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let assistantText = "";

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value, { stream: true });
      const lines = chunk.split("\n\n");

      for (const line of lines) {
        if (line.startsWith("data: ") && line !== "data: [DONE]") {
          const text = line.slice(6);
          assistantText += text;
          setMessages([
            ...newMessages,
            { role: "assistant", content: assistantText },
          ]);
        }
      }
    }

    setIsLoading(false);
  }

  return (
    <div className="flex flex-col h-screen max-w-2xl mx-auto p-4">
      <h1 className="text-2xl font-bold mb-4">Build An Agent - s07</h1>

      <div className="flex-1 overflow-y-auto space-y-4 mb-4">
        {messages.map((msg, i) => (
          <div
            key={i}
            className={`p-3 rounded-lg ${
              msg.role === "user"
                ? "bg-blue-100 ml-12"
                : "bg-gray-100 mr-12"
            }`}
          >
            <p className="text-sm text-gray-500 mb-1">
              {msg.role === "user" ? "你" : "模型"}
            </p>
            <p className="whitespace-pre-wrap">
              {msg.content}
              {isLoading &&
                i === messages.length - 1 &&
                msg.role === "assistant" && (
                  <span className="inline-block w-2 h-5 bg-gray-800 ml-0.5 animate-pulse" />
                )}
            </p>
          </div>
        ))}
        <div ref={bottomRef} />
      </div>

      <form onSubmit={handleSubmit} className="flex gap-2">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="输入消息..."
          className="flex-1 border rounded-lg px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
          disabled={isLoading}
        />
        <button
          type="submit"
          disabled={isLoading}
          className="bg-blue-500 text-white px-6 py-2 rounded-lg hover:bg-blue-600 disabled:opacity-50"
        >
          发送
        </button>
      </form>
    </div>
  );
}

拆开看关键变化。

1. 先加一条空消息

typescript
setMessages([...newMessages, { role: "assistant", content: "" }]);

发送请求之前,先往 messages 里加一条 content 为空的 assistant 消息。这样 UI 上会立刻出现一个"模型"的对话气泡,虽然内容是空的,但用户知道模型在响应了。

2. 读取流

typescript
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let assistantText = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  const chunk = decoder.decode(value, { stream: true });
  const lines = chunk.split("\n\n");

  for (const line of lines) {
    if (line.startsWith("data: ") && line !== "data: [DONE]") {
      const text = line.slice(6);
      assistantText += text;
      setMessages([
        ...newMessages,
        { role: "assistant", content: assistantText },
      ]);
    }
  }
}

这段是核心。response.body 是一个 ReadableStream.getReader() 拿到一个读取器。每次调 reader.read() 返回 { done, value }

  • donetrue 表示流结束了
  • value 是一个 Uint8Array(字节数组)

TextDecoder 把字节转回字符串。注意 { stream: true } 参数——这告诉 decoder "这段数据可能不完整,别急着报错"。因为一个 UTF-8 中文字符可能被拆到两个 chunk 里。

拿到字符串之后,按 \n\n 分割,解析出 data: 后面的文本,拼到 assistantText 里,然后更新 messages。每次更新都会触发 React 重新渲染,用户就看到文字在一点点出现。

3. 闪烁光标

typescript
{isLoading &&
  i === messages.length - 1 &&
  msg.role === "assistant" && (
    <span className="inline-block w-2 h-5 bg-gray-800 ml-0.5 animate-pulse" />
  )}

流式输出的时候,最后一条 assistant 消息后面跟一个闪烁的竖线。animate-pulse 是 Tailwind 内置的动画,让元素透明度来回变化,模拟光标闪烁。流结束、isLoading 变成 false,光标消失。

跑起来

bash
cd code/s07
npm install
npm run dev

打开 http://localhost:3000,发一条消息。你应该能看到:

  1. 立刻出现一个空的"模型"气泡
  2. 文字一个一个出现,像有人在打字
  3. 生成完毕后光标消失

如果回复比较长(比如让它写一段代码),效果会更明显。

SSE 协议长什么样

你在浏览器 DevTools 的 Network 面板里能看到实际传输的数据。打开 Network 标签,发一条消息,找到 /api/chat 请求,点 EventStream 标签(Chrome)或 Response 标签(Firefox),你会看到:

data: 你

data: 好

data: ,

data: 我

data: 是

data: DeepSeek

data: 。

data: [DONE]

就这么简单。每条消息一行,两个换行分隔。最后 [DONE] 结束。

这也是为什么 SSE 比 WebSocket 更适合这个场景——它就是纯文本协议,不需要额外的握手、连接管理。服务器往客户端推数据,一推到底。

试着改改

1. 打印每个 chunk 的原始数据

for await 循环里加一行:

typescript
for await (const chunk of stream) {
  console.log(JSON.stringify(chunk));
  // ...
}

看看每个 chunk 的完整结构。你会发现第一个 chunk 的 delta.content 可能是空的——模型在"思考",还没开始输出文字。

2. 改变更新频率

现在的代码是每个 chunk 都触发一次 setMessages。如果 chunk 很多,React 会频繁重渲染。试试只在 assistantText 长度变化超过 5 个字符时才更新:

typescript
if (assistantText.length % 5 === 0) {
  setMessages([...newMessages, { role: "assistant", content: assistantText }]);
}

3. 加一个"停止生成"按钮

isLoadingtrue 时显示一个"停止"按钮,点击后调 reader.cancel() 中断流。这需要把 reader 提到 state 或 ref 里。

4. 看看 stream 结束时的最后一个 chunk

最后一个有意义的 chunk 的 finish_reason 会是 "stop" 而不是 null。你可以用这个来判断生成是否正常结束。

教学边界

这一章只做一件事:让模型的回复逐字出现。

不涉及:

  • 多轮对话的历史管理(s06 已经做了)
  • 工具调用的流式处理(tool_calls 的 delta 格式不同,后面章节处理)
  • 错误重试或断线续传
  • token 用量统计
  • 打字速度控制(匀速 vs. 真实速度模拟)

一句话记住

流式输出的关键不是"快",而是"让用户看到过程"。stream: true 让模型一边生成一边推数据,前端用 ReadableStream 逐字拼接——用户看到的不再是等待,而是打字。