From 293a0e8e202a20891e8aa7ef4ca1a97d11fd2b4b Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Wed, 25 Mar 2020 10:53:08 +0100 Subject: [PATCH] Move upstreamConn.ring to network This handles upstream disconnection and re-connection better. --- downstream.go | 23 +++++++++++++++-------- upstream.go | 4 +--- user.go | 2 ++ 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/downstream.go b/downstream.go index 3bf3da6..cd2cb77 100644 --- a/downstream.go +++ b/downstream.go @@ -699,27 +699,34 @@ func (dc *downstreamConn) register() error { forwardChannel(dc, ch) } } + }) + dc.forEachNetwork(func(net *network) { historyName := dc.rawUsername // TODO: need to take dc.network into account here var seqPtr *uint64 if firstDownstream { - uc.network.lock.Lock() - seq, ok := uc.network.history[historyName] - uc.network.lock.Unlock() + net.lock.Lock() + seq, ok := net.history[historyName] + net.lock.Unlock() if ok { seqPtr = &seq } } // TODO: we need to create a consumer when adding networks on-the-fly - consumer, ch := uc.ring.NewConsumer(seqPtr) + consumer, ch := net.ring.NewConsumer(seqPtr) go func() { for { var closed bool select { case <-ch: + uc := net.upstream() + if uc == nil { + dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) + break + } dc.ringMessages <- ringMessage{consumer, uc} case <-dc.closed: closed = true @@ -737,9 +744,9 @@ func (dc *downstreamConn) register() error { dc.user.lock.Unlock() if lastDownstream { - uc.network.lock.Lock() - uc.network.history[historyName] = seq - uc.network.lock.Unlock() + net.lock.Lock() + net.history[historyName] = seq + net.lock.Unlock() } }() }) @@ -1086,7 +1093,7 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error { dc.ourMessages[echoMsg] = struct{}{} dc.lock.Unlock() - uc.ring.Produce(echoMsg) + uc.network.ring.Produce(echoMsg) } default: dc.logger.Printf("unhandled message: %v", msg) diff --git a/upstream.go b/upstream.go index 7ef5f7e..0dff3c3 100644 --- a/upstream.go +++ b/upstream.go @@ -34,7 +34,6 @@ type upstreamConn struct { srv *Server user *user outgoing chan<- *irc.Message - ring *Ring serverName string availableUserModes string @@ -80,7 +79,6 @@ func connectToUpstream(network *network) (*upstreamConn, error) { srv: network.user.srv, user: network.user, outgoing: outgoing, - ring: NewRing(network.user.srv.RingCap), channels: make(map[string]*upstreamChannel), caps: make(map[string]string), availableChannelTypes: stdChannelTypes, @@ -874,7 +872,7 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error { break } - uc.ring.Produce(msg) + uc.network.ring.Produce(msg) case "INVITE": var nick string var channel string diff --git a/user.go b/user.go index 245664c..b4e4060 100644 --- a/user.go +++ b/user.go @@ -20,6 +20,7 @@ type downstreamIncomingMessage struct { type network struct { Network user *user + ring *Ring lock sync.Mutex conn *upstreamConn @@ -30,6 +31,7 @@ func newNetwork(user *user, record *Network) *network { return &network{ Network: *record, user: user, + ring: NewRing(user.srv.RingCap), history: make(map[string]uint64), } }