Files
picoclaw/pkg/channels/telegram.go
yinwm 5c8626f07b refactor(channels): consolidate media handling and improve resource cleanup
Extract common file download and audio detection logic to utils package,
implement consistent temp file cleanup with defer, add allowlist checks
before downloading attachments, and improve context management across
Discord, Slack, and Telegram channels. Replace logging with structured
logger and prevent context leaks in transcription and thinking animations.
2026-02-12 12:46:28 +08:00

491 lines
12 KiB
Go

package channels
import (
"context"
"fmt"
"os"
"regexp"
"strings"
"sync"
"time"
"github.com/mymmrac/telego"
tu "github.com/mymmrac/telego/telegoutil"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/utils"
"github.com/sipeed/picoclaw/pkg/voice"
)
// TelegramChannel is the channel implementation backed by a Telegram bot.
// It embeds *BaseChannel and talks to Telegram through a telego.Bot.
type TelegramChannel struct {
	*BaseChannel
	// bot is the telego client created from the configured token.
	bot *telego.Bot
	// config is the Telegram configuration this channel was built with.
	config config.TelegramConfig
	// chatIDs remembers the most recent chat ID seen for each sender ID.
	chatIDs map[string]int64
	// transcriber is optional; it stays nil until SetTranscriber is called.
	transcriber *voice.GroqTranscriber
	// placeholders maps a chat ID (decimal string) to the message ID (int) of
	// the "Thinking..." placeholder message that Send later tries to edit.
	placeholders sync.Map // chatID -> messageID
	// stopThinking maps a chat ID (decimal string) to the *thinkingCancel that
	// stops the placeholder animation for that chat.
	stopThinking sync.Map // chatID -> thinkingCancel
}
// thinkingCancel wraps the cancel function of a "thinking" animation so it
// can be stored as a single value in a sync.Map.
type thinkingCancel struct {
	fn context.CancelFunc
}

