Skip to content

s13 - 连线与执行

s12 的画布上摆满了节点,你可以拖拽、连线、配置。但它只是一张好看的图——点「运行」什么都不会发生。 这一章要做的事情:让这张图真正跑起来。

这一章要解决什么问题

你画了一个工作流:Start → LLM → Condition → End。每个节点都配好了参数。但这些节点之间是孤立的——LLM 不知道 Start 传了什么进来,Condition 不知道 LLM 的输出是什么,End 收不到任何东西。

你需要一个执行引擎:按正确的顺序跑完所有节点,把上游的输出传给下游的输入,让数据在节点之间流动起来。

先想清楚:图怎么跑

节点和连线构成了一张有向图。Start 是起点,End 是终点,中间可能有分支(Condition)和汇聚。

执行这张图的关键问题是:什么顺序执行?

不能随便排。一个节点必须等它的所有上游执行完才能开始——因为它的输入依赖上游的输出。这叫拓扑排序

举个例子:

Start → LLM → Condition → EndA (true分支)
                   ↓ false
                  EndB

合法的执行顺序:

  1. Start(没有上游,直接执行)
  2. LLM(依赖 Start)
  3. Condition(依赖 LLM)
  4. EndA 或 EndB(依赖 Condition 的结果)

以下顺序是非法的:

  • LLM → Start → ...(LLM 依赖 Start,不能先跑)

拓扑排序:Kahn's Algorithm

拓扑排序的经典算法。思路很简单:

  1. 统计每个节点的入度(有几条边指向它)
  2. 把入度为 0 的节点放进队列
  3. 从队列取一个节点出来,把它指向的节点的入度减 1
  4. 如果某个节点的入度变成 0,放进队列
  5. 重复直到队列空了

执行引擎的核心就在 app/engine/executor.ts

typescript
export function topologicalSort(nodes: FlowNode[], edges: FlowEdge[]): string[] {
  const inDegree: Record<string, number> = {};
  const adjacency: Record<string, string[]> = {};

  for (const node of nodes) {
    inDegree[node.id] = 0;
    adjacency[node.id] = [];
  }

  for (const edge of edges) {
    adjacency[edge.source].push(edge.target);
    inDegree[edge.target] = (inDegree[edge.target] || 0) + 1;
  }

  const queue: string[] = [];
  for (const id in inDegree) {
    if (inDegree[id] === 0) queue.push(id);
  }

  const result: string[] = [];
  while (queue.length > 0) {
    const current = queue.shift()!;
    result.push(current);
    for (const neighbor of adjacency[current]) {
      inDegree[neighbor]--;
      if (inDegree[neighbor] === 0) queue.push(neighbor);
    }
  }

  if (result.length !== nodes.length) {
    throw new Error("节点图中存在循环依赖,无法执行");
  }

  return result;
}

注意最后的检查:如果排序结果的长度不等于节点数,说明图里有环(A → B → C → A),这种图没法执行。

数据怎么在节点之间流动

每个节点执行完,输出写入一个共享的执行上下文(Execution Context):

typescript
interface ExecutionContext {
  outputs: Record<string, string>;  // 节点 ID → 输出
  currentNodeId: string | null;
  logs: ExecutionLog[];
  aborted: boolean;
}

下游节点通过模板语法引用上游的输出。比如 LLM 节点的 prompt 可以写:

请总结以下内容:{{start.user_input}}

执行引擎会把 \{\{start.user_input\}\} 替换成 Start 节点的输出。这就是 resolveTemplate 函数干的事:

typescript
function resolveTemplate(template: string, context: ExecutionContext): string {
  return template.replace(/\{\{(\w+)(?:\.(\w+))?\}\}/g, (_, nodeId, field) => {
    const output = context.outputs[nodeId];
    if (!output) return `[${nodeId} 无输出]`;
    if (field) {
      try {
        const parsed = JSON.parse(output);
        return parsed[field] ?? `[${nodeId}.${field} 不存在]`;
      } catch {
        return output;
      }
    }
    return output;
  });
}

\{\{nodeId\}\} 引用整个输出,\{\{nodeId.field\}\} 从 JSON 输出中取某个字段。

节点执行器:Runner 模式

每种节点类型有自己的执行逻辑。用一个 runners.ts 把它们分离开:

typescript
// Start:初始化变量
class StartRunner implements NodeRunner {
  async run(node, context) {
    return node.config.variables || "{}";
  }
}

// LLM:调模型 API
class LLMRunner implements NodeRunner {
  async run(node, context) {
    const prompt = resolveTemplate(node.config.prompt, context);
    const res = await fetch("/api/execute-llm", {
      method: "POST",
      body: JSON.stringify({ prompt, systemPrompt: node.config.system_prompt }),
    });
    const data = await res.json();
    return data.content;
  }
}

// Condition:求值表达式
class ConditionRunner implements NodeRunner {
  async run(node, context) {
    const expression = resolveTemplate(node.config.expression, context);
    const result = evaluateExpression(expression);
    return result ? "true" : "false";
  }
}

执行引擎遍历排序后的节点列表,找到对应的 runner,调用 runner.run(node, context),把返回值存进 context.outputs[nodeId]

这种模式叫策略模式——每种节点类型是一个策略,执行引擎不关心具体实现,只调用统一的接口。以后加新的节点类型,写一个新的 runner 注册进去就行,不用改执行引擎。

