172 lines
3.8 KiB
Go
172 lines
3.8 KiB
Go
package msgstore
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.sr.ht/~sircmpwn/go-bare"
|
|
"gopkg.in/irc.v4"
|
|
|
|
"git.sr.ht/~emersion/soju/database"
|
|
)
|
|
|
|
const messageRingBufferCap = 4096
|
|
|
|
type memoryMsgID struct {
|
|
Seq bare.Uint
|
|
}
|
|
|
|
func (memoryMsgID) msgIDType() msgIDType {
|
|
return msgIDMemory
|
|
}
|
|
|
|
func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) {
|
|
var id memoryMsgID
|
|
netID, entity, err = ParseMsgID(s, &id)
|
|
if err != nil {
|
|
return 0, "", 0, err
|
|
}
|
|
return netID, entity, uint64(id.Seq), nil
|
|
}
|
|
|
|
func formatMemoryMsgID(netID int64, entity string, seq uint64) string {
|
|
id := memoryMsgID{bare.Uint(seq)}
|
|
return formatMsgID(netID, entity, &id)
|
|
}
|
|
|
|
type ringBufferKey struct {
|
|
networkID int64
|
|
entity string
|
|
}
|
|
|
|
func IsMemoryStore(store Store) bool {
|
|
_, ok := store.(*memoryMessageStore)
|
|
return ok
|
|
}
|
|
|
|
type memoryMessageStore struct {
|
|
buffers map[ringBufferKey]*messageRingBuffer
|
|
}
|
|
|
|
var _ Store = (*memoryMessageStore)(nil)
|
|
|
|
func NewMemoryStore() *memoryMessageStore {
|
|
return &memoryMessageStore{
|
|
buffers: make(map[ringBufferKey]*messageRingBuffer),
|
|
}
|
|
}
|
|
|
|
func (ms *memoryMessageStore) Close() error {
|
|
ms.buffers = nil
|
|
return nil
|
|
}
|
|
|
|
func (ms *memoryMessageStore) get(network *database.Network, entity string) *messageRingBuffer {
|
|
k := ringBufferKey{networkID: network.ID, entity: entity}
|
|
if rb, ok := ms.buffers[k]; ok {
|
|
return rb
|
|
}
|
|
rb := newMessageRingBuffer(messageRingBufferCap)
|
|
ms.buffers[k] = rb
|
|
return rb
|
|
}
|
|
|
|
func (ms *memoryMessageStore) LastMsgID(network *database.Network, entity string, t time.Time) (string, error) {
|
|
var seq uint64
|
|
k := ringBufferKey{networkID: network.ID, entity: entity}
|
|
if rb, ok := ms.buffers[k]; ok {
|
|
seq = rb.cur
|
|
}
|
|
return formatMemoryMsgID(network.ID, entity, seq), nil
|
|
}
|
|
|
|
func (ms *memoryMessageStore) Append(network *database.Network, entity string, msg *irc.Message) (string, error) {
|
|
switch msg.Command {
|
|
case "PRIVMSG", "NOTICE":
|
|
// Only append these messages, because LoadLatestID shouldn't return
|
|
// other kinds of message.
|
|
default:
|
|
return "", nil
|
|
}
|
|
|
|
k := ringBufferKey{networkID: network.ID, entity: entity}
|
|
rb, ok := ms.buffers[k]
|
|
if !ok {
|
|
rb = newMessageRingBuffer(messageRingBufferCap)
|
|
ms.buffers[k] = rb
|
|
}
|
|
|
|
seq := rb.Append(msg)
|
|
return formatMemoryMsgID(network.ID, entity, seq), nil
|
|
}
|
|
|
|
func (ms *memoryMessageStore) LoadLatestID(ctx context.Context, id string, options *LoadMessageOptions) ([]*irc.Message, error) {
|
|
if options.Events {
|
|
return nil, fmt.Errorf("events are unsupported for memory message store")
|
|
}
|
|
|
|
_, _, seq, err := parseMemoryMsgID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
k := ringBufferKey{networkID: options.Network.ID, entity: options.Entity}
|
|
rb, ok := ms.buffers[k]
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
|
|
return rb.LoadLatestSeq(seq, options.Limit)
|
|
}
|
|
|
|
type messageRingBuffer struct {
|
|
buf []*irc.Message
|
|
cur uint64
|
|
}
|
|
|
|
func newMessageRingBuffer(capacity int) *messageRingBuffer {
|
|
return &messageRingBuffer{
|
|
buf: make([]*irc.Message, capacity),
|
|
cur: 1,
|
|
}
|
|
}
|
|
|
|
func (rb *messageRingBuffer) cap() uint64 {
|
|
return uint64(len(rb.buf))
|
|
}
|
|
|
|
func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 {
|
|
seq := rb.cur
|
|
i := int(seq % rb.cap())
|
|
rb.buf[i] = msg
|
|
rb.cur++
|
|
return seq
|
|
}
|
|
|
|
func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) {
|
|
if seq > rb.cur {
|
|
return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur)
|
|
} else if seq == rb.cur {
|
|
return nil, nil
|
|
}
|
|
|
|
// The query excludes the message with the sequence number seq
|
|
diff := rb.cur - seq - 1
|
|
if diff > rb.cap() {
|
|
// We dropped diff - cap entries
|
|
diff = rb.cap()
|
|
}
|
|
if int(diff) > limit {
|
|
diff = uint64(limit)
|
|
}
|
|
|
|
l := make([]*irc.Message, int(diff))
|
|
for i := 0; i < int(diff); i++ {
|
|
j := int((rb.cur - diff + uint64(i)) % rb.cap())
|
|
l[i] = rb.buf[j]
|
|
}
|
|
|
|
return l, nil
|
|
}
|