refactor(heartbeat): simplify service with single handler and direct bus usage

- Remove redundant ChannelSender interface, use *bus.MessageBus directly
- Consolidate two handlers (onHeartbeat, onHeartbeatWithTools) into one
- Move HEARTBEAT.md and heartbeat.log to workspace root
- Simplify NewHeartbeatService signature (remove handler param)
- Add SetBus and SetHandler methods for dependency injection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
yinwm
2026-02-13 09:51:51 +08:00
parent b59464230a
commit 8fbbb67f70
3 changed files with 127 additions and 198 deletions

View File

@@ -7,7 +7,6 @@
package heartbeat
import (
"context"
"fmt"
"os"
"path/filepath"
@@ -15,6 +14,7 @@ import (
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/state"
"github.com/sipeed/picoclaw/pkg/tools"
@@ -23,35 +23,27 @@ import (
const (
minIntervalMinutes = 5
defaultIntervalMinutes = 30
heartbeatOK = "HEARTBEAT_OK"
)
// HeartbeatHandler is the function type for handling heartbeat with tool support.
// HeartbeatHandler is the function type for handling heartbeat.
// It returns a ToolResult that can indicate async operations.
type HeartbeatHandler func(prompt string) *tools.ToolResult
// ChannelSender defines the interface for sending messages to channels.
// This is used to send heartbeat results back to the user.
type ChannelSender interface {
SendToChannel(ctx context.Context, channelName, chatID, content string) error
}
// HeartbeatService manages periodic heartbeat checks
type HeartbeatService struct {
workspace string
channelSender ChannelSender
stateManager *state.Manager
onHeartbeat func(string) (string, error)
onHeartbeatWithTools HeartbeatHandler
interval time.Duration
enabled bool
mu sync.RWMutex
started bool
stopChan chan struct{}
workspace string
bus *bus.MessageBus
state *state.Manager
handler HeartbeatHandler
interval time.Duration
enabled bool
mu sync.RWMutex
started bool
stopChan chan struct{}
}
// NewHeartbeatService creates a new heartbeat service
func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, error), intervalMinutes int, enabled bool) *HeartbeatService {
func NewHeartbeatService(workspace string, intervalMinutes int, enabled bool) *HeartbeatService {
// Apply minimum interval
if intervalMinutes < minIntervalMinutes && intervalMinutes != 0 {
intervalMinutes = minIntervalMinutes
@@ -62,29 +54,26 @@ func NewHeartbeatService(workspace string, onHeartbeat func(string) (string, err
}
return &HeartbeatService{
workspace: workspace,
onHeartbeat: onHeartbeat,
interval: time.Duration(intervalMinutes) * time.Minute,
enabled: enabled,
stateManager: state.NewManager(workspace),
stopChan: make(chan struct{}),
workspace: workspace,
interval: time.Duration(intervalMinutes) * time.Minute,
enabled: enabled,
state: state.NewManager(workspace),
stopChan: make(chan struct{}),
}
}
// SetChannelSender sets the channel sender for delivering heartbeat results.
func (hs *HeartbeatService) SetChannelSender(sender ChannelSender) {
// SetBus sets the message bus for delivering heartbeat results.
func (hs *HeartbeatService) SetBus(msgBus *bus.MessageBus) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.channelSender = sender
hs.bus = msgBus
}
// SetOnHeartbeatWithTools sets the tool-supporting heartbeat handler.
// This handler returns a ToolResult that can indicate async operations.
// When set, this handler takes precedence over the legacy onHeartbeat callback.
func (hs *HeartbeatService) SetOnHeartbeatWithTools(handler HeartbeatHandler) {
// SetHandler sets the heartbeat handler.
func (hs *HeartbeatService) SetHandler(handler HeartbeatHandler) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.onHeartbeatWithTools = handler
hs.handler = handler
}
// Start begins the heartbeat service
@@ -159,8 +148,7 @@ func (hs *HeartbeatService) runLoop() {
func (hs *HeartbeatService) executeHeartbeat() {
hs.mu.RLock()
enabled := hs.enabled && hs.started
handler := hs.onHeartbeat
handlerWithTools := hs.onHeartbeatWithTools
handler := hs.handler
hs.mu.RUnlock()
if !enabled {
@@ -175,42 +163,8 @@ func (hs *HeartbeatService) executeHeartbeat() {
return
}
// Prefer the new tool-supporting handler
if handlerWithTools != nil {
hs.executeHeartbeatWithTools(prompt)
} else if handler != nil {
response, err := handler(prompt)
if err != nil {
hs.logError("Heartbeat processing error: %v", err)
return
}
// Check for HEARTBEAT_OK - completely silent response
if isHeartbeatOK(response) {
hs.logInfo("Heartbeat OK - silent")
return
}
// Non-OK response - send to last channel
hs.sendResponse(response)
}
}
// ExecuteHeartbeatWithTools executes a heartbeat using the tool-supporting handler.
// This method processes ToolResult returns and handles async tasks appropriately.
func (hs *HeartbeatService) ExecuteHeartbeatWithTools(prompt string) {
hs.executeHeartbeatWithTools(prompt)
}
// executeHeartbeatWithTools is the internal implementation of tool-supporting heartbeat.
func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) {
// Check if handler is configured (thread-safe read)
hs.mu.RLock()
handler := hs.onHeartbeatWithTools
hs.mu.RUnlock()
if handler == nil {
hs.logError("onHeartbeatWithTools handler not configured")
hs.logError("Heartbeat handler not configured")
return
}
@@ -228,7 +182,6 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) {
}
if result.Async {
// Async task started - log and return immediately
hs.logInfo("Async task started: %s", result.ForLLM)
logger.InfoCF("heartbeat", "Async heartbeat task started",
map[string]interface{}{
@@ -237,13 +190,13 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) {
return
}
// Check if silent (HEARTBEAT_OK equivalent)
// Check if silent
if result.Silent {
hs.logInfo("Heartbeat OK - silent")
return
}
// Normal completion - send result to user if available
// Send result to user
if result.ForUser != "" {
hs.sendResponse(result.ForUser)
} else if result.ForLLM != "" {
@@ -255,14 +208,11 @@ func (hs *HeartbeatService) executeHeartbeatWithTools(prompt string) {
// buildPrompt builds the heartbeat prompt from HEARTBEAT.md
func (hs *HeartbeatService) buildPrompt() string {
// Use memory directory for HEARTBEAT.md
notesDir := filepath.Join(hs.workspace, "memory")
heartbeatPath := filepath.Join(notesDir, "HEARTBEAT.md")
heartbeatPath := filepath.Join(hs.workspace, "HEARTBEAT.md")
data, err := os.ReadFile(heartbeatPath)
if err != nil {
if os.IsNotExist(err) {
// Create default HEARTBEAT.md template
hs.createDefaultHeartbeatTemplate()
return ""
}
@@ -275,9 +225,8 @@ func (hs *HeartbeatService) buildPrompt() string {
return ""
}
// Build prompt with system instructions
now := time.Now().Format("2006-01-02 15:04:05")
prompt := fmt.Sprintf(`# Heartbeat Check
return fmt.Sprintf(`# Heartbeat Check
Current time: %s
@@ -287,20 +236,11 @@ If there is nothing that requires attention, respond ONLY with: HEARTBEAT_OK
%s
`, now, content)
return prompt
}
// createDefaultHeartbeatTemplate creates the default HEARTBEAT.md file
func (hs *HeartbeatService) createDefaultHeartbeatTemplate() {
notesDir := filepath.Join(hs.workspace, "memory")
heartbeatPath := filepath.Join(notesDir, "HEARTBEAT.md")
// Ensure memory directory exists
if err := os.MkdirAll(notesDir, 0755); err != nil {
hs.logError("Failed to create memory directory: %v", err)
return
}
heartbeatPath := filepath.Join(hs.workspace, "HEARTBEAT.md")
defaultContent := `# Heartbeat Check List
@@ -332,23 +272,22 @@ Add your heartbeat tasks below this line:
// sendResponse sends the heartbeat response to the last channel
func (hs *HeartbeatService) sendResponse(response string) {
hs.mu.RLock()
sender := hs.channelSender
msgBus := hs.bus
hs.mu.RUnlock()
if sender == nil {
hs.logInfo("No channel sender configured, heartbeat result not sent")
if msgBus == nil {
hs.logInfo("No message bus configured, heartbeat result not sent")
return
}
// Get last channel from state
lastChannel := hs.stateManager.GetLastChannel()
lastChannel := hs.state.GetLastChannel()
if lastChannel == "" {
hs.logInfo("No last channel recorded, heartbeat result not sent")
return
}
// Parse channel format: "platform:user_id" (e.g., "telegram:123456")
// Use SplitN to handle user IDs that may contain special characters
parts := strings.SplitN(lastChannel, ":", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
hs.logError("Invalid last channel format: %s", lastChannel)
@@ -356,21 +295,15 @@ func (hs *HeartbeatService) sendResponse(response string) {
}
platform, userID := parts[0], parts[1]
// Send to channel
ctx := context.Background()
if err := sender.SendToChannel(ctx, platform, userID, response); err != nil {
hs.logError("Error sending to channel %s: %v", platform, err)
return
}
msgBus.PublishOutbound(bus.OutboundMessage{
Channel: platform,
ChatID: userID,
Content: response,
})
hs.logInfo("Heartbeat result sent to %s", platform)
}
// isHeartbeatOK checks if the response is HEARTBEAT_OK
func isHeartbeatOK(response string) bool {
return response == heartbeatOK
}
// logInfo logs an informational message to the heartbeat log
func (hs *HeartbeatService) logInfo(format string, args ...any) {
hs.log("INFO", format, args...)
@@ -383,11 +316,7 @@ func (hs *HeartbeatService) logError(format string, args ...any) {
// log writes a message to the heartbeat log file
func (hs *HeartbeatService) log(level, format string, args ...any) {
// Ensure memory directory exists
logDir := filepath.Join(hs.workspace, "memory")
os.MkdirAll(logDir, 0755)
logFile := filepath.Join(logDir, "heartbeat.log")
logFile := filepath.Join(hs.workspace, "heartbeat.log")
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return