feat: merge heartbeat service improvements from feat-heartbeat branch

- Add ChannelSender interface for sending heartbeat results to users
- Add sendResponse() to automatically deliver results to last channel
- Add createDefaultHeartbeatTemplate() for first-run setup
- Support HEARTBEAT_OK silent response (legacy compatibility)
- Add structured logging with INFO/ERROR levels
- Move integration tests to separate file with build tag

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
yinwm
2026-02-13 01:42:22 +08:00
parent 53b5be862f
commit e7e086155e
4 changed files with 372 additions and 183 deletions

View File

@@ -1,6 +1,13 @@
// PicoClaw - Ultra-lightweight personal AI agent
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
// License: MIT
//
// Copyright (c) 2026 PicoClaw contributors
package heartbeat package heartbeat
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
@@ -8,6 +15,13 @@ import (
"time" "time"
"github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/state"
)
const (
minIntervalMinutes = 5
defaultIntervalMinutes = 30
heartbeatOK = "HEARTBEAT_OK"
) )
// ToolResult represents a structured result from tool execution. // ToolResult represents a structured result from tool execution.
@@ -25,10 +39,18 @@ type ToolResult struct {
// It returns a ToolResult that can indicate async operations. // It returns a ToolResult that can indicate async operations.
type HeartbeatHandler func(prompt string) *ToolResult type HeartbeatHandler func(prompt string) *ToolResult
// ChannelSender defines the interface for sending messages to channels.
// This is used to send heartbeat results back to the user.
type ChannelSender interface {
SendToChannel(ctx context.Context, channelName, chatID, content string) error
}
// HeartbeatService manages periodic heartbeat checks
type HeartbeatService struct { type HeartbeatService struct {
workspace string workspace string
channelSender ChannelSender
stateManager *state.Manager
onHeartbeat func(string) (string, error) onHeartbeat func(string) (string, error)
// onHeartbeatWithTools is the new handler that supports ToolResult returns
onHeartbeatWithTools HeartbeatHandler onHeartbeatWithTools HeartbeatHandler
interval time.Duration interval time.Duration
enabled bool enabled bool
@@ -37,16 +59,34 @@ type HeartbeatService struct {
stopChan chan struct{} stopChan chan struct{}
} }
func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, error), intervalS int, enabled bool) *HeartbeatService { // NewHeartbeatService creates a new heartbeat service
func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, error), intervalMinutes int, enabled bool) *HeartbeatService {
// Apply minimum interval
if intervalMinutes < minIntervalMinutes && intervalMinutes != 0 {
intervalMinutes = minIntervalMinutes
}
if intervalMinutes == 0 {
intervalMinutes = defaultIntervalMinutes
}
return &HeartbeatService{ return &HeartbeatService{
workspace: workspace, workspace: workspace,
onHeartbeat: onHeartbeat, onHeartbeat: onHeartbeat,
interval: time.Duration(intervalS) * time.Second, interval: time.Duration(intervalMinutes) * time.Minute,
enabled: enabled, enabled: enabled,
stateManager: state.NewManager(workspace),
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
} }
} }
// SetChannelSender sets the channel sender for delivering heartbeat results.
func (hs *HeartbeatService) SetChannelSender(sender ChannelSender) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.channelSender = sender
}
// SetOnHeartbeatWithTools sets the tool-supporting heartbeat handler. // SetOnHeartbeatWithTools sets the tool-supporting heartbeat handler.
// This handler returns a ToolResult that can indicate async operations. // This handler returns a ToolResult that can indicate async operations.
// When set, this handler takes precedence over the legacy onHeartbeat callback. // When set, this handler takes precedence over the legacy onHeartbeat callback.
@@ -56,24 +96,34 @@ func (hs *HeartbeatService) SetOnHeartbeatWithTools(handler HeartbeatHandler) {
hs.onHeartbeatWithTools = handler hs.onHeartbeatWithTools = handler
} }
// Start begins the heartbeat service
func (hs *HeartbeatService) Start() error { func (hs *HeartbeatService) Start() error {
hs.mu.Lock() hs.mu.Lock()
defer hs.mu.Unlock() defer hs.mu.Unlock()
if hs.started { if hs.started {
logger.InfoC("heartbeat", "Heartbeat service already running")
return nil return nil
} }
if !hs.enabled { if !hs.enabled {
return fmt.Errorf("heartbeat service is disabled") logger.InfoC("heartbeat", "Heartbeat service disabled")
return nil
} }
hs.started = true hs.started = true
hs.stopChan = make(chan struct{})
go hs.runLoop() go hs.runLoop()
logger.InfoCF("heartbeat", "Heartbeat service started", map[string]any{
"interval_minutes": hs.interval.Minutes(),
})
return nil return nil
} }
// Stop gracefully stops the heartbeat service
func (hs *HeartbeatService) Stop() { func (hs *HeartbeatService) Stop() {
hs.mu.Lock() hs.mu.Lock()
defer hs.mu.Unlock() defer hs.mu.Unlock()
@@ -82,59 +132,81 @@ func (hs *HeartbeatService) Stop() {
return return
} }
hs.started = false logger.InfoC("heartbeat", "Stopping heartbeat service")
close(hs.stopChan) close(hs.stopChan)
hs.started = false
} }
func (hs *HeartbeatService) running() bool { // IsRunning returns whether the service is running
select { func (hs *HeartbeatService) IsRunning() bool {
case <-hs.stopChan: hs.mu.RLock()
return false defer hs.mu.RUnlock()
default: return hs.started
return true
}
} }
// runLoop runs the heartbeat ticker
func (hs *HeartbeatService) runLoop() { func (hs *HeartbeatService) runLoop() {
ticker := time.NewTicker(hs.interval) ticker := time.NewTicker(hs.interval)
defer ticker.Stop() defer ticker.Stop()
// Run first heartbeat after initial delay
time.AfterFunc(time.Second, func() {
hs.executeHeartbeat()
})
for { for {
select { select {
case <-hs.stopChan: case <-hs.stopChan:
return return
case <-ticker.C: case <-ticker.C:
hs.checkHeartbeat() hs.executeHeartbeat()
} }
} }
} }
func (hs *HeartbeatService) checkHeartbeat() { // executeHeartbeat performs a single heartbeat check
func (hs *HeartbeatService) executeHeartbeat() {
hs.mu.RLock() hs.mu.RLock()
if !hs.enabled || !hs.running() { enabled := hs.enabled && hs.started
handler := hs.onHeartbeat
handlerWithTools := hs.onHeartbeatWithTools
hs.mu.RUnlock() hs.mu.RUnlock()
if !enabled {
return return
} }
hs.mu.RUnlock()
logger.DebugC("heartbeat", "Executing heartbeat")
prompt := hs.buildPrompt() prompt := hs.buildPrompt()
if prompt == "" {
logger.InfoC("heartbeat", "No heartbeat prompt (HEARTBEAT.md empty or missing)")
return
}
// Prefer the new tool-supporting handler // Prefer the new tool-supporting handler
if hs.onHeartbeatWithTools != nil { if handlerWithTools != nil {
hs.executeHeartbeatWithTools(prompt) hs.executeHeartbeatWithTools(prompt)
} else if hs.onHeartbeat != nil { } else if handler != nil {
_, err := hs.onHeartbeat(prompt) response, err := handler(prompt)
if err != nil { if err != nil {
hs.log(fmt.Sprintf("Heartbeat error: %v", err)) hs.logError("Heartbeat processing error: %v", err)
return
} }
// Check for HEARTBEAT_OK - completely silent response
if isHeartbeatOK(response) {
hs.logInfo("Heartbeat OK - silent")
return
}
// Non-OK response - send to last channel
hs.sendResponse(response)
} }
} }
// ExecuteHeartbeatWithTools executes a heartbeat using the tool-supporting handler. // ExecuteHeartbeatWithTools executes a heartbeat using the tool-supporting handler.
// This method processes ToolResult returns and handles async tasks appropriately. // This method processes ToolResult returns and handles async tasks appropriately.
// If the result is async, it logs that the task started in background.
// If the result is an error, it logs the error message.
// This method is designed to be called from checkHeartbeat or directly by external code.
func (hs *HeartbeatService) ExecuteHeartbeatWithTools(prompt string) { func (hs *HeartbeatService) ExecuteHeartbeatWithTools(prompt string) {
hs.executeHeartbeatWithTools(prompt) hs.executeHeartbeatWithTools(prompt)
} }
@@ -144,19 +216,19 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) {
result := hs.onHeartbeatWithTools(prompt) result := hs.onHeartbeatWithTools(prompt)
if result == nil { if result == nil {
hs.log("Heartbeat handler returned nil result") hs.logInfo("Heartbeat handler returned nil result")
return return
} }
// Handle different result types // Handle different result types
if result.IsError { if result.IsError {
hs.log(fmt.Sprintf("Heartbeat error: %s", result.ForLLM)) hs.logError("Heartbeat error: %s", result.ForLLM)
return return
} }
if result.Async { if result.Async {
// Async task started - log and return immediately // Async task started - log and return immediately
hs.log(fmt.Sprintf("Async task started: %s", result.ForLLM)) hs.logInfo("Async task started: %s", result.ForLLM)
logger.InfoCF("heartbeat", "Async heartbeat task started", logger.InfoCF("heartbeat", "Async heartbeat task started",
map[string]interface{}{ map[string]interface{}{
"message": result.ForLLM, "message": result.ForLLM,
@@ -164,37 +236,156 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) {
return return
} }
// Normal completion - log result // Check if silent (HEARTBEAT_OK equivalent)
hs.log(fmt.Sprintf("Heartbeat completed: %s", result.ForLLM)) if result.Silent {
} hs.logInfo("Heartbeat OK - silent")
return
func (hs *HeartbeatService) buildPrompt() string {
notesDir := filepath.Join(hs.workspace, "memory")
notesFile := filepath.Join(notesDir, "HEARTBEAT.md")
var notes string
if data, err := os.ReadFile(notesFile); err == nil {
notes = string(data)
} }
now := time.Now().Format("2006-01-02 15:04") // Normal completion - send result to user if available
if result.ForUser != "" {
hs.sendResponse(result.ForUser)
} else if result.ForLLM != "" {
hs.sendResponse(result.ForLLM)
}
hs.logInfo("Heartbeat completed: %s", result.ForLLM)
}
// buildPrompt builds the heartbeat prompt from HEARTBEAT.md
func (hs *HeartbeatService) buildPrompt() string {
// Use memory directory for HEARTBEAT.md
notesDir := filepath.Join(hs.workspace, "memory")
heartbeatPath := filepath.Join(notesDir, "HEARTBEAT.md")
data, err := os.ReadFile(heartbeatPath)
if err != nil {
if os.IsNotExist(err) {
// Create default HEARTBEAT.md template
hs.createDefaultHeartbeatTemplate()
return ""
}
hs.logError("Error reading HEARTBEAT.md: %v", err)
return ""
}
content := string(data)
if len(content) == 0 {
return ""
}
// Build prompt with system instructions
now := time.Now().Format("2006-01-02 15:04:05")
prompt := fmt.Sprintf(`# Heartbeat Check prompt := fmt.Sprintf(`# Heartbeat Check
Current time: %s Current time: %s
Check if there are any tasks I should be aware of or actions I should take. You are a proactive AI assistant. This is a scheduled heartbeat check.
Review the memory file for any important updates or changes. Review the following tasks and execute any necessary actions using available skills.
Be proactive in identifying potential issues or improvements. If there is nothing that requires attention, respond ONLY with: HEARTBEAT_OK
%s %s
`, now, notes) `, now, content)
return prompt return prompt
} }
func (hs *HeartbeatService) log(message string) { // createDefaultHeartbeatTemplate creates the default HEARTBEAT.md file
logFile := filepath.Join(hs.workspace, "memory", "heartbeat.log") func (hs *HeartbeatService) createDefaultHeartbeatTemplate() {
notesDir := filepath.Join(hs.workspace, "memory")
heartbeatPath := filepath.Join(notesDir, "HEARTBEAT.md")
// Ensure memory directory exists
if err := os.MkdirAll(notesDir, 0755); err != nil {
hs.logError("Failed to create memory directory: %v", err)
return
}
defaultContent := `# Heartbeat Check List
This file contains tasks for the heartbeat service to check periodically.
## Examples
- Check for unread messages
- Review upcoming calendar events
- Check device status (e.g., MaixCam)
## Instructions
If there's nothing that needs attention, respond with: HEARTBEAT_OK
This ensures the heartbeat runs silently when everything is fine.
---
Add your heartbeat tasks below this line:
`
if err := os.WriteFile(heartbeatPath, []byte(defaultContent), 0644); err != nil {
hs.logError("Failed to create default HEARTBEAT.md: %v", err)
} else {
hs.logInfo("Created default HEARTBEAT.md template")
}
}
// sendResponse sends the heartbeat response to the last channel
func (hs *HeartbeatService) sendResponse(response string) {
hs.mu.RLock()
sender := hs.channelSender
hs.mu.RUnlock()
if sender == nil {
hs.logInfo("No channel sender configured, heartbeat result not sent")
return
}
// Get last channel from state
lastChannel := hs.stateManager.GetLastChannel()
if lastChannel == "" {
hs.logInfo("No last channel recorded, heartbeat result not sent")
return
}
// Parse channel format: "platform:user_id" (e.g., "telegram:123456")
var platform, userID string
n, err := fmt.Sscanf(lastChannel, "%[^:]:%s", &platform, &userID)
if err != nil || n != 2 {
hs.logError("Invalid last channel format: %s", lastChannel)
return
}
// Send to channel
ctx := context.Background()
if err := sender.SendToChannel(ctx, platform, userID, response); err != nil {
hs.logError("Error sending to channel %s: %v", platform, err)
return
}
hs.logInfo("Heartbeat result sent to %s", platform)
}
// isHeartbeatOK checks if the response is HEARTBEAT_OK
func isHeartbeatOK(response string) bool {
return response == heartbeatOK
}
// logInfo logs an informational message to the heartbeat log
func (hs *HeartbeatService) logInfo(format string, args ...any) {
hs.log("INFO", format, args...)
}
// logError logs an error message to the heartbeat log
func (hs *HeartbeatService) logError(format string, args ...any) {
hs.log("ERROR", format, args...)
}
// log writes a message to the heartbeat log file
func (hs *HeartbeatService) log(level, format string, args ...any) {
// Ensure memory directory exists
logDir := filepath.Join(hs.workspace, "memory")
os.MkdirAll(logDir, 0755)
logFile := filepath.Join(logDir, "heartbeat.log")
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
return return
@@ -202,5 +393,5 @@ func (hs *HeartbeatService) log(message string) {
defer f.Close() defer f.Close()
timestamp := time.Now().Format("2006-01-02 15:04:05") timestamp := time.Now().Format("2006-01-02 15:04:05")
fmt.Fprintf(f, "[%s] %s\n", timestamp, message) fmt.Fprintf(f, "[%s] [%s] %s\n", timestamp, level, fmt.Sprintf(format, args...))
} }

View File

@@ -213,7 +213,7 @@ func TestLogPath(t *testing.T) {
hs := NewHeartbeatService(tmpDir, nil, 30, true) hs := NewHeartbeatService(tmpDir, nil, 30, true)
// Write a log entry // Write a log entry
hs.log("Test log entry") hs.log("INFO", "Test log entry")
// Verify log file exists at correct path // Verify log file exists at correct path
expectedLogPath := filepath.Join(memDir, "heartbeat.log") expectedLogPath := filepath.Join(memDir, "heartbeat.log")

View File

@@ -0,0 +1,126 @@
//go:build integration
package providers
import (
"context"
exec "os/exec"
"strings"
"testing"
"time"
)
// TestIntegration_RealClaudeCLI tests the ClaudeCliProvider with a real claude CLI.
// Run with: go test -tags=integration ./pkg/providers/...
func TestIntegration_RealClaudeCLI(t *testing.T) {
// Check if claude CLI is available
path, err := exec.LookPath("claude")
if err != nil {
t.Skip("claude CLI not found in PATH, skipping integration test")
}
t.Logf("Using claude CLI at: %s", path)
p := NewClaudeCliProvider(t.TempDir())
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
resp, err := p.Chat(ctx, []Message{
{Role: "user", Content: "Respond with only the word 'pong'. Nothing else."},
}, nil, "", nil)
if err != nil {
t.Fatalf("Chat() with real CLI error = %v", err)
}
// Verify response structure
if resp.Content == "" {
t.Error("Content is empty")
}
if resp.FinishReason != "stop" {
t.Errorf("FinishReason = %q, want %q", resp.FinishReason, "stop")
}
if resp.Usage == nil {
t.Error("Usage should not be nil from real CLI")
} else {
if resp.Usage.PromptTokens == 0 {
t.Error("PromptTokens should be > 0")
}
if resp.Usage.CompletionTokens == 0 {
t.Error("CompletionTokens should be > 0")
}
t.Logf("Usage: prompt=%d, completion=%d, total=%d",
resp.Usage.PromptTokens, resp.Usage.CompletionTokens, resp.Usage.TotalTokens)
}
t.Logf("Response content: %q", resp.Content)
// Loose check - should contain "pong" somewhere (model might capitalize or add punctuation)
if !strings.Contains(strings.ToLower(resp.Content), "pong") {
t.Errorf("Content = %q, expected to contain 'pong'", resp.Content)
}
}
func TestIntegration_RealClaudeCLI_WithSystemPrompt(t *testing.T) {
if _, err := exec.LookPath("claude"); err != nil {
t.Skip("claude CLI not found in PATH")
}
p := NewClaudeCliProvider(t.TempDir())
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
resp, err := p.Chat(ctx, []Message{
{Role: "system", Content: "You are a calculator. Only respond with numbers. No text."},
{Role: "user", Content: "What is 2+2?"},
}, nil, "", nil)
if err != nil {
t.Fatalf("Chat() error = %v", err)
}
t.Logf("Response: %q", resp.Content)
if !strings.Contains(resp.Content, "4") {
t.Errorf("Content = %q, expected to contain '4'", resp.Content)
}
}
func TestIntegration_RealClaudeCLI_ParsesRealJSON(t *testing.T) {
if _, err := exec.LookPath("claude"); err != nil {
t.Skip("claude CLI not found in PATH")
}
// Run claude directly and verify our parser handles real output
cmd := exec.Command("claude", "-p", "--output-format", "json",
"--dangerously-skip-permissions", "--no-chrome", "--no-session-persistence", "-")
cmd.Stdin = strings.NewReader("Say hi")
cmd.Dir = t.TempDir()
output, err := cmd.Output()
if err != nil {
t.Fatalf("claude CLI failed: %v", err)
}
t.Logf("Raw CLI output: %s", string(output))
// Verify our parser can handle real output
p := NewClaudeCliProvider("")
resp, err := p.parseClaudeCliResponse(string(output))
if err != nil {
t.Fatalf("parseClaudeCliResponse() failed on real CLI output: %v", err)
}
if resp.Content == "" {
t.Error("parsed Content is empty")
}
if resp.FinishReason != "stop" {
t.Errorf("FinishReason = %q, want stop", resp.FinishReason)
}
if resp.Usage == nil {
t.Error("Usage should not be nil")
}
t.Logf("Parsed: content=%q, finish=%s, usage=%+v", resp.Content, resp.FinishReason, resp.Usage)
}

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"runtime" "runtime"
"strings" "strings"
@@ -980,130 +979,3 @@ func TestFindMatchingBrace(t *testing.T) {
} }
} }
} }
// --- Integration test: real claude CLI ---
func TestIntegration_RealClaudeCLI(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
// Check if claude CLI is available
path, err := exec.LookPath("claude")
if err != nil {
t.Skip("claude CLI not found in PATH, skipping integration test")
}
t.Logf("Using claude CLI at: %s", path)
p := NewClaudeCliProvider(t.TempDir())
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
resp, err := p.Chat(ctx, []Message{
{Role: "user", Content: "Respond with only the word 'pong'. Nothing else."},
}, nil, "", nil)
if err != nil {
t.Fatalf("Chat() with real CLI error = %v", err)
}
// Verify response structure
if resp.Content == "" {
t.Error("Content is empty")
}
if resp.FinishReason != "stop" {
t.Errorf("FinishReason = %q, want %q", resp.FinishReason, "stop")
}
if resp.Usage == nil {
t.Error("Usage should not be nil from real CLI")
} else {
if resp.Usage.PromptTokens == 0 {
t.Error("PromptTokens should be > 0")
}
if resp.Usage.CompletionTokens == 0 {
t.Error("CompletionTokens should be > 0")
}
t.Logf("Usage: prompt=%d, completion=%d, total=%d",
resp.Usage.PromptTokens, resp.Usage.CompletionTokens, resp.Usage.TotalTokens)
}
t.Logf("Response content: %q", resp.Content)
// Loose check - should contain "pong" somewhere (model might capitalize or add punctuation)
if !strings.Contains(strings.ToLower(resp.Content), "pong") {
t.Errorf("Content = %q, expected to contain 'pong'", resp.Content)
}
}
func TestIntegration_RealClaudeCLI_WithSystemPrompt(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if _, err := exec.LookPath("claude"); err != nil {
t.Skip("claude CLI not found in PATH")
}
p := NewClaudeCliProvider(t.TempDir())
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
resp, err := p.Chat(ctx, []Message{
{Role: "system", Content: "You are a calculator. Only respond with numbers. No text."},
{Role: "user", Content: "What is 2+2?"},
}, nil, "", nil)
if err != nil {
t.Fatalf("Chat() error = %v", err)
}
t.Logf("Response: %q", resp.Content)
if !strings.Contains(resp.Content, "4") {
t.Errorf("Content = %q, expected to contain '4'", resp.Content)
}
}
func TestIntegration_RealClaudeCLI_ParsesRealJSON(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if _, err := exec.LookPath("claude"); err != nil {
t.Skip("claude CLI not found in PATH")
}
// Run claude directly and verify our parser handles real output
cmd := exec.Command("claude", "-p", "--output-format", "json",
"--dangerously-skip-permissions", "--no-chrome", "--no-session-persistence", "-")
cmd.Stdin = strings.NewReader("Say hi")
cmd.Dir = t.TempDir()
output, err := cmd.Output()
if err != nil {
t.Fatalf("claude CLI failed: %v", err)
}
t.Logf("Raw CLI output: %s", string(output))
// Verify our parser can handle real output
p := NewClaudeCliProvider("")
resp, err := p.parseClaudeCliResponse(string(output))
if err != nil {
t.Fatalf("parseClaudeCliResponse() failed on real CLI output: %v", err)
}
if resp.Content == "" {
t.Error("parsed Content is empty")
}
if resp.FinishReason != "stop" {
t.Errorf("FinishReason = %q, want stop", resp.FinishReason)
}
if resp.Usage == nil {
t.Error("Usage should not be nil")
}
t.Logf("Parsed: content=%q, finish=%s, usage=%+v", resp.Content, resp.FinishReason, resp.Usage)
}