From 4c4c10c915aeb459afd66bff2ba091f3e19dd38e Mon Sep 17 00:00:00 2001 From: yinwm Date: Thu, 12 Feb 2026 19:42:24 +0800 Subject: [PATCH] feat: US-008 - Inject callback into async tools in AgentLoop - Update ToolRegistry.ExecuteWithContext to accept asyncCallback parameter - Check if tool implements AsyncTool and set callback if provided - Define asyncCallback in AgentLoop.runLLMIteration - Callback uses bus.PublishOutbound to send async results to user - Update Execute method to pass nil for backward compatibility - Add debug logging for async callback injection Co-Authored-By: Claude Opus 4.6 --- .ralph/prd.json | 2 +- .ralph/progress.txt | 27 +++++++++++++++++++++++++-- pkg/agent/loop.go | 20 +++++++++++++++++++- pkg/tools/registry.go | 16 ++++++++++++++-- 4 files changed, 59 insertions(+), 6 deletions(-) diff --git a/.ralph/prd.json b/.ralph/prd.json index b24725b..f0bf0d4 100644 --- a/.ralph/prd.json +++ b/.ralph/prd.json @@ -121,7 +121,7 @@ "Typecheck passes" ], "priority": 8, - "passes": false, + "passes": true, "notes": "" }, { diff --git a/.ralph/progress.txt b/.ralph/progress.txt index 04f25d9..ea466ed 100644 --- a/.ralph/progress.txt +++ b/.ralph/progress.txt @@ -6,7 +6,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改 ## Progress -### Completed (6/21) +### Completed (7/21) - US-001: Add ToolResult struct and helper functions - US-002: Modify Tool interface to return *ToolResult @@ -14,6 +14,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改 - US-005: Update AgentLoop tool result processing logic - US-006: Add AsyncCallback type and AsyncTool interface - US-007: Heartbeat async task execution support +- US-008: Inject callback into async tools in AgentLoop ### In Progress @@ -28,7 +29,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改 | US-005 | Update AgentLoop tool result processing logic | Completed | No test files in pkg/agent yet | | US-006 | Add AsyncCallback type and AsyncTool interface | Completed | | | US-007 | Heartbeat async task execution support | Completed | | -| US-008 | Inject callback into async tools in AgentLoop | Pending | | +| US-008 | Inject callback into async tools in AgentLoop | Completed | | | US-009 | State save atomicity - SetLastChannel | Pending | | | US-010 | Update RecordLastChannel to use atomic save | Pending | | | US-011 | Refactor MessageTool to use ToolResult | Completed | | @@ -126,4 +127,26 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改 - **Gotchas encountered:** 原始代码中的 `running()` 函数逻辑有问题(新创建的服务会被认为是"正在运行"的),但这不在本次修改范围内。 - **Useful context:** 心跳服务现在支持两种处理器:旧的 `onHeartbeat (返回 string, error)` 和新的 `onHeartbeatWithTools (返回 *ToolResult)`。新的处理器优先级更高。 +--- + +## [2026-02-12] - US-008 +- What was implemented: + - 修改 `ToolRegistry.ExecuteWithContext` 方法签名,增加 `asyncCallback AsyncCallback` 参数 + - 在 `ExecuteWithContext` 中检查工具是否实现 `AsyncTool` 接口 + - 如果工具实现 `AsyncTool` 且回调非空,调用 `SetCallback` 设置回调 + - 添加日志记录异步回调注入 + - 在 `AgentLoop.runLLMIteration` 中定义 `asyncCallback` 回调函数 + - 回调函数使用 `al.bus.PublishOutbound` 发送结果给用户 + - 更新 `Execute` 方法以适配新的签名(传递 nil 作为回调) + - 添加完整的日志记录异步工具结果发送 + +- Files changed: + - `pkg/tools/registry.go` + - `pkg/agent/loop.go` + +- **Learnings for future iterations:** + - **Patterns discovered:** 回调函数应该在工具执行循环中定义,这样可以捕获 `opts.Channel` 和 `opts.ChatID` 等上下文信息。 + - **Gotchas encountered:** 更新方法签名时需要同时更新所有调用点。我修改了 `ExecuteWithContext` 的签名,所以也更新了 `Execute` 方法的调用。 + - **Useful context:** 异步工具完成时会调用回调,回调将 `ForUser` 内容发送给用户。这允许长时间运行的操作(如子代理)在后台完成并通知用户,而不阻塞主循环。 + --- \ No newline at end of file diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 6eb199d..5030a1b 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -413,7 +413,25 @@ func (al *AgentLoop) runLLMIteration(ctx context.Context, messages []providers.M "iteration": iteration, }) - toolResult := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID) + // Create async callback for tools that implement AsyncTool + // This callback sends async completion results to the user + asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) { + // Send ForUser content to user if not silent + if !result.Silent && result.ForUser != "" { + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: opts.Channel, + ChatID: opts.ChatID, + Content: result.ForUser, + }) + logger.InfoCF("agent", "Async tool result sent to user", + map[string]interface{}{ + "tool": tc.Name, + "content_len": len(result.ForUser), + }) + } + } + + toolResult := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID, asyncCallback) lastToolResult = toolResult // Send ForUser content to user immediately if not Silent diff --git a/pkg/tools/registry.go b/pkg/tools/registry.go index 9e9c365..642dd6f 100644 --- a/pkg/tools/registry.go +++ b/pkg/tools/registry.go @@ -34,10 +34,13 @@ func (r *ToolRegistry) Get(name string) (Tool, bool) { } func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string]interface{}) *ToolResult { - return r.ExecuteWithContext(ctx, name, args, "", "") + return r.ExecuteWithContext(ctx, name, args, "", "", nil) } -func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args map[string]interface{}, channel, chatID string) *ToolResult { +// ExecuteWithContext executes a tool with channel/chatID context and optional async callback. +// If the tool implements AsyncTool and a non-nil callback is provided, +// the callback will be set on the tool before execution. +func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args map[string]interface{}, channel, chatID string, asyncCallback AsyncCallback) *ToolResult { logger.InfoCF("tool", "Tool execution started", map[string]interface{}{ "tool": name, @@ -58,6 +61,15 @@ func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args contextualTool.SetContext(channel, chatID) } + // If tool implements AsyncTool and callback is provided, set callback + if asyncTool, ok := tool.(AsyncTool); ok && asyncCallback != nil { + asyncTool.SetCallback(asyncCallback) + logger.DebugCF("tool", "Async callback injected", + map[string]interface{}{ + "tool": name, + }) + } + start := time.Now() result := tool.Execute(ctx, args) duration := time.Since(start)