Merge pull request #165 from alexhoshina/feat/onebot

feat: add OneBot channel support
This commit is contained in:
Meng Zhuo
2026-02-15 09:12:00 +08:00
committed by GitHub
4 changed files with 725 additions and 0 deletions

View File

@@ -60,6 +60,14 @@
"webhook_port": 18791,
"webhook_path": "/webhook/line",
"allow_from": []
},
"onebot": {
"enabled": false,
"ws_url": "ws://127.0.0.1:3001",
"access_token": "",
"reconnect_interval": 5,
"group_trigger_prefix": [],
"allow_from": []
}
},
"providers": {

View File

@@ -163,6 +163,19 @@ func (m *Manager) initChannels() error {
}
}
if m.config.Channels.OneBot.Enabled && m.config.Channels.OneBot.WSUrl != "" {
logger.DebugC("channels", "Attempting to initialize OneBot channel")
onebot, err := NewOneBotChannel(m.config.Channels.OneBot, m.bus)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize OneBot channel", map[string]interface{}{
"error": err.Error(),
})
} else {
m.channels["onebot"] = onebot
logger.InfoC("channels", "OneBot channel enabled successfully")
}
}
logger.InfoCF("channels", "Channel initialization completed", map[string]interface{}{
"enabled_channels": len(m.channels),
})

686
pkg/channels/onebot.go Normal file
View File

@@ -0,0 +1,686 @@
package channels
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/logger"
)
type OneBotChannel struct {
*BaseChannel
config config.OneBotConfig
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
dedup map[string]struct{}
dedupRing []string
dedupIdx int
mu sync.Mutex
writeMu sync.Mutex
echoCounter int64
}
type oneBotRawEvent struct {
PostType string `json:"post_type"`
MessageType string `json:"message_type"`
SubType string `json:"sub_type"`
MessageID json.RawMessage `json:"message_id"`
UserID json.RawMessage `json:"user_id"`
GroupID json.RawMessage `json:"group_id"`
RawMessage string `json:"raw_message"`
Message json.RawMessage `json:"message"`
Sender json.RawMessage `json:"sender"`
SelfID json.RawMessage `json:"self_id"`
Time json.RawMessage `json:"time"`
MetaEventType string `json:"meta_event_type"`
Echo string `json:"echo"`
RetCode json.RawMessage `json:"retcode"`
Status BotStatus `json:"status"`
}
type BotStatus struct {
Online bool `json:"online"`
Good bool `json:"good"`
}
type oneBotSender struct {
UserID json.RawMessage `json:"user_id"`
Nickname string `json:"nickname"`
Card string `json:"card"`
}
type oneBotEvent struct {
PostType string
MessageType string
SubType string
MessageID string
UserID int64
GroupID int64
Content string
RawContent string
IsBotMentioned bool
Sender oneBotSender
SelfID int64
Time int64
MetaEventType string
}
type oneBotAPIRequest struct {
Action string `json:"action"`
Params interface{} `json:"params"`
Echo string `json:"echo,omitempty"`
}
type oneBotSendPrivateMsgParams struct {
UserID int64 `json:"user_id"`
Message string `json:"message"`
}
type oneBotSendGroupMsgParams struct {
GroupID int64 `json:"group_id"`
Message string `json:"message"`
}
func NewOneBotChannel(cfg config.OneBotConfig, messageBus *bus.MessageBus) (*OneBotChannel, error) {
base := NewBaseChannel("onebot", cfg, messageBus, cfg.AllowFrom)
const dedupSize = 1024
return &OneBotChannel{
BaseChannel: base,
config: cfg,
dedup: make(map[string]struct{}, dedupSize),
dedupRing: make([]string, dedupSize),
dedupIdx: 0,
}, nil
}
func (c *OneBotChannel) Start(ctx context.Context) error {
if c.config.WSUrl == "" {
return fmt.Errorf("OneBot ws_url not configured")
}
logger.InfoCF("onebot", "Starting OneBot channel", map[string]interface{}{
"ws_url": c.config.WSUrl,
})
c.ctx, c.cancel = context.WithCancel(ctx)
if err := c.connect(); err != nil {
logger.WarnCF("onebot", "Initial connection failed, will retry in background", map[string]interface{}{
"error": err.Error(),
})
} else {
go c.listen()
}
if c.config.ReconnectInterval > 0 {
go c.reconnectLoop()
} else {
// If reconnect is disabled but initial connection failed, we cannot recover
if c.conn == nil {
return fmt.Errorf("failed to connect to OneBot and reconnect is disabled")
}
}
c.setRunning(true)
logger.InfoC("onebot", "OneBot channel started successfully")
return nil
}
func (c *OneBotChannel) connect() error {
dialer := websocket.DefaultDialer
dialer.HandshakeTimeout = 10 * time.Second
header := make(map[string][]string)
if c.config.AccessToken != "" {
header["Authorization"] = []string{"Bearer " + c.config.AccessToken}
}
conn, _, err := dialer.Dial(c.config.WSUrl, header)
if err != nil {
return err
}
c.mu.Lock()
c.conn = conn
c.mu.Unlock()
logger.InfoC("onebot", "WebSocket connected")
return nil
}
func (c *OneBotChannel) reconnectLoop() {
interval := time.Duration(c.config.ReconnectInterval) * time.Second
if interval < 5*time.Second {
interval = 5 * time.Second
}
for {
select {
case <-c.ctx.Done():
return
case <-time.After(interval):
c.mu.Lock()
conn := c.conn
c.mu.Unlock()
if conn == nil {
logger.InfoC("onebot", "Attempting to reconnect...")
if err := c.connect(); err != nil {
logger.ErrorCF("onebot", "Reconnect failed", map[string]interface{}{
"error": err.Error(),
})
} else {
go c.listen()
}
}
}
}
}
func (c *OneBotChannel) Stop(ctx context.Context) error {
logger.InfoC("onebot", "Stopping OneBot channel")
c.setRunning(false)
if c.cancel != nil {
c.cancel()
}
c.mu.Lock()
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.mu.Unlock()
return nil
}
func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.IsRunning() {
return fmt.Errorf("OneBot channel not running")
}
c.mu.Lock()
conn := c.conn
c.mu.Unlock()
if conn == nil {
return fmt.Errorf("OneBot WebSocket not connected")
}
action, params, err := c.buildSendRequest(msg)
if err != nil {
return err
}
c.writeMu.Lock()
c.echoCounter++
echo := fmt.Sprintf("send_%d", c.echoCounter)
c.writeMu.Unlock()
req := oneBotAPIRequest{
Action: action,
Params: params,
Echo: echo,
}
data, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("failed to marshal OneBot request: %w", err)
}
c.writeMu.Lock()
err = conn.WriteMessage(websocket.TextMessage, data)
c.writeMu.Unlock()
if err != nil {
logger.ErrorCF("onebot", "Failed to send message", map[string]interface{}{
"error": err.Error(),
})
return err
}
return nil
}
func (c *OneBotChannel) buildSendRequest(msg bus.OutboundMessage) (string, interface{}, error) {
chatID := msg.ChatID
if len(chatID) > 6 && chatID[:6] == "group:" {
groupID, err := strconv.ParseInt(chatID[6:], 10, 64)
if err != nil {
return "", nil, fmt.Errorf("invalid group ID in chatID: %s", chatID)
}
return "send_group_msg", oneBotSendGroupMsgParams{
GroupID: groupID,
Message: msg.Content,
}, nil
}
if len(chatID) > 8 && chatID[:8] == "private:" {
userID, err := strconv.ParseInt(chatID[8:], 10, 64)
if err != nil {
return "", nil, fmt.Errorf("invalid user ID in chatID: %s", chatID)
}
return "send_private_msg", oneBotSendPrivateMsgParams{
UserID: userID,
Message: msg.Content,
}, nil
}
userID, err := strconv.ParseInt(chatID, 10, 64)
if err != nil {
return "", nil, fmt.Errorf("invalid chatID for OneBot: %s", chatID)
}
return "send_private_msg", oneBotSendPrivateMsgParams{
UserID: userID,
Message: msg.Content,
}, nil
}
func (c *OneBotChannel) listen() {
for {
select {
case <-c.ctx.Done():
return
default:
c.mu.Lock()
conn := c.conn
c.mu.Unlock()
if conn == nil {
logger.WarnC("onebot", "WebSocket connection is nil, listener exiting")
return
}
_, message, err := conn.ReadMessage()
if err != nil {
logger.ErrorCF("onebot", "WebSocket read error", map[string]interface{}{
"error": err.Error(),
})
c.mu.Lock()
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.mu.Unlock()
return
}
logger.DebugCF("onebot", "Raw WebSocket message received", map[string]interface{}{
"length": len(message),
"payload": string(message),
})
var raw oneBotRawEvent
if err := json.Unmarshal(message, &raw); err != nil {
logger.WarnCF("onebot", "Failed to unmarshal raw event", map[string]interface{}{
"error": err.Error(),
"payload": string(message),
})
continue
}
if raw.Echo != "" || raw.Status.Online || raw.Status.Good {
logger.DebugCF("onebot", "Received API response, skipping", map[string]interface{}{
"echo": raw.Echo,
"status": raw.Status,
})
continue
}
logger.DebugCF("onebot", "Parsed raw event", map[string]interface{}{
"post_type": raw.PostType,
"message_type": raw.MessageType,
"sub_type": raw.SubType,
"meta_event_type": raw.MetaEventType,
})
c.handleRawEvent(&raw)
}
}
}
func parseJSONInt64(raw json.RawMessage) (int64, error) {
if len(raw) == 0 {
return 0, nil
}
var n int64
if err := json.Unmarshal(raw, &n); err == nil {
return n, nil
}
var s string
if err := json.Unmarshal(raw, &s); err == nil {
return strconv.ParseInt(s, 10, 64)
}
return 0, fmt.Errorf("cannot parse as int64: %s", string(raw))
}
func parseJSONString(raw json.RawMessage) string {
if len(raw) == 0 {
return ""
}
var s string
if err := json.Unmarshal(raw, &s); err == nil {
return s
}
return string(raw)
}
type parseMessageResult struct {
Text string
IsBotMentioned bool
}
func parseMessageContentEx(raw json.RawMessage, selfID int64) parseMessageResult {
if len(raw) == 0 {
return parseMessageResult{}
}
var s string
if err := json.Unmarshal(raw, &s); err == nil {
mentioned := false
if selfID > 0 {
cqAt := fmt.Sprintf("[CQ:at,qq=%d]", selfID)
if strings.Contains(s, cqAt) {
mentioned = true
s = strings.ReplaceAll(s, cqAt, "")
s = strings.TrimSpace(s)
}
}
return parseMessageResult{Text: s, IsBotMentioned: mentioned}
}
var segments []map[string]interface{}
if err := json.Unmarshal(raw, &segments); err == nil {
var text string
mentioned := false
selfIDStr := strconv.FormatInt(selfID, 10)
for _, seg := range segments {
segType, _ := seg["type"].(string)
data, _ := seg["data"].(map[string]interface{})
switch segType {
case "text":
if data != nil {
if t, ok := data["text"].(string); ok {
text += t
}
}
case "at":
if data != nil && selfID > 0 {
qqVal := fmt.Sprintf("%v", data["qq"])
if qqVal == selfIDStr || qqVal == "all" {
mentioned = true
}
}
}
}
return parseMessageResult{Text: strings.TrimSpace(text), IsBotMentioned: mentioned}
}
return parseMessageResult{}
}
func (c *OneBotChannel) handleRawEvent(raw *oneBotRawEvent) {
switch raw.PostType {
case "message":
evt, err := c.normalizeMessageEvent(raw)
if err != nil {
logger.WarnCF("onebot", "Failed to normalize message event", map[string]interface{}{
"error": err.Error(),
})
return
}
c.handleMessage(evt)
case "meta_event":
c.handleMetaEvent(raw)
case "notice":
logger.DebugCF("onebot", "Notice event received", map[string]interface{}{
"sub_type": raw.SubType,
})
case "request":
logger.DebugCF("onebot", "Request event received", map[string]interface{}{
"sub_type": raw.SubType,
})
case "":
logger.DebugCF("onebot", "Event with empty post_type (possibly API response)", map[string]interface{}{
"echo": raw.Echo,
"status": raw.Status,
})
default:
logger.DebugCF("onebot", "Unknown post_type", map[string]interface{}{
"post_type": raw.PostType,
})
}
}
func (c *OneBotChannel) normalizeMessageEvent(raw *oneBotRawEvent) (*oneBotEvent, error) {
userID, err := parseJSONInt64(raw.UserID)
if err != nil {
return nil, fmt.Errorf("parse user_id: %w (raw: %s)", err, string(raw.UserID))
}
groupID, _ := parseJSONInt64(raw.GroupID)
selfID, _ := parseJSONInt64(raw.SelfID)
ts, _ := parseJSONInt64(raw.Time)
messageID := parseJSONString(raw.MessageID)
parsed := parseMessageContentEx(raw.Message, selfID)
isBotMentioned := parsed.IsBotMentioned
content := raw.RawMessage
if content == "" {
content = parsed.Text
} else if selfID > 0 {
cqAt := fmt.Sprintf("[CQ:at,qq=%d]", selfID)
if strings.Contains(content, cqAt) {
isBotMentioned = true
content = strings.ReplaceAll(content, cqAt, "")
content = strings.TrimSpace(content)
}
}
var sender oneBotSender
if len(raw.Sender) > 0 {
if err := json.Unmarshal(raw.Sender, &sender); err != nil {
logger.WarnCF("onebot", "Failed to parse sender", map[string]interface{}{
"error": err.Error(),
"sender": string(raw.Sender),
})
}
}
logger.DebugCF("onebot", "Normalized message event", map[string]interface{}{
"message_type": raw.MessageType,
"user_id": userID,
"group_id": groupID,
"message_id": messageID,
"content_len": len(content),
"nickname": sender.Nickname,
})
return &oneBotEvent{
PostType: raw.PostType,
MessageType: raw.MessageType,
SubType: raw.SubType,
MessageID: messageID,
UserID: userID,
GroupID: groupID,
Content: content,
RawContent: raw.RawMessage,
IsBotMentioned: isBotMentioned,
Sender: sender,
SelfID: selfID,
Time: ts,
MetaEventType: raw.MetaEventType,
}, nil
}
func (c *OneBotChannel) handleMetaEvent(raw *oneBotRawEvent) {
switch raw.MetaEventType {
case "lifecycle":
logger.InfoCF("onebot", "Lifecycle event", map[string]interface{}{
"sub_type": raw.SubType,
})
case "heartbeat":
logger.DebugC("onebot", "Heartbeat received")
default:
logger.DebugCF("onebot", "Unknown meta_event_type", map[string]interface{}{
"meta_event_type": raw.MetaEventType,
})
}
}
func (c *OneBotChannel) handleMessage(evt *oneBotEvent) {
if c.isDuplicate(evt.MessageID) {
logger.DebugCF("onebot", "Duplicate message, skipping", map[string]interface{}{
"message_id": evt.MessageID,
})
return
}
content := evt.Content
if content == "" {
logger.DebugCF("onebot", "Received empty message, ignoring", map[string]interface{}{
"message_id": evt.MessageID,
})
return
}
senderID := strconv.FormatInt(evt.UserID, 10)
var chatID string
metadata := map[string]string{
"message_id": evt.MessageID,
}
switch evt.MessageType {
case "private":
chatID = "private:" + senderID
logger.InfoCF("onebot", "Received private message", map[string]interface{}{
"sender": senderID,
"message_id": evt.MessageID,
"length": len(content),
"content": truncate(content, 100),
})
case "group":
groupIDStr := strconv.FormatInt(evt.GroupID, 10)
chatID = "group:" + groupIDStr
metadata["group_id"] = groupIDStr
senderUserID, _ := parseJSONInt64(evt.Sender.UserID)
if senderUserID > 0 {
metadata["sender_user_id"] = strconv.FormatInt(senderUserID, 10)
}
if evt.Sender.Card != "" {
metadata["sender_name"] = evt.Sender.Card
} else if evt.Sender.Nickname != "" {
metadata["sender_name"] = evt.Sender.Nickname
}
triggered, strippedContent := c.checkGroupTrigger(content, evt.IsBotMentioned)
if !triggered {
logger.DebugCF("onebot", "Group message ignored (no trigger)", map[string]interface{}{
"sender": senderID,
"group": groupIDStr,
"is_mentioned": evt.IsBotMentioned,
"content": truncate(content, 100),
})
return
}
content = strippedContent
logger.InfoCF("onebot", "Received group message", map[string]interface{}{
"sender": senderID,
"group": groupIDStr,
"message_id": evt.MessageID,
"is_mentioned": evt.IsBotMentioned,
"length": len(content),
"content": truncate(content, 100),
})
default:
logger.WarnCF("onebot", "Unknown message type, cannot route", map[string]interface{}{
"type": evt.MessageType,
"message_id": evt.MessageID,
"user_id": evt.UserID,
})
return
}
if evt.Sender.Nickname != "" {
metadata["nickname"] = evt.Sender.Nickname
}
logger.DebugCF("onebot", "Forwarding message to bus", map[string]interface{}{
"sender_id": senderID,
"chat_id": chatID,
"content": truncate(content, 100),
})
c.HandleMessage(senderID, chatID, content, []string{}, metadata)
}
func (c *OneBotChannel) isDuplicate(messageID string) bool {
if messageID == "" || messageID == "0" {
return false
}
c.mu.Lock()
defer c.mu.Unlock()
if _, exists := c.dedup[messageID]; exists {
return true
}
if old := c.dedupRing[c.dedupIdx]; old != "" {
delete(c.dedup, old)
}
c.dedupRing[c.dedupIdx] = messageID
c.dedup[messageID] = struct{}{}
c.dedupIdx = (c.dedupIdx + 1) % len(c.dedupRing)
return false
}
func truncate(s string, n int) string {
runes := []rune(s)
if len(runes) <= n {
return s
}
return string(runes[:n]) + "..."
}
func (c *OneBotChannel) checkGroupTrigger(content string, isBotMentioned bool) (triggered bool, strippedContent string) {
if isBotMentioned {
return true, strings.TrimSpace(content)
}
for _, prefix := range c.config.GroupTriggerPrefix {
if prefix == "" {
continue
}
if strings.HasPrefix(content, prefix) {
return true, strings.TrimSpace(strings.TrimPrefix(content, prefix))
}
}
return false, content
}