// Cancel stops the associated animation. It is a no-op on a nil receiver or
// when no cancel function has been set.
func (c *thinkingCancel) Cancel() {
	if c == nil || c.fn == nil {
		return
	}
	c.fn()
}
// NewTelegramChannel builds a Telegram channel from cfg and attaches it to
// the given message bus. It returns an error when the telego client cannot be
// created from cfg.Token. The transcriber is left unset; use SetTranscriber
// to enable voice transcription.
func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) {
	client, err := telego.NewBot(cfg.Token)
	if err != nil {
		return nil, fmt.Errorf("failed to create telegram bot: %w", err)
	}

	// transcriber, placeholders and stopThinking rely on their zero values.
	channel := &TelegramChannel{
		BaseChannel: NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom),
		bot:         client,
		config:      cfg,
		chatIDs:     map[string]int64{},
	}
	return channel, nil
}
// SetTranscriber installs the transcriber used for incoming voice messages.
// Passing nil leaves voice transcription disabled.
func (c *TelegramChannel) SetTranscriber(transcriber *voice.GroqTranscriber) {
	c.transcriber = transcriber
}
// Start begins long polling for Telegram updates and dispatches every update
// that carries a message to handleMessage.
//
// It returns an error when long polling cannot be started. On success the
// channel is marked as running and a background goroutine consumes updates
// until ctx is cancelled or the updates channel is closed. Messages are
// handled one at a time, in the polling goroutine itself.
func (c *TelegramChannel) Start(ctx context.Context) error {
	logger.InfoC("telegram", "Starting Telegram bot (polling mode)...")

	updates, err := c.bot.UpdatesViaLongPolling(ctx, &telego.GetUpdatesParams{
		Timeout: 30,
	})
	if err != nil {
		return fmt.Errorf("failed to start long polling: %w", err)
	}

	c.setRunning(true)
	logger.InfoCF("telegram", "Telegram bot connected", map[string]interface{}{
		"username": c.bot.Username(),
	})

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case update, ok := <-updates:
				if !ok {
					// No reconnect is attempted here, so the log must not
					// claim one: the loop simply ends.
					logger.InfoC("telegram", "Updates channel closed, stopping update loop")
					return
				}
				if update.Message != nil {
					c.handleMessage(ctx, update)
				}
			}
		}
	}()

	return nil
}
// Stop marks the channel as not running, which makes Send reject further
// messages. It always returns nil.
//
// The polling goroutine launched by Start is not stopped here; it exits when
// the context that was passed to Start is cancelled.
func (c *TelegramChannel) Stop(ctx context.Context) error {
	logger.InfoC("telegram", "Stopping Telegram bot...")
	c.setRunning(false)
	return nil
}
// Send delivers an outbound message to the Telegram chat named by msg.ChatID.
//
// Steps:
//  1. Stop the "thinking" animation registered for the chat, if any.
//  2. Convert the markdown content to Telegram HTML.
//  3. If a placeholder message exists for the chat, try to edit it in place.
//  4. Otherwise (or if the edit fails) send a new HTML message.
//  5. If the HTML message is rejected, resend the original, unconverted
//     content as plain text.
//
// It returns an error when the channel is not running, when msg.ChatID is not
// a valid chat ID, or when the final plain-text fallback fails.
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
	if !c.IsRunning() {
		return fmt.Errorf("telegram bot not running")
	}

	chatID, err := parseChatID(msg.ChatID)
	if err != nil {
		return fmt.Errorf("invalid chat ID: %w", err)
	}

	// Stop thinking animation
	if stop, ok := c.stopThinking.Load(msg.ChatID); ok {
		if cf, ok := stop.(*thinkingCancel); ok && cf != nil {
			cf.Cancel()
		}
		c.stopThinking.Delete(msg.ChatID)
	}

	htmlContent := markdownToTelegramHTML(msg.Content)

	// Try to edit placeholder
	if pID, ok := c.placeholders.Load(msg.ChatID); ok {
		c.placeholders.Delete(msg.ChatID)
		// Checked assertion: an unexpected value type must not panic the sender.
		if messageID, isInt := pID.(int); isInt {
			editMsg := tu.EditMessageText(tu.ID(chatID), messageID, htmlContent)
			editMsg.ParseMode = telego.ModeHTML
			if _, editErr := c.bot.EditMessageText(ctx, editMsg); editErr == nil {
				return nil
			}
		}
		// Fallback to new message if edit fails
	}

	tgMsg := tu.Message(tu.ID(chatID), htmlContent)
	tgMsg.ParseMode = telego.ModeHTML
	if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil {
		logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]interface{}{
			"error": err.Error(),
		})
		// Without a parse mode the HTML tags and entities would be shown
		// literally, so fall back to the text as it was originally written.
		tgMsg.Text = msg.Content
		tgMsg.ParseMode = ""
		_, err = c.bot.SendMessage(ctx, tgMsg)
		return err
	}

	return nil
}
// handleMessage turns one Telegram update into an inbound channel message.
//
// It ignores updates without a message or sender, and drops messages from
// senders that fail the allowlist check before any attachment is downloaded.
// For accepted messages it collects text and caption, downloads photo, voice,
// audio and document attachments, transcribes voice when a transcriber is
// available, shows a typing action plus an animated "Thinking..." placeholder,
// and finally forwards everything through c.HandleMessage.
//
// Downloaded files are removed when this function returns.
// NOTE(review): that is only safe if c.HandleMessage has finished using
// mediaPaths by the time it returns — confirm against its implementation.
func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Update) {
	message := update.Message
	if message == nil {
		return
	}

	user := message.From
	if user == nil {
		return
	}

	senderID := fmt.Sprintf("%d", user.ID)
	if user.Username != "" {
		senderID = fmt.Sprintf("%d|%s", user.ID, user.Username)
	}

	// Check the allowlist first so attachments are never downloaded for
	// rejected users.
	if !c.IsAllowed(senderID) {
		logger.DebugCF("telegram", "Message rejected by allowlist", map[string]interface{}{
			"user_id": senderID,
		})
		return
	}

	chatID := message.Chat.ID
	chatIDStr := fmt.Sprintf("%d", chatID)
	c.chatIDs[senderID] = chatID

	content := ""
	mediaPaths := []string{}
	localFiles := []string{} // local files that must be cleaned up

	// Make sure temporary files are removed when the function returns.
	defer func() {
		for _, file := range localFiles {
			if err := os.Remove(file); err != nil {
				logger.DebugCF("telegram", "Failed to cleanup temp file", map[string]interface{}{
					"file":  file,
					"error": err.Error(),
				})
			}
		}
	}()

	// appendLine adds a fragment to content, newline-separated from what is
	// already there.
	appendLine := func(line string) {
		if content != "" {
			content += "\n"
		}
		content += line
	}
	// trackMedia registers a downloaded file both for forwarding and cleanup.
	trackMedia := func(path string) {
		localFiles = append(localFiles, path)
		mediaPaths = append(mediaPaths, path)
	}

	if message.Text != "" {
		appendLine(message.Text)
	}
	if message.Caption != "" {
		appendLine(message.Caption)
	}

	if len(message.Photo) > 0 {
		// The last entry of message.Photo is the one that gets downloaded.
		photo := message.Photo[len(message.Photo)-1]
		if photoPath := c.downloadPhoto(ctx, photo.FileID); photoPath != "" {
			trackMedia(photoPath)
			appendLine("[image: photo]")
		}
	}

	if message.Voice != nil {
		if voicePath := c.downloadFile(ctx, message.Voice.FileID, ".ogg"); voicePath != "" {
			trackMedia(voicePath)
			appendLine(c.describeVoice(ctx, voicePath))
		}
	}

	if message.Audio != nil {
		if audioPath := c.downloadFile(ctx, message.Audio.FileID, ".mp3"); audioPath != "" {
			trackMedia(audioPath)
			appendLine("[audio]")
		}
	}

	if message.Document != nil {
		if docPath := c.downloadFile(ctx, message.Document.FileID, ""); docPath != "" {
			trackMedia(docPath)
			appendLine("[file]")
		}
	}

	if content == "" {
		content = "[empty message]"
	}

	logger.DebugCF("telegram", "Received message", map[string]interface{}{
		"sender_id": senderID,
		"chat_id":   chatIDStr,
		"preview":   utils.Truncate(content, 50),
	})

	// Thinking indicator
	if err := c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(chatID), telego.ChatActionTyping)); err != nil {
		logger.ErrorCF("telegram", "Failed to send chat action", map[string]interface{}{
			"error": err.Error(),
		})
	}

	c.startThinking(ctx, chatID, chatIDStr)

	metadata := map[string]string{
		"message_id": fmt.Sprintf("%d", message.MessageID),
		"user_id":    fmt.Sprintf("%d", user.ID),
		"username":   user.Username,
		"first_name": user.FirstName,
		"is_group":   fmt.Sprintf("%t", message.Chat.Type != "private"),
	}

	c.HandleMessage(senderID, chatIDStr, content, mediaPaths, metadata)
}

