diff --git a/pkg/agent/context.go b/pkg/agent/context.go index 5a1a734..9ed5733 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -84,7 +84,7 @@ func (cb *ContextBuilder) LoadBootstrapFiles() string { return result } -func (cb *ContextBuilder) BuildMessages(history []providers.Message, currentMessage string, media []string) []providers.Message { +func (cb *ContextBuilder) BuildMessages(history []providers.Message, summary string, currentMessage string, media []string) []providers.Message { messages := []providers.Message{} systemPrompt := cb.BuildSystemPrompt() @@ -103,6 +103,10 @@ func (cb *ContextBuilder) BuildMessages(history []providers.Message, currentMess systemPrompt += "\n\n" + skillsContent } + if summary != "" { + systemPrompt += "\n\n## Summary of Previous Conversation\n\n" + summary + } + messages = append(messages, providers.Message{ Role: "system", Content: systemPrompt, diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index e23f4d1..222d46a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -12,6 +12,8 @@ import ( "fmt" "os" "path/filepath" + "sync" + "time" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" @@ -25,11 +27,13 @@ type AgentLoop struct { provider providers.LLMProvider workspace string model string + contextWindow int maxIterations int sessions *session.SessionManager contextBuilder *ContextBuilder tools *tools.ToolRegistry running bool + summarizing sync.Map } func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop { @@ -53,11 +57,13 @@ func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LL provider: provider, workspace: workspace, model: cfg.Agents.Defaults.Model, + contextWindow: cfg.Agents.Defaults.MaxTokens, maxIterations: cfg.Agents.Defaults.MaxToolIterations, sessions: sessionsManager, contextBuilder: NewContextBuilder(workspace), tools: toolsRegistry, running: false, + summarizing: sync.Map{}, } } @@ -109,8 +115,12 @@ func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey stri } func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { + history := al.sessions.GetHistory(msg.SessionKey) + summary := al.sessions.GetSummary(msg.SessionKey) + messages := al.contextBuilder.BuildMessages( - al.sessions.GetHistory(msg.SessionKey), + history, + summary, msg.Content, nil, ) @@ -187,7 +197,128 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) + + // Context compression logic + newHistory := al.sessions.GetHistory(msg.SessionKey) + + // Token Awareness (Dynamic) + // Trigger if history > 20 messages OR estimated tokens > 75% of context window + tokenEstimate := al.estimateTokens(newHistory) + threshold := al.contextWindow * 75 / 100 + + if len(newHistory) > 20 || tokenEstimate > threshold { + if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading { + go func() { + defer al.summarizing.Delete(msg.SessionKey) + al.summarizeSession(msg.SessionKey) + }() + } + } + al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) return finalContent, nil } + +func (al *AgentLoop) summarizeSession(sessionKey string) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + history := al.sessions.GetHistory(sessionKey) + summary := al.sessions.GetSummary(sessionKey) + + // Keep last 4 messages for continuity + if len(history) <= 4 { + return + } + + toSummarize := history[:len(history)-4] + + // Oversized Message Guard (Dynamic) + // Skip messages larger than 50% of context window to prevent summarizer overflow. + maxMessageTokens := al.contextWindow / 2 + validMessages := make([]providers.Message, 0) + omitted := false + + for _, m := range toSummarize { + if m.Role != "user" && m.Role != "assistant" { + continue + } + // Estimate tokens for this message + msgTokens := len(m.Content) / 4 + if msgTokens > maxMessageTokens { + omitted = true + continue + } + validMessages = append(validMessages, m) + } + + if len(validMessages) == 0 { + return + } + + // Multi-Part Summarization + // Split into two parts if history is significant + var finalSummary string + if len(validMessages) > 10 { + mid := len(validMessages) / 2 + part1 := validMessages[:mid] + part2 := validMessages[mid:] + + s1, _ := al.summarizeBatch(ctx, part1, "") + s2, _ := al.summarizeBatch(ctx, part2, "") + + // Merge them + mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2) + resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err == nil { + finalSummary = resp.Content + } else { + finalSummary = s1 + " " + s2 + } + } else { + finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary) + } + + if omitted && finalSummary != "" { + finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]" + } + + if finalSummary != "" { + al.sessions.SetSummary(sessionKey, finalSummary) + al.sessions.TruncateHistory(sessionKey, 4) + al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) + } +} + +func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) { + prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n" + if existingSummary != "" { + prompt += "Existing context: " + existingSummary + "\n" + } + prompt += "\nCONVERSATION:\n" + for _, m := range batch { + prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content) + } + + response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err != nil { + return "", err + } + return response.Content, nil +} + +func (al *AgentLoop) estimateTokens(messages []providers.Message) int { + total := 0 + for _, m := range messages { + total += len(m.Content) / 4 // Simple heuristic: 4 chars per token + } + return total +} + diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index 0260d02..1ad41f9 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -6,6 +6,7 @@ import ( "log" "regexp" "strings" + "sync" "time" tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" @@ -17,11 +18,13 @@ import ( type TelegramChannel struct { *BaseChannel - bot *tgbotapi.BotAPI - config config.TelegramConfig - chatIDs map[string]int64 - updates tgbotapi.UpdatesChannel - transcriber *voice.GroqTranscriber + bot *tgbotapi.BotAPI + config config.TelegramConfig + chatIDs map[string]int64 + updates tgbotapi.UpdatesChannel + transcriber *voice.GroqTranscriber + placeholders sync.Map // chatID -> messageID + stopThinking sync.Map // chatID -> chan struct{} } func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) { @@ -33,11 +36,13 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr base := NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom) return &TelegramChannel{ - BaseChannel: base, - bot: bot, - config: cfg, - chatIDs: make(map[string]int64), - transcriber: nil, + BaseChannel: base, + bot: bot, + config: cfg, + chatIDs: make(map[string]int64), + transcriber: nil, + placeholders: sync.Map{}, + stopThinking: sync.Map{}, }, nil } @@ -104,8 +109,26 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err return fmt.Errorf("invalid chat ID: %w", err) } + // Stop thinking animation + if stop, ok := c.stopThinking.Load(msg.ChatID); ok { + close(stop.(chan struct{})) + c.stopThinking.Delete(msg.ChatID) + } + htmlContent := markdownToTelegramHTML(msg.Content) + // Try to edit placeholder + if pID, ok := c.placeholders.Load(msg.ChatID); ok { + c.placeholders.Delete(msg.ChatID) + editMsg := tgbotapi.NewEditMessageText(chatID, pID.(int), htmlContent) + editMsg.ParseMode = tgbotapi.ModeHTML + + if _, err := c.bot.Send(editMsg); err == nil { + return nil + } + // Fallback to new message if edit fails + } + tgMsg := tgbotapi.NewMessage(chatID, htmlContent) tgMsg.ParseMode = tgbotapi.ModeHTML @@ -222,6 +245,37 @@ func (c *TelegramChannel) handleMessage(update tgbotapi.Update) { log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50)) + // Thinking indicator + c.bot.Send(tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping)) + + stopChan := make(chan struct{}) + c.stopThinking.Store(fmt.Sprintf("%d", chatID), stopChan) + + pMsg, err := c.bot.Send(tgbotapi.NewMessage(chatID, "Thinking... 💭")) + if err == nil { + pID := pMsg.MessageID + c.placeholders.Store(fmt.Sprintf("%d", chatID), pID) + + go func(cid int64, mid int, stop <-chan struct{}) { + dots := []string{".", "..", "..."} + emotes := []string{"💭", "🤔", "☁️"} + i := 0 + ticker := time.NewTicker(2000 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-stop: + return + case <-ticker.C: + i++ + text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)]) + edit := tgbotapi.NewEditMessageText(cid, mid, text) + c.bot.Send(edit) + } + } + }(chatID, pID, stopChan) + } + metadata := map[string]string{ "message_id": fmt.Sprintf("%d", message.MessageID), "user_id": fmt.Sprintf("%d", user.ID), diff --git a/pkg/session/manager.go b/pkg/session/manager.go index 9e17b30..df86724 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -13,6 +13,7 @@ import ( type Session struct { Key string `json:"key"` Messages []providers.Message `json:"messages"` + Summary string `json:"summary,omitempty"` Created time.Time `json:"created"` Updated time.Time `json:"updated"` } @@ -92,6 +93,45 @@ func (sm *SessionManager) GetHistory(key string) []providers.Message { return history } +func (sm *SessionManager) GetSummary(key string) string { + sm.mu.RLock() + defer sm.mu.RUnlock() + + session, ok := sm.sessions[key] + if !ok { + return "" + } + return session.Summary +} + +func (sm *SessionManager) SetSummary(key string, summary string) { + sm.mu.Lock() + defer sm.mu.Unlock() + + session, ok := sm.sessions[key] + if ok { + session.Summary = summary + session.Updated = time.Now() + } +} + +func (sm *SessionManager) TruncateHistory(key string, keepLast int) { + sm.mu.Lock() + defer sm.mu.Unlock() + + session, ok := sm.sessions[key] + if !ok { + return + } + + if len(session.Messages) <= keepLast { + return + } + + session.Messages = session.Messages[len(session.Messages)-keepLast:] + session.Updated = time.Now() +} + func (sm *SessionManager) Save(session *Session) error { if sm.storage == "" { return nil