diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index 0b13bc7..c14ec58 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -552,7 +552,7 @@ func gatewayCmd() { }) // Setup cron tool and service - cronService, _ := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath()) + cronService := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath()) heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), @@ -690,7 +690,7 @@ func getConfigPath() string { return filepath.Join(home, ".picoclaw", "config.json") } -func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) (*cron.CronService, *tools.CronTool) { +func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) *cron.CronService { cronStorePath := filepath.Join(workspace, "cron", "jobs.json") // Create cron service @@ -706,7 +706,7 @@ func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace return result, nil }) - return cronService, cronTool + return cronService } func loadConfig() (*config.Config, error) { diff --git a/pkg/agent/context.go b/pkg/agent/context.go index d1b3397..e737fbd 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -11,13 +11,14 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/skills" + "github.com/sipeed/picoclaw/pkg/tools" ) type ContextBuilder struct { workspace string skillsLoader *skills.SkillsLoader memory *MemoryStore - toolsSummary func() []string // Function to get tool summaries dynamically + tools *tools.ToolRegistry // Direct reference to tool registry } func getGlobalConfigDir() string { @@ -28,9 +29,9 @@ func getGlobalConfigDir() string { return filepath.Join(home, ".picoclaw") } -func NewContextBuilder(workspace string, toolsSummaryFunc func() []string) *ContextBuilder { - // builtin skills: 当前项目的 skills 目录 - // 使用当前工作目录下的 skills/ 目录 +func NewContextBuilder(workspace string) *ContextBuilder { + // builtin skills: skills directory in current project + // Use the skills/ directory under the current working directory wd, _ := os.Getwd() builtinSkillsDir := filepath.Join(wd, "skills") globalSkillsDir := filepath.Join(getGlobalConfigDir(), "skills") @@ -39,10 +40,14 @@ func NewContextBuilder(workspace string, toolsSummaryFunc func() []string) *Cont workspace: workspace, skillsLoader: skills.NewSkillsLoader(workspace, globalSkillsDir, builtinSkillsDir), memory: NewMemoryStore(workspace), - toolsSummary: toolsSummaryFunc, } } +// SetToolsRegistry sets the tools registry for dynamic tool summary generation. +func (cb *ContextBuilder) SetToolsRegistry(registry *tools.ToolRegistry) { + cb.tools = registry +} + func (cb *ContextBuilder) getIdentity() string { now := time.Now().Format("2006-01-02 15:04 (Monday)") workspacePath, _ := filepath.Abs(filepath.Join(cb.workspace)) @@ -80,11 +85,11 @@ Your workspace is at: %s } func (cb *ContextBuilder) buildToolsSection() string { - if cb.toolsSummary == nil { + if cb.tools == nil { return "" } - summaries := cb.toolsSummary() + summaries := cb.tools.GetSummaries() if len(summaries) == 0 { return "" } diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 439428a..5cdd6a7 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -13,6 +13,8 @@ import ( "os" "path/filepath" "strings" + "sync" + "time" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" @@ -27,11 +29,13 @@ type AgentLoop struct { provider providers.LLMProvider workspace string model string + contextWindow int // Maximum context window size in tokens maxIterations int sessions *session.SessionManager contextBuilder *ContextBuilder tools *tools.ToolRegistry running bool + summarizing sync.Map // Tracks which sessions are currently being summarized } func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop { @@ -71,16 +75,22 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions")) + // Create context builder and set tools registry + contextBuilder := NewContextBuilder(workspace) + contextBuilder.SetToolsRegistry(toolsRegistry) + return &AgentLoop{ bus: msgBus, provider: provider, workspace: workspace, model: cfg.Agents.Defaults.Model, + contextWindow: cfg.Agents.Defaults.MaxTokens, // Restore context window for summarization maxIterations: cfg.Agents.Defaults.MaxToolIterations, sessions: sessionsManager, - contextBuilder: NewContextBuilder(workspace, func() []string { return toolsRegistry.GetSummaries() }), + contextBuilder: contextBuilder, tools: toolsRegistry, running: false, + summarizing: sync.Map{}, } } @@ -318,6 +328,21 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) + // Context compression: Check if we need to summarize + // Trigger if history > 20 messages OR estimated tokens > 75% of context window + newHistory := al.sessions.GetHistory(msg.SessionKey) + tokenEstimate := al.estimateTokens(newHistory) + threshold := al.contextWindow * 75 / 100 + + if len(newHistory) > 20 || tokenEstimate > threshold { + if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading { + go func() { + defer al.summarizing.Delete(msg.SessionKey) + al.summarizeSession(msg.SessionKey) + }() + } + } + // Log response preview responsePreview := truncate(finalContent, 120) logger.InfoCF("agent", fmt.Sprintf("Response to %s:%s: %s", msg.Channel, msg.SenderID, responsePreview), @@ -595,3 +620,108 @@ func truncateString(s string, maxLen int) string { } return s[:maxLen-3] + "..." } + +// summarizeSession summarizes the conversation history for a session. +func (al *AgentLoop) summarizeSession(sessionKey string) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + history := al.sessions.GetHistory(sessionKey) + summary := al.sessions.GetSummary(sessionKey) + + // Keep last 4 messages for continuity + if len(history) <= 4 { + return + } + + toSummarize := history[:len(history)-4] + + // Oversized Message Guard + // Skip messages larger than 50% of context window to prevent summarizer overflow + maxMessageTokens := al.contextWindow / 2 + validMessages := make([]providers.Message, 0) + omitted := false + + for _, m := range toSummarize { + if m.Role != "user" && m.Role != "assistant" { + continue + } + // Estimate tokens for this message + msgTokens := len(m.Content) / 4 + if msgTokens > maxMessageTokens { + omitted = true + continue + } + validMessages = append(validMessages, m) + } + + if len(validMessages) == 0 { + return + } + + // Multi-Part Summarization + // Split into two parts if history is significant + var finalSummary string + if len(validMessages) > 10 { + mid := len(validMessages) / 2 + part1 := validMessages[:mid] + part2 := validMessages[mid:] + + s1, _ := al.summarizeBatch(ctx, part1, "") + s2, _ := al.summarizeBatch(ctx, part2, "") + + // Merge them + mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2) + resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err == nil { + finalSummary = resp.Content + } else { + finalSummary = s1 + " " + s2 + } + } else { + finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary) + } + + if omitted && finalSummary != "" { + finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]" + } + + if finalSummary != "" { + al.sessions.SetSummary(sessionKey, finalSummary) + al.sessions.TruncateHistory(sessionKey, 4) + al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) + } +} + +// summarizeBatch summarizes a batch of messages. +func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) { + prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n" + if existingSummary != "" { + prompt += "Existing context: " + existingSummary + "\n" + } + prompt += "\nCONVERSATION:\n" + for _, m := range batch { + prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content) + } + + response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err != nil { + return "", err + } + return response.Content, nil +} + +// estimateTokens estimates the number of tokens in a message list. +func (al *AgentLoop) estimateTokens(messages []providers.Message) int { + total := 0 + for _, m := range messages { + total += len(m.Content) / 4 // Simple heuristic: 4 chars per token + } + return total +} diff --git a/pkg/cron/service.go b/pkg/cron/service.go index c85ab2b..9434ed8 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -149,13 +149,15 @@ func (cs *CronService) checkJobs() { } // Update next run times for due jobs immediately (before executing) + // Use map for O(n) lookup instead of O(n²) nested loop + dueMap := make(map[string]bool, len(dueJobs)) + for _, job := range dueJobs { + dueMap[job.ID] = true + } for i := range cs.store.Jobs { - for _, dueJob := range dueJobs { - if cs.store.Jobs[i].ID == dueJob.ID { - // Reset NextRunAtMS temporarily so we don't re-execute - cs.store.Jobs[i].State.NextRunAtMS = nil - break - } + if dueMap[cs.store.Jobs[i].ID] { + // Reset NextRunAtMS temporarily so we don't re-execute + cs.store.Jobs[i].State.NextRunAtMS = nil } } @@ -325,6 +327,9 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string now := time.Now().UnixMilli() + // One-time tasks (at) should be deleted after execution + deleteAfterRun := (schedule.Kind == "at") + job := CronJob{ ID: generateID(), Name: name, @@ -342,7 +347,7 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string }, CreatedAtMS: now, UpdatedAtMS: now, - DeleteAfterRun: false, + DeleteAfterRun: deleteAfterRun, } cs.store.Jobs = append(cs.store.Jobs, job) diff --git a/pkg/utils/string.go b/pkg/utils/string.go new file mode 100644 index 0000000..0d9837c --- /dev/null +++ b/pkg/utils/string.go @@ -0,0 +1,16 @@ +package utils + +// Truncate returns a truncated version of s with at most maxLen runes. +// Handles multi-byte Unicode characters properly. +// If the string is truncated, "..." is appended to indicate truncation. +func Truncate(s string, maxLen int) string { + runes := []rune(s) + if len(runes) <= maxLen { + return s + } + // Reserve 3 chars for "..." + if maxLen <= 3 { + return string(runes[:maxLen]) + } + return string(runes[:maxLen-3]) + "..." +}