Files
picoclaw/pkg/agent/loop.go
Danieldd28 07e624c8da feat: implement dynamic context compression for efficient memory usage
- Added Summary field to Session struct
- Implemented background summarization when history > 20 messages
- Included conversation summary in system prompt for long-term context
- Added thread-safety for concurrent summarization per session
2026-02-10 01:25:46 +07:00

268 lines
6.8 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
// License: MIT
//
// Copyright (c) 2026 PicoClaw contributors
package agent
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/session"
"github.com/sipeed/picoclaw/pkg/tools"
)
type AgentLoop struct {
bus *bus.MessageBus
provider providers.LLMProvider
workspace string
model string
maxIterations int
sessions *session.SessionManager
contextBuilder *ContextBuilder
tools *tools.ToolRegistry
running bool
summarizing sync.Map
}
func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop {
workspace := cfg.WorkspacePath()
os.MkdirAll(workspace, 0755)
toolsRegistry := tools.NewToolRegistry()
toolsRegistry.Register(&tools.ReadFileTool{})
toolsRegistry.Register(&tools.WriteFileTool{})
toolsRegistry.Register(&tools.ListDirTool{})
toolsRegistry.Register(tools.NewExecTool(workspace))
braveAPIKey := cfg.Tools.Web.Search.APIKey
toolsRegistry.Register(tools.NewWebSearchTool(braveAPIKey, cfg.Tools.Web.Search.MaxResults))
toolsRegistry.Register(tools.NewWebFetchTool(50000))
sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions"))
return &AgentLoop{
bus: bus,
provider: provider,
workspace: workspace,
model: cfg.Agents.Defaults.Model,
maxIterations: cfg.Agents.Defaults.MaxToolIterations,
sessions: sessionsManager,
contextBuilder: NewContextBuilder(workspace),
tools: toolsRegistry,
running: false,
summarizing: sync.Map{},
}
}
func (al *AgentLoop) Run(ctx context.Context) error {
al.running = true
for al.running {
select {
case <-ctx.Done():
return nil
default:
msg, ok := al.bus.ConsumeInbound(ctx)
if !ok {
continue
}
response, err := al.processMessage(ctx, msg)
if err != nil {
response = fmt.Sprintf("Error processing message: %v", err)
}
if response != "" {
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: response,
})
}
}
}
return nil
}
func (al *AgentLoop) Stop() {
al.running = false
}
func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) {
msg := bus.InboundMessage{
Channel: "cli",
SenderID: "user",
ChatID: "direct",
Content: content,
SessionKey: sessionKey,
}
return al.processMessage(ctx, msg)
}
func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
history := al.sessions.GetHistory(msg.SessionKey)
summary := al.sessions.GetSummary(msg.SessionKey)
messages := al.contextBuilder.BuildMessages(
history,
summary,
msg.Content,
nil,
)
iteration := 0
var finalContent string
for iteration < al.maxIterations {
iteration++
toolDefs := al.tools.GetDefinitions()
providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs))
for _, td := range toolDefs {
providerToolDefs = append(providerToolDefs, providers.ToolDefinition{
Type: td["type"].(string),
Function: providers.ToolFunctionDefinition{
Name: td["function"].(map[string]interface{})["name"].(string),
Description: td["function"].(map[string]interface{})["description"].(string),
Parameters: td["function"].(map[string]interface{})["parameters"].(map[string]interface{}),
},
})
}
response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{
"max_tokens": 8192,
"temperature": 0.7,
})
if err != nil {
return "", fmt.Errorf("LLM call failed: %w", err)
}
if len(response.ToolCalls) == 0 {
finalContent = response.Content
break
}
assistantMsg := providers.Message{
Role: "assistant",
Content: response.Content,
}
for _, tc := range response.ToolCalls {
argumentsJSON, _ := json.Marshal(tc.Arguments)
assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{
ID: tc.ID,
Type: "function",
Function: &providers.FunctionCall{
Name: tc.Name,
Arguments: string(argumentsJSON),
},
})
}
messages = append(messages, assistantMsg)
for _, tc := range response.ToolCalls {
result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments)
if err != nil {
result = fmt.Sprintf("Error: %v", err)
}
toolResultMsg := providers.Message{
Role: "tool",
Content: result,
ToolCallID: tc.ID,
}
messages = append(messages, toolResultMsg)
}
}
if finalContent == "" {
finalContent = "I've completed processing but have no response to give."
}
al.sessions.AddMessage(msg.SessionKey, "user", msg.Content)
al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent)
// Context compression logic
newHistory := al.sessions.GetHistory(msg.SessionKey)
if len(newHistory) > 20 {
if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading {
go func() {
defer al.summarizing.Delete(msg.SessionKey)
al.summarizeSession(msg.SessionKey)
}()
}
}
al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey))
return finalContent, nil
}
func (al *AgentLoop) summarizeSession(sessionKey string) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
history := al.sessions.GetHistory(sessionKey)
summary := al.sessions.GetSummary(sessionKey)
// Keep last 4 messages, summarize the rest
if len(history) <= 4 {
return
}
toSummarize := history[:len(history)-4]
prompt := "Below is a conversation history and an optional existing summary. " +
"Please provide a concise summary of the conversation so far, " +
"preserving the core context and key points discussed. " +
"If there's an existing summary, incorporate it into the new one.\n\n"
if summary != "" {
prompt += "EXISTING SUMMARY: " + summary + "\n\n"
}
prompt += "CONVERSATION TO SUMMARIZE:\n"
for _, m := range toSummarize {
if m.Role == "user" || m.Role == "assistant" {
prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content)
}
}
messages := []providers.Message{
{
Role: "user",
Content: prompt,
},
}
response, err := al.provider.Chat(ctx, messages, nil, al.model, map[string]interface{}{
"max_tokens": 1024,
"temperature": 0.3,
})
if err != nil {
fmt.Printf("Error summarizing session %s: %v\n", sessionKey, err)
return
}
if response.Content != "" {
al.sessions.SetSummary(sessionKey, response.Content)
al.sessions.TruncateHistory(sessionKey, 4)
al.sessions.Save(al.sessions.GetOrCreate(sessionKey))
}
}