Remove downstreamConn.ringConsumers
We no longer need long-lived ring buffer consumers now that upstreamConn.produce dispatches messages to downstream connections.
This commit is contained in:
parent
40ff14ec6c
commit
f0bc919885
@ -66,7 +66,6 @@ type downstreamConn struct {
|
|||||||
password string // empty after authentication
|
password string // empty after authentication
|
||||||
network *network // can be nil
|
network *network // can be nil
|
||||||
|
|
||||||
ringConsumers map[*network]*RingConsumer
|
|
||||||
ourMessages map[*irc.Message]struct{}
|
ourMessages map[*irc.Message]struct{}
|
||||||
caps map[string]bool
|
caps map[string]bool
|
||||||
|
|
||||||
@ -81,7 +80,6 @@ func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn
|
|||||||
dc := &downstreamConn{
|
dc := &downstreamConn{
|
||||||
conn: *newConn(srv, netConn, logger),
|
conn: *newConn(srv, netConn, logger),
|
||||||
id: id,
|
id: id,
|
||||||
ringConsumers: make(map[*network]*RingConsumer),
|
|
||||||
ourMessages: make(map[*irc.Message]struct{}),
|
ourMessages: make(map[*irc.Message]struct{}),
|
||||||
caps: make(map[string]bool),
|
caps: make(map[string]bool),
|
||||||
}
|
}
|
||||||
@ -683,7 +681,6 @@ func (dc *downstreamConn) welcome() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
consumer := net.ring.NewConsumer(seqPtr)
|
consumer := net.ring.NewConsumer(seqPtr)
|
||||||
dc.ringConsumers[net] = consumer
|
|
||||||
|
|
||||||
// TODO: this means all history is lost when trying to send it while the
|
// TODO: this means all history is lost when trying to send it while the
|
||||||
// upstream is disconnected. We need to store history differently so that
|
// upstream is disconnected. We need to store history differently so that
|
||||||
|
4
ring.go
4
ring.go
@ -36,6 +36,10 @@ func (r *Ring) Produce(msg *irc.Message) {
|
|||||||
r.cur++
|
r.cur++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Ring) Cur() uint64 {
|
||||||
|
return r.cur
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Ring) Close() {
|
func (r *Ring) Close() {
|
||||||
if r.closed {
|
if r.closed {
|
||||||
panic("soju: Ring.Close called twice")
|
panic("soju: Ring.Close called twice")
|
||||||
|
@ -1367,7 +1367,7 @@ func (uc *upstreamConn) produce(msg *irc.Message) {
|
|||||||
uc.network.ring.Produce(msg)
|
uc.network.ring.Produce(msg)
|
||||||
|
|
||||||
uc.forEachDownstream(func(dc *downstreamConn) {
|
uc.forEachDownstream(func(dc *downstreamConn) {
|
||||||
dc.sendFromUpstream(dc.ringConsumers[uc.network].Consume(), uc)
|
dc.sendFromUpstream(msg, uc)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
13
user.go
13
user.go
@ -303,10 +303,10 @@ func (u *user) run() {
|
|||||||
case eventDownstreamDisconnected:
|
case eventDownstreamDisconnected:
|
||||||
dc := e.dc
|
dc := e.dc
|
||||||
|
|
||||||
for net, rc := range dc.ringConsumers {
|
dc.forEachNetwork(func(net *network) {
|
||||||
seq := rc.Close()
|
seq := net.ring.Cur()
|
||||||
net.history[dc.clientName] = seq
|
net.history[dc.clientName] = seq
|
||||||
}
|
})
|
||||||
|
|
||||||
for i := range u.downstreamConns {
|
for i := range u.downstreamConns {
|
||||||
if u.downstreamConns[i] == dc {
|
if u.downstreamConns[i] == dc {
|
||||||
@ -349,12 +349,6 @@ func (u *user) createNetwork(net *Network) (*network, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
u.forEachDownstream(func(dc *downstreamConn) {
|
|
||||||
if dc.network == nil {
|
|
||||||
dc.ringConsumers[network] = network.ring.NewConsumer(nil)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
u.networks = append(u.networks, network)
|
u.networks = append(u.networks, network)
|
||||||
|
|
||||||
go network.run()
|
go network.run()
|
||||||
@ -375,7 +369,6 @@ func (u *user) deleteNetwork(id int64) error {
|
|||||||
if dc.network != nil && dc.network == net {
|
if dc.network != nil && dc.network == net {
|
||||||
dc.Close()
|
dc.Close()
|
||||||
}
|
}
|
||||||
delete(dc.ringConsumers, net)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
net.Stop()
|
net.Stop()
|
||||||
|
Loading…
Reference in New Issue
Block a user