Per-entity ring buffers

Instead of having one ring buffer per network, each network has one ring
buffer per entity (channel or nick). This allows history to be more
fair: if there's a lot of activity in a channel, it won't prune activity
in other channels.

We now track history sequence numbers per client and per network in
networkHistory. The overall list of offline clients is still tracked in
network.offlineClients.

When all clients have received history, the ring buffer can be released.

In the future, we should get rid of too-old offline clients to avoid
having to maintain history for them forever. We should also add a
per-user limit on the number of ring buffers.
This commit is contained in:
Simon Ser 2020-04-10 19:22:47 +02:00
parent 20a58b1fa3
commit db198335aa
No known key found for this signature in database
GPG Key ID: 0FDE7BE0E88F5E48
3 changed files with 84 additions and 33 deletions

View File

@ -637,15 +637,6 @@ func (dc *downstreamConn) welcome() error {
return err return err
} }
// Only send history if we're the first connected client with that name and
// network
sendHistory := true
dc.user.forEachDownstream(func(conn *downstreamConn) {
if dc.clientName == conn.clientName && dc.network == conn.network {
sendHistory = false
}
})
dc.SendMessage(&irc.Message{ dc.SendMessage(&irc.Message{
Prefix: dc.srv.prefix(), Prefix: dc.srv.prefix(),
Command: irc.RPL_WELCOME, Command: irc.RPL_WELCOME,
@ -688,12 +679,32 @@ func (dc *downstreamConn) welcome() error {
}) })
dc.forEachNetwork(func(net *network) { dc.forEachNetwork(func(net *network) {
seq, ok := net.history[dc.clientName] // Only send history if we're the first connected client with that name
if !sendHistory || !ok { // for the network
return if _, ok := net.offlineClients[dc.clientName]; ok {
dc.sendNetworkHistory(net)
delete(net.offlineClients, dc.clientName)
}
})
return nil
} }
consumer := net.ring.NewConsumer(seq) func (dc *downstreamConn) sendNetworkHistory(net *network) {
for target, history := range net.history {
seq, ok := history.offlineClients[dc.clientName]
if !ok {
continue
}
delete(history.offlineClients, dc.clientName)
// If all clients have received history, no need to keep the
// ring buffer around
if len(history.offlineClients) == 0 {
delete(net.history, target)
}
consumer := history.ring.NewConsumer(seq)
// TODO: this means all history is lost when trying to send it while the // TODO: this means all history is lost when trying to send it while the
// upstream is disconnected. We need to store history differently so that // upstream is disconnected. We need to store history differently so that
@ -725,9 +736,7 @@ func (dc *downstreamConn) welcome() error {
dc.SendMessage(dc.marshalMessage(msg, uc)) dc.SendMessage(dc.marshalMessage(msg, uc))
} }
}) }
return nil
} }
func (dc *downstreamConn) runUntilRegistered() error { func (dc *downstreamConn) runUntilRegistered() error {

View File

@ -556,11 +556,11 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
delete(ch.Members, msg.Prefix.Name) delete(ch.Members, msg.Prefix.Name)
ch.Members[newNick] = membership ch.Members[newNick] = membership
uc.appendLog(ch.Name, msg) uc.appendLog(ch.Name, msg)
uc.appendHistory(ch.Name, msg)
} }
} }
if !me { if !me {
uc.network.ring.Produce(msg)
uc.forEachDownstream(func(dc *downstreamConn) { uc.forEachDownstream(func(dc *downstreamConn) {
dc.SendMessage(dc.marshalMessage(msg, uc)) dc.SendMessage(dc.marshalMessage(msg, uc))
}) })
@ -662,11 +662,11 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
delete(ch.Members, msg.Prefix.Name) delete(ch.Members, msg.Prefix.Name)
uc.appendLog(ch.Name, msg) uc.appendLog(ch.Name, msg)
uc.appendHistory(ch.Name, msg)
} }
} }
if msg.Prefix.Name != uc.nick { if msg.Prefix.Name != uc.nick {
uc.network.ring.Produce(msg)
uc.forEachDownstream(func(dc *downstreamConn) { uc.forEachDownstream(func(dc *downstreamConn) {
dc.SendMessage(dc.marshalMessage(msg, uc)) dc.SendMessage(dc.marshalMessage(msg, uc))
}) })
@ -1294,6 +1294,29 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
} }
} }
// appendHistory appends a message to the history. entity can be empty.
func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
// If no client is offline, no need to append the message to the buffer
if len(uc.network.offlineClients) == 0 {
return
}
history, ok := uc.network.history[entity]
if !ok {
history = &networkHistory{
offlineClients: make(map[string]uint64),
ring: NewRing(uc.srv.RingCap),
}
uc.network.history[entity] = history
for clientName, _ := range uc.network.offlineClients {
history.offlineClients[clientName] = 0
}
}
history.ring.Produce(msg)
}
// produce appends a message to the logs, adds it to the history and forwards // produce appends a message to the logs, adds it to the history and forwards
// it to connected downstream connections. // it to connected downstream connections.
// //
@ -1304,7 +1327,7 @@ func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstr
uc.appendLog(target, msg) uc.appendLog(target, msg)
} }
uc.network.ring.Produce(msg) uc.appendHistory(target, msg)
uc.forEachDownstream(func(dc *downstreamConn) { uc.forEachDownstream(func(dc *downstreamConn) {
if dc != origin || dc.caps["echo-message"] { if dc != origin || dc.caps["echo-message"] {

37
user.go
View File

@ -45,14 +45,19 @@ type eventDownstreamDisconnected struct {
dc *downstreamConn dc *downstreamConn
} }
type networkHistory struct {
offlineClients map[string]uint64 // indexed by client name
ring *Ring // can be nil if there are no offline clients
}
type network struct { type network struct {
Network Network
user *user user *user
ring *Ring
stopped chan struct{} stopped chan struct{}
conn *upstreamConn conn *upstreamConn
history map[string]uint64 history map[string]*networkHistory // indexed by entity
offlineClients map[string]struct{} // indexed by client name
lastError error lastError error
} }
@ -60,9 +65,9 @@ func newNetwork(user *user, record *Network) *network {
return &network{ return &network{
Network: *record, Network: *record,
user: user, user: user,
ring: NewRing(user.srv.RingCap),
stopped: make(chan struct{}), stopped: make(chan struct{}),
history: make(map[string]uint64), history: make(map[string]*networkHistory),
offlineClients: make(map[string]struct{}),
} }
} }
@ -294,11 +299,6 @@ func (u *user) run() {
case eventDownstreamDisconnected: case eventDownstreamDisconnected:
dc := e.dc dc := e.dc
dc.forEachNetwork(func(net *network) {
seq := net.ring.Cur()
net.history[dc.clientName] = seq
})
for i := range u.downstreamConns { for i := range u.downstreamConns {
if u.downstreamConns[i] == dc { if u.downstreamConns[i] == dc {
u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...) u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
@ -306,6 +306,25 @@ func (u *user) run() {
} }
} }
// Save history if we're the last client with this name
skipHistory := make(map[*network]bool)
u.forEachDownstream(func(conn *downstreamConn) {
if dc.clientName == conn.clientName {
skipHistory[conn.network] = true
}
})
dc.forEachNetwork(func(net *network) {
if skipHistory[net] || skipHistory[nil] {
return
}
net.offlineClients[dc.clientName] = struct{}{}
for _, history := range net.history {
history.offlineClients[dc.clientName] = history.ring.Cur()
}
})
u.forEachUpstream(func(uc *upstreamConn) { u.forEachUpstream(func(uc *upstreamConn) {
uc.updateAway() uc.updateAway()
}) })