From e7e086155e167c35b0aff94d49c68ea49afe7881 Mon Sep 17 00:00:00 2001 From: yinwm Date: Fri, 13 Feb 2026 01:42:22 +0800 Subject: [PATCH] feat: merge heartbeat service improvements from feat-heartbeat branch - Add ChannelSender interface for sending heartbeat results to users - Add sendResponse() to automatically deliver results to last channel - Add createDefaultHeartbeatTemplate() for first-run setup - Support HEARTBEAT_OK silent response (legacy compatibility) - Add structured logging with INFO/ERROR levels - Move integration tests to separate file with build tag Co-Authored-By: Claude Opus 4.6 --- pkg/heartbeat/service.go | 299 ++++++++++++++---- pkg/heartbeat/service_test.go | 2 +- .../claude_cli_provider_integration_test.go | 126 ++++++++ pkg/providers/claude_cli_provider_test.go | 128 -------- 4 files changed, 372 insertions(+), 183 deletions(-) create mode 100644 pkg/providers/claude_cli_provider_integration_test.go diff --git a/pkg/heartbeat/service.go b/pkg/heartbeat/service.go index ea129c4..7c3cd83 100644 --- a/pkg/heartbeat/service.go +++ b/pkg/heartbeat/service.go @@ -1,6 +1,13 @@ +// PicoClaw - Ultra-lightweight personal AI agent +// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot +// License: MIT +// +// Copyright (c) 2026 PicoClaw contributors + package heartbeat import ( + "context" "fmt" "os" "path/filepath" @@ -8,6 +15,13 @@ import ( "time" "github.com/sipeed/picoclaw/pkg/logger" + "github.com/sipeed/picoclaw/pkg/state" +) + +const ( + minIntervalMinutes = 5 + defaultIntervalMinutes = 30 + heartbeatOK = "HEARTBEAT_OK" ) // ToolResult represents a structured result from tool execution. @@ -25,10 +39,18 @@ type ToolResult struct { // It returns a ToolResult that can indicate async operations. type HeartbeatHandler func(prompt string) *ToolResult +// ChannelSender defines the interface for sending messages to channels. +// This is used to send heartbeat results back to the user. +type ChannelSender interface { + SendToChannel(ctx context.Context, channelName, chatID, content string) error +} + +// HeartbeatService manages periodic heartbeat checks type HeartbeatService struct { - workspace string - onHeartbeat func(string) (string, error) - // onHeartbeatWithTools is the new handler that supports ToolResult returns + workspace string + channelSender ChannelSender + stateManager *state.Manager + onHeartbeat func(string) (string, error) onHeartbeatWithTools HeartbeatHandler interval time.Duration enabled bool @@ -37,14 +59,32 @@ type HeartbeatService struct { stopChan chan struct{} } -func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, error), intervalS int, enabled bool) *HeartbeatService { - return &HeartbeatService{ - workspace: workspace, - onHeartbeat: onHeartbeat, - interval: time.Duration(intervalS) * time.Second, - enabled: enabled, - stopChan: make(chan struct{}), +// NewHeartbeatService creates a new heartbeat service +func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, error), intervalMinutes int, enabled bool) *HeartbeatService { + // Apply minimum interval + if intervalMinutes < minIntervalMinutes && intervalMinutes != 0 { + intervalMinutes = minIntervalMinutes } + + if intervalMinutes == 0 { + intervalMinutes = defaultIntervalMinutes + } + + return &HeartbeatService{ + workspace: workspace, + onHeartbeat: onHeartbeat, + interval: time.Duration(intervalMinutes) * time.Minute, + enabled: enabled, + stateManager: state.NewManager(workspace), + stopChan: make(chan struct{}), + } +} + +// SetChannelSender sets the channel sender for delivering heartbeat results. +func (hs *HeartbeatService) SetChannelSender(sender ChannelSender) { + hs.mu.Lock() + defer hs.mu.Unlock() + hs.channelSender = sender } // SetOnHeartbeatWithTools sets the tool-supporting heartbeat handler. @@ -56,24 +96,34 @@ func (hs *HeartbeatService) SetOnHeartbeatWithTools(handler HeartbeatHandler) { hs.onHeartbeatWithTools = handler } +// Start begins the heartbeat service func (hs *HeartbeatService) Start() error { hs.mu.Lock() defer hs.mu.Unlock() if hs.started { + logger.InfoC("heartbeat", "Heartbeat service already running") return nil } if !hs.enabled { - return fmt.Errorf("heartbeat service is disabled") + logger.InfoC("heartbeat", "Heartbeat service disabled") + return nil } hs.started = true + hs.stopChan = make(chan struct{}) + go hs.runLoop() + logger.InfoCF("heartbeat", "Heartbeat service started", map[string]any{ + "interval_minutes": hs.interval.Minutes(), + }) + return nil } +// Stop gracefully stops the heartbeat service func (hs *HeartbeatService) Stop() { hs.mu.Lock() defer hs.mu.Unlock() @@ -82,59 +132,81 @@ func (hs *HeartbeatService) Stop() { return } - hs.started = false + logger.InfoC("heartbeat", "Stopping heartbeat service") close(hs.stopChan) + hs.started = false } -func (hs *HeartbeatService) running() bool { - select { - case <-hs.stopChan: - return false - default: - return true - } +// IsRunning returns whether the service is running +func (hs *HeartbeatService) IsRunning() bool { + hs.mu.RLock() + defer hs.mu.RUnlock() + return hs.started } +// runLoop runs the heartbeat ticker func (hs *HeartbeatService) runLoop() { ticker := time.NewTicker(hs.interval) defer ticker.Stop() + // Run first heartbeat after initial delay + time.AfterFunc(time.Second, func() { + hs.executeHeartbeat() + }) + for { select { case <-hs.stopChan: return case <-ticker.C: - hs.checkHeartbeat() + hs.executeHeartbeat() } } } -func (hs *HeartbeatService) checkHeartbeat() { +// executeHeartbeat performs a single heartbeat check +func (hs *HeartbeatService) executeHeartbeat() { hs.mu.RLock() - if !hs.enabled || !hs.running() { - hs.mu.RUnlock() - return - } + enabled := hs.enabled && hs.started + handler := hs.onHeartbeat + handlerWithTools := hs.onHeartbeatWithTools hs.mu.RUnlock() + if !enabled { + return + } + + logger.DebugC("heartbeat", "Executing heartbeat") + prompt := hs.buildPrompt() + if prompt == "" { + logger.InfoC("heartbeat", "No heartbeat prompt (HEARTBEAT.md empty or missing)") + return + } // Prefer the new tool-supporting handler - if hs.onHeartbeatWithTools != nil { + if handlerWithTools != nil { hs.executeHeartbeatWithTools(prompt) - } else if hs.onHeartbeat != nil { - _, err := hs.onHeartbeat(prompt) + } else if handler != nil { + response, err := handler(prompt) if err != nil { - hs.log(fmt.Sprintf("Heartbeat error: %v", err)) + hs.logError("Heartbeat processing error: %v", err) + return } + + // Check for HEARTBEAT_OK - completely silent response + if isHeartbeatOK(response) { + hs.logInfo("Heartbeat OK - silent") + return + } + + // Non-OK response - send to last channel + hs.sendResponse(response) } } // ExecuteHeartbeatWithTools executes a heartbeat using the tool-supporting handler. // This method processes ToolResult returns and handles async tasks appropriately. -// If the result is async, it logs that the task started in background. -// If the result is an error, it logs the error message. -// This method is designed to be called from checkHeartbeat or directly by external code. func (hs *HeartbeatService) ExecuteHeartbeatWithTools(prompt string) { hs.executeHeartbeatWithTools(prompt) } @@ -144,19 +216,19 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) { result := hs.onHeartbeatWithTools(prompt) if result == nil { - hs.log("Heartbeat handler returned nil result") + hs.logInfo("Heartbeat handler returned nil result") return } // Handle different result types if result.IsError { - hs.log(fmt.Sprintf("Heartbeat error: %s", result.ForLLM)) + hs.logError("Heartbeat error: %s", result.ForLLM) return } if result.Async { // Async task started - log and return immediately - hs.log(fmt.Sprintf("Async task started: %s", result.ForLLM)) + hs.logInfo("Async task started: %s", result.ForLLM) logger.InfoCF("heartbeat", "Async heartbeat task started", map[string]interface{}{ "message": result.ForLLM, @@ -164,37 +236,156 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) { return } - // Normal completion - log result - hs.log(fmt.Sprintf("Heartbeat completed: %s", result.ForLLM)) -} - -func (hs *HeartbeatService) buildPrompt() string { - notesDir := filepath.Join(hs.workspace, "memory") - notesFile := filepath.Join(notesDir, "HEARTBEAT.md") - - var notes string - if data, err := os.ReadFile(notesFile); err == nil { - notes = string(data) + // Check if silent (HEARTBEAT_OK equivalent) + if result.Silent { + hs.logInfo("Heartbeat OK - silent") + return } - now := time.Now().Format("2006-01-02 15:04") + // Normal completion - send result to user if available + if result.ForUser != "" { + hs.sendResponse(result.ForUser) + } else if result.ForLLM != "" { + hs.sendResponse(result.ForLLM) + } + hs.logInfo("Heartbeat completed: %s", result.ForLLM) +} + +// buildPrompt builds the heartbeat prompt from HEARTBEAT.md +func (hs *HeartbeatService) buildPrompt() string { + // Use memory directory for HEARTBEAT.md + notesDir := filepath.Join(hs.workspace, "memory") + heartbeatPath := filepath.Join(notesDir, "HEARTBEAT.md") + + data, err := os.ReadFile(heartbeatPath) + if err != nil { + if os.IsNotExist(err) { + // Create default HEARTBEAT.md template + hs.createDefaultHeartbeatTemplate() + return "" + } + hs.logError("Error reading HEARTBEAT.md: %v", err) + return "" + } + + content := string(data) + if len(content) == 0 { + return "" + } + + // Build prompt with system instructions + now := time.Now().Format("2006-01-02 15:04:05") prompt := fmt.Sprintf(`# Heartbeat Check Current time: %s -Check if there are any tasks I should be aware of or actions I should take. -Review the memory file for any important updates or changes. -Be proactive in identifying potential issues or improvements. +You are a proactive AI assistant. This is a scheduled heartbeat check. +Review the following tasks and execute any necessary actions using available skills. +If there is nothing that requires attention, respond ONLY with: HEARTBEAT_OK %s -`, now, notes) +`, now, content) return prompt } -func (hs *HeartbeatService) log(message string) { - logFile := filepath.Join(hs.workspace, "memory", "heartbeat.log") +// createDefaultHeartbeatTemplate creates the default HEARTBEAT.md file +func (hs *HeartbeatService) createDefaultHeartbeatTemplate() { + notesDir := filepath.Join(hs.workspace, "memory") + heartbeatPath := filepath.Join(notesDir, "HEARTBEAT.md") + + // Ensure memory directory exists + if err := os.MkdirAll(notesDir, 0755); err != nil { + hs.logError("Failed to create memory directory: %v", err) + return + } + + defaultContent := `# Heartbeat Check List + +This file contains tasks for the heartbeat service to check periodically. + +## Examples + +- Check for unread messages +- Review upcoming calendar events +- Check device status (e.g., MaixCam) + +## Instructions + +If there's nothing that needs attention, respond with: HEARTBEAT_OK +This ensures the heartbeat runs silently when everything is fine. + +--- + +Add your heartbeat tasks below this line: +` + + if err := os.WriteFile(heartbeatPath, []byte(defaultContent), 0644); err != nil { + hs.logError("Failed to create default HEARTBEAT.md: %v", err) + } else { + hs.logInfo("Created default HEARTBEAT.md template") + } +} + +// sendResponse sends the heartbeat response to the last channel +func (hs *HeartbeatService) sendResponse(response string) { + hs.mu.RLock() + sender := hs.channelSender + hs.mu.RUnlock() + + if sender == nil { + hs.logInfo("No channel sender configured, heartbeat result not sent") + return + } + + // Get last channel from state + lastChannel := hs.stateManager.GetLastChannel() + if lastChannel == "" { + hs.logInfo("No last channel recorded, heartbeat result not sent") + return + } + + // Parse channel format: "platform:user_id" (e.g., "telegram:123456") + var platform, userID string + n, err := fmt.Sscanf(lastChannel, "%[^:]:%s", &platform, &userID) + if err != nil || n != 2 { + hs.logError("Invalid last channel format: %s", lastChannel) + return + } + + // Send to channel + ctx := context.Background() + if err := sender.SendToChannel(ctx, platform, userID, response); err != nil { + hs.logError("Error sending to channel %s: %v", platform, err) + return + } + + hs.logInfo("Heartbeat result sent to %s", platform) +} + +// isHeartbeatOK checks if the response is HEARTBEAT_OK +func isHeartbeatOK(response string) bool { + return response == heartbeatOK +} + +// logInfo logs an informational message to the heartbeat log +func (hs *HeartbeatService) logInfo(format string, args ...any) { + hs.log("INFO", format, args...) +} + +// logError logs an error message to the heartbeat log +func (hs *HeartbeatService) logError(format string, args ...any) { + hs.log("ERROR", format, args...) +} + +// log writes a message to the heartbeat log file +func (hs *HeartbeatService) log(level, format string, args ...any) { + // Ensure memory directory exists + logDir := filepath.Join(hs.workspace, "memory") + os.MkdirAll(logDir, 0755) + + logFile := filepath.Join(logDir, "heartbeat.log") f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return @@ -202,5 +393,5 @@ func (hs *HeartbeatService) log(message string) { defer f.Close() timestamp := time.Now().Format("2006-01-02 15:04:05") - fmt.Fprintf(f, "[%s] %s\n", timestamp, message) + fmt.Fprintf(f, "[%s] [%s] %s\n", timestamp, level, fmt.Sprintf(format, args...)) } diff --git a/pkg/heartbeat/service_test.go b/pkg/heartbeat/service_test.go index 5b1e801..297d2bd 100644 --- a/pkg/heartbeat/service_test.go +++ b/pkg/heartbeat/service_test.go @@ -213,7 +213,7 @@ func TestLogPath(t *testing.T) { hs := NewHeartbeatService(tmpDir, nil, 30, true) // Write a log entry - hs.log("Test log entry") + hs.log("INFO", "Test log entry") // Verify log file exists at correct path expectedLogPath := filepath.Join(memDir, "heartbeat.log") diff --git a/pkg/providers/claude_cli_provider_integration_test.go b/pkg/providers/claude_cli_provider_integration_test.go new file mode 100644 index 0000000..9d1131a --- /dev/null +++ b/pkg/providers/claude_cli_provider_integration_test.go @@ -0,0 +1,126 @@ +//go:build integration + +package providers + +import ( + "context" + exec "os/exec" + "strings" + "testing" + "time" +) + +// TestIntegration_RealClaudeCLI tests the ClaudeCliProvider with a real claude CLI. +// Run with: go test -tags=integration ./pkg/providers/... +func TestIntegration_RealClaudeCLI(t *testing.T) { + // Check if claude CLI is available + path, err := exec.LookPath("claude") + if err != nil { + t.Skip("claude CLI not found in PATH, skipping integration test") + } + t.Logf("Using claude CLI at: %s", path) + + p := NewClaudeCliProvider(t.TempDir()) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + resp, err := p.Chat(ctx, []Message{ + {Role: "user", Content: "Respond with only the word 'pong'. Nothing else."}, + }, nil, "", nil) + + if err != nil { + t.Fatalf("Chat() with real CLI error = %v", err) + } + + // Verify response structure + if resp.Content == "" { + t.Error("Content is empty") + } + if resp.FinishReason != "stop" { + t.Errorf("FinishReason = %q, want %q", resp.FinishReason, "stop") + } + if resp.Usage == nil { + t.Error("Usage should not be nil from real CLI") + } else { + if resp.Usage.PromptTokens == 0 { + t.Error("PromptTokens should be > 0") + } + if resp.Usage.CompletionTokens == 0 { + t.Error("CompletionTokens should be > 0") + } + t.Logf("Usage: prompt=%d, completion=%d, total=%d", + resp.Usage.PromptTokens, resp.Usage.CompletionTokens, resp.Usage.TotalTokens) + } + + t.Logf("Response content: %q", resp.Content) + + // Loose check - should contain "pong" somewhere (model might capitalize or add punctuation) + if !strings.Contains(strings.ToLower(resp.Content), "pong") { + t.Errorf("Content = %q, expected to contain 'pong'", resp.Content) + } +} + +func TestIntegration_RealClaudeCLI_WithSystemPrompt(t *testing.T) { + if _, err := exec.LookPath("claude"); err != nil { + t.Skip("claude CLI not found in PATH") + } + + p := NewClaudeCliProvider(t.TempDir()) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + resp, err := p.Chat(ctx, []Message{ + {Role: "system", Content: "You are a calculator. Only respond with numbers. No text."}, + {Role: "user", Content: "What is 2+2?"}, + }, nil, "", nil) + + if err != nil { + t.Fatalf("Chat() error = %v", err) + } + + t.Logf("Response: %q", resp.Content) + + if !strings.Contains(resp.Content, "4") { + t.Errorf("Content = %q, expected to contain '4'", resp.Content) + } +} + +func TestIntegration_RealClaudeCLI_ParsesRealJSON(t *testing.T) { + if _, err := exec.LookPath("claude"); err != nil { + t.Skip("claude CLI not found in PATH") + } + + // Run claude directly and verify our parser handles real output + cmd := exec.Command("claude", "-p", "--output-format", "json", + "--dangerously-skip-permissions", "--no-chrome", "--no-session-persistence", "-") + cmd.Stdin = strings.NewReader("Say hi") + cmd.Dir = t.TempDir() + + output, err := cmd.Output() + if err != nil { + t.Fatalf("claude CLI failed: %v", err) + } + + t.Logf("Raw CLI output: %s", string(output)) + + // Verify our parser can handle real output + p := NewClaudeCliProvider("") + resp, err := p.parseClaudeCliResponse(string(output)) + if err != nil { + t.Fatalf("parseClaudeCliResponse() failed on real CLI output: %v", err) + } + + if resp.Content == "" { + t.Error("parsed Content is empty") + } + if resp.FinishReason != "stop" { + t.Errorf("FinishReason = %q, want stop", resp.FinishReason) + } + if resp.Usage == nil { + t.Error("Usage should not be nil") + } + + t.Logf("Parsed: content=%q, finish=%s, usage=%+v", resp.Content, resp.FinishReason, resp.Usage) +} diff --git a/pkg/providers/claude_cli_provider_test.go b/pkg/providers/claude_cli_provider_test.go index f6c7983..4d75e60 100644 --- a/pkg/providers/claude_cli_provider_test.go +++ b/pkg/providers/claude_cli_provider_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "os/exec" "path/filepath" "runtime" "strings" @@ -980,130 +979,3 @@ func TestFindMatchingBrace(t *testing.T) { } } } - -// --- Integration test: real claude CLI --- - -func TestIntegration_RealClaudeCLI(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test in short mode") - } - - // Check if claude CLI is available - path, err := exec.LookPath("claude") - if err != nil { - t.Skip("claude CLI not found in PATH, skipping integration test") - } - t.Logf("Using claude CLI at: %s", path) - - p := NewClaudeCliProvider(t.TempDir()) - - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - - resp, err := p.Chat(ctx, []Message{ - {Role: "user", Content: "Respond with only the word 'pong'. Nothing else."}, - }, nil, "", nil) - - if err != nil { - t.Fatalf("Chat() with real CLI error = %v", err) - } - - // Verify response structure - if resp.Content == "" { - t.Error("Content is empty") - } - if resp.FinishReason != "stop" { - t.Errorf("FinishReason = %q, want %q", resp.FinishReason, "stop") - } - if resp.Usage == nil { - t.Error("Usage should not be nil from real CLI") - } else { - if resp.Usage.PromptTokens == 0 { - t.Error("PromptTokens should be > 0") - } - if resp.Usage.CompletionTokens == 0 { - t.Error("CompletionTokens should be > 0") - } - t.Logf("Usage: prompt=%d, completion=%d, total=%d", - resp.Usage.PromptTokens, resp.Usage.CompletionTokens, resp.Usage.TotalTokens) - } - - t.Logf("Response content: %q", resp.Content) - - // Loose check - should contain "pong" somewhere (model might capitalize or add punctuation) - if !strings.Contains(strings.ToLower(resp.Content), "pong") { - t.Errorf("Content = %q, expected to contain 'pong'", resp.Content) - } -} - -func TestIntegration_RealClaudeCLI_WithSystemPrompt(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test in short mode") - } - - if _, err := exec.LookPath("claude"); err != nil { - t.Skip("claude CLI not found in PATH") - } - - p := NewClaudeCliProvider(t.TempDir()) - - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - - resp, err := p.Chat(ctx, []Message{ - {Role: "system", Content: "You are a calculator. Only respond with numbers. No text."}, - {Role: "user", Content: "What is 2+2?"}, - }, nil, "", nil) - - if err != nil { - t.Fatalf("Chat() error = %v", err) - } - - t.Logf("Response: %q", resp.Content) - - if !strings.Contains(resp.Content, "4") { - t.Errorf("Content = %q, expected to contain '4'", resp.Content) - } -} - -func TestIntegration_RealClaudeCLI_ParsesRealJSON(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test in short mode") - } - - if _, err := exec.LookPath("claude"); err != nil { - t.Skip("claude CLI not found in PATH") - } - - // Run claude directly and verify our parser handles real output - cmd := exec.Command("claude", "-p", "--output-format", "json", - "--dangerously-skip-permissions", "--no-chrome", "--no-session-persistence", "-") - cmd.Stdin = strings.NewReader("Say hi") - cmd.Dir = t.TempDir() - - output, err := cmd.Output() - if err != nil { - t.Fatalf("claude CLI failed: %v", err) - } - - t.Logf("Raw CLI output: %s", string(output)) - - // Verify our parser can handle real output - p := NewClaudeCliProvider("") - resp, err := p.parseClaudeCliResponse(string(output)) - if err != nil { - t.Fatalf("parseClaudeCliResponse() failed on real CLI output: %v", err) - } - - if resp.Content == "" { - t.Error("parsed Content is empty") - } - if resp.FinishReason != "stop" { - t.Errorf("FinishReason = %q, want stop", resp.FinishReason) - } - if resp.Usage == nil { - t.Error("Usage should not be nil") - } - - t.Logf("Parsed: content=%q, finish=%s, usage=%+v", resp.Content, resp.FinishReason, resp.Usage) -}