Add in-memory message store
Uses an in-memory ring buffer. Closes: https://todo.sr.ht/~emersion/soju/96
This commit is contained in:
parent
ac3431ef76
commit
1110f39227
144
msgstore_memory.go
Normal file
144
msgstore_memory.go
Normal file
@ -0,0 +1,144 @@
|
||||
package soju
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"gopkg.in/irc.v3"
|
||||
)
|
||||
|
||||
const messageRingBufferCap = 4096
|
||||
|
||||
func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) {
|
||||
netID, entity, extra, err := parseMsgID(s)
|
||||
if err != nil {
|
||||
return 0, "", 0, err
|
||||
}
|
||||
seq, err = strconv.ParseUint(extra, 10, 64)
|
||||
if err != nil {
|
||||
return 0, "", 0, fmt.Errorf("failed to parse message ID %q: %v", s, err)
|
||||
}
|
||||
return netID, entity, seq, nil
|
||||
}
|
||||
|
||||
func formatMemoryMsgID(netID int64, entity string, seq uint64) string {
|
||||
extra := strconv.FormatUint(seq, 10)
|
||||
return formatMsgID(netID, entity, extra)
|
||||
}
|
||||
|
||||
type ringBufferKey struct {
|
||||
networkID int64
|
||||
entity string
|
||||
}
|
||||
|
||||
type memoryMessageStore struct {
|
||||
buffers map[ringBufferKey]*messageRingBuffer
|
||||
}
|
||||
|
||||
func newMemoryMessageStore() *memoryMessageStore {
|
||||
return &memoryMessageStore{
|
||||
buffers: make(map[ringBufferKey]*messageRingBuffer),
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *memoryMessageStore) Close() error {
|
||||
ms.buffers = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *memoryMessageStore) get(network *network, entity string) *messageRingBuffer {
|
||||
k := ringBufferKey{networkID: network.ID, entity: entity}
|
||||
if rb, ok := ms.buffers[k]; ok {
|
||||
return rb
|
||||
}
|
||||
rb := newMessageRingBuffer(messageRingBufferCap)
|
||||
ms.buffers[k] = rb
|
||||
return rb
|
||||
}
|
||||
|
||||
func (ms *memoryMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
|
||||
var seq uint64
|
||||
k := ringBufferKey{networkID: network.ID, entity: entity}
|
||||
if rb, ok := ms.buffers[k]; ok {
|
||||
seq = rb.cur
|
||||
}
|
||||
return formatMemoryMsgID(network.ID, entity, seq), nil
|
||||
}
|
||||
|
||||
func (ms *memoryMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
|
||||
k := ringBufferKey{networkID: network.ID, entity: entity}
|
||||
rb, ok := ms.buffers[k]
|
||||
if !ok {
|
||||
rb = newMessageRingBuffer(messageRingBufferCap)
|
||||
ms.buffers[k] = rb
|
||||
}
|
||||
|
||||
seq := rb.Append(msg)
|
||||
return formatMemoryMsgID(network.ID, entity, seq), nil
|
||||
}
|
||||
|
||||
func (ms *memoryMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
|
||||
_, _, seq, err := parseMemoryMsgID(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
k := ringBufferKey{networkID: network.ID, entity: entity}
|
||||
rb, ok := ms.buffers[k]
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return rb.LoadLatestSeq(seq, limit)
|
||||
}
|
||||
|
||||
type messageRingBuffer struct {
|
||||
buf []*irc.Message
|
||||
cur uint64
|
||||
}
|
||||
|
||||
func newMessageRingBuffer(capacity int) *messageRingBuffer {
|
||||
return &messageRingBuffer{
|
||||
buf: make([]*irc.Message, capacity),
|
||||
cur: 1,
|
||||
}
|
||||
}
|
||||
|
||||
func (rb *messageRingBuffer) cap() uint64 {
|
||||
return uint64(len(rb.buf))
|
||||
}
|
||||
|
||||
func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 {
|
||||
seq := rb.cur
|
||||
i := int(seq % rb.cap())
|
||||
rb.buf[i] = msg
|
||||
rb.cur++
|
||||
return seq
|
||||
}
|
||||
|
||||
func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) {
|
||||
if seq > rb.cur {
|
||||
return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur)
|
||||
} else if seq == rb.cur {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// The query excludes the message with the sequence number seq
|
||||
diff := rb.cur - seq - 1
|
||||
if diff > rb.cap() {
|
||||
// We dropped diff - cap entries
|
||||
diff = rb.cap()
|
||||
}
|
||||
if int(diff) > limit {
|
||||
diff = uint64(limit)
|
||||
}
|
||||
|
||||
l := make([]*irc.Message, int(diff))
|
||||
for i := 0; i < int(diff); i++ {
|
||||
j := int((rb.cur - diff + uint64(i)) % rb.cap())
|
||||
l[i] = rb.buf[j]
|
||||
}
|
||||
|
||||
return l, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user