View File

@@ -78,6 +78,7 @@ type ChannelsConfig struct {
DingTalk DingTalkConfig `json:"dingtalk"`
Slack SlackConfig `json:"slack"`
LINE LINEConfig `json:"line"`
OneBot OneBotConfig `json:"onebot"`
}
type WhatsAppConfig struct {
@@ -146,6 +147,15 @@ type LINEConfig struct {
AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_LINE_ALLOW_FROM"`
}
type OneBotConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_ONEBOT_ENABLED"`
WSUrl string `json:"ws_url" env:"PICOCLAW_CHANNELS_ONEBOT_WS_URL"`
AccessToken string `json:"access_token" env:"PICOCLAW_CHANNELS_ONEBOT_ACCESS_TOKEN"`
ReconnectInterval int `json:"reconnect_interval" env:"PICOCLAW_CHANNELS_ONEBOT_RECONNECT_INTERVAL"`
GroupTriggerPrefix []string `json:"group_trigger_prefix" env:"PICOCLAW_CHANNELS_ONEBOT_GROUP_TRIGGER_PREFIX"`
AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_ONEBOT_ALLOW_FROM"`
}
type HeartbeatConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_HEARTBEAT_ENABLED"`
Interval int `json:"interval" env:"PICOCLAW_HEARTBEAT_INTERVAL"` // minutes, min 5
@@ -274,6 +284,14 @@ func DefaultConfig() *Config {
WebhookPath: "/webhook/line",
AllowFrom: FlexibleStringSlice{},
},
OneBot: OneBotConfig{
Enabled: false,
WSUrl: "ws://127.0.0.1:3001",
AccessToken: "",
ReconnectInterval: 5,
GroupTriggerPrefix: []string{},
AllowFrom: FlexibleStringSlice{},
},
},
Providers: ProvidersConfig{
Anthropic: ProviderConfig{},