Introduce a user.events channel

This allows to easily add new events, and also guarantees ordering
between different event types.
This commit is contained in:
Simon Ser 2020-03-27 16:33:19 +01:00
parent 764b54da59
commit 474f2889d9
No known key found for this signature in database
GPG Key ID: 0FDE7BE0E88F5E48
4 changed files with 22 additions and 20 deletions

View File

@ -227,7 +227,7 @@ func (dc *downstreamConn) isClosed() bool {
} }
} }
func (dc *downstreamConn) readMessages(ch chan<- downstreamIncomingMessage) error { func (dc *downstreamConn) readMessages(ch chan<- event) error {
for { for {
msg, err := dc.irc.ReadMessage() msg, err := dc.irc.ReadMessage()
if err == io.EOF { if err == io.EOF {
@ -240,7 +240,7 @@ func (dc *downstreamConn) readMessages(ch chan<- downstreamIncomingMessage) erro
dc.logger.Printf("received: %v", msg) dc.logger.Printf("received: %v", msg)
} }
ch <- downstreamIncomingMessage{msg, dc} ch <- eventDownstreamMessage{msg, dc}
} }
return nil return nil

View File

@ -119,7 +119,7 @@ func (s *Server) Serve(ln net.Listener) error {
if err := dc.runUntilRegistered(); err != nil { if err := dc.runUntilRegistered(); err != nil {
dc.logger.Print(err) dc.logger.Print(err)
} else { } else {
if err := dc.readMessages(dc.user.downstreamIncoming); err != nil { if err := dc.readMessages(dc.user.events); err != nil {
dc.logger.Print(err) dc.logger.Print(err)
} }
} }

View File

@ -1167,7 +1167,7 @@ func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
return nil return nil
} }
func (uc *upstreamConn) readMessages(ch chan<- upstreamIncomingMessage) error { func (uc *upstreamConn) readMessages(ch chan<- event) error {
for { for {
msg, err := uc.irc.ReadMessage() msg, err := uc.irc.ReadMessage()
if err == io.EOF { if err == io.EOF {
@ -1180,7 +1180,7 @@ func (uc *upstreamConn) readMessages(ch chan<- upstreamIncomingMessage) error {
uc.logger.Printf("received: %v", msg) uc.logger.Printf("received: %v", msg)
} }
ch <- upstreamIncomingMessage{msg, uc} ch <- eventUpstreamMessage{msg, uc}
} }
return nil return nil

32
user.go
View File

@ -7,12 +7,14 @@ import (
"gopkg.in/irc.v3" "gopkg.in/irc.v3"
) )
type upstreamIncomingMessage struct { type event interface{}
type eventUpstreamMessage struct {
msg *irc.Message msg *irc.Message
uc *upstreamConn uc *upstreamConn
} }
type downstreamIncomingMessage struct { type eventDownstreamMessage struct {
msg *irc.Message msg *irc.Message
dc *downstreamConn dc *downstreamConn
} }
@ -58,7 +60,7 @@ func (net *network) run() {
net.conn = uc net.conn = uc
net.lock.Unlock() net.lock.Unlock()
if err := uc.readMessages(net.user.upstreamIncoming); err != nil { if err := uc.readMessages(net.user.events); err != nil {
uc.logger.Printf("failed to handle messages: %v", err) uc.logger.Printf("failed to handle messages: %v", err)
} }
uc.Close() uc.Close()
@ -79,8 +81,7 @@ type user struct {
User User
srv *Server srv *Server
upstreamIncoming chan upstreamIncomingMessage events chan event
downstreamIncoming chan downstreamIncomingMessage
lock sync.Mutex lock sync.Mutex
networks []*network networks []*network
@ -89,10 +90,9 @@ type user struct {
func newUser(srv *Server, record *User) *user { func newUser(srv *Server, record *User) *user {
return &user{ return &user{
User: *record, User: *record,
srv: srv, srv: srv,
upstreamIncoming: make(chan upstreamIncomingMessage, 64), events: make(chan event, 64),
downstreamIncoming: make(chan downstreamIncomingMessage, 64),
} }
} }
@ -149,10 +149,10 @@ func (u *user) run() {
} }
u.lock.Unlock() u.lock.Unlock()
for { for e := range u.events {
select { switch e := e.(type) {
case upstreamMsg := <-u.upstreamIncoming: case eventUpstreamMessage:
msg, uc := upstreamMsg.msg, upstreamMsg.uc msg, uc := e.msg, e.uc
if uc.closed { if uc.closed {
uc.logger.Printf("ignoring message on closed connection: %v", msg) uc.logger.Printf("ignoring message on closed connection: %v", msg)
break break
@ -160,8 +160,8 @@ func (u *user) run() {
if err := uc.handleMessage(msg); err != nil { if err := uc.handleMessage(msg); err != nil {
uc.logger.Printf("failed to handle message %q: %v", msg, err) uc.logger.Printf("failed to handle message %q: %v", msg, err)
} }
case downstreamMsg := <-u.downstreamIncoming: case eventDownstreamMessage:
msg, dc := downstreamMsg.msg, downstreamMsg.dc msg, dc := e.msg, e.dc
if dc.isClosed() { if dc.isClosed() {
dc.logger.Printf("ignoring message on closed connection: %v", msg) dc.logger.Printf("ignoring message on closed connection: %v", msg)
break break
@ -174,6 +174,8 @@ func (u *user) run() {
dc.logger.Printf("failed to handle message %q: %v", msg, err) dc.logger.Printf("failed to handle message %q: %v", msg, err)
dc.Close() dc.Close()
} }
default:
u.srv.Logger.Printf("received unknown event type: %T", e)
} }
} }
} }