From 7127fa325a5c4241ac38b547ebaaffc61f1609c5 Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Fri, 7 Feb 2020 17:15:50 +0100 Subject: [PATCH] Add names to consumers --- downstream.go | 10 +++++++++- ring.go | 26 +++++++++++++++++++------- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/downstream.go b/downstream.go index 342e022..7a67b1f 100644 --- a/downstream.go +++ b/downstream.go @@ -128,6 +128,12 @@ func (c *downstreamConn) Close() error { } } u.lock.Unlock() + + // TODO: figure out a better way to advance the ring buffer consumer cursor + u.forEachUpstream(func(uc *upstreamConn) { + // TODO: let clients specify the ring buffer name in their username + uc.ring.Consumer("").Reset() + }) } close(c.messages) @@ -232,8 +238,10 @@ func (c *downstreamConn) register() error { } } - consumer := uc.ring.Consumer() + // TODO: let clients specify the ring buffer name in their username + consumer := uc.ring.Consumer("") for { + // TODO: these messages will get lost if the connection is closed msg := consumer.Consume() if msg == nil { break diff --git a/ring.go b/ring.go index e6a44f8..5d7086c 100644 --- a/ring.go +++ b/ring.go @@ -10,13 +10,14 @@ type Ring struct { buffer []*irc.Message cap, cur uint64 - consumers []RingConsumer + consumers map[string]*RingConsumer } func NewRing(capacity int) *Ring { return &Ring{ - buffer: make([]*irc.Message, capacity), - cap: uint64(capacity), + buffer: make([]*irc.Message, capacity), + cap: uint64(capacity), + consumers: make(map[string]*RingConsumer), } } @@ -26,11 +27,18 @@ func (r *Ring) Produce(msg *irc.Message) { r.cur++ } -func (r *Ring) Consumer() *RingConsumer { - return &RingConsumer{ - ring: r, - cur: 0, // r.cur +func (r *Ring) Consumer(name string) *RingConsumer { + consumer, ok := r.consumers[name] + if ok { + return consumer } + + consumer = &RingConsumer{ + ring: r, + cur: r.cur, + } + r.consumers[name] = consumer + return consumer } type RingConsumer struct { @@ -69,3 +77,7 @@ func (rc *RingConsumer) Consume() *irc.Message { } return msg } + +func (rc *RingConsumer) Reset() { + rc.cur = rc.ring.cur +}