soju/msgstore/memory.go

172 lines
3.8 KiB
Go

package msgstore
import (
"context"
"fmt"
"time"
"git.sr.ht/~sircmpwn/go-bare"
"gopkg.in/irc.v4"
"git.sr.ht/~emersion/soju/database"
)
// messageRingBufferCap is the capacity, in messages, of every ring buffer
// created by the memory message store.
const messageRingBufferCap = 4096
// memoryMsgID is the store-specific payload embedded in message IDs handed
// out by the memory message store.
type memoryMsgID struct {
// Seq is the ring buffer sequence number of the message.
Seq bare.Uint
}
// msgIDType reports msgIDMemory, the message ID type tag for memoryMsgID.
func (memoryMsgID) msgIDType() msgIDType {
return msgIDMemory
}
// parseMemoryMsgID decodes a message ID string produced by
// formatMemoryMsgID. It returns the network ID, the entity and the ring
// buffer sequence number carried by the ID. On failure the error from
// ParseMsgID is returned and all other results are zero values.
func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) {
	var payload memoryMsgID
	if netID, entity, err = ParseMsgID(s, &payload); err != nil {
		return 0, "", 0, err
	}
	seq = uint64(payload.Seq)
	return netID, entity, seq, nil
}
func formatMemoryMsgID(netID int64, entity string, seq uint64) string {
id := memoryMsgID{bare.Uint(seq)}
return formatMsgID(netID, entity, &id)
}
// ringBufferKey identifies one ring buffer in the memory message store:
// there is one buffer per (network, entity) pair.
type ringBufferKey struct {
// networkID is the ID of the network the entity belongs to.
networkID int64
// entity is the name of the conversation target the messages are stored
// under, exactly as passed to the store by its caller.
entity string
}
// IsMemoryStore reports whether store is backed by the in-memory message
// store.
func IsMemoryStore(store Store) bool {
	switch store.(type) {
	case *memoryMessageStore:
		return true
	default:
		return false
	}
}
// memoryMessageStore is a Store that keeps recent messages in fixed-size
// in-memory ring buffers.
//
// NOTE(review): no locking is visible in this type; presumably callers
// serialize access to the store — confirm.
type memoryMessageStore struct {
// buffers maps a (network ID, entity) pair to its ring buffer. Buffers are
// created on demand.
buffers map[ringBufferKey]*messageRingBuffer
}
// Compile-time check that *memoryMessageStore implements Store.
var _ Store = (*memoryMessageStore)(nil)
// NewMemoryStore creates an empty in-memory message store.
func NewMemoryStore() *memoryMessageStore {
	ms := new(memoryMessageStore)
	ms.buffers = map[ringBufferKey]*messageRingBuffer{}
	return ms
}
// Close drops the store's reference to all ring buffers. It always returns
// nil.
//
// The store must not be used to append messages after Close: creating a
// buffer for a new key would write to a nil map, which panics.
func (ms *memoryMessageStore) Close() error {
ms.buffers = nil
return nil
}
// get returns the ring buffer for the given network and entity, creating
// and registering a new one with the default capacity if none exists yet.
func (ms *memoryMessageStore) get(network *database.Network, entity string) *messageRingBuffer {
	key := ringBufferKey{networkID: network.ID, entity: entity}

	ring, found := ms.buffers[key]
	if !found {
		ring = newMessageRingBuffer(messageRingBufferCap)
		ms.buffers[key] = ring
	}
	return ring
}
// LastMsgID returns the ID of the most recent message stored for the given
// network and entity. The t argument is not used by the memory store.
//
// When no message has been stored yet, the returned ID carries sequence
// number 0, which sorts before the first real message (sequence 1). Passing
// the returned ID to LoadLatestID therefore yields every message appended
// after this call.
func (ms *memoryMessageStore) LastMsgID(network *database.Network, entity string, t time.Time) (string, error) {
	var seq uint64
	k := ringBufferKey{networkID: network.ID, entity: entity}
	if rb, ok := ms.buffers[k]; ok {
		// rb.cur is the sequence number the *next* message will receive; the
		// last stored message is one before it. Ring buffers start at cur == 1,
		// so this cannot underflow and yields 0 for an empty buffer, matching
		// the no-buffer case above.
		seq = rb.cur - 1
	}
	return formatMemoryMsgID(network.ID, entity, seq), nil
}
// Append stores msg in the ring buffer for the given network and entity and
// returns the ID assigned to it.
//
// Only PRIVMSG and NOTICE messages are stored. Any other command is
// ignored, and an empty ID with a nil error is returned.
func (ms *memoryMessageStore) Append(network *database.Network, entity string, msg *irc.Message) (string, error) {
	switch msg.Command {
	case "PRIVMSG", "NOTICE":
		// Only append these messages, because LoadLatestID shouldn't return
		// other kinds of message.
	default:
		return "", nil
	}

	// get creates the ring buffer on first use.
	seq := ms.get(network, entity).Append(msg)
	return formatMemoryMsgID(network.ID, entity, seq), nil
}
// LoadLatestID returns the latest messages stored after the message
// identified by id, for the network and entity given in options, capped at
// options.Limit messages.
//
// It fails if options.Events is set (events are not supported by this
// store) or if id cannot be parsed. Only the sequence number of id is used;
// the buffer is selected from options. If no buffer exists for the target,
// it returns nil messages and a nil error.
func (ms *memoryMessageStore) LoadLatestID(ctx context.Context, id string, options *LoadMessageOptions) ([]*irc.Message, error) {
	if options.Events {
		return nil, fmt.Errorf("events are unsupported for memory message store")
	}

	_, _, afterSeq, parseErr := parseMemoryMsgID(id)
	if parseErr != nil {
		return nil, parseErr
	}

	key := ringBufferKey{networkID: options.Network.ID, entity: options.Entity}
	if ring, found := ms.buffers[key]; found {
		return ring.LoadLatestSeq(afterSeq, options.Limit)
	}
	return nil, nil
}
// messageRingBuffer is a fixed-size circular buffer of messages addressed by
// monotonically increasing sequence numbers. Once more messages than the
// buffer can hold have been appended, the oldest ones are overwritten.
type messageRingBuffer struct {
// buf holds the messages; the message with sequence number s lives at
// index s % len(buf).
buf []*irc.Message
// cur is the sequence number the next appended message will receive. It
// starts at 1.
cur uint64
}
// newMessageRingBuffer allocates a ring buffer able to hold capacity
// messages. Sequence numbers start at 1.
func newMessageRingBuffer(capacity int) *messageRingBuffer {
	rb := &messageRingBuffer{cur: 1}
	rb.buf = make([]*irc.Message, capacity)
	return rb
}
// cap returns the number of message slots in the ring buffer.
func (rb *messageRingBuffer) cap() uint64 {
	slots := len(rb.buf)
	return uint64(slots)
}
// Append stores msg in the slot for the current sequence number, overwriting
// whatever message previously occupied it, then advances the sequence. It
// returns the sequence number assigned to msg.
func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 {
	assigned := rb.cur
	rb.buf[assigned%rb.cap()] = msg
	rb.cur = assigned + 1
	return assigned
}
// LoadLatestSeq returns the most recent messages whose sequence number is
// strictly greater than seq, oldest first, capped at limit messages.
//
// Messages that have already been overwritten in the ring are silently
// skipped, so at most cap() messages are returned. A negative limit is
// treated as 0. It returns an error if seq is greater than the buffer's
// current sequence number, and nil messages if seq equals it.
func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) {
	if seq > rb.cur {
		return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur)
	}
	if seq == rb.cur {
		return nil, nil
	}

	// Guard against negative limits: converting one to uint64 would wrap to
	// a huge value and make() below would panic on a negative length.
	if limit < 0 {
		limit = 0
	}

	// The query excludes the message with the sequence number seq.
	n := rb.cur - seq - 1
	if n > rb.cap() {
		// We dropped n - cap entries.
		n = rb.cap()
	}
	if n > uint64(limit) {
		n = uint64(limit)
	}

	l := make([]*irc.Message, n)
	first := rb.cur - n // sequence number of the oldest message returned
	for i := range l {
		l[i] = rb.buf[(first+uint64(i))%rb.cap()]
	}
	return l, nil
}