From 9d5728ec5b8d6c46e0ff7bdd65e8046a24620aaa Mon Sep 17 00:00:00 2001 From: Avisek Ray <153633053+biisal@users.noreply.github.com> Date: Mon, 16 Feb 2026 11:50:16 +0530 Subject: [PATCH] feat: implement structured Telegram command handling with a dedicated command service and `telegohandler` integration. (#164) --- pkg/channels/manager.go | 2 +- pkg/channels/telegram.go | 98 +++++++++++-------- pkg/channels/telegram_commands.go | 153 ++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 41 deletions(-) create mode 100644 pkg/channels/telegram_commands.go diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 15f8c60..7f6abc4 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -48,7 +48,7 @@ func (m *Manager) initChannels() error { if m.config.Channels.Telegram.Enabled && m.config.Channels.Telegram.Token != "" { logger.DebugC("channels", "Attempting to initialize Telegram channel") - telegram, err := NewTelegramChannel(m.config.Channels.Telegram, m.bus) + telegram, err := NewTelegramChannel(m.config, m.bus) if err != nil { logger.ErrorCF("channels", "Failed to initialize Telegram channel", map[string]interface{}{ "error": err.Error(), diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index b14b163..5601d50 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -11,7 +11,10 @@ import ( "sync" "time" + th "github.com/mymmrac/telego/telegohandler" + "github.com/mymmrac/telego" + "github.com/mymmrac/telego/telegohandler" tu "github.com/mymmrac/telego/telegoutil" "github.com/sipeed/picoclaw/pkg/bus" @@ -24,7 +27,8 @@ import ( type TelegramChannel struct { *BaseChannel bot *telego.Bot - config config.TelegramConfig + commands TelegramCommander + config *config.Config chatIDs map[string]int64 transcriber *voice.GroqTranscriber placeholders sync.Map // chatID -> messageID @@ -41,13 +45,14 @@ func (c *thinkingCancel) Cancel() { } } -func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*TelegramChannel, error) { +func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) { var opts []telego.BotOption + telegramCfg := cfg.Channels.Telegram - if cfg.Proxy != "" { - proxyURL, parseErr := url.Parse(cfg.Proxy) + if telegramCfg.Proxy != "" { + proxyURL, parseErr := url.Parse(telegramCfg.Proxy) if parseErr != nil { - return nil, fmt.Errorf("invalid proxy URL %q: %w", cfg.Proxy, parseErr) + return nil, fmt.Errorf("invalid proxy URL %q: %w", telegramCfg.Proxy, parseErr) } opts = append(opts, telego.WithHTTPClient(&http.Client{ Transport: &http.Transport{ @@ -56,15 +61,16 @@ func NewTelegramChannel(cfg config.TelegramConfig, bus *bus.MessageBus) (*Telegr })) } - bot, err := telego.NewBot(cfg.Token, opts...) + bot, err := telego.NewBot(telegramCfg.Token, opts...) if err != nil { return nil, fmt.Errorf("failed to create telegram bot: %w", err) } - base := NewBaseChannel("telegram", cfg, bus, cfg.AllowFrom) + base := NewBaseChannel("telegram", telegramCfg, bus, telegramCfg.AllowFrom) return &TelegramChannel{ BaseChannel: base, + commands: NewTelegramCommands(bot, cfg), bot: bot, config: cfg, chatIDs: make(map[string]int64), @@ -88,31 +94,45 @@ func (c *TelegramChannel) Start(ctx context.Context) error { return fmt.Errorf("failed to start long polling: %w", err) } + bh, err := telegohandler.NewBotHandler(c.bot, updates) + if err != nil { + return fmt.Errorf("failed to create bot handler: %w", err) + } + + bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { + c.commands.Help(ctx, message) + return nil + }, th.CommandEqual("help")) + bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { + return c.commands.Start(ctx, message) + }, th.CommandEqual("start")) + + bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { + return c.commands.Show(ctx, message) + }, th.CommandEqual("show")) + + bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { + return c.commands.List(ctx, message) + }, th.CommandEqual("list")) + + bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { + return c.handleMessage(ctx, &message) + }, th.AnyMessage()) + c.setRunning(true) logger.InfoCF("telegram", "Telegram bot connected", map[string]interface{}{ "username": c.bot.Username(), }) + go bh.Start() + go func() { - for { - select { - case <-ctx.Done(): - return - case update, ok := <-updates: - if !ok { - logger.InfoC("telegram", "Updates channel closed, reconnecting...") - return - } - if update.Message != nil { - c.handleMessage(ctx, update) - } - } - } + <-ctx.Done() + bh.Stop() }() return nil } - func (c *TelegramChannel) Stop(ctx context.Context) error { logger.InfoC("telegram", "Stopping Telegram bot...") c.setRunning(false) @@ -166,30 +186,27 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err return nil } -func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Update) { - message := update.Message +func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Message) error { if message == nil { - return + return fmt.Errorf("message is nil") } user := message.From if user == nil { - return + return fmt.Errorf("message sender (user) is nil") } - userID := fmt.Sprintf("%d", user.ID) - senderID := userID + senderID := fmt.Sprintf("%d", user.ID) if user.Username != "" { - senderID = fmt.Sprintf("%s|%s", userID, user.Username) + senderID = fmt.Sprintf("%d|%s", user.ID, user.Username) } // 检查白名单,避免为被拒绝的用户下载附件 - if !c.IsAllowed(userID) && !c.IsAllowed(senderID) { + if !c.IsAllowed(senderID) { logger.DebugCF("telegram", "Message rejected by allowlist", map[string]interface{}{ - "user_id": userID, - "username": user.Username, + "user_id": senderID, }) - return + return nil } chatID := message.Chat.ID @@ -222,7 +239,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Updat content += message.Caption } - if message.Photo != nil && len(message.Photo) > 0 { + if len(message.Photo) > 0 { photo := message.Photo[len(message.Photo)-1] photoPath := c.downloadPhoto(ctx, photo.FileID) if photoPath != "" { @@ -231,7 +248,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Updat if content != "" { content += "\n" } - content += fmt.Sprintf("[image: photo]") + content += "[image: photo]" } } @@ -252,7 +269,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Updat "error": err.Error(), "path": voicePath, }) - transcribedText = fmt.Sprintf("[voice (transcription failed)]") + transcribedText = "[voice (transcription failed)]" } else { transcribedText = fmt.Sprintf("[voice transcription: %s]", result.Text) logger.InfoCF("telegram", "Voice transcribed successfully", map[string]interface{}{ @@ -260,7 +277,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Updat }) } } else { - transcribedText = fmt.Sprintf("[voice]") + transcribedText = "[voice]" } if content != "" { @@ -278,7 +295,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Updat if content != "" { content += "\n" } - content += fmt.Sprintf("[audio]") + content += "[audio]" } } @@ -290,7 +307,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Updat if content != "" { content += "\n" } - content += fmt.Sprintf("[file]") + content += "[file]" } } @@ -338,7 +355,8 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Updat "is_group": fmt.Sprintf("%t", message.Chat.Type != "private"), } - c.HandleMessage(senderID, fmt.Sprintf("%d", chatID), content, mediaPaths, metadata) + c.HandleMessage(fmt.Sprintf("%d", user.ID), fmt.Sprintf("%d", chatID), content, mediaPaths, metadata) + return nil } func (c *TelegramChannel) downloadPhoto(ctx context.Context, fileID string) string { diff --git a/pkg/channels/telegram_commands.go b/pkg/channels/telegram_commands.go new file mode 100644 index 0000000..df245e1 --- /dev/null +++ b/pkg/channels/telegram_commands.go @@ -0,0 +1,153 @@ +package channels + +import ( + "context" + "fmt" + "strings" + + "github.com/mymmrac/telego" + "github.com/sipeed/picoclaw/pkg/config" +) + +type TelegramCommander interface { + Help(ctx context.Context, message telego.Message) error + Start(ctx context.Context, message telego.Message) error + Show(ctx context.Context, message telego.Message) error + List(ctx context.Context, message telego.Message) error +} + +type cmd struct { + bot *telego.Bot + config *config.Config +} + +func NewTelegramCommands(bot *telego.Bot, cfg *config.Config) TelegramCommander { + return &cmd{ + bot: bot, + config: cfg, + } +} + +func commandArgs(text string) string { + parts := strings.SplitN(text, " ", 2) + if len(parts) < 2 { + return "" + } + return strings.TrimSpace(parts[1]) +} +func (c *cmd) Help(ctx context.Context, message telego.Message) error { + msg := `/start - Start the bot +/help - Show this help message +/show [model|channel] - Show current configuration +/list [models|channels] - List available options + ` + _, err := c.bot.SendMessage(ctx, &telego.SendMessageParams{ + ChatID: telego.ChatID{ID: message.Chat.ID}, + Text: msg, + ReplyParameters: &telego.ReplyParameters{ + MessageID: message.MessageID, + }, + }) + return err +} + +func (c *cmd) Start(ctx context.Context, message telego.Message) error { + _, err := c.bot.SendMessage(ctx, &telego.SendMessageParams{ + ChatID: telego.ChatID{ID: message.Chat.ID}, + Text: "Hello! I am PicoClaw 🦞", + ReplyParameters: &telego.ReplyParameters{ + MessageID: message.MessageID, + }, + }) + return err +} + +func (c *cmd) Show(ctx context.Context, message telego.Message) error { + args := commandArgs(message.Text) + if args == "" { + _, err := c.bot.SendMessage(ctx, &telego.SendMessageParams{ + ChatID: telego.ChatID{ID: message.Chat.ID}, + Text: "Usage: /show [model|channel]", + ReplyParameters: &telego.ReplyParameters{ + MessageID: message.MessageID, + }, + }) + return err + } + + var response string + switch args { + case "model": + response = fmt.Sprintf("Current Model: %s (Provider: %s)", + c.config.Agents.Defaults.Model, + c.config.Agents.Defaults.Provider) + case "channel": + response = "Current Channel: telegram" + default: + response = fmt.Sprintf("Unknown parameter: %s. Try 'model' or 'channel'.", args) + } + + _, err := c.bot.SendMessage(ctx, &telego.SendMessageParams{ + ChatID: telego.ChatID{ID: message.Chat.ID}, + Text: response, + ReplyParameters: &telego.ReplyParameters{ + MessageID: message.MessageID, + }, + }) + return err +} +func (c *cmd) List(ctx context.Context, message telego.Message) error { + args := commandArgs(message.Text) + if args == "" { + _, err := c.bot.SendMessage(ctx, &telego.SendMessageParams{ + ChatID: telego.ChatID{ID: message.Chat.ID}, + Text: "Usage: /list [models|channels]", + ReplyParameters: &telego.ReplyParameters{ + MessageID: message.MessageID, + }, + }) + return err + } + + var response string + switch args { + case "models": + provider := c.config.Agents.Defaults.Provider + if provider == "" { + provider = "configured default" + } + response = fmt.Sprintf("Configured Model: %s\nProvider: %s\n\nTo change models, update config.yaml", + c.config.Agents.Defaults.Model, provider) + + case "channels": + var enabled []string + if c.config.Channels.Telegram.Enabled { + enabled = append(enabled, "telegram") + } + if c.config.Channels.WhatsApp.Enabled { + enabled = append(enabled, "whatsapp") + } + if c.config.Channels.Feishu.Enabled { + enabled = append(enabled, "feishu") + } + if c.config.Channels.Discord.Enabled { + enabled = append(enabled, "discord") + } + if c.config.Channels.Slack.Enabled { + enabled = append(enabled, "slack") + } + response = fmt.Sprintf("Enabled Channels:\n- %s", strings.Join(enabled, "\n- ")) + + default: + response = fmt.Sprintf("Unknown parameter: %s. Try 'models' or 'channels'.", args) + } + + _, err := c.bot.SendMessage(ctx, &telego.SendMessageParams{ + ChatID: telego.ChatID{ID: message.Chat.ID}, + Text: response, + ReplyParameters: &telego.ReplyParameters{ + MessageID: message.MessageID, + }, + }) + return err +}