diff --git a/downstream.go b/downstream.go index cd2cb77..e1b98f4 100644 --- a/downstream.go +++ b/downstream.go @@ -702,58 +702,70 @@ func (dc *downstreamConn) register() error { }) dc.forEachNetwork(func(net *network) { - historyName := dc.rawUsername - - // TODO: need to take dc.network into account here - var seqPtr *uint64 - if firstDownstream { - net.lock.Lock() - seq, ok := net.history[historyName] - net.lock.Unlock() - if ok { - seqPtr = &seq - } - } - - // TODO: we need to create a consumer when adding networks on-the-fly - consumer, ch := net.ring.NewConsumer(seqPtr) - go func() { - for { - var closed bool - select { - case <-ch: - uc := net.upstream() - if uc == nil { - dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) - break - } - dc.ringMessages <- ringMessage{consumer, uc} - case <-dc.closed: - closed = true - } - if closed { - break - } - } - - seq := consumer.Close() - - // TODO: need to take dc.network into account here - dc.user.lock.Lock() - lastDownstream := len(dc.user.downstreamConns) == 0 - dc.user.lock.Unlock() - - if lastDownstream { - net.lock.Lock() - net.history[historyName] = seq - net.lock.Unlock() - } - }() + // TODO: need to take dc.network into account when deciding whether or + // not to load history + dc.runNetwork(net, firstDownstream) }) return nil } +// runNetwork starts listening for messages coming from the network's ring +// buffer. +// +// It panics if the network is not suitable for the downstream connection. +func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) { + if dc.network != nil && net != dc.network { + panic("network not suitable for downstream connection") + } + + historyName := dc.rawUsername + + var seqPtr *uint64 + if loadHistory { + net.lock.Lock() + seq, ok := net.history[historyName] + net.lock.Unlock() + if ok { + seqPtr = &seq + } + } + + consumer, ch := net.ring.NewConsumer(seqPtr) + go func() { + for { + var closed bool + select { + case <-ch: + uc := net.upstream() + if uc == nil { + dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) + break + } + dc.ringMessages <- ringMessage{consumer, uc} + case <-dc.closed: + closed = true + } + if closed { + break + } + } + + seq := consumer.Close() + + // TODO: need to take dc.network into account here + dc.user.lock.Lock() + lastDownstream := len(dc.user.downstreamConns) == 0 + dc.user.lock.Unlock() + + if lastDownstream { + net.lock.Lock() + net.history[historyName] = seq + net.lock.Unlock() + } + }() +} + func (dc *downstreamConn) runUntilRegistered() error { for !dc.registered { msg, err := dc.irc.ReadMessage() diff --git a/user.go b/user.go index b4e4060..dbb1b1f 100644 --- a/user.go +++ b/user.go @@ -198,14 +198,26 @@ func (u *user) removeDownstream(dc *downstreamConn) { } func (u *user) createNetwork(net *Network) (*network, error) { + if net.ID != 0 { + panic("tried creating an already-existing network") + } + network := newNetwork(u, net) err := u.srv.db.StoreNetwork(u.Username, &network.Network) if err != nil { return nil, err } + + u.forEachDownstream(func(dc *downstreamConn) { + if dc.network == nil { + dc.runNetwork(network, false) + } + }) + u.lock.Lock() u.networks = append(u.networks, network) u.lock.Unlock() + go network.run() return network, nil }