条件分支怎么处理

Condition 节点返回 "true""false"。问题来了:它后面的两条路(true 分支和 false 分支)都排在拓扑序里,但只能走一条。

解决办法:在执行阶段检查。每个边(edge)可以有一个 sourcePort 字段,值是 "true""false"。执行引擎在跑某个节点之前,检查它的前驱是不是条件节点、条件结果是什么,决定跳不跳过:

typescript
function shouldSkipNode(nodeId, nodeMap, outEdges, context): boolean {
  // 找到所有指向这个节点的边
  for (const sourceId in outEdges) {
    for (const edge of outEdges[sourceId]) {
      if (edge.target !== nodeId) continue;

      const sourceNode = nodeMap[sourceId];
      if (sourceNode.type !== "condition") continue;

      const conditionResult = context.outputs[sourceId];
      if (conditionResult === "false" && edge.sourcePort !== "false") return true;
      if (conditionResult === "true" && edge.sourcePort === "false") return true;
    }
  }
  return false;
}

拓扑排序保证了条件节点一定先执行,所以检查时一定能拿到结果。

视觉反馈:节点在跑的时候会亮

执行引擎通过回调函数通知 UI:

typescript
await executeWorkflow(nodes, edges, {
  onNodeStatusChange: (id, status) => setNodeStatus(id, status),
  onLog: (log) => addExecutionLog(log),
  onOutput: (id, output) => setNodeOutput(id, output),
  onComplete: () => setRunning(false),
  onError: (id, error) => { ... },
});

每个节点有四种状态:pendingrunningcompleted / error。状态变化实时反映在画布上——正在跑的节点边框变黄并脉动,完成的变绿,出错的变红。连线颜色也会跟着变。

ExecutionPanel 在右下角显示实时日志:

14:32:01 ○ [Start] 开始执行
14:32:01 ● [Start] 执行完成
14:32:01 ○ [LLM] 开始执行
14:32:05 ● [LLM] 执行完成
14:32:05 ○ [Condition] 开始执行
14:32:05 ● [Condition] 条件结果: true

文件结构

code/s13/
├── app/
│   ├── engine/
│   │   ├── types.ts          # 执行引擎的类型定义
│   │   ├── runners.ts        # 每种节点的执行器
│   │   └── executor.ts       # 拓扑排序 + 执行循环
│   ├── api/
│   │   ├── execute-llm/route.ts   # LLM 调用 API
│   │   └── execute-tool/route.ts  # 工具执行 API
│   ├── store/
│   │   └── workflowStore.ts  # Zustand 状态管理
│   ├── components/
│   │   ├── Canvas.tsx        # 画布 + 连线 + 状态着色
│   │   ├── SidePanel.tsx     # 节点配置编辑器
│   │   ├── ExecutionPanel.tsx # 执行日志面板
│   │   └── nodes/
│   │       ├── BaseNode.tsx  # 节点公共外壳 + 状态指示器
│   │       ├── StartNode.tsx
│   │       ├── LLMNode.tsx
│   │       ├── ToolNode.tsx
│   │       ├── ConditionNode.tsx
│   │       └── EndNode.tsx
│   ├── page.tsx              # 主页面 + 运行按钮
│   ├── layout.tsx
│   └── globals.css
├── package.json
└── tsconfig.json

运行

bash
cd code/s13
npm install
npm run dev

打开 http://localhost:3000,试试:

  1. 点击左侧的 Start、LLM、End 按钮,在画布上创建三个节点
  2. 从 Start 的底部端口拖拽到 LLM 的顶部端口,画一条连线
  3. 从 LLM 拖拽到 End
  4. 点击 Start,在右侧配置变量:{"user_input": "用一句话解释什么是 Agent"}
  5. 点击 LLM,配置 prompt:\{\{start.user_input\}\}
  6. 点击「运行」,观察节点依次变色,右下角出现执行日志

试着改改

1. 加一个 echo 工具节点

在 LLM 和 End 之间加一个 Tool 节点,tool_name 设为 echoarguments 设为 {"input": "\{\{llm.output\}\}"}。观察 LLM 的输出如何流经 Tool 到达 End。

2. 试条件分支

搭一个 Start → LLM → Condition → End(True) + End(False) 的图。Condition 的表达式设为 contains(\{\{llm.output\}\}, "是")。分别用不同的 user_input 跑,观察走哪条路。

3. 故意制造循环

连一个 A → B → C → A 的环,点运行。观察报错信息:"节点图中存在循环依赖,无法执行"。

4. 加延时效果

在某个 runner 里加 await new Promise(r => setTimeout(r, 1000)),观察执行面板里日志的时间间隔,感受节点是串行执行的。

教学边界

这一章只做一件事:让工作流真正执行——按拓扑序遍历节点图,数据在节点之间流动,UI 实时反馈执行状态。

不涉及:

  • 并行执行(目前是串行的,一个节点跑完才跑下一个)
  • 执行中断和恢复
  • 错误重试和回滚
  • 历史执行记录和回放
  • 子流程(节点内部嵌套另一张图)

这些都在后面的章节里。

一句话记住

拓扑排序决定执行顺序,执行上下文让数据在节点之间流动——这就是工作流引擎的全部秘密。