feat: US-008 - Inject callback into async tools in AgentLoop

- Update ToolRegistry.ExecuteWithContext to accept asyncCallback parameter
- Check if tool implements AsyncTool and set callback if provided
- Define asyncCallback in AgentLoop.runLLMIteration
- Callback uses bus.PublishOutbound to send async results to user
- Update Execute method to pass nil for backward compatibility
- Add debug logging for async callback injection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
yinwm
2026-02-12 19:42:24 +08:00
parent 7bcd8b284f
commit 4c4c10c915
4 changed files with 59 additions and 6 deletions

View File

@@ -121,7 +121,7 @@
"Typecheck passes" "Typecheck passes"
], ],
"priority": 8, "priority": 8,
"passes": false, "passes": true,
"notes": "" "notes": ""
}, },
{ {

View File

@@ -6,7 +6,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
## Progress ## Progress
### Completed (6/21) ### Completed (7/21)
- US-001: Add ToolResult struct and helper functions - US-001: Add ToolResult struct and helper functions
- US-002: Modify Tool interface to return *ToolResult - US-002: Modify Tool interface to return *ToolResult
@@ -14,6 +14,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
- US-005: Update AgentLoop tool result processing logic - US-005: Update AgentLoop tool result processing logic
- US-006: Add AsyncCallback type and AsyncTool interface - US-006: Add AsyncCallback type and AsyncTool interface
- US-007: Heartbeat async task execution support - US-007: Heartbeat async task execution support
- US-008: Inject callback into async tools in AgentLoop
### In Progress ### In Progress
@@ -28,7 +29,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
| US-005 | Update AgentLoop tool result processing logic | Completed | No test files in pkg/agent yet | | US-005 | Update AgentLoop tool result processing logic | Completed | No test files in pkg/agent yet |
| US-006 | Add AsyncCallback type and AsyncTool interface | Completed | | | US-006 | Add AsyncCallback type and AsyncTool interface | Completed | |
| US-007 | Heartbeat async task execution support | Completed | | | US-007 | Heartbeat async task execution support | Completed | |
| US-008 | Inject callback into async tools in AgentLoop | Pending | | | US-008 | Inject callback into async tools in AgentLoop | Completed | |
| US-009 | State save atomicity - SetLastChannel | Pending | | | US-009 | State save atomicity - SetLastChannel | Pending | |
| US-010 | Update RecordLastChannel to use atomic save | Pending | | | US-010 | Update RecordLastChannel to use atomic save | Pending | |
| US-011 | Refactor MessageTool to use ToolResult | Completed | | | US-011 | Refactor MessageTool to use ToolResult | Completed | |
@@ -127,3 +128,25 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
- **Useful context:** 心跳服务现在支持两种处理器:旧的 `onHeartbeat (返回 string, error)` 和新的 `onHeartbeatWithTools (返回 *ToolResult)`。新的处理器优先级更高。 - **Useful context:** 心跳服务现在支持两种处理器:旧的 `onHeartbeat (返回 string, error)` 和新的 `onHeartbeatWithTools (返回 *ToolResult)`。新的处理器优先级更高。
--- ---
## [2026-02-12] - US-008
- What was implemented:
- 修改 `ToolRegistry.ExecuteWithContext` 方法签名,增加 `asyncCallback AsyncCallback` 参数
- 在 `ExecuteWithContext` 中检查工具是否实现 `AsyncTool` 接口
- 如果工具实现 `AsyncTool` 且回调非空,调用 `SetCallback` 设置回调
- 添加日志记录异步回调注入
- 在 `AgentLoop.runLLMIteration` 中定义 `asyncCallback` 回调函数
- 回调函数使用 `al.bus.PublishOutbound` 发送结果给用户
- 更新 `Execute` 方法以适配新的签名(传递 nil 作为回调)
- 添加完整的日志记录异步工具结果发送
- Files changed:
- `pkg/tools/registry.go`
- `pkg/agent/loop.go`
- **Learnings for future iterations:**
- **Patterns discovered:** 回调函数应该在工具执行循环中定义,这样可以捕获 `opts.Channel` 和 `opts.ChatID` 等上下文信息。
- **Gotchas encountered:** 更新方法签名时需要同时更新所有调用点。我修改了 `ExecuteWithContext` 的签名,所以也更新了 `Execute` 方法的调用。
- **Useful context:** 异步工具完成时会调用回调,回调将 `ForUser` 内容发送给用户。这允许长时间运行的操作(如子代理)在后台完成并通知用户,而不阻塞主循环。
---

View File

@@ -413,7 +413,25 @@ func (al *AgentLoop) runLLMIteration(ctx context.Context, messages []providers.M
"iteration": iteration, "iteration": iteration,
}) })
toolResult := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID) // Create async callback for tools that implement AsyncTool
// This callback sends async completion results to the user
asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) {
// Send ForUser content to user if not silent
if !result.Silent && result.ForUser != "" {
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: result.ForUser,
})
logger.InfoCF("agent", "Async tool result sent to user",
map[string]interface{}{
"tool": tc.Name,
"content_len": len(result.ForUser),
})
}
}
toolResult := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID, asyncCallback)
lastToolResult = toolResult lastToolResult = toolResult
// Send ForUser content to user immediately if not Silent // Send ForUser content to user immediately if not Silent

View File

@@ -34,10 +34,13 @@ func (r *ToolRegistry) Get(name string) (Tool, bool) {
} }
func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string]interface{}) *ToolResult { func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string]interface{}) *ToolResult {
return r.ExecuteWithContext(ctx, name, args, "", "") return r.ExecuteWithContext(ctx, name, args, "", "", nil)
} }
func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args map[string]interface{}, channel, chatID string) *ToolResult { // ExecuteWithContext executes a tool with channel/chatID context and optional async callback.
// If the tool implements AsyncTool and a non-nil callback is provided,
// the callback will be set on the tool before execution.
func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args map[string]interface{}, channel, chatID string, asyncCallback AsyncCallback) *ToolResult {
logger.InfoCF("tool", "Tool execution started", logger.InfoCF("tool", "Tool execution started",
map[string]interface{}{ map[string]interface{}{
"tool": name, "tool": name,
@@ -58,6 +61,15 @@ func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args
contextualTool.SetContext(channel, chatID) contextualTool.SetContext(channel, chatID)
} }
// If tool implements AsyncTool and callback is provided, set callback
if asyncTool, ok := tool.(AsyncTool); ok && asyncCallback != nil {
asyncTool.SetCallback(asyncCallback)
logger.DebugCF("tool", "Async callback injected",
map[string]interface{}{
"tool": name,
})
}
start := time.Now() start := time.Now()
result := tool.Execute(ctx, args) result := tool.Execute(ctx, args)
duration := time.Since(start) duration := time.Since(start)