Files
picoclaw/pkg/agent/loop.go
yinwm 0cce9fc905 refactor(agent): extract reusable tool loop and make subagents independent
Extract core LLM tool loop logic into shared RunToolLoop function that can be
used by both main agent and subagents. Subagents now run their own tool loop
with dedicated tool registry, enabling full independence.

Key changes:
- New pkg/tools/toolloop.go with reusable tool execution logic
- Subagents use message tool to communicate directly with users
- Heartbeat processing is now stateless via ProcessHeartbeat
- Simplified system message routing without result forwarding
- Shared tool registry creation for consistency between agents

This architecture follows openclaw's design where async tools notify via
bus and subagents handle their own user communication.
2026-02-13 14:39:39 +08:00

769 lines
24 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
// License: MIT
//
// Copyright (c) 2026 PicoClaw contributors
package agent
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/session"
"github.com/sipeed/picoclaw/pkg/state"
"github.com/sipeed/picoclaw/pkg/tools"
"github.com/sipeed/picoclaw/pkg/utils"
)
type AgentLoop struct {
bus *bus.MessageBus
provider providers.LLMProvider
workspace string
model string
contextWindow int // Maximum context window size in tokens
maxIterations int
sessions *session.SessionManager
state *state.Manager
contextBuilder *ContextBuilder
tools *tools.ToolRegistry
running atomic.Bool
summarizing sync.Map // Tracks which sessions are currently being summarized
}
// processOptions configures how a message is processed
type processOptions struct {
SessionKey string // Session identifier for history/context
Channel string // Target channel for tool execution
ChatID string // Target chat ID for tool execution
UserMessage string // User message content (may include prefix)
DefaultResponse string // Response when LLM returns empty
EnableSummary bool // Whether to trigger summarization
SendResponse bool // Whether to send response via bus
NoHistory bool // If true, don't load session history (for heartbeat)
}
// createToolRegistry creates a tool registry with common tools.
// This is shared between main agent and subagents.
func createToolRegistry(workspace string, restrict bool, cfg *config.Config, msgBus *bus.MessageBus) *tools.ToolRegistry {
registry := tools.NewToolRegistry()
// File system tools
registry.Register(tools.NewReadFileTool(workspace, restrict))
registry.Register(tools.NewWriteFileTool(workspace, restrict))
registry.Register(tools.NewListDirTool(workspace, restrict))
registry.Register(tools.NewEditFileTool(workspace, restrict))
registry.Register(tools.NewAppendFileTool(workspace, restrict))
// Shell execution
registry.Register(tools.NewExecTool(workspace, restrict))
// Web tools
braveAPIKey := cfg.Tools.Web.Search.APIKey
registry.Register(tools.NewWebSearchTool(braveAPIKey, cfg.Tools.Web.Search.MaxResults))
registry.Register(tools.NewWebFetchTool(50000))
// Message tool - available to both agent and subagent
// Subagent uses it to communicate directly with user
messageTool := tools.NewMessageTool()
messageTool.SetSendCallback(func(channel, chatID, content string) error {
msgBus.PublishOutbound(bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: content,
})
return nil
})
registry.Register(messageTool)
return registry
}
func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop {
workspace := cfg.WorkspacePath()
os.MkdirAll(workspace, 0755)
restrict := cfg.Agents.Defaults.RestrictToWorkspace
// Create tool registry for main agent
toolsRegistry := createToolRegistry(workspace, restrict, cfg, msgBus)
// Create subagent manager with its own tool registry
subagentManager := tools.NewSubagentManager(provider, cfg.Agents.Defaults.Model, workspace, msgBus)
subagentTools := createToolRegistry(workspace, restrict, cfg, msgBus)
// Subagent doesn't need spawn/subagent tools to avoid recursion
subagentManager.SetTools(subagentTools)
// Register spawn tool (for main agent)
spawnTool := tools.NewSpawnTool(subagentManager)
toolsRegistry.Register(spawnTool)
// Register subagent tool (synchronous execution)
subagentTool := tools.NewSubagentTool(subagentManager)
toolsRegistry.Register(subagentTool)
sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions"))
// Create state manager for atomic state persistence
stateManager := state.NewManager(workspace)
// Create context builder and set tools registry
contextBuilder := NewContextBuilder(workspace)
contextBuilder.SetToolsRegistry(toolsRegistry)
return &AgentLoop{
bus: msgBus,
provider: provider,
workspace: workspace,
model: cfg.Agents.Defaults.Model,
contextWindow: cfg.Agents.Defaults.MaxTokens, // Restore context window for summarization
maxIterations: cfg.Agents.Defaults.MaxToolIterations,
sessions: sessionsManager,
state: stateManager,
contextBuilder: contextBuilder,
tools: toolsRegistry,
summarizing: sync.Map{},
}
}
func (al *AgentLoop) Run(ctx context.Context) error {
al.running.Store(true)
for al.running.Load() {
select {
case <-ctx.Done():
return nil
default:
msg, ok := al.bus.ConsumeInbound(ctx)
if !ok {
continue
}
response, err := al.processMessage(ctx, msg)
if err != nil {
response = fmt.Sprintf("Error processing message: %v", err)
}
if response != "" {
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: response,
})
}
}
}
return nil
}
func (al *AgentLoop) Stop() {
al.running.Store(false)
}
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
al.tools.Register(tool)
}
// RecordLastChannel records the last active channel for this workspace.
// This uses the atomic state save mechanism to prevent data loss on crash.
func (al *AgentLoop) RecordLastChannel(channel string) error {
return al.state.SetLastChannel(channel)
}
// RecordLastChatID records the last active chat ID for this workspace.
// This uses the atomic state save mechanism to prevent data loss on crash.
func (al *AgentLoop) RecordLastChatID(chatID string) error {
return al.state.SetLastChatID(chatID)
}
func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) {
return al.ProcessDirectWithChannel(ctx, content, sessionKey, "cli", "direct")
}
func (al *AgentLoop) ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) {
msg := bus.InboundMessage{
Channel: channel,
SenderID: "cron",
ChatID: chatID,
Content: content,
SessionKey: sessionKey,
}
return al.processMessage(ctx, msg)
}
// ProcessHeartbeat processes a heartbeat request without session history.
// Each heartbeat is independent and doesn't accumulate context.
func (al *AgentLoop) ProcessHeartbeat(ctx context.Context, content, channel, chatID string) (string, error) {
return al.runAgentLoop(ctx, processOptions{
SessionKey: "heartbeat",
Channel: channel,
ChatID: chatID,
UserMessage: content,
DefaultResponse: "I've completed processing but have no response to give.",
EnableSummary: false,
SendResponse: false,
NoHistory: true, // Don't load session history for heartbeat
})
}
func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
// Add message preview to log (show full content for error messages)
var logContent string
if strings.Contains(msg.Content, "Error:") || strings.Contains(msg.Content, "error") {
logContent = msg.Content // Full content for errors
} else {
logContent = utils.Truncate(msg.Content, 80)
}
logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, logContent),
map[string]interface{}{
"channel": msg.Channel,
"chat_id": msg.ChatID,
"sender_id": msg.SenderID,
"session_key": msg.SessionKey,
})
// Route system messages to processSystemMessage
if msg.Channel == "system" {
return al.processSystemMessage(ctx, msg)
}
// Process as user message
return al.runAgentLoop(ctx, processOptions{
SessionKey: msg.SessionKey,
Channel: msg.Channel,
ChatID: msg.ChatID,
UserMessage: msg.Content,
DefaultResponse: "I've completed processing but have no response to give.",
EnableSummary: true,
SendResponse: false,
})
}
func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
// Verify this is a system message
if msg.Channel != "system" {
return "", fmt.Errorf("processSystemMessage called with non-system message channel: %s", msg.Channel)
}
logger.InfoCF("agent", "Processing system message",
map[string]interface{}{
"sender_id": msg.SenderID,
"chat_id": msg.ChatID,
})
// Parse origin channel from chat_id (format: "channel:chat_id")
var originChannel string
if idx := strings.Index(msg.ChatID, ":"); idx > 0 {
originChannel = msg.ChatID[:idx]
} else {
// Fallback
originChannel = "cli"
}
// Extract subagent result from message content
// Format: "Task 'label' completed.\n\nResult:\n<actual content>"
content := msg.Content
if idx := strings.Index(content, "Result:\n"); idx >= 0 {
content = content[idx+8:] // Extract just the result part
}
// Skip internal channels - only log, don't send to user
internalChannels := map[string]bool{"cli": true, "system": true, "subagent": true}
if internalChannels[originChannel] {
logger.InfoCF("agent", "Subagent completed (internal channel)",
map[string]interface{}{
"sender_id": msg.SenderID,
"content_len": len(content),
"channel": originChannel,
})
return "", nil
}
// Agent acts as dispatcher only - subagent handles user interaction via message tool
// Don't forward result here, subagent should use message tool to communicate with user
logger.InfoCF("agent", "Subagent completed",
map[string]interface{}{
"sender_id": msg.SenderID,
"channel": originChannel,
"content_len": len(content),
})
// Agent only logs, does not respond to user
return "", nil
}
// runAgentLoop is the core message processing logic.
// It handles context building, LLM calls, tool execution, and response handling.
func (al *AgentLoop) runAgentLoop(ctx context.Context, opts processOptions) (string, error) {
// 0. Record last channel for heartbeat notifications (skip internal channels)
if opts.Channel != "" && opts.ChatID != "" {
// Don't record internal channels (cli, system, subagent)
internalChannels := map[string]bool{"cli": true, "system": true, "subagent": true}
if !internalChannels[opts.Channel] {
channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID)
if err := al.RecordLastChannel(channelKey); err != nil {
logger.WarnCF("agent", "Failed to record last channel: %v", map[string]interface{}{"error": err.Error()})
}
}
}
// 1. Update tool contexts
al.updateToolContexts(opts.Channel, opts.ChatID)
// 2. Build messages (skip history for heartbeat)
var history []providers.Message
var summary string
if !opts.NoHistory {
history = al.sessions.GetHistory(opts.SessionKey)
summary = al.sessions.GetSummary(opts.SessionKey)
}
messages := al.contextBuilder.BuildMessages(
history,
summary,
opts.UserMessage,
nil,
opts.Channel,
opts.ChatID,
)
// 3. Save user message to session
al.sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
// 4. Run LLM iteration loop
finalContent, iteration, err := al.runLLMIteration(ctx, messages, opts)
if err != nil {
return "", err
}
// If last tool had ForUser content and we already sent it, we might not need to send final response
// This is controlled by the tool's Silent flag and ForUser content
// 5. Handle empty response
if finalContent == "" {
finalContent = opts.DefaultResponse
}
// 6. Save final assistant message to session
al.sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
al.sessions.Save(al.sessions.GetOrCreate(opts.SessionKey))
// 7. Optional: summarization
if opts.EnableSummary {
al.maybeSummarize(opts.SessionKey)
}
// 8. Optional: send response via bus
if opts.SendResponse {
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: finalContent,
})
}
// 9. Log response
responsePreview := utils.Truncate(finalContent, 120)
logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview),
map[string]interface{}{
"session_key": opts.SessionKey,
"iterations": iteration,
"final_length": len(finalContent),
})
return finalContent, nil
}
// runLLMIteration executes the LLM call loop with tool handling.
// Returns the final content, iteration count, and any error.
func (al *AgentLoop) runLLMIteration(ctx context.Context, messages []providers.Message, opts processOptions) (string, int, error) {
iteration := 0
var finalContent string
for iteration < al.maxIterations {
iteration++
logger.DebugCF("agent", "LLM iteration",
map[string]interface{}{
"iteration": iteration,
"max": al.maxIterations,
})
// Build tool definitions
toolDefs := al.tools.GetDefinitions()
providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs))
for _, td := range toolDefs {
providerToolDefs = append(providerToolDefs, providers.ToolDefinition{
Type: td["type"].(string),
Function: providers.ToolFunctionDefinition{
Name: td["function"].(map[string]interface{})["name"].(string),
Description: td["function"].(map[string]interface{})["description"].(string),
Parameters: td["function"].(map[string]interface{})["parameters"].(map[string]interface{}),
},
})
}
// Log LLM request details
logger.DebugCF("agent", "LLM request",
map[string]interface{}{
"iteration": iteration,
"model": al.model,
"messages_count": len(messages),
"tools_count": len(providerToolDefs),
"max_tokens": 8192,
"temperature": 0.7,
"system_prompt_len": len(messages[0].Content),
})
// Log full messages (detailed)
logger.DebugCF("agent", "Full LLM request",
map[string]interface{}{
"iteration": iteration,
"messages_json": formatMessagesForLog(messages),
"tools_json": formatToolsForLog(providerToolDefs),
})
// Call LLM
response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{
"max_tokens": 8192,
"temperature": 0.7,
})
if err != nil {
logger.ErrorCF("agent", "LLM call failed",
map[string]interface{}{
"iteration": iteration,
"error": err.Error(),
})
return "", iteration, fmt.Errorf("LLM call failed: %w", err)
}
// Check if no tool calls - we're done
if len(response.ToolCalls) == 0 {
finalContent = response.Content
logger.InfoCF("agent", "LLM response without tool calls (direct answer)",
map[string]interface{}{
"iteration": iteration,
"content_chars": len(finalContent),
})
break
}
// Log tool calls
toolNames := make([]string, 0, len(response.ToolCalls))
for _, tc := range response.ToolCalls {
toolNames = append(toolNames, tc.Name)
}
logger.InfoCF("agent", "LLM requested tool calls",
map[string]interface{}{
"tools": toolNames,
"count": len(response.ToolCalls),
"iteration": iteration,
})
// Build assistant message with tool calls
assistantMsg := providers.Message{
Role: "assistant",
Content: response.Content,
}
for _, tc := range response.ToolCalls {
argumentsJSON, _ := json.Marshal(tc.Arguments)
assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{
ID: tc.ID,
Type: "function",
Function: &providers.FunctionCall{
Name: tc.Name,
Arguments: string(argumentsJSON),
},
})
}
messages = append(messages, assistantMsg)
// Save assistant message with tool calls to session
al.sessions.AddFullMessage(opts.SessionKey, assistantMsg)
// Execute tool calls
for _, tc := range response.ToolCalls {
// Log tool call with arguments preview
argsJSON, _ := json.Marshal(tc.Arguments)
argsPreview := utils.Truncate(string(argsJSON), 200)
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
map[string]interface{}{
"tool": tc.Name,
"iteration": iteration,
})
// Create async callback for tools that implement AsyncTool
// NOTE: Following openclaw's design, async tools do NOT send results directly to users.
// Instead, they notify the agent via PublishInbound, and the agent decides
// whether to forward the result to the user (in processSystemMessage).
asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) {
// Log the async completion but don't send directly to user
// The agent will handle user notification via processSystemMessage
if !result.Silent && result.ForUser != "" {
logger.InfoCF("agent", "Async tool completed, agent will handle notification",
map[string]interface{}{
"tool": tc.Name,
"content_len": len(result.ForUser),
})
}
}
toolResult := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID, asyncCallback)
// Send ForUser content to user immediately if not Silent
if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse {
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: toolResult.ForUser,
})
logger.DebugCF("agent", "Sent tool result to user",
map[string]interface{}{
"tool": tc.Name,
"content_len": len(toolResult.ForUser),
})
}
// Determine content for LLM based on tool result
contentForLLM := toolResult.ForLLM
if contentForLLM == "" && toolResult.Err != nil {
contentForLLM = toolResult.Err.Error()
}
toolResultMsg := providers.Message{
Role: "tool",
Content: contentForLLM,
ToolCallID: tc.ID,
}
messages = append(messages, toolResultMsg)
// Save tool result message to session
al.sessions.AddFullMessage(opts.SessionKey, toolResultMsg)
}
}
return finalContent, iteration, nil
}
// updateToolContexts updates the context for tools that need channel/chatID info.
func (al *AgentLoop) updateToolContexts(channel, chatID string) {
// Use ContextualTool interface instead of type assertions
if tool, ok := al.tools.Get("message"); ok {
if mt, ok := tool.(tools.ContextualTool); ok {
mt.SetContext(channel, chatID)
}
}
if tool, ok := al.tools.Get("spawn"); ok {
if st, ok := tool.(tools.ContextualTool); ok {
st.SetContext(channel, chatID)
}
}
if tool, ok := al.tools.Get("subagent"); ok {
if st, ok := tool.(tools.ContextualTool); ok {
st.SetContext(channel, chatID)
}
}
}
// maybeSummarize triggers summarization if the session history exceeds thresholds.
func (al *AgentLoop) maybeSummarize(sessionKey string) {
newHistory := al.sessions.GetHistory(sessionKey)
tokenEstimate := al.estimateTokens(newHistory)
threshold := al.contextWindow * 75 / 100
if len(newHistory) > 20 || tokenEstimate > threshold {
if _, loading := al.summarizing.LoadOrStore(sessionKey, true); !loading {
go func() {
defer al.summarizing.Delete(sessionKey)
al.summarizeSession(sessionKey)
}()
}
}
}
// GetStartupInfo returns information about loaded tools and skills for logging.
func (al *AgentLoop) GetStartupInfo() map[string]interface{} {
info := make(map[string]interface{})
// Tools info
tools := al.tools.List()
info["tools"] = map[string]interface{}{
"count": len(tools),
"names": tools,
}
// Skills info
info["skills"] = al.contextBuilder.GetSkillsInfo()
return info
}
// formatMessagesForLog formats messages for logging
func formatMessagesForLog(messages []providers.Message) string {
if len(messages) == 0 {
return "[]"
}
var result string
result += "[\n"
for i, msg := range messages {
result += fmt.Sprintf(" [%d] Role: %s\n", i, msg.Role)
if msg.ToolCalls != nil && len(msg.ToolCalls) > 0 {
result += " ToolCalls:\n"
for _, tc := range msg.ToolCalls {
result += fmt.Sprintf(" - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name)
if tc.Function != nil {
result += fmt.Sprintf(" Arguments: %s\n", utils.Truncate(tc.Function.Arguments, 200))
}
}
}
if msg.Content != "" {
content := utils.Truncate(msg.Content, 200)
result += fmt.Sprintf(" Content: %s\n", content)
}
if msg.ToolCallID != "" {
result += fmt.Sprintf(" ToolCallID: %s\n", msg.ToolCallID)
}
result += "\n"
}
result += "]"
return result
}
// formatToolsForLog formats tool definitions for logging
func formatToolsForLog(tools []providers.ToolDefinition) string {
if len(tools) == 0 {
return "[]"
}
var result string
result += "[\n"
for i, tool := range tools {
result += fmt.Sprintf(" [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name)
result += fmt.Sprintf(" Description: %s\n", tool.Function.Description)
if len(tool.Function.Parameters) > 0 {
result += fmt.Sprintf(" Parameters: %s\n", utils.Truncate(fmt.Sprintf("%v", tool.Function.Parameters), 200))
}
}
result += "]"
return result
}
// summarizeSession summarizes the conversation history for a session.
func (al *AgentLoop) summarizeSession(sessionKey string) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
history := al.sessions.GetHistory(sessionKey)
summary := al.sessions.GetSummary(sessionKey)
// Keep last 4 messages for continuity
if len(history) <= 4 {
return
}
toSummarize := history[:len(history)-4]
// Oversized Message Guard
// Skip messages larger than 50% of context window to prevent summarizer overflow
maxMessageTokens := al.contextWindow / 2
validMessages := make([]providers.Message, 0)
omitted := false
for _, m := range toSummarize {
if m.Role != "user" && m.Role != "assistant" {
continue
}
// Estimate tokens for this message
msgTokens := len(m.Content) / 4
if msgTokens > maxMessageTokens {
omitted = true
continue
}
validMessages = append(validMessages, m)
}
if len(validMessages) == 0 {
return
}
// Multi-Part Summarization
// Split into two parts if history is significant
var finalSummary string
if len(validMessages) > 10 {
mid := len(validMessages) / 2
part1 := validMessages[:mid]
part2 := validMessages[mid:]
s1, _ := al.summarizeBatch(ctx, part1, "")
s2, _ := al.summarizeBatch(ctx, part2, "")
// Merge them
mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2)
resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{
"max_tokens": 1024,
"temperature": 0.3,
})
if err == nil {
finalSummary = resp.Content
} else {
finalSummary = s1 + " " + s2
}
} else {
finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary)
}
if omitted && finalSummary != "" {
finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]"
}
if finalSummary != "" {
al.sessions.SetSummary(sessionKey, finalSummary)
al.sessions.TruncateHistory(sessionKey, 4)
al.sessions.Save(al.sessions.GetOrCreate(sessionKey))
}
}
// summarizeBatch summarizes a batch of messages.
func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) {
prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n"
if existingSummary != "" {
prompt += "Existing context: " + existingSummary + "\n"
}
prompt += "\nCONVERSATION:\n"
for _, m := range batch {
prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content)
}
response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{
"max_tokens": 1024,
"temperature": 0.3,
})
if err != nil {
return "", err
}
return response.Content, nil
}
// estimateTokens estimates the number of tokens in a message list.
func (al *AgentLoop) estimateTokens(messages []providers.Message) int {
total := 0
for _, m := range messages {
total += len(m.Content) / 4 // Simple heuristic: 4 chars per token
}
return total
}