Skip to content

长时间运行任务的流式传输和异步操作

Agent2Agent (A2A) 协议明确设计用于处理可能不会立即完成的任务。许多 AI 驱动的操作通常是长时间运行的,涉及多个步骤,产生增量结果,或需要人工干预。A2A 提供了管理此类异步交互的机制,确保客户端能够有效接收更新,无论它们保持持续连接还是以更断开的方式运行。

使用服务器发送事件 (SSE) 的流式传输

对于产生增量结果(如生成长文档或流媒体)或提供持续状态更新的任务,A2A 支持使用服务器发送事件 (SSE) 进行实时通信。当客户端能够与 A2A 服务器保持活动的 HTTP 连接时,这种方法是理想的。

以下关键功能详细说明了 SSE 流式传输在 A2A 协议中的实现和管理方式:

  • 服务器能力: A2A 服务器必须通过在其 Agent Card 中设置 capabilities.streaming: true 来表明其对流式传输的支持。

  • 启动流: 客户端使用 message/stream RPC 方法发送初始消息(例如,提示或命令)并同时订阅该任务的更新。

  • 服务器响应和连接: 如果订阅成功,服务器响应 HTTP 200 OK 状态和 Content-Type: text/event-stream。此 HTTP 连接保持打开状态,供服务器向客户端推送事件。

  • 事件结构和类型: 服务器通过此流发送事件。每个事件的 data 字段包含 JSON-RPC 2.0 响应对象,通常是 SendStreamingMessageResponseSendStreamingMessageResponseresult 字段包含:

    • Task:表示工作的当前状态。
    • TaskStatusUpdateEvent:传达任务生命周期状态的变化(例如,从 workinginput-requiredcompleted)。它还提供来自智能体的中间消息。
    • TaskArtifactUpdateEvent:传递任务生成的新工件或更新工件。这用于以块的形式流式传输大文件或数据结构,使用 appendlastChunk 等字段帮助重新组装。
  • 流终止: 服务器通过在 TaskStatusUpdateEvent 中设置 final: true 来表示一个周期的更新结束。这通常发生在任务达到终止状态时。之后,服务器通常会关闭 SSE 连接。

  • 重新订阅: 如果客户端的 SSE 连接在任务仍处于活动状态时过早中断,客户端可以使用 tasks/resubscribe RPC 方法尝试重新连接到流。

何时使用流式传输

SSE 流式传输最适合用于:

  • 实时监控长时间运行任务的进度。
  • 增量接收大型结果(工件)。
  • 交互式、对话式交换,其中即时反馈或部分响应有益。
  • 需要来自智能体的低延迟更新的应用程序。

协议规范参考

有关详细结构,请参阅协议规范:

断开连接场景的推送通知

对于非常长时间运行的任务(例如,持续数分钟、数小时甚至数天)或当客户端无法或不愿意保持持久连接时(如移动客户端或无服务器功能),A2A 支持使用推送通知进行异步更新。这允许 A2A 服务器在发生重要任务更新时主动通知客户端提供的 webhook。

以下关键功能详细说明了推送通知在 A2A 协议中的实现和管理方式:

  • 服务器能力: A2A 服务器必须通过在其 Agent Card 中设置 capabilities.pushNotifications: true 来表明其对此功能的支持。
  • 配置: 客户端向服务器提供 PushNotificationConfig。此配置通过以下方式提供:
    • 在初始 message/sendmessage/stream 请求中,或
    • 单独使用 tasks/pushNotificationConfig/set RPC 方法为现有任务提供。 PushNotificationConfig 包括 url(HTTPS webhook URL)、可选的 token(用于客户端验证)和可选的 authentication 详细信息(用于 A2A 服务器向 webhook 进行身份验证)。
  • 通知触发: A2A 服务器决定何时发送推送通知,通常在任务达到重要状态更改时(例如,终止状态、input-requiredauth-required)。
  • 通知有效载荷: A2A 协议没有严格定义 HTTP 正文有效载荷,但它应包含足够的信息供客户端识别任务 ID 并了解更新的一般性质(例如,新的 TaskState)。
  • 客户端操作: 在接收到推送通知(并成功验证其真实性)后,客户端通常使用 tasks/get RPC 方法和通知中的 taskId 来检索完整的、更新的 Task 对象,包括任何新工件。

