package channels import ( "context" "fmt" "os" "regexp" "strings" "sync" "time" "github.com/mymmrac/telego" tu "github.com/mymmrac/telego/telegoutil" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/utils" "github.com/sipeed/picoclaw/pkg/voice" ) type TelegramChannel struct { *BaseChannel bot *telego.Bot config config.TelegramConfig chatIDs map[string]int64 transcriber *voice.GroqTranscriber placeholders sync.Map // chatID -> messageID stopThinking sync.Map // chatID -> thinkingCancel } type thinkingCancel struct { fn context.CancelFunc } func (c *thinkingCancel) Cancel() { if c != nil && c.fn != nil { c.fn() } } func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) { bot, err := telego.NewBot(cfg.Token) if err != nil { return nil, fmt.Errorf("failed to create telegram bot: %w", err) } base := NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom) return &TelegramChannel{ BaseChannel: base, bot: bot, config: cfg, chatIDs: make(map[string]int64), transcriber: nil, placeholders: sync.Map{}, stopThinking: sync.Map{}, }, nil } func (c *TelegramChannel) SetTranscriber(transcriber *voice.GroqTranscriber) { c.transcriber = transcriber } func (c *TelegramChannel) Start(ctx context.Context) error { logger.InfoC("telegram", "Starting Telegram bot (polling mode)...") updates, err := c.bot.UpdatesViaLongPolling(ctx, &telego.GetUpdatesParams{ Timeout: 30, }) if err != nil { return fmt.Errorf("failed to start long polling: %w", err) } c.setRunning(true) logger.InfoCF("telegram", "Telegram bot connected", map[string]interface{}{ "username": c.bot.Username(), }) go func() { for { select { case <-ctx.Done(): return case update, ok := <-updates: if !ok { logger.InfoC("telegram", "Updates channel closed, reconnecting...") return } if update.Message != nil { c.handleMessage(ctx, update) } } } }() return nil } func (c *TelegramChannel) Stop(ctx context.Context) error { logger.InfoC("telegram", "Stopping Telegram bot...") c.setRunning(false) return nil } func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return fmt.Errorf("telegram bot not running") } chatID, err := parseChatID(msg.ChatID) if err != nil { return fmt.Errorf("invalid chat ID: %w", err) } // Stop thinking animation if stop, ok := c.stopThinking.Load(msg.ChatID); ok { if cf, ok := stop.(*thinkingCancel); ok && cf != nil { cf.Cancel() } c.stopThinking.Delete(msg.ChatID) } htmlContent := markdownToTelegramHTML(msg.Content) // Try to edit placeholder if pID, ok := c.placeholders.Load(msg.ChatID); ok { c.placeholders.Delete(msg.ChatID) editMsg := tu.EditMessageText(tu.ID(chatID), pID.(int), htmlContent) editMsg.ParseMode = telego.ModeHTML if _, err = c.bot.EditMessageText(ctx, editMsg); err == nil { return nil } // Fallback to new message if edit fails } tgMsg := tu.Message(tu.ID(chatID), htmlContent) tgMsg.ParseMode = telego.ModeHTML if _, err = c.bot.SendMessage(ctx, tgMsg); err != nil { logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]interface{}{ "error": err.Error(), }) tgMsg.ParseMode = "" _, err = c.bot.SendMessage(ctx, tgMsg) return err } return nil } func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Update) { message := update.Message if message == nil { return } user := message.From if user == nil { return } senderID := fmt.Sprintf("%d", user.ID) if user.Username != "" { senderID = fmt.Sprintf("%d|%s", user.ID, user.Username) } // 检查白名单,避免为被拒绝的用户下载附件 if !c.IsAllowed(senderID) { logger.DebugCF("telegram", "Message rejected by allowlist", map[string]interface{}{ "user_id": senderID, }) return } chatID := message.Chat.ID c.chatIDs[senderID] = chatID content := "" mediaPaths := []string{} localFiles := []string{} // 跟踪需要清理的本地文件 // 确保临时文件在函数返回时被清理 defer func() { for _, file := range localFiles { if err := os.Remove(file); err != nil { logger.DebugCF("telegram", "Failed to cleanup temp file", map[string]interface{}{ "file": file, "error": err.Error(), }) } } }() if message.Text != "" { content += message.Text } if message.Caption != "" { if content != "" { content += "\n" } content += message.Caption } if message.Photo != nil && len(message.Photo) > 0 { photo := message.Photo[len(message.Photo)-1] photoPath := c.downloadPhoto(ctx, photo.FileID) if photoPath != "" { localFiles = append(localFiles, photoPath) mediaPaths = append(mediaPaths, photoPath) if content != "" { content += "\n" } content += fmt.Sprintf("[image: photo]") } } if message.Voice != nil { voicePath := c.downloadFile(ctx, message.Voice.FileID, ".ogg") if voicePath != "" { localFiles = append(localFiles, voicePath) mediaPaths = append(mediaPaths, voicePath) transcribedText := "" if c.transcriber != nil && c.transcriber.IsAvailable() { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() result, err := c.transcriber.Transcribe(ctx, voicePath) if err != nil { logger.ErrorCF("telegram", "Voice transcription failed", map[string]interface{}{ "error": err.Error(), "path": voicePath, }) transcribedText = fmt.Sprintf("[voice (transcription failed)]") } else { transcribedText = fmt.Sprintf("[voice transcription: %s]", result.Text) logger.InfoCF("telegram", "Voice transcribed successfully", map[string]interface{}{ "text": result.Text, }) } } else { transcribedText = fmt.Sprintf("[voice]") } if content != "" { content += "\n" } content += transcribedText } } if message.Audio != nil { audioPath := c.downloadFile(ctx, message.Audio.FileID, ".mp3") if audioPath != "" { localFiles = append(localFiles, audioPath) mediaPaths = append(mediaPaths, audioPath) if content != "" { content += "\n" } content += fmt.Sprintf("[audio]") } } if message.Document != nil { docPath := c.downloadFile(ctx, message.Document.FileID, "") if docPath != "" { localFiles = append(localFiles, docPath) mediaPaths = append(mediaPaths, docPath) if content != "" { content += "\n" } content += fmt.Sprintf("[file]") } } if content == "" { content = "[empty message]" } logger.DebugCF("telegram", "Received message", map[string]interface{}{ "sender_id": senderID, "chat_id": fmt.Sprintf("%d", chatID), "preview": utils.Truncate(content, 50), }) // Thinking indicator err := c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(chatID), telego.ChatActionTyping)) if err != nil { logger.ErrorCF("telegram", "Failed to send chat action", map[string]interface{}{ "error": err.Error(), }) } // Stop any previous thinking animation chatIDStr := fmt.Sprintf("%d", chatID) if prevStop, ok := c.stopThinking.Load(chatIDStr); ok { if cf, ok := prevStop.(*thinkingCancel); ok && cf != nil { cf.Cancel() } } // Create new context for thinking animation with timeout thinkCtx, thinkCancel := context.WithTimeout(ctx, 5*time.Minute) c.stopThinking.Store(chatIDStr, &thinkingCancel{fn: thinkCancel}) pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(chatID), "Thinking... 💭")) if err == nil { pID := pMsg.MessageID c.placeholders.Store(chatIDStr, pID) go func(cid int64, mid int) { dots := []string{".", "..", "..."} emotes := []string{"💭", "🤔", "☁️"} i := 0 ticker := time.NewTicker(2000 * time.Millisecond) defer ticker.Stop() for { select { case <-thinkCtx.Done(): return case <-ticker.C: i++ text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)]) _, editErr := c.bot.EditMessageText(thinkCtx, tu.EditMessageText(tu.ID(chatID), mid, text)) if editErr != nil { logger.DebugCF("telegram", "Failed to edit thinking message", map[string]interface{}{ "error": editErr.Error(), }) } } } }(chatID, pID) } metadata := map[string]string{ "message_id": fmt.Sprintf("%d", message.MessageID), "user_id": fmt.Sprintf("%d", user.ID), "username": user.Username, "first_name": user.FirstName, "is_group": fmt.Sprintf("%t", message.Chat.Type != "private"), } c.HandleMessage(fmt.Sprintf("%d", user.ID), fmt.Sprintf("%d", chatID), content, mediaPaths, metadata) } func (c *TelegramChannel) downloadPhoto(ctx context.Context, fileID string) string { file, err := c.bot.GetFile(ctx, &telego.GetFileParams{FileID: fileID}) if err != nil { logger.ErrorCF("telegram", "Failed to get photo file", map[string]interface{}{ "error": err.Error(), }) return "" } return c.downloadFileWithInfo(file, ".jpg") } func (c *TelegramChannel) downloadFileWithInfo(file *telego.File, ext string) string { if file.FilePath == "" { return "" } url := c.bot.FileDownloadURL(file.FilePath) logger.DebugCF("telegram", "File URL", map[string]interface{}{"url": url}) // Use FilePath as filename for better identification filename := file.FilePath + ext return utils.DownloadFile(url, filename, utils.DownloadOptions{ LoggerPrefix: "telegram", }) } func (c *TelegramChannel) downloadFile(ctx context.Context, fileID, ext string) string { file, err := c.bot.GetFile(ctx, &telego.GetFileParams{FileID: fileID}) if err != nil { logger.ErrorCF("telegram", "Failed to get file", map[string]interface{}{ "error": err.Error(), }) return "" } return c.downloadFileWithInfo(file, ext) } func parseChatID(chatIDStr string) (int64, error) { var id int64 _, err := fmt.Sscanf(chatIDStr, "%d", &id) return id, err } func markdownToTelegramHTML(text string) string { if text == "" { return "" } codeBlocks := extractCodeBlocks(text) text = codeBlocks.text inlineCodes := extractInlineCodes(text) text = inlineCodes.text text = regexp.MustCompile(`^#{1,6}\s+(.+)$`).ReplaceAllString(text, "$1") text = regexp.MustCompile(`^>\s*(.*)$`).ReplaceAllString(text, "$1") text = escapeHTML(text) text = regexp.MustCompile(`\[([^\]]+)\]\(([^)]+)\)`).ReplaceAllString(text, `$1`) text = regexp.MustCompile(`\*\*(.+?)\*\*`).ReplaceAllString(text, "$1") text = regexp.MustCompile(`__(.+?)__`).ReplaceAllString(text, "$1") reItalic := regexp.MustCompile(`_([^_]+)_`) text = reItalic.ReplaceAllStringFunc(text, func(s string) string { match := reItalic.FindStringSubmatch(s) if len(match) < 2 { return s } return "" + match[1] + "" }) text = regexp.MustCompile(`~~(.+?)~~`).ReplaceAllString(text, "$1") text = regexp.MustCompile(`^[-*]\s+`).ReplaceAllString(text, "• ") for i, code := range inlineCodes.codes { escaped := escapeHTML(code) text = strings.ReplaceAll(text, fmt.Sprintf("\x00IC%d\x00", i), fmt.Sprintf("%s", escaped)) } for i, code := range codeBlocks.codes { escaped := escapeHTML(code) text = strings.ReplaceAll(text, fmt.Sprintf("\x00CB%d\x00", i), fmt.Sprintf("
%s
", escaped)) } return text } type codeBlockMatch struct { text string codes []string } func extractCodeBlocks(text string) codeBlockMatch { re := regexp.MustCompile("```[\\w]*\\n?([\\s\\S]*?)```") matches := re.FindAllStringSubmatch(text, -1) codes := make([]string, 0, len(matches)) for _, match := range matches { codes = append(codes, match[1]) } text = re.ReplaceAllStringFunc(text, func(m string) string { return fmt.Sprintf("\x00CB%d\x00", len(codes)-1) }) return codeBlockMatch{text: text, codes: codes} } type inlineCodeMatch struct { text string codes []string } func extractInlineCodes(text string) inlineCodeMatch { re := regexp.MustCompile("`([^`]+)`") matches := re.FindAllStringSubmatch(text, -1) codes := make([]string, 0, len(matches)) for _, match := range matches { codes = append(codes, match[1]) } text = re.ReplaceAllStringFunc(text, func(m string) string { return fmt.Sprintf("\x00IC%d\x00", len(codes)-1) }) return inlineCodeMatch{text: text, codes: codes} } func escapeHTML(text string) string { text = strings.ReplaceAll(text, "&", "&") text = strings.ReplaceAll(text, "<", "<") text = strings.ReplaceAll(text, ">", ">") return text }