From 4dfa133cb83a068c67a99d4086b53c109b8c5e08 Mon Sep 17 00:00:00 2001 From: yinwm Date: Fri, 13 Feb 2026 11:13:32 +0800 Subject: [PATCH] refactor(heartbeat): add configurable interval and channel-aware routing feat(config): add heartbeat interval configuration with default 30 minutes feat(state): migrate state file from workspace root to state directory feat(channels): skip internal channels in outbound dispatcher feat(agent): record last active channel for heartbeat context refactor(subagent): use configurable default model instead of provider default --- cmd/picoclaw/main.go | 10 +++++--- pkg/agent/loop.go | 25 ++++++++++++++++--- pkg/channels/manager.go | 8 ++++++ pkg/config/config.go | 6 +++-- pkg/heartbeat/service.go | 44 +++++++++++++++++++++++++++------ pkg/heartbeat/service_test.go | 8 +++--- pkg/state/state.go | 18 +++++++++++--- pkg/tools/subagent.go | 30 +++++++++++----------- pkg/tools/subagent_tool_test.go | 18 +++++++------- 9 files changed, 120 insertions(+), 47 deletions(-) diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index 552b370..c9d8e61 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -654,12 +654,16 @@ func gatewayCmd() { heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), - 30, + cfg.Heartbeat.Interval, cfg.Heartbeat.Enabled, ) heartbeatService.SetBus(msgBus) - heartbeatService.SetHandler(func(prompt string) *tools.ToolResult { - response, err := agentLoop.ProcessDirect(context.Background(), prompt, "heartbeat") + heartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult { + // Use cli:direct as fallback if no valid channel + if channel == "" || chatID == "" { + channel, chatID = "cli", "direct" + } + response, err := agentLoop.ProcessDirectWithChannel(context.Background(), prompt, "heartbeat", channel, chatID) if err != nil { return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err)) } diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 3f3286d..bf721cf 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -82,7 +82,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers toolsRegistry.Register(messageTool) // Register spawn tool - subagentManager := tools.NewSubagentManager(provider, workspace, msgBus) + subagentManager := tools.NewSubagentManager(provider, cfg.Agents.Defaults.Model, workspace, msgBus) spawnTool := tools.NewSpawnTool(subagentManager) toolsRegistry.Register(spawnTool) @@ -187,9 +187,14 @@ func (al *AgentLoop) ProcessDirectWithChannel(ctx context.Context, content, sess } func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { - // Add message preview to log - preview := utils.Truncate(msg.Content, 80) - logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview), + // Add message preview to log (show full content for error messages) + var logContent string + if strings.Contains(msg.Content, "Error:") || strings.Contains(msg.Content, "error") { + logContent = msg.Content // Full content for errors + } else { + logContent = utils.Truncate(msg.Content, 80) + } + logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, logContent), map[string]interface{}{ "channel": msg.Channel, "chat_id": msg.ChatID, @@ -255,6 +260,18 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // runAgentLoop is the core message processing logic. // It handles context building, LLM calls, tool execution, and response handling. func (al *AgentLoop) runAgentLoop(ctx context.Context, opts processOptions) (string, error) { + // 0. Record last channel for heartbeat notifications (skip internal channels) + if opts.Channel != "" && opts.ChatID != "" { + // Don't record internal channels (cli, system, subagent) + internalChannels := map[string]bool{"cli": true, "system": true, "subagent": true} + if !internalChannels[opts.Channel] { + channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID) + if err := al.RecordLastChannel(channelKey); err != nil { + logger.WarnCF("agent", "Failed to record last channel: %v", map[string]interface{}{"error": err.Error()}) + } + } + } + // 1. Update tool contexts al.updateToolContexts(opts.Channel, opts.ChatID) diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index b0e1416..06e746c 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -218,6 +218,9 @@ func (m *Manager) StopAll(ctx context.Context) error { func (m *Manager) dispatchOutbound(ctx context.Context) { logger.InfoC("channels", "Outbound dispatcher started") + // Internal channels that don't have actual handlers + internalChannels := map[string]bool{"cli": true, "system": true, "subagent": true} + for { select { case <-ctx.Done(): @@ -229,6 +232,11 @@ func (m *Manager) dispatchOutbound(ctx context.Context) { continue } + // Silently skip internal channels + if internalChannels[msg.Channel] { + continue + } + m.mu.RLock() channel, exists := m.channels[msg.Channel] m.mu.RUnlock() diff --git a/pkg/config/config.go b/pkg/config/config.go index 0f63902..197b959 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -135,7 +135,8 @@ type SlackConfig struct { } type HeartbeatConfig struct { - Enabled bool `json:"enabled" env:"PICOCLAW_HEARTBEAT_ENABLED"` + Enabled bool `json:"enabled" env:"PICOCLAW_HEARTBEAT_ENABLED"` + Interval int `json:"interval" env:"PICOCLAW_HEARTBEAT_INTERVAL"` // minutes, min 5 } type ProvidersConfig struct { @@ -261,7 +262,8 @@ func DefaultConfig() *Config { }, }, Heartbeat: HeartbeatConfig{ - Enabled: true, + Enabled: true, + Interval: 30, // default 30 minutes }, } } diff --git a/pkg/heartbeat/service.go b/pkg/heartbeat/service.go index ce8605b..c7e0fdf 100644 --- a/pkg/heartbeat/service.go +++ b/pkg/heartbeat/service.go @@ -27,7 +27,8 @@ const ( // HeartbeatHandler is the function type for handling heartbeat. // It returns a ToolResult that can indicate async operations. -type HeartbeatHandler func(prompt string) *tools.ToolResult +// channel and chatID are derived from the last active user channel. +type HeartbeatHandler func(prompt, channel, chatID string) *tools.ToolResult // HeartbeatService manages periodic heartbeat checks type HeartbeatService struct { @@ -168,7 +169,11 @@ func (hs *HeartbeatService) executeHeartbeat() { return } - result := handler(prompt) + // Get last channel info for context + lastChannel := hs.state.GetLastChannel() + channel, chatID := hs.parseLastChannel(lastChannel) + + result := handler(prompt, channel, chatID) if result == nil { hs.logInfo("Heartbeat handler returned nil result") @@ -287,13 +292,12 @@ func (hs *HeartbeatService) sendResponse(response string) { return } - // Parse channel format: "platform:user_id" (e.g., "telegram:123456") - parts := strings.SplitN(lastChannel, ":", 2) - if len(parts) != 2 || parts[0] == "" || parts[1] == "" { - hs.logError("Invalid last channel format: %s", lastChannel) + platform, userID := hs.parseLastChannel(lastChannel) + + // Skip internal channels that can't receive messages + if platform == "" || userID == "" { return } - platform, userID := parts[0], parts[1] msgBus.PublishOutbound(bus.OutboundMessage{ Channel: platform, @@ -304,6 +308,32 @@ func (hs *HeartbeatService) sendResponse(response string) { hs.logInfo("Heartbeat result sent to %s", platform) } +// parseLastChannel parses the last channel string into platform and userID. +// Returns empty strings for invalid or internal channels. +func (hs *HeartbeatService) parseLastChannel(lastChannel string) (platform, userID string) { + if lastChannel == "" { + return "", "" + } + + // Parse channel format: "platform:user_id" (e.g., "telegram:123456") + parts := strings.SplitN(lastChannel, ":", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + hs.logError("Invalid last channel format: %s", lastChannel) + return "", "" + } + + platform, userID = parts[0], parts[1] + + // Skip internal channels + internalChannels := map[string]bool{"cli": true, "system": true, "subagent": true} + if internalChannels[platform] { + hs.logInfo("Skipping internal channel: %s", platform) + return "", "" + } + + return platform, userID +} + // logInfo logs an informational message to the heartbeat log func (hs *HeartbeatService) logInfo(format string, args ...any) { hs.log("INFO", format, args...) diff --git a/pkg/heartbeat/service_test.go b/pkg/heartbeat/service_test.go index 3e55d94..d7aed15 100644 --- a/pkg/heartbeat/service_test.go +++ b/pkg/heartbeat/service_test.go @@ -28,7 +28,7 @@ func TestExecuteHeartbeat_Async(t *testing.T) { Async: true, } - hs.SetHandler(func(prompt string) *tools.ToolResult { + hs.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult { asyncCalled = true if prompt == "" { t.Error("Expected non-empty prompt") @@ -57,7 +57,7 @@ func TestExecuteHeartbeat_Error(t *testing.T) { hs := NewHeartbeatService(tmpDir, 30, true) hs.started = true // Enable for testing - hs.SetHandler(func(prompt string) *tools.ToolResult { + hs.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult { return &tools.ToolResult{ ForLLM: "Heartbeat failed: connection error", ForUser: "", @@ -95,7 +95,7 @@ func TestExecuteHeartbeat_Silent(t *testing.T) { hs := NewHeartbeatService(tmpDir, 30, true) hs.started = true // Enable for testing - hs.SetHandler(func(prompt string) *tools.ToolResult { + hs.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult { return &tools.ToolResult{ ForLLM: "Heartbeat completed successfully", ForUser: "", @@ -169,7 +169,7 @@ func TestExecuteHeartbeat_NilResult(t *testing.T) { hs := NewHeartbeatService(tmpDir, 30, true) hs.started = true // Enable for testing - hs.SetHandler(func(prompt string) *tools.ToolResult { + hs.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult { return nil }) diff --git a/pkg/state/state.go b/pkg/state/state.go index 5c2cd98..0bb9cd4 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -35,6 +35,7 @@ type Manager struct { func NewManager(workspace string) *Manager { stateDir := filepath.Join(workspace, "state") stateFile := filepath.Join(stateDir, "state.json") + oldStateFile := filepath.Join(workspace, "state.json") // Create state directory if it doesn't exist os.MkdirAll(stateDir, 0755) @@ -45,10 +46,19 @@ func NewManager(workspace string) *Manager { state: &State{}, } - // Load existing state if available - if err := sm.load(); err != nil { - // Log warning but continue with empty state - log.Printf("[WARN] state: failed to load state file, starting fresh: %v", err) + // Try to load from new location first + if _, err := os.Stat(stateFile); os.IsNotExist(err) { + // New file doesn't exist, try migrating from old location + if data, err := os.ReadFile(oldStateFile); err == nil { + if err := json.Unmarshal(data, sm.state); err == nil { + // Migrate to new location + sm.saveAtomic() + log.Printf("[INFO] state: migrated state from %s to %s", oldStateFile, stateFile) + } + } + } else { + // Load from new location + sm.load() } return sm diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index 1e398a1..c7de9fe 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -22,21 +22,23 @@ type SubagentTask struct { } type SubagentManager struct { - tasks map[string]*SubagentTask - mu sync.RWMutex - provider providers.LLMProvider - bus *bus.MessageBus - workspace string - nextID int + tasks map[string]*SubagentTask + mu sync.RWMutex + provider providers.LLMProvider + defaultModel string + bus *bus.MessageBus + workspace string + nextID int } -func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager { +func NewSubagentManager(provider providers.LLMProvider, defaultModel, workspace string, bus *bus.MessageBus) *SubagentManager { return &SubagentManager{ - tasks: make(map[string]*SubagentTask), - provider: provider, - bus: bus, - workspace: workspace, - nextID: 1, + tasks: make(map[string]*SubagentTask), + provider: provider, + defaultModel: defaultModel, + bus: bus, + workspace: workspace, + nextID: 1, } } @@ -93,7 +95,7 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask, call default: } - response, err := sm.provider.Chat(ctx, messages, nil, sm.provider.GetDefaultModel(), map[string]interface{}{ + response, err := sm.provider.Chat(ctx, messages, nil, sm.defaultModel, map[string]interface{}{ "max_tokens": 4096, }) @@ -237,7 +239,7 @@ func (t *SubagentTool) Execute(ctx context.Context, args map[string]interface{}) }, } - response, err := t.manager.provider.Chat(ctx, messages, nil, t.manager.provider.GetDefaultModel(), map[string]interface{}{ + response, err := t.manager.provider.Chat(ctx, messages, nil, t.manager.defaultModel, map[string]interface{}{ "max_tokens": 4096, }) diff --git a/pkg/tools/subagent_tool_test.go b/pkg/tools/subagent_tool_test.go index 6275c07..8a7d22f 100644 --- a/pkg/tools/subagent_tool_test.go +++ b/pkg/tools/subagent_tool_test.go @@ -39,7 +39,7 @@ func (m *MockLLMProvider) GetContextWindow() int { // TestSubagentTool_Name verifies tool name func TestSubagentTool_Name(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) tool := NewSubagentTool(manager) if tool.Name() != "subagent" { @@ -50,7 +50,7 @@ func TestSubagentTool_Name(t *testing.T) { // TestSubagentTool_Description verifies tool description func TestSubagentTool_Description(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) tool := NewSubagentTool(manager) desc := tool.Description() @@ -65,7 +65,7 @@ func TestSubagentTool_Description(t *testing.T) { // TestSubagentTool_Parameters verifies tool parameters schema func TestSubagentTool_Parameters(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) tool := NewSubagentTool(manager) params := tool.Parameters() @@ -115,7 +115,7 @@ func TestSubagentTool_Parameters(t *testing.T) { // TestSubagentTool_SetContext verifies context setting func TestSubagentTool_SetContext(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) tool := NewSubagentTool(manager) tool.SetContext("test-channel", "test-chat") @@ -129,7 +129,7 @@ func TestSubagentTool_SetContext(t *testing.T) { func TestSubagentTool_Execute_Success(t *testing.T) { provider := &MockLLMProvider{} msgBus := bus.NewMessageBus() - manager := NewSubagentManager(provider, "/tmp/test", msgBus) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus) tool := NewSubagentTool(manager) tool.SetContext("telegram", "chat-123") @@ -185,7 +185,7 @@ func TestSubagentTool_Execute_Success(t *testing.T) { func TestSubagentTool_Execute_NoLabel(t *testing.T) { provider := &MockLLMProvider{} msgBus := bus.NewMessageBus() - manager := NewSubagentManager(provider, "/tmp/test", msgBus) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus) tool := NewSubagentTool(manager) ctx := context.Background() @@ -208,7 +208,7 @@ func TestSubagentTool_Execute_NoLabel(t *testing.T) { // TestSubagentTool_Execute_MissingTask tests error handling for missing task func TestSubagentTool_Execute_MissingTask(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) tool := NewSubagentTool(manager) ctx := context.Background() @@ -259,7 +259,7 @@ func TestSubagentTool_Execute_NilManager(t *testing.T) { func TestSubagentTool_Execute_ContextPassing(t *testing.T) { provider := &MockLLMProvider{} msgBus := bus.NewMessageBus() - manager := NewSubagentManager(provider, "/tmp/test", msgBus) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus) tool := NewSubagentTool(manager) // Set context @@ -288,7 +288,7 @@ func TestSubagentTool_ForUserTruncation(t *testing.T) { // Create a mock provider that returns very long content provider := &MockLLMProvider{} msgBus := bus.NewMessageBus() - manager := NewSubagentManager(provider, "/tmp/test", msgBus) + manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus) tool := NewSubagentTool(manager) ctx := context.Background()