Merge branch 'main' into telegram-using-telego

This commit is contained in:
yinwm
2026-02-12 12:06:48 +08:00
committed by GitHub
32 changed files with 4715 additions and 49 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/open-dingtalk/dingtalk-stream-sdk-go/client"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/utils"
)
// DingTalkChannel implements the Channel interface for DingTalk (钉钉)
@@ -107,7 +108,7 @@ func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID)
}
log.Printf("DingTalk message to %s: %s", msg.ChatID, truncateStringDingTalk(msg.Content, 100))
log.Printf("DingTalk message to %s: %s", msg.ChatID, utils.Truncate(msg.Content, 100))
// Use the session webhook to send the reply
return c.SendDirectReply(sessionWebhook, msg.Content)
@@ -151,7 +152,7 @@ func (c *DingTalkChannel) onChatBotMessageReceived(ctx context.Context, data *ch
"session_webhook": data.SessionWebhook,
}
log.Printf("DingTalk message from %s (%s): %s", senderNick, senderID, truncateStringDingTalk(content, 50))
log.Printf("DingTalk message from %s (%s): %s", senderNick, senderID, utils.Truncate(content, 50))
// Handle the message through the base channel
c.HandleMessage(senderID, chatID, content, nil, metadata)
@@ -183,11 +184,3 @@ func (c *DingTalkChannel) SendDirectReply(sessionWebhook, content string) error
return nil
}
// truncateStringDingTalk truncates a string to max length for logging (avoiding name collision with telegram.go)
func truncateStringDingTalk(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/utils"
"github.com/sipeed/picoclaw/pkg/voice"
)
@@ -172,7 +173,7 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
logger.DebugCF("discord", "Received message", map[string]interface{}{
"sender_name": senderName,
"sender_id": senderID,
"preview": truncateString(content, 50),
"preview": utils.Truncate(content, 50),
})
metadata := map[string]string{

View File

@@ -15,6 +15,7 @@ import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/utils"
)
type FeishuChannel struct {
@@ -165,7 +166,7 @@ func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2
logger.InfoCF("feishu", "Feishu message received", map[string]interface{}{
"sender_id": senderID,
"chat_id": chatID,
"preview": truncateString(content, 80),
"preview": utils.Truncate(content, 80),
})
c.HandleMessage(senderID, chatID, content, nil, metadata)

View File

@@ -136,6 +136,19 @@ func (m *Manager) initChannels() error {
}
}
if m.config.Channels.Slack.Enabled && m.config.Channels.Slack.BotToken != "" {
logger.DebugC("channels", "Attempting to initialize Slack channel")
slackCh, err := NewSlackChannel(m.config.Channels.Slack, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize Slack channel", map[string]interface{}{
"error": err.Error(),
})
} else {
m.channels["slack"] = slackCh
logger.InfoC("channels", "Slack channel enabled successfully")
}
}
logger.InfoCF("channels", "Channel initialization completed", map[string]interface{}{
"enabled_channels": len(m.channels),
})

446
pkg/channels/slack.go Normal file
View File

@@ -0,0 +1,446 @@
package channels
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/slack-go/slack"
"github.com/slack-go/slack/slackevents"
"github.com/slack-go/slack/socketmode"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/voice"
)
type SlackChannel struct {
*BaseChannel
config config.SlackConfig
api *slack.Client
socketClient *socketmode.Client
botUserID string
transcriber *voice.GroqTranscriber
ctx context.Context
cancel context.CancelFunc
pendingAcks sync.Map
}
type slackMessageRef struct {
ChannelID string
Timestamp string
}
func NewSlackChannel(cfg config.SlackConfig, messageBus *bus.MessageBus) (*SlackChannel, error) {
if cfg.BotToken == "" || cfg.AppToken == "" {
return nil, fmt.Errorf("slack bot_token and app_token are required")
}
api := slack.New(
cfg.BotToken,
slack.OptionAppLevelToken(cfg.AppToken),
)
socketClient := socketmode.New(api)
base := NewBaseChannel("slack", cfg, messageBus, cfg.AllowFrom)
return &SlackChannel{
BaseChannel: base,
config: cfg,
api: api,
socketClient: socketClient,
}, nil
}
func (c *SlackChannel) SetTranscriber(transcriber *voice.GroqTranscriber) {
c.transcriber = transcriber
}
func (c *SlackChannel) Start(ctx context.Context) error {
logger.InfoC("slack", "Starting Slack channel (Socket Mode)")
c.ctx, c.cancel = context.WithCancel(ctx)
authResp, err := c.api.AuthTest()
if err != nil {
return fmt.Errorf("slack auth test failed: %w", err)
}
c.botUserID = authResp.UserID
logger.InfoCF("slack", "Slack bot connected", map[string]interface{}{
"bot_user_id": c.botUserID,
"team": authResp.Team,
})
go c.eventLoop()
go func() {
if err := c.socketClient.RunContext(c.ctx); err != nil {
if c.ctx.Err() == nil {
logger.ErrorCF("slack", "Socket Mode connection error", map[string]interface{}{
"error": err.Error(),
})
}
}
}()
c.setRunning(true)
logger.InfoC("slack", "Slack channel started (Socket Mode)")
return nil
}
func (c *SlackChannel) Stop(ctx context.Context) error {
logger.InfoC("slack", "Stopping Slack channel")
if c.cancel != nil {
c.cancel()
}
c.setRunning(false)
logger.InfoC("slack", "Slack channel stopped")
return nil
}
func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.IsRunning() {
return fmt.Errorf("slack channel not running")
}
channelID, threadTS := parseSlackChatID(msg.ChatID)
if channelID == "" {
return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID)
}
opts := []slack.MsgOption{
slack.MsgOptionText(msg.Content, false),
}
if threadTS != "" {
opts = append(opts, slack.MsgOptionTS(threadTS))
}
_, _, err := c.api.PostMessageContext(ctx, channelID, opts...)
if err != nil {
return fmt.Errorf("failed to send slack message: %w", err)
}
if ref, ok := c.pendingAcks.LoadAndDelete(msg.ChatID); ok {
msgRef := ref.(slackMessageRef)
c.api.AddReaction("white_check_mark", slack.ItemRef{
Channel: msgRef.ChannelID,
Timestamp: msgRef.Timestamp,
})
}
logger.DebugCF("slack", "Message sent", map[string]interface{}{
"channel_id": channelID,
"thread_ts": threadTS,
})
return nil
}
func (c *SlackChannel) eventLoop() {
for {
select {
case <-c.ctx.Done():
return
case event, ok := <-c.socketClient.Events:
if !ok {
return
}
switch event.Type {
case socketmode.EventTypeEventsAPI:
c.handleEventsAPI(event)
case socketmode.EventTypeSlashCommand:
c.handleSlashCommand(event)
case socketmode.EventTypeInteractive:
if event.Request != nil {
c.socketClient.Ack(*event.Request)
}
}
}
}
}
func (c *SlackChannel) handleEventsAPI(event socketmode.Event) {
if event.Request != nil {
c.socketClient.Ack(*event.Request)
}
eventsAPIEvent, ok := event.Data.(slackevents.EventsAPIEvent)
if !ok {
return
}
switch ev := eventsAPIEvent.InnerEvent.Data.(type) {
case *slackevents.MessageEvent:
c.handleMessageEvent(ev)
case *slackevents.AppMentionEvent:
c.handleAppMention(ev)
case *slackevents.ReactionAddedEvent:
c.handleReactionAdded(ev)
case *slackevents.ReactionRemovedEvent:
c.handleReactionRemoved(ev)
}
}
func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) {
if ev.User == c.botUserID || ev.User == "" {
return
}
if ev.BotID != "" {
return
}
if ev.SubType != "" && ev.SubType != "file_share" {
return
}
senderID := ev.User
channelID := ev.Channel
threadTS := ev.ThreadTimeStamp
messageTS := ev.TimeStamp
chatID := channelID
if threadTS != "" {
chatID = channelID + "/" + threadTS
}
c.api.AddReaction("eyes", slack.ItemRef{
Channel: channelID,
Timestamp: messageTS,
})
c.pendingAcks.Store(chatID, slackMessageRef{
ChannelID: channelID,
Timestamp: messageTS,
})
content := ev.Text
content = c.stripBotMention(content)
var mediaPaths []string
if ev.Message != nil && len(ev.Message.Files) > 0 {
for _, file := range ev.Message.Files {
localPath := c.downloadSlackFile(file)
if localPath == "" {
continue
}
mediaPaths = append(mediaPaths, localPath)
if isAudioFile(file.Name, file.Mimetype) && c.transcriber != nil && c.transcriber.IsAvailable() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
result, err := c.transcriber.Transcribe(ctx, localPath)
cancel()
if err != nil {
logger.ErrorCF("slack", "Voice transcription failed", map[string]interface{}{"error": err.Error()})
content += fmt.Sprintf("\n[audio: %s (transcription failed)]", file.Name)
} else {
content += fmt.Sprintf("\n[voice transcription: %s]", result.Text)
}
} else {
content += fmt.Sprintf("\n[file: %s]", file.Name)
}
}
}
if strings.TrimSpace(content) == "" {
return
}
metadata := map[string]string{
"message_ts": messageTS,
"channel_id": channelID,
"thread_ts": threadTS,
"platform": "slack",
}
logger.DebugCF("slack", "Received message", map[string]interface{}{
"sender_id": senderID,
"chat_id": chatID,
"preview": truncateStringSlack(content, 50),
"has_thread": threadTS != "",
})
c.HandleMessage(senderID, chatID, content, mediaPaths, metadata)
}
func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) {
if ev.User == c.botUserID {
return
}
senderID := ev.User
channelID := ev.Channel
threadTS := ev.ThreadTimeStamp
messageTS := ev.TimeStamp
var chatID string
if threadTS != "" {
chatID = channelID + "/" + threadTS
} else {
chatID = channelID + "/" + messageTS
}
c.api.AddReaction("eyes", slack.ItemRef{
Channel: channelID,
Timestamp: messageTS,
})
c.pendingAcks.Store(chatID, slackMessageRef{
ChannelID: channelID,
Timestamp: messageTS,
})
content := c.stripBotMention(ev.Text)
if strings.TrimSpace(content) == "" {
return
}
metadata := map[string]string{
"message_ts": messageTS,
"channel_id": channelID,
"thread_ts": threadTS,
"platform": "slack",
"is_mention": "true",
}
c.HandleMessage(senderID, chatID, content, nil, metadata)
}
func (c *SlackChannel) handleSlashCommand(event socketmode.Event) {
cmd, ok := event.Data.(slack.SlashCommand)
if !ok {
return
}
if event.Request != nil {
c.socketClient.Ack(*event.Request)
}
senderID := cmd.UserID
channelID := cmd.ChannelID
chatID := channelID
content := cmd.Text
if strings.TrimSpace(content) == "" {
content = "help"
}
metadata := map[string]string{
"channel_id": channelID,
"platform": "slack",
"is_command": "true",
"trigger_id": cmd.TriggerID,
}
logger.DebugCF("slack", "Slash command received", map[string]interface{}{
"sender_id": senderID,
"command": cmd.Command,
"text": truncateStringSlack(content, 50),
})
c.HandleMessage(senderID, chatID, content, nil, metadata)
}
func (c *SlackChannel) handleReactionAdded(ev *slackevents.ReactionAddedEvent) {
logger.DebugCF("slack", "Reaction added", map[string]interface{}{
"reaction": ev.Reaction,
"user": ev.User,
"item_ts": ev.Item.Timestamp,
})
}
func (c *SlackChannel) handleReactionRemoved(ev *slackevents.ReactionRemovedEvent) {
logger.DebugCF("slack", "Reaction removed", map[string]interface{}{
"reaction": ev.Reaction,
"user": ev.User,
"item_ts": ev.Item.Timestamp,
})
}
func (c *SlackChannel) downloadSlackFile(file slack.File) string {
mediaDir := filepath.Join(os.TempDir(), "picoclaw_media")
if err := os.MkdirAll(mediaDir, 0755); err != nil {
logger.ErrorCF("slack", "Failed to create media directory", map[string]interface{}{"error": err.Error()})
return ""
}
downloadURL := file.URLPrivateDownload
if downloadURL == "" {
downloadURL = file.URLPrivate
}
if downloadURL == "" {
logger.ErrorCF("slack", "No download URL for file", map[string]interface{}{"file_id": file.ID})
return ""
}
localPath := filepath.Join(mediaDir, file.Name)
req, err := http.NewRequest("GET", downloadURL, nil)
if err != nil {
logger.ErrorCF("slack", "Failed to create download request", map[string]interface{}{"error": err.Error()})
return ""
}
req.Header.Set("Authorization", "Bearer "+c.config.BotToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
logger.ErrorCF("slack", "Failed to download file", map[string]interface{}{"error": err.Error()})
return ""
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.ErrorCF("slack", "File download returned non-200 status", map[string]interface{}{"status": resp.StatusCode})
return ""
}
out, err := os.Create(localPath)
if err != nil {
logger.ErrorCF("slack", "Failed to create local file", map[string]interface{}{"error": err.Error()})
return ""
}
defer out.Close()
if _, err := io.Copy(out, resp.Body); err != nil {
logger.ErrorCF("slack", "Failed to write file", map[string]interface{}{"error": err.Error()})
return ""
}
logger.DebugCF("slack", "File downloaded", map[string]interface{}{"path": localPath, "name": file.Name})
return localPath
}
func (c *SlackChannel) stripBotMention(text string) string {
mention := fmt.Sprintf("<@%s>", c.botUserID)
text = strings.ReplaceAll(text, mention, "")
return strings.TrimSpace(text)
}
func parseSlackChatID(chatID string) (channelID, threadTS string) {
parts := strings.SplitN(chatID, "/", 2)
channelID = parts[0]
if len(parts) > 1 {
threadTS = parts[1]
}
return
}
func truncateStringSlack(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}

193
pkg/channels/slack_test.go Normal file
View File

@@ -0,0 +1,193 @@
package channels
import (
"testing"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
)
func TestParseSlackChatID(t *testing.T) {
tests := []struct {
name string
chatID string
wantChanID string
wantThread string
}{
{
name: "channel only",
chatID: "C123456",
wantChanID: "C123456",
wantThread: "",
},
{
name: "channel with thread",
chatID: "C123456/1234567890.123456",
wantChanID: "C123456",
wantThread: "1234567890.123456",
},
{
name: "DM channel",
chatID: "D987654",
wantChanID: "D987654",
wantThread: "",
},
{
name: "empty string",
chatID: "",
wantChanID: "",
wantThread: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
chanID, threadTS := parseSlackChatID(tt.chatID)
if chanID != tt.wantChanID {
t.Errorf("parseSlackChatID(%q) channelID = %q, want %q", tt.chatID, chanID, tt.wantChanID)
}
if threadTS != tt.wantThread {
t.Errorf("parseSlackChatID(%q) threadTS = %q, want %q", tt.chatID, threadTS, tt.wantThread)
}
})
}
}
func TestStripBotMention(t *testing.T) {
ch := &SlackChannel{botUserID: "U12345BOT"}
tests := []struct {
name string
input string
want string
}{
{
name: "mention at start",
input: "<@U12345BOT> hello there",
want: "hello there",
},
{
name: "mention in middle",
input: "hey <@U12345BOT> can you help",
want: "hey can you help",
},
{
name: "no mention",
input: "hello world",
want: "hello world",
},
{
name: "empty string",
input: "",
want: "",
},
{
name: "only mention",
input: "<@U12345BOT>",
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := ch.stripBotMention(tt.input)
if got != tt.want {
t.Errorf("stripBotMention(%q) = %q, want %q", tt.input, got, tt.want)
}
})
}
}
func TestNewSlackChannel(t *testing.T) {
msgBus := bus.NewMessageBus()
t.Run("missing bot token", func(t *testing.T) {
cfg := config.SlackConfig{
BotToken: "",
AppToken: "xapp-test",
}
_, err := NewSlackChannel(cfg, msgBus)
if err == nil {
t.Error("expected error for missing bot_token, got nil")
}
})
t.Run("missing app token", func(t *testing.T) {
cfg := config.SlackConfig{
BotToken: "xoxb-test",
AppToken: "",
}
_, err := NewSlackChannel(cfg, msgBus)
if err == nil {
t.Error("expected error for missing app_token, got nil")
}
})
t.Run("valid config", func(t *testing.T) {
cfg := config.SlackConfig{
BotToken: "xoxb-test",
AppToken: "xapp-test",
AllowFrom: []string{"U123"},
}
ch, err := NewSlackChannel(cfg, msgBus)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if ch.Name() != "slack" {
t.Errorf("Name() = %q, want %q", ch.Name(), "slack")
}
if ch.IsRunning() {
t.Error("new channel should not be running")
}
})
}
func TestSlackChannelIsAllowed(t *testing.T) {
msgBus := bus.NewMessageBus()
t.Run("empty allowlist allows all", func(t *testing.T) {
cfg := config.SlackConfig{
BotToken: "xoxb-test",
AppToken: "xapp-test",
AllowFrom: []string{},
}
ch, _ := NewSlackChannel(cfg, msgBus)
if !ch.IsAllowed("U_ANYONE") {
t.Error("empty allowlist should allow all users")
}
})
t.Run("allowlist restricts users", func(t *testing.T) {
cfg := config.SlackConfig{
BotToken: "xoxb-test",
AppToken: "xapp-test",
AllowFrom: []string{"U_ALLOWED"},
}
ch, _ := NewSlackChannel(cfg, msgBus)
if !ch.IsAllowed("U_ALLOWED") {
t.Error("allowed user should pass allowlist check")
}
if ch.IsAllowed("U_BLOCKED") {
t.Error("non-allowed user should be blocked")
}
})
}
func TestTruncateStringSlack(t *testing.T) {
tests := []struct {
input string
maxLen int
want string
}{
{"hello", 10, "hello"},
{"hello world", 5, "hello"},
{"", 5, ""},
}
for _, tt := range tests {
got := truncateStringSlack(tt.input, tt.maxLen)
if got != tt.want {
t.Errorf("truncateStringSlack(%q, %d) = %q, want %q", tt.input, tt.maxLen, got, tt.want)
}
}
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/utils"
"github.com/sipeed/picoclaw/pkg/voice"
)
@@ -236,7 +237,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, update telego.Updat
content = "[empty message]"
}
log.Printf("Telegram message from %s: %s...", senderID, truncateString(content, 50))
log.Printf("Telegram message from %s: %s...", senderID, utils.Truncate(content, 50))
// Thinking indicator
err := c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(chatID), telego.ChatActionTyping))
@@ -381,13 +382,6 @@ func parseChatID(chatIDStr string) (int64, error) {
return id, err
}
func truncateString(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}
func markdownToTelegramHTML(text string) string {
if text == "" {
return ""

View File

@@ -12,6 +12,7 @@ import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/utils"
)
type WhatsAppChannel struct {
@@ -177,7 +178,7 @@ func (c *WhatsAppChannel) handleIncomingMessage(msg map[string]interface{}) {
metadata["user_name"] = userName
}
log.Printf("WhatsApp message from %s: %s...", senderID, truncateString(content, 50))
log.Printf("WhatsApp message from %s: %s...", senderID, utils.Truncate(content, 50))
c.HandleMessage(senderID, chatID, content, mediaPaths, metadata)
}