From db198335aad036ce9255478c7b6e9cf7e7749385 Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Fri, 10 Apr 2020 19:22:47 +0200 Subject: [PATCH] Per-entity ring buffers Instead of having one ring buffer per network, each network has one ring buffer per entity (channel or nick). This allows history to be more fair: if there's a lot of activity in a channel, it won't prune activity in other channels. We now track history sequence numbers per client and per network in networkHistory. The overall list of offline clients is still tracked in network.offlineClients. When all clients have received history, the ring buffer can be released. In the future, we should get rid of too-old offline clients to avoid having to maintain history for them forever. We should also add a per-user limit on the number of ring buffers. --- downstream.go | 41 +++++++++++++++++++++++++---------------- upstream.go | 29 ++++++++++++++++++++++++++--- user.go | 47 +++++++++++++++++++++++++++++++++-------------- 3 files changed, 84 insertions(+), 33 deletions(-) diff --git a/downstream.go b/downstream.go index b925796..d289988 100644 --- a/downstream.go +++ b/downstream.go @@ -637,15 +637,6 @@ func (dc *downstreamConn) welcome() error { return err } - // Only send history if we're the first connected client with that name and - // network - sendHistory := true - dc.user.forEachDownstream(func(conn *downstreamConn) { - if dc.clientName == conn.clientName && dc.network == conn.network { - sendHistory = false - } - }) - dc.SendMessage(&irc.Message{ Prefix: dc.srv.prefix(), Command: irc.RPL_WELCOME, @@ -688,12 +679,32 @@ func (dc *downstreamConn) welcome() error { }) dc.forEachNetwork(func(net *network) { - seq, ok := net.history[dc.clientName] - if !sendHistory || !ok { - return + // Only send history if we're the first connected client with that name + // for the network + if _, ok := net.offlineClients[dc.clientName]; ok { + dc.sendNetworkHistory(net) + delete(net.offlineClients, dc.clientName) + } + }) + + return nil +} + +func (dc *downstreamConn) sendNetworkHistory(net *network) { + for target, history := range net.history { + seq, ok := history.offlineClients[dc.clientName] + if !ok { + continue + } + delete(history.offlineClients, dc.clientName) + + // If all clients have received history, no need to keep the + // ring buffer around + if len(history.offlineClients) == 0 { + delete(net.history, target) } - consumer := net.ring.NewConsumer(seq) + consumer := history.ring.NewConsumer(seq) // TODO: this means all history is lost when trying to send it while the // upstream is disconnected. We need to store history differently so that @@ -725,9 +736,7 @@ func (dc *downstreamConn) welcome() error { dc.SendMessage(dc.marshalMessage(msg, uc)) } - }) - - return nil + } } func (dc *downstreamConn) runUntilRegistered() error { diff --git a/upstream.go b/upstream.go index ae3cd71..42d9792 100644 --- a/upstream.go +++ b/upstream.go @@ -556,11 +556,11 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error { delete(ch.Members, msg.Prefix.Name) ch.Members[newNick] = membership uc.appendLog(ch.Name, msg) + uc.appendHistory(ch.Name, msg) } } if !me { - uc.network.ring.Produce(msg) uc.forEachDownstream(func(dc *downstreamConn) { dc.SendMessage(dc.marshalMessage(msg, uc)) }) @@ -662,11 +662,11 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error { delete(ch.Members, msg.Prefix.Name) uc.appendLog(ch.Name, msg) + uc.appendHistory(ch.Name, msg) } } if msg.Prefix.Name != uc.nick { - uc.network.ring.Produce(msg) uc.forEachDownstream(func(dc *downstreamConn) { dc.SendMessage(dc.marshalMessage(msg, uc)) }) @@ -1294,6 +1294,29 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { } } +// appendHistory appends a message to the history. entity can be empty. +func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) { + // If no client is offline, no need to append the message to the buffer + if len(uc.network.offlineClients) == 0 { + return + } + + history, ok := uc.network.history[entity] + if !ok { + history = &networkHistory{ + offlineClients: make(map[string]uint64), + ring: NewRing(uc.srv.RingCap), + } + uc.network.history[entity] = history + + for clientName, _ := range uc.network.offlineClients { + history.offlineClients[clientName] = 0 + } + } + + history.ring.Produce(msg) +} + // produce appends a message to the logs, adds it to the history and forwards // it to connected downstream connections. // @@ -1304,7 +1327,7 @@ func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstr uc.appendLog(target, msg) } - uc.network.ring.Produce(msg) + uc.appendHistory(target, msg) uc.forEachDownstream(func(dc *downstreamConn) { if dc != origin || dc.caps["echo-message"] { diff --git a/user.go b/user.go index 36702bf..dfa0f15 100644 --- a/user.go +++ b/user.go @@ -45,24 +45,29 @@ type eventDownstreamDisconnected struct { dc *downstreamConn } +type networkHistory struct { + offlineClients map[string]uint64 // indexed by client name + ring *Ring // can be nil if there are no offline clients +} + type network struct { Network user *user - ring *Ring stopped chan struct{} - conn *upstreamConn - history map[string]uint64 - lastError error + conn *upstreamConn + history map[string]*networkHistory // indexed by entity + offlineClients map[string]struct{} // indexed by client name + lastError error } func newNetwork(user *user, record *Network) *network { return &network{ - Network: *record, - user: user, - ring: NewRing(user.srv.RingCap), - stopped: make(chan struct{}), - history: make(map[string]uint64), + Network: *record, + user: user, + stopped: make(chan struct{}), + history: make(map[string]*networkHistory), + offlineClients: make(map[string]struct{}), } } @@ -294,11 +299,6 @@ func (u *user) run() { case eventDownstreamDisconnected: dc := e.dc - dc.forEachNetwork(func(net *network) { - seq := net.ring.Cur() - net.history[dc.clientName] = seq - }) - for i := range u.downstreamConns { if u.downstreamConns[i] == dc { u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...) @@ -306,6 +306,25 @@ func (u *user) run() { } } + // Save history if we're the last client with this name + skipHistory := make(map[*network]bool) + u.forEachDownstream(func(conn *downstreamConn) { + if dc.clientName == conn.clientName { + skipHistory[conn.network] = true + } + }) + + dc.forEachNetwork(func(net *network) { + if skipHistory[net] || skipHistory[nil] { + return + } + + net.offlineClients[dc.clientName] = struct{}{} + for _, history := range net.history { + history.offlineClients[dc.clientName] = history.ring.Cur() + } + }) + u.forEachUpstream(func(uc *upstreamConn) { uc.updateAway() })