Stop ring consumers when deleting network
This commit is contained in:
parent
96039320b6
commit
10da094259
@ -739,7 +739,12 @@ func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
|
|||||||
for {
|
for {
|
||||||
var closed bool
|
var closed bool
|
||||||
select {
|
select {
|
||||||
case <-ch:
|
case _, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
closed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
uc := net.upstream()
|
uc := net.upstream()
|
||||||
if uc == nil {
|
if uc == nil {
|
||||||
dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
|
dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
|
||||||
|
20
ring.go
20
ring.go
@ -16,6 +16,7 @@ type Ring struct {
|
|||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
cur uint64
|
cur uint64
|
||||||
consumers []*RingConsumer
|
consumers []*RingConsumer
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRing creates a new ring buffer.
|
// NewRing creates a new ring buffer.
|
||||||
@ -31,6 +32,10 @@ func (r *Ring) Produce(msg *irc.Message) {
|
|||||||
r.lock.Lock()
|
r.lock.Lock()
|
||||||
defer r.lock.Unlock()
|
defer r.lock.Unlock()
|
||||||
|
|
||||||
|
if r.closed {
|
||||||
|
panic("soju: Ring.Produce called after Close")
|
||||||
|
}
|
||||||
|
|
||||||
i := int(r.cur % r.cap)
|
i := int(r.cur % r.cap)
|
||||||
r.buffer[i] = msg
|
r.buffer[i] = msg
|
||||||
r.cur++
|
r.cur++
|
||||||
@ -45,6 +50,21 @@ func (r *Ring) Produce(msg *irc.Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Ring) Close() {
|
||||||
|
r.lock.Lock()
|
||||||
|
defer r.lock.Unlock()
|
||||||
|
|
||||||
|
if r.closed {
|
||||||
|
panic("soju: Ring.Close called twice")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, rc := range r.consumers {
|
||||||
|
close(rc.ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.closed = true
|
||||||
|
}
|
||||||
|
|
||||||
// NewConsumer creates a new ring buffer consumer.
|
// NewConsumer creates a new ring buffer consumer.
|
||||||
//
|
//
|
||||||
// If seq is nil, the consumer will get messages starting from the last
|
// If seq is nil, the consumer will get messages starting from the last
|
||||||
|
Loading…
Reference in New Issue
Block a user