// describeVoice returns the content marker for a downloaded voice message:
// the transcription when a transcriber is available and succeeds, a failure
// marker when it errors, and a bare "[voice]" when no transcriber is usable.
func (c *TelegramChannel) describeVoice(ctx context.Context, voicePath string) string {
	if c.transcriber == nil || !c.transcriber.IsAvailable() {
		return "[voice]"
	}

	// Bound the transcription call and release the timer as soon as it ends,
	// instead of holding it until the whole message has been handled.
	transcribeCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	result, err := c.transcriber.Transcribe(transcribeCtx, voicePath)
	if err != nil {
		logger.ErrorCF("telegram", "Voice transcription failed", map[string]interface{}{
			"error": err.Error(),
			"path":  voicePath,
		})
		return "[voice (transcription failed)]"
	}

	logger.InfoCF("telegram", "Voice transcribed successfully", map[string]interface{}{
		"text": result.Text,
	})
	return fmt.Sprintf("[voice transcription: %s]", result.Text)
}

// startThinking stops any animation still registered for the chat, posts a
// "Thinking..." placeholder and animates it every two seconds until the
// stored cancel function is called or five minutes have passed.
//
// The cancellable context is only created once the placeholder exists, so a
// failed placeholder send leaves nothing behind to be cancelled later.
func (c *TelegramChannel) startThinking(ctx context.Context, chatID int64, chatIDStr string) {
	// Stop any previous thinking animation
	if prevStop, ok := c.stopThinking.Load(chatIDStr); ok {
		if cf, ok := prevStop.(*thinkingCancel); ok && cf != nil {
			cf.Cancel()
		}
		c.stopThinking.Delete(chatIDStr)
	}

	pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(chatID), "Thinking... 💭"))
	if err != nil {
		logger.DebugCF("telegram", "Failed to send thinking message", map[string]interface{}{
			"error": err.Error(),
		})
		return
	}

	// Create new context for thinking animation with timeout
	thinkCtx, thinkCancel := context.WithTimeout(ctx, 5*time.Minute)
	c.stopThinking.Store(chatIDStr, &thinkingCancel{fn: thinkCancel})
	c.placeholders.Store(chatIDStr, pMsg.MessageID)

	go func(cid int64, mid int) {
		dots := []string{".", "..", "..."}
		emotes := []string{"💭", "🤔", "☁️"}
		i := 0
		ticker := time.NewTicker(2000 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-thinkCtx.Done():
				return
			case <-ticker.C:
				i++
				text := fmt.Sprintf("Thinking%s %s", dots[i%len(dots)], emotes[i%len(emotes)])
				_, editErr := c.bot.EditMessageText(thinkCtx, tu.EditMessageText(tu.ID(cid), mid, text))
				if editErr != nil {
					logger.DebugCF("telegram", "Failed to edit thinking message", map[string]interface{}{
						"error": editErr.Error(),
					})
				}
			}
		}
	}(chatID, pMsg.MessageID)
}
// downloadPhoto resolves fileID through the Bot API and downloads the file
// with a ".jpg" extension. It returns the value produced by
// downloadFileWithInfo, or "" when the file lookup fails (the failure is
// logged).
func (c *TelegramChannel) downloadPhoto(ctx context.Context, fileID string) string {
	params := &telego.GetFileParams{FileID: fileID}

	info, err := c.bot.GetFile(ctx, params)
	if err == nil {
		return c.downloadFileWithInfo(info, ".jpg")
	}

	logger.ErrorCF("telegram", "Failed to get photo file", map[string]interface{}{
		"error": err.Error(),
	})
	return ""
}
// downloadFileWithInfo downloads the file described by file via
// utils.DownloadFile and returns that call's result. It returns "" when file
// is nil or carries no file path.
//
// ext is appended to the file path to form the suggested filename, unless the
// path already ends with it (avoids names such as "photo.jpg.jpg").
func (c *TelegramChannel) downloadFileWithInfo(file *telego.File, ext string) string {
	if file == nil || file.FilePath == "" {
		return ""
	}

	// Log only the file path: the download URL embeds the bot token and must
	// not end up in log output.
	logger.DebugCF("telegram", "Downloading file", map[string]interface{}{"file_path": file.FilePath})

	// Use FilePath as filename for better identification
	filename := file.FilePath
	if ext != "" && !strings.HasSuffix(filename, ext) {
		filename += ext
	}

	url := c.bot.FileDownloadURL(file.FilePath)
	return utils.DownloadFile(url, filename, utils.DownloadOptions{
		LoggerPrefix: "telegram",
	})
}
// downloadFile resolves fileID through the Bot API and downloads the file,
// passing ext on to downloadFileWithInfo. It returns that call's result, or
// "" when the file lookup fails (the failure is logged).
func (c *TelegramChannel) downloadFile(ctx context.Context, fileID, ext string) string {
	params := &telego.GetFileParams{FileID: fileID}

	info, err := c.bot.GetFile(ctx, params)
	if err == nil {
		return c.downloadFileWithInfo(info, ext)
	}

	logger.ErrorCF("telegram", "Failed to get file", map[string]interface{}{
		"error": err.Error(),
	})
	return ""
}
// parseChatID reads a decimal chat ID from the start of chatIDStr.
//
// Parsing is done with fmt.Sscanf, so it stops at the first character that is
// not part of the number: "42xyz" yields 42 with a nil error. An input with no
// leading number (including the empty string) yields an error.
func parseChatID(chatIDStr string) (int64, error) {
	var parsed int64
	if _, scanErr := fmt.Sscanf(chatIDStr, "%d", &parsed); scanErr != nil {
		return parsed, scanErr
	}
	return parsed, nil
}
// Patterns used by markdownToTelegramHTML, compiled once at package scope.
// The line-oriented ones carry the (?m) flag so that ^ and $ match at every
// line of a message rather than only at the start and end of the whole text,
// and use [ \t] instead of \s so a match can never swallow a line break.
var (
	tgHeadingRe        = regexp.MustCompile(`(?m)^#{1,6}[ \t]+(.+)$`)
	tgBlockquoteRe     = regexp.MustCompile(`(?m)^>[ \t]*(.*)$`)
	tgLinkRe           = regexp.MustCompile(`\[([^\]]+)\]\(([^)]+)\)`)
	tgBoldStarRe       = regexp.MustCompile(`\*\*(.+?)\*\*`)
	tgBoldUnderscoreRe = regexp.MustCompile(`__(.+?)__`)
	tgItalicRe         = regexp.MustCompile(`_([^_]+)_`)
	tgStrikeRe         = regexp.MustCompile(`~~(.+?)~~`)
	tgBulletRe         = regexp.MustCompile(`(?m)^[-*][ \t]+`)
)

// markdownToTelegramHTML converts a markdown string into the HTML subset
// used for Telegram messages. An empty input yields an empty string.
//
// Processing order:
//  1. Fenced code blocks and inline code are cut out and replaced with
//     placeholders so their contents are not touched by later steps.
//  2. Heading markers (#) and blockquote markers (>) are stripped per line.
//  3. The remaining text is HTML-escaped.
//  4. Links, bold, italic, strikethrough and list bullets are converted.
//  5. The code placeholders are swapped back in, escaped and wrapped in
//     <code> / <pre><code>.
//
// Known limitation: the italic rule matches any pair of single underscores,
// including ones inside words or URLs.
func markdownToTelegramHTML(text string) string {
	if text == "" {
		return ""
	}

	codeBlocks := extractCodeBlocks(text)
	inlineCodes := extractInlineCodes(codeBlocks.text)
	text = inlineCodes.text

	// Strip the markers before escaping, while ">" is still a literal ">".
	text = tgHeadingRe.ReplaceAllString(text, "$1")
	text = tgBlockquoteRe.ReplaceAllString(text, "$1")

	text = escapeHTML(text)

	text = tgLinkRe.ReplaceAllString(text, `<a href="$2">$1</a>`)
	text = tgBoldStarRe.ReplaceAllString(text, "<b>$1</b>")
	text = tgBoldUnderscoreRe.ReplaceAllString(text, "<b>$1</b>")
	text = tgItalicRe.ReplaceAllString(text, "<i>$1</i>")
	text = tgStrikeRe.ReplaceAllString(text, "<s>$1</s>")
	text = tgBulletRe.ReplaceAllString(text, "• ")

	for i, code := range inlineCodes.codes {
		escaped := escapeHTML(code)
		text = strings.ReplaceAll(text, fmt.Sprintf("\x00IC%d\x00", i), fmt.Sprintf("<code>%s</code>", escaped))
	}

	for i, code := range codeBlocks.codes {
		escaped := escapeHTML(code)
		text = strings.ReplaceAll(text, fmt.Sprintf("\x00CB%d\x00", i), fmt.Sprintf("<pre><code>%s</code></pre>", escaped))
	}

	return text
}
// codeBlockMatch is the result of extractCodeBlocks: text is the input with
// every fenced code block replaced by a placeholder, and codes holds the
// block contents, indexed by the number embedded in the placeholder.
type codeBlockMatch struct {
	text  string
	codes []string
}

// tgCodeBlockRe matches a fenced code block (``` with an optional language
// tag) and captures its contents.
var tgCodeBlockRe = regexp.MustCompile("```[\\w]*\\n?([\\s\\S]*?)```")

// extractCodeBlocks cuts all fenced code blocks out of text.
//
// The i-th block (in order of appearance) is replaced by the placeholder
// "\x00CB<i>\x00" and its contents are stored in codes[i], so each
// placeholder can later be swapped back for its own block.
func extractCodeBlocks(text string) codeBlockMatch {
	matches := tgCodeBlockRe.FindAllStringSubmatch(text, -1)

	codes := make([]string, 0, len(matches))
	for _, match := range matches {
		codes = append(codes, match[1])
	}

	// Number the placeholders with a running counter. Matches are visited in
	// the same left-to-right order as above, so placeholder i maps to codes[i].
	next := 0
	text = tgCodeBlockRe.ReplaceAllStringFunc(text, func(string) string {
		placeholder := fmt.Sprintf("\x00CB%d\x00", next)
		next++
		return placeholder
	})

	return codeBlockMatch{text: text, codes: codes}
}
// inlineCodeMatch is the result of extractInlineCodes: text is the input with
// every inline code span replaced by a placeholder, and codes holds the span
// contents, indexed by the number embedded in the placeholder.
type inlineCodeMatch struct {
	text  string
	codes []string
}

// tgInlineCodeRe matches a single-backtick code span and captures its
// contents.
var tgInlineCodeRe = regexp.MustCompile("`([^`]+)`")

// extractInlineCodes cuts all inline code spans out of text.
//
// The i-th span (in order of appearance) is replaced by the placeholder
// "\x00IC<i>\x00" and its contents are stored in codes[i], so each
// placeholder can later be swapped back for its own span.
func extractInlineCodes(text string) inlineCodeMatch {
	matches := tgInlineCodeRe.FindAllStringSubmatch(text, -1)

	codes := make([]string, 0, len(matches))
	for _, match := range matches {
		codes = append(codes, match[1])
	}

	// Number the placeholders with a running counter. Matches are visited in
	// the same left-to-right order as above, so placeholder i maps to codes[i].
	next := 0
	text = tgInlineCodeRe.ReplaceAllStringFunc(text, func(string) string {
		placeholder := fmt.Sprintf("\x00IC%d\x00", next)
		next++
		return placeholder
	})

	return inlineCodeMatch{text: text, codes: codes}
}
// telegramHTMLEscaper rewrites the three characters escaped by escapeHTML in
// a single pass over the input.
var telegramHTMLEscaper = strings.NewReplacer(
	"&", "&amp;",
	"<", "&lt;",
	">", "&gt;",
)

// escapeHTML replaces "&", "<" and ">" in text with their HTML entities.
// Quotes are left untouched.
func escapeHTML(text string) string {
	return telegramHTMLEscaper.Replace(text)
}