Add support for draft/event-playback

This commit is contained in:
delthas 2021-10-09 00:13:16 +02:00 committed by Simon Ser
parent a6aa7f0008
commit 03f8972305
4 changed files with 160 additions and 50 deletions

View File

@ -518,7 +518,7 @@ func (dc *downstreamConn) SendBatch(typ string, params []string, tags irc.Tags,
func (dc *downstreamConn) sendMessageWithID(msg *irc.Message, id string) { func (dc *downstreamConn) sendMessageWithID(msg *irc.Message, id string) {
dc.SendMessage(msg) dc.SendMessage(msg)
if id == "" || !dc.messageSupportsHistory(msg) { if id == "" || !dc.messageSupportsBacklog(msg) {
return return
} }
@ -529,7 +529,7 @@ func (dc *downstreamConn) sendMessageWithID(msg *irc.Message, id string) {
// sending a message. This is useful e.g. for self-messages when echo-message // sending a message. This is useful e.g. for self-messages when echo-message
// isn't enabled. // isn't enabled.
func (dc *downstreamConn) advanceMessageWithID(msg *irc.Message, id string) { func (dc *downstreamConn) advanceMessageWithID(msg *irc.Message, id string) {
if id == "" || !dc.messageSupportsHistory(msg) { if id == "" || !dc.messageSupportsBacklog(msg) {
return return
} }
@ -571,8 +571,13 @@ func (dc *downstreamConn) handlePong(token string) {
// marshalMessage re-formats a message coming from an upstream connection so // marshalMessage re-formats a message coming from an upstream connection so
// that it's suitable for being sent on this downstream connection. Only // that it's suitable for being sent on this downstream connection. Only
// messages that may appear in logs are supported, except MODE. // messages that may appear in logs are supported, except MODE messages which
// may only appear in single-upstream mode.
func (dc *downstreamConn) marshalMessage(msg *irc.Message, net *network) *irc.Message { func (dc *downstreamConn) marshalMessage(msg *irc.Message, net *network) *irc.Message {
if dc.network != nil {
return msg
}
msg = msg.Copy() msg = msg.Copy()
msg.Prefix = dc.marshalUserPrefix(net, msg.Prefix) msg.Prefix = dc.marshalUserPrefix(net, msg.Prefix)
@ -983,6 +988,12 @@ func (dc *downstreamConn) updateSupportedCaps() {
dc.unsetSupportedCap(cap) dc.unsetSupportedCap(cap)
} }
} }
if dc.srv.LogPath != "" && dc.network != nil {
dc.setSupportedCap("draft/event-playback", "")
} else {
dc.unsetSupportedCap("draft/event-playback")
}
} }
func (dc *downstreamConn) updateNick() { func (dc *downstreamConn) updateNick() {
@ -1297,13 +1308,12 @@ func (dc *downstreamConn) welcome() error {
return nil return nil
} }
// messageSupportsHistory checks whether the provided message can be sent as // messageSupportsBacklog checks whether the provided message can be sent as
// part of an history batch. // part of an history batch.
func (dc *downstreamConn) messageSupportsHistory(msg *irc.Message) bool { func (dc *downstreamConn) messageSupportsBacklog(msg *irc.Message) bool {
// Don't replay all messages, because that would mess up client // Don't replay all messages, because that would mess up client
// state. For instance we just sent the list of users, sending // state. For instance we just sent the list of users, sending
// PART messages for one of these users would be incorrect. // PART messages for one of these users would be incorrect.
// TODO: add support for draft/event-playback
switch msg.Command { switch msg.Command {
case "PRIVMSG", "NOTICE": case "PRIVMSG", "NOTICE":
return true return true
@ -1328,10 +1338,6 @@ func (dc *downstreamConn) sendTargetBacklog(net *network, target, msgID string)
dc.SendBatch("chathistory", []string{dc.marshalEntity(net, target)}, nil, func(batchRef irc.TagValue) { dc.SendBatch("chathistory", []string{dc.marshalEntity(net, target)}, nil, func(batchRef irc.TagValue) {
for _, msg := range history { for _, msg := range history {
if !dc.messageSupportsHistory(msg) {
continue
}
if ch != nil && ch.Detached { if ch != nil && ch.Detached {
if net.detachedMessageNeedsRelay(ch, msg) { if net.detachedMessageNeedsRelay(ch, msg) {
dc.relayDetachedMessage(net, msg) dc.relayDetachedMessage(net, msg)
@ -2326,21 +2332,23 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
}} }}
} }
eventPlayback := dc.caps["draft/event-playback"]
var history []*irc.Message var history []*irc.Message
switch subcommand { switch subcommand {
case "BEFORE": case "BEFORE":
history, err = store.LoadBeforeTime(network, entity, bounds[0], time.Time{}, limit) history, err = store.LoadBeforeTime(network, entity, bounds[0], time.Time{}, limit, eventPlayback)
case "AFTER": case "AFTER":
history, err = store.LoadAfterTime(network, entity, bounds[0], time.Now(), limit) history, err = store.LoadAfterTime(network, entity, bounds[0], time.Now(), limit, eventPlayback)
case "BETWEEN": case "BETWEEN":
if bounds[0].Before(bounds[1]) { if bounds[0].Before(bounds[1]) {
history, err = store.LoadAfterTime(network, entity, bounds[0], bounds[1], limit) history, err = store.LoadAfterTime(network, entity, bounds[0], bounds[1], limit, eventPlayback)
} else { } else {
history, err = store.LoadBeforeTime(network, entity, bounds[0], bounds[1], limit) history, err = store.LoadBeforeTime(network, entity, bounds[0], bounds[1], limit, eventPlayback)
} }
case "TARGETS": case "TARGETS":
// TODO: support TARGETS in multi-upstream mode // TODO: support TARGETS in multi-upstream mode
targets, err := store.ListTargets(network, bounds[0], bounds[1], limit) targets, err := store.ListTargets(network, bounds[0], bounds[1], limit, eventPlayback)
if err != nil { if err != nil {
dc.logger.Printf("failed fetching targets for chathistory: %v", err) dc.logger.Printf("failed fetching targets for chathistory: %v", err)
return ircError{&irc.Message{ return ircError{&irc.Message{

View File

@ -17,6 +17,8 @@ type messageStore interface {
// date. The message ID returned may not refer to a valid message, but can be // date. The message ID returned may not refer to a valid message, but can be
// used in history queries. // used in history queries.
LastMsgID(network *network, entity string, t time.Time) (string, error) LastMsgID(network *network, entity string, t time.Time) (string, error)
// LoadLatestID queries the latest non-event messages for the given network,
// entity and date, up to a count of limit messages, sorted from oldest to newest.
LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error)
Append(network *network, entity string, msg *irc.Message) (id string, err error) Append(network *network, entity string, msg *irc.Message) (id string, err error)
} }
@ -34,15 +36,18 @@ type chatHistoryMessageStore interface {
// ListTargets lists channels and nicknames by time of the latest message. // ListTargets lists channels and nicknames by time of the latest message.
// It returns up to limit targets, starting from start and ending on end, // It returns up to limit targets, starting from start and ending on end,
// both excluded. end may be before or after start. // both excluded. end may be before or after start.
ListTargets(network *network, start, end time.Time, limit int) ([]chatHistoryTarget, error) // If events is false, only PRIVMSG/NOTICE messages are considered.
ListTargets(network *network, start, end time.Time, limit int, events bool) ([]chatHistoryTarget, error)
// LoadBeforeTime loads up to limit messages before start down to end. The // LoadBeforeTime loads up to limit messages before start down to end. The
// returned messages must be between and excluding the provided bounds. // returned messages must be between and excluding the provided bounds.
// end is before start. // end is before start.
LoadBeforeTime(network *network, entity string, start, end time.Time, limit int) ([]*irc.Message, error) // If events is false, only PRIVMSG/NOTICE messages are considered.
LoadBeforeTime(network *network, entity string, start, end time.Time, limit int, events bool) ([]*irc.Message, error)
// LoadBeforeTime loads up to limit messages after start up to end. The // LoadBeforeTime loads up to limit messages after start up to end. The
// returned messages must be between and excluding the provided bounds. // returned messages must be between and excluding the provided bounds.
// end is after start. // end is after start.
LoadAfterTime(network *network, entity string, start, end time.Time, limit int) ([]*irc.Message, error) // If events is false, only PRIVMSG/NOTICE messages are considered.
LoadAfterTime(network *network, entity string, start, end time.Time, limit int, events bool) ([]*irc.Message, error)
} }
type msgIDType uint type msgIDType uint

View File

@ -249,7 +249,7 @@ func formatMessage(msg *irc.Message) string {
} }
} }
func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) { func parseMessage(line, entity string, ref time.Time, events bool) (*irc.Message, time.Time, error) {
var hour, minute, second int var hour, minute, second int
_, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second) _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
if err != nil { if err != nil {
@ -257,30 +257,121 @@ func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time,
} }
line = line[11:] line = line[11:]
var cmd, sender, text string var cmd string
if strings.HasPrefix(line, "<") { var prefix *irc.Prefix
cmd = "PRIVMSG" var params []string
parts := strings.SplitN(line[1:], "> ", 2) if events && strings.HasPrefix(line, "*** ") {
parts := strings.SplitN(line[4:], " ", 2)
if len(parts) != 2 { if len(parts) != 2 {
return nil, time.Time{}, nil return nil, time.Time{}, nil
} }
sender, text = parts[0], parts[1] switch parts[0] {
} else if strings.HasPrefix(line, "-") { case "Joins:", "Parts:", "Quits:":
cmd = "NOTICE" args := strings.SplitN(parts[1], " ", 3)
parts := strings.SplitN(line[1:], "- ", 2) if len(args) < 2 {
if len(parts) != 2 { return nil, time.Time{}, nil
return nil, time.Time{}, nil }
nick := args[0]
mask := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")")
maskParts := strings.SplitN(mask, "@", 2)
if len(maskParts) != 2 {
return nil, time.Time{}, nil
}
prefix = &irc.Prefix{
Name: nick,
User: maskParts[0],
Host: maskParts[1],
}
var reason string
if len(args) > 2 {
reason = strings.TrimSuffix(strings.TrimPrefix(args[2], "("), ")")
}
switch parts[0] {
case "Joins:":
cmd = "JOIN"
params = []string{entity}
case "Parts:":
cmd = "PART"
if reason != "" {
params = []string{entity, reason}
} else {
params = []string{entity}
}
case "Quits:":
cmd = "QUIT"
if reason != "" {
params = []string{reason}
}
}
default:
nick := parts[0]
rem := parts[1]
if r := strings.TrimPrefix(rem, "is now known as "); r != rem {
cmd = "NICK"
prefix = &irc.Prefix{
Name: nick,
}
params = []string{r}
} else if r := strings.TrimPrefix(rem, "was kicked by "); r != rem {
args := strings.SplitN(r, " ", 2)
if len(args) != 2 {
return nil, time.Time{}, nil
}
cmd = "KICK"
prefix = &irc.Prefix{
Name: args[0],
}
reason := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")")
params = []string{entity, nick}
if reason != "" {
params = append(params, reason)
}
} else if r := strings.TrimPrefix(rem, "changes topic to "); r != rem {
cmd = "TOPIC"
prefix = &irc.Prefix{
Name: nick,
}
topic := strings.TrimSuffix(strings.TrimPrefix(r, "'"), "'")
params = []string{entity, topic}
} else if r := strings.TrimPrefix(rem, "sets mode: "); r != rem {
cmd = "MODE"
prefix = &irc.Prefix{
Name: nick,
}
params = append([]string{entity}, strings.Split(r, " ")...)
} else {
return nil, time.Time{}, nil
}
} }
sender, text = parts[0], parts[1]
} else if strings.HasPrefix(line, "* ") {
cmd = "PRIVMSG"
parts := strings.SplitN(line[2:], " ", 2)
if len(parts) != 2 {
return nil, time.Time{}, nil
}
sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
} else { } else {
return nil, time.Time{}, nil var sender, text string
if strings.HasPrefix(line, "<") {
cmd = "PRIVMSG"
parts := strings.SplitN(line[1:], "> ", 2)
if len(parts) != 2 {
return nil, time.Time{}, nil
}
sender, text = parts[0], parts[1]
} else if strings.HasPrefix(line, "-") {
cmd = "NOTICE"
parts := strings.SplitN(line[1:], "- ", 2)
if len(parts) != 2 {
return nil, time.Time{}, nil
}
sender, text = parts[0], parts[1]
} else if strings.HasPrefix(line, "* ") {
cmd = "PRIVMSG"
parts := strings.SplitN(line[2:], " ", 2)
if len(parts) != 2 {
return nil, time.Time{}, nil
}
sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
} else {
return nil, time.Time{}, nil
}
prefix = &irc.Prefix{Name: sender}
params = []string{entity, text}
} }
year, month, day := ref.Date() year, month, day := ref.Date()
@ -290,14 +381,14 @@ func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time,
Tags: map[string]irc.TagValue{ Tags: map[string]irc.TagValue{
"time": irc.TagValue(t.UTC().Format(serverTimeLayout)), "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
}, },
Prefix: &irc.Prefix{Name: sender}, Prefix: prefix,
Command: cmd, Command: cmd,
Params: []string{entity, text}, Params: params,
} }
return msg, t, nil return msg, t, nil
} }
func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, ref time.Time, end time.Time, limit int, afterOffset int64) ([]*irc.Message, error) { func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, ref time.Time, end time.Time, events bool, limit int, afterOffset int64) ([]*irc.Message, error) {
path := ms.logPath(network, entity, ref) path := ms.logPath(network, entity, ref)
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
@ -321,7 +412,7 @@ func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, r
} }
for sc.Scan() { for sc.Scan() {
msg, t, err := parseMessage(sc.Text(), entity, ref) msg, t, err := parseMessage(sc.Text(), entity, ref, events)
if err != nil { if err != nil {
return nil, err return nil, err
} else if msg == nil || !t.After(end) { } else if msg == nil || !t.After(end) {
@ -353,7 +444,7 @@ func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, r
} }
} }
func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, ref time.Time, end time.Time, limit int) ([]*irc.Message, error) { func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, ref time.Time, end time.Time, events bool, limit int) ([]*irc.Message, error) {
path := ms.logPath(network, entity, ref) path := ms.logPath(network, entity, ref)
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
@ -367,7 +458,7 @@ func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, re
var history []*irc.Message var history []*irc.Message
sc := bufio.NewScanner(f) sc := bufio.NewScanner(f)
for sc.Scan() && len(history) < limit { for sc.Scan() && len(history) < limit {
msg, t, err := parseMessage(sc.Text(), entity, ref) msg, t, err := parseMessage(sc.Text(), entity, ref, events)
if err != nil { if err != nil {
return nil, err return nil, err
} else if msg == nil || !t.After(ref) { } else if msg == nil || !t.After(ref) {
@ -385,14 +476,14 @@ func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, re
return history, nil return history, nil
} }
func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start time.Time, end time.Time, limit int) ([]*irc.Message, error) { func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) {
start = start.In(time.Local) start = start.In(time.Local)
end = end.In(time.Local) end = end.In(time.Local)
history := make([]*irc.Message, limit) history := make([]*irc.Message, limit)
remaining := limit remaining := limit
tries := 0 tries := 0
for remaining > 0 && tries < fsMessageStoreMaxTries && end.Before(start) { for remaining > 0 && tries < fsMessageStoreMaxTries && end.Before(start) {
buf, err := ms.parseMessagesBefore(network, entity, start, end, remaining, -1) buf, err := ms.parseMessagesBefore(network, entity, start, end, events, remaining, -1)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -410,14 +501,14 @@ func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start
return history[remaining:], nil return history[remaining:], nil
} }
func (ms *fsMessageStore) LoadAfterTime(network *network, entity string, start time.Time, end time.Time, limit int) ([]*irc.Message, error) { func (ms *fsMessageStore) LoadAfterTime(network *network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) {
start = start.In(time.Local) start = start.In(time.Local)
end = end.In(time.Local) end = end.In(time.Local)
var history []*irc.Message var history []*irc.Message
remaining := limit remaining := limit
tries := 0 tries := 0
for remaining > 0 && tries < fsMessageStoreMaxTries && start.Before(end) { for remaining > 0 && tries < fsMessageStoreMaxTries && start.Before(end) {
buf, err := ms.parseMessagesAfter(network, entity, start, end, remaining) buf, err := ms.parseMessagesAfter(network, entity, start, end, events, remaining)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -460,7 +551,7 @@ func (ms *fsMessageStore) LoadLatestID(network *network, entity, id string, limi
offset = afterOffset offset = afterOffset
} }
buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, remaining, offset) buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, false, remaining, offset)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -478,7 +569,7 @@ func (ms *fsMessageStore) LoadLatestID(network *network, entity, id string, limi
return history[remaining:], nil return history[remaining:], nil
} }
func (ms *fsMessageStore) ListTargets(network *network, start, end time.Time, limit int) ([]chatHistoryTarget, error) { func (ms *fsMessageStore) ListTargets(network *network, start, end time.Time, limit int, events bool) ([]chatHistoryTarget, error) {
start = start.In(time.Local) start = start.In(time.Local)
end = end.In(time.Local) end = end.In(time.Local)
rootPath := filepath.Join(ms.root, escapeFilename(network.GetName())) rootPath := filepath.Join(ms.root, escapeFilename(network.GetName()))

View File

@ -74,6 +74,12 @@ func (ms *memoryMessageStore) LastMsgID(network *network, entity string, t time.
} }
func (ms *memoryMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) { func (ms *memoryMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
switch msg.Command {
case "PRIVMSG", "NOTICE":
default:
return "", nil
}
k := ringBufferKey{networkID: network.ID, entity: entity} k := ringBufferKey{networkID: network.ID, entity: entity}
rb, ok := ms.buffers[k] rb, ok := ms.buffers[k]
if !ok { if !ok {