package channels import ( "context" "fmt" "io" "net/http" "os" "path/filepath" "strings" "sync" "time" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" "github.com/slack-go/slack/socketmode" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/voice" ) type SlackChannel struct { *BaseChannel config config.SlackConfig api *slack.Client socketClient *socketmode.Client botUserID string transcriber *voice.GroqTranscriber ctx context.Context cancel context.CancelFunc pendingAcks sync.Map } type slackMessageRef struct { ChannelID string Timestamp string } func NewSlackChannel(cfg config.SlackConfig, messageBus *bus.MessageBus) (*SlackChannel, error) { if cfg.BotToken == "" || cfg.AppToken == "" { return nil, fmt.Errorf("slack bot_token and app_token are required") } api := slack.New( cfg.BotToken, slack.OptionAppLevelToken(cfg.AppToken), ) socketClient := socketmode.New(api) base := NewBaseChannel("slack", cfg, messageBus, cfg.AllowFrom) return &SlackChannel{ BaseChannel: base, config: cfg, api: api, socketClient: socketClient, }, nil } func (c *SlackChannel) SetTranscriber(transcriber *voice.GroqTranscriber) { c.transcriber = transcriber } func (c *SlackChannel) Start(ctx context.Context) error { logger.InfoC("slack", "Starting Slack channel (Socket Mode)") c.ctx, c.cancel = context.WithCancel(ctx) authResp, err := c.api.AuthTest() if err != nil { return fmt.Errorf("slack auth test failed: %w", err) } c.botUserID = authResp.UserID logger.InfoCF("slack", "Slack bot connected", map[string]interface{}{ "bot_user_id": c.botUserID, "team": authResp.Team, }) go c.eventLoop() go func() { if err := c.socketClient.RunContext(c.ctx); err != nil { if c.ctx.Err() == nil { logger.ErrorCF("slack", "Socket Mode connection error", map[string]interface{}{ "error": err.Error(), }) } } }() c.setRunning(true) logger.InfoC("slack", "Slack channel started (Socket Mode)") return nil } func (c *SlackChannel) Stop(ctx context.Context) error { logger.InfoC("slack", "Stopping Slack channel") if c.cancel != nil { c.cancel() } c.setRunning(false) logger.InfoC("slack", "Slack channel stopped") return nil } func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return fmt.Errorf("slack channel not running") } channelID, threadTS := parseSlackChatID(msg.ChatID) if channelID == "" { return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID) } opts := []slack.MsgOption{ slack.MsgOptionText(msg.Content, false), } if threadTS != "" { opts = append(opts, slack.MsgOptionTS(threadTS)) } _, _, err := c.api.PostMessageContext(ctx, channelID, opts...) if err != nil { return fmt.Errorf("failed to send slack message: %w", err) } if ref, ok := c.pendingAcks.LoadAndDelete(msg.ChatID); ok { msgRef := ref.(slackMessageRef) c.api.AddReaction("white_check_mark", slack.ItemRef{ Channel: msgRef.ChannelID, Timestamp: msgRef.Timestamp, }) } logger.DebugCF("slack", "Message sent", map[string]interface{}{ "channel_id": channelID, "thread_ts": threadTS, }) return nil } func (c *SlackChannel) eventLoop() { for { select { case <-c.ctx.Done(): return case event, ok := <-c.socketClient.Events: if !ok { return } switch event.Type { case socketmode.EventTypeEventsAPI: c.handleEventsAPI(event) case socketmode.EventTypeSlashCommand: c.handleSlashCommand(event) case socketmode.EventTypeInteractive: if event.Request != nil { c.socketClient.Ack(*event.Request) } } } } } func (c *SlackChannel) handleEventsAPI(event socketmode.Event) { if event.Request != nil { c.socketClient.Ack(*event.Request) } eventsAPIEvent, ok := event.Data.(slackevents.EventsAPIEvent) if !ok { return } switch ev := eventsAPIEvent.InnerEvent.Data.(type) { case *slackevents.MessageEvent: c.handleMessageEvent(ev) case *slackevents.AppMentionEvent: c.handleAppMention(ev) case *slackevents.ReactionAddedEvent: c.handleReactionAdded(ev) case *slackevents.ReactionRemovedEvent: c.handleReactionRemoved(ev) } } func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) { if ev.User == c.botUserID || ev.User == "" { return } if ev.BotID != "" { return } if ev.SubType != "" && ev.SubType != "file_share" { return } senderID := ev.User channelID := ev.Channel threadTS := ev.ThreadTimeStamp messageTS := ev.TimeStamp chatID := channelID if threadTS != "" { chatID = channelID + "/" + threadTS } c.api.AddReaction("eyes", slack.ItemRef{ Channel: channelID, Timestamp: messageTS, }) c.pendingAcks.Store(chatID, slackMessageRef{ ChannelID: channelID, Timestamp: messageTS, }) content := ev.Text content = c.stripBotMention(content) var mediaPaths []string if ev.Message != nil && len(ev.Message.Files) > 0 { for _, file := range ev.Message.Files { localPath := c.downloadSlackFile(file) if localPath == "" { continue } mediaPaths = append(mediaPaths, localPath) if isAudioFile(file.Name, file.Mimetype) && c.transcriber != nil && c.transcriber.IsAvailable() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) result, err := c.transcriber.Transcribe(ctx, localPath) cancel() if err != nil { logger.ErrorCF("slack", "Voice transcription failed", map[string]interface{}{"error": err.Error()}) content += fmt.Sprintf("\n[audio: %s (transcription failed)]", file.Name) } else { content += fmt.Sprintf("\n[voice transcription: %s]", result.Text) } } else { content += fmt.Sprintf("\n[file: %s]", file.Name) } } } if strings.TrimSpace(content) == "" { return } metadata := map[string]string{ "message_ts": messageTS, "channel_id": channelID, "thread_ts": threadTS, "platform": "slack", } logger.DebugCF("slack", "Received message", map[string]interface{}{ "sender_id": senderID, "chat_id": chatID, "preview": truncateStringSlack(content, 50), "has_thread": threadTS != "", }) c.HandleMessage(senderID, chatID, content, mediaPaths, metadata) } func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) { if ev.User == c.botUserID { return } senderID := ev.User channelID := ev.Channel threadTS := ev.ThreadTimeStamp messageTS := ev.TimeStamp var chatID string if threadTS != "" { chatID = channelID + "/" + threadTS } else { chatID = channelID + "/" + messageTS } c.api.AddReaction("eyes", slack.ItemRef{ Channel: channelID, Timestamp: messageTS, }) c.pendingAcks.Store(chatID, slackMessageRef{ ChannelID: channelID, Timestamp: messageTS, }) content := c.stripBotMention(ev.Text) if strings.TrimSpace(content) == "" { return } metadata := map[string]string{ "message_ts": messageTS, "channel_id": channelID, "thread_ts": threadTS, "platform": "slack", "is_mention": "true", } c.HandleMessage(senderID, chatID, content, nil, metadata) } func (c *SlackChannel) handleSlashCommand(event socketmode.Event) { cmd, ok := event.Data.(slack.SlashCommand) if !ok { return } if event.Request != nil { c.socketClient.Ack(*event.Request) } senderID := cmd.UserID channelID := cmd.ChannelID chatID := channelID content := cmd.Text if strings.TrimSpace(content) == "" { content = "help" } metadata := map[string]string{ "channel_id": channelID, "platform": "slack", "is_command": "true", "trigger_id": cmd.TriggerID, } logger.DebugCF("slack", "Slash command received", map[string]interface{}{ "sender_id": senderID, "command": cmd.Command, "text": truncateStringSlack(content, 50), }) c.HandleMessage(senderID, chatID, content, nil, metadata) } func (c *SlackChannel) handleReactionAdded(ev *slackevents.ReactionAddedEvent) { logger.DebugCF("slack", "Reaction added", map[string]interface{}{ "reaction": ev.Reaction, "user": ev.User, "item_ts": ev.Item.Timestamp, }) } func (c *SlackChannel) handleReactionRemoved(ev *slackevents.ReactionRemovedEvent) { logger.DebugCF("slack", "Reaction removed", map[string]interface{}{ "reaction": ev.Reaction, "user": ev.User, "item_ts": ev.Item.Timestamp, }) } func (c *SlackChannel) downloadSlackFile(file slack.File) string { mediaDir := filepath.Join(os.TempDir(), "picoclaw_media") if err := os.MkdirAll(mediaDir, 0755); err != nil { logger.ErrorCF("slack", "Failed to create media directory", map[string]interface{}{"error": err.Error()}) return "" } downloadURL := file.URLPrivateDownload if downloadURL == "" { downloadURL = file.URLPrivate } if downloadURL == "" { logger.ErrorCF("slack", "No download URL for file", map[string]interface{}{"file_id": file.ID}) return "" } localPath := filepath.Join(mediaDir, file.Name) req, err := http.NewRequest("GET", downloadURL, nil) if err != nil { logger.ErrorCF("slack", "Failed to create download request", map[string]interface{}{"error": err.Error()}) return "" } req.Header.Set("Authorization", "Bearer "+c.config.BotToken) resp, err := http.DefaultClient.Do(req) if err != nil { logger.ErrorCF("slack", "Failed to download file", map[string]interface{}{"error": err.Error()}) return "" } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { logger.ErrorCF("slack", "File download returned non-200 status", map[string]interface{}{"status": resp.StatusCode}) return "" } out, err := os.Create(localPath) if err != nil { logger.ErrorCF("slack", "Failed to create local file", map[string]interface{}{"error": err.Error()}) return "" } defer out.Close() if _, err := io.Copy(out, resp.Body); err != nil { logger.ErrorCF("slack", "Failed to write file", map[string]interface{}{"error": err.Error()}) return "" } logger.DebugCF("slack", "File downloaded", map[string]interface{}{"path": localPath, "name": file.Name}) return localPath } func (c *SlackChannel) stripBotMention(text string) string { mention := fmt.Sprintf("<@%s>", c.botUserID) text = strings.ReplaceAll(text, mention, "") return strings.TrimSpace(text) } func parseSlackChatID(chatID string) (channelID, threadTS string) { parts := strings.SplitN(chatID, "/", 2) channelID = parts[0] if len(parts) > 1 { threadTS = parts[1] } return } func truncateStringSlack(s string, maxLen int) string { if len(s) <= maxLen { return s } return s[:maxLen] }