Move upstreamConn.ring to network
This handles upstream disconnection and re-connection better.
This commit is contained in:
parent
9486d657c5
commit
293a0e8e20
@ -699,27 +699,34 @@ func (dc *downstreamConn) register() error {
|
|||||||
forwardChannel(dc, ch)
|
forwardChannel(dc, ch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
dc.forEachNetwork(func(net *network) {
|
||||||
historyName := dc.rawUsername
|
historyName := dc.rawUsername
|
||||||
|
|
||||||
// TODO: need to take dc.network into account here
|
// TODO: need to take dc.network into account here
|
||||||
var seqPtr *uint64
|
var seqPtr *uint64
|
||||||
if firstDownstream {
|
if firstDownstream {
|
||||||
uc.network.lock.Lock()
|
net.lock.Lock()
|
||||||
seq, ok := uc.network.history[historyName]
|
seq, ok := net.history[historyName]
|
||||||
uc.network.lock.Unlock()
|
net.lock.Unlock()
|
||||||
if ok {
|
if ok {
|
||||||
seqPtr = &seq
|
seqPtr = &seq
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: we need to create a consumer when adding networks on-the-fly
|
// TODO: we need to create a consumer when adding networks on-the-fly
|
||||||
consumer, ch := uc.ring.NewConsumer(seqPtr)
|
consumer, ch := net.ring.NewConsumer(seqPtr)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
var closed bool
|
var closed bool
|
||||||
select {
|
select {
|
||||||
case <-ch:
|
case <-ch:
|
||||||
|
uc := net.upstream()
|
||||||
|
if uc == nil {
|
||||||
|
dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
|
||||||
|
break
|
||||||
|
}
|
||||||
dc.ringMessages <- ringMessage{consumer, uc}
|
dc.ringMessages <- ringMessage{consumer, uc}
|
||||||
case <-dc.closed:
|
case <-dc.closed:
|
||||||
closed = true
|
closed = true
|
||||||
@ -737,9 +744,9 @@ func (dc *downstreamConn) register() error {
|
|||||||
dc.user.lock.Unlock()
|
dc.user.lock.Unlock()
|
||||||
|
|
||||||
if lastDownstream {
|
if lastDownstream {
|
||||||
uc.network.lock.Lock()
|
net.lock.Lock()
|
||||||
uc.network.history[historyName] = seq
|
net.history[historyName] = seq
|
||||||
uc.network.lock.Unlock()
|
net.lock.Unlock()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
})
|
})
|
||||||
@ -1086,7 +1093,7 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
|
|||||||
dc.ourMessages[echoMsg] = struct{}{}
|
dc.ourMessages[echoMsg] = struct{}{}
|
||||||
dc.lock.Unlock()
|
dc.lock.Unlock()
|
||||||
|
|
||||||
uc.ring.Produce(echoMsg)
|
uc.network.ring.Produce(echoMsg)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
dc.logger.Printf("unhandled message: %v", msg)
|
dc.logger.Printf("unhandled message: %v", msg)
|
||||||
|
@ -34,7 +34,6 @@ type upstreamConn struct {
|
|||||||
srv *Server
|
srv *Server
|
||||||
user *user
|
user *user
|
||||||
outgoing chan<- *irc.Message
|
outgoing chan<- *irc.Message
|
||||||
ring *Ring
|
|
||||||
|
|
||||||
serverName string
|
serverName string
|
||||||
availableUserModes string
|
availableUserModes string
|
||||||
@ -80,7 +79,6 @@ func connectToUpstream(network *network) (*upstreamConn, error) {
|
|||||||
srv: network.user.srv,
|
srv: network.user.srv,
|
||||||
user: network.user,
|
user: network.user,
|
||||||
outgoing: outgoing,
|
outgoing: outgoing,
|
||||||
ring: NewRing(network.user.srv.RingCap),
|
|
||||||
channels: make(map[string]*upstreamChannel),
|
channels: make(map[string]*upstreamChannel),
|
||||||
caps: make(map[string]string),
|
caps: make(map[string]string),
|
||||||
availableChannelTypes: stdChannelTypes,
|
availableChannelTypes: stdChannelTypes,
|
||||||
@ -874,7 +872,7 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
uc.ring.Produce(msg)
|
uc.network.ring.Produce(msg)
|
||||||
case "INVITE":
|
case "INVITE":
|
||||||
var nick string
|
var nick string
|
||||||
var channel string
|
var channel string
|
||||||
|
2
user.go
2
user.go
@ -20,6 +20,7 @@ type downstreamIncomingMessage struct {
|
|||||||
type network struct {
|
type network struct {
|
||||||
Network
|
Network
|
||||||
user *user
|
user *user
|
||||||
|
ring *Ring
|
||||||
|
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
conn *upstreamConn
|
conn *upstreamConn
|
||||||
@ -30,6 +31,7 @@ func newNetwork(user *user, record *Network) *network {
|
|||||||
return &network{
|
return &network{
|
||||||
Network: *record,
|
Network: *record,
|
||||||
user: user,
|
user: user,
|
||||||
|
ring: NewRing(user.srv.RingCap),
|
||||||
history: make(map[string]uint64),
|
history: make(map[string]uint64),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user