messageStorage: convert to async
Message stores are more complicated that a sync "fire and forget" API allows for. For starters, non trivial stores (say sqlite) can fail during init and we want to be able to catch that. Second, we really need to be able to run migrations and such, which may block (and fail) the activation of the store. On the plus side, this pushes error handling to the caller rather than the stores, which is a good thing as that allows us to eventually push this to the client in the UI, rather than just logging it in the server on stdout
This commit is contained in:
parent
f068fd4290
commit
d62dd3e62d
@ -147,7 +147,7 @@ class Client {
|
||||
}
|
||||
|
||||
for (const messageStorage of client.messageStorage) {
|
||||
messageStorage.enable();
|
||||
messageStorage.enable().catch((e) => log.error(e));
|
||||
}
|
||||
}
|
||||
|
||||
@ -614,7 +614,7 @@ class Client {
|
||||
}
|
||||
|
||||
for (const messageStorage of this.messageStorage) {
|
||||
messageStorage.deleteChannel(target.network, target.chan);
|
||||
messageStorage.deleteChannel(target.network, target.chan).catch((e) => log.error(e));
|
||||
}
|
||||
}
|
||||
|
||||
@ -767,7 +767,7 @@ class Client {
|
||||
});
|
||||
|
||||
for (const messageStorage of this.messageStorage) {
|
||||
messageStorage.close();
|
||||
messageStorage.close().catch((e) => log.error(e));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ const Helper = {
|
||||
parseHostmask,
|
||||
compareHostmask,
|
||||
compareWithWildcard,
|
||||
catch_to_error,
|
||||
|
||||
password: {
|
||||
hash: passwordHash,
|
||||
@ -183,3 +184,17 @@ function compareWithWildcard(a: string, b: string) {
|
||||
const re = new RegExp(`^${user_regex}$`, "i"); // case insensitive
|
||||
return re.test(b);
|
||||
}
|
||||
|
||||
function catch_to_error(prefix: string, err: any): Error {
|
||||
let msg: string;
|
||||
|
||||
if (err instanceof Error) {
|
||||
msg = err.message;
|
||||
} else if (typeof err === "string") {
|
||||
msg = err;
|
||||
} else {
|
||||
msg = err.toString();
|
||||
}
|
||||
|
||||
return new Error(`${prefix}: ${msg}`);
|
||||
}
|
||||
|
@ -260,7 +260,7 @@ class Chan {
|
||||
}
|
||||
|
||||
for (const messageStorage of client.messageStorage) {
|
||||
messageStorage.index(target.network, targetChannel, msg);
|
||||
messageStorage.index(target.network, targetChannel, msg).catch((e) => log.error(e));
|
||||
}
|
||||
}
|
||||
loadMessages(client: Client, network: Network) {
|
||||
|
@ -2,11 +2,12 @@ import type {Database} from "sqlite3";
|
||||
|
||||
import log from "../../log";
|
||||
import path from "path";
|
||||
import fs from "fs";
|
||||
import fs from "fs/promises";
|
||||
import Config from "../../config";
|
||||
import Msg, {Message} from "../../models/msg";
|
||||
import Client from "../../client";
|
||||
import Chan, {Channel} from "../../models/chan";
|
||||
import Helper from "../../helper";
|
||||
import type {
|
||||
SearchResponse,
|
||||
SearchQuery,
|
||||
@ -47,26 +48,26 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
this.isEnabled = false;
|
||||
}
|
||||
|
||||
enable() {
|
||||
async enable() {
|
||||
const logsPath = Config.getUserLogsPath();
|
||||
const sqlitePath = path.join(logsPath, `${this.client.name}.sqlite3`);
|
||||
|
||||
try {
|
||||
fs.mkdirSync(logsPath, {recursive: true});
|
||||
} catch (e: any) {
|
||||
log.error("Unable to create logs directory", String(e));
|
||||
|
||||
return;
|
||||
await fs.mkdir(logsPath, {recursive: true});
|
||||
} catch (e) {
|
||||
throw Helper.catch_to_error("Unable to create logs directory", e);
|
||||
}
|
||||
|
||||
this.isEnabled = true;
|
||||
|
||||
this.database = new sqlite3.Database(sqlitePath);
|
||||
|
||||
this.run_migrations().catch((err) => {
|
||||
log.error("Migration failed", String(err));
|
||||
try {
|
||||
await this.run_migrations();
|
||||
} catch (e) {
|
||||
this.isEnabled = false;
|
||||
});
|
||||
throw Helper.catch_to_error("Migration failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
async run_migrations() {
|
||||
@ -106,25 +107,26 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
]);
|
||||
}
|
||||
|
||||
close(callback?: (error?: Error | null) => void) {
|
||||
async close() {
|
||||
if (!this.isEnabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.isEnabled = false;
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
this.database.close((err) => {
|
||||
if (err) {
|
||||
log.error(`Failed to close sqlite database: ${err.message}`);
|
||||
reject(`Failed to close sqlite database: ${err.message}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (callback) {
|
||||
callback(err);
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
index(network: Network, channel: Chan, msg: Msg) {
|
||||
async index(network: Network, channel: Chan, msg: Msg) {
|
||||
if (!this.isEnabled) {
|
||||
return;
|
||||
}
|
||||
@ -140,26 +142,27 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
return newMsg;
|
||||
}, {});
|
||||
|
||||
this.run(
|
||||
await this.serialize_run(
|
||||
"INSERT INTO messages(network, channel, time, type, msg) VALUES(?, ?, ?, ?, ?)",
|
||||
[
|
||||
network.uuid,
|
||||
channel.name.toLowerCase(),
|
||||
msg.time.getTime(),
|
||||
msg.type,
|
||||
JSON.stringify(clonedMsg)
|
||||
JSON.stringify(clonedMsg),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
deleteChannel(network: Network, channel: Channel) {
|
||||
async deleteChannel(network: Network, channel: Channel) {
|
||||
if (!this.isEnabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.run(
|
||||
"DELETE FROM messages WHERE network = ? AND channel = ?",
|
||||
await this.serialize_run("DELETE FROM messages WHERE network = ? AND channel = ?", [
|
||||
network.uuid,
|
||||
channel.name.toLowerCase()
|
||||
);
|
||||
channel.name.toLowerCase(),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -170,7 +173,7 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
*/
|
||||
async getMessages(network: Network, channel: Channel): Promise<Message[]> {
|
||||
if (!this.isEnabled || Config.values.maxHistory === 0) {
|
||||
return Promise.resolve([]);
|
||||
return [];
|
||||
}
|
||||
|
||||
// If unlimited history is specified, load 100k messages
|
||||
@ -183,7 +186,7 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
limit
|
||||
);
|
||||
|
||||
return rows.reverse().map((row: any) => {
|
||||
return rows.reverse().map((row: any): Message => {
|
||||
const msg = JSON.parse(row.msg);
|
||||
msg.time = row.time;
|
||||
msg.type = row.type;
|
||||
@ -192,7 +195,7 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
newMsg.id = this.client.idMsg++;
|
||||
|
||||
return newMsg;
|
||||
}) as Message[];
|
||||
});
|
||||
}
|
||||
|
||||
async search(query: SearchQuery): Promise<SearchResponse> {
|
||||
@ -243,17 +246,10 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
return this.isEnabled;
|
||||
}
|
||||
|
||||
private run(stmt: string, ...params: any[]) {
|
||||
this.serialize_run(stmt, params).catch((err) =>
|
||||
log.error(`failed to run ${stmt}`, String(err))
|
||||
);
|
||||
}
|
||||
|
||||
private serialize_run(stmt: string, params: any[]): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.database.serialize(() => {
|
||||
this.database.run(stmt, params, (err) => {
|
||||
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
@ -265,7 +261,7 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
});
|
||||
}
|
||||
|
||||
private serialize_fetchall(stmt: string, ...params: any[]): Promise<any> {
|
||||
private serialize_fetchall(stmt: string, ...params: any[]): Promise<any[]> {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.database.serialize(() => {
|
||||
this.database.all(stmt, params, (err, rows) => {
|
||||
@ -281,12 +277,9 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
|
||||
}
|
||||
|
||||
private serialize_get(stmt: string, ...params: any[]): Promise<any> {
|
||||
const log_id = this.stmt_id();
|
||||
return new Promise((resolve, reject) => {
|
||||
this.database.serialize(() => {
|
||||
this.database.get(stmt, params, (err, row) => {
|
||||
log.debug(log_id, "callback", stmt);
|
||||
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
|
@ -1,9 +1,8 @@
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
import fs from "fs";
|
||||
import fs from "fs/promises";
|
||||
import path from "path";
|
||||
import filenamify from "filenamify";
|
||||
|
||||
import log from "../../log";
|
||||
import Config from "../../config";
|
||||
import {MessageStorage} from "./types";
|
||||
import Client from "../../client";
|
||||
@ -20,19 +19,17 @@ class TextFileMessageStorage implements MessageStorage {
|
||||
this.isEnabled = false;
|
||||
}
|
||||
|
||||
enable() {
|
||||
// eslint-disable-next-line @typescript-eslint/require-await
|
||||
async enable() {
|
||||
this.isEnabled = true;
|
||||
}
|
||||
|
||||
close(callback: () => void) {
|
||||
// eslint-disable-next-line @typescript-eslint/require-await
|
||||
async close() {
|
||||
this.isEnabled = false;
|
||||
|
||||
if (callback) {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
index(network: Network, channel: Channel, msg: Message) {
|
||||
async index(network: Network, channel: Channel, msg: Message) {
|
||||
if (!this.isEnabled) {
|
||||
return;
|
||||
}
|
||||
@ -44,10 +41,9 @@ class TextFileMessageStorage implements MessageStorage {
|
||||
);
|
||||
|
||||
try {
|
||||
fs.mkdirSync(logPath, {recursive: true});
|
||||
} catch (e: any) {
|
||||
log.error("Unable to create logs directory", String(e));
|
||||
return;
|
||||
await fs.mkdir(logPath, {recursive: true});
|
||||
} catch (e) {
|
||||
throw new Error(`Unable to create logs directory: ${e}`);
|
||||
}
|
||||
|
||||
let line = `[${msg.time.toISOString()}] `;
|
||||
@ -106,35 +102,18 @@ class TextFileMessageStorage implements MessageStorage {
|
||||
|
||||
line += "\n";
|
||||
|
||||
fs.appendFile(
|
||||
try {
|
||||
await fs.appendFile(
|
||||
path.join(logPath, TextFileMessageStorage.getChannelFileName(channel)),
|
||||
line,
|
||||
(e) => {
|
||||
if (e) {
|
||||
log.error("Failed to write user log", e.message);
|
||||
}
|
||||
}
|
||||
line
|
||||
);
|
||||
} catch (e) {
|
||||
throw new Error(`Failed to write user log: ${e}`);
|
||||
}
|
||||
}
|
||||
|
||||
deleteChannel() {
|
||||
/* TODO: Truncating text logs is disabled, until we figure out some UI for it
|
||||
if (!this.isEnabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
const logPath = path.join(
|
||||
Config.getUserLogsPath(),
|
||||
this.client.name,
|
||||
TextFileMessageStorage.getNetworkFolderName(network),
|
||||
TextFileMessageStorage.getChannelFileName(channel)
|
||||
);
|
||||
|
||||
fs.truncate(logPath, 0, (e) => {
|
||||
if (e) {
|
||||
log.error("Failed to truncate user log", e);
|
||||
}
|
||||
});*/
|
||||
async deleteChannel() {
|
||||
// Not implemented for text log files
|
||||
}
|
||||
|
||||
getMessages() {
|
||||
|
13
server/plugins/messageStorage/types.d.ts
vendored
13
server/plugins/messageStorage/types.d.ts
vendored
@ -9,13 +9,13 @@ interface MessageStorage {
|
||||
client: Client;
|
||||
isEnabled: boolean;
|
||||
|
||||
enable(): void;
|
||||
enable(): Promise<void>;
|
||||
|
||||
close(callback?: () => void): void;
|
||||
close(): Promise<void>;
|
||||
|
||||
index(network: Network, channel: Channel, msg: Message): void;
|
||||
index(network: Network, channel: Channel, msg: Message): Promise<void>;
|
||||
|
||||
deleteChannel(network: Network, channel: Channel);
|
||||
deleteChannel(network: Network, channel: Channel): Promise<void>;
|
||||
|
||||
getMessages(network: Network, channel: Channel): Promise<Message[]>;
|
||||
|
||||
@ -30,12 +30,11 @@ export type SearchQuery = {
|
||||
};
|
||||
|
||||
export type SearchResponse =
|
||||
| (Omit<SearchQuery, "channelName" | "offset"> & {
|
||||
| Omit<SearchQuery, "channelName" | "offset"> & {
|
||||
results: Message[];
|
||||
target: string;
|
||||
offset: number;
|
||||
})
|
||||
| [];
|
||||
};
|
||||
|
||||
type SearchFunction = (query: SearchQuery) => Promise<SearchResponse>;
|
||||
|
||||
|
@ -37,18 +37,16 @@ describe("SQLite Message Storage", function () {
|
||||
fs.rmdir(path.join(Config.getHomePath(), "logs"), done);
|
||||
});
|
||||
|
||||
it("should resolve an empty array when disabled", function () {
|
||||
return store.getMessages(null as any, null as any).then((messages) => {
|
||||
it("should resolve an empty array when disabled", async function () {
|
||||
const messages = await store.getMessages(null as any, null as any);
|
||||
expect(messages).to.be.empty;
|
||||
});
|
||||
});
|
||||
|
||||
it("should create database file", function () {
|
||||
it("should create database file", async function () {
|
||||
expect(store.isEnabled).to.be.false;
|
||||
expect(fs.existsSync(expectedPath)).to.be.false;
|
||||
|
||||
store.enable();
|
||||
|
||||
await store.enable();
|
||||
expect(store.isEnabled).to.be.true;
|
||||
});
|
||||
|
||||
@ -90,8 +88,8 @@ describe("SQLite Message Storage", function () {
|
||||
);
|
||||
});
|
||||
|
||||
it("should store a message", function () {
|
||||
store.index(
|
||||
it("should store a message", async function () {
|
||||
await store.index(
|
||||
{
|
||||
uuid: "this-is-a-network-guid",
|
||||
} as any,
|
||||
@ -105,35 +103,30 @@ describe("SQLite Message Storage", function () {
|
||||
);
|
||||
});
|
||||
|
||||
it("should retrieve previously stored message", function () {
|
||||
return store
|
||||
.getMessages(
|
||||
it("should retrieve previously stored message", async function () {
|
||||
const messages = await store.getMessages(
|
||||
{
|
||||
uuid: "this-is-a-network-guid",
|
||||
} as any,
|
||||
{
|
||||
name: "#thisisaCHANNEL",
|
||||
} as any
|
||||
)
|
||||
.then((messages) => {
|
||||
);
|
||||
expect(messages).to.have.lengthOf(1);
|
||||
|
||||
const msg = messages[0];
|
||||
|
||||
expect(msg.text).to.equal("Hello from sqlite world!");
|
||||
expect(msg.type).to.equal(MessageType.MESSAGE);
|
||||
expect(msg.time.getTime()).to.equal(123456789);
|
||||
});
|
||||
});
|
||||
|
||||
it("should retrieve latest LIMIT messages in order", function () {
|
||||
it("should retrieve latest LIMIT messages in order", async function () {
|
||||
const originalMaxHistory = Config.values.maxHistory;
|
||||
|
||||
try {
|
||||
Config.values.maxHistory = 2;
|
||||
|
||||
for (let i = 0; i < 200; ++i) {
|
||||
store.index(
|
||||
await store.index(
|
||||
{uuid: "retrieval-order-test-network"} as any,
|
||||
{name: "#channel"} as any,
|
||||
new Msg({
|
||||
@ -143,60 +136,47 @@ describe("SQLite Message Storage", function () {
|
||||
);
|
||||
}
|
||||
|
||||
return store
|
||||
.getMessages(
|
||||
const messages = await store.getMessages(
|
||||
{uuid: "retrieval-order-test-network"} as any,
|
||||
{name: "#channel"} as any
|
||||
)
|
||||
.then((messages) => {
|
||||
);
|
||||
expect(messages).to.have.lengthOf(2);
|
||||
expect(messages.map((i) => i.text)).to.deep.equal(["msg 198", "msg 199"]);
|
||||
});
|
||||
expect(messages.map((i_1) => i_1.text)).to.deep.equal(["msg 198", "msg 199"]);
|
||||
} finally {
|
||||
Config.values.maxHistory = originalMaxHistory;
|
||||
}
|
||||
});
|
||||
|
||||
it("should search messages", function () {
|
||||
it("should search messages", async function () {
|
||||
const originalMaxHistory = Config.values.maxHistory;
|
||||
|
||||
try {
|
||||
Config.values.maxHistory = 2;
|
||||
|
||||
return store
|
||||
.search({
|
||||
const search = await store.search({
|
||||
searchTerm: "msg",
|
||||
networkUuid: "retrieval-order-test-network",
|
||||
} as any)
|
||||
.then((messages) => {
|
||||
// @ts-expect-error Property 'results' does not exist on type '[]'.
|
||||
expect(messages.results).to.have.lengthOf(100);
|
||||
|
||||
} as any);
|
||||
expect(search.results).to.have.lengthOf(100);
|
||||
const expectedMessages: string[] = [];
|
||||
|
||||
for (let i = 100; i < 200; ++i) {
|
||||
expectedMessages.push(`msg ${i}`);
|
||||
}
|
||||
|
||||
// @ts-expect-error Property 'results' does not exist on type '[]'.
|
||||
expect(messages.results.map((i) => i.text)).to.deep.equal(expectedMessages);
|
||||
});
|
||||
expect(search.results.map((i_1) => i_1.text)).to.deep.equal(expectedMessages);
|
||||
} finally {
|
||||
Config.values.maxHistory = originalMaxHistory;
|
||||
}
|
||||
});
|
||||
|
||||
it("should search messages with escaped wildcards", function () {
|
||||
function assertResults(query, expected) {
|
||||
return store
|
||||
.search({
|
||||
it("should search messages with escaped wildcards", async function () {
|
||||
async function assertResults(query: string, expected: string[]) {
|
||||
const search = await store.search({
|
||||
searchTerm: query,
|
||||
networkUuid: "this-is-a-network-guid2",
|
||||
} as any)
|
||||
.then((messages) => {
|
||||
// @ts-expect-error Property 'results' does not exist on type '[]'.
|
||||
expect(messages.results.map((i) => i.text)).to.deep.equal(expected);
|
||||
});
|
||||
} as any);
|
||||
expect(search.results.map((i) => i.text)).to.deep.equal(expected);
|
||||
}
|
||||
|
||||
const originalMaxHistory = Config.values.maxHistory;
|
||||
@ -204,7 +184,7 @@ describe("SQLite Message Storage", function () {
|
||||
try {
|
||||
Config.values.maxHistory = 3;
|
||||
|
||||
store.index(
|
||||
await store.index(
|
||||
{uuid: "this-is-a-network-guid2"} as any,
|
||||
{name: "#channel"} as any,
|
||||
new Msg({
|
||||
@ -213,7 +193,7 @@ describe("SQLite Message Storage", function () {
|
||||
} as any)
|
||||
);
|
||||
|
||||
store.index(
|
||||
await store.index(
|
||||
{uuid: "this-is-a-network-guid2"} as any,
|
||||
{name: "#channel"} as any,
|
||||
new Msg({
|
||||
@ -222,7 +202,7 @@ describe("SQLite Message Storage", function () {
|
||||
} as any)
|
||||
);
|
||||
|
||||
store.index(
|
||||
await store.index(
|
||||
{uuid: "this-is-a-network-guid2"} as any,
|
||||
{name: "#channel"} as any,
|
||||
new Msg({
|
||||
@ -231,32 +211,21 @@ describe("SQLite Message Storage", function () {
|
||||
} as any)
|
||||
);
|
||||
|
||||
return (
|
||||
store
|
||||
.getMessages(
|
||||
{uuid: "this-is-a-network-guid2"} as any,
|
||||
{name: "#channel"} as any
|
||||
)
|
||||
// .getMessages() waits for store.index() transactions to commit
|
||||
.then(() => assertResults("foo", ["foo % bar _ baz", "foo bar x baz"]))
|
||||
.then(() => assertResults("%", ["foo % bar _ baz"]))
|
||||
.then(() => assertResults("foo % bar ", ["foo % bar _ baz"]))
|
||||
.then(() => assertResults("_", ["foo % bar _ baz"]))
|
||||
.then(() => assertResults("bar _ baz", ["foo % bar _ baz"]))
|
||||
.then(() => assertResults("%%", []))
|
||||
.then(() => assertResults("@%", []))
|
||||
.then(() => assertResults("@", ["bar @ baz"]))
|
||||
);
|
||||
await assertResults("foo", ["foo % bar _ baz", "foo bar x baz"]);
|
||||
await assertResults("%", ["foo % bar _ baz"]);
|
||||
await assertResults("foo % bar ", ["foo % bar _ baz"]);
|
||||
await assertResults("_", ["foo % bar _ baz"]);
|
||||
await assertResults("bar _ baz", ["foo % bar _ baz"]);
|
||||
await assertResults("%%", []);
|
||||
await assertResults("@%", []);
|
||||
await assertResults("@", ["bar @ baz"]);
|
||||
} finally {
|
||||
Config.values.maxHistory = originalMaxHistory;
|
||||
}
|
||||
});
|
||||
|
||||
it("should close database", function (done) {
|
||||
store.close((err) => {
|
||||
expect(err).to.be.null;
|
||||
it("should close database", async function () {
|
||||
await store.close();
|
||||
expect(fs.existsSync(expectedPath)).to.be.true;
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user