10ea698022
This commit adds support for downstream LIST messages from multiple concurrent downstreams to multiple concurrent upstreams, including support for multiple pending LIST requests from the same downstream. Because a unique RPL_LISTEND message must be sent to the requesting downstream, and because there might be multiple upstreams, each sending their own RPL_LISTEND, a cache of RPL_LISTEND replies of some sort is required to match RPL_LISTEND replies together in order to only send one back downstream. This commit adds a list of "pending LIST" structs, which each contain a map of all upstreams that have yet to send a RPL_LISTEND, and the corresponding LIST request associated with that response. This list of pending LISTs is sorted according to the order that the requesting downstreams sent the LIST messages in. Each pending set also stores the id of the requesting downstream, in order to only forward the replies to it and no other downstream. (This is important because LIST replies can typically amount to several thousand messages on large servers.) When a single downstream makes multiple LIST requests, only the first one will be immediately sent to the upstream servers. The next ones will be buffered until the first one is completed. Distinct downstreams can make concurrent LIST requests without any request buffering. Each RPL_LIST message is forwarded to the downstream of the first matching pending LIST struct. When an upstream sends an RPL_LISTEND message, the upstream is removed from the first matching pending LIST struct, but that message is not immediately forwarded downstream. If that struct then has no remaining pending LIST requests, that means all upstreams have sent back all their RPL_LISTEND replies (which means they also sent all their RPL_LIST replies); so a single RPL_LISTEND is sent to downstream and that pending LIST set is removed from the cache.
Upstreams are removed from the pending LIST structs in two other cases: (1) when they are closed (to avoid stalling because of a disconnected upstream that will never reply to the LIST message), they are removed from all pending LIST structs; (2) when they reply to LIST with ERR_UNKNOWNCOMMAND or RPL_TRYAGAIN, which is typically used when a user is not allowed to LIST because they just joined the server, they are removed from the first pending LIST struct, as if an RPL_LISTEND message had been received.
234 lines
4.8 KiB
Go
234 lines
4.8 KiB
Go
package soju
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"gopkg.in/irc.v3"
|
|
)
|
|
|
|
// event is the element type of a user's events channel. The concrete values
// handled by (*user).run are eventUpstreamMessage, eventDownstreamMessage,
// eventDownstreamConnected and eventDownstreamDisconnected; anything else is
// logged there as an unknown event type.
type event interface{}
|
|
|
|
type eventUpstreamMessage struct {
|
|
msg *irc.Message
|
|
uc *upstreamConn
|
|
}
|
|
|
|
type eventDownstreamMessage struct {
|
|
msg *irc.Message
|
|
dc *downstreamConn
|
|
}
|
|
|
|
type eventDownstreamConnected struct {
|
|
dc *downstreamConn
|
|
}
|
|
|
|
type eventDownstreamDisconnected struct {
|
|
dc *downstreamConn
|
|
}
|
|
|
|
type network struct {
|
|
Network
|
|
user *user
|
|
ring *Ring
|
|
|
|
lock sync.Mutex
|
|
conn *upstreamConn
|
|
history map[string]uint64
|
|
}
|
|
|
|
func newNetwork(user *user, record *Network) *network {
|
|
return &network{
|
|
Network: *record,
|
|
user: user,
|
|
ring: NewRing(user.srv.RingCap),
|
|
history: make(map[string]uint64),
|
|
}
|
|
}
|
|
|
|
func (net *network) run() {
|
|
var lastTry time.Time
|
|
for {
|
|
if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
|
|
delay := retryConnectMinDelay - dur
|
|
net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
|
|
time.Sleep(delay)
|
|
}
|
|
lastTry = time.Now()
|
|
|
|
uc, err := connectToUpstream(net)
|
|
if err != nil {
|
|
net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
|
|
continue
|
|
}
|
|
|
|
uc.register()
|
|
|
|
// TODO: wait for the connection to be registered before adding it to
|
|
// net, otherwise messages might be sent to it while still being
|
|
// unauthenticated
|
|
net.lock.Lock()
|
|
net.conn = uc
|
|
net.lock.Unlock()
|
|
|
|
if err := uc.readMessages(net.user.events); err != nil {
|
|
uc.logger.Printf("failed to handle messages: %v", err)
|
|
}
|
|
uc.Close()
|
|
|
|
net.lock.Lock()
|
|
net.conn = nil
|
|
net.lock.Unlock()
|
|
}
|
|
}
|
|
|
|
func (net *network) upstream() *upstreamConn {
|
|
net.lock.Lock()
|
|
defer net.lock.Unlock()
|
|
return net.conn
|
|
}
|
|
|
|
type user struct {
|
|
User
|
|
srv *Server
|
|
|
|
events chan event
|
|
|
|
networks []*network
|
|
downstreamConns []*downstreamConn
|
|
|
|
// LIST commands in progress
|
|
pendingLISTsLock sync.Mutex
|
|
pendingLISTs []pendingLIST
|
|
}
|
|
|
|
type pendingLIST struct {
|
|
downstreamID uint64
|
|
// list of per-upstream LIST commands not yet sent or completed
|
|
pendingCommands map[int64]*irc.Message
|
|
}
|
|
|
|
func newUser(srv *Server, record *User) *user {
|
|
return &user{
|
|
User: *record,
|
|
srv: srv,
|
|
events: make(chan event, 64),
|
|
}
|
|
}
|
|
|
|
func (u *user) forEachNetwork(f func(*network)) {
|
|
for _, network := range u.networks {
|
|
f(network)
|
|
}
|
|
}
|
|
|
|
func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
|
|
for _, network := range u.networks {
|
|
uc := network.upstream()
|
|
if uc == nil || !uc.registered {
|
|
continue
|
|
}
|
|
f(uc)
|
|
}
|
|
}
|
|
|
|
func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
|
|
for _, dc := range u.downstreamConns {
|
|
f(dc)
|
|
}
|
|
}
|
|
|
|
func (u *user) getNetwork(name string) *network {
|
|
for _, network := range u.networks {
|
|
if network.Addr == name {
|
|
return network
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (u *user) run() {
|
|
networks, err := u.srv.db.ListNetworks(u.Username)
|
|
if err != nil {
|
|
u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
|
|
return
|
|
}
|
|
|
|
for _, record := range networks {
|
|
network := newNetwork(u, &record)
|
|
u.networks = append(u.networks, network)
|
|
|
|
go network.run()
|
|
}
|
|
|
|
for e := range u.events {
|
|
switch e := e.(type) {
|
|
case eventUpstreamMessage:
|
|
msg, uc := e.msg, e.uc
|
|
if uc.isClosed() {
|
|
uc.logger.Printf("ignoring message on closed connection: %v", msg)
|
|
break
|
|
}
|
|
if err := uc.handleMessage(msg); err != nil {
|
|
uc.logger.Printf("failed to handle message %q: %v", msg, err)
|
|
}
|
|
case eventDownstreamConnected:
|
|
dc := e.dc
|
|
|
|
if err := dc.welcome(); err != nil {
|
|
dc.logger.Printf("failed to handle new registered connection: %v", err)
|
|
break
|
|
}
|
|
|
|
u.downstreamConns = append(u.downstreamConns, dc)
|
|
case eventDownstreamDisconnected:
|
|
dc := e.dc
|
|
for i := range u.downstreamConns {
|
|
if u.downstreamConns[i] == dc {
|
|
u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
case eventDownstreamMessage:
|
|
msg, dc := e.msg, e.dc
|
|
if dc.isClosed() {
|
|
dc.logger.Printf("ignoring message on closed connection: %v", msg)
|
|
break
|
|
}
|
|
err := dc.handleMessage(msg)
|
|
if ircErr, ok := err.(ircError); ok {
|
|
ircErr.Message.Prefix = dc.srv.prefix()
|
|
dc.SendMessage(ircErr.Message)
|
|
} else if err != nil {
|
|
dc.logger.Printf("failed to handle message %q: %v", msg, err)
|
|
dc.Close()
|
|
}
|
|
default:
|
|
u.srv.Logger.Printf("received unknown event type: %T", e)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (u *user) createNetwork(net *Network) (*network, error) {
|
|
if net.ID != 0 {
|
|
panic("tried creating an already-existing network")
|
|
}
|
|
|
|
network := newNetwork(u, net)
|
|
err := u.srv.db.StoreNetwork(u.Username, &network.Network)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
u.forEachDownstream(func(dc *downstreamConn) {
|
|
if dc.network == nil {
|
|
dc.runNetwork(network, false)
|
|
}
|
|
})
|
|
|
|
u.networks = append(u.networks, network)
|
|
|
|
go network.run()
|
|
return network, nil
|
|
}
|