fuck it up

This commit is contained in:
sad 2024-04-07 14:58:24 -06:00
parent c5d07658ab
commit b75214f9ea
WARNING! Although there is a key with this ID in the database it does not verify this commit! This commit is SUSPICIOUS.
GPG Key ID: 28D3A882F3E6AD02
6 changed files with 94 additions and 111 deletions

125
conn.go
View File

@ -11,7 +11,6 @@ import (
"time" "time"
"unicode" "unicode"
"golang.org/x/time/rate"
"gopkg.in/irc.v4" "gopkg.in/irc.v4"
"nhooyr.io/websocket" "nhooyr.io/websocket"
) )
@ -48,11 +47,11 @@ func newWebsocketIRCConn(c *websocket.Conn, remoteAddr string) ircConn {
func (wic *websocketIRCConn) ReadMessage() (*irc.Message, error) { func (wic *websocketIRCConn) ReadMessage() (*irc.Message, error) {
ctx := context.Background() ctx := context.Background()
if !wic.readDeadline.IsZero() { // if !wic.readDeadline.IsZero() {
var cancel context.CancelFunc // var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, wic.readDeadline) // ctx, cancel = context.WithDeadline(ctx, wic.readDeadline)
defer cancel() // defer cancel()
} // }
_, b, err := wic.conn.Read(ctx) _, b, err := wic.conn.Read(ctx)
if err != nil { if err != nil {
switch websocket.CloseStatus(err) { switch websocket.CloseStatus(err) {
@ -68,11 +67,11 @@ func (wic *websocketIRCConn) ReadMessage() (*irc.Message, error) {
func (wic *websocketIRCConn) WriteMessage(msg *irc.Message) error { func (wic *websocketIRCConn) WriteMessage(msg *irc.Message) error {
b := []byte(strings.ToValidUTF8(msg.String(), string(unicode.ReplacementChar))) b := []byte(strings.ToValidUTF8(msg.String(), string(unicode.ReplacementChar)))
ctx := context.Background() ctx := context.Background()
if !wic.writeDeadline.IsZero() { // if !wic.writeDeadline.IsZero() {
var cancel context.CancelFunc // var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline) // ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline)
defer cancel() // defer cancel()
} // }
return wic.conn.Write(ctx, websocket.MessageText, b) return wic.conn.Write(ctx, websocket.MessageText, b)
} }
@ -112,8 +111,8 @@ func (wa websocketAddr) String() string {
type connOptions struct { type connOptions struct {
Logger Logger Logger Logger
RateLimitDelay time.Duration //RateLimitDelay time.Duration
RateLimitBurst int //RateLimitBurst int
} }
type conn struct { type conn struct {
@ -127,50 +126,45 @@ type conn struct {
closedCh chan struct{} closedCh chan struct{}
} }
func newConn(srv *Server, ic ircConn, options *connOptions) *conn { func newConn(srv *Server, ic ircConn, options *connOptions) *conn {
outgoing := make(chan *irc.Message, 64) outgoing := make(chan *irc.Message, 640000)
c := &conn{ c := &conn{
conn: ic, conn: ic,
srv: srv, srv: srv,
outgoing: outgoing, outgoing: outgoing,
logger: options.Logger, logger: options.Logger,
closedCh: make(chan struct{}), closedCh: make(chan struct{}),
} }
go func() { go func() {
ctx, cancel := c.NewContext(context.Background()) // defer cancel()
defer cancel()
rl := rate.NewLimiter(rate.Every(options.RateLimitDelay), options.RateLimitBurst) for msg := range outgoing {
for msg := range outgoing { if msg == nil {
if msg == nil { break
break }
}
if err := rl.Wait(ctx); err != nil { c.logger.Debugf("sent: %v", msg)
break // c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
} if err := c.conn.WriteMessage(msg); err != nil {
c.logger.Printf("failed to write message: %v", err)
break
}
}
if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
c.logger.Printf("failed to close connection: %v", err)
} else {
c.logger.Debugf("connection closed")
}
// Drain the outgoing channel to prevent SendMessage from blocking
for range outgoing {
// This space is intentionally left blank
}
}()
c.logger.Debugf("sent: %v", msg) c.logger.Debugf("new connection")
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) return c
if err := c.conn.WriteMessage(msg); err != nil {
c.logger.Printf("failed to write message: %v", err)
break
}
}
if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
c.logger.Printf("failed to close connection: %v", err)
} else {
c.logger.Debugf("connection closed")
}
// Drain the outgoing channel to prevent SendMessage from blocking
for range outgoing {
// This space is intentionally left blank
}
}()
c.logger.Debugf("new connection")
return c
} }
func (c *conn) isClosed() bool { func (c *conn) isClosed() bool {
@ -212,22 +206,25 @@ func (c *conn) ReadMessage() (*irc.Message, error) {
// //
// If the connection is closed before the message is sent, SendMessage silently // If the connection is closed before the message is sent, SendMessage silently
// drops the message. // drops the message.
func (c *conn) SendMessage(ctx context.Context, msg *irc.Message) { func (c *conn) SendMessage(msg *irc.Message) {
c.lock.Lock() //c.lock.Lock()
defer c.lock.Unlock() //defer c.lock.Unlock()
if c.closed { //if c.closed {
return // return
} //}
select { // Attempt to send the message immediately without waiting for context cancellation.
case c.outgoing <- msg: // This removes the ability to cancel this operation, which could lead to issues if the system relies on cancellations for cleanup or other logic.
// Success select {
case <-ctx.Done(): case c.outgoing <- msg:
c.logger.Printf("failed to send message: %v", ctx.Err()) // Message sent successfully
} default:
c.logger.Printf("failed to send message: outgoing channel is full or closed")
}
} }
// Shutdown gracefully closes the connection, flushing any pending message. // Shutdown gracefully closes the connection, flushing any pending message.
func (c *conn) Shutdown(ctx context.Context) { func (c *conn) Shutdown(ctx context.Context) {
c.lock.Lock() c.lock.Lock()

View File

@ -531,11 +531,11 @@ func (dc *downstreamConn) SendMessage(ctx context.Context, msg *irc.Message) {
} }
dc.srv.metrics.downstreamOutMessagesTotal.Inc() dc.srv.metrics.downstreamOutMessagesTotal.Inc()
dc.conn.SendMessage(ctx, msg) dc.conn.SendMessage(msg)
} }
func (dc *downstreamConn) SendBatch(ctx context.Context, typ string, params []string, tags irc.Tags, f func(batchRef string)) { func (dc *downstreamConn) SendBatch(ctx context.Context, typ string, params []string, tags irc.Tags, f func(batchRef string)) {
dc.lastBatchRef++ dc.lastBatchRef++
ref := fmt.Sprintf("%v", dc.lastBatchRef) ref := fmt.Sprintf("%v", dc.lastBatchRef)
if dc.caps.IsEnabled("batch") { if dc.caps.IsEnabled("batch") {
@ -617,8 +617,8 @@ func (dc *downstreamConn) handleMessage(ctx context.Context, msg *irc.Message) e
ctx, cancel := dc.conn.NewContext(ctx) ctx, cancel := dc.conn.NewContext(ctx)
defer cancel() defer cancel()
ctx, cancel = context.WithTimeout(ctx, handleDownstreamMessageTimeout) //ctx, cancel = context.WithTimeout(ctx)
defer cancel() //defer cancel()
switch msg.Command { switch msg.Command {
case "QUIT": case "QUIT":
@ -1497,7 +1497,7 @@ func (dc *downstreamConn) welcome(ctx context.Context, user *user) error {
dc.SendMessage(ctx, &irc.Message{ dc.SendMessage(ctx, &irc.Message{
Prefix: dc.srv.prefix(), Prefix: dc.srv.prefix(),
Command: irc.RPL_WELCOME, Command: irc.RPL_WELCOME,
Params: []string{dc.nick, "Welcome to soju, " + dc.nick}, Params: []string{dc.nick, "Welcome to CERN, " + dc.nick},
}) })
dc.SendMessage(ctx, &irc.Message{ dc.SendMessage(ctx, &irc.Message{
Prefix: dc.srv.prefix(), Prefix: dc.srv.prefix(),
@ -1507,7 +1507,7 @@ func (dc *downstreamConn) welcome(ctx context.Context, user *user) error {
dc.SendMessage(ctx, &irc.Message{ dc.SendMessage(ctx, &irc.Message{
Prefix: dc.srv.prefix(), Prefix: dc.srv.prefix(),
Command: irc.RPL_MYINFO, Command: irc.RPL_MYINFO,
Params: []string{dc.nick, dc.srv.Config().Hostname, "soju", "aiwroO", "OovaimnqpsrtklbeI"}, Params: []string{dc.nick, dc.srv.Config().Hostname, "FAGNET", "GAYFAGGOT", "YOUAREGAY"},
}) })
for _, msg := range xirc.GenerateIsupport(dc.srv.prefix(), isupport) { for _, msg := range xirc.GenerateIsupport(dc.srv.prefix(), isupport) {
dc.SendMessage(ctx, msg) dc.SendMessage(ctx, msg)

30
rate.go
View File

@ -1,40 +1,26 @@
package soju package soju
import ( import (
"math/rand"
"time" "time"
) )
// backoffer implements a simple exponential backoff. // backoffer implements a simple exponential backoff.
type backoffer struct { type backoffer struct {
min, max, jitter time.Duration min, max, jitter time.Duration
n int64 n int64
} }
func newBackoffer(min, max, jitter time.Duration) *backoffer { func newBackoffer(min, max, jitter time.Duration) *backoffer {
return &backoffer{min: min, max: max, jitter: jitter} // Initialize the struct with provided values, but these won't actually affect the backoff now.
return &backoffer{min: min, max: max, jitter: jitter}
} }
func (b *backoffer) Reset() { func (b *backoffer) Reset() {
b.n = 0 b.n = 0
} }
func (b *backoffer) Next() time.Duration { func (b *backoffer) Next() time.Duration {
if b.n == 0 { // Always return 0 to indicate no waiting period between retries.
b.n = 1 return 0
return 0
}
d := time.Duration(b.n) * b.min
if d > b.max {
d = b.max
} else {
b.n *= 2
}
if b.jitter != 0 {
d += time.Duration(rand.Int63n(int64(b.jitter)))
}
return d
} }

View File

@ -27,15 +27,15 @@ import (
) )
var ( var (
retryConnectMinDelay = time.Minute retryConnectMinDelay = time.Second
retryConnectMaxDelay = 10 * time.Minute retryConnectMaxDelay = 0 * time.Second
retryConnectJitter = time.Minute retryConnectJitter = time.Second
connectTimeout = 15 * time.Second connectTimeout = 15 * time.Second
writeTimeout = 10 * time.Second writeTimeout = 10 * time.Second
upstreamMessageDelay = 2 * time.Second // upstreamMessageDelay = 0
upstreamMessageBurst = 10 // upstreamMessageBurst = 999999
backlogTimeout = 10 * time.Second backlogTimeout = 0 * time.Second
handleDownstreamMessageTimeout = 10 * time.Second handleDownstreamMessageTimeout = 0 * time.Second
downstreamRegisterTimeout = 30 * time.Second downstreamRegisterTimeout = 30 * time.Second
webpushCheckSubscriptionDelay = 24 * time.Hour webpushCheckSubscriptionDelay = 24 * time.Hour
webpushPruneSubscriptionDelay = 30 * 24 * time.Hour webpushPruneSubscriptionDelay = 30 * 24 * time.Hour
@ -322,7 +322,7 @@ func (s *Server) loadWebPushConfig(ctx context.Context) error {
} }
func (s *Server) sendWebPush(ctx context.Context, sub *webpush.Subscription, vapidPubKey string, msg *irc.Message) error { func (s *Server) sendWebPush(ctx context.Context, sub *webpush.Subscription, vapidPubKey string, msg *irc.Message) error {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second) ctx, cancel := context.WithTimeout(ctx, 0*time.Second)
defer cancel() defer cancel()
var urgency webpush.Urgency var urgency webpush.Urgency
@ -543,7 +543,7 @@ func (s *Server) HandleAdmin(ic ircConn) {
defer c.Close() defer c.Close()
if shutdown { if shutdown {
c.SendMessage(ctx, &irc.Message{ c.SendMessage(&irc.Message{
Command: "ERROR", Command: "ERROR",
Params: []string{"Server is shutting down"}, Params: []string{"Server is shutting down"},
}) })
@ -560,7 +560,7 @@ func (s *Server) HandleAdmin(ic ircConn) {
switch msg.Command { switch msg.Command {
case "BOUNCERSERV": case "BOUNCERSERV":
if len(msg.Params) < 1 { if len(msg.Params) < 1 {
c.SendMessage(ctx, &irc.Message{ c.SendMessage(&irc.Message{
Command: irc.ERR_NEEDMOREPARAMS, Command: irc.ERR_NEEDMOREPARAMS,
Params: []string{ Params: []string{
"*", "*",
@ -575,7 +575,7 @@ func (s *Server) HandleAdmin(ic ircConn) {
srv: s, srv: s,
admin: true, admin: true,
print: func(text string) { print: func(text string) {
c.SendMessage(ctx, &irc.Message{ c.SendMessage(&irc.Message{
Prefix: s.prefix(), Prefix: s.prefix(),
Command: "PRIVMSG", Command: "PRIVMSG",
Params: []string{"*", text}, Params: []string{"*", text},
@ -583,20 +583,20 @@ func (s *Server) HandleAdmin(ic ircConn) {
}, },
}, msg.Params[0]) }, msg.Params[0])
if err != nil { if err != nil {
c.SendMessage(ctx, &irc.Message{ c.SendMessage(&irc.Message{
Prefix: s.prefix(), Prefix: s.prefix(),
Command: "FAIL", Command: "FAIL",
Params: []string{msg.Command, err.Error()}, Params: []string{msg.Command, err.Error()},
}) })
} else { } else {
c.SendMessage(ctx, &irc.Message{ c.SendMessage(&irc.Message{
Prefix: s.prefix(), Prefix: s.prefix(),
Command: msg.Command, Command: msg.Command,
Params: []string{"OK"}, Params: []string{"OK"},
}) })
} }
default: default:
c.SendMessage(ctx, &irc.Message{ c.SendMessage(&irc.Message{
Prefix: s.prefix(), Prefix: s.prefix(),
Command: irc.ERR_UNKNOWNCOMMAND, Command: irc.ERR_UNKNOWNCOMMAND,
Params: []string{ Params: []string{

View File

@ -30,7 +30,7 @@ var permanentUpstreamCaps = map[string]bool{
"account-notify": true, "account-notify": true,
"account-tag": true, "account-tag": true,
"away-notify": true, "away-notify": true,
"batch": true, "batch": false,
"chghost": true, "chghost": true,
"extended-join": true, "extended-join": true,
"extended-monitor": true, "extended-monitor": true,
@ -346,8 +346,8 @@ func connectToUpstream(ctx context.Context, network *network) (*upstreamConn, er
options := connOptions{ options := connOptions{
Logger: logger, Logger: logger,
RateLimitDelay: upstreamMessageDelay, //RateLimitDelay: 0,
RateLimitBurst: upstreamMessageBurst, //RateLimitBurst: 99999,
} }
cm := stdCaseMapping cm := stdCaseMapping
@ -2077,7 +2077,7 @@ func (uc *upstreamConn) SendMessage(ctx context.Context, msg *irc.Message) {
} }
uc.srv.metrics.upstreamOutMessagesTotal.Inc() uc.srv.metrics.upstreamOutMessagesTotal.Inc()
uc.conn.SendMessage(ctx, msg) uc.conn.SendMessage(msg)
} }
func (uc *upstreamConn) SendMessageLabeled(ctx context.Context, downstreamID uint64, msg *irc.Message) { func (uc *upstreamConn) SendMessageLabeled(ctx context.Context, downstreamID uint64, msg *irc.Message) {

View File

@ -279,7 +279,7 @@ func (net *network) run() {
delay := backoff.Next() - time.Now().Sub(lastTry) delay := backoff.Next() - time.Now().Sub(lastTry)
if delay > 0 { if delay > 0 {
net.logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr) net.logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
time.Sleep(delay) delay = 0
} }
lastTry = time.Now() lastTry = time.Now()