diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 6283251..58c0a25 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -9,6 +9,7 @@ type MessageBus struct { inbound chan InboundMessage outbound chan OutboundMessage handlers map[string]MessageHandler + closed bool mu sync.RWMutex } @@ -21,6 +22,11 @@ func NewMessageBus() *MessageBus { } func (mb *MessageBus) PublishInbound(msg InboundMessage) { + mb.mu.RLock() + defer mb.mu.RUnlock() + if mb.closed { + return + } mb.inbound <- msg } @@ -34,6 +40,11 @@ func (mb *MessageBus) ConsumeInbound(ctx context.Context) (InboundMessage, bool) } func (mb *MessageBus) PublishOutbound(msg OutboundMessage) { + mb.mu.RLock() + defer mb.mu.RUnlock() + if mb.closed { + return + } mb.outbound <- msg } @@ -60,6 +71,12 @@ func (mb *MessageBus) GetHandler(channel string) (MessageHandler, bool) { } func (mb *MessageBus) Close() { + mb.mu.Lock() + defer mb.mu.Unlock() + if mb.closed { + return + } + mb.closed = true close(mb.inbound) close(mb.outbound) }