From 8fbbb67f70e49ae8c759a1b02b905eeaec0870aa Mon Sep 17 00:00:00 2001 From: yinwm Date: Fri, 13 Feb 2026 09:51:51 +0800 Subject: [PATCH] refactor(heartbeat): simplify service with single handler and direct bus usage - Remove redundant ChannelSender interface, use *bus.MessageBus directly - Consolidate two handlers (onHeartbeat, onHeartbeatWithTools) into one - Move HEARTBEAT.md and heartbeat.log to workspace root - Simplify NewHeartbeatService signature (remove handler param) - Add SetBus and SetHandler methods for dependency injection Co-Authored-By: Claude Opus 4.6 --- cmd/picoclaw/main.go | 14 ++- pkg/heartbeat/service.go | 151 +++++++++----------------------- pkg/heartbeat/service_test.go | 160 ++++++++++++++++------------------ 3 files changed, 127 insertions(+), 198 deletions(-) diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index d83597f..552b370 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -654,10 +654,20 @@ func gatewayCmd() { heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), - nil, - 30*60, + 30, cfg.Heartbeat.Enabled, ) + heartbeatService.SetBus(msgBus) + heartbeatService.SetHandler(func(prompt string) *tools.ToolResult { + response, err := agentLoop.ProcessDirect(context.Background(), prompt, "heartbeat") + if err != nil { + return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err)) + } + if response == "HEARTBEAT_OK" { + return tools.SilentResult("Heartbeat OK") + } + return tools.UserResult(response) + }) channelManager, err := channels.NewManager(cfg, msgBus) if err != nil { diff --git a/pkg/heartbeat/service.go b/pkg/heartbeat/service.go index 912b938..ce8605b 100644 --- a/pkg/heartbeat/service.go +++ b/pkg/heartbeat/service.go @@ -7,7 +7,6 @@ package heartbeat import ( - "context" "fmt" "os" "path/filepath" @@ -15,6 +14,7 @@ import ( "sync" "time" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/state" "github.com/sipeed/picoclaw/pkg/tools" @@ -23,35 +23,27 @@ import ( const ( minIntervalMinutes = 5 defaultIntervalMinutes = 30 - heartbeatOK = "HEARTBEAT_OK" ) -// HeartbeatHandler is the function type for handling heartbeat with tool support. +// HeartbeatHandler is the function type for handling heartbeat. // It returns a ToolResult that can indicate async operations. type HeartbeatHandler func(prompt string) *tools.ToolResult -// ChannelSender defines the interface for sending messages to channels. -// This is used to send heartbeat results back to the user. -type ChannelSender interface { - SendToChannel(ctx context.Context, channelName, chatID, content string) error -} - // HeartbeatService manages periodic heartbeat checks type HeartbeatService struct { - workspace string - channelSender ChannelSender - stateManager *state.Manager - onHeartbeat func(string) (string, error) - onHeartbeatWithTools HeartbeatHandler - interval time.Duration - enabled bool - mu sync.RWMutex - started bool - stopChan chan struct{} + workspace string + bus *bus.MessageBus + state *state.Manager + handler HeartbeatHandler + interval time.Duration + enabled bool + mu sync.RWMutex + started bool + stopChan chan struct{} } // NewHeartbeatService creates a new heartbeat service -func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, error), intervalMinutes int, enabled bool) *HeartbeatService { +func NewHeartbeatService(workspace string, intervalMinutes int, enabled bool) *HeartbeatService { // Apply minimum interval if intervalMinutes < minIntervalMinutes && intervalMinutes != 0 { intervalMinutes = minIntervalMinutes @@ -62,29 +54,26 @@ func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, err } return &HeartbeatService{ - workspace: workspace, - onHeartbeat: onHeartbeat, - interval: time.Duration(intervalMinutes) * time.Minute, - enabled: enabled, - stateManager: state.NewManager(workspace), - stopChan: make(chan struct{}), + workspace: workspace, + interval: time.Duration(intervalMinutes) * time.Minute, + enabled: enabled, + state: state.NewManager(workspace), + stopChan: make(chan struct{}), } } -// SetChannelSender sets the channel sender for delivering heartbeat results. -func (hs *HeartbeatService) SetChannelSender(sender ChannelSender) { +// SetBus sets the message bus for delivering heartbeat results. +func (hs *HeartbeatService) SetBus(msgBus *bus.MessageBus) { hs.mu.Lock() defer hs.mu.Unlock() - hs.channelSender = sender + hs.bus = msgBus } -// SetOnHeartbeatWithTools sets the tool-supporting heartbeat handler. -// This handler returns a ToolResult that can indicate async operations. -// When set, this handler takes precedence over the legacy onHeartbeat callback. -func (hs *HeartbeatService) SetOnHeartbeatWithTools(handler HeartbeatHandler) { +// SetHandler sets the heartbeat handler. +func (hs *HeartbeatService) SetHandler(handler HeartbeatHandler) { hs.mu.Lock() defer hs.mu.Unlock() - hs.onHeartbeatWithTools = handler + hs.handler = handler } // Start begins the heartbeat service @@ -159,8 +148,7 @@ func (hs *HeartbeatService) runLoop() { func (hs *HeartbeatService) executeHeartbeat() { hs.mu.RLock() enabled := hs.enabled && hs.started - handler := hs.onHeartbeat - handlerWithTools := hs.onHeartbeatWithTools + handler := hs.handler hs.mu.RUnlock() if !enabled { @@ -175,42 +163,8 @@ func (hs *HeartbeatService) executeHeartbeat() { return } - // Prefer the new tool-supporting handler - if handlerWithTools != nil { - hs.executeHeartbeatWithTools(prompt) - } else if handler != nil { - response, err := handler(prompt) - if err != nil { - hs.logError("Heartbeat processing error: %v", err) - return - } - - // Check for HEARTBEAT_OK - completely silent response - if isHeartbeatOK(response) { - hs.logInfo("Heartbeat OK - silent") - return - } - - // Non-OK response - send to last channel - hs.sendResponse(response) - } -} - -// ExecuteHeartbeatWithTools executes a heartbeat using the tool-supporting handler. -// This method processes ToolResult returns and handles async tasks appropriately. -func (hs *HeartbeatService) ExecuteHeartbeatWithTools(prompt string) { - hs.executeHeartbeatWithTools(prompt) -} - -// executeHeartbeatWithTools is the internal implementation of tool-supporting heartbeat. -func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) { - // Check if handler is configured (thread-safe read) - hs.mu.RLock() - handler := hs.onHeartbeatWithTools - hs.mu.RUnlock() - if handler == nil { - hs.logError("onHeartbeatWithTools handler not configured") + hs.logError("Heartbeat handler not configured") return } @@ -228,7 +182,6 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) { } if result.Async { - // Async task started - log and return immediately hs.logInfo("Async task started: %s", result.ForLLM) logger.InfoCF("heartbeat", "Async heartbeat task started", map[string]interface{}{ @@ -237,13 +190,13 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) { return } - // Check if silent (HEARTBEAT_OK equivalent) + // Check if silent if result.Silent { hs.logInfo("Heartbeat OK - silent") return } - // Normal completion - send result to user if available + // Send result to user if result.ForUser != "" { hs.sendResponse(result.ForUser) } else if result.ForLLM != "" { @@ -255,14 +208,11 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) { // buildPrompt builds the heartbeat prompt from HEARTBEAT.md func (hs *HeartbeatService) buildPrompt() string { - // Use memory directory for HEARTBEAT.md - notesDir := filepath.Join(hs.workspace, "memory") - heartbeatPath := filepath.Join(notesDir, "HEARTBEAT.md") + heartbeatPath := filepath.Join(hs.workspace, "HEARTBEAT.md") data, err := os.ReadFile(heartbeatPath) if err != nil { if os.IsNotExist(err) { - // Create default HEARTBEAT.md template hs.createDefaultHeartbeatTemplate() return "" } @@ -275,9 +225,8 @@ func (hs *HeartbeatService) buildPrompt() string { return "" } - // Build prompt with system instructions now := time.Now().Format("2006-01-02 15:04:05") - prompt := fmt.Sprintf(`# Heartbeat Check + return fmt.Sprintf(`# Heartbeat Check Current time: %s @@ -287,20 +236,11 @@ If there is nothing that requires attention, respond ONLY with: HEARTBEAT_OK %s `, now, content) - - return prompt } // createDefaultHeartbeatTemplate creates the default HEARTBEAT.md file func (hs *HeartbeatService) createDefaultHeartbeatTemplate() { - notesDir := filepath.Join(hs.workspace, "memory") - heartbeatPath := filepath.Join(notesDir, "HEARTBEAT.md") - - // Ensure memory directory exists - if err := os.MkdirAll(notesDir, 0755); err != nil { - hs.logError("Failed to create memory directory: %v", err) - return - } + heartbeatPath := filepath.Join(hs.workspace, "HEARTBEAT.md") defaultContent := `# Heartbeat Check List @@ -332,23 +272,22 @@ Add your heartbeat tasks below this line: // sendResponse sends the heartbeat response to the last channel func (hs *HeartbeatService) sendResponse(response string) { hs.mu.RLock() - sender := hs.channelSender + msgBus := hs.bus hs.mu.RUnlock() - if sender == nil { - hs.logInfo("No channel sender configured, heartbeat result not sent") + if msgBus == nil { + hs.logInfo("No message bus configured, heartbeat result not sent") return } // Get last channel from state - lastChannel := hs.stateManager.GetLastChannel() + lastChannel := hs.state.GetLastChannel() if lastChannel == "" { hs.logInfo("No last channel recorded, heartbeat result not sent") return } // Parse channel format: "platform:user_id" (e.g., "telegram:123456") - // Use SplitN to handle user IDs that may contain special characters parts := strings.SplitN(lastChannel, ":", 2) if len(parts) != 2 || parts[0] == "" || parts[1] == "" { hs.logError("Invalid last channel format: %s", lastChannel) @@ -356,21 +295,15 @@ func (hs *HeartbeatService) sendResponse(response string) { } platform, userID := parts[0], parts[1] - // Send to channel - ctx := context.Background() - if err := sender.SendToChannel(ctx, platform, userID, response); err != nil { - hs.logError("Error sending to channel %s: %v", platform, err) - return - } + msgBus.PublishOutbound(bus.OutboundMessage{ + Channel: platform, + ChatID: userID, + Content: response, + }) hs.logInfo("Heartbeat result sent to %s", platform) } -// isHeartbeatOK checks if the response is HEARTBEAT_OK -func isHeartbeatOK(response string) bool { - return response == heartbeatOK -} - // logInfo logs an informational message to the heartbeat log func (hs *HeartbeatService) logInfo(format string, args ...any) { hs.log("INFO", format, args...) @@ -383,11 +316,7 @@ func (hs *HeartbeatService) logError(format string, args ...any) { // log writes a message to the heartbeat log file func (hs *HeartbeatService) log(level, format string, args ...any) { - // Ensure memory directory exists - logDir := filepath.Join(hs.workspace, "memory") - os.MkdirAll(logDir, 0755) - - logFile := filepath.Join(logDir, "heartbeat.log") + logFile := filepath.Join(hs.workspace, "heartbeat.log") f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return diff --git a/pkg/heartbeat/service_test.go b/pkg/heartbeat/service_test.go index dfd33f0..3e55d94 100644 --- a/pkg/heartbeat/service_test.go +++ b/pkg/heartbeat/service_test.go @@ -9,21 +9,16 @@ import ( "github.com/sipeed/picoclaw/pkg/tools" ) -func TestExecuteHeartbeatWithTools_Async(t *testing.T) { - // Create temp workspace +func TestExecuteHeartbeat_Async(t *testing.T) { tmpDir, err := os.MkdirTemp("", "heartbeat-test-*") if err != nil { t.Fatalf("Failed to create temp dir: %v", err) } defer os.RemoveAll(tmpDir) - // Create memory directory - os.MkdirAll(filepath.Join(tmpDir, "memory"), 0755) + hs := NewHeartbeatService(tmpDir, 30, true) + hs.started = true // Enable for testing - // Create heartbeat service with tool-supporting handler - hs := NewHeartbeatService(tmpDir, nil, 30, true) - - // Track if async handler was called asyncCalled := false asyncResult := &tools.ToolResult{ ForLLM: "Background task started", @@ -33,7 +28,7 @@ func TestExecuteHeartbeatWithTools_Async(t *testing.T) { Async: true, } - hs.SetOnHeartbeatWithTools(func(prompt string) *tools.ToolResult { + hs.SetHandler(func(prompt string) *tools.ToolResult { asyncCalled = true if prompt == "" { t.Error("Expected non-empty prompt") @@ -41,44 +36,44 @@ func TestExecuteHeartbeatWithTools_Async(t *testing.T) { return asyncResult }) - // Execute heartbeat - hs.ExecuteHeartbeatWithTools("Test heartbeat prompt") + // Create HEARTBEAT.md + os.WriteFile(filepath.Join(tmpDir, "HEARTBEAT.md"), []byte("Test task"), 0644) + + // Execute heartbeat directly (internal method for testing) + hs.executeHeartbeat() - // Verify handler was called if !asyncCalled { - t.Error("Expected async handler to be called") + t.Error("Expected handler to be called") } } -func TestExecuteHeartbeatWithTools_Error(t *testing.T) { - // Create temp workspace +func TestExecuteHeartbeat_Error(t *testing.T) { tmpDir, err := os.MkdirTemp("", "heartbeat-test-*") if err != nil { t.Fatalf("Failed to create temp dir: %v", err) } defer os.RemoveAll(tmpDir) - // Create memory directory - os.MkdirAll(filepath.Join(tmpDir, "memory"), 0755) + hs := NewHeartbeatService(tmpDir, 30, true) + hs.started = true // Enable for testing - hs := NewHeartbeatService(tmpDir, nil, 30, true) - - errorResult := &tools.ToolResult{ - ForLLM: "Heartbeat failed: connection error", - ForUser: "", - Silent: false, - IsError: true, - Async: false, - } - - hs.SetOnHeartbeatWithTools(func(prompt string) *tools.ToolResult { - return errorResult + hs.SetHandler(func(prompt string) *tools.ToolResult { + return &tools.ToolResult{ + ForLLM: "Heartbeat failed: connection error", + ForUser: "", + Silent: false, + IsError: true, + Async: false, + } }) - hs.ExecuteHeartbeatWithTools("Test prompt") + // Create HEARTBEAT.md + os.WriteFile(filepath.Join(tmpDir, "HEARTBEAT.md"), []byte("Test task"), 0644) + + hs.executeHeartbeat() // Check log file for error message - logFile := filepath.Join(tmpDir, "memory", "heartbeat.log") + logFile := filepath.Join(tmpDir, "heartbeat.log") data, err := os.ReadFile(logFile) if err != nil { t.Fatalf("Failed to read log file: %v", err) @@ -90,35 +85,33 @@ func TestExecuteHeartbeatWithTools_Error(t *testing.T) { } } -func TestExecuteHeartbeatWithTools_Sync(t *testing.T) { - // Create temp workspace +func TestExecuteHeartbeat_Silent(t *testing.T) { tmpDir, err := os.MkdirTemp("", "heartbeat-test-*") if err != nil { t.Fatalf("Failed to create temp dir: %v", err) } defer os.RemoveAll(tmpDir) - // Create memory directory - os.MkdirAll(filepath.Join(tmpDir, "memory"), 0755) + hs := NewHeartbeatService(tmpDir, 30, true) + hs.started = true // Enable for testing - hs := NewHeartbeatService(tmpDir, nil, 30, true) - - syncResult := &tools.ToolResult{ - ForLLM: "Heartbeat completed successfully", - ForUser: "", - Silent: true, - IsError: false, - Async: false, - } - - hs.SetOnHeartbeatWithTools(func(prompt string) *tools.ToolResult { - return syncResult + hs.SetHandler(func(prompt string) *tools.ToolResult { + return &tools.ToolResult{ + ForLLM: "Heartbeat completed successfully", + ForUser: "", + Silent: true, + IsError: false, + Async: false, + } }) - hs.ExecuteHeartbeatWithTools("Test prompt") + // Create HEARTBEAT.md + os.WriteFile(filepath.Join(tmpDir, "HEARTBEAT.md"), []byte("Test task"), 0644) + + hs.executeHeartbeat() // Check log file for completion message - logFile := filepath.Join(tmpDir, "memory", "heartbeat.log") + logFile := filepath.Join(tmpDir, "heartbeat.log") data, err := os.ReadFile(logFile) if err != nil { t.Fatalf("Failed to read log file: %v", err) @@ -131,25 +124,21 @@ func TestExecuteHeartbeatWithTools_Sync(t *testing.T) { } func TestHeartbeatService_StartStop(t *testing.T) { - // Create temp workspace tmpDir, err := os.MkdirTemp("", "heartbeat-test-*") if err != nil { t.Fatalf("Failed to create temp dir: %v", err) } defer os.RemoveAll(tmpDir) - hs := NewHeartbeatService(tmpDir, nil, 1, true) + hs := NewHeartbeatService(tmpDir, 1, true) - // Start the service err = hs.Start() if err != nil { t.Fatalf("Failed to start heartbeat service: %v", err) } - // Stop the service hs.Stop() - // Verify it stopped properly time.Sleep(100 * time.Millisecond) } @@ -160,72 +149,73 @@ func TestHeartbeatService_Disabled(t *testing.T) { } defer os.RemoveAll(tmpDir) - hs := NewHeartbeatService(tmpDir, nil, 1, false) + hs := NewHeartbeatService(tmpDir, 1, false) - // Check that service reports as not enabled if hs.enabled != false { t.Error("Expected service to be disabled") } - // Note: The current implementation of Start() checks running() first, - // which returns true for a newly created service (before stopChan is closed). - // This means Start() will return nil even for disabled services. - // This test documents the current behavior. err = hs.Start() - // We don't assert error here due to the running() check behavior - _ = err + _ = err // Disabled service returns nil } -func TestExecuteHeartbeatWithTools_NilResult(t *testing.T) { +func TestExecuteHeartbeat_NilResult(t *testing.T) { tmpDir, err := os.MkdirTemp("", "heartbeat-test-*") if err != nil { t.Fatalf("Failed to create temp dir: %v", err) } defer os.RemoveAll(tmpDir) - os.MkdirAll(filepath.Join(tmpDir, "memory"), 0755) + hs := NewHeartbeatService(tmpDir, 30, true) + hs.started = true // Enable for testing - hs := NewHeartbeatService(tmpDir, nil, 30, true) - - hs.SetOnHeartbeatWithTools(func(prompt string) *tools.ToolResult { + hs.SetHandler(func(prompt string) *tools.ToolResult { return nil }) + // Create HEARTBEAT.md + os.WriteFile(filepath.Join(tmpDir, "HEARTBEAT.md"), []byte("Test task"), 0644) + // Should not panic with nil result - hs.ExecuteHeartbeatWithTools("Test prompt") + hs.executeHeartbeat() } -// TestLogPath verifies heartbeat log is written to memory directory +// TestLogPath verifies heartbeat log is written to workspace directory func TestLogPath(t *testing.T) { - // Create temp workspace tmpDir, err := os.MkdirTemp("", "heartbeat-test-*") if err != nil { t.Fatalf("Failed to create temp dir: %v", err) } defer os.RemoveAll(tmpDir) - // Create memory directory - memDir := filepath.Join(tmpDir, "memory") - err = os.MkdirAll(memDir, 0755) - if err != nil { - t.Fatalf("Failed to create memory dir: %v", err) - } - - // Create heartbeat service - hs := NewHeartbeatService(tmpDir, nil, 30, true) + hs := NewHeartbeatService(tmpDir, 30, true) // Write a log entry hs.log("INFO", "Test log entry") - // Verify log file exists at correct path - expectedLogPath := filepath.Join(memDir, "heartbeat.log") + // Verify log file exists at workspace root + expectedLogPath := filepath.Join(tmpDir, "heartbeat.log") if _, err := os.Stat(expectedLogPath); os.IsNotExist(err) { t.Errorf("Expected log file at %s, but it doesn't exist", expectedLogPath) } +} - // Verify log file does NOT exist at old path - oldLogPath := filepath.Join(tmpDir, "heartbeat.log") - if _, err := os.Stat(oldLogPath); err == nil { - t.Error("Log file should not exist at old path (workspace/heartbeat.log)") +// TestHeartbeatFilePath verifies HEARTBEAT.md is at workspace root +func TestHeartbeatFilePath(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "heartbeat-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + hs := NewHeartbeatService(tmpDir, 30, true) + + // Trigger default template creation + hs.buildPrompt() + + // Verify HEARTBEAT.md exists at workspace root + expectedPath := filepath.Join(tmpDir, "HEARTBEAT.md") + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + t.Errorf("Expected HEARTBEAT.md at %s, but it doesn't exist", expectedPath) } }