package channels import ( "context" "encoding/json" "fmt" "net" "sync" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/logger" ) type MaixCamChannel struct { *BaseChannel config config.MaixCamConfig listener net.Listener clients map[net.Conn]bool clientsMux sync.RWMutex running bool } type MaixCamMessage struct { Type string `json:"type"` Tips string `json:"tips"` Timestamp float64 `json:"timestamp"` Data map[string]interface{} `json:"data"` } func NewMaixCamChannel(cfg config.MaixCamConfig, bus *bus.MessageBus) (*MaixCamChannel, error) { base := NewBaseChannel("maixcam", cfg, bus, cfg.AllowFrom) return &MaixCamChannel{ BaseChannel: base, config: cfg, clients: make(map[net.Conn]bool), running: false, }, nil } func (c *MaixCamChannel) Start(ctx context.Context) error { logger.InfoC("maixcam", "Starting MaixCam channel server") addr := fmt.Sprintf("%s:%d", c.config.Host, c.config.Port) listener, err := net.Listen("tcp", addr) if err != nil { return fmt.Errorf("failed to listen on %s: %w", addr, err) } c.listener = listener c.setRunning(true) logger.InfoCF("maixcam", "MaixCam server listening", map[string]interface{}{ "host": c.config.Host, "port": c.config.Port, }) go c.acceptConnections(ctx) return nil } func (c *MaixCamChannel) acceptConnections(ctx context.Context) { logger.DebugC("maixcam", "Starting connection acceptor") for { select { case <-ctx.Done(): logger.InfoC("maixcam", "Stopping connection acceptor") return default: conn, err := c.listener.Accept() if err != nil { if c.running { logger.ErrorCF("maixcam", "Failed to accept connection", map[string]interface{}{ "error": err.Error(), }) } return } logger.InfoCF("maixcam", "New connection from MaixCam device", map[string]interface{}{ "remote_addr": conn.RemoteAddr().String(), }) c.clientsMux.Lock() c.clients[conn] = true c.clientsMux.Unlock() go c.handleConnection(conn, ctx) } } } func (c *MaixCamChannel) handleConnection(conn net.Conn, ctx context.Context) { logger.DebugC("maixcam", "Handling MaixCam connection") defer func() { conn.Close() c.clientsMux.Lock() delete(c.clients, conn) c.clientsMux.Unlock() logger.DebugC("maixcam", "Connection closed") }() decoder := json.NewDecoder(conn) for { select { case <-ctx.Done(): return default: var msg MaixCamMessage if err := decoder.Decode(&msg); err != nil { if err.Error() != "EOF" { logger.ErrorCF("maixcam", "Failed to decode message", map[string]interface{}{ "error": err.Error(), }) } return } c.processMessage(msg, conn) } } } func (c *MaixCamChannel) processMessage(msg MaixCamMessage, conn net.Conn) { switch msg.Type { case "person_detected": c.handlePersonDetection(msg) case "heartbeat": logger.DebugC("maixcam", "Received heartbeat") case "status": c.handleStatusUpdate(msg) default: logger.WarnCF("maixcam", "Unknown message type", map[string]interface{}{ "type": msg.Type, }) } } func (c *MaixCamChannel) handlePersonDetection(msg MaixCamMessage) { logger.InfoCF("maixcam", "", map[string]interface{}{ "timestamp": msg.Timestamp, "data": msg.Data, }) senderID := "maixcam" chatID := "default" classInfo, ok := msg.Data["class_name"].(string) if !ok { classInfo = "person" } score, _ := msg.Data["score"].(float64) x, _ := msg.Data["x"].(float64) y, _ := msg.Data["y"].(float64) w, _ := msg.Data["w"].(float64) h, _ := msg.Data["h"].(float64) content := fmt.Sprintf("📷 Person detected!\nClass: %s\nConfidence: %.2f%%\nPosition: (%.0f, %.0f)\nSize: %.0fx%.0f", classInfo, score*100, x, y, w, h) metadata := map[string]string{ "timestamp": fmt.Sprintf("%.0f", msg.Timestamp), "class_id": fmt.Sprintf("%.0f", msg.Data["class_id"]), "score": fmt.Sprintf("%.2f", score), "x": fmt.Sprintf("%.0f", x), "y": fmt.Sprintf("%.0f", y), "w": fmt.Sprintf("%.0f", w), "h": fmt.Sprintf("%.0f", h), } c.HandleMessage(senderID, chatID, content, []string{}, metadata) } func (c *MaixCamChannel) handleStatusUpdate(msg MaixCamMessage) { logger.InfoCF("maixcam", "Status update from MaixCam", map[string]interface{}{ "status": msg.Data, }) } func (c *MaixCamChannel) Stop(ctx context.Context) error { logger.InfoC("maixcam", "Stopping MaixCam channel") c.setRunning(false) if c.listener != nil { c.listener.Close() } c.clientsMux.Lock() defer c.clientsMux.Unlock() for conn := range c.clients { conn.Close() } c.clients = make(map[net.Conn]bool) logger.InfoC("maixcam", "MaixCam channel stopped") return nil } func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return fmt.Errorf("maixcam channel not running") } c.clientsMux.RLock() defer c.clientsMux.RUnlock() if len(c.clients) == 0 { logger.WarnC("maixcam", "No MaixCam devices connected") return fmt.Errorf("no connected MaixCam devices") } response := map[string]interface{}{ "type": "command", "timestamp": float64(0), "message": msg.Content, "chat_id": msg.ChatID, } data, err := json.Marshal(response) if err != nil { return fmt.Errorf("failed to marshal response: %w", err) } var sendErr error for conn := range c.clients { if _, err := conn.Write(data); err != nil { logger.ErrorCF("maixcam", "Failed to send to client", map[string]interface{}{ "client": conn.RemoteAddr().String(), "error": err.Error(), }) sendErr = err } } return sendErr }