From 07e624c8dacc6bf630298e4a55fb9d9d8f8e62b5 Mon Sep 17 00:00:00 2001 From: Danieldd28 Date: Tue, 10 Feb 2026 01:25:46 +0700 Subject: [PATCH 1/2] feat: implement dynamic context compression for efficient memory usage - Added Summary field to Session struct - Implemented background summarization when history > 20 messages - Included conversation summary in system prompt for long-term context - Added thread-safety for concurrent summarization per session --- pkg/agent/context.go | 6 +++- pkg/agent/loop.go | 76 +++++++++++++++++++++++++++++++++++++++++- pkg/session/manager.go | 40 ++++++++++++++++++++++ 3 files changed, 120 insertions(+), 2 deletions(-) diff --git a/pkg/agent/context.go b/pkg/agent/context.go index 5a1a734..9ed5733 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -84,7 +84,7 @@ func (cb *ContextBuilder) LoadBootstrapFiles() string { return result } -func (cb *ContextBuilder) BuildMessages(history []providers.Message, currentMessage string, media []string) []providers.Message { +func (cb *ContextBuilder) BuildMessages(history []providers.Message, summary string, currentMessage string, media []string) []providers.Message { messages := []providers.Message{} systemPrompt := cb.BuildSystemPrompt() @@ -103,6 +103,10 @@ func (cb *ContextBuilder) BuildMessages(history []providers.Message, currentMess systemPrompt += "\n\n" + skillsContent } + if summary != "" { + systemPrompt += "\n\n## Summary of Previous Conversation\n\n" + summary + } + messages = append(messages, providers.Message{ Role: "system", Content: systemPrompt, diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index e23f4d1..05c8dc1 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -12,6 +12,8 @@ import ( "fmt" "os" "path/filepath" + "sync" + "time" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" @@ -30,6 +32,7 @@ type AgentLoop struct { contextBuilder *ContextBuilder tools *tools.ToolRegistry running bool + summarizing sync.Map } func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop { @@ -58,6 +61,7 @@ func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LL contextBuilder: NewContextBuilder(workspace), tools: toolsRegistry, running: false, + summarizing: sync.Map{}, } } @@ -109,8 +113,12 @@ func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey stri } func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { + history := al.sessions.GetHistory(msg.SessionKey) + summary := al.sessions.GetSummary(msg.SessionKey) + messages := al.contextBuilder.BuildMessages( - al.sessions.GetHistory(msg.SessionKey), + history, + summary, msg.Content, nil, ) @@ -187,7 +195,73 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) + + // Context compression logic + newHistory := al.sessions.GetHistory(msg.SessionKey) + if len(newHistory) > 20 { + if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading { + go func() { + defer al.summarizing.Delete(msg.SessionKey) + al.summarizeSession(msg.SessionKey) + }() + } + } + al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) return finalContent, nil } + +func (al *AgentLoop) summarizeSession(sessionKey string) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + history := al.sessions.GetHistory(sessionKey) + summary := al.sessions.GetSummary(sessionKey) + + // Keep last 4 messages, summarize the rest + if len(history) <= 4 { + return + } + + toSummarize := history[:len(history)-4] + + prompt := "Below is a conversation history and an optional existing summary. " + + "Please provide a concise summary of the conversation so far, " + + "preserving the core context and key points discussed. " + + "If there's an existing summary, incorporate it into the new one.\n\n" + + if summary != "" { + prompt += "EXISTING SUMMARY: " + summary + "\n\n" + } + + prompt += "CONVERSATION TO SUMMARIZE:\n" + for _, m := range toSummarize { + if m.Role == "user" || m.Role == "assistant" { + prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content) + } + } + + messages := []providers.Message{ + { + Role: "user", + Content: prompt, + }, + } + + response, err := al.provider.Chat(ctx, messages, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + + if err != nil { + fmt.Printf("Error summarizing session %s: %v\n", sessionKey, err) + return + } + + if response.Content != "" { + al.sessions.SetSummary(sessionKey, response.Content) + al.sessions.TruncateHistory(sessionKey, 4) + al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) + } +} diff --git a/pkg/session/manager.go b/pkg/session/manager.go index 9e17b30..df86724 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -13,6 +13,7 @@ import ( type Session struct { Key string `json:"key"` Messages []providers.Message `json:"messages"` + Summary string `json:"summary,omitempty"` Created time.Time `json:"created"` Updated time.Time `json:"updated"` } @@ -92,6 +93,45 @@ func (sm *SessionManager) GetHistory(key string) []providers.Message { return history } +func (sm *SessionManager) GetSummary(key string) string { + sm.mu.RLock() + defer sm.mu.RUnlock() + + session, ok := sm.sessions[key] + if !ok { + return "" + } + return session.Summary +} + +func (sm *SessionManager) SetSummary(key string, summary string) { + sm.mu.Lock() + defer sm.mu.Unlock() + + session, ok := sm.sessions[key] + if ok { + session.Summary = summary + session.Updated = time.Now() + } +} + +func (sm *SessionManager) TruncateHistory(key string, keepLast int) { + sm.mu.Lock() + defer sm.mu.Unlock() + + session, ok := sm.sessions[key] + if !ok { + return + } + + if len(session.Messages) <= keepLast { + return + } + + session.Messages = session.Messages[len(session.Messages)-keepLast:] + session.Updated = time.Now() +} + func (sm *SessionManager) Save(session *Session) error { if sm.storage == "" { return nil From 2df60b2fa355614e04daf40de2e56a2b56e91271 Mon Sep 17 00:00:00 2001 From: Danieldd28 Date: Tue, 10 Feb 2026 02:00:57 +0700 Subject: [PATCH 2/2] feat: make context compression dynamic and add thinking animation - Implement dynamic context window awareness for compression thresholds - Add 'Thinking' animation for Telegram channel with auto-edit response - Refactor summarization to handle multi-part batches and oversized messages --- pkg/agent/loop.go | 117 +++++++++++++++++++++++++++++---------- pkg/channels/telegram.go | 74 +++++++++++++++++++++---- 2 files changed, 151 insertions(+), 40 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 05c8dc1..222d46a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -27,6 +27,7 @@ type AgentLoop struct { provider providers.LLMProvider workspace string model string + contextWindow int maxIterations int sessions *session.SessionManager contextBuilder *ContextBuilder @@ -56,6 +57,7 @@ func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LL provider: provider, workspace: workspace, model: cfg.Agents.Defaults.Model, + contextWindow: cfg.Agents.Defaults.MaxTokens, maxIterations: cfg.Agents.Defaults.MaxToolIterations, sessions: sessionsManager, contextBuilder: NewContextBuilder(workspace), @@ -198,7 +200,13 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) // Context compression logic newHistory := al.sessions.GetHistory(msg.SessionKey) - if len(newHistory) > 20 { + + // Token Awareness (Dynamic) + // Trigger if history > 20 messages OR estimated tokens > 75% of context window + tokenEstimate := al.estimateTokens(newHistory) + threshold := al.contextWindow * 75 / 100 + + if len(newHistory) > 20 || tokenEstimate > threshold { if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading { go func() { defer al.summarizing.Delete(msg.SessionKey) @@ -213,55 +221,104 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) } func (al *AgentLoop) summarizeSession(sessionKey string) { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() history := al.sessions.GetHistory(sessionKey) summary := al.sessions.GetSummary(sessionKey) - // Keep last 4 messages, summarize the rest + // Keep last 4 messages for continuity if len(history) <= 4 { return } toSummarize := history[:len(history)-4] - prompt := "Below is a conversation history and an optional existing summary. " + - "Please provide a concise summary of the conversation so far, " + - "preserving the core context and key points discussed. " + - "If there's an existing summary, incorporate it into the new one.\n\n" + // Oversized Message Guard (Dynamic) + // Skip messages larger than 50% of context window to prevent summarizer overflow. + maxMessageTokens := al.contextWindow / 2 + validMessages := make([]providers.Message, 0) + omitted := false - if summary != "" { - prompt += "EXISTING SUMMARY: " + summary + "\n\n" - } - - prompt += "CONVERSATION TO SUMMARIZE:\n" for _, m := range toSummarize { - if m.Role == "user" || m.Role == "assistant" { - prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content) + if m.Role != "user" && m.Role != "assistant" { + continue } + // Estimate tokens for this message + msgTokens := len(m.Content) / 4 + if msgTokens > maxMessageTokens { + omitted = true + continue + } + validMessages = append(validMessages, m) } - messages := []providers.Message{ - { - Role: "user", - Content: prompt, - }, - } - - response, err := al.provider.Chat(ctx, messages, nil, al.model, map[string]interface{}{ - "max_tokens": 1024, - "temperature": 0.3, - }) - - if err != nil { - fmt.Printf("Error summarizing session %s: %v\n", sessionKey, err) + if len(validMessages) == 0 { return } - if response.Content != "" { - al.sessions.SetSummary(sessionKey, response.Content) + // Multi-Part Summarization + // Split into two parts if history is significant + var finalSummary string + if len(validMessages) > 10 { + mid := len(validMessages) / 2 + part1 := validMessages[:mid] + part2 := validMessages[mid:] + + s1, _ := al.summarizeBatch(ctx, part1, "") + s2, _ := al.summarizeBatch(ctx, part2, "") + + // Merge them + mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2) + resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err == nil { + finalSummary = resp.Content + } else { + finalSummary = s1 + " " + s2 + } + } else { + finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary) + } + + if omitted && finalSummary != "" { + finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]" + } + + if finalSummary != "" { + al.sessions.SetSummary(sessionKey, finalSummary) al.sessions.TruncateHistory(sessionKey, 4) al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) } } + +func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) { + prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n" + if existingSummary != "" { + prompt += "Existing context: " + existingSummary + "\n" + } + prompt += "\nCONVERSATION:\n" + for _, m := range batch { + prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content) + } + + response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err != nil { + return "", err + } + return response.Content, nil +} + +func (al *AgentLoop) estimateTokens(messages []providers.Message) int { + total := 0 + for _, m := range messages { + total += len(m.Content) / 4 // Simple heuristic: 4 chars per token + } + return total +} + diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index 0260d02..1ad41f9 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -6,6 +6,7 @@ import ( "log" "regexp" "strings" + "sync" "time" tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" @@ -17,11 +18,13 @@ import ( type TelegramChannel struct { *BaseChannel - bot *tgbotapi.BotAPI - config config.TelegramConfig - chatIDs map[string]int64 - updates tgbotapi.UpdatesChannel - transcriber *voice.GroqTranscriber + bot *tgbotapi.BotAPI + config config.TelegramConfig + chatIDs map[string]int64 + updates tgbotapi.UpdatesChannel + transcriber *voice.GroqTranscriber + placeholders sync.Map // chatID -> messageID + stopThinking sync.Map // chatID -> chan struct{} } func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) { @@ -33,11 +36,13 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr base := NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom) return &TelegramChannel{ - BaseChannel: base, - bot: bot, - config: cfg, - chatIDs: make(map[string]int64), - transcriber: nil, + BaseChannel: base, + bot: bot, + config: cfg, + chatIDs: make(map[string]int64), + transcriber: nil, + placeholders: sync.Map{}, + stopThinking: sync.Map{}, }, nil } @@ -104,8 +109,26 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err return fmt.Errorf("invalid chat ID: %w", err) } + // Stop thinking animation + if stop, ok := c.stopThinking.Load(msg.ChatID); ok { + close(stop.(chan struct{})) + c.stopThinking.Delete(msg.ChatID) + } + htmlContent := markdownToTelegramHTML(msg.Content) + // Try to edit placeholder + if pID, ok := c.placeholders.Load(msg.ChatID); ok { + c.placeholders.Delete(msg.ChatID) + editMsg := tgbotapi.NewEditMessageText(chatID, pID.(int), htmlContent) + editMsg.ParseMode = tgbotapi.ModeHTML + + if _, err := c.bot.Send(editMsg); err == nil { + return nil + } + // Fallback to new message if edit fails + } + tgMsg := tgbotapi.NewMessage(chatID, htmlContent) tgMsg.ParseMode = tgbotapi.ModeHTML @@ -222,6 +245,37 @@ func (c *TelegramChannel) handleMessage(update tgbotapi.Update) { log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50)) + // Thinking indicator + c.bot.Send(tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping)) + + stopChan := make(chan struct{}) + c.stopThinking.Store(fmt.Sprintf("%d", chatID), stopChan) + + pMsg, err := c.bot.Send(tgbotapi.NewMessage(chatID, "Thinking... 💭")) + if err == nil { + pID := pMsg.MessageID + c.placeholders.Store(fmt.Sprintf("%d", chatID), pID) + + go func(cid int64, mid int, stop <-chan struct{}) { + dots := []string{".", "..", "..."} + emotes := []string{"💭", "🤔", "☁️"} + i := 0 + ticker := time.NewTicker(2000 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-stop: + return + case <-ticker.C: + i++ + text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)]) + edit := tgbotapi.NewEditMessageText(cid, mid, text) + c.bot.Send(edit) + } + } + }(chatID, pID, stopChan) + } + metadata := map[string]string{ "message_id": fmt.Sprintf("%d", message.MessageID), "user_id": fmt.Sprintf("%d", user.ID),