Store last internal msg ID in DB when detaching
References: https://todo.sr.ht/~emersion/soju/98
This commit is contained in:
parent
45e2c0023a
commit
424f676254
21
db.go
21
db.go
@ -112,7 +112,9 @@ type Channel struct {
|
|||||||
ID int64
|
ID int64
|
||||||
Name string
|
Name string
|
||||||
Key string
|
Key string
|
||||||
|
|
||||||
Detached bool
|
Detached bool
|
||||||
|
DetachedInternalMsgID string
|
||||||
|
|
||||||
RelayDetached MessageFilter
|
RelayDetached MessageFilter
|
||||||
ReattachOn MessageFilter
|
ReattachOn MessageFilter
|
||||||
@ -161,6 +163,7 @@ CREATE TABLE Channel (
|
|||||||
name VARCHAR(255) NOT NULL,
|
name VARCHAR(255) NOT NULL,
|
||||||
key VARCHAR(255),
|
key VARCHAR(255),
|
||||||
detached INTEGER NOT NULL DEFAULT 0,
|
detached INTEGER NOT NULL DEFAULT 0,
|
||||||
|
detached_internal_msgid VARCHAR(255),
|
||||||
relay_detached INTEGER NOT NULL DEFAULT 0,
|
relay_detached INTEGER NOT NULL DEFAULT 0,
|
||||||
reattach_on INTEGER NOT NULL DEFAULT 0,
|
reattach_on INTEGER NOT NULL DEFAULT 0,
|
||||||
detach_after INTEGER NOT NULL DEFAULT 0,
|
detach_after INTEGER NOT NULL DEFAULT 0,
|
||||||
@ -245,6 +248,7 @@ var migrations = []string{
|
|||||||
UNIQUE(network, target, client)
|
UNIQUE(network, target, client)
|
||||||
);
|
);
|
||||||
`,
|
`,
|
||||||
|
"ALTER TABLE Channel ADD COLUMN detached_internal_msgid VARCHAR(255)",
|
||||||
}
|
}
|
||||||
|
|
||||||
type DB struct {
|
type DB struct {
|
||||||
@ -546,7 +550,9 @@ func (db *DB) ListChannels(networkID int64) ([]Channel, error) {
|
|||||||
db.lock.RLock()
|
db.lock.RLock()
|
||||||
defer db.lock.RUnlock()
|
defer db.lock.RUnlock()
|
||||||
|
|
||||||
rows, err := db.db.Query(`SELECT id, name, key, detached, relay_detached, reattach_on, detach_after, detach_on
|
rows, err := db.db.Query(`SELECT
|
||||||
|
id, name, key, detached, detached_internal_msgid,
|
||||||
|
relay_detached, reattach_on, detach_after, detach_on
|
||||||
FROM Channel
|
FROM Channel
|
||||||
WHERE network = ?`, networkID)
|
WHERE network = ?`, networkID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -557,12 +563,13 @@ func (db *DB) ListChannels(networkID int64) ([]Channel, error) {
|
|||||||
var channels []Channel
|
var channels []Channel
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var ch Channel
|
var ch Channel
|
||||||
var key sql.NullString
|
var key, detachedInternalMsgID sql.NullString
|
||||||
var detachAfter int64
|
var detachAfter int64
|
||||||
if err := rows.Scan(&ch.ID, &ch.Name, &key, &ch.Detached, &ch.RelayDetached, &ch.ReattachOn, &detachAfter, &ch.DetachOn); err != nil {
|
if err := rows.Scan(&ch.ID, &ch.Name, &key, &ch.Detached, &detachedInternalMsgID, &ch.RelayDetached, &ch.ReattachOn, &detachAfter, &ch.DetachOn); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ch.Key = key.String
|
ch.Key = key.String
|
||||||
|
ch.DetachedInternalMsgID = detachedInternalMsgID.String
|
||||||
ch.DetachAfter = time.Duration(detachAfter) * time.Second
|
ch.DetachAfter = time.Duration(detachAfter) * time.Second
|
||||||
channels = append(channels, ch)
|
channels = append(channels, ch)
|
||||||
}
|
}
|
||||||
@ -583,14 +590,14 @@ func (db *DB) StoreChannel(networkID int64, ch *Channel) error {
|
|||||||
var err error
|
var err error
|
||||||
if ch.ID != 0 {
|
if ch.ID != 0 {
|
||||||
_, err = db.db.Exec(`UPDATE Channel
|
_, err = db.db.Exec(`UPDATE Channel
|
||||||
SET network = ?, name = ?, key = ?, detached = ?, relay_detached = ?, reattach_on = ?, detach_after = ?, detach_on = ?
|
SET network = ?, name = ?, key = ?, detached = ?, detached_internal_msgid = ?, relay_detached = ?, reattach_on = ?, detach_after = ?, detach_on = ?
|
||||||
WHERE id = ?`,
|
WHERE id = ?`,
|
||||||
networkID, ch.Name, key, ch.Detached, ch.RelayDetached, ch.ReattachOn, detachAfter, ch.DetachOn, ch.ID)
|
networkID, ch.Name, key, ch.Detached, toNullString(ch.DetachedInternalMsgID), ch.RelayDetached, ch.ReattachOn, detachAfter, ch.DetachOn, ch.ID)
|
||||||
} else {
|
} else {
|
||||||
var res sql.Result
|
var res sql.Result
|
||||||
res, err = db.db.Exec(`INSERT INTO Channel(network, name, key, detached, relay_detached, reattach_on, detach_after, detach_on)
|
res, err = db.db.Exec(`INSERT INTO Channel(network, name, key, detached, detached_internal_msgid, relay_detached, reattach_on, detach_after, detach_on)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||||
networkID, ch.Name, key, ch.Detached, ch.RelayDetached, ch.ReattachOn, detachAfter, ch.DetachOn)
|
networkID, ch.Name, key, ch.Detached, toNullString(ch.DetachedInternalMsgID), ch.RelayDetached, ch.ReattachOn, detachAfter, ch.DetachOn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
26
user.go
26
user.go
@ -232,9 +232,20 @@ func (net *network) detach(ch *Channel) {
|
|||||||
if ch.Detached {
|
if ch.Detached {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ch.Detached = true
|
|
||||||
net.user.logger.Printf("network %q: detaching channel %q", net.GetName(), ch.Name)
|
net.user.logger.Printf("network %q: detaching channel %q", net.GetName(), ch.Name)
|
||||||
|
|
||||||
|
ch.Detached = true
|
||||||
|
|
||||||
|
if net.user.msgStore != nil {
|
||||||
|
nameCM := net.casemap(ch.Name)
|
||||||
|
lastID, err := net.user.msgStore.LastMsgID(net, nameCM, time.Now())
|
||||||
|
if err != nil {
|
||||||
|
net.user.logger.Printf("failed to get last message ID for channel %q: %v", ch.Name, err)
|
||||||
|
}
|
||||||
|
ch.DetachedInternalMsgID = lastID
|
||||||
|
}
|
||||||
|
|
||||||
if net.conn != nil {
|
if net.conn != nil {
|
||||||
uch := net.conn.channels.Value(ch.Name)
|
uch := net.conn.channels.Value(ch.Name)
|
||||||
if uch != nil {
|
if uch != nil {
|
||||||
@ -255,9 +266,13 @@ func (net *network) attach(ch *Channel) {
|
|||||||
if !ch.Detached {
|
if !ch.Detached {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ch.Detached = false
|
|
||||||
net.user.logger.Printf("network %q: attaching channel %q", net.GetName(), ch.Name)
|
net.user.logger.Printf("network %q: attaching channel %q", net.GetName(), ch.Name)
|
||||||
|
|
||||||
|
detachedMsgID := ch.DetachedInternalMsgID
|
||||||
|
ch.Detached = false
|
||||||
|
ch.DetachedInternalMsgID = ""
|
||||||
|
|
||||||
var uch *upstreamChannel
|
var uch *upstreamChannel
|
||||||
if net.conn != nil {
|
if net.conn != nil {
|
||||||
uch = net.conn.channels.Value(ch.Name)
|
uch = net.conn.channels.Value(ch.Name)
|
||||||
@ -276,12 +291,9 @@ func (net *network) attach(ch *Channel) {
|
|||||||
forwardChannel(dc, uch)
|
forwardChannel(dc, uch)
|
||||||
}
|
}
|
||||||
|
|
||||||
lastDelivered := net.delivered.LoadID(ch.Name, dc.clientName)
|
if detachedMsgID != "" {
|
||||||
if lastDelivered == "" {
|
dc.sendTargetBacklog(net, ch.Name, detachedMsgID)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dc.sendTargetBacklog(net, ch.Name, lastDelivered)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user