diff --git a/config/config.example.json b/config/config.example.json index ee3ac97..f7e8835 100644 --- a/config/config.example.json +++ b/config/config.example.json @@ -60,6 +60,14 @@ "webhook_port": 18791, "webhook_path": "/webhook/line", "allow_from": [] + }, + "onebot": { + "enabled": false, + "ws_url": "ws://127.0.0.1:3001", + "access_token": "", + "reconnect_interval": 5, + "group_trigger_prefix": [], + "allow_from": [] } }, "providers": { diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 69e9b2b..15f8c60 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -163,6 +163,19 @@ func (m *Manager) initChannels() error { } } + if m.config.Channels.OneBot.Enabled && m.config.Channels.OneBot.WSUrl != "" { + logger.DebugC("channels", "Attempting to initialize OneBot channel") + onebot, err := NewOneBotChannel(m.config.Channels.OneBot, m.bus) + if err != nil { + logger.ErrorCF("channels", "Failed to initialize OneBot channel", map[string]interface{}{ + "error": err.Error(), + }) + } else { + m.channels["onebot"] = onebot + logger.InfoC("channels", "OneBot channel enabled successfully") + } + } + logger.InfoCF("channels", "Channel initialization completed", map[string]interface{}{ "enabled_channels": len(m.channels), }) diff --git a/pkg/channels/onebot.go b/pkg/channels/onebot.go new file mode 100644 index 0000000..fe6edbe --- /dev/null +++ b/pkg/channels/onebot.go @@ -0,0 +1,677 @@ +package channels + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/logger" +) + +type OneBotChannel struct { + *BaseChannel + config config.OneBotConfig + conn *websocket.Conn + ctx context.Context + cancel context.CancelFunc + dedup map[string]struct{} + dedupRing []string + dedupIdx int + mu sync.Mutex + writeMu sync.Mutex + echoCounter int64 +} + + +type oneBotRawEvent struct { + PostType string `json:"post_type"` + MessageType string `json:"message_type"` + SubType string `json:"sub_type"` + MessageID json.RawMessage `json:"message_id"` + UserID json.RawMessage `json:"user_id"` + GroupID json.RawMessage `json:"group_id"` + RawMessage string `json:"raw_message"` + Message json.RawMessage `json:"message"` + Sender json.RawMessage `json:"sender"` + SelfID json.RawMessage `json:"self_id"` + Time json.RawMessage `json:"time"` + MetaEventType string `json:"meta_event_type"` + Echo string `json:"echo"` + RetCode json.RawMessage `json:"retcode"` + Status string `json:"status"` +} + +type oneBotSender struct { + UserID json.RawMessage `json:"user_id"` + Nickname string `json:"nickname"` + Card string `json:"card"` +} + +type oneBotEvent struct { + PostType string + MessageType string + SubType string + MessageID string + UserID int64 + GroupID int64 + Content string + RawContent string + IsBotMentioned bool + Sender oneBotSender + SelfID int64 + Time int64 + MetaEventType string +} + +type oneBotAPIRequest struct { + Action string `json:"action"` + Params interface{} `json:"params"` + Echo string `json:"echo,omitempty"` +} + +type oneBotSendPrivateMsgParams struct { + UserID int64 `json:"user_id"` + Message string `json:"message"` +} + +type oneBotSendGroupMsgParams struct { + GroupID int64 `json:"group_id"` + Message string `json:"message"` +} + +func NewOneBotChannel(cfg config.OneBotConfig, messageBus *bus.MessageBus) (*OneBotChannel, error) { + base := NewBaseChannel("onebot", cfg, messageBus, cfg.AllowFrom) + + const dedupSize = 1024 + return &OneBotChannel{ + BaseChannel: base, + config: cfg, + dedup: make(map[string]struct{}, dedupSize), + dedupRing: make([]string, dedupSize), + dedupIdx: 0, + }, nil +} + +func (c *OneBotChannel) Start(ctx context.Context) error { + if c.config.WSUrl == "" { + return fmt.Errorf("OneBot ws_url not configured") + } + + logger.InfoCF("onebot", "Starting OneBot channel", map[string]interface{}{ + "ws_url": c.config.WSUrl, + }) + + c.ctx, c.cancel = context.WithCancel(ctx) + + if err := c.connect(); err != nil { + return fmt.Errorf("failed to connect to OneBot: %w", err) + } + + go c.listen() + + if c.config.ReconnectInterval > 0 { + go c.reconnectLoop() + } + + c.setRunning(true) + logger.InfoC("onebot", "OneBot channel started successfully") + + return nil +} + +func (c *OneBotChannel) connect() error { + dialer := websocket.DefaultDialer + dialer.HandshakeTimeout = 10 * time.Second + + header := make(map[string][]string) + if c.config.AccessToken != "" { + header["Authorization"] = []string{"Bearer " + c.config.AccessToken} + } + + conn, _, err := dialer.Dial(c.config.WSUrl, header) + if err != nil { + return err + } + + c.mu.Lock() + c.conn = conn + c.mu.Unlock() + + logger.InfoC("onebot", "WebSocket connected") + return nil +} + +func (c *OneBotChannel) reconnectLoop() { + interval := time.Duration(c.config.ReconnectInterval) * time.Second + if interval < 5*time.Second { + interval = 5 * time.Second + } + + for { + select { + case <-c.ctx.Done(): + return + case <-time.After(interval): + c.mu.Lock() + conn := c.conn + c.mu.Unlock() + + if conn == nil { + logger.InfoC("onebot", "Attempting to reconnect...") + if err := c.connect(); err != nil { + logger.ErrorCF("onebot", "Reconnect failed", map[string]interface{}{ + "error": err.Error(), + }) + } else { + go c.listen() + } + } + } + } +} + +func (c *OneBotChannel) Stop(ctx context.Context) error { + logger.InfoC("onebot", "Stopping OneBot channel") + c.setRunning(false) + + if c.cancel != nil { + c.cancel() + } + + c.mu.Lock() + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + c.mu.Unlock() + + return nil +} + +func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { + if !c.IsRunning() { + return fmt.Errorf("OneBot channel not running") + } + + c.mu.Lock() + conn := c.conn + c.mu.Unlock() + + if conn == nil { + return fmt.Errorf("OneBot WebSocket not connected") + } + + action, params, err := c.buildSendRequest(msg) + if err != nil { + return err + } + + c.writeMu.Lock() + c.echoCounter++ + echo := fmt.Sprintf("send_%d", c.echoCounter) + c.writeMu.Unlock() + + req := oneBotAPIRequest{ + Action: action, + Params: params, + Echo: echo, + } + + data, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("failed to marshal OneBot request: %w", err) + } + + c.writeMu.Lock() + err = conn.WriteMessage(websocket.TextMessage, data) + c.writeMu.Unlock() + + if err != nil { + logger.ErrorCF("onebot", "Failed to send message", map[string]interface{}{ + "error": err.Error(), + }) + return err + } + + return nil +} + +func (c *OneBotChannel) buildSendRequest(msg bus.OutboundMessage) (string, interface{}, error) { + chatID := msg.ChatID + + if len(chatID) > 6 && chatID[:6] == "group:" { + groupID, err := strconv.ParseInt(chatID[6:], 10, 64) + if err != nil { + return "", nil, fmt.Errorf("invalid group ID in chatID: %s", chatID) + } + return "send_group_msg", oneBotSendGroupMsgParams{ + GroupID: groupID, + Message: msg.Content, + }, nil + } + + if len(chatID) > 8 && chatID[:8] == "private:" { + userID, err := strconv.ParseInt(chatID[8:], 10, 64) + if err != nil { + return "", nil, fmt.Errorf("invalid user ID in chatID: %s", chatID) + } + return "send_private_msg", oneBotSendPrivateMsgParams{ + UserID: userID, + Message: msg.Content, + }, nil + } + + userID, err := strconv.ParseInt(chatID, 10, 64) + if err != nil { + return "", nil, fmt.Errorf("invalid chatID for OneBot: %s", chatID) + } + + return "send_private_msg", oneBotSendPrivateMsgParams{ + UserID: userID, + Message: msg.Content, + }, nil +} + +func (c *OneBotChannel) listen() { + logger.InfoC("onebot", "WebSocket listener started") + for { + select { + case <-c.ctx.Done(): + logger.InfoC("onebot", "WebSocket listener stopping (context cancelled)") + return + default: + c.mu.Lock() + conn := c.conn + c.mu.Unlock() + + if conn == nil { + logger.WarnC("onebot", "WebSocket connection is nil, listener exiting") + return + } + + _, message, err := conn.ReadMessage() + if err != nil { + logger.ErrorCF("onebot", "WebSocket read error", map[string]interface{}{ + "error": err.Error(), + }) + c.mu.Lock() + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + c.mu.Unlock() + return + } + + logger.DebugCF("onebot", "Raw WebSocket message received", map[string]interface{}{ + "length": len(message), + "payload": string(message), + }) + + var raw oneBotRawEvent + if err := json.Unmarshal(message, &raw); err != nil { + logger.WarnCF("onebot", "Failed to unmarshal raw event", map[string]interface{}{ + "error": err.Error(), + "payload": string(message), + }) + continue + } + + if raw.Echo != "" || raw.Status != "" { + logger.DebugCF("onebot", "Received API response, skipping", map[string]interface{}{ + "echo": raw.Echo, + "status": raw.Status, + }) + continue + } + + logger.DebugCF("onebot", "Parsed raw event", map[string]interface{}{ + "post_type": raw.PostType, + "message_type": raw.MessageType, + "sub_type": raw.SubType, + "meta_event_type": raw.MetaEventType, + }) + + c.handleRawEvent(&raw) + } + } +} + +func parseJSONInt64(raw json.RawMessage) (int64, error) { + if len(raw) == 0 { + return 0, nil + } + + var n int64 + if err := json.Unmarshal(raw, &n); err == nil { + return n, nil + } + + var s string + if err := json.Unmarshal(raw, &s); err == nil { + return strconv.ParseInt(s, 10, 64) + } + return 0, fmt.Errorf("cannot parse as int64: %s", string(raw)) +} + +func parseJSONString(raw json.RawMessage) string { + if len(raw) == 0 { + return "" + } + var s string + if err := json.Unmarshal(raw, &s); err == nil { + return s + } + + return string(raw) +} + +type parseMessageResult struct { + Text string + IsBotMentioned bool +} + +func parseMessageContentEx(raw json.RawMessage, selfID int64) parseMessageResult { + if len(raw) == 0 { + return parseMessageResult{} + } + + var s string + if err := json.Unmarshal(raw, &s); err == nil { + mentioned := false + if selfID > 0 { + cqAt := fmt.Sprintf("[CQ:at,qq=%d]", selfID) + if strings.Contains(s, cqAt) { + mentioned = true + s = strings.ReplaceAll(s, cqAt, "") + s = strings.TrimSpace(s) + } + } + return parseMessageResult{Text: s, IsBotMentioned: mentioned} + } + + var segments []map[string]interface{} + if err := json.Unmarshal(raw, &segments); err == nil { + var text string + mentioned := false + selfIDStr := strconv.FormatInt(selfID, 10) + for _, seg := range segments { + segType, _ := seg["type"].(string) + data, _ := seg["data"].(map[string]interface{}) + switch segType { + case "text": + if data != nil { + if t, ok := data["text"].(string); ok { + text += t + } + } + case "at": + if data != nil && selfID > 0 { + qqVal := fmt.Sprintf("%v", data["qq"]) + if qqVal == selfIDStr || qqVal == "all" { + mentioned = true + } + } + } + } + return parseMessageResult{Text: strings.TrimSpace(text), IsBotMentioned: mentioned} + } + return parseMessageResult{} +} + +func (c *OneBotChannel) handleRawEvent(raw *oneBotRawEvent) { + switch raw.PostType { + case "message": + evt, err := c.normalizeMessageEvent(raw) + if err != nil { + logger.WarnCF("onebot", "Failed to normalize message event", map[string]interface{}{ + "error": err.Error(), + }) + return + } + c.handleMessage(evt) + case "meta_event": + c.handleMetaEvent(raw) + case "notice": + logger.DebugCF("onebot", "Notice event received", map[string]interface{}{ + "sub_type": raw.SubType, + }) + case "request": + logger.DebugCF("onebot", "Request event received", map[string]interface{}{ + "sub_type": raw.SubType, + }) + case "": + logger.DebugCF("onebot", "Event with empty post_type (possibly API response)", map[string]interface{}{ + "echo": raw.Echo, + "status": raw.Status, + }) + default: + logger.DebugCF("onebot", "Unknown post_type", map[string]interface{}{ + "post_type": raw.PostType, + }) + } +} + +func (c *OneBotChannel) normalizeMessageEvent(raw *oneBotRawEvent) (*oneBotEvent, error) { + userID, err := parseJSONInt64(raw.UserID) + if err != nil { + return nil, fmt.Errorf("parse user_id: %w (raw: %s)", err, string(raw.UserID)) + } + + groupID, _ := parseJSONInt64(raw.GroupID) + selfID, _ := parseJSONInt64(raw.SelfID) + ts, _ := parseJSONInt64(raw.Time) + messageID := parseJSONString(raw.MessageID) + + parsed := parseMessageContentEx(raw.Message, selfID) + isBotMentioned := parsed.IsBotMentioned + + content := raw.RawMessage + if content == "" { + content = parsed.Text + } else if selfID > 0 { + cqAt := fmt.Sprintf("[CQ:at,qq=%d]", selfID) + if strings.Contains(content, cqAt) { + isBotMentioned = true + content = strings.ReplaceAll(content, cqAt, "") + content = strings.TrimSpace(content) + } + } + + var sender oneBotSender + if len(raw.Sender) > 0 { + if err := json.Unmarshal(raw.Sender, &sender); err != nil { + logger.WarnCF("onebot", "Failed to parse sender", map[string]interface{}{ + "error": err.Error(), + "sender": string(raw.Sender), + }) + } + } + + logger.DebugCF("onebot", "Normalized message event", map[string]interface{}{ + "message_type": raw.MessageType, + "user_id": userID, + "group_id": groupID, + "message_id": messageID, + "content_len": len(content), + "nickname": sender.Nickname, + }) + + return &oneBotEvent{ + PostType: raw.PostType, + MessageType: raw.MessageType, + SubType: raw.SubType, + MessageID: messageID, + UserID: userID, + GroupID: groupID, + Content: content, + RawContent: raw.RawMessage, + IsBotMentioned: isBotMentioned, + Sender: sender, + SelfID: selfID, + Time: ts, + MetaEventType: raw.MetaEventType, + }, nil +} + +func (c *OneBotChannel) handleMetaEvent(raw *oneBotRawEvent) { + switch raw.MetaEventType { + case "lifecycle": + logger.InfoCF("onebot", "Lifecycle event", map[string]interface{}{ + "sub_type": raw.SubType, + }) + case "heartbeat": + logger.DebugC("onebot", "Heartbeat received") + default: + logger.DebugCF("onebot", "Unknown meta_event_type", map[string]interface{}{ + "meta_event_type": raw.MetaEventType, + }) + } +} + +func (c *OneBotChannel) handleMessage(evt *oneBotEvent) { + if c.isDuplicate(evt.MessageID) { + logger.DebugCF("onebot", "Duplicate message, skipping", map[string]interface{}{ + "message_id": evt.MessageID, + }) + return + } + + content := evt.Content + if content == "" { + logger.DebugCF("onebot", "Received empty message, ignoring", map[string]interface{}{ + "message_id": evt.MessageID, + }) + return + } + + senderID := strconv.FormatInt(evt.UserID, 10) + var chatID string + + metadata := map[string]string{ + "message_id": evt.MessageID, + } + + switch evt.MessageType { + case "private": + chatID = "private:" + senderID + logger.InfoCF("onebot", "Received private message", map[string]interface{}{ + "sender": senderID, + "message_id": evt.MessageID, + "length": len(content), + "content": truncate(content, 100), + }) + + case "group": + groupIDStr := strconv.FormatInt(evt.GroupID, 10) + chatID = "group:" + groupIDStr + metadata["group_id"] = groupIDStr + + senderUserID, _ := parseJSONInt64(evt.Sender.UserID) + if senderUserID > 0 { + metadata["sender_user_id"] = strconv.FormatInt(senderUserID, 10) + } + + if evt.Sender.Card != "" { + metadata["sender_name"] = evt.Sender.Card + } else if evt.Sender.Nickname != "" { + metadata["sender_name"] = evt.Sender.Nickname + } + + triggered, strippedContent := c.checkGroupTrigger(content, evt.IsBotMentioned) + if !triggered { + logger.DebugCF("onebot", "Group message ignored (no trigger)", map[string]interface{}{ + "sender": senderID, + "group": groupIDStr, + "is_mentioned": evt.IsBotMentioned, + "content": truncate(content, 100), + }) + return + } + content = strippedContent + + logger.InfoCF("onebot", "Received group message", map[string]interface{}{ + "sender": senderID, + "group": groupIDStr, + "message_id": evt.MessageID, + "is_mentioned": evt.IsBotMentioned, + "length": len(content), + "content": truncate(content, 100), + }) + + default: + logger.WarnCF("onebot", "Unknown message type, cannot route", map[string]interface{}{ + "type": evt.MessageType, + "message_id": evt.MessageID, + "user_id": evt.UserID, + }) + return + } + + if evt.Sender.Nickname != "" { + metadata["nickname"] = evt.Sender.Nickname + } + + logger.DebugCF("onebot", "Forwarding message to bus", map[string]interface{}{ + "sender_id": senderID, + "chat_id": chatID, + "content": truncate(content, 100), + }) + + c.HandleMessage(senderID, chatID, content, []string{}, metadata) +} + +func (c *OneBotChannel) isDuplicate(messageID string) bool { + if messageID == "" || messageID == "0" { + return false + } + + c.mu.Lock() + defer c.mu.Unlock() + + if _, exists := c.dedup[messageID]; exists { + return true + } + + if old := c.dedupRing[c.dedupIdx]; old != "" { + delete(c.dedup, old) + } + c.dedupRing[c.dedupIdx] = messageID + c.dedup[messageID] = struct{}{} + c.dedupIdx = (c.dedupIdx + 1) % len(c.dedupRing) + + return false +} + +func truncate(s string, n int) string { + runes := []rune(s) + if len(runes) <= n { + return s + } + return string(runes[:n]) + "..." +} + +func (c *OneBotChannel) checkGroupTrigger(content string, isBotMentioned bool) (triggered bool, strippedContent string) { + if isBotMentioned { + return true, strings.TrimSpace(content) + } + + for _, prefix := range c.config.GroupTriggerPrefix { + if prefix == "" { + continue + } + if strings.HasPrefix(content, prefix) { + return true, strings.TrimSpace(strings.TrimPrefix(content, prefix)) + } + } + + return false, content +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 6af9438..03dfe31 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -77,6 +77,7 @@ type ChannelsConfig struct { DingTalk DingTalkConfig `json:"dingtalk"` Slack SlackConfig `json:"slack"` LINE LINEConfig `json:"line"` + OneBot OneBotConfig `json:"onebot"` } type WhatsAppConfig struct { @@ -145,6 +146,15 @@ type LINEConfig struct { AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_LINE_ALLOW_FROM"` } +type OneBotConfig struct { + Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_ONEBOT_ENABLED"` + WSUrl string `json:"ws_url" env:"PICOCLAW_CHANNELS_ONEBOT_WS_URL"` + AccessToken string `json:"access_token" env:"PICOCLAW_CHANNELS_ONEBOT_ACCESS_TOKEN"` + ReconnectInterval int `json:"reconnect_interval" env:"PICOCLAW_CHANNELS_ONEBOT_RECONNECT_INTERVAL"` + GroupTriggerPrefix []string `json:"group_trigger_prefix" env:"PICOCLAW_CHANNELS_ONEBOT_GROUP_TRIGGER_PREFIX"` + AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_ONEBOT_ALLOW_FROM"` +} + type HeartbeatConfig struct { Enabled bool `json:"enabled" env:"PICOCLAW_HEARTBEAT_ENABLED"` Interval int `json:"interval" env:"PICOCLAW_HEARTBEAT_INTERVAL"` // minutes, min 5 @@ -265,6 +275,14 @@ func DefaultConfig() *Config { WebhookPath: "/webhook/line", AllowFrom: FlexibleStringSlice{}, }, + OneBot: OneBotConfig{ + Enabled: false, + WSUrl: "ws://127.0.0.1:3001", + AccessToken: "", + ReconnectInterval: 5, + GroupTriggerPrefix: []string{}, + AllowFrom: FlexibleStringSlice{}, + }, }, Providers: ProvidersConfig{ Anthropic: ProviderConfig{},