diff --git a/downstream.go b/downstream.go index 45c4922..342e022 100644 --- a/downstream.go +++ b/downstream.go @@ -231,7 +231,17 @@ func (c *downstreamConn) register() error { forwardChannel(c, ch) } } + + consumer := uc.ring.Consumer() + for { + msg := consumer.Consume() + if msg == nil { + break + } + c.messages <- msg + } }) + return nil } diff --git a/ring.go b/ring.go new file mode 100644 index 0000000..e6a44f8 --- /dev/null +++ b/ring.go @@ -0,0 +1,71 @@ +package jounce + +import ( + "gopkg.in/irc.v3" +) + +// Ring implements a single producer, multiple consumer ring buffer. The ring +// buffer size is fixed. The ring buffer is stored in memory. +type Ring struct { + buffer []*irc.Message + cap, cur uint64 + + consumers []RingConsumer +} + +func NewRing(capacity int) *Ring { + return &Ring{ + buffer: make([]*irc.Message, capacity), + cap: uint64(capacity), + } +} + +func (r *Ring) Produce(msg *irc.Message) { + i := int(r.cur % r.cap) + r.buffer[i] = msg + r.cur++ +} + +func (r *Ring) Consumer() *RingConsumer { + return &RingConsumer{ + ring: r, + cur: 0, // r.cur + } +} + +type RingConsumer struct { + ring *Ring + cur uint64 +} + +func (rc *RingConsumer) Diff() uint64 { + if rc.cur > rc.ring.cur { + panic("jounce: consumer cursor greater than producer cursor") + } + return rc.ring.cur - rc.cur +} + +func (rc *RingConsumer) Peek() *irc.Message { + diff := rc.Diff() + if diff == 0 { + return nil + } + if diff > rc.ring.cap { + // Consumer drops diff - cap entries + rc.cur = rc.ring.cur - rc.ring.cap + } + i := int(rc.cur % rc.ring.cap) + msg := rc.ring.buffer[i] + if msg == nil { + panic("jounce: unexpected nil ring buffer entry") + } + return msg +} + +func (rc *RingConsumer) Consume() *irc.Message { + msg := rc.Peek() + if msg != nil { + rc.cur++ + } + return msg +} diff --git a/server.go b/server.go index dae434e..e88a5e9 100644 --- a/server.go +++ b/server.go @@ -94,6 +94,7 @@ type Upstream struct { type Server struct { Hostname string Logger Logger + RingCap int Upstreams []Upstream // TODO: per-user lock sync.Mutex @@ -103,8 +104,9 @@ type Server struct { func NewServer() *Server { return &Server{ - Logger: log.New(log.Writer(), "", log.LstdFlags), - users: make(map[string]*user), + Logger: log.New(log.Writer(), "", log.LstdFlags), + RingCap: 4096, + users: make(map[string]*user), } } diff --git a/upstream.go b/upstream.go index 5c17b1a..2f38e15 100644 --- a/upstream.go +++ b/upstream.go @@ -32,6 +32,7 @@ type upstreamConn struct { srv *Server user *user messages chan<- *irc.Message + ring *Ring serverName string availableUserModes string @@ -63,6 +64,7 @@ func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) { srv: u.srv, user: u, messages: msgs, + ring: NewRing(u.srv.RingCap), channels: make(map[string]*upstreamChannel), } @@ -302,6 +304,7 @@ func (c *upstreamConn) handleMessage(msg *irc.Message) error { forwardChannel(dc, ch) }) case "PRIVMSG": + c.ring.Produce(msg) c.user.forEachDownstream(func(dc *downstreamConn) { dc.messages <- msg })