From b75214f9ea5078204c0d6c0c8bafa5fa760ef563 Mon Sep 17 00:00:00 2001 From: sad Date: Sun, 7 Apr 2024 14:58:24 -0600 Subject: [PATCH] fuck it up --- conn.go | 125 ++++++++++++++++++++++++-------------------------- downstream.go | 12 ++--- rate.go | 30 ++++-------- server.go | 28 +++++------ upstream.go | 8 ++-- user.go | 2 +- 6 files changed, 94 insertions(+), 111 deletions(-) diff --git a/conn.go b/conn.go index 983f976..2f8ef4f 100644 --- a/conn.go +++ b/conn.go @@ -11,7 +11,6 @@ import ( "time" "unicode" - "golang.org/x/time/rate" "gopkg.in/irc.v4" "nhooyr.io/websocket" ) @@ -48,11 +47,11 @@ func newWebsocketIRCConn(c *websocket.Conn, remoteAddr string) ircConn { func (wic *websocketIRCConn) ReadMessage() (*irc.Message, error) { ctx := context.Background() - if !wic.readDeadline.IsZero() { - var cancel context.CancelFunc - ctx, cancel = context.WithDeadline(ctx, wic.readDeadline) - defer cancel() - } +// if !wic.readDeadline.IsZero() { +// var cancel context.CancelFunc +// ctx, cancel = context.WithDeadline(ctx, wic.readDeadline) +// defer cancel() +// } _, b, err := wic.conn.Read(ctx) if err != nil { switch websocket.CloseStatus(err) { @@ -68,11 +67,11 @@ func (wic *websocketIRCConn) ReadMessage() (*irc.Message, error) { func (wic *websocketIRCConn) WriteMessage(msg *irc.Message) error { b := []byte(strings.ToValidUTF8(msg.String(), string(unicode.ReplacementChar))) ctx := context.Background() - if !wic.writeDeadline.IsZero() { - var cancel context.CancelFunc - ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline) - defer cancel() - } +// if !wic.writeDeadline.IsZero() { +// var cancel context.CancelFunc +// ctx, cancel = context.WithDeadline(ctx, wic.writeDeadline) +// defer cancel() +// } return wic.conn.Write(ctx, websocket.MessageText, b) } @@ -112,8 +111,8 @@ func (wa websocketAddr) String() string { type connOptions struct { Logger Logger - RateLimitDelay time.Duration - RateLimitBurst int + //RateLimitDelay time.Duration + //RateLimitBurst int } type conn struct { @@ -127,50 +126,45 @@ type conn struct { closedCh chan struct{} } + func newConn(srv *Server, ic ircConn, options *connOptions) *conn { - outgoing := make(chan *irc.Message, 64) - c := &conn{ - conn: ic, - srv: srv, - outgoing: outgoing, - logger: options.Logger, - closedCh: make(chan struct{}), - } + outgoing := make(chan *irc.Message, 640000) + c := &conn{ + conn: ic, + srv: srv, + outgoing: outgoing, + logger: options.Logger, + closedCh: make(chan struct{}), + } - go func() { - ctx, cancel := c.NewContext(context.Background()) - defer cancel() + go func() { +// defer cancel() - rl := rate.NewLimiter(rate.Every(options.RateLimitDelay), options.RateLimitBurst) - for msg := range outgoing { - if msg == nil { - break - } + for msg := range outgoing { + if msg == nil { + break + } - if err := rl.Wait(ctx); err != nil { - break - } + c.logger.Debugf("sent: %v", msg) +// c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) + if err := c.conn.WriteMessage(msg); err != nil { + c.logger.Printf("failed to write message: %v", err) + break + } + } + if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { + c.logger.Printf("failed to close connection: %v", err) + } else { + c.logger.Debugf("connection closed") + } + // Drain the outgoing channel to prevent SendMessage from blocking + for range outgoing { + // This space is intentionally left blank + } + }() - c.logger.Debugf("sent: %v", msg) - c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) - if err := c.conn.WriteMessage(msg); err != nil { - c.logger.Printf("failed to write message: %v", err) - break - } - } - if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { - c.logger.Printf("failed to close connection: %v", err) - } else { - c.logger.Debugf("connection closed") - } - // Drain the outgoing channel to prevent SendMessage from blocking - for range outgoing { - // This space is intentionally left blank - } - }() - - c.logger.Debugf("new connection") - return c + c.logger.Debugf("new connection") + return c } func (c *conn) isClosed() bool { @@ -212,22 +206,25 @@ func (c *conn) ReadMessage() (*irc.Message, error) { // // If the connection is closed before the message is sent, SendMessage silently // drops the message. -func (c *conn) SendMessage(ctx context.Context, msg *irc.Message) { - c.lock.Lock() - defer c.lock.Unlock() +func (c *conn) SendMessage(msg *irc.Message) { + //c.lock.Lock() + //defer c.lock.Unlock() - if c.closed { - return - } + //if c.closed { + // return + //} - select { - case c.outgoing <- msg: - // Success - case <-ctx.Done(): - c.logger.Printf("failed to send message: %v", ctx.Err()) - } + // Attempt to send the message immediately without waiting for context cancellation. + // This removes the ability to cancel this operation, which could lead to issues if the system relies on cancellations for cleanup or other logic. + select { + case c.outgoing <- msg: + // Message sent successfully + default: + c.logger.Printf("failed to send message: outgoing channel is full or closed") + } } + // Shutdown gracefully closes the connection, flushing any pending message. func (c *conn) Shutdown(ctx context.Context) { c.lock.Lock() diff --git a/downstream.go b/downstream.go index f28b40f..9fd0f4f 100644 --- a/downstream.go +++ b/downstream.go @@ -531,11 +531,11 @@ func (dc *downstreamConn) SendMessage(ctx context.Context, msg *irc.Message) { } dc.srv.metrics.downstreamOutMessagesTotal.Inc() - dc.conn.SendMessage(ctx, msg) + dc.conn.SendMessage(msg) } func (dc *downstreamConn) SendBatch(ctx context.Context, typ string, params []string, tags irc.Tags, f func(batchRef string)) { - dc.lastBatchRef++ + dc.lastBatchRef++ ref := fmt.Sprintf("%v", dc.lastBatchRef) if dc.caps.IsEnabled("batch") { @@ -617,8 +617,8 @@ func (dc *downstreamConn) handleMessage(ctx context.Context, msg *irc.Message) e ctx, cancel := dc.conn.NewContext(ctx) defer cancel() - ctx, cancel = context.WithTimeout(ctx, handleDownstreamMessageTimeout) - defer cancel() + //ctx, cancel = context.WithTimeout(ctx) + //defer cancel() switch msg.Command { case "QUIT": @@ -1497,7 +1497,7 @@ func (dc *downstreamConn) welcome(ctx context.Context, user *user) error { dc.SendMessage(ctx, &irc.Message{ Prefix: dc.srv.prefix(), Command: irc.RPL_WELCOME, - Params: []string{dc.nick, "Welcome to soju, " + dc.nick}, + Params: []string{dc.nick, "Welcome to CERN, " + dc.nick}, }) dc.SendMessage(ctx, &irc.Message{ Prefix: dc.srv.prefix(), @@ -1507,7 +1507,7 @@ func (dc *downstreamConn) welcome(ctx context.Context, user *user) error { dc.SendMessage(ctx, &irc.Message{ Prefix: dc.srv.prefix(), Command: irc.RPL_MYINFO, - Params: []string{dc.nick, dc.srv.Config().Hostname, "soju", "aiwroO", "OovaimnqpsrtklbeI"}, + Params: []string{dc.nick, dc.srv.Config().Hostname, "FAGNET", "GAYFAGGOT", "YOUAREGAY"}, }) for _, msg := range xirc.GenerateIsupport(dc.srv.prefix(), isupport) { dc.SendMessage(ctx, msg) diff --git a/rate.go b/rate.go index 74225a3..9637c2c 100644 --- a/rate.go +++ b/rate.go @@ -1,40 +1,26 @@ package soju import ( - "math/rand" "time" ) // backoffer implements a simple exponential backoff. type backoffer struct { - min, max, jitter time.Duration - n int64 + min, max, jitter time.Duration + n int64 } func newBackoffer(min, max, jitter time.Duration) *backoffer { - return &backoffer{min: min, max: max, jitter: jitter} + // Initialize the struct with provided values, but these won't actually affect the backoff now. + return &backoffer{min: min, max: max, jitter: jitter} } func (b *backoffer) Reset() { - b.n = 0 + b.n = 0 } func (b *backoffer) Next() time.Duration { - if b.n == 0 { - b.n = 1 - return 0 - } - - d := time.Duration(b.n) * b.min - if d > b.max { - d = b.max - } else { - b.n *= 2 - } - - if b.jitter != 0 { - d += time.Duration(rand.Int63n(int64(b.jitter))) - } - - return d + // Always return 0 to indicate no waiting period between retries. + return 0 } + diff --git a/server.go b/server.go index d26e873..cf4f849 100644 --- a/server.go +++ b/server.go @@ -27,15 +27,15 @@ import ( ) var ( - retryConnectMinDelay = time.Minute - retryConnectMaxDelay = 10 * time.Minute - retryConnectJitter = time.Minute + retryConnectMinDelay = time.Second + retryConnectMaxDelay = 0 * time.Second + retryConnectJitter = time.Second connectTimeout = 15 * time.Second writeTimeout = 10 * time.Second - upstreamMessageDelay = 2 * time.Second - upstreamMessageBurst = 10 - backlogTimeout = 10 * time.Second - handleDownstreamMessageTimeout = 10 * time.Second +// upstreamMessageDelay = 0 +// upstreamMessageBurst = 999999 + backlogTimeout = 0 * time.Second + handleDownstreamMessageTimeout = 0 * time.Second downstreamRegisterTimeout = 30 * time.Second webpushCheckSubscriptionDelay = 24 * time.Hour webpushPruneSubscriptionDelay = 30 * 24 * time.Hour @@ -322,7 +322,7 @@ func (s *Server) loadWebPushConfig(ctx context.Context) error { } func (s *Server) sendWebPush(ctx context.Context, sub *webpush.Subscription, vapidPubKey string, msg *irc.Message) error { - ctx, cancel := context.WithTimeout(ctx, 15*time.Second) + ctx, cancel := context.WithTimeout(ctx, 0*time.Second) defer cancel() var urgency webpush.Urgency @@ -543,7 +543,7 @@ func (s *Server) HandleAdmin(ic ircConn) { defer c.Close() if shutdown { - c.SendMessage(ctx, &irc.Message{ + c.SendMessage(&irc.Message{ Command: "ERROR", Params: []string{"Server is shutting down"}, }) @@ -560,7 +560,7 @@ func (s *Server) HandleAdmin(ic ircConn) { switch msg.Command { case "BOUNCERSERV": if len(msg.Params) < 1 { - c.SendMessage(ctx, &irc.Message{ + c.SendMessage(&irc.Message{ Command: irc.ERR_NEEDMOREPARAMS, Params: []string{ "*", @@ -575,7 +575,7 @@ func (s *Server) HandleAdmin(ic ircConn) { srv: s, admin: true, print: func(text string) { - c.SendMessage(ctx, &irc.Message{ + c.SendMessage(&irc.Message{ Prefix: s.prefix(), Command: "PRIVMSG", Params: []string{"*", text}, @@ -583,20 +583,20 @@ func (s *Server) HandleAdmin(ic ircConn) { }, }, msg.Params[0]) if err != nil { - c.SendMessage(ctx, &irc.Message{ + c.SendMessage(&irc.Message{ Prefix: s.prefix(), Command: "FAIL", Params: []string{msg.Command, err.Error()}, }) } else { - c.SendMessage(ctx, &irc.Message{ + c.SendMessage(&irc.Message{ Prefix: s.prefix(), Command: msg.Command, Params: []string{"OK"}, }) } default: - c.SendMessage(ctx, &irc.Message{ + c.SendMessage(&irc.Message{ Prefix: s.prefix(), Command: irc.ERR_UNKNOWNCOMMAND, Params: []string{ diff --git a/upstream.go b/upstream.go index e3bdf4e..4be46df 100644 --- a/upstream.go +++ b/upstream.go @@ -30,7 +30,7 @@ var permanentUpstreamCaps = map[string]bool{ "account-notify": true, "account-tag": true, "away-notify": true, - "batch": true, + "batch": false, "chghost": true, "extended-join": true, "extended-monitor": true, @@ -346,8 +346,8 @@ func connectToUpstream(ctx context.Context, network *network) (*upstreamConn, er options := connOptions{ Logger: logger, - RateLimitDelay: upstreamMessageDelay, - RateLimitBurst: upstreamMessageBurst, + //RateLimitDelay: 0, + //RateLimitBurst: 99999, } cm := stdCaseMapping @@ -2077,7 +2077,7 @@ func (uc *upstreamConn) SendMessage(ctx context.Context, msg *irc.Message) { } uc.srv.metrics.upstreamOutMessagesTotal.Inc() - uc.conn.SendMessage(ctx, msg) + uc.conn.SendMessage(msg) } func (uc *upstreamConn) SendMessageLabeled(ctx context.Context, downstreamID uint64, msg *irc.Message) { diff --git a/user.go b/user.go index 588e015..39aaf49 100644 --- a/user.go +++ b/user.go @@ -279,7 +279,7 @@ func (net *network) run() { delay := backoff.Next() - time.Now().Sub(lastTry) if delay > 0 { net.logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr) - time.Sleep(delay) + delay = 0 } lastTry = time.Now()