feat(channels): add Slack channel integration with Socket Mode
Add Slack as a messaging channel using Socket Mode (WebSocket), bringing the total supported channels to 8. Features include bidirectional messaging, thread support with per-thread session context, @mention handling, ack reactions (👀/✅), slash commands, file/attachment support with Groq Whisper audio transcription, and allowlist filtering by Slack user ID. Closes #31 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
446
pkg/channels/slack.go
Normal file
446
pkg/channels/slack.go
Normal file
@@ -0,0 +1,446 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/slack-go/slack"
|
||||
"github.com/slack-go/slack/slackevents"
|
||||
"github.com/slack-go/slack/socketmode"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
"github.com/sipeed/picoclaw/pkg/config"
|
||||
"github.com/sipeed/picoclaw/pkg/logger"
|
||||
"github.com/sipeed/picoclaw/pkg/voice"
|
||||
)
|
||||
|
||||
type SlackChannel struct {
|
||||
*BaseChannel
|
||||
config config.SlackConfig
|
||||
api *slack.Client
|
||||
socketClient *socketmode.Client
|
||||
botUserID string
|
||||
transcriber *voice.GroqTranscriber
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
pendingAcks sync.Map
|
||||
}
|
||||
|
||||
type slackMessageRef struct {
|
||||
ChannelID string
|
||||
Timestamp string
|
||||
}
|
||||
|
||||
func NewSlackChannel(cfg config.SlackConfig, messageBus *bus.MessageBus) (*SlackChannel, error) {
|
||||
if cfg.BotToken == "" || cfg.AppToken == "" {
|
||||
return nil, fmt.Errorf("slack bot_token and app_token are required")
|
||||
}
|
||||
|
||||
api := slack.New(
|
||||
cfg.BotToken,
|
||||
slack.OptionAppLevelToken(cfg.AppToken),
|
||||
)
|
||||
|
||||
socketClient := socketmode.New(api)
|
||||
|
||||
base := NewBaseChannel("slack", cfg, messageBus, cfg.AllowFrom)
|
||||
|
||||
return &SlackChannel{
|
||||
BaseChannel: base,
|
||||
config: cfg,
|
||||
api: api,
|
||||
socketClient: socketClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *SlackChannel) SetTranscriber(transcriber *voice.GroqTranscriber) {
|
||||
c.transcriber = transcriber
|
||||
}
|
||||
|
||||
func (c *SlackChannel) Start(ctx context.Context) error {
|
||||
logger.InfoC("slack", "Starting Slack channel (Socket Mode)")
|
||||
|
||||
c.ctx, c.cancel = context.WithCancel(ctx)
|
||||
|
||||
authResp, err := c.api.AuthTest()
|
||||
if err != nil {
|
||||
return fmt.Errorf("slack auth test failed: %w", err)
|
||||
}
|
||||
c.botUserID = authResp.UserID
|
||||
|
||||
logger.InfoCF("slack", "Slack bot connected", map[string]interface{}{
|
||||
"bot_user_id": c.botUserID,
|
||||
"team": authResp.Team,
|
||||
})
|
||||
|
||||
go c.eventLoop()
|
||||
|
||||
go func() {
|
||||
if err := c.socketClient.RunContext(c.ctx); err != nil {
|
||||
if c.ctx.Err() == nil {
|
||||
logger.ErrorCF("slack", "Socket Mode connection error", map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
c.setRunning(true)
|
||||
logger.InfoC("slack", "Slack channel started (Socket Mode)")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SlackChannel) Stop(ctx context.Context) error {
|
||||
logger.InfoC("slack", "Stopping Slack channel")
|
||||
|
||||
if c.cancel != nil {
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
c.setRunning(false)
|
||||
logger.InfoC("slack", "Slack channel stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("slack channel not running")
|
||||
}
|
||||
|
||||
channelID, threadTS := parseSlackChatID(msg.ChatID)
|
||||
if channelID == "" {
|
||||
return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID)
|
||||
}
|
||||
|
||||
opts := []slack.MsgOption{
|
||||
slack.MsgOptionText(msg.Content, false),
|
||||
}
|
||||
|
||||
if threadTS != "" {
|
||||
opts = append(opts, slack.MsgOptionTS(threadTS))
|
||||
}
|
||||
|
||||
_, _, err := c.api.PostMessageContext(ctx, channelID, opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send slack message: %w", err)
|
||||
}
|
||||
|
||||
if ref, ok := c.pendingAcks.LoadAndDelete(msg.ChatID); ok {
|
||||
msgRef := ref.(slackMessageRef)
|
||||
c.api.AddReaction("white_check_mark", slack.ItemRef{
|
||||
Channel: msgRef.ChannelID,
|
||||
Timestamp: msgRef.Timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
logger.DebugCF("slack", "Message sent", map[string]interface{}{
|
||||
"channel_id": channelID,
|
||||
"thread_ts": threadTS,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SlackChannel) eventLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case event, ok := <-c.socketClient.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch event.Type {
|
||||
case socketmode.EventTypeEventsAPI:
|
||||
c.handleEventsAPI(event)
|
||||
case socketmode.EventTypeSlashCommand:
|
||||
c.handleSlashCommand(event)
|
||||
case socketmode.EventTypeInteractive:
|
||||
if event.Request != nil {
|
||||
c.socketClient.Ack(*event.Request)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *SlackChannel) handleEventsAPI(event socketmode.Event) {
|
||||
if event.Request != nil {
|
||||
c.socketClient.Ack(*event.Request)
|
||||
}
|
||||
|
||||
eventsAPIEvent, ok := event.Data.(slackevents.EventsAPIEvent)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
switch ev := eventsAPIEvent.InnerEvent.Data.(type) {
|
||||
case *slackevents.MessageEvent:
|
||||
c.handleMessageEvent(ev)
|
||||
case *slackevents.AppMentionEvent:
|
||||
c.handleAppMention(ev)
|
||||
case *slackevents.ReactionAddedEvent:
|
||||
c.handleReactionAdded(ev)
|
||||
case *slackevents.ReactionRemovedEvent:
|
||||
c.handleReactionRemoved(ev)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) {
|
||||
if ev.User == c.botUserID || ev.User == "" {
|
||||
return
|
||||
}
|
||||
if ev.BotID != "" {
|
||||
return
|
||||
}
|
||||
if ev.SubType != "" && ev.SubType != "file_share" {
|
||||
return
|
||||
}
|
||||
|
||||
senderID := ev.User
|
||||
channelID := ev.Channel
|
||||
threadTS := ev.ThreadTimeStamp
|
||||
messageTS := ev.TimeStamp
|
||||
|
||||
chatID := channelID
|
||||
if threadTS != "" {
|
||||
chatID = channelID + "/" + threadTS
|
||||
}
|
||||
|
||||
c.api.AddReaction("eyes", slack.ItemRef{
|
||||
Channel: channelID,
|
||||
Timestamp: messageTS,
|
||||
})
|
||||
|
||||
c.pendingAcks.Store(chatID, slackMessageRef{
|
||||
ChannelID: channelID,
|
||||
Timestamp: messageTS,
|
||||
})
|
||||
|
||||
content := ev.Text
|
||||
content = c.stripBotMention(content)
|
||||
|
||||
var mediaPaths []string
|
||||
|
||||
if ev.Message != nil && len(ev.Message.Files) > 0 {
|
||||
for _, file := range ev.Message.Files {
|
||||
localPath := c.downloadSlackFile(file)
|
||||
if localPath == "" {
|
||||
continue
|
||||
}
|
||||
mediaPaths = append(mediaPaths, localPath)
|
||||
|
||||
if isAudioFile(file.Name, file.Mimetype) && c.transcriber != nil && c.transcriber.IsAvailable() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
result, err := c.transcriber.Transcribe(ctx, localPath)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
logger.ErrorCF("slack", "Voice transcription failed", map[string]interface{}{"error": err.Error()})
|
||||
content += fmt.Sprintf("\n[audio: %s (transcription failed)]", file.Name)
|
||||
} else {
|
||||
content += fmt.Sprintf("\n[voice transcription: %s]", result.Text)
|
||||
}
|
||||
} else {
|
||||
content += fmt.Sprintf("\n[file: %s]", file.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if strings.TrimSpace(content) == "" {
|
||||
return
|
||||
}
|
||||
|
||||
metadata := map[string]string{
|
||||
"message_ts": messageTS,
|
||||
"channel_id": channelID,
|
||||
"thread_ts": threadTS,
|
||||
"platform": "slack",
|
||||
}
|
||||
|
||||
logger.DebugCF("slack", "Received message", map[string]interface{}{
|
||||
"sender_id": senderID,
|
||||
"chat_id": chatID,
|
||||
"preview": truncateStringSlack(content, 50),
|
||||
"has_thread": threadTS != "",
|
||||
})
|
||||
|
||||
c.HandleMessage(senderID, chatID, content, mediaPaths, metadata)
|
||||
}
|
||||
|
||||
func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) {
|
||||
if ev.User == c.botUserID {
|
||||
return
|
||||
}
|
||||
|
||||
senderID := ev.User
|
||||
channelID := ev.Channel
|
||||
threadTS := ev.ThreadTimeStamp
|
||||
messageTS := ev.TimeStamp
|
||||
|
||||
var chatID string
|
||||
if threadTS != "" {
|
||||
chatID = channelID + "/" + threadTS
|
||||
} else {
|
||||
chatID = channelID + "/" + messageTS
|
||||
}
|
||||
|
||||
c.api.AddReaction("eyes", slack.ItemRef{
|
||||
Channel: channelID,
|
||||
Timestamp: messageTS,
|
||||
})
|
||||
|
||||
c.pendingAcks.Store(chatID, slackMessageRef{
|
||||
ChannelID: channelID,
|
||||
Timestamp: messageTS,
|
||||
})
|
||||
|
||||
content := c.stripBotMention(ev.Text)
|
||||
|
||||
if strings.TrimSpace(content) == "" {
|
||||
return
|
||||
}
|
||||
|
||||
metadata := map[string]string{
|
||||
"message_ts": messageTS,
|
||||
"channel_id": channelID,
|
||||
"thread_ts": threadTS,
|
||||
"platform": "slack",
|
||||
"is_mention": "true",
|
||||
}
|
||||
|
||||
c.HandleMessage(senderID, chatID, content, nil, metadata)
|
||||
}
|
||||
|
||||
func (c *SlackChannel) handleSlashCommand(event socketmode.Event) {
|
||||
cmd, ok := event.Data.(slack.SlashCommand)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if event.Request != nil {
|
||||
c.socketClient.Ack(*event.Request)
|
||||
}
|
||||
|
||||
senderID := cmd.UserID
|
||||
channelID := cmd.ChannelID
|
||||
chatID := channelID
|
||||
content := cmd.Text
|
||||
|
||||
if strings.TrimSpace(content) == "" {
|
||||
content = "help"
|
||||
}
|
||||
|
||||
metadata := map[string]string{
|
||||
"channel_id": channelID,
|
||||
"platform": "slack",
|
||||
"is_command": "true",
|
||||
"trigger_id": cmd.TriggerID,
|
||||
}
|
||||
|
||||
logger.DebugCF("slack", "Slash command received", map[string]interface{}{
|
||||
"sender_id": senderID,
|
||||
"command": cmd.Command,
|
||||
"text": truncateStringSlack(content, 50),
|
||||
})
|
||||
|
||||
c.HandleMessage(senderID, chatID, content, nil, metadata)
|
||||
}
|
||||
|
||||
func (c *SlackChannel) handleReactionAdded(ev *slackevents.ReactionAddedEvent) {
|
||||
logger.DebugCF("slack", "Reaction added", map[string]interface{}{
|
||||
"reaction": ev.Reaction,
|
||||
"user": ev.User,
|
||||
"item_ts": ev.Item.Timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *SlackChannel) handleReactionRemoved(ev *slackevents.ReactionRemovedEvent) {
|
||||
logger.DebugCF("slack", "Reaction removed", map[string]interface{}{
|
||||
"reaction": ev.Reaction,
|
||||
"user": ev.User,
|
||||
"item_ts": ev.Item.Timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *SlackChannel) downloadSlackFile(file slack.File) string {
|
||||
mediaDir := filepath.Join(os.TempDir(), "picoclaw_media")
|
||||
if err := os.MkdirAll(mediaDir, 0755); err != nil {
|
||||
logger.ErrorCF("slack", "Failed to create media directory", map[string]interface{}{"error": err.Error()})
|
||||
return ""
|
||||
}
|
||||
|
||||
downloadURL := file.URLPrivateDownload
|
||||
if downloadURL == "" {
|
||||
downloadURL = file.URLPrivate
|
||||
}
|
||||
if downloadURL == "" {
|
||||
logger.ErrorCF("slack", "No download URL for file", map[string]interface{}{"file_id": file.ID})
|
||||
return ""
|
||||
}
|
||||
|
||||
localPath := filepath.Join(mediaDir, file.Name)
|
||||
|
||||
req, err := http.NewRequest("GET", downloadURL, nil)
|
||||
if err != nil {
|
||||
logger.ErrorCF("slack", "Failed to create download request", map[string]interface{}{"error": err.Error()})
|
||||
return ""
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+c.config.BotToken)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
logger.ErrorCF("slack", "Failed to download file", map[string]interface{}{"error": err.Error()})
|
||||
return ""
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
logger.ErrorCF("slack", "File download returned non-200 status", map[string]interface{}{"status": resp.StatusCode})
|
||||
return ""
|
||||
}
|
||||
|
||||
out, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
logger.ErrorCF("slack", "Failed to create local file", map[string]interface{}{"error": err.Error()})
|
||||
return ""
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
if _, err := io.Copy(out, resp.Body); err != nil {
|
||||
logger.ErrorCF("slack", "Failed to write file", map[string]interface{}{"error": err.Error()})
|
||||
return ""
|
||||
}
|
||||
|
||||
logger.DebugCF("slack", "File downloaded", map[string]interface{}{"path": localPath, "name": file.Name})
|
||||
return localPath
|
||||
}
|
||||
|
||||
func (c *SlackChannel) stripBotMention(text string) string {
|
||||
mention := fmt.Sprintf("<@%s>", c.botUserID)
|
||||
text = strings.ReplaceAll(text, mention, "")
|
||||
return strings.TrimSpace(text)
|
||||
}
|
||||
|
||||
func parseSlackChatID(chatID string) (channelID, threadTS string) {
|
||||
parts := strings.SplitN(chatID, "/", 2)
|
||||
channelID = parts[0]
|
||||
if len(parts) > 1 {
|
||||
threadTS = parts[1]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func truncateStringSlack(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
}
|
||||
return s[:maxLen]
|
||||
}
|
||||
Reference in New Issue
Block a user