feat(config): add heartbeat interval configuration with default 30 minutes feat(state): migrate state file from workspace root to state directory feat(channels): skip internal channels in outbound dispatcher feat(agent): record last active channel for heartbeat context refactor(subagent): use configurable default model instead of provider default
359 lines
8.5 KiB
Go
359 lines
8.5 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
|
|
// License: MIT
|
|
//
|
|
// Copyright (c) 2026 PicoClaw contributors
|
|
|
|
package heartbeat
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/state"
|
|
"github.com/sipeed/picoclaw/pkg/tools"
|
|
)
|
|
|
|
const (
|
|
minIntervalMinutes = 5
|
|
defaultIntervalMinutes = 30
|
|
)
|
|
|
|
// HeartbeatHandler is the function type for handling heartbeat.
|
|
// It returns a ToolResult that can indicate async operations.
|
|
// channel and chatID are derived from the last active user channel.
|
|
type HeartbeatHandler func(prompt, channel, chatID string) *tools.ToolResult
|
|
|
|
// HeartbeatService manages periodic heartbeat checks
|
|
type HeartbeatService struct {
|
|
workspace string
|
|
bus *bus.MessageBus
|
|
state *state.Manager
|
|
handler HeartbeatHandler
|
|
interval time.Duration
|
|
enabled bool
|
|
mu sync.RWMutex
|
|
started bool
|
|
stopChan chan struct{}
|
|
}
|
|
|
|
// NewHeartbeatService creates a new heartbeat service
|
|
func NewHeartbeatService(workspace string, intervalMinutes int, enabled bool) *HeartbeatService {
|
|
// Apply minimum interval
|
|
if intervalMinutes < minIntervalMinutes && intervalMinutes != 0 {
|
|
intervalMinutes = minIntervalMinutes
|
|
}
|
|
|
|
if intervalMinutes == 0 {
|
|
intervalMinutes = defaultIntervalMinutes
|
|
}
|
|
|
|
return &HeartbeatService{
|
|
workspace: workspace,
|
|
interval: time.Duration(intervalMinutes) * time.Minute,
|
|
enabled: enabled,
|
|
state: state.NewManager(workspace),
|
|
stopChan: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// SetBus sets the message bus for delivering heartbeat results.
|
|
func (hs *HeartbeatService) SetBus(msgBus *bus.MessageBus) {
|
|
hs.mu.Lock()
|
|
defer hs.mu.Unlock()
|
|
hs.bus = msgBus
|
|
}
|
|
|
|
// SetHandler sets the heartbeat handler.
|
|
func (hs *HeartbeatService) SetHandler(handler HeartbeatHandler) {
|
|
hs.mu.Lock()
|
|
defer hs.mu.Unlock()
|
|
hs.handler = handler
|
|
}
|
|
|
|
// Start begins the heartbeat service
|
|
func (hs *HeartbeatService) Start() error {
|
|
hs.mu.Lock()
|
|
defer hs.mu.Unlock()
|
|
|
|
if hs.started {
|
|
logger.InfoC("heartbeat", "Heartbeat service already running")
|
|
return nil
|
|
}
|
|
|
|
if !hs.enabled {
|
|
logger.InfoC("heartbeat", "Heartbeat service disabled")
|
|
return nil
|
|
}
|
|
|
|
hs.started = true
|
|
hs.stopChan = make(chan struct{})
|
|
|
|
go hs.runLoop()
|
|
|
|
logger.InfoCF("heartbeat", "Heartbeat service started", map[string]any{
|
|
"interval_minutes": hs.interval.Minutes(),
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully stops the heartbeat service
|
|
func (hs *HeartbeatService) Stop() {
|
|
hs.mu.Lock()
|
|
defer hs.mu.Unlock()
|
|
|
|
if !hs.started {
|
|
return
|
|
}
|
|
|
|
logger.InfoC("heartbeat", "Stopping heartbeat service")
|
|
close(hs.stopChan)
|
|
hs.started = false
|
|
}
|
|
|
|
// IsRunning returns whether the service is running
|
|
func (hs *HeartbeatService) IsRunning() bool {
|
|
hs.mu.RLock()
|
|
defer hs.mu.RUnlock()
|
|
return hs.started
|
|
}
|
|
|
|
// runLoop runs the heartbeat ticker
|
|
func (hs *HeartbeatService) runLoop() {
|
|
ticker := time.NewTicker(hs.interval)
|
|
defer ticker.Stop()
|
|
|
|
// Run first heartbeat after initial delay
|
|
time.AfterFunc(time.Second, func() {
|
|
hs.executeHeartbeat()
|
|
})
|
|
|
|
for {
|
|
select {
|
|
case <-hs.stopChan:
|
|
return
|
|
case <-ticker.C:
|
|
hs.executeHeartbeat()
|
|
}
|
|
}
|
|
}
|
|
|
|
// executeHeartbeat performs a single heartbeat check
|
|
func (hs *HeartbeatService) executeHeartbeat() {
|
|
hs.mu.RLock()
|
|
enabled := hs.enabled && hs.started
|
|
handler := hs.handler
|
|
hs.mu.RUnlock()
|
|
|
|
if !enabled {
|
|
return
|
|
}
|
|
|
|
logger.DebugC("heartbeat", "Executing heartbeat")
|
|
|
|
prompt := hs.buildPrompt()
|
|
if prompt == "" {
|
|
logger.InfoC("heartbeat", "No heartbeat prompt (HEARTBEAT.md empty or missing)")
|
|
return
|
|
}
|
|
|
|
if handler == nil {
|
|
hs.logError("Heartbeat handler not configured")
|
|
return
|
|
}
|
|
|
|
// Get last channel info for context
|
|
lastChannel := hs.state.GetLastChannel()
|
|
channel, chatID := hs.parseLastChannel(lastChannel)
|
|
|
|
result := handler(prompt, channel, chatID)
|
|
|
|
if result == nil {
|
|
hs.logInfo("Heartbeat handler returned nil result")
|
|
return
|
|
}
|
|
|
|
// Handle different result types
|
|
if result.IsError {
|
|
hs.logError("Heartbeat error: %s", result.ForLLM)
|
|
return
|
|
}
|
|
|
|
if result.Async {
|
|
hs.logInfo("Async task started: %s", result.ForLLM)
|
|
logger.InfoCF("heartbeat", "Async heartbeat task started",
|
|
map[string]interface{}{
|
|
"message": result.ForLLM,
|
|
})
|
|
return
|
|
}
|
|
|
|
// Check if silent
|
|
if result.Silent {
|
|
hs.logInfo("Heartbeat OK - silent")
|
|
return
|
|
}
|
|
|
|
// Send result to user
|
|
if result.ForUser != "" {
|
|
hs.sendResponse(result.ForUser)
|
|
} else if result.ForLLM != "" {
|
|
hs.sendResponse(result.ForLLM)
|
|
}
|
|
|
|
hs.logInfo("Heartbeat completed: %s", result.ForLLM)
|
|
}
|
|
|
|
// buildPrompt builds the heartbeat prompt from HEARTBEAT.md
|
|
func (hs *HeartbeatService) buildPrompt() string {
|
|
heartbeatPath := filepath.Join(hs.workspace, "HEARTBEAT.md")
|
|
|
|
data, err := os.ReadFile(heartbeatPath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
hs.createDefaultHeartbeatTemplate()
|
|
return ""
|
|
}
|
|
hs.logError("Error reading HEARTBEAT.md: %v", err)
|
|
return ""
|
|
}
|
|
|
|
content := string(data)
|
|
if len(content) == 0 {
|
|
return ""
|
|
}
|
|
|
|
now := time.Now().Format("2006-01-02 15:04:05")
|
|
return fmt.Sprintf(`# Heartbeat Check
|
|
|
|
Current time: %s
|
|
|
|
You are a proactive AI assistant. This is a scheduled heartbeat check.
|
|
Review the following tasks and execute any necessary actions using available skills.
|
|
If there is nothing that requires attention, respond ONLY with: HEARTBEAT_OK
|
|
|
|
%s
|
|
`, now, content)
|
|
}
|
|
|
|
// createDefaultHeartbeatTemplate creates the default HEARTBEAT.md file
|
|
func (hs *HeartbeatService) createDefaultHeartbeatTemplate() {
|
|
heartbeatPath := filepath.Join(hs.workspace, "HEARTBEAT.md")
|
|
|
|
defaultContent := `# Heartbeat Check List
|
|
|
|
This file contains tasks for the heartbeat service to check periodically.
|
|
|
|
## Examples
|
|
|
|
- Check for unread messages
|
|
- Review upcoming calendar events
|
|
- Check device status (e.g., MaixCam)
|
|
|
|
## Instructions
|
|
|
|
If there's nothing that needs attention, respond with: HEARTBEAT_OK
|
|
This ensures the heartbeat runs silently when everything is fine.
|
|
|
|
---
|
|
|
|
Add your heartbeat tasks below this line:
|
|
`
|
|
|
|
if err := os.WriteFile(heartbeatPath, []byte(defaultContent), 0644); err != nil {
|
|
hs.logError("Failed to create default HEARTBEAT.md: %v", err)
|
|
} else {
|
|
hs.logInfo("Created default HEARTBEAT.md template")
|
|
}
|
|
}
|
|
|
|
// sendResponse sends the heartbeat response to the last channel
|
|
func (hs *HeartbeatService) sendResponse(response string) {
|
|
hs.mu.RLock()
|
|
msgBus := hs.bus
|
|
hs.mu.RUnlock()
|
|
|
|
if msgBus == nil {
|
|
hs.logInfo("No message bus configured, heartbeat result not sent")
|
|
return
|
|
}
|
|
|
|
// Get last channel from state
|
|
lastChannel := hs.state.GetLastChannel()
|
|
if lastChannel == "" {
|
|
hs.logInfo("No last channel recorded, heartbeat result not sent")
|
|
return
|
|
}
|
|
|
|
platform, userID := hs.parseLastChannel(lastChannel)
|
|
|
|
// Skip internal channels that can't receive messages
|
|
if platform == "" || userID == "" {
|
|
return
|
|
}
|
|
|
|
msgBus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: platform,
|
|
ChatID: userID,
|
|
Content: response,
|
|
})
|
|
|
|
hs.logInfo("Heartbeat result sent to %s", platform)
|
|
}
|
|
|
|
// parseLastChannel parses the last channel string into platform and userID.
|
|
// Returns empty strings for invalid or internal channels.
|
|
func (hs *HeartbeatService) parseLastChannel(lastChannel string) (platform, userID string) {
|
|
if lastChannel == "" {
|
|
return "", ""
|
|
}
|
|
|
|
// Parse channel format: "platform:user_id" (e.g., "telegram:123456")
|
|
parts := strings.SplitN(lastChannel, ":", 2)
|
|
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
|
|
hs.logError("Invalid last channel format: %s", lastChannel)
|
|
return "", ""
|
|
}
|
|
|
|
platform, userID = parts[0], parts[1]
|
|
|
|
// Skip internal channels
|
|
internalChannels := map[string]bool{"cli": true, "system": true, "subagent": true}
|
|
if internalChannels[platform] {
|
|
hs.logInfo("Skipping internal channel: %s", platform)
|
|
return "", ""
|
|
}
|
|
|
|
return platform, userID
|
|
}
|
|
|
|
// logInfo logs an informational message to the heartbeat log
|
|
func (hs *HeartbeatService) logInfo(format string, args ...any) {
|
|
hs.log("INFO", format, args...)
|
|
}
|
|
|
|
// logError logs an error message to the heartbeat log
|
|
func (hs *HeartbeatService) logError(format string, args ...any) {
|
|
hs.log("ERROR", format, args...)
|
|
}
|
|
|
|
// log writes a message to the heartbeat log file
|
|
func (hs *HeartbeatService) log(level, format string, args ...any) {
|
|
logFile := filepath.Join(hs.workspace, "heartbeat.log")
|
|
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
timestamp := time.Now().Format("2006-01-02 15:04:05")
|
|
fmt.Fprintf(f, "[%s] [%s] %s\n", timestamp, level, fmt.Sprintf(format, args...))
|
|
}
|