diff --git a/downstream.go b/downstream.go index 6a9cf55..2515ae2 100644 --- a/downstream.go +++ b/downstream.go @@ -289,6 +289,73 @@ func (dc *downstreamConn) SendMessage(msg *irc.Message) { dc.conn.SendMessage(msg) } +// sendMessageWithID sends an outgoing message with the specified internal ID. +func (dc *downstreamConn) sendMessageWithID(msg *irc.Message, id string) { + dc.SendMessage(msg) + + if id == "" || !dc.messageSupportsHistory(msg) { + return + } + + dc.sendPing(id) +} + +// advanceMessageWithID advances history to the specified message ID without +// sending a message. This is useful e.g. for self-messages when echo-message +// isn't enabled. +func (dc *downstreamConn) advanceMessageWithID(msg *irc.Message, id string) { + if id == "" || !dc.messageSupportsHistory(msg) { + return + } + + dc.sendPing(id) +} + +// ackMsgID acknowledges that a message has been received. +func (dc *downstreamConn) ackMsgID(id string) { + netName, entity, _, _, err := parseMsgID(id) + if err != nil { + dc.logger.Printf("failed to ACK message ID %q: %v", id, err) + return + } + + network := dc.user.getNetwork(netName) + if network == nil { + return + } + + history, ok := network.history[entity] + if !ok { + return + } + + history.clients[dc.clientName] = id +} + +func (dc *downstreamConn) sendPing(msgID string) { + token := "soju-msgid-" + base64.RawURLEncoding.EncodeToString([]byte(msgID)) + dc.SendMessage(&irc.Message{ + Command: "PING", + Params: []string{token}, + }) +} + +func (dc *downstreamConn) handlePong(token string) { + if !strings.HasPrefix(token, "soju-msgid-") { + dc.logger.Printf("received unrecognized PONG token %q", token) + return + } + token = strings.TrimPrefix(token, "soju-msgid-") + b, err := base64.RawURLEncoding.DecodeString(token) + if err != nil { + dc.logger.Printf("received malformed PONG token: %v", err) + return + } + msgID := string(b) + + dc.ackMsgID(msgID) +} + // marshalMessage re-formats a message coming from an upstream connection so // that it's suitable for being sent on this downstream connection. Only // messages that may appear in logs are supported, except MODE. @@ -875,6 +942,20 @@ func (dc *downstreamConn) welcome() error { return nil } +// messageSupportsHistory checks whether the provided message can be sent as +// part of an history batch. +func (dc *downstreamConn) messageSupportsHistory(msg *irc.Message) bool { + // Don't replay all messages, because that would mess up client + // state. For instance we just sent the list of users, sending + // PART messages for one of these users would be incorrect. + // TODO: add support for draft/event-playback + switch msg.Command { + case "PRIVMSG", "NOTICE": + return true + } + return false +} + func (dc *downstreamConn) sendNetworkHistory(net *network) { if dc.caps["draft/chathistory"] || dc.user.msgStore == nil { return @@ -906,15 +987,7 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) { } for _, msg := range history { - // Don't replay all messages, because that would mess up client - // state. For instance we just sent the list of users, sending - // PART messages for one of these users would be incorrect. - ignore := true - switch msg.Command { - case "PRIVMSG", "NOTICE": - ignore = false - } - if ignore { + if !dc.messageSupportsHistory(msg) { continue } @@ -983,6 +1056,12 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error { Params: []string{dc.srv.Hostname, source}, }) return nil + case "PONG": + if len(msg.Params) == 0 { + return newNeedMoreParamsError(msg.Command) + } + token := msg.Params[len(msg.Params)-1] + dc.handlePong(token) case "USER": return ircError{&irc.Message{ Command: irc.ERR_ALREADYREGISTERED, diff --git a/upstream.go b/upstream.go index 729471d..3a9df76 100644 --- a/upstream.go +++ b/upstream.go @@ -1607,9 +1607,13 @@ func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message uc.SendMessage(msg) } -func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { +// appendLog appends a message to the log file. +// +// The internal message ID is returned. If the message isn't recorded in the +// log file, an empty string is returned. +func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) (msgID string) { if uc.user.msgStore == nil { - return + return "" } detached := false @@ -1622,7 +1626,7 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { lastID, err := uc.user.msgStore.LastMsgID(uc.network, entity, time.Now()) if err != nil { uc.logger.Printf("failed to log message: failed to get last message ID: %v", err) - return + return "" } history = &networkHistory{ @@ -1646,14 +1650,10 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { msgID, err := uc.user.msgStore.Append(uc.network, entity, msg) if err != nil { uc.logger.Printf("failed to log message: %v", err) - return + return "" } - if !detached && msgID != "" { - uc.forEachDownstream(func(dc *downstreamConn) { - history.clients[dc.clientName] = msgID - }) - } + return msgID } // produce appends a message to the logs and forwards it to connected downstream @@ -1662,8 +1662,9 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { // If origin is not nil and origin doesn't support echo-message, the message is // forwarded to all connections except origin. func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstreamConn) { + var msgID string if target != "" { - uc.appendLog(target, msg) + msgID = uc.appendLog(target, msg) } // Don't forward messages if it's a detached channel @@ -1673,7 +1674,9 @@ func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstr uc.forEachDownstream(func(dc *downstreamConn) { if dc != origin || dc.caps["echo-message"] { - dc.SendMessage(dc.marshalMessage(msg, uc.network)) + dc.sendMessageWithID(dc.marshalMessage(msg, uc.network), msgID) + } else { + dc.advanceMessageWithID(msg, msgID) } }) }