From 10da094259a45042951cf006a2d39a33d984853d Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Wed, 1 Apr 2020 15:48:56 +0200 Subject: [PATCH] Stop ring consumers when deleting network --- downstream.go | 7 ++++++- ring.go | 20 ++++++++++++++++++++ user.go | 1 + 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/downstream.go b/downstream.go index 41f1aeb..f3e0fca 100644 --- a/downstream.go +++ b/downstream.go @@ -739,7 +739,12 @@ func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) { for { var closed bool select { - case <-ch: + case _, ok := <-ch: + if !ok { + closed = true + break + } + uc := net.upstream() if uc == nil { dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) diff --git a/ring.go b/ring.go index 9bb23ac..24923db 100644 --- a/ring.go +++ b/ring.go @@ -16,6 +16,7 @@ type Ring struct { lock sync.Mutex cur uint64 consumers []*RingConsumer + closed bool } // NewRing creates a new ring buffer. @@ -31,6 +32,10 @@ func (r *Ring) Produce(msg *irc.Message) { r.lock.Lock() defer r.lock.Unlock() + if r.closed { + panic("soju: Ring.Produce called after Close") + } + i := int(r.cur % r.cap) r.buffer[i] = msg r.cur++ @@ -45,6 +50,21 @@ func (r *Ring) Produce(msg *irc.Message) { } } +func (r *Ring) Close() { + r.lock.Lock() + defer r.lock.Unlock() + + if r.closed { + panic("soju: Ring.Close called twice") + } + + for _, rc := range r.consumers { + close(rc.ch) + } + + r.closed = true +} + // NewConsumer creates a new ring buffer consumer. // // If seq is nil, the consumer will get messages starting from the last diff --git a/user.go b/user.go index 7e0a299..d335e52 100644 --- a/user.go +++ b/user.go @@ -305,6 +305,7 @@ func (u *user) deleteNetwork(id int64) error { }) net.Stop() + net.ring.Close() u.networks = append(u.networks[:i], u.networks[i+1:]...) return nil }