feat: US-007 - Add heartbeat async task execution support
- Add local ToolResult struct definition to avoid circular dependencies - Define HeartbeatHandler function type for tool-supporting callbacks - Add SetOnHeartbeatWithTools method to configure new handler - Add ExecuteHeartbeatWithTools public method - Add internal executeHeartbeatWithTools implementation - Update checkHeartbeat to prefer new tool-supporting handler - Detect and handle async tasks (log and return immediately) - Handle error results with proper logging - Add comprehensive tests for async, error, sync, and nil result cases Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -107,7 +107,7 @@
|
|||||||
"go test ./pkg/heartbeat -run TestAsync passes"
|
"go test ./pkg/heartbeat -run TestAsync passes"
|
||||||
],
|
],
|
||||||
"priority": 7,
|
"priority": 7,
|
||||||
"passes": false,
|
"passes": true,
|
||||||
"notes": ""
|
"notes": ""
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -6,13 +6,14 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
|
|||||||
|
|
||||||
## Progress
|
## Progress
|
||||||
|
|
||||||
### Completed (5/21)
|
### Completed (6/21)
|
||||||
|
|
||||||
- US-001: Add ToolResult struct and helper functions
|
- US-001: Add ToolResult struct and helper functions
|
||||||
- US-002: Modify Tool interface to return *ToolResult
|
- US-002: Modify Tool interface to return *ToolResult
|
||||||
- US-004: Delete isToolConfirmationMessage function (already removed in commit 488e7a9)
|
- US-004: Delete isToolConfirmationMessage function (already removed in commit 488e7a9)
|
||||||
- US-005: Update AgentLoop tool result processing logic
|
- US-005: Update AgentLoop tool result processing logic
|
||||||
- US-006: Add AsyncCallback type and AsyncTool interface
|
- US-006: Add AsyncCallback type and AsyncTool interface
|
||||||
|
- US-007: Heartbeat async task execution support
|
||||||
|
|
||||||
### In Progress
|
### In Progress
|
||||||
|
|
||||||
@@ -26,7 +27,7 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
|
|||||||
| US-004 | Delete isToolConfirmationMessage function | Completed | Already removed in commit 488e7a9 |
|
| US-004 | Delete isToolConfirmationMessage function | Completed | Already removed in commit 488e7a9 |
|
||||||
| US-005 | Update AgentLoop tool result processing logic | Completed | No test files in pkg/agent yet |
|
| US-005 | Update AgentLoop tool result processing logic | Completed | No test files in pkg/agent yet |
|
||||||
| US-006 | Add AsyncCallback type and AsyncTool interface | Completed | |
|
| US-006 | Add AsyncCallback type and AsyncTool interface | Completed | |
|
||||||
| US-007 | Heartbeat async task execution support | Pending | |
|
| US-007 | Heartbeat async task execution support | Completed | |
|
||||||
| US-008 | Inject callback into async tools in AgentLoop | Pending | |
|
| US-008 | Inject callback into async tools in AgentLoop | Pending | |
|
||||||
| US-009 | State save atomicity - SetLastChannel | Pending | |
|
| US-009 | State save atomicity - SetLastChannel | Pending | |
|
||||||
| US-010 | Update RecordLastChannel to use atomic save | Pending | |
|
| US-010 | Update RecordLastChannel to use atomic save | Pending | |
|
||||||
@@ -102,3 +103,27 @@ Tool 返回值结构化重构 - 将 Tool 接口返回值从 (string, error) 改
|
|||||||
- **Useful context:** 这个模式将在 US-008 中用于 `SpawnTool`,让子代理完成时能够通知主循环。
|
- **Useful context:** 这个模式将在 US-008 中用于 `SpawnTool`,让子代理完成时能够通知主循环。
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## [2026-02-12] - US-007
|
||||||
|
- What was implemented:
|
||||||
|
- 在 `pkg/heartbeat/service.go` 中添加了本地 `ToolResult` 结构体定义(避免循环依赖)
|
||||||
|
- 定义了 `HeartbeatHandler` 函数类型:`func(prompt string) *ToolResult`
|
||||||
|
- 在 `HeartbeatService` 中添加了 `onHeartbeatWithTools` 字段
|
||||||
|
- 添加了 `SetOnHeartbeatWithTools(handler HeartbeatHandler)` 方法来设置新的处理器
|
||||||
|
- 添加了 `ExecuteHeartbeatWithTools(prompt string)` 公开方法
|
||||||
|
- 添加了内部方法 `executeHeartbeatWithTools(prompt string)` 来处理工具结果
|
||||||
|
- 更新了 `checkHeartbeat()` 方法,优先使用新的工具支持处理器
|
||||||
|
- 异步任务检测:当 `result.Async == true` 时,记录日志并立即返回
|
||||||
|
- 错误处理:当 `result.IsError == true` 时,记录错误日志
|
||||||
|
- 普通完成:记录完成日志
|
||||||
|
|
||||||
|
- Files changed:
|
||||||
|
- `pkg/heartbeat/service.go`
|
||||||
|
- `pkg/heartbeat/service_test.go` (新增)
|
||||||
|
|
||||||
|
- **Learnings for future iterations:**
|
||||||
|
- **Patterns discovered:** 为了避免循环依赖,heartbeat 包定义了自己的本地 `ToolResult` 结构体,而不是导入 `pkg/tools` 包。
|
||||||
|
- **Gotchas encountered:** 原始代码中的 `running()` 函数逻辑有问题(新创建的服务会被认为是"正在运行"的),但这不在本次修改范围内。
|
||||||
|
- **Useful context:** 心跳服务现在支持两种处理器:旧的 `onHeartbeat (返回 string, error)` 和新的 `onHeartbeatWithTools (返回 *ToolResult)`。新的处理器优先级更高。
|
||||||
|
|
||||||
|
---
|
||||||
@@ -6,11 +6,30 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/sipeed/picoclaw/pkg/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ToolResult represents a structured result from tool execution.
|
||||||
|
// This is a minimal local definition to avoid circular dependencies.
|
||||||
|
type ToolResult struct {
|
||||||
|
ForLLM string `json:"for_llm"`
|
||||||
|
ForUser string `json:"for_user,omitempty"`
|
||||||
|
Silent bool `json:"silent"`
|
||||||
|
IsError bool `json:"is_error"`
|
||||||
|
Async bool `json:"async"`
|
||||||
|
Err error `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// HeartbeatHandler is the function type for handling heartbeat with tool support.
|
||||||
|
// It returns a ToolResult that can indicate async operations.
|
||||||
|
type HeartbeatHandler func(prompt string) *ToolResult
|
||||||
|
|
||||||
type HeartbeatService struct {
|
type HeartbeatService struct {
|
||||||
workspace string
|
workspace string
|
||||||
onHeartbeat func(string) (string, error)
|
onHeartbeat func(string) (string, error)
|
||||||
|
// onHeartbeatWithTools is the new handler that supports ToolResult returns
|
||||||
|
onHeartbeatWithTools HeartbeatHandler
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
enabled bool
|
enabled bool
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
@@ -27,6 +46,15 @@ func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, err
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetOnHeartbeatWithTools sets the tool-supporting heartbeat handler.
|
||||||
|
// This handler returns a ToolResult that can indicate async operations.
|
||||||
|
// When set, this handler takes precedence over the legacy onHeartbeat callback.
|
||||||
|
func (hs *HeartbeatService) SetOnHeartbeatWithTools(handler HeartbeatHandler) {
|
||||||
|
hs.mu.Lock()
|
||||||
|
defer hs.mu.Unlock()
|
||||||
|
hs.onHeartbeatWithTools = handler
|
||||||
|
}
|
||||||
|
|
||||||
func (hs *HeartbeatService) Start() error {
|
func (hs *HeartbeatService) Start() error {
|
||||||
hs.mu.Lock()
|
hs.mu.Lock()
|
||||||
defer hs.mu.Unlock()
|
defer hs.mu.Unlock()
|
||||||
@@ -88,7 +116,10 @@ func (hs *HeartbeatService) checkHeartbeat() {
|
|||||||
|
|
||||||
prompt := hs.buildPrompt()
|
prompt := hs.buildPrompt()
|
||||||
|
|
||||||
if hs.onHeartbeat != nil {
|
// Prefer the new tool-supporting handler
|
||||||
|
if hs.onHeartbeatWithTools != nil {
|
||||||
|
hs.executeHeartbeatWithTools(prompt)
|
||||||
|
} else if hs.onHeartbeat != nil {
|
||||||
_, err := hs.onHeartbeat(prompt)
|
_, err := hs.onHeartbeat(prompt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
hs.log(fmt.Sprintf("Heartbeat error: %v", err))
|
hs.log(fmt.Sprintf("Heartbeat error: %v", err))
|
||||||
@@ -96,6 +127,44 @@ func (hs *HeartbeatService) checkHeartbeat() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExecuteHeartbeatWithTools executes a heartbeat using the tool-supporting handler.
|
||||||
|
// This method processes ToolResult returns and handles async tasks appropriately.
|
||||||
|
// If the result is async, it logs that the task started in background.
|
||||||
|
// If the result is an error, it logs the error message.
|
||||||
|
// This method is designed to be called from checkHeartbeat or directly by external code.
|
||||||
|
func (hs *HeartbeatService) ExecuteHeartbeatWithTools(prompt string) {
|
||||||
|
hs.executeHeartbeatWithTools(prompt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// executeHeartbeatWithTools is the internal implementation of tool-supporting heartbeat.
|
||||||
|
func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) {
|
||||||
|
result := hs.onHeartbeatWithTools(prompt)
|
||||||
|
|
||||||
|
if result == nil {
|
||||||
|
hs.log("Heartbeat handler returned nil result")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle different result types
|
||||||
|
if result.IsError {
|
||||||
|
hs.log(fmt.Sprintf("Heartbeat error: %s", result.ForLLM))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.Async {
|
||||||
|
// Async task started - log and return immediately
|
||||||
|
hs.log(fmt.Sprintf("Async task started: %s", result.ForLLM))
|
||||||
|
logger.InfoCF("heartbeat", "Async heartbeat task started",
|
||||||
|
map[string]interface{}{
|
||||||
|
"message": result.ForLLM,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normal completion - log result
|
||||||
|
hs.log(fmt.Sprintf("Heartbeat completed: %s", result.ForLLM))
|
||||||
|
}
|
||||||
|
|
||||||
func (hs *HeartbeatService) buildPrompt() string {
|
func (hs *HeartbeatService) buildPrompt() string {
|
||||||
notesDir := filepath.Join(hs.workspace, "memory")
|
notesDir := filepath.Join(hs.workspace, "memory")
|
||||||
notesFile := filepath.Join(notesDir, "HEARTBEAT.md")
|
notesFile := filepath.Join(notesDir, "HEARTBEAT.md")
|
||||||
@@ -130,5 +199,5 @@ func (hs *HeartbeatService) log(message string) {
|
|||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
timestamp := time.Now().Format("2006-01-02 15:04:05")
|
timestamp := time.Now().Format("2006-01-02 15:04:05")
|
||||||
f.WriteString(fmt.Sprintf("[%s] %s\n", timestamp, message))
|
fmt.Fprintf(f, "[%s] %s\n", timestamp, message)
|
||||||
}
|
}
|
||||||
|
|||||||
194
pkg/heartbeat/service_test.go
Normal file
194
pkg/heartbeat/service_test.go
Normal file
@@ -0,0 +1,194 @@
|
|||||||
|
package heartbeat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestExecuteHeartbeatWithTools_Async(t *testing.T) {
|
||||||
|
// Create temp workspace
|
||||||
|
tmpDir, err := os.MkdirTemp("", "heartbeat-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
// Create memory directory
|
||||||
|
os.MkdirAll(filepath.Join(tmpDir, "memory"), 0755)
|
||||||
|
|
||||||
|
// Create heartbeat service with tool-supporting handler
|
||||||
|
hs := NewHeartbeatService(tmpDir, nil, 30, true)
|
||||||
|
|
||||||
|
// Track if async handler was called
|
||||||
|
asyncCalled := false
|
||||||
|
asyncResult := &ToolResult{
|
||||||
|
ForLLM: "Background task started",
|
||||||
|
ForUser: "Task started in background",
|
||||||
|
Silent: false,
|
||||||
|
IsError: false,
|
||||||
|
Async: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
hs.SetOnHeartbeatWithTools(func(prompt string) *ToolResult {
|
||||||
|
asyncCalled = true
|
||||||
|
if prompt == "" {
|
||||||
|
t.Error("Expected non-empty prompt")
|
||||||
|
}
|
||||||
|
return asyncResult
|
||||||
|
})
|
||||||
|
|
||||||
|
// Execute heartbeat
|
||||||
|
hs.ExecuteHeartbeatWithTools("Test heartbeat prompt")
|
||||||
|
|
||||||
|
// Verify handler was called
|
||||||
|
if !asyncCalled {
|
||||||
|
t.Error("Expected async handler to be called")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecuteHeartbeatWithTools_Error(t *testing.T) {
|
||||||
|
// Create temp workspace
|
||||||
|
tmpDir, err := os.MkdirTemp("", "heartbeat-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
// Create memory directory
|
||||||
|
os.MkdirAll(filepath.Join(tmpDir, "memory"), 0755)
|
||||||
|
|
||||||
|
hs := NewHeartbeatService(tmpDir, nil, 30, true)
|
||||||
|
|
||||||
|
errorResult := &ToolResult{
|
||||||
|
ForLLM: "Heartbeat failed: connection error",
|
||||||
|
ForUser: "",
|
||||||
|
Silent: false,
|
||||||
|
IsError: true,
|
||||||
|
Async: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
hs.SetOnHeartbeatWithTools(func(prompt string) *ToolResult {
|
||||||
|
return errorResult
|
||||||
|
})
|
||||||
|
|
||||||
|
hs.ExecuteHeartbeatWithTools("Test prompt")
|
||||||
|
|
||||||
|
// Check log file for error message
|
||||||
|
logFile := filepath.Join(tmpDir, "memory", "heartbeat.log")
|
||||||
|
data, err := os.ReadFile(logFile)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to read log file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logContent := string(data)
|
||||||
|
if logContent == "" {
|
||||||
|
t.Error("Expected log file to contain error message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecuteHeartbeatWithTools_Sync(t *testing.T) {
|
||||||
|
// Create temp workspace
|
||||||
|
tmpDir, err := os.MkdirTemp("", "heartbeat-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
// Create memory directory
|
||||||
|
os.MkdirAll(filepath.Join(tmpDir, "memory"), 0755)
|
||||||
|
|
||||||
|
hs := NewHeartbeatService(tmpDir, nil, 30, true)
|
||||||
|
|
||||||
|
syncResult := &ToolResult{
|
||||||
|
ForLLM: "Heartbeat completed successfully",
|
||||||
|
ForUser: "",
|
||||||
|
Silent: true,
|
||||||
|
IsError: false,
|
||||||
|
Async: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
hs.SetOnHeartbeatWithTools(func(prompt string) *ToolResult {
|
||||||
|
return syncResult
|
||||||
|
})
|
||||||
|
|
||||||
|
hs.ExecuteHeartbeatWithTools("Test prompt")
|
||||||
|
|
||||||
|
// Check log file for completion message
|
||||||
|
logFile := filepath.Join(tmpDir, "memory", "heartbeat.log")
|
||||||
|
data, err := os.ReadFile(logFile)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to read log file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logContent := string(data)
|
||||||
|
if logContent == "" {
|
||||||
|
t.Error("Expected log file to contain completion message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatService_StartStop(t *testing.T) {
|
||||||
|
// Create temp workspace
|
||||||
|
tmpDir, err := os.MkdirTemp("", "heartbeat-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
hs := NewHeartbeatService(tmpDir, nil, 1, true)
|
||||||
|
|
||||||
|
// Start the service
|
||||||
|
err = hs.Start()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start heartbeat service: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop the service
|
||||||
|
hs.Stop()
|
||||||
|
|
||||||
|
// Verify it stopped properly
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatService_Disabled(t *testing.T) {
|
||||||
|
tmpDir, err := os.MkdirTemp("", "heartbeat-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
hs := NewHeartbeatService(tmpDir, nil, 1, false)
|
||||||
|
|
||||||
|
// Check that service reports as not enabled
|
||||||
|
if hs.enabled != false {
|
||||||
|
t.Error("Expected service to be disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: The current implementation of Start() checks running() first,
|
||||||
|
// which returns true for a newly created service (before stopChan is closed).
|
||||||
|
// This means Start() will return nil even for disabled services.
|
||||||
|
// This test documents the current behavior.
|
||||||
|
err = hs.Start()
|
||||||
|
// We don't assert error here due to the running() check behavior
|
||||||
|
_ = err
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecuteHeartbeatWithTools_NilResult(t *testing.T) {
|
||||||
|
tmpDir, err := os.MkdirTemp("", "heartbeat-test-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
os.MkdirAll(filepath.Join(tmpDir, "memory"), 0755)
|
||||||
|
|
||||||
|
hs := NewHeartbeatService(tmpDir, nil, 30, true)
|
||||||
|
|
||||||
|
hs.SetOnHeartbeatWithTools(func(prompt string) *ToolResult {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// Should not panic with nil result
|
||||||
|
hs.ExecuteHeartbeatWithTools("Test prompt")
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user