Add LIST support
This commit adds support for downstream LIST messages from multiple concurrent downstreams to multiple concurrent upstreams, including support for multiple pending LIST requests from the same downstream. Because a unique RPL_LISTEND message must be sent to the requesting downstream, and that there might be multiple upstreams, each sending their own RPL_LISTEND, a cache of RPL_LISTEND replies of some sort is required to match RPL_LISTEND together in order to only send one back downstream. This commit adds a list of "pending LIST" structs, which each contain a map of all upstreams that yet need to send a RPL_LISTEND, and the corresponding LIST request associated with that response. This list of pending LISTs is sorted according to the order that the requesting downstreams sent the LIST messages in. Each pending set also stores the id of the requesting downstream, in order to only forward the replies to it and no other downstream. (This is important because LIST replies can typically amount to several thousands messages on large servers.) When a single downstream makes multiple LIST requests, only the first one will be immediately sent to the upstream servers. The next ones will be buffered until the first one is completed. Distinct downstreams can make concurrent LIST requests without any request buffering. Each RPL_LIST message is forwarded to the downstream of the first matching pending LIST struct. When an upstream sends an RPL_LISTEND message, the upstream is removed from the first matching pending LIST struct, but that message is not immediately forwarded downstream. If there are no remaining pending LIST requests in that struct is then empty, that means all upstreams have sent back all their RPL_LISTEND replies (which means they also sent all their RPL_LIST replies); so a unique RPL_LISTEND is sent to downstream and that pending LIST set is removed from the cache. Upstreams are removed from the pending LIST structs in two other cases: - when they are closed (to avoid stalling because of a disconnected upstream that will never reply to the LIST message): they are removed from all pending LIST structs - when they reply with an ERR_UNKNOWNCOMMAND or RPL_TRYAGAIN LIST reply, which is typically used when a user is not allowed to LIST because they just joined the server: they are removed from the first pending LIST struct, as if an RPL_LISTEND message was received
This commit is contained in:
parent
72a285aaeb
commit
10ea698022
@ -1077,6 +1077,45 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
|
||||
}
|
||||
sendTopic(dc, ch)
|
||||
}
|
||||
case "LIST":
|
||||
// TODO: support ELIST when supported by all upstreams
|
||||
|
||||
dc.user.pendingLISTsLock.Lock()
|
||||
defer dc.user.pendingLISTsLock.Unlock()
|
||||
|
||||
pl := pendingLIST{
|
||||
downstreamID: dc.id,
|
||||
pendingCommands: make(map[int64]*irc.Message),
|
||||
}
|
||||
var upstreamChannels map[int64][]string
|
||||
if len(msg.Params) > 0 {
|
||||
upstreamChannels = make(map[int64][]string)
|
||||
channels := strings.Split(msg.Params[0], ",")
|
||||
for _, channel := range channels {
|
||||
uc, upstreamChannel, err := dc.unmarshalEntity(channel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
upstreamChannels[uc.network.ID] = append(upstreamChannels[uc.network.ID], upstreamChannel)
|
||||
}
|
||||
}
|
||||
|
||||
dc.user.pendingLISTs = append(dc.user.pendingLISTs, pl)
|
||||
dc.forEachUpstream(func(uc *upstreamConn) {
|
||||
var params []string
|
||||
if upstreamChannels != nil {
|
||||
if channels, ok := upstreamChannels[uc.network.ID]; ok {
|
||||
params = []string{strings.Join(channels, ",")}
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
pl.pendingCommands[uc.network.ID] = &irc.Message{
|
||||
Command: "LIST",
|
||||
Params: params,
|
||||
}
|
||||
uc.trySendList(dc.id)
|
||||
})
|
||||
case "NAMES":
|
||||
if len(msg.Params) == 0 {
|
||||
dc.SendMessage(&irc.Message{
|
||||
|
152
upstream.go
152
upstream.go
@ -59,6 +59,10 @@ type upstreamConn struct {
|
||||
|
||||
saslClient sasl.Client
|
||||
saslStarted bool
|
||||
|
||||
// set of LIST commands in progress, per downstream
|
||||
// access is synchronized with user.pendingLISTsLock
|
||||
pendingLISTDownstreamSet map[uint64]struct{}
|
||||
}
|
||||
|
||||
func connectToUpstream(network *network) (*upstreamConn, error) {
|
||||
@ -79,19 +83,20 @@ func connectToUpstream(network *network) (*upstreamConn, error) {
|
||||
|
||||
outgoing := make(chan *irc.Message, 64)
|
||||
uc := &upstreamConn{
|
||||
network: network,
|
||||
logger: logger,
|
||||
net: netConn,
|
||||
irc: irc.NewConn(netConn),
|
||||
srv: network.user.srv,
|
||||
user: network.user,
|
||||
outgoing: outgoing,
|
||||
channels: make(map[string]*upstreamChannel),
|
||||
caps: make(map[string]string),
|
||||
batches: make(map[string]batch),
|
||||
availableChannelTypes: stdChannelTypes,
|
||||
availableChannelModes: stdChannelModes,
|
||||
availableMemberships: stdMemberships,
|
||||
network: network,
|
||||
logger: logger,
|
||||
net: netConn,
|
||||
irc: irc.NewConn(netConn),
|
||||
srv: network.user.srv,
|
||||
user: network.user,
|
||||
outgoing: outgoing,
|
||||
channels: make(map[string]*upstreamChannel),
|
||||
caps: make(map[string]string),
|
||||
batches: make(map[string]batch),
|
||||
availableChannelTypes: stdChannelTypes,
|
||||
availableChannelModes: stdChannelModes,
|
||||
availableMemberships: stdMemberships,
|
||||
pendingLISTDownstreamSet: make(map[uint64]struct{}),
|
||||
}
|
||||
|
||||
go func() {
|
||||
@ -136,6 +141,8 @@ func (uc *upstreamConn) Close() error {
|
||||
return fmt.Errorf("upstream connection already closed")
|
||||
}
|
||||
close(uc.closed)
|
||||
|
||||
uc.endPendingLists(true)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -172,6 +179,81 @@ func (uc *upstreamConn) isChannel(entity string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (uc *upstreamConn) getPendingList() *pendingLIST {
|
||||
uc.user.pendingLISTsLock.Lock()
|
||||
defer uc.user.pendingLISTsLock.Unlock()
|
||||
for _, pl := range uc.user.pendingLISTs {
|
||||
if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
|
||||
continue
|
||||
}
|
||||
return &pl
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (uc *upstreamConn) endPendingLists(all bool) (found bool) {
|
||||
found = false
|
||||
uc.user.pendingLISTsLock.Lock()
|
||||
defer uc.user.pendingLISTsLock.Unlock()
|
||||
for i := 0; i < len(uc.user.pendingLISTs); i++ {
|
||||
pl := uc.user.pendingLISTs[i]
|
||||
if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
|
||||
continue
|
||||
}
|
||||
delete(pl.pendingCommands, uc.network.ID)
|
||||
if len(pl.pendingCommands) == 0 {
|
||||
uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
|
||||
i--
|
||||
uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
|
||||
dc.SendMessage(&irc.Message{
|
||||
Prefix: dc.srv.prefix(),
|
||||
Command: irc.RPL_LISTEND,
|
||||
Params: []string{dc.nick, "End of /LIST"},
|
||||
})
|
||||
})
|
||||
}
|
||||
found = true
|
||||
if !all {
|
||||
delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
|
||||
uc.user.forEachUpstream(func(uc *upstreamConn) {
|
||||
uc.trySendList(pl.downstreamID)
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (uc *upstreamConn) trySendList(downstreamID uint64) {
|
||||
// must be called with a lock in uc.user.pendingLISTsLock
|
||||
|
||||
if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
|
||||
// a LIST command is already pending
|
||||
// we will try again when that command is completed
|
||||
return
|
||||
}
|
||||
|
||||
for _, pl := range uc.user.pendingLISTs {
|
||||
if pl.downstreamID != downstreamID {
|
||||
continue
|
||||
}
|
||||
// this is the first pending LIST command list of the downstream
|
||||
listCommand, ok := pl.pendingCommands[uc.network.ID]
|
||||
if !ok {
|
||||
// there is no command for this upstream in these LIST commands
|
||||
// do not send anything
|
||||
continue
|
||||
}
|
||||
// there is a command for this upstream in these LIST commands
|
||||
// send it now
|
||||
|
||||
uc.SendMessageLabeled(downstreamID, listCommand)
|
||||
|
||||
uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
|
||||
for _, m := range uc.availableMemberships {
|
||||
if m.Prefix == s[0] {
|
||||
@ -833,6 +915,29 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
|
||||
return fmt.Errorf("failed to parse topic time: %v", err)
|
||||
}
|
||||
ch.TopicTime = time.Unix(sec, 0)
|
||||
case irc.RPL_LIST:
|
||||
var channel, clients, topic string
|
||||
if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pl := uc.getPendingList()
|
||||
if pl == nil {
|
||||
return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
|
||||
}
|
||||
|
||||
uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
|
||||
dc.SendMessage(&irc.Message{
|
||||
Prefix: dc.srv.prefix(),
|
||||
Command: irc.RPL_LIST,
|
||||
Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic},
|
||||
})
|
||||
})
|
||||
case irc.RPL_LISTEND:
|
||||
ok := uc.endPendingLists(false)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
|
||||
}
|
||||
case irc.RPL_NAMREPLY:
|
||||
var name, statusStr, members string
|
||||
if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
|
||||
@ -1090,6 +1195,25 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
|
||||
Params: []string{dc.nick, dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
|
||||
})
|
||||
})
|
||||
case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
|
||||
var command, reason string
|
||||
if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if command == "LIST" {
|
||||
ok := uc.endPendingLists(false)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
|
||||
}
|
||||
uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
|
||||
dc.SendMessage(&irc.Message{
|
||||
Prefix: uc.srv.prefix(),
|
||||
Command: msg.Command,
|
||||
Params: []string{dc.nick, "LIST", reason},
|
||||
})
|
||||
})
|
||||
}
|
||||
case "TAGMSG":
|
||||
// TODO: relay to downstream connections that accept message-tags
|
||||
case "ACK":
|
||||
@ -1100,6 +1224,8 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
|
||||
// Ignore
|
||||
case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
|
||||
// Ignore
|
||||
case irc.RPL_LISTSTART:
|
||||
// Ignore
|
||||
case rpl_localusers, rpl_globalusers:
|
||||
// Ignore
|
||||
case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
|
||||
|
10
user.go
10
user.go
@ -96,6 +96,16 @@ type user struct {
|
||||
|
||||
networks []*network
|
||||
downstreamConns []*downstreamConn
|
||||
|
||||
// LIST commands in progress
|
||||
pendingLISTsLock sync.Mutex
|
||||
pendingLISTs []pendingLIST
|
||||
}
|
||||
|
||||
type pendingLIST struct {
|
||||
downstreamID uint64
|
||||
// list of per-upstream LIST commands not yet sent or completed
|
||||
pendingCommands map[int64]*irc.Message
|
||||
}
|
||||
|
||||
func newUser(srv *Server, record *User) *user {
|
||||
|
Loading…
Reference in New Issue
Block a user