Add names to consumers
This commit is contained in:
parent
fad9d820c1
commit
7127fa325a
@ -128,6 +128,12 @@ func (c *downstreamConn) Close() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
u.lock.Unlock()
|
u.lock.Unlock()
|
||||||
|
|
||||||
|
// TODO: figure out a better way to advance the ring buffer consumer cursor
|
||||||
|
u.forEachUpstream(func(uc *upstreamConn) {
|
||||||
|
// TODO: let clients specify the ring buffer name in their username
|
||||||
|
uc.ring.Consumer("").Reset()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
close(c.messages)
|
close(c.messages)
|
||||||
@ -232,8 +238,10 @@ func (c *downstreamConn) register() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer := uc.ring.Consumer()
|
// TODO: let clients specify the ring buffer name in their username
|
||||||
|
consumer := uc.ring.Consumer("")
|
||||||
for {
|
for {
|
||||||
|
// TODO: these messages will get lost if the connection is closed
|
||||||
msg := consumer.Consume()
|
msg := consumer.Consume()
|
||||||
if msg == nil {
|
if msg == nil {
|
||||||
break
|
break
|
||||||
|
22
ring.go
22
ring.go
@ -10,13 +10,14 @@ type Ring struct {
|
|||||||
buffer []*irc.Message
|
buffer []*irc.Message
|
||||||
cap, cur uint64
|
cap, cur uint64
|
||||||
|
|
||||||
consumers []RingConsumer
|
consumers map[string]*RingConsumer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRing(capacity int) *Ring {
|
func NewRing(capacity int) *Ring {
|
||||||
return &Ring{
|
return &Ring{
|
||||||
buffer: make([]*irc.Message, capacity),
|
buffer: make([]*irc.Message, capacity),
|
||||||
cap: uint64(capacity),
|
cap: uint64(capacity),
|
||||||
|
consumers: make(map[string]*RingConsumer),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -26,11 +27,18 @@ func (r *Ring) Produce(msg *irc.Message) {
|
|||||||
r.cur++
|
r.cur++
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Ring) Consumer() *RingConsumer {
|
func (r *Ring) Consumer(name string) *RingConsumer {
|
||||||
return &RingConsumer{
|
consumer, ok := r.consumers[name]
|
||||||
ring: r,
|
if ok {
|
||||||
cur: 0, // r.cur
|
return consumer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consumer = &RingConsumer{
|
||||||
|
ring: r,
|
||||||
|
cur: r.cur,
|
||||||
|
}
|
||||||
|
r.consumers[name] = consumer
|
||||||
|
return consumer
|
||||||
}
|
}
|
||||||
|
|
||||||
type RingConsumer struct {
|
type RingConsumer struct {
|
||||||
@ -69,3 +77,7 @@ func (rc *RingConsumer) Consume() *irc.Message {
|
|||||||
}
|
}
|
||||||
return msg
|
return msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rc *RingConsumer) Reset() {
|
||||||
|
rc.cur = rc.ring.cur
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user