From 70d811f5a84199fe50a41df11a530dfb0dd98de5 Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Mon, 17 Feb 2020 16:09:35 +0100 Subject: [PATCH] Document Ring --- downstream.go | 2 +- ring.go | 22 +++++++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/downstream.go b/downstream.go index e77c083..1f91f55 100644 --- a/downstream.go +++ b/downstream.go @@ -287,7 +287,7 @@ func (dc *downstreamConn) register() error { } } - consumer, ch := uc.ring.Consumer(seqPtr) + consumer, ch := uc.ring.NewConsumer(seqPtr) go func() { for { var closed bool diff --git a/ring.go b/ring.go index f7d3d0d..70c6c57 100644 --- a/ring.go +++ b/ring.go @@ -17,6 +17,7 @@ type Ring struct { consumers []*RingConsumer } +// NewRing creates a new ring buffer. func NewRing(capacity int) *Ring { return &Ring{ buffer: make([]*irc.Message, capacity), @@ -24,6 +25,7 @@ func NewRing(capacity int) *Ring { } } +// Produce appends a new message to the ring buffer. func (r *Ring) Produce(msg *irc.Message) { r.lock.Lock() defer r.lock.Unlock() @@ -42,7 +44,17 @@ func (r *Ring) Produce(msg *irc.Message) { } } -func (r *Ring) Consumer(seq *uint64) (*RingConsumer, <-chan struct{}) { +// NewConsumer creates a new ring buffer consumer. +// +// If seq is nil, the consumer will get messages starting from the last +// producer message. If seq is non-nil, the consumer will get messages starting +// from the specified history sequence number (see RingConsumer.Close). +// +// The returned channel yields a value each time the consumer has a new message +// available. Consume should be called to drain the consumer. +// +// The consumer can only be used from a single goroutine. +func (r *Ring) NewConsumer(seq *uint64) (*RingConsumer, <-chan struct{}) { consumer := &RingConsumer{ ring: r, ch: make(chan struct{}, 1), @@ -63,6 +75,7 @@ func (r *Ring) Consumer(seq *uint64) (*RingConsumer, <-chan struct{}) { return consumer, consumer.ch } +// RingConsumer is a ring buffer consumer. type RingConsumer struct { ring *Ring cur uint64 @@ -78,6 +91,8 @@ func (rc *RingConsumer) diff() uint64 { return rc.ring.cur - rc.cur } +// Peek returns the next pending message if any without consuming it. A nil +// message is returned if no message is available. func (rc *RingConsumer) Peek() *irc.Message { if rc.closed { panic("jounce: RingConsumer.Peek called after Close") @@ -102,6 +117,8 @@ func (rc *RingConsumer) Peek() *irc.Message { return msg } +// Consume consumes and returns the next pending message. A nil message is +// returned if no message is available. func (rc *RingConsumer) Consume() *irc.Message { msg := rc.Peek() if msg != nil { @@ -110,6 +127,9 @@ func (rc *RingConsumer) Consume() *irc.Message { return msg } +// Close stops consuming messages. The consumer channel will be closed. The +// current history sequence number is returned. It can be provided later as an +// argument to Ring.NewConsumer to resume the message stream. func (rc *RingConsumer) Close() uint64 { rc.ring.lock.Lock() for i := range rc.ring.consumers {