diff --git a/downstream.go b/downstream.go index 09b34c7..cf651a6 100644 --- a/downstream.go +++ b/downstream.go @@ -850,13 +850,27 @@ func (dc *downstreamConn) welcome() error { dc.sendNetworkHistory(net) delete(net.offlineClients, dc.clientName) } + + // Fast-forward history to last message + for target, history := range net.history { + if ch, ok := net.channels[target]; ok && ch.Detached { + continue + } + + lastID, err := lastMsgID(net, target, time.Now()) + if err != nil { + dc.logger.Printf("failed to get last message ID: %v", err) + continue + } + history.clients[dc.clientName] = lastID + } }) return nil } func (dc *downstreamConn) sendNetworkHistory(net *network) { - if dc.caps["draft/chathistory"] { + if dc.caps["draft/chathistory"] || dc.srv.LogPath == "" { return } for target, history := range net.history { @@ -864,13 +878,17 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) { continue } - seq, ok := history.clients[dc.clientName] + lastDelivered, ok := history.clients[dc.clientName] if !ok { continue } - history.clients[dc.clientName] = history.ring.Cur() - consumer := history.ring.NewConsumer(seq) + limit := 4000 + history, err := loadHistoryLatestID(dc.network, target, lastDelivered, limit) + if err != nil { + dc.logger.Printf("failed to send implicit history for %q: %v", target, err) + continue + } batchRef := "history" if dc.caps["batch"] { @@ -881,12 +899,7 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) { }) } - for { - msg := consumer.Consume() - if msg == nil { - break - } - + for _, msg := range history { // Don't replay all messages, because that would mess up client // state. For instance we just sent the list of users, sending // PART messages for one of these users would be incorrect. @@ -900,10 +913,8 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) { } if dc.caps["batch"] { - msg = msg.Copy() msg.Tags["batch"] = irc.TagValue(batchRef) } - dc.SendMessage(dc.marshalMessage(msg, net)) } diff --git a/ring.go b/ring.go deleted file mode 100644 index 9eacd8b..0000000 --- a/ring.go +++ /dev/null @@ -1,86 +0,0 @@ -package soju - -import ( - "fmt" - - "gopkg.in/irc.v3" -) - -// Ring implements a single producer, multiple consumer ring buffer. The ring -// buffer size is fixed. The ring buffer is stored in memory. -type Ring struct { - buffer []*irc.Message - cap uint64 - cur uint64 -} - -// NewRing creates a new ring buffer. -func NewRing(capacity int) *Ring { - return &Ring{ - buffer: make([]*irc.Message, capacity), - cap: uint64(capacity), - } -} - -// Produce appends a new message to the ring buffer. -func (r *Ring) Produce(msg *irc.Message) { - i := int(r.cur % r.cap) - r.buffer[i] = msg - r.cur++ -} - -// Cur returns the current history sequence number. -func (r *Ring) Cur() uint64 { - return r.cur -} - -// NewConsumer creates a new ring buffer consumer. -// -// The consumer will get messages starting from the specified history sequence -// number (see Ring.Cur). -func (r *Ring) NewConsumer(seq uint64) *RingConsumer { - return &RingConsumer{ring: r, cur: seq} -} - -// RingConsumer is a ring buffer consumer. -type RingConsumer struct { - ring *Ring - cur uint64 -} - -// diff returns the number of pending messages. It assumes the Ring is locked. -func (rc *RingConsumer) diff() uint64 { - if rc.cur > rc.ring.cur { - panic(fmt.Sprintf("soju: consumer cursor (%v) greater than producer cursor (%v)", rc.cur, rc.ring.cur)) - } - return rc.ring.cur - rc.cur -} - -// Peek returns the next pending message if any without consuming it. A nil -// message is returned if no message is available. -func (rc *RingConsumer) Peek() *irc.Message { - diff := rc.diff() - if diff == 0 { - return nil - } - if diff > rc.ring.cap { - // Consumer drops diff - cap entries - rc.cur = rc.ring.cur - rc.ring.cap - } - i := int(rc.cur % rc.ring.cap) - msg := rc.ring.buffer[i] - if msg == nil { - panic(fmt.Sprintf("soju: unexpected nil ring buffer entry at index %v", i)) - } - return msg -} - -// Consume consumes and returns the next pending message. A nil message is -// returned if no message is available. -func (rc *RingConsumer) Consume() *irc.Message { - msg := rc.Peek() - if msg != nil { - rc.cur++ - } - return msg -} diff --git a/server.go b/server.go index c538c20..b04f385 100644 --- a/server.go +++ b/server.go @@ -47,7 +47,6 @@ func (l *prefixLogger) Printf(format string, v ...interface{}) { type Server struct { Hostname string Logger Logger - RingCap int HistoryLimit int LogPath string Debug bool @@ -64,7 +63,6 @@ type Server struct { func NewServer(db *DB) *Server { return &Server{ Logger: log.New(log.Writer(), "", log.LstdFlags), - RingCap: 4096, HistoryLimit: 1000, users: make(map[string]*user), db: db, diff --git a/upstream.go b/upstream.go index ecbf19c..97b2d03 100644 --- a/upstream.go +++ b/upstream.go @@ -708,7 +708,6 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error { delete(ch.Members, msg.Prefix.Name) ch.Members[newNick] = memberships uc.appendLog(ch.Name, msg) - uc.appendHistory(ch.Name, msg) } } @@ -818,7 +817,6 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error { delete(ch.Members, msg.Prefix.Name) uc.appendLog(ch.Name, msg) - uc.appendHistory(ch.Name, msg) } } @@ -1611,7 +1609,6 @@ func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message uc.SendMessage(msg) } -// TODO: handle moving logs when a network name changes, when support for this is added func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { if uc.srv.LogPath == "" { return @@ -1623,13 +1620,6 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { uc.messageLoggers[entity] = ml } - if _, err := ml.Append(msg); err != nil { - uc.logger.Printf("failed to log message: %v", err) - } -} - -// appendHistory appends a message to the history. entity can be empty. -func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) { detached := false if ch, ok := uc.network.channels[entity]; ok { detached = ch.Detached @@ -1637,36 +1627,45 @@ func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) { history, ok := uc.network.history[entity] if !ok { + lastID, err := lastMsgID(uc.network, entity, time.Now()) + if err != nil { + uc.logger.Printf("failed to log message: failed to get last message ID: %v", err) + return + } + history = &networkHistory{ - clients: make(map[string]uint64), - ring: NewRing(uc.srv.RingCap), + clients: make(map[string]string), } uc.network.history[entity] = history for clientName, _ := range uc.network.offlineClients { - history.clients[clientName] = 0 + history.clients[clientName] = lastID } if detached { // If the channel is detached, online clients act as offline // clients too uc.forEachDownstream(func(dc *downstreamConn) { - history.clients[dc.clientName] = 0 + history.clients[dc.clientName] = lastID }) } } - history.ring.Produce(msg) + msgID, err := ml.Append(msg) + if err != nil { + uc.logger.Printf("failed to log message: %v", err) + return + } if !detached { uc.forEachDownstream(func(dc *downstreamConn) { - history.clients[dc.clientName] = history.ring.Cur() + history.clients[dc.clientName] = msgID }) } } -// produce appends a message to the logs, adds it to the history and forwards -// it to connected downstream connections. +// produce appends a message to the logs and forwards it to connected downstream +// connections. // // If origin is not nil and origin doesn't support echo-message, the message is // forwarded to all connections except origin. @@ -1675,8 +1674,6 @@ func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstr uc.appendLog(target, msg) } - uc.appendHistory(target, msg) - // Don't forward messages if it's a detached channel if ch, ok := uc.network.channels[target]; ok && ch.Detached { return diff --git a/user.go b/user.go index 4cae173..021e5b6 100644 --- a/user.go +++ b/user.go @@ -51,8 +51,7 @@ type eventDownstreamDisconnected struct { type eventStop struct{} type networkHistory struct { - clients map[string]uint64 // indexed by client name - ring *Ring // can be nil if there are no offline clients + clients map[string]string // indexed by client name } type network struct {