Fix issues related to Ring

- RingConsumer is now used directly in the goroutine responsible for
  writing downstream messages. This allows the ring buffer not to be
  consumed on write error.
- RingConsumer now has a channel attached. This allows PRIVMSG messages
  to always use RingConsumer, instead of also directly pushing messages
  to all downstream connections.
- Multiple clients with the same history name are now supported.
- Ring is now protected by a mutex
This commit is contained in:
Simon Ser 2020-02-17 15:46:29 +01:00
parent 2a43dbd71a
commit 9a93c56cdf
No known key found for this signature in database
GPG Key ID: 0FDE7BE0E88F5E48
4 changed files with 159 additions and 56 deletions

View File

@ -40,15 +40,16 @@ func (err ircError) Error() string {
} }
type downstreamConn struct { type downstreamConn struct {
net net.Conn net net.Conn
irc *irc.Conn irc *irc.Conn
srv *Server srv *Server
logger Logger logger Logger
messages chan *irc.Message messages chan *irc.Message
consumers chan *RingConsumer
closed chan struct{}
registered bool registered bool
user *user user *user
closed bool
nick string nick string
username string username string
realname string realname string
@ -56,11 +57,13 @@ type downstreamConn struct {
func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn { func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
dc := &downstreamConn{ dc := &downstreamConn{
net: netConn, net: netConn,
irc: irc.NewConn(netConn), irc: irc.NewConn(netConn),
srv: srv, srv: srv,
logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
messages: make(chan *irc.Message, 64), messages: make(chan *irc.Message, 64),
consumers: make(chan *RingConsumer),
closed: make(chan struct{}),
} }
go func() { go func() {
@ -85,6 +88,15 @@ func (dc *downstreamConn) prefix() *irc.Prefix {
} }
} }
func (dc *downstreamConn) isClosed() bool {
select {
case <-dc.closed:
return true
default:
return false
}
}
func (dc *downstreamConn) readMessages() error { func (dc *downstreamConn) readMessages() error {
dc.logger.Printf("new connection") dc.logger.Printf("new connection")
@ -104,7 +116,7 @@ func (dc *downstreamConn) readMessages() error {
return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err) return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
} }
if dc.closed { if dc.isClosed() {
return nil return nil
} }
} }
@ -113,16 +125,39 @@ func (dc *downstreamConn) readMessages() error {
} }
func (dc *downstreamConn) writeMessages() error { func (dc *downstreamConn) writeMessages() error {
for msg := range dc.messages { for {
if err := dc.irc.WriteMessage(msg); err != nil { var err error
var closed bool
select {
case msg := <-dc.messages:
err = dc.irc.WriteMessage(msg)
case consumer := <-dc.consumers:
for {
msg := consumer.Peek()
if msg == nil {
break
}
err = dc.irc.WriteMessage(msg)
if err != nil {
break
}
consumer.Consume()
}
case <-dc.closed:
closed = true
}
if err != nil {
return err return err
} }
if closed {
break
}
} }
return nil return nil
} }
func (dc *downstreamConn) Close() error { func (dc *downstreamConn) Close() error {
if dc.closed { if dc.isClosed() {
return fmt.Errorf("downstream connection already closed") return fmt.Errorf("downstream connection already closed")
} }
@ -134,17 +169,9 @@ func (dc *downstreamConn) Close() error {
} }
} }
u.lock.Unlock() u.lock.Unlock()
// TODO: figure out a better way to advance the ring buffer consumer cursor
u.forEachUpstream(func(uc *upstreamConn) {
// TODO: let clients specify the ring buffer name in their username
uc.ring.Consumer("").Reset()
})
} }
close(dc.messages) close(dc.closed)
dc.closed = true
return nil return nil
} }
@ -211,6 +238,7 @@ func (dc *downstreamConn) register() error {
dc.user = u dc.user = u
u.lock.Lock() u.lock.Lock()
firstDownstream := len(u.downstreamConns) == 0
u.downstreamConns = append(u.downstreamConns, dc) u.downstreamConns = append(u.downstreamConns, dc)
u.lock.Unlock() u.lock.Unlock()
@ -249,15 +277,41 @@ func (dc *downstreamConn) register() error {
} }
// TODO: let clients specify the ring buffer name in their username // TODO: let clients specify the ring buffer name in their username
consumer := uc.ring.Consumer("") historyName := ""
for {
// TODO: these messages will get lost if the connection is closed var seqPtr *uint64
msg := consumer.Consume() if firstDownstream {
if msg == nil { seq, ok := uc.history[historyName]
break if ok {
seqPtr = &seq
} }
dc.SendMessage(msg)
} }
consumer, ch := uc.ring.Consumer(seqPtr)
go func() {
for {
var closed bool
select {
case <-ch:
dc.consumers <- consumer
case <-dc.closed:
closed = true
}
if closed {
break
}
}
seq := consumer.Close()
dc.user.lock.Lock()
lastDownstream := len(dc.user.downstreamConns) == 0
dc.user.lock.Unlock()
if lastDownstream {
uc.history[historyName] = seq
}
}()
}) })
return nil return nil

85
ring.go
View File

