package channels import ( "context" "fmt" "os" "strings" "sync" "time" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" "github.com/slack-go/slack/socketmode" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/utils" "github.com/sipeed/picoclaw/pkg/voice" ) type SlackChannel struct { *BaseChannel config config.SlackConfig api *slack.Client socketClient *socketmode.Client botUserID string transcriber *voice.GroqTranscriber ctx context.Context cancel context.CancelFunc pendingAcks sync.Map } type slackMessageRef struct { ChannelID string Timestamp string } func NewSlackChannel(cfg config.SlackConfig, messageBus *bus.MessageBus) (*SlackChannel, error) { if cfg.BotToken == "" || cfg.AppToken == "" { return nil, fmt.Errorf("slack bot_token and app_token are required") } api := slack.New( cfg.BotToken, slack.OptionAppLevelToken(cfg.AppToken), ) socketClient := socketmode.New(api) base := NewBaseChannel("slack", cfg, messageBus, cfg.AllowFrom) return &SlackChannel{ BaseChannel: base, config: cfg, api: api, socketClient: socketClient, }, nil } func (c *SlackChannel) SetTranscriber(transcriber *voice.GroqTranscriber) { c.transcriber = transcriber } func (c *SlackChannel) Start(ctx context.Context) error { logger.InfoC("slack", "Starting Slack channel (Socket Mode)") c.ctx, c.cancel = context.WithCancel(ctx) authResp, err := c.api.AuthTest() if err != nil { return fmt.Errorf("slack auth test failed: %w", err) } c.botUserID = authResp.UserID logger.InfoCF("slack", "Slack bot connected", map[string]interface{}{ "bot_user_id": c.botUserID, "team": authResp.Team, }) go c.eventLoop() go func() { if err := c.socketClient.RunContext(c.ctx); err != nil { if c.ctx.Err() == nil { logger.ErrorCF("slack", "Socket Mode connection error", map[string]interface{}{ "error": err.Error(), }) } } }() c.setRunning(true) logger.InfoC("slack", "Slack channel started (Socket Mode)") return nil } func (c *SlackChannel) Stop(ctx context.Context) error { logger.InfoC("slack", "Stopping Slack channel") if c.cancel != nil { c.cancel() } c.setRunning(false) logger.InfoC("slack", "Slack channel stopped") return nil } func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return fmt.Errorf("slack channel not running") } channelID, threadTS := parseSlackChatID(msg.ChatID) if channelID == "" { return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID) } opts := []slack.MsgOption{ slack.MsgOptionText(msg.Content, false), } if threadTS != "" { opts = append(opts, slack.MsgOptionTS(threadTS)) } _, _, err := c.api.PostMessageContext(ctx, channelID, opts...) if err != nil { return fmt.Errorf("failed to send slack message: %w", err) } if ref, ok := c.pendingAcks.LoadAndDelete(msg.ChatID); ok { msgRef := ref.(slackMessageRef) c.api.AddReaction("white_check_mark", slack.ItemRef{ Channel: msgRef.ChannelID, Timestamp: msgRef.Timestamp, }) } logger.DebugCF("slack", "Message sent", map[string]interface{}{ "channel_id": channelID, "thread_ts": threadTS, }) return nil } func (c *SlackChannel) eventLoop() { for { select { case <-c.ctx.Done(): return case event, ok := <-c.socketClient.Events: if !ok { return } switch event.Type { case socketmode.EventTypeEventsAPI: c.handleEventsAPI(event) case socketmode.EventTypeSlashCommand: c.handleSlashCommand(event) case socketmode.EventTypeInteractive: if event.Request != nil { c.socketClient.Ack(*event.Request) } } } } } func (c *SlackChannel) handleEventsAPI(event socketmode.Event) { if event.Request != nil { c.socketClient.Ack(*event.Request) } eventsAPIEvent, ok := event.Data.(slackevents.EventsAPIEvent) if !ok { return } switch ev := eventsAPIEvent.InnerEvent.Data.(type) { case *slackevents.MessageEvent: c.handleMessageEvent(ev) case *slackevents.AppMentionEvent: c.handleAppMention(ev) } } func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) { if ev.User == c.botUserID || ev.User == "" { return } if ev.BotID != "" { return } if ev.SubType != "" && ev.SubType != "file_share" { return } // 检查白名单,避免为被拒绝的用户下载附件 if !c.IsAllowed(ev.User) { logger.DebugCF("slack", "Message rejected by allowlist", map[string]interface{}{ "user_id": ev.User, }) return } senderID := ev.User channelID := ev.Channel threadTS := ev.ThreadTimeStamp messageTS := ev.TimeStamp chatID := channelID if threadTS != "" { chatID = channelID + "/" + threadTS } c.api.AddReaction("eyes", slack.ItemRef{ Channel: channelID, Timestamp: messageTS, }) c.pendingAcks.Store(chatID, slackMessageRef{ ChannelID: channelID, Timestamp: messageTS, }) content := ev.Text content = c.stripBotMention(content) var mediaPaths []string localFiles := []string{} // 跟踪需要清理的本地文件 // 确保临时文件在函数返回时被清理 defer func() { for _, file := range localFiles { if err := os.Remove(file); err != nil { logger.DebugCF("slack", "Failed to cleanup temp file", map[string]interface{}{ "file": file, "error": err.Error(), }) } } }() if ev.Message != nil && len(ev.Message.Files) > 0 { for _, file := range ev.Message.Files { localPath := c.downloadSlackFile(file) if localPath == "" { continue } localFiles = append(localFiles, localPath) mediaPaths = append(mediaPaths, localPath) if utils.IsAudioFile(file.Name, file.Mimetype) && c.transcriber != nil && c.transcriber.IsAvailable() { ctx, cancel := context.WithTimeout(c.ctx, 30*time.Second) defer cancel() result, err := c.transcriber.Transcribe(ctx, localPath) if err != nil { logger.ErrorCF("slack", "Voice transcription failed", map[string]interface{}{"error": err.Error()}) content += fmt.Sprintf("\n[audio: %s (transcription failed)]", file.Name) } else { content += fmt.Sprintf("\n[voice transcription: %s]", result.Text) } } else { content += fmt.Sprintf("\n[file: %s]", file.Name) } } } if strings.TrimSpace(content) == "" { return } metadata := map[string]string{ "message_ts": messageTS, "channel_id": channelID, "thread_ts": threadTS, "platform": "slack", } logger.DebugCF("slack", "Received message", map[string]interface{}{ "sender_id": senderID, "chat_id": chatID, "preview": utils.Truncate(content, 50), "has_thread": threadTS != "", }) c.HandleMessage(senderID, chatID, content, mediaPaths, metadata) } func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) { if ev.User == c.botUserID { return } if !c.IsAllowed(ev.User) { logger.DebugCF("slack", "Mention rejected by allowlist", map[string]interface{}{ "user_id": ev.User, }) return } senderID := ev.User channelID := ev.Channel threadTS := ev.ThreadTimeStamp messageTS := ev.TimeStamp var chatID string if threadTS != "" { chatID = channelID + "/" + threadTS } else { chatID = channelID + "/" + messageTS } c.api.AddReaction("eyes", slack.ItemRef{ Channel: channelID, Timestamp: messageTS, }) c.pendingAcks.Store(chatID, slackMessageRef{ ChannelID: channelID, Timestamp: messageTS, }) content := c.stripBotMention(ev.Text) if strings.TrimSpace(content) == "" { return } metadata := map[string]string{ "message_ts": messageTS, "channel_id": channelID, "thread_ts": threadTS, "platform": "slack", "is_mention": "true", } c.HandleMessage(senderID, chatID, content, nil, metadata) } func (c *SlackChannel) handleSlashCommand(event socketmode.Event) { cmd, ok := event.Data.(slack.SlashCommand) if !ok { return } if event.Request != nil { c.socketClient.Ack(*event.Request) } if !c.IsAllowed(cmd.UserID) { logger.DebugCF("slack", "Slash command rejected by allowlist", map[string]interface{}{ "user_id": cmd.UserID, }) return } senderID := cmd.UserID channelID := cmd.ChannelID chatID := channelID content := cmd.Text if strings.TrimSpace(content) == "" { content = "help" } metadata := map[string]string{ "channel_id": channelID, "platform": "slack", "is_command": "true", "trigger_id": cmd.TriggerID, } logger.DebugCF("slack", "Slash command received", map[string]interface{}{ "sender_id": senderID, "command": cmd.Command, "text": utils.Truncate(content, 50), }) c.HandleMessage(senderID, chatID, content, nil, metadata) } func (c *SlackChannel) downloadSlackFile(file slack.File) string { downloadURL := file.URLPrivateDownload if downloadURL == "" { downloadURL = file.URLPrivate } if downloadURL == "" { logger.ErrorCF("slack", "No download URL for file", map[string]interface{}{"file_id": file.ID}) return "" } return utils.DownloadFile(downloadURL, file.Name, utils.DownloadOptions{ LoggerPrefix: "slack", ExtraHeaders: map[string]string{ "Authorization": "Bearer " + c.config.BotToken, }, }) } func (c *SlackChannel) stripBotMention(text string) string { mention := fmt.Sprintf("<@%s>", c.botUserID) text = strings.ReplaceAll(text, mention, "") return strings.TrimSpace(text) } func parseSlackChatID(chatID string) (channelID, threadTS string) { parts := strings.SplitN(chatID, "/", 2) channelID = parts[0] if len(parts) > 1 { threadTS = parts[1] } return }