Prevent panic on publish after MessageBus is closed (#223)
This commit is contained in:
@@ -9,6 +9,7 @@ type MessageBus struct {
|
|||||||
inbound chan InboundMessage
|
inbound chan InboundMessage
|
||||||
outbound chan OutboundMessage
|
outbound chan OutboundMessage
|
||||||
handlers map[string]MessageHandler
|
handlers map[string]MessageHandler
|
||||||
|
closed bool
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -21,6 +22,11 @@ func NewMessageBus() *MessageBus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mb *MessageBus) PublishInbound(msg InboundMessage) {
|
func (mb *MessageBus) PublishInbound(msg InboundMessage) {
|
||||||
|
mb.mu.RLock()
|
||||||
|
defer mb.mu.RUnlock()
|
||||||
|
if mb.closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
mb.inbound <- msg
|
mb.inbound <- msg
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -34,6 +40,11 @@ func (mb *MessageBus) ConsumeInbound(ctx context.Context) (InboundMessage, bool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mb *MessageBus) PublishOutbound(msg OutboundMessage) {
|
func (mb *MessageBus) PublishOutbound(msg OutboundMessage) {
|
||||||
|
mb.mu.RLock()
|
||||||
|
defer mb.mu.RUnlock()
|
||||||
|
if mb.closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
mb.outbound <- msg
|
mb.outbound <- msg
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,6 +71,12 @@ func (mb *MessageBus) GetHandler(channel string) (MessageHandler, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mb *MessageBus) Close() {
|
func (mb *MessageBus) Close() {
|
||||||
|
mb.mu.Lock()
|
||||||
|
defer mb.mu.Unlock()
|
||||||
|
if mb.closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mb.closed = true
|
||||||
close(mb.inbound)
|
close(mb.inbound)
|
||||||
close(mb.outbound)
|
close(mb.outbound)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user