diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go
index 05c8dc1..222d46a 100644
--- a/pkg/agent/loop.go
+++ b/pkg/agent/loop.go
@@ -27,6 +27,7 @@ type AgentLoop struct {
 	provider       providers.LLMProvider
 	workspace      string
 	model          string
+	contextWindow  int // token budget for compression thresholds; <= 0 disables the token checks
 	maxIterations  int
 	sessions       *session.SessionManager
 	contextBuilder *ContextBuilder
@@ -56,6 +57,7 @@ func NewAgentLoop(cfg *config.Config, bus *bus.MessageBus, provider providers.LL
 		provider:       provider,
 		workspace:      workspace,
 		model:          cfg.Agents.Defaults.Model,
+		contextWindow:  cfg.Agents.Defaults.MaxTokens, // NOTE(review): MaxTokens may be the completion cap, not the context window — confirm
 		maxIterations:  cfg.Agents.Defaults.MaxToolIterations,
 		sessions:       sessionsManager,
 		contextBuilder: NewContextBuilder(workspace),
@@ -198,7 +200,13 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
 
 	// Context compression logic
 	newHistory := al.sessions.GetHistory(msg.SessionKey)
-	if len(newHistory) > 20 {
+
+	// Token awareness: also compress once the estimated token count exceeds 75%
+	// of the context window. Skipped when no window is configured (<= 0).
+	tokenEstimate := al.estimateTokens(newHistory)
+	overBudget := al.contextWindow > 0 && tokenEstimate > al.contextWindow*75/100
+
+	if len(newHistory) > 20 || overBudget {
 		if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading {
 			go func() {
 				defer al.summarizing.Delete(msg.SessionKey)
@@ -213,55 +221,128 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
 }
 
 func (al *AgentLoop) summarizeSession(sessionKey string) {
-	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
 	defer cancel()
 
 	history := al.sessions.GetHistory(sessionKey)
 	summary := al.sessions.GetSummary(sessionKey)
 
-	// Keep last 4 messages, summarize the rest
+	// Keep last 4 messages for continuity
 	if len(history) <= 4 {
 		return
 	}
 
 	toSummarize := history[:len(history)-4]
 
-	prompt := "Below is a conversation history and an optional existing summary. " +
-		"Please provide a concise summary of the conversation so far, " +
-		"preserving the core context and key points discussed. " +
-		"If there's an existing summary, incorporate it into the new one.\n\n"
+	// Oversized Message Guard (Dynamic)
+	// Skip messages larger than 50% of context window to prevent summarizer overflow.
+	// The guard is off when no context window is configured (<= 0); a zero limit
+	// would otherwise discard every message.
+	maxMessageTokens := al.contextWindow / 2
+	validMessages := make([]providers.Message, 0, len(toSummarize))
+	omitted := false
 
-	if summary != "" {
-		prompt += "EXISTING SUMMARY: " + summary + "\n\n"
-	}
-
-	prompt += "CONVERSATION TO SUMMARIZE:\n"
 	for _, m := range toSummarize {
-		if m.Role == "user" || m.Role == "assistant" {
-			prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content)
+		if m.Role != "user" && m.Role != "assistant" {
+			continue
 		}
+		// Estimate tokens for this message
+		msgTokens := len(m.Content) / 4
+		if maxMessageTokens > 0 && msgTokens > maxMessageTokens {
+			omitted = true
+			continue
+		}
+		validMessages = append(validMessages, m)
 	}
 
-	messages := []providers.Message{
-		{
-			Role:    "user",
-			Content: prompt,
-		},
-	}
-
-	response, err := al.provider.Chat(ctx, messages, nil, al.model, map[string]interface{}{
-		"max_tokens":  1024,
-		"temperature": 0.3,
-	})
-
-	if err != nil {
-		fmt.Printf("Error summarizing session %s: %v\n", sessionKey, err)
+	if len(validMessages) == 0 {
 		return
 	}
 
-	if response.Content != "" {
-		al.sessions.SetSummary(sessionKey, response.Content)
+	// Multi-Part Summarization
+	// Split into two parts if history is significant
+	var finalSummary string
+	if len(validMessages) > 10 {
+		mid := len(validMessages) / 2
+
+		// Feed the existing summary into the first half so earlier context
+		// survives when the stored summary is replaced below.
+		s1, err := al.summarizeBatch(ctx, validMessages[:mid], summary)
+		if err != nil {
+			fmt.Printf("Error summarizing session %s: %v\n", sessionKey, err)
+			return
+		}
+		s2, err := al.summarizeBatch(ctx, validMessages[mid:], "")
+		if err != nil {
+			fmt.Printf("Error summarizing session %s: %v\n", sessionKey, err)
+			return
+		}
+
+		// Merge them
+		mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2)
+		resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{
+			"max_tokens":  1024,
+			"temperature": 0.3,
+		})
+		if err == nil && resp.Content != "" {
+			finalSummary = resp.Content
+		} else {
+			// Merge failed or came back empty: keep both partial summaries.
+			finalSummary = s1 + " " + s2
+		}
+	} else {
+		s, err := al.summarizeBatch(ctx, validMessages, summary)
+		if err != nil {
+			fmt.Printf("Error summarizing session %s: %v\n", sessionKey, err)
+			return
+		}
+		finalSummary = s
+	}
+
+	if omitted && finalSummary != "" {
+		finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]"
+	}
+
+	if finalSummary != "" {
+		al.sessions.SetSummary(sessionKey, finalSummary)
 		al.sessions.TruncateHistory(sessionKey, 4)
 		al.sessions.Save(al.sessions.GetOrCreate(sessionKey))
 	}
 }
+
+// summarizeBatch asks the provider for a summary of batch, optionally seeded
+// with existingSummary. It returns an error when the provider call fails or
+// yields an empty summary, so a nil error always comes with usable text.
+func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) {
+	prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n"
+	if existingSummary != "" {
+		prompt += "Existing context: " + existingSummary + "\n"
+	}
+	prompt += "\nCONVERSATION:\n"
+	for _, m := range batch {
+		prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content)
+	}
+
+	response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{
+		"max_tokens":  1024,
+		"temperature": 0.3,
+	})
+	if err != nil {
+		return "", err
+	}
+	if response.Content == "" {
+		return "", fmt.Errorf("empty summary returned for %d messages", len(batch))
+	}
+	return response.Content, nil
+}
+
+// estimateTokens returns a rough token count for messages using the common
+// heuristic of four characters (bytes) per token.
+func (al *AgentLoop) estimateTokens(messages []providers.Message) int {
+	totalChars := 0
+	for _, m := range messages {
+		totalChars += len(m.Content)
+	}
+	// Divide once at the end so short messages are not rounded down to zero.
+	return totalChars / 4
+}
diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go
index 0260d02..1ad41f9 100644
--- a/pkg/channels/telegram.go
+++ b/pkg/channels/telegram.go
@@ -6,6 +6,7 @@ import (
 	"log"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
@@ -17,11 +18,13 @@ import (
 
 type TelegramChannel struct {
 	*BaseChannel
-	bot         *tgbotapi.BotAPI
-	config      config.TelegramConfig
-	chatIDs     map[string]int64
-	updates     tgbotapi.UpdatesChannel
-	transcriber *voice.GroqTranscriber
+	bot          *tgbotapi.BotAPI
+	config       config.TelegramConfig
+	chatIDs      map[string]int64
+	updates      tgbotapi.UpdatesChannel
+	transcriber  *voice.GroqTranscriber
+	placeholders sync.Map // chatID -> messageID
+	stopThinking sync.Map // chatID -> chan struct{}
 }
 
 func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) {
@@ -33,11 +36,13 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr
 	base := NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom)
 
 	return &TelegramChannel{
-		BaseChannel: base,
-		bot:         bot,
-		config:      cfg,
-		chatIDs:     make(map[string]int64),
-		transcriber: nil,
+		BaseChannel:  base,
+		bot:          bot,
+		config:       cfg,
+		chatIDs:      make(map[string]int64),
+		transcriber:  nil,
+		placeholders: sync.Map{},
+		stopThinking: sync.Map{},
 	}, nil
 }
 
@@ -104,8 +109,26 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
 		return fmt.Errorf("invalid chat ID: %w", err)
 	}
 
+	// Stop thinking animation. LoadAndDelete hands the channel to exactly one
+	// caller, so concurrent Sends for the same chat cannot close it twice.
+	if stop, ok := c.stopThinking.LoadAndDelete(msg.ChatID); ok {
+		close(stop.(chan struct{}))
+	}
+
 	htmlContent := markdownToTelegramHTML(msg.Content)
 
+	// Try to edit placeholder. LoadAndDelete claims it atomically so only one
+	// Send edits a given placeholder message.
+	if pID, ok := c.placeholders.LoadAndDelete(msg.ChatID); ok {
+		editMsg := tgbotapi.NewEditMessageText(chatID, pID.(int), htmlContent)
+		editMsg.ParseMode = tgbotapi.ModeHTML
+
+		if _, err := c.bot.Send(editMsg); err == nil {
+			return nil
+		}
+		// Fallback to new message if edit fails
+	}
+
 	tgMsg := tgbotapi.NewMessage(chatID, htmlContent)
 	tgMsg.ParseMode = tgbotapi.ModeHTML
 
@@ -222,6 +245,45 @@ func (c *TelegramChannel) handleMessage(update tgbotapi.Update) {
 
 	log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50))
 
+	// Thinking indicator
+	c.bot.Send(tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping))
+
+	// Stop any animation still running for this chat before starting a new
+	// one; overwriting the stored channel would leak that goroutine forever.
+	thinkingKey := fmt.Sprintf("%d", chatID)
+	if prev, ok := c.stopThinking.LoadAndDelete(thinkingKey); ok {
+		close(prev.(chan struct{}))
+	}
+
+	pMsg, err := c.bot.Send(tgbotapi.NewMessage(chatID, "Thinking... 💭"))
+	if err == nil {
+		pID := pMsg.MessageID
+		stopChan := make(chan struct{})
+		c.stopThinking.Store(thinkingKey, stopChan)
+		c.placeholders.Store(thinkingKey, pID)
+
+		// NOTE(review): this goroutine only exits when Send closes stop; it has no
+		// deadline and is not awaited, so a late edit may race the final reply.
+		go func(cid int64, mid int, stop <-chan struct{}) {
+			dots := []string{".", "..", "..."}
+			emotes := []string{"💭", "🤔", "☁️"}
+			i := 0
+			ticker := time.NewTicker(2000 * time.Millisecond)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-stop:
+					return
+				case <-ticker.C:
+					i++
+					text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)])
+					edit := tgbotapi.NewEditMessageText(cid, mid, text)
+					c.bot.Send(edit)
+				}
+			}
+		}(chatID, pID, stopChan)
+	}
+
 	metadata := map[string]string{
 		"message_id": fmt.Sprintf("%d", message.MessageID),
 		"user_id":    fmt.Sprintf("%d", user.ID),