Merge branch 'pr-12'

This commit is contained in:
lxowalle
2026-02-11 19:47:11 +08:00
7 changed files with 272 additions and 5 deletions

193
pkg/channels/dingtalk.go Normal file
View File

@@ -0,0 +1,193 @@
// PicoClaw - Ultra-lightweight personal AI agent
// DingTalk channel implementation using Stream Mode
package channels
import (
"context"
"fmt"
"log"
"sync"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/client"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
)
// DingTalkChannel implements the Channel interface for DingTalk (钉钉)
// It uses WebSocket for receiving messages via stream mode and API for sending
type DingTalkChannel struct {
*BaseChannel
config config.DingTalkConfig
clientID string
clientSecret string
streamClient *client.StreamClient
ctx context.Context
cancel context.CancelFunc
// Map to store session webhooks for each chat
sessionWebhooks sync.Map // chatID -> sessionWebhook
}
// NewDingTalkChannel creates a new DingTalk channel instance
func NewDingTalkChannel(cfg config.DingTalkConfig, messageBus *bus.MessageBus) (*DingTalkChannel, error) {
if cfg.ClientID == "" || cfg.ClientSecret == "" {
return nil, fmt.Errorf("dingtalk client_id and client_secret are required")
}
base := NewBaseChannel("dingtalk", cfg, messageBus, cfg.AllowFrom)
return &DingTalkChannel{
BaseChannel: base,
config: cfg,
clientID: cfg.ClientID,
clientSecret: cfg.ClientSecret,
}, nil
}
// Start initializes the DingTalk channel with Stream Mode
func (c *DingTalkChannel) Start(ctx context.Context) error {
log.Printf("Starting DingTalk channel (Stream Mode)...")
c.ctx, c.cancel = context.WithCancel(ctx)
// Create credential config
cred := client.NewAppCredentialConfig(c.clientID, c.clientSecret)
// Create the stream client with options
c.streamClient = client.NewStreamClient(
client.WithAppCredential(cred),
client.WithAutoReconnect(true),
)
// Register chatbot callback handler (IChatBotMessageHandler is a function type)
c.streamClient.RegisterChatBotCallbackRouter(c.onChatBotMessageReceived)
// Start the stream client
if err := c.streamClient.Start(c.ctx); err != nil {
return fmt.Errorf("failed to start stream client: %w", err)
}
c.setRunning(true)
log.Println("DingTalk channel started (Stream Mode)")
return nil
}
// Stop gracefully stops the DingTalk channel
func (c *DingTalkChannel) Stop(ctx context.Context) error {
log.Println("Stopping DingTalk channel...")
if c.cancel != nil {
c.cancel()
}
if c.streamClient != nil {
c.streamClient.Close()
}
c.setRunning(false)
log.Println("DingTalk channel stopped")
return nil
}
// Send sends a message to DingTalk via the chatbot reply API
func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.IsRunning() {
return fmt.Errorf("dingtalk channel not running")
}
// Get session webhook from storage
sessionWebhookRaw, ok := c.sessionWebhooks.Load(msg.ChatID)
if !ok {
return fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID)
}
sessionWebhook, ok := sessionWebhookRaw.(string)
if !ok {
return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID)
}
log.Printf("DingTalk message to %s: %s", msg.ChatID, truncateStringDingTalk(msg.Content, 100))
// Use the session webhook to send the reply
return c.SendDirectReply(sessionWebhook, msg.Content)
}
// onChatBotMessageReceived implements the IChatBotMessageHandler function signature
// This is called by the Stream SDK when a new message arrives
// IChatBotMessageHandler is: func(c context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error)
func (c *DingTalkChannel) onChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
// Extract message content from Text field
content := data.Text.Content
if content == "" {
// Try to extract from Content interface{} if Text is empty
if contentMap, ok := data.Content.(map[string]interface{}); ok {
if textContent, ok := contentMap["content"].(string); ok {
content = textContent
}
}
}
if content == "" {
return nil, nil // Ignore empty messages
}
senderID := data.SenderStaffId
senderNick := data.SenderNick
chatID := senderID
if data.ConversationType != "1" {
// For group chats
chatID = data.ConversationId
}
// Store the session webhook for this chat so we can reply later
c.sessionWebhooks.Store(chatID, data.SessionWebhook)
metadata := map[string]string{
"sender_name": senderNick,
"conversation_id": data.ConversationId,
"conversation_type": data.ConversationType,
"platform": "dingtalk",
"session_webhook": data.SessionWebhook,
}
log.Printf("DingTalk message from %s (%s): %s", senderNick, senderID, truncateStringDingTalk(content, 50))
// Handle the message through the base channel
c.HandleMessage(senderID, chatID, content, nil, metadata)
// Return nil to indicate we've handled the message asynchronously
// The response will be sent through the message bus
return nil, nil
}
// SendDirectReply sends a direct reply using the session webhook
func (c *DingTalkChannel) SendDirectReply(sessionWebhook, content string) error {
replier := chatbot.NewChatbotReplier()
// Convert string content to []byte for the API
contentBytes := []byte(content)
titleBytes := []byte("PicoClaw")
// Send markdown formatted reply
err := replier.SimpleReplyMarkdown(
context.Background(),
sessionWebhook,
titleBytes,
contentBytes,
)
if err != nil {
return fmt.Errorf("failed to send reply: %w", err)
}
return nil
}
// truncateStringDingTalk truncates a string to max length for logging (avoiding name collision with telegram.go)
func truncateStringDingTalk(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}

View File

@@ -123,6 +123,19 @@ func (m *Manager) initChannels() error {
}
}
if m.config.Channels.DingTalk.Enabled && m.config.Channels.DingTalk.ClientID != "" {
logger.DebugC("channels", "Attempting to initialize DingTalk channel")
dingtalk, err := NewDingTalkChannel(m.config.Channels.DingTalk, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize DingTalk channel", map[string]interface{}{
"error": err.Error(),
})
} else {
m.channels["dingtalk"] = dingtalk
logger.InfoC("channels", "DingTalk channel enabled successfully")
}
}
logger.InfoCF("channels", "Channel initialization completed", map[string]interface{}{
"enabled_channels": len(m.channels),
})

View File

@@ -37,6 +37,7 @@ type ChannelsConfig struct {
Discord DiscordConfig `json:"discord"`
MaixCam MaixCamConfig `json:"maixcam"`
QQ QQConfig `json:"qq"`
DingTalk DingTalkConfig `json:"dingtalk"`
}
type WhatsAppConfig struct {
@@ -80,6 +81,13 @@ type QQConfig struct {
AllowFrom []string `json:"allow_from" env:"PICOCLAW_CHANNELS_QQ_ALLOW_FROM"`
}
type DingTalkConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_DINGTALK_ENABLED"`
ClientID string `json:"client_id" env:"PICOCLAW_CHANNELS_DINGTALK_CLIENT_ID"`
ClientSecret string `json:"client_secret" env:"PICOCLAW_CHANNELS_DINGTALK_CLIENT_SECRET"`
AllowFrom []string `json:"allow_from" env:"PICOCLAW_CHANNELS_DINGTALK_ALLOW_FROM"`
}
type ProvidersConfig struct {
Anthropic ProviderConfig `json:"anthropic"`
OpenAI ProviderConfig `json:"openai"`
@@ -160,6 +168,12 @@ func DefaultConfig() *Config {
AppSecret: "",
AllowFrom: []string{},
},
DingTalk: DingTalkConfig{
Enabled: false,
ClientID: "",
ClientSecret: "",
AllowFrom: []string{},
},
},
Providers: ProvidersConfig{
Anthropic: ProviderConfig{},