Move upstreamConn.history to network

This commit is contained in:
Simon Ser 2020-03-20 22:48:17 +01:00
parent 7714c84669
commit 45d118dd12
No known key found for this signature in database
GPG Key ID: 0FDE7BE0E88F5E48
3 changed files with 18 additions and 16 deletions

View File

@ -708,9 +708,9 @@ func (dc *downstreamConn) register() error {
var seqPtr *uint64 var seqPtr *uint64
if firstDownstream { if firstDownstream {
uc.lock.Lock() uc.network.lock.Lock()
seq, ok := uc.history[historyName] seq, ok := uc.network.history[historyName]
uc.lock.Unlock() uc.network.lock.Unlock()
if ok { if ok {
seqPtr = &seq seqPtr = &seq
} }
@ -738,9 +738,9 @@ func (dc *downstreamConn) register() error {
dc.user.lock.Unlock() dc.user.lock.Unlock()
if lastDownstream { if lastDownstream {
uc.lock.Lock() uc.network.lock.Lock()
uc.history[historyName] = seq uc.network.history[historyName] = seq
uc.lock.Unlock() uc.network.lock.Unlock()
} }
}() }()
}) })

View File

@ -8,7 +8,6 @@ import (
"net" "net"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/emersion/go-sasl" "github.com/emersion/go-sasl"
@ -53,9 +52,6 @@ type upstreamConn struct {
saslClient sasl.Client saslClient sasl.Client
saslStarted bool saslStarted bool
lock sync.Mutex
history map[string]uint64 // TODO: move to network
} }
func connectToUpstream(network *network) (*upstreamConn, error) { func connectToUpstream(network *network) (*upstreamConn, error) {
@ -85,7 +81,6 @@ func connectToUpstream(network *network) (*upstreamConn, error) {
outgoing: outgoing, outgoing: outgoing,
ring: NewRing(network.user.srv.RingCap), ring: NewRing(network.user.srv.RingCap),
channels: make(map[string]*upstreamChannel), channels: make(map[string]*upstreamChannel),
history: make(map[string]uint64),
caps: make(map[string]string), caps: make(map[string]string),
} }

15
user.go
View File

@ -20,13 +20,17 @@ type downstreamIncomingMessage struct {
type network struct { type network struct {
Network Network
user *user user *user
lock sync.Mutex
conn *upstreamConn conn *upstreamConn
history map[string]uint64
} }
func newNetwork(user *user, record *Network) *network { func newNetwork(user *user, record *Network) *network {
return &network{ return &network{
Network: *record, Network: *record,
user: user, user: user,
history: make(map[string]uint64),
} }
} }
@ -48,18 +52,18 @@ func (net *network) run() {
uc.register() uc.register()
net.user.lock.Lock() net.lock.Lock()
net.conn = uc net.conn = uc
net.user.lock.Unlock() net.lock.Unlock()
if err := uc.readMessages(net.user.upstreamIncoming); err != nil { if err := uc.readMessages(net.user.upstreamIncoming); err != nil {
uc.logger.Printf("failed to handle messages: %v", err) uc.logger.Printf("failed to handle messages: %v", err)
} }
uc.Close() uc.Close()
net.user.lock.Lock() net.lock.Lock()
net.conn = nil net.conn = nil
net.user.lock.Unlock() net.lock.Unlock()
} }
} }
@ -95,7 +99,10 @@ func (u *user) forEachNetwork(f func(*network)) {
func (u *user) forEachUpstream(f func(uc *upstreamConn)) { func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
u.lock.Lock() u.lock.Lock()
for _, network := range u.networks { for _, network := range u.networks {
network.lock.Lock()
uc := network.conn uc := network.conn
network.lock.Unlock()
if uc == nil || !uc.registered || uc.closed { if uc == nil || !uc.registered || uc.closed {
continue continue
} }