fix: rewrote backup_check with proper caching and switched to @opensumi/reconnecting-websocket (the previous package is no longer maintained)

* increased SQLite busy_timeout from 15s to 30s
* added a courtesy email (sent at most once a week) to customers who attempt to use IMAP/POP3 on an alias that does not yet have IMAP enabled
* added a _disconnect fix per <pladaria/reconnecting-websocket#197> (otherwise an exception is thrown and the process restarts)
* fixed the Spam/Junk/Trash check
* added analysis_limit=400 to speed up PRAGMA optimize
* fixed a POP3 exception with writeStream
* fixed a SEARCH bug (SqliteError: too many SQL variables) caused by the SQLITE_MAX_VARIABLE_NUMBER limit of 999
* fixed "RangeError: Maximum call stack size exceeded" with msgpackr and Error objects
* fixed payload.id missing for backups
* only check for backups during a read operation such as GETQUOTAROOT, and only on the SQLite server
* sync tmp with db every time a user attempts to fetch mail (at most once every 10s)
titanism committed Jun 27, 2024
1 parent 1c8be9a commit 776d45d
Showing 47 changed files with 583 additions and 460 deletions.
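One item above, the SEARCH fix, concerns SQLite's default SQLITE_MAX_VARIABLE_NUMBER limit of 999 bound parameters per statement. The actual change is not visible in the excerpt below; the sketch here only illustrates the general chunking approach with better-sqlite3 (the table name, column name, and chunk size are hypothetical).

// Sketch: split a large IN (...) list into chunks so no single statement
// binds more than SQLite's default limit of 999 variables.
const Database = require('better-sqlite3');

const db = new Database('mail.db'); // illustrative path

function findMessagesByIds(ids, chunkSize = 900) {
  const rows = [];
  for (let i = 0; i < ids.length; i += chunkSize) {
    const chunk = ids.slice(i, i + chunkSize);
    const placeholders = chunk.map(() => '?').join(',');
    rows.push(
      ...db
        .prepare(`SELECT * FROM Messages WHERE _id IN (${placeholders})`)
        .all(...chunk)
    );
  }
  return rows;
}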
3 changes: 3 additions & 0 deletions app/controllers/web/my-account/download-alias-backup.js
@@ -117,6 +117,9 @@ async function downloadAliasBackup(ctx) {

// send backup request
if (isSANB(ctx.request.body.password)) {
// purge cache so we can run another backup
await ctx.client.del(`backup_check:${alias.id}`);

const wsp = createWebSocketAsPromised();
wsp
.request({
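The backup_check key purged above is part of the caching rewrite named in the commit title. A rough sketch of the pattern it implies, assuming an ioredis-style client; the TTL, the key lifecycle, and the caller-supplied runBackup function are illustrative, not the repository's exact implementation.

const ms = require('ms');

// Sketch: run a backup only if no recent run is recorded, then cache the
// fact that one ran; deleting the key (as above) forces a fresh backup.
async function maybeRunBackup(client, alias, runBackup) {
  const key = `backup_check:${alias.id}`;
  if (await client.get(key)) return false; // a recent backup already ran
  await runBackup(alias); // caller-supplied function that performs the backup
  await client.set(key, 'true', 'PX', ms('1d')); // TTL here is illustrative
  return true;
}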
30 changes: 18 additions & 12 deletions app/controllers/web/my-account/generate-alias-password.js
@@ -172,10 +172,12 @@ async function generateAliasPassword(ctx) {
await alias.save();

// close websocket
try {
wsp.close();
} catch (err) {
ctx.logger.fatal(err);
if (wsp.isOpened) {
try {
wsp.close();
} catch (err) {
ctx.logger.fatal(err);
}
}
} else if (boolean(ctx.request.body.is_override)) {
// reset existing mailbox and create new mailbox
@@ -204,10 +206,12 @@ async function generateAliasPassword(ctx) {
await alias.save();

// close websocket
try {
wsp.close();
} catch (err) {
ctx.logger.fatal(err);
if (wsp.isOpened) {
try {
wsp.close();
} catch (err) {
ctx.logger.fatal(err);
}
}
} else {
// create new mailbox
@@ -258,10 +262,12 @@ async function generateAliasPassword(ctx) {
await alias.save();

// close websocket
try {
wsp.close();
} catch (err) {
ctx.logger.fatal(err);
if (wsp.isOpened) {
try {
wsp.close();
} catch (err) {
ctx.logger.fatal(err);
}
}
}

@@ -172,7 +172,7 @@ The Primary is running on the data servers with the mounted volumes containing t
We accomplish two-way communication with [WebSockets](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket):

* Primary servers use an instance of [ws](https://github.com/websockets/ws)'s `WebSocketServer` server.
* Secondary servers use an instance of [ws](https://github.com/websockets/ws)'s `WebSocket` client that is wrapped with [websocket-as-promised](https://github.com/vitalets/websocket-as-promised) and [reconnecting-websocket](https://github.com/pladaria/reconnecting-websocket). These two wrappers ensure that the `WebSocket` reconnects and can send and receive data for specific database writes.
* Secondary servers use an instance of [ws](https://github.com/websockets/ws)'s `WebSocket` client that is wrapped with [websocket-as-promised](https://github.com/vitalets/websocket-as-promised) and [reconnecting-websocket](https://github.com/opensumi/reconnecting-websocket). These two wrappers ensure that the `WebSocket` reconnects and can send and receive data for specific database writes.

### Backups

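A minimal sketch of the client wrapper described in the paragraph above, combining ws, websocket-as-promised, and @opensumi/reconnecting-websocket; the endpoint URL, retry options, and JSON message framing are assumptions rather than the exact configuration in helpers/create-websocket-as-promised.js.

const WebSocket = require('ws');
const WebSocketAsPromised = require('websocket-as-promised');
const ReconnectingWebSocket = require('@opensumi/reconnecting-websocket');

// hypothetical Primary server endpoint
const url = 'wss://primary.example.com';

const wsp = new WebSocketAsPromised(url, {
  // let reconnecting-websocket own the underlying ws client so drops reconnect
  createWebSocket: (u) =>
    new ReconnectingWebSocket(u, [], { WebSocket, maxRetries: Infinity }),
  packMessage: (data) => JSON.stringify(data),
  unpackMessage: (data) => JSON.parse(data),
  // correlate request/response pairs by id
  attachRequestId: (data, id) => ({ id, ...data }),
  extractRequestId: (data) => data && data.id
});

// usage sketch: open once, then make request/response style calls
async function getSize(aliasId) {
  if (!wsp.isOpened) await wsp.open();
  return wsp.sendRequest({ action: 'size', alias_id: aliasId });
}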
2 changes: 1 addition & 1 deletion config/index.js
@@ -291,7 +291,7 @@ const config = {
removedEmailDomain: env.REMOVED_EMAIL_DOMAIN,

// SQLite busy_timeout value (how long we should wait for locking too)
busyTimeout: ms('15s'),
busyTimeout: ms('30s'),

// server
env: env.NODE_ENV.toLowerCase(),
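A busy_timeout such as the value above is applied per connection as a pragma; a minimal sketch with better-sqlite3-multiple-ciphers (the database path is illustrative, and the repository's setupPragma helper presumably sets more than just this):

const ms = require('ms');
const Database = require('better-sqlite3-multiple-ciphers');

const db = new Database('/mnt/storage/alias.sqlite'); // illustrative path

// wait up to 30s for a lock to clear instead of failing fast with SQLITE_BUSY
db.pragma(`busy_timeout = ${ms('30s')}`);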
4 changes: 4 additions & 0 deletions config/phrases.js
@@ -258,6 +258,10 @@ module.exports = {
'We tried to create a new account with this email address, but it already exists. Please log in with this email address if it belongs to you and then try again.',
LOGOUT_REQUIRED: 'Please log out to view the page you requested.',
ALIAS_DOES_NOT_EXIST: 'Alias does not exist on the domain.',
IMAP_NOT_ENABLED_SUBJECT:
'Enable IMAP to receive mail for <span class="notranslate">%s</span>',
IMAP_NOT_ENABLED_MESSAGE:
'Please <a target="_blank" rel="noopener noreferrer" class="font-weight-bold" href="%s">edit your alias</a> and enable IMAP to receive mail for <strong class="text-monospace notranslate">%s</strong>.',
IMAP_MAILBOX_MAX_EXCEEDED: 'Maximum number of mailboxes exceeded',
IMAP_MESSAGE_SIZE_EXCEEDED: 'Maximum message size exceeded',
IMAP_MAILBOX_INBOX_CANNOT_STORE_DRAFTS: 'Inbox cannot store draft messages',
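These two phrases back the once-a-week courtesy email described in the commit message for aliases accessed over IMAP/POP3 without IMAP enabled. A rough sketch of a rate-limited send, under loud assumptions: the Redis key name, the 7-day TTL, the import paths, and the email() helper's call shape are all hypothetical, modeled only on patterns visible elsewhere in this commit.

const util = require('node:util');
const ms = require('ms');
const email = require('#helpers/email'); // assumed import path
const phrases = require('#config/phrases'); // assumed import path

// Sketch: notify the user at most once per week that IMAP is disabled
// for the alias they are trying to access over IMAP/POP3.
async function notifyImapNotEnabled(client, alias, user, editUrl) {
  const key = `imap_not_enabled_check:${alias.id}`; // hypothetical key name
  if (await client.get(key)) return; // already notified within the past week
  await client.set(key, 'true', 'PX', ms('7d'));
  await email({
    // the template name and message/locals shape are assumptions
    template: 'alert',
    message: {
      to: user.email,
      subject: util.format(phrases.IMAP_NOT_ENABLED_SUBJECT, alias.name)
    },
    locals: {
      message: util.format(phrases.IMAP_NOT_ENABLED_MESSAGE, editUrl, alias.name)
    }
  });
}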
42 changes: 39 additions & 3 deletions helpers/create-websocket-as-promised.js
@@ -6,9 +6,7 @@
const { randomUUID } = require('node:crypto');

// <https://github.com/pladaria/reconnecting-websocket/issues/195>
// const ReconnectingWebSocket = require('reconnecting-websocket');

const ReconnectingWebSocket = require('reconnecting-websocket');
const ReconnectingWebSocket = require('@opensumi/reconnecting-websocket');
const WebSocketAsPromised = require('websocket-as-promised');
const mongoose = require('mongoose');
const ms = require('ms');
@@ -37,6 +35,44 @@ ReconnectingWebSocket.prototype._debug = () => {
// logger.debug('reconnectingwebsocket', { args });
};

class Event {
constructor(type, target) {
this.target = target;
this.type = type;
}
}

class CloseEvent extends Event {
constructor(code = 1000, reason = '', target) {
super('close', target);
this.code = code;
this.reason = reason;
}
}

// <https://github.com/pladaria/reconnecting-websocket/issues/197>
ReconnectingWebSocket.prototype._disconnect = function (code, reason) {
if (code === undefined) {
code = 1000;
}

this._clearTimeouts();
if (!this._ws) {
return;
}

this._removeListeners();
try {
if (this._ws.readyState !== ReconnectingWebSocket.CONNECTING) {
this._ws.close(code, reason);
}

this._handleClose(new CloseEvent(code, reason, this));
} catch (err) {
logger.fatal(err);
}
};

// <https://github.com/pladaria/reconnecting-websocket/issues/138#issuecomment-698206018>
function createWebSocketClass(options) {
return class extends WebSocket {
133 changes: 71 additions & 62 deletions helpers/get-database.js
@@ -9,32 +9,40 @@ const path = require('node:path');

// <https://github.com/knex/knex-schema-inspector/pull/146>
const Database = require('better-sqlite3-multiple-ciphers');
const dayjs = require('dayjs-with-plugins');
const isSANB = require('is-string-and-not-blank');
const mongoose = require('mongoose');
const ms = require('ms');
const pRetry = require('p-retry');
const pify = require('pify');
const { Builder } = require('json-sql');
const { boolean } = require('boolean');

const parseErr = require('parse-err');
const Aliases = require('#models/aliases');
const Calendars = require('#models/calendars');
const CalendarEvents = require('#models/calendar-events');
const Attachments = require('#models/attachments');
const CalendarEvents = require('#models/calendar-events');
const Calendars = require('#models/calendars');
const Mailboxes = require('#models/mailboxes');
const Messages = require('#models/messages');
const Threads = require('#models/threads');
const config = require('#config');
const env = require('#config/env');
const email = require('#helpers/email');
const env = require('#config/env');
const getPathToDatabase = require('#helpers/get-path-to-database');
const isTimeoutError = require('#helpers/is-timeout-error');
const isValidPassword = require('#helpers/is-valid-password');
const logger = require('#helpers/logger');
const migrateSchema = require('#helpers/migrate-schema');
const onExpunge = require('#helpers/imap/on-expunge');
const setupPragma = require('#helpers/setup-pragma');
const { acquireLock, releaseLock } = require('#helpers/lock');
const { decrypt } = require('#helpers/encrypt-decrypt');

const onExpungePromise = pify(onExpunge, { multiArgs: true });

const builder = new Builder();

const HOSTNAME = os.hostname();

const AFFIXES = ['-wal', '-shm'];
@@ -591,18 +599,18 @@ async function getDatabase(

let migrateCheck = false;
let folderCheck = false;
// let trashCheck = false;
let trashCheck = false;

if (instance.client) {
try {
const results = await instance.client.mget([
`migrate_check:${session.user.alias_id}`,
`folder_check:${session.user.alias_id}`
// `trash_check:${session.user.alias_id}`
`folder_check:${session.user.alias_id}`,
`trash_check:${session.user.alias_id}`
]);
migrateCheck = boolean(results[0]);
folderCheck = boolean(results[1]);
// trashCheck = boolean(results[2]);
trashCheck = boolean(results[2]);
} catch (err) {
logger.fatal(err);
}
@@ -739,49 +747,76 @@ async function getDatabase(
logger.fatal(err, { session });
}

// TODO: redo this so it sets `undeleted: 0` instead
// TODO: redo this so it sets `undeleted: 0` instead
// TODO: redo this so it sets `undeleted: 0` instead
// TODO: redo this so it sets `undeleted: 0` instead
// TODO: redo this so it sets `undeleted: 0` instead
// TODO: redo this so it sets `undeleted: 0` instead
// release lock
try {
if (lock) {
await releaseLock(instance, db, lock);
}
} catch (err) {
logger.debug(err, { alias, session });
}

//
// NOTE: we leave it up to the user to delete messages
// but note that on CLOSE we call EXPUNGE on the mailbox
//
// remove messages in Junk/Trash folder that are >= 30 days old
// (only do this once every day)
/*
// NOTE: we remove messages in Junk/Trash folder that are >= 30 days old
// (but we only do this once every day)
try {
if (!trashCheck) {
const mailboxes = await Mailboxes.find(instance, session, {
path: {
$in: ['Trash', 'Junk']
$in: ['Trash', 'Spam', 'Junk']
},
specialUse: {
$in: ['\\Trash', '\\Junk']
}
});
if (mailboxes.length === 0) {

if (mailboxes.length === 0)
throw new TypeError('Trash folder(s) do not exist');
}

// NOTE: this does not support `prepareQuery` so you will need to convert _id -> id
// (as we've done below by simply mapping and returning `id` vs `_id`)
await Messages.deleteMany(instance, session, {
$or: [
{
mailbox: {
$in: mailboxes.map((m) => m._id.toString())
const sql = builder.build({
type: 'update',
table: 'Messages',
condition: {
$or: [
{
mailbox: {
$in: mailboxes.map((m) => m._id.toString())
},
exp: 1,
rdate: {
$lte: Date.now()
}
},
exp: true,
rdate: {
$lte: Date.now()
{
mailbox: {
$in: mailboxes.map((m) => m._id.toString())
},
rdate: {
$lte: dayjs().subtract(30, 'days').toDate().getTime()
}
}
]
},
modifier: {
$set: {
undeleted: false
}
]
}
});

db.prepare(sql.query).run(sql.values);

await Promise.all(
mailboxes.map((mailbox) =>
onExpungePromise.call(
instance,
mailbox._id.toString(),
{ silent: true },
session
)
)
);

await instance.client.set(
`trash_check:${session.user.alias_id}`,
true,
@@ -792,37 +827,11 @@
} catch (err) {
logger.fatal(err, { session });
}
*/

//
// TODO: delete orphaned attachments (those without messages that reference them)

// release lock
try {
if (lock) {
await releaseLock(instance, db, lock);
}
} catch (err) {
logger.debug(err, { alias, session });
}

// if alias db size was 0 then we should update it
/*
try {
const storageUsed = await Aliases.getStorageUsed({
domain: new mongoose.Types.ObjectId(session.user.domain_id)
});
if (storageUsed === 0) {
const size = await instance.wsp.request({
action: 'size',
timeout: ms('15s'),
alias_id: alias.id
});
logger.debug('updating size', { size, alias, session });
}
} catch (err) {
logger.fatal(err, { alias, session });
}
*/
// (note this is unlikely as we already take care of this in EXPUNGE)
//

return db;
} catch (err) {
7 changes: 2 additions & 5 deletions helpers/imap-notifier.js
@@ -15,7 +15,6 @@

const { EventEmitter } = require('node:events');

const Axe = require('axe');
const Database = require('better-sqlite3-multiple-ciphers');
const _ = require('lodash');
const ms = require('ms');
@@ -26,13 +25,10 @@ const IMAPError = require('#helpers/imap-error');
const Journals = require('#models/journals');
const Mailboxes = require('#models/mailboxes');
const config = require('#config');
const helperLogger = require('#helpers/logger');
const logger = require('#helpers/logger');
const i18n = require('#helpers/i18n');
const { acquireLock, releaseLock } = require('#helpers/lock');

const logger =
config.env === 'development' ? helperLogger : new Axe({ silent: true });

const builder = new Builder();

class IMAPNotifier extends EventEmitter {
@@ -376,6 +372,7 @@ class IMAPNotifier extends EventEmitter {
data?.session?.db?.close === 'function'
) {
try {
data.session.db.pragma('analysis_limit=400');
data.session.db.pragma('optimize');
data.session.db.close();
} catch (err) {
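The analysis_limit pragma added above bounds how many rows PRAGMA optimize may examine when it decides to re-run ANALYZE, which keeps this connection-close path fast even on large mailbox databases. A standalone sketch of the same idiom (the database path is illustrative):

const Database = require('better-sqlite3-multiple-ciphers');

const db = new Database('/mnt/storage/alias.sqlite'); // illustrative path

// ... reads and writes happen here ...

// cap the work PRAGMA optimize may do, then run it right before closing
db.pragma('analysis_limit=400');
db.pragma('optimize');
db.close();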
2 changes: 1 addition & 1 deletion helpers/imap/on-expunge.js
@@ -44,7 +44,7 @@ async function onExpunge(mailboxId, update, session, fn) {
update
});

if (Array.isArray(writeStream)) {
if (session?.writeStream?.write && Array.isArray(writeStream)) {
for (const write of writeStream) {
if (Array.isArray(write)) {
session.writeStream.write(session.formatResponse(...write));