何时使用推送通知

推送通知适用于:

  • 需要数分钟、数小时或数天才能完成的非常长时间运行的任务。
  • 无法或不愿意保持持久连接的客户端,如移动应用程序或无服务器功能。
  • 客户端只需要在重要状态更改时得到通知而不是持续更新的场景。

协议规范参考

有关详细结构,请参阅协议规范:

客户端推送通知服务

PushNotificationConfig.url 中指定的 url 指向客户端推送通知服务。此服务负责接收来自 A2A 服务器的 HTTP POST 通知。其职责包括对传入通知进行身份验证、验证其相关性,并将通知或其内容传递给适当的客户端应用程序逻辑或系统。

推送通知的安全考虑

由于推送通知的异步和服务器发起的出站性质,安全性至关重要。A2A 服务器(发送通知)和客户端的 webhook 接收器都有关键责任。

A2A 服务器安全(向客户端 webhook 发送通知时)

  • Webhook URL 验证: 服务器不应盲目信任并向客户端提供的任何 URL 发送 POST 请求。恶意客户端可能提供指向内部服务或无关第三方系统的 URL,导致服务器端请求伪造 (SSRF) 攻击或充当分布式拒绝服务 (DDoS) 放大器。
    • 缓解策略: 受信任域的白名单、所有权验证(例如,挑战 - 响应机制)和网络控制(例如,出口防火墙)。
  • 向客户端 webhook 进行身份验证: A2A 服务器必须根据 PushNotificationConfig.authentication 中指定的方案向客户端的 webhook URL 进行身份验证。常见方案包括 Bearer 令牌 (OAuth 2.0)、API 密钥、HMAC 签名或双向 TLS (mTLS)。

客户端 webhook 接收器安全(从 A2A 服务器接收通知时)

  • 验证 A2A 服务器身份: webhook 端点必须严格验证传入通知请求的真实性,以确保它们来自合法的 A2A 服务器而不是冒名顶替者。
    • 验证方法: 验证签名/令牌(例如,针对 A2A 服务器受信任公钥的 JWT 签名、HMAC 签名或 API 密钥验证)。此外,如果提供了 PushNotificationConfig.token,也要进行验证。
  • 防止重放攻击:
    • 时间戳: 通知应包含时间戳。webhook 应拒绝过旧的通知。
    • 随机数/唯一 ID: 对于关键通知,考虑使用唯一的一次性标识符(例如,JWT 的 jti 声明或事件 ID)以防止处理重复通知。
  • 安全密钥管理和轮换: 实施安全的密钥管理实践,包括定期密钥轮换,特别是对于加密密钥。JWKS (JSON Web Key Set) 等协议有助于非对称密钥的密钥轮换。

非对称密钥流示例 (JWT + JWKS)

  1. 客户端设置 PushNotificationConfig,指定 authentication.schemes: ["Bearer"] 和可能的 JWT 预期 issueraudience
  2. A2A 服务器在发送通知时:
    • 生成 JWT,使用其私钥进行签名。JWT 包括 iss(发行方)、aud(受众)、iat(签发时间)、exp(过期时间)、jti(JWT ID)和 taskId 等声明。
    • JWT 头部指示签名算法和密钥 ID (kid)。
    • A2A 服务器通过 JWKS 端点提供其公钥。
  3. 客户端 Webhook 在接收到通知时:
    • 从 Authorization 头部提取 JWT。
    • 检查 JWT 头部中的 kid(密钥 ID)。
    • 从 A2A 服务器的 JWKS 端点获取相应的公钥(建议缓存密钥)。
    • 使用公钥验证 JWT 签名。
    • 验证声明(issaudiatexpjti)。
    • 检查提供的 PushNotificationConfig.token

这种全面的、分层的推送通知安全方法有助于确保消息是真实、完整和及时的,保护发送 A2A 服务器和接收客户端 webhook 基础设施。