Simplify ring consumer goroutine

Since network.history is now only accessed from the user goroutine, a
lock becomes unnecessary.
This commit is contained in:
Simon Ser 2020-04-01 16:02:31 +02:00
parent 10da094259
commit d4de60a869
No known key found for this signature in database
GPG Key ID: 0FDE7BE0E88F5E48
2 changed files with 66 additions and 74 deletions

View File

@ -71,6 +71,8 @@ type downstreamConn struct {
password string // empty after authentication password string // empty after authentication
network *network // can be nil network *network // can be nil
ringConsumers map[*network]*RingConsumer
negociatingCaps bool negociatingCaps bool
capVersion int capVersion int
caps map[string]bool caps map[string]bool
@ -83,15 +85,16 @@ type downstreamConn struct {
func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn { func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn {
dc := &downstreamConn{ dc := &downstreamConn{
id: id, id: id,
net: netConn, net: netConn,
irc: irc.NewConn(netConn), irc: irc.NewConn(netConn),
srv: srv, srv: srv,
logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
outgoing: make(chan *irc.Message, 64), outgoing: make(chan *irc.Message, 64),
closed: make(chan struct{}), closed: make(chan struct{}),
caps: make(map[string]bool), ringConsumers: make(map[*network]*RingConsumer),
ourMessages: make(map[*irc.Message]struct{}), caps: make(map[string]bool),
ourMessages: make(map[*irc.Message]struct{}),
} }
dc.hostname = netConn.RemoteAddr().String() dc.hostname = netConn.RemoteAddr().String()
if host, _, err := net.SplitHostPort(dc.hostname); err == nil { if host, _, err := net.SplitHostPort(dc.hostname); err == nil {
@ -722,9 +725,7 @@ func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
var seqPtr *uint64 var seqPtr *uint64
if loadHistory { if loadHistory {
net.lock.Lock()
seq, ok := net.history[dc.clientName] seq, ok := net.history[dc.clientName]
net.lock.Unlock()
if ok { if ok {
seqPtr = &seq seqPtr = &seq
} }
@ -735,79 +736,63 @@ func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
serverTimeEnabled := dc.caps["server-time"] serverTimeEnabled := dc.caps["server-time"]
consumer, ch := net.ring.NewConsumer(seqPtr) consumer, ch := net.ring.NewConsumer(seqPtr)
if _, ok := dc.ringConsumers[net]; ok {
panic("network has been added twice")
}
dc.ringConsumers[net] = consumer
go func() { go func() {
for { for range ch {
var closed bool uc := net.upstream()
select { if uc == nil {
case _, ok := <-ch: dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
if !ok { continue
closed = true }
for {
msg := consumer.Peek()
if msg == nil {
break break
} }
uc := net.upstream() dc.lock.Lock()
if uc == nil { _, ours := dc.ourMessages[msg]
dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) delete(dc.ourMessages, msg)
break dc.lock.Unlock()
if ours {
// The message comes from our connection, don't echo it
// back
consumer.Consume()
continue
} }
for { msg = msg.Copy()
msg := consumer.Peek() switch msg.Command {
if msg == nil { case "PRIVMSG":
break msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
} msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
default:
panic("expected to consume a PRIVMSG message")
}
dc.lock.Lock() if !msgTagsEnabled {
_, ours := dc.ourMessages[msg] for name := range msg.Tags {
delete(dc.ourMessages, msg) supported := false
dc.lock.Unlock() switch name {
if ours { case "time":
// The message comes from our connection, don't echo it supported = serverTimeEnabled
// back }
consumer.Consume() if !supported {
continue delete(msg.Tags, name)
}
msg = msg.Copy()
switch msg.Command {
case "PRIVMSG":
msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
default:
panic("expected to consume a PRIVMSG message")
}
if !msgTagsEnabled {
for name := range msg.Tags {
supported := false
switch name {
case "time":
supported = serverTimeEnabled
}
if !supported {
delete(msg.Tags, name)
}
} }
} }
dc.SendMessage(msg)
consumer.Consume()
} }
case <-dc.closed:
closed = true dc.SendMessage(msg)
} consumer.Consume()
if closed {
break
} }
} }
// TODO: close the consumer from the user goroutine, so we don't need
// that net.history lock
seq := consumer.Close()
net.lock.Lock()
net.history[dc.clientName] = seq
net.lock.Unlock()
}() }()
} }

11
user.go
View File

@ -41,9 +41,10 @@ type network struct {
ring *Ring ring *Ring
stopped chan struct{} stopped chan struct{}
lock sync.Mutex
conn *upstreamConn
history map[string]uint64 history map[string]uint64
lock sync.Mutex
conn *upstreamConn
} }
func newNetwork(user *user, record *Network) *network { func newNetwork(user *user, record *Network) *network {
@ -235,6 +236,12 @@ func (u *user) run() {
}) })
case eventDownstreamDisconnected: case eventDownstreamDisconnected:
dc := e.dc dc := e.dc
for net, rc := range dc.ringConsumers {
seq := rc.Close()
net.history[dc.clientName] = seq
}
for i := range u.downstreamConns { for i := range u.downstreamConns {
if u.downstreamConns[i] == dc { if u.downstreamConns[i] == dc {
u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...) u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)