Skip to content

7. 流式传输和多轮交互(LangGraph 示例)

Helloworld 示例演示了 A2A 的基本机制。对于更高级的功能,如强大的流式传输、任务状态管理和由 LLM 驱动的多轮对话,我们将转向位于 a2a-samples/samples/python/agents/langgraph/ 的 LangGraph 示例。

此示例具有一个"货币智能体",它使用 Gemini 模型通过 LangChain 和 LangGraph 来回答货币转换问题。

设置 LangGraph 示例

  1. 创建一个 Gemini API 密钥,如果您还没有的话。

  2. 环境变量:

    a2a-samples/samples/python/agents/langgraph/ 目录中创建一个 .env 文件:

    echo "GOOGLE_API_KEY=YOUR_API_KEY_HERE" > .env
    

    YOUR_API_KEY_HERE 替换为您的实际 Gemini API 密钥。

  3. 安装依赖项(如果尚未涵盖):

    langgraph 示例有自己的 pyproject.toml,其中包含 langchain-google-genailanggraph 等依赖项。当您使用 pip install -e .[dev]a2a-samples 根目录安装 SDK 时,这应该也安装了工作区示例的依赖项,包括 langgraph-example。如果遇到导入错误,请确保从根目录成功安装了主 SDK。

运行 LangGraph 服务器

在终端中导航到 a2a-samples/samples/python/agents/langgraph/app 目录,并确保您的虚拟环境(来自 SDK 根目录)已激活。

启动 LangGraph 智能体服务器:

python __main__.py

这将启动服务器,通常在 http://localhost:10000

与 LangGraph 智能体交互

打开一个新终端窗口,激活您的虚拟环境,并导航到 a2a-samples/samples/python/agents/langgraph/app

运行其测试客户端:

python test_client.py

现在您可以通过在运行 __main__.py 的终端窗口中按 Ctrl+C 来关闭服务器。

展示的关键功能

langgraph 示例展示了几个重要的 A2A 概念:

  1. LLM 集成

    • agent.py 定义了 CurrencyAgent。它使用 ChatGoogleGenerativeAI 和 LangGraph 的 create_react_agent 来处理用户查询。
    • 这演示了真实的 LLM 如何为智能体的逻辑提供动力。
  2. 任务状态管理

    • samples/langgraph/__main__.py 使用 InMemoryTaskStore 初始化 DefaultRequestHandler

      
      
    • CurrencyAgentExecutor(在 samples/langgraph/agent_executor.py 中),当其 execute 方法被 DefaultRequestHandler 调用时,与包含当前任务(如果有的话)的 RequestContext 交互。

    • 对于 message/sendDefaultRequestHandler 使用 TaskStore 在交互之间持久化和检索任务状态。对 message/send 的响应将是完整的 Task 对象,如果智能体的执行流程涉及多个步骤或导致持久化任务。
    • test_client.pyrun_single_turn_test 演示了获取 Task 对象并使用 get_task 查询它。
  3. 使用 TaskStatusUpdateEventTaskArtifactUpdateEvent 进行流式传输

    • CurrencyAgentExecutor 中的 execute 方法负责处理非流式和流式请求,由 DefaultRequestHandler 编排。
    • 当 LangGraph 智能体处理请求时(可能涉及调用 get_exchange_rate 等工具),CurrencyAgentExecutor 将不同类型的事件加入 EventQueue
      • TaskStatusUpdateEvent:用于中间更新(例如,"正在查找汇率...","正在处理汇率..")。这些事件上的 final 标志为 False
      • TaskArtifactUpdateEvent:当最终答案准备就绪时,它作为工件加入队列。lastChunk 标志为 True
      • 带有 state=TaskState.completedfinal=True 的最终 TaskStatusUpdateEvent 被发送,以表示流式传输任务的结束。
    • test_client.pyrun_streaming_test 函数将在从服务器接收时打印这些单独的事件块。
  4. 多轮对话(TaskState.input_required

    • CurrencyAgent 可以在查询不明确时要求澄清(例如,用户问"100 美元是多少?")。
    • 当这种情况发生时,CurrencyAgentExecutor 将加入一个 TaskStatusUpdateEvent,其中 status.stateTaskState.input_requiredstatus.message 包含智能体的问题(例如,"您想转换成哪种货币?")。此事件对于当前交互流将具有 final=True
    • test_client.pyrun_multi_turn_test 函数演示了这一点:
      • 它发送一个初始的不明确查询。
      • 智能体响应(通过处理加入事件的 DefaultRequestHandler)返回一个状态为 input_requiredTask
      • 客户端然后发送第二条消息,包括第一次 Task 响应中的 taskIdcontextId,以提供缺失的信息("英镑")。这继续同一任务。

探索代码

花一些时间查看这些文件:

  • __main__.py:使用 A2AStarletteApplicationDefaultRequestHandler 的服务器设置。注意 AgentCard 定义包括 capabilities.streaming=True
  • agent.py:具有 LangGraph、LLM 模型和工具定义的 CurrencyAgent
  • agent_executor.py:实现 execute(和 cancel)方法的 CurrencyAgentExecutor。它使用 RequestContext 来理解正在进行的任务,使用 EventQueue 发送各种事件(TaskStatusUpdateEventTaskArtifactUpdateEvent,如果没有任务存在,则通过第一个事件隐式发送新 Task 对象)。
  • test_client.py:演示各种交互模式,包括检索任务 ID 并将其用于多轮对话。

此示例提供了更丰富的说明,展示了 A2A 如何促进智能体之间复杂、有状态和异步的交互。