// PicoClaw - Ultra-lightweight personal AI agent // DingTalk channel implementation using Stream Mode package channels import ( "context" "fmt" "sync" "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" "github.com/open-dingtalk/dingtalk-stream-sdk-go/client" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/utils" ) // DingTalkChannel implements the Channel interface for DingTalk (钉钉) // It uses WebSocket for receiving messages via stream mode and API for sending type DingTalkChannel struct { *BaseChannel config config.DingTalkConfig clientID string clientSecret string streamClient *client.StreamClient ctx context.Context cancel context.CancelFunc // Map to store session webhooks for each chat sessionWebhooks sync.Map // chatID -> sessionWebhook } // NewDingTalkChannel creates a new DingTalk channel instance func NewDingTalkChannel(cfg config.DingTalkConfig, messageBus *bus.MessageBus) (*DingTalkChannel, error) { if cfg.ClientID == "" || cfg.ClientSecret == "" { return nil, fmt.Errorf("dingtalk client_id and client_secret are required") } base := NewBaseChannel("dingtalk", cfg, messageBus, cfg.AllowFrom) return &DingTalkChannel{ BaseChannel: base, config: cfg, clientID: cfg.ClientID, clientSecret: cfg.ClientSecret, }, nil } // Start initializes the DingTalk channel with Stream Mode func (c *DingTalkChannel) Start(ctx context.Context) error { logger.InfoC("dingtalk", "Starting DingTalk channel (Stream Mode)...") c.ctx, c.cancel = context.WithCancel(ctx) // Create credential config cred := client.NewAppCredentialConfig(c.clientID, c.clientSecret) // Create the stream client with options c.streamClient = client.NewStreamClient( client.WithAppCredential(cred), client.WithAutoReconnect(true), ) // Register chatbot callback handler (IChatBotMessageHandler is a function type) c.streamClient.RegisterChatBotCallbackRouter(c.onChatBotMessageReceived) // Start the stream client if err := c.streamClient.Start(c.ctx); err != nil { return fmt.Errorf("failed to start stream client: %w", err) } c.setRunning(true) logger.InfoC("dingtalk", "DingTalk channel started (Stream Mode)") return nil } // Stop gracefully stops the DingTalk channel func (c *DingTalkChannel) Stop(ctx context.Context) error { logger.InfoC("dingtalk", "Stopping DingTalk channel...") if c.cancel != nil { c.cancel() } if c.streamClient != nil { c.streamClient.Close() } c.setRunning(false) logger.InfoC("dingtalk", "DingTalk channel stopped") return nil } // Send sends a message to DingTalk via the chatbot reply API func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return fmt.Errorf("dingtalk channel not running") } // Get session webhook from storage sessionWebhookRaw, ok := c.sessionWebhooks.Load(msg.ChatID) if !ok { return fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID) } sessionWebhook, ok := sessionWebhookRaw.(string) if !ok { return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID) } logger.DebugCF("dingtalk", "Sending message", map[string]interface{}{ "chat_id": msg.ChatID, "preview": utils.Truncate(msg.Content, 100), }) // Use the session webhook to send the reply return c.SendDirectReply(ctx, sessionWebhook, msg.Content) } // onChatBotMessageReceived implements the IChatBotMessageHandler function signature // This is called by the Stream SDK when a new message arrives // IChatBotMessageHandler is: func(c context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) func (c *DingTalkChannel) onChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { // Extract message content from Text field content := data.Text.Content if content == "" { // Try to extract from Content interface{} if Text is empty if contentMap, ok := data.Content.(map[string]interface{}); ok { if textContent, ok := contentMap["content"].(string); ok { content = textContent } } } if content == "" { return nil, nil // Ignore empty messages } senderID := data.SenderStaffId senderNick := data.SenderNick chatID := senderID if data.ConversationType != "1" { // For group chats chatID = data.ConversationId } // Store the session webhook for this chat so we can reply later c.sessionWebhooks.Store(chatID, data.SessionWebhook) metadata := map[string]string{ "sender_name": senderNick, "conversation_id": data.ConversationId, "conversation_type": data.ConversationType, "platform": "dingtalk", "session_webhook": data.SessionWebhook, } logger.DebugCF("dingtalk", "Received message", map[string]interface{}{ "sender_nick": senderNick, "sender_id": senderID, "preview": utils.Truncate(content, 50), }) // Handle the message through the base channel c.HandleMessage(senderID, chatID, content, nil, metadata) // Return nil to indicate we've handled the message asynchronously // The response will be sent through the message bus return nil, nil } // SendDirectReply sends a direct reply using the session webhook func (c *DingTalkChannel) SendDirectReply(ctx context.Context, sessionWebhook, content string) error { replier := chatbot.NewChatbotReplier() // Convert string content to []byte for the API contentBytes := []byte(content) titleBytes := []byte("PicoClaw") // Send markdown formatted reply err := replier.SimpleReplyMarkdown( ctx, sessionWebhook, titleBytes, contentBytes, ) if err != nil { return fmt.Errorf("failed to send reply: %w", err) } return nil }