From f0bc919885b6d2ad8144dab28ad9ed085698962b Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Mon, 6 Apr 2020 18:31:48 +0200 Subject: [PATCH] Remove downstreamConn.ringConsumers We no longer need long-lived ring buffer consumers now that upstreamConn.produce dispatches messages to downstream connections. --- downstream.go | 11 ++++------- ring.go | 4 ++++ upstream.go | 2 +- user.go | 13 +++---------- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/downstream.go b/downstream.go index 14dd0ef..95d9400 100644 --- a/downstream.go +++ b/downstream.go @@ -66,7 +66,6 @@ type downstreamConn struct { password string // empty after authentication network *network // can be nil - ringConsumers map[*network]*RingConsumer ourMessages map[*irc.Message]struct{} caps map[string]bool @@ -79,11 +78,10 @@ type downstreamConn struct { func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn { logger := &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())} dc := &downstreamConn{ - conn: *newConn(srv, netConn, logger), - id: id, - ringConsumers: make(map[*network]*RingConsumer), - ourMessages: make(map[*irc.Message]struct{}), - caps: make(map[string]bool), + conn: *newConn(srv, netConn, logger), + id: id, + ourMessages: make(map[*irc.Message]struct{}), + caps: make(map[string]bool), } dc.hostname = netConn.RemoteAddr().String() if host, _, err := net.SplitHostPort(dc.hostname); err == nil { @@ -683,7 +681,6 @@ func (dc *downstreamConn) welcome() error { } consumer := net.ring.NewConsumer(seqPtr) - dc.ringConsumers[net] = consumer // TODO: this means all history is lost when trying to send it while the // upstream is disconnected. We need to store history differently so that diff --git a/ring.go b/ring.go index 949086d..aae96d8 100644 --- a/ring.go +++ b/ring.go @@ -36,6 +36,10 @@ func (r *Ring) Produce(msg *irc.Message) { r.cur++ } +func (r *Ring) Cur() uint64 { + return r.cur +} + func (r *Ring) Close() { if r.closed { panic("soju: Ring.Close called twice") diff --git a/upstream.go b/upstream.go index 2e216ca..83add5a 100644 --- a/upstream.go +++ b/upstream.go @@ -1367,7 +1367,7 @@ func (uc *upstreamConn) produce(msg *irc.Message) { uc.network.ring.Produce(msg) uc.forEachDownstream(func(dc *downstreamConn) { - dc.sendFromUpstream(dc.ringConsumers[uc.network].Consume(), uc) + dc.sendFromUpstream(msg, uc) }) } diff --git a/user.go b/user.go index 5ffa3dd..3cdde16 100644 --- a/user.go +++ b/user.go @@ -303,10 +303,10 @@ func (u *user) run() { case eventDownstreamDisconnected: dc := e.dc - for net, rc := range dc.ringConsumers { - seq := rc.Close() + dc.forEachNetwork(func(net *network) { + seq := net.ring.Cur() net.history[dc.clientName] = seq - } + }) for i := range u.downstreamConns { if u.downstreamConns[i] == dc { @@ -349,12 +349,6 @@ func (u *user) createNetwork(net *Network) (*network, error) { return nil, err } - u.forEachDownstream(func(dc *downstreamConn) { - if dc.network == nil { - dc.ringConsumers[network] = network.ring.NewConsumer(nil) - } - }) - u.networks = append(u.networks, network) go network.run() @@ -375,7 +369,6 @@ func (u *user) deleteNetwork(id int64) error { if dc.network != nil && dc.network == net { dc.Close() } - delete(dc.ringConsumers, net) }) net.Stop()