Send one NOTICE on new upstream disconnect/connect errors

In order to notify the user when we are disconnected from a network
(either due to an error, or due a QUIT), and when we fail reconnecting,
this commit adds support for sending a short NOTICE message from the
service user to all relevant downstreams.

The last error is stored, and cleared on successful connection, to
ensure that the user is *not* flooded with identical connection error
messages, which can often happen when a server is down.

No lock is needed on lastError because it is only read and modified from
the user goroutine.

Closes: https://todo.sr.ht/~emersion/soju/27
This commit is contained in:
delthas 2020-04-04 04:48:25 +02:00 committed by Simon Ser
parent ee8aae7a96
commit eb941d2d2b
No known key found for this signature in database
GPG Key ID: 0FDE7BE0E88F5E48
3 changed files with 61 additions and 7 deletions

View File

@ -21,6 +21,14 @@ type serviceCommand struct {
children serviceCommandSet children serviceCommandSet
} }
func sendServiceNOTICE(dc *downstreamConn, text string) {
dc.SendMessage(&irc.Message{
Prefix: &irc.Prefix{Name: serviceNick},
Command: "NOTICE",
Params: []string{dc.nick, text},
})
}
func sendServicePRIVMSG(dc *downstreamConn, text string) { func sendServicePRIVMSG(dc *downstreamConn, text string) {
dc.SendMessage(&irc.Message{ dc.SendMessage(&irc.Message{
Prefix: &irc.Prefix{Name: serviceNick}, Prefix: &irc.Prefix{Name: serviceNick},

View File

@ -103,12 +103,7 @@ func connectToUpstream(network *network) (*upstreamConn, error) {
} }
func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) { func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
uc.user.forEachDownstream(func(dc *downstreamConn) { uc.network.forEachDownstream(f)
if dc.network != nil && dc.network != uc.network {
return
}
f(dc)
})
} }
func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) { func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {

53
user.go
View File

@ -1,6 +1,7 @@
package soju package soju
import ( import (
"fmt"
"sync" "sync"
"time" "time"
@ -14,6 +15,11 @@ type eventUpstreamMessage struct {
uc *upstreamConn uc *upstreamConn
} }
type eventUpstreamConnectionError struct {
net *network
err error
}
type eventUpstreamConnected struct { type eventUpstreamConnected struct {
uc *upstreamConn uc *upstreamConn
} }
@ -22,6 +28,11 @@ type eventUpstreamDisconnected struct {
uc *upstreamConn uc *upstreamConn
} }
type eventUpstreamError struct {
uc *upstreamConn
err error
}
type eventDownstreamMessage struct { type eventDownstreamMessage struct {
msg *irc.Message msg *irc.Message
dc *downstreamConn dc *downstreamConn
@ -41,7 +52,8 @@ type network struct {
ring *Ring ring *Ring
stopped chan struct{} stopped chan struct{}
history map[string]uint64 history map[string]uint64
lastError error
lock sync.Mutex lock sync.Mutex
conn *upstreamConn conn *upstreamConn
@ -57,6 +69,15 @@ func newNetwork(user *user, record *Network) *network {
} }
} }
func (net *network) forEachDownstream(f func(*downstreamConn)) {
net.user.forEachDownstream(func(dc *downstreamConn) {
if dc.network != nil && dc.network != net {
return
}
f(dc)
})
}
func (net *network) run() { func (net *network) run() {
var lastTry time.Time var lastTry time.Time
for { for {
@ -77,12 +98,14 @@ func (net *network) run() {
uc, err := connectToUpstream(net) uc, err := connectToUpstream(net)
if err != nil { if err != nil {
net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err) net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to connect: %v", err)}
continue continue
} }
uc.register() uc.register()
if err := uc.runUntilRegistered(); err != nil { if err := uc.runUntilRegistered(); err != nil {
uc.logger.Printf("failed to register: %v", err) uc.logger.Printf("failed to register: %v", err)
net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to register: %v", err)}
uc.Close() uc.Close()
continue continue
} }
@ -90,6 +113,7 @@ func (net *network) run() {
net.user.events <- eventUpstreamConnected{uc} net.user.events <- eventUpstreamConnected{uc}
if err := uc.readMessages(net.user.events); err != nil { if err := uc.readMessages(net.user.events); err != nil {
uc.logger.Printf("failed to handle messages: %v", err) uc.logger.Printf("failed to handle messages: %v", err)
net.user.events <- eventUpstreamError{uc, fmt.Errorf("failed to handle messages: %v", err)}
} }
uc.Close() uc.Close()
net.user.events <- eventUpstreamDisconnected{uc} net.user.events <- eventUpstreamDisconnected{uc}
@ -200,6 +224,11 @@ func (u *user) run() {
uc.network.lock.Unlock() uc.network.lock.Unlock()
uc.updateAway() uc.updateAway()
uc.forEachDownstream(func(dc *downstreamConn) {
sendServiceNOTICE(dc, fmt.Sprintf("connected to %s", uc.network.Name))
})
uc.network.lastError = nil
case eventUpstreamDisconnected: case eventUpstreamDisconnected:
uc := e.uc uc := e.uc
@ -214,6 +243,28 @@ func (u *user) run() {
} }
uc.endPendingLISTs(true) uc.endPendingLISTs(true)
if uc.network.lastError == nil {
uc.forEachDownstream(func(dc *downstreamConn) {
sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s", uc.network.Name))
})
}
case eventUpstreamConnectionError:
net := e.net
if net.lastError == nil || net.lastError.Error() != e.err.Error() {
net.forEachDownstream(func(dc *downstreamConn) {
sendServiceNOTICE(dc, fmt.Sprintf("failed connecting/registering to %s: %v", net.Name, e.err))
})
}
net.lastError = e.err
case eventUpstreamError:
uc := e.uc
uc.forEachDownstream(func(dc *downstreamConn) {
sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s: %v", uc.network.Name, e.err))
})
uc.network.lastError = e.err
case eventUpstreamMessage: case eventUpstreamMessage:
msg, uc := e.msg, e.uc msg, uc := e.msg, e.uc
if uc.isClosed() { if uc.isClosed() {