Fix writer goroutine races
Any SendMessage call after Close could potentially block forever if the outgoing channel was filled up. Now the channel is drained before the writer goroutine exits.
This commit is contained in:
parent
5b03760be7
commit
8c6328207b
@ -57,7 +57,7 @@ type downstreamConn struct {
|
|||||||
irc *irc.Conn
|
irc *irc.Conn
|
||||||
srv *Server
|
srv *Server
|
||||||
logger Logger
|
logger Logger
|
||||||
outgoing chan *irc.Message
|
outgoing chan<- *irc.Message
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
|
|
||||||
registered bool
|
registered bool
|
||||||
@ -84,13 +84,14 @@ type downstreamConn struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn {
|
func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn {
|
||||||
|
outgoing := make(chan *irc.Message, 64)
|
||||||
dc := &downstreamConn{
|
dc := &downstreamConn{
|
||||||
id: id,
|
id: id,
|
||||||
net: netConn,
|
net: netConn,
|
||||||
irc: irc.NewConn(netConn),
|
irc: irc.NewConn(netConn),
|
||||||
srv: srv,
|
srv: srv,
|
||||||
logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
|
logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
|
||||||
outgoing: make(chan *irc.Message, 64),
|
outgoing: outgoing,
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
ringConsumers: make(map[*network]*RingConsumer),
|
ringConsumers: make(map[*network]*RingConsumer),
|
||||||
caps: make(map[string]bool),
|
caps: make(map[string]bool),
|
||||||
@ -102,14 +103,25 @@ func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := dc.writeMessages(); err != nil {
|
for msg := range outgoing {
|
||||||
|
if dc.srv.Debug {
|
||||||
|
dc.logger.Printf("sent: %v", msg)
|
||||||
|
}
|
||||||
|
dc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
|
||||||
|
if err := dc.irc.WriteMessage(msg); err != nil {
|
||||||
dc.logger.Printf("failed to write message: %v", err)
|
dc.logger.Printf("failed to write message: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := dc.net.Close(); err != nil {
|
if err := dc.net.Close(); err != nil {
|
||||||
dc.logger.Printf("failed to close connection: %v", err)
|
dc.logger.Printf("failed to close connection: %v", err)
|
||||||
} else {
|
} else {
|
||||||
dc.logger.Printf("connection closed")
|
dc.logger.Printf("connection closed")
|
||||||
}
|
}
|
||||||
|
// Drain the outgoing channel to prevent SendMessage from blocking
|
||||||
|
for range outgoing {
|
||||||
|
// This space is intentionally left blank
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
dc.logger.Printf("new connection")
|
dc.logger.Printf("new connection")
|
||||||
@ -244,28 +256,6 @@ func (dc *downstreamConn) readMessages(ch chan<- event) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dc *downstreamConn) writeMessages() error {
|
func (dc *downstreamConn) writeMessages() error {
|
||||||
// TODO: any SendMessage call after the connection is closed will
|
|
||||||
// either block or drop
|
|
||||||
for {
|
|
||||||
var err error
|
|
||||||
var closed bool
|
|
||||||
select {
|
|
||||||
case msg := <-dc.outgoing:
|
|
||||||
if dc.srv.Debug {
|
|
||||||
dc.logger.Printf("sent: %v", msg)
|
|
||||||
}
|
|
||||||
dc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
||||||
err = dc.irc.WriteMessage(msg)
|
|
||||||
case <-dc.closed:
|
|
||||||
closed = true
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if closed {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -275,12 +265,16 @@ func (dc *downstreamConn) Close() error {
|
|||||||
return fmt.Errorf("downstream connection already closed")
|
return fmt.Errorf("downstream connection already closed")
|
||||||
}
|
}
|
||||||
close(dc.closed)
|
close(dc.closed)
|
||||||
|
close(dc.outgoing)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendMessage queues a new outgoing message. It is safe to call from any
|
// SendMessage queues a new outgoing message. It is safe to call from any
|
||||||
// goroutine.
|
// goroutine.
|
||||||
func (dc *downstreamConn) SendMessage(msg *irc.Message) {
|
func (dc *downstreamConn) SendMessage(msg *irc.Message) {
|
||||||
|
if dc.isClosed() {
|
||||||
|
return
|
||||||
|
}
|
||||||
// TODO: strip tags if the client doesn't support them (see runNetwork)
|
// TODO: strip tags if the client doesn't support them (see runNetwork)
|
||||||
dc.outgoing <- msg
|
dc.outgoing <- msg
|
||||||
}
|
}
|
||||||
|
18
upstream.go
18
upstream.go
@ -113,24 +113,13 @@ func connectToUpstream(network *network) (*upstreamConn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// TODO: any SendMessage call after the connection is closed will
|
for msg := range outgoing {
|
||||||
// either block or drop
|
|
||||||
for {
|
|
||||||
var closed bool
|
|
||||||
select {
|
|
||||||
case msg := <-outgoing:
|
|
||||||
if uc.srv.Debug {
|
if uc.srv.Debug {
|
||||||
uc.logger.Printf("sent: %v", msg)
|
uc.logger.Printf("sent: %v", msg)
|
||||||
}
|
}
|
||||||
uc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
|
uc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
|
||||||
if err := uc.irc.WriteMessage(msg); err != nil {
|
if err := uc.irc.WriteMessage(msg); err != nil {
|
||||||
uc.logger.Printf("failed to write message: %v", err)
|
uc.logger.Printf("failed to write message: %v", err)
|
||||||
closed = true
|
|
||||||
}
|
|
||||||
case <-uc.closed:
|
|
||||||
closed = true
|
|
||||||
}
|
|
||||||
if closed {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -139,6 +128,10 @@ func connectToUpstream(network *network) (*upstreamConn, error) {
|
|||||||
} else {
|
} else {
|
||||||
uc.logger.Printf("connection closed")
|
uc.logger.Printf("connection closed")
|
||||||
}
|
}
|
||||||
|
// Drain the outgoing channel to prevent SendMessage from blocking
|
||||||
|
for range outgoing {
|
||||||
|
// This space is intentionally left blank
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return uc, nil
|
return uc, nil
|
||||||
@ -159,6 +152,7 @@ func (uc *upstreamConn) Close() error {
|
|||||||
return fmt.Errorf("upstream connection already closed")
|
return fmt.Errorf("upstream connection already closed")
|
||||||
}
|
}
|
||||||
close(uc.closed)
|
close(uc.closed)
|
||||||
|
close(uc.outgoing)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user