From 92fece5cd4b29da4944263ffa258dc76dfe2a1f5 Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Wed, 19 Aug 2020 13:24:05 +0200 Subject: [PATCH] Nuke in-memory ring buffer Instead, always read chat history from logs. Unify the implicit chat history (pushing history to clients) and explicit chat history (via the CHATHISTORY command). Instead of keeping track of ring buffer cursors for each client, use message IDs. If necessary, the ring buffer could be re-introduced behind a common MessageStore interface (could be useful when on-disk logs are disabled). References: https://todo.sr.ht/~emersion/soju/80 --- downstream.go | 35 ++++++++++++++------- ring.go | 86 --------------------------------------------------- server.go | 2 -- upstream.go | 37 ++++++++++------------ user.go | 3 +- 5 files changed, 41 insertions(+), 122 deletions(-) delete mode 100644 ring.go diff --git a/downstream.go b/downstream.go index 09b34c7..cf651a6 100644 --- a/downstream.go +++ b/downstream.go @@ -850,13 +850,27 @@ func (dc *downstreamConn) welcome() error { dc.sendNetworkHistory(net) delete(net.offlineClients, dc.clientName) } + + // Fast-forward history to last message + for target, history := range net.history { + if ch, ok := net.channels[target]; ok && ch.Detached { + continue + } + + lastID, err := lastMsgID(net, target, time.Now()) + if err != nil { + dc.logger.Printf("failed to get last message ID: %v", err) + continue + } + history.clients[dc.clientName] = lastID + } }) return nil } func (dc *downstreamConn) sendNetworkHistory(net *network) { - if dc.caps["draft/chathistory"] { + if dc.caps["draft/chathistory"] || dc.srv.LogPath == "" { return } for target, history := range net.history { @@ -864,13 +878,17 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) { continue } - seq, ok := history.clients[dc.clientName] + lastDelivered, ok := history.clients[dc.clientName] if !ok { continue } - history.clients[dc.clientName] = history.ring.Cur() - consumer := history.ring.NewConsumer(seq) + limit := 4000 + history, err := loadHistoryLatestID(dc.network, target, lastDelivered, limit) + if err != nil { + dc.logger.Printf("failed to send implicit history for %q: %v", target, err) + continue + } batchRef := "history" if dc.caps["batch"] { @@ -881,12 +899,7 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) { }) } - for { - msg := consumer.Consume() - if msg == nil { - break - } - + for _, msg := range history { // Don't replay all messages, because that would mess up client // state. For instance we just sent the list of users, sending // PART messages for one of these users would be incorrect. @@ -900,10 +913,8 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) { } if dc.caps["batch"] { - msg = msg.Copy() msg.Tags["batch"] = irc.TagValue(batchRef) } - dc.SendMessage(dc.marshalMessage(msg, net)) } diff --git a/ring.go b/ring.go deleted file mode 100644 index 9eacd8b..0000000 --- a/ring.go +++ /dev/null @@ -1,86 +0,0 @@ -package soju - -import ( - "fmt" - - "gopkg.in/irc.v3" -) - -// Ring implements a single producer, multiple consumer ring buffer. The ring -// buffer size is fixed. The ring buffer is stored in memory. -type Ring struct { - buffer []*irc.Message - cap uint64 - cur uint64 -} - -// NewRing creates a new ring buffer. -func NewRing(capacity int) *Ring { - return &Ring{ - buffer: make([]*irc.Message, capacity), - cap: uint64(capacity), - } -} - -// Produce appends a new message to the ring buffer. -func (r *Ring) Produce(msg *irc.Message) { - i := int(r.cur % r.cap) - r.buffer[i] = msg - r.cur++ -} - -// Cur returns the current history sequence number. -func (r *Ring) Cur() uint64 { - return r.cur -} - -// NewConsumer creates a new ring buffer consumer. -// -// The consumer will get messages starting from the specified history sequence -// number (see Ring.Cur). -func (r *Ring) NewConsumer(seq uint64) *RingConsumer { - return &RingConsumer{ring: r, cur: seq} -} - -// RingConsumer is a ring buffer consumer. -type RingConsumer struct { - ring *Ring - cur uint64 -} - -// diff returns the number of pending messages. It assumes the Ring is locked. -func (rc *RingConsumer) diff() uint64 { - if rc.cur > rc.ring.cur { - panic(fmt.Sprintf("soju: consumer cursor (%v) greater than producer cursor (%v)", rc.cur, rc.ring.cur)) - } - return rc.ring.cur - rc.cur -} - -// Peek returns the next pending message if any without consuming it. A nil -// message is returned if no message is available. -func (rc *RingConsumer) Peek() *irc.Message { - diff := rc.diff() - if diff == 0 { - return nil - } - if diff > rc.ring.cap { - // Consumer drops diff - cap entries - rc.cur = rc.ring.cur - rc.ring.cap - } - i := int(rc.cur % rc.ring.cap) - msg := rc.ring.buffer[i] - if msg == nil { - panic(fmt.Sprintf("soju: unexpected nil ring buffer entry at index %v", i)) - } - return msg -} - -// Consume consumes and returns the next pending message. A nil message is -// returned if no message is available. -func (rc *RingConsumer) Consume() *irc.Message { - msg := rc.Peek() - if msg != nil { - rc.cur++ - } - return msg -} diff --git a/server.go b/server.go index c538c20..b04f385 100644 --- a/server.go +++ b/server.go @@ -47,7 +47,6 @@ func (l *prefixLogger) Printf(format string, v ...interface{}) { type Server struct { Hostname string Logger Logger - RingCap int HistoryLimit int LogPath string Debug bool @@ -64,7 +63,6 @@ type Server struct { func NewServer(db *DB) *Server { return &Server{ Logger: log.New(log.Writer(), "", log.LstdFlags), - RingCap: 4096, HistoryLimit: 1000, users: make(map[string]*user), db: db, diff --git a/upstream.go b/upstream.go index ecbf19c..97b2d03 100644 --- a/upstream.go +++ b/upstream.go @@ -708,7 +708,6 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error { delete(ch.Members, msg.Prefix.Name) ch.Members[newNick] = memberships uc.appendLog(ch.Name, msg) - uc.appendHistory(ch.Name, msg) } } @@ -818,7 +817,6 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error { delete(ch.Members, msg.Prefix.Name) uc.appendLog(ch.Name, msg) - uc.appendHistory(ch.Name, msg) } } @@ -1611,7 +1609,6 @@ func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message uc.SendMessage(msg) } -// TODO: handle moving logs when a network name changes, when support for this is added func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { if uc.srv.LogPath == "" { return @@ -1623,13 +1620,6 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { uc.messageLoggers[entity] = ml } - if _, err := ml.Append(msg); err != nil { - uc.logger.Printf("failed to log message: %v", err) - } -} - -// appendHistory appends a message to the history. entity can be empty. -func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) { detached := false if ch, ok := uc.network.channels[entity]; ok { detached = ch.Detached @@ -1637,36 +1627,45 @@ func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) { history, ok := uc.network.history[entity] if !ok { + lastID, err := lastMsgID(uc.network, entity, time.Now()) + if err != nil { + uc.logger.Printf("failed to log message: failed to get last message ID: %v", err) + return + } + history = &networkHistory{ - clients: make(map[string]uint64), - ring: NewRing(uc.srv.RingCap), + clients: make(map[string]string), } uc.network.history[entity] = history for clientName, _ := range uc.network.offlineClients { - history.clients[clientName] = 0 + history.clients[clientName] = lastID } if detached { // If the channel is detached, online clients act as offline // clients too uc.forEachDownstream(func(dc *downstreamConn) { - history.clients[dc.clientName] = 0 + history.clients[dc.clientName] = lastID }) } } - history.ring.Produce(msg) + msgID, err := ml.Append(msg) + if err != nil { + uc.logger.Printf("failed to log message: %v", err) + return + } if !detached { uc.forEachDownstream(func(dc *downstreamConn) { - history.clients[dc.clientName] = history.ring.Cur() + history.clients[dc.clientName] = msgID }) } } -// produce appends a message to the logs, adds it to the history and forwards -// it to connected downstream connections. +// produce appends a message to the logs and forwards it to connected downstream +// connections. // // If origin is not nil and origin doesn't support echo-message, the message is // forwarded to all connections except origin. @@ -1675,8 +1674,6 @@ func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstr uc.appendLog(target, msg) } - uc.appendHistory(target, msg) - // Don't forward messages if it's a detached channel if ch, ok := uc.network.channels[target]; ok && ch.Detached { return diff --git a/user.go b/user.go index 4cae173..021e5b6 100644 --- a/user.go +++ b/user.go @@ -51,8 +51,7 @@ type eventDownstreamDisconnected struct { type eventStop struct{} type networkHistory struct { - clients map[string]uint64 // indexed by client name - ring *Ring // can be nil if there are no offline clients + clients map[string]string // indexed by client name } type network struct {