@ -1,52 +1,77 @@
package jounce package jounce
import ( import (
"sync"
"gopkg.in/irc.v3" "gopkg.in/irc.v3"
) )
// Ring implements a single producer, multiple consumer ring buffer. The ring // Ring implements a single producer, multiple consumer ring buffer. The ring
// buffer size is fixed. The ring buffer is stored in memory. // buffer size is fixed. The ring buffer is stored in memory.
type Ring struct { type Ring struct {
buffer []*irc.Message buffer []*irc.Message
cap, cur uint64 cap uint64
consumers map[string]*RingConsumer lock sync.Mutex
cur uint64
consumers []*RingConsumer
} }
func NewRing(capacity int) *Ring { func NewRing(capacity int) *Ring {
return &Ring{ return &Ring{
buffer: make([]*irc.Message, capacity), buffer: make([]*irc.Message, capacity),
cap: uint64(capacity), cap: uint64(capacity),
consumers: make(map[string]*RingConsumer),
} }
} }
func (r *Ring) Produce(msg *irc.Message) { func (r *Ring) Produce(msg *irc.Message) {
r.lock.Lock()
defer r.lock.Unlock()
i := int(r.cur % r.cap) i := int(r.cur % r.cap)
r.buffer[i] = msg r.buffer[i] = msg
r.cur++ r.cur++
for _, consumer := range r.consumers {
select {
case consumer.ch <- struct{}{}:
// This space is intentionally left blank
default:
// The channel already has a pending item
}
}
} }
func (r *Ring) Consumer(name string) *RingConsumer { func (r *Ring) Consumer(seq *uint64) (*RingConsumer, <-chan struct{}) {
consumer, ok := r.consumers[name] consumer := &RingConsumer{
if ok { ring: r,
return consumer ch: make(chan struct{}, 1),
} }
consumer = &RingConsumer{ r.lock.Lock()
ring: r, if seq != nil {
cur: r.cur, consumer.cur = *seq
} else {
consumer.cur = r.cur
} }
r.consumers[name] = consumer if consumer.diff() > 0 {
return consumer consumer.ch <- struct{}{}
}
r.consumers = append(r.consumers, consumer)
r.lock.Unlock()
return consumer, consumer.ch
} }
type RingConsumer struct { type RingConsumer struct {
ring *Ring ring *Ring
cur uint64 cur uint64
ch chan struct{}
closed bool
} }
func (rc *RingConsumer) Diff() uint64 { // diff returns the number of pending messages. It assumes the Ring is locked.
func (rc *RingConsumer) diff() uint64 {
if rc.cur > rc.ring.cur { if rc.cur > rc.ring.cur {
panic("jounce: consumer cursor greater than producer cursor") panic("jounce: consumer cursor greater than producer cursor")
} }
@ -54,7 +79,14 @@ func (rc *RingConsumer) Diff() uint64 {
} }
func (rc *RingConsumer) Peek() *irc.Message { func (rc *RingConsumer) Peek() *irc.Message {
diff := rc.Diff() if rc.closed {
panic("jounce: RingConsumer.Peek called after Close")
}
rc.ring.lock.Lock()
defer rc.ring.lock.Unlock()
diff := rc.diff()
if diff == 0 { if diff == 0 {
return nil return nil
} }
@ -78,6 +110,17 @@ func (rc *RingConsumer) Consume() *irc.Message {
return msg return msg
} }
func (rc *RingConsumer) Reset() { func (rc *RingConsumer) Close() uint64 {
rc.cur = rc.ring.cur rc.ring.lock.Lock()
for i := range rc.ring.consumers {
if rc.ring.consumers[i] == rc {
rc.ring.consumers = append(rc.ring.consumers[:i], rc.ring.consumers[i+1:]...)
break
}
}
rc.ring.lock.Unlock()
close(rc.ch)
rc.closed = true
return rc.cur
} }

View File

@ -40,6 +40,13 @@ type user struct {
downstreamConns []*downstreamConn downstreamConns []*downstreamConn
} }
func newUser(srv *Server, username string) *user {
return &user{
username: username,
srv: srv,
}
}
func (u *user) forEachUpstream(f func(uc *upstreamConn)) { func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
u.lock.Lock() u.lock.Lock()
for _, uc := range u.upstreamConns { for _, uc := range u.upstreamConns {
@ -116,7 +123,7 @@ func (s *Server) prefix() *irc.Prefix {
func (s *Server) Run() { func (s *Server) Run() {
// TODO: multi-user // TODO: multi-user
u := &user{username: "jounce", srv: s} u := newUser(s, "jounce")
s.lock.Lock() s.lock.Lock()
s.users[u.username] = u s.users[u.username] = u

View File

@ -44,6 +44,7 @@ type upstreamConn struct {
closed bool closed bool
modes modeSet modes modeSet
channels map[string]*upstreamChannel channels map[string]*upstreamChannel
history map[string]uint64
} }
func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) { func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
@ -66,6 +67,7 @@ func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
messages: msgs, messages: msgs,
ring: NewRing(u.srv.RingCap), ring: NewRing(u.srv.RingCap),
channels: make(map[string]*upstreamChannel), channels: make(map[string]*upstreamChannel),
history: make(map[string]uint64),
} }
go func() { go func() {
@ -305,9 +307,6 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
}) })
case "PRIVMSG": case "PRIVMSG":
uc.ring.Produce(msg) uc.ring.Produce(msg)
uc.user.forEachDownstream(func(dc *downstreamConn) {
dc.SendMessage(msg)
})
case irc.RPL_YOURHOST, irc.RPL_CREATED: case irc.RPL_YOURHOST, irc.RPL_CREATED:
// Ignore // Ignore
case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME: case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME: