Skip to content

Commit

Permalink
fix: fixed another sqlite-bree wsp issue
Browse files Browse the repository at this point in the history
  • Loading branch information
titanism committed May 7, 2024
1 parent cd5f699 commit 59984ce
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 147 deletions.
5 changes: 4 additions & 1 deletion helpers/get-database.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ async function getDatabase(

// instance must be either IMAP, POP3, SQLite, or CalDAV
if (
!['IMAP', 'POP3', 'SQLite', 'CalDAV'].includes(instance?.constructor?.name)
!['IMAP', 'POP3', 'SQLite', 'CalDAV'].includes(
instance?.constructor?.name
) &&
HOSTNAME !== env.SQLITE_HOST
)
throw new TypeError(
'Instance must be either IMAP, POP3, SQLite, or CalDAV'
Expand Down
302 changes: 156 additions & 146 deletions jobs/cleanup-sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ const _ = require('lodash');
const dayjs = require('dayjs-with-plugins');
const mongoose = require('mongoose');
const ms = require('ms');
const parseErr = require('parse-err');
const pEvent = require('p-event');
const pMap = require('p-map');
const parseErr = require('parse-err');
const prettyBytes = require('pretty-bytes');
const sharedConfig = require('@ladjs/shared-config');

Expand All @@ -35,6 +36,7 @@ const logger = require('#helpers/logger');
const setupMongoose = require('#helpers/setup-mongoose');
const wsp = require('#helpers/wsp-server');

const concurrency = os.cpus().length;
const breeSharedConfig = sharedConfig('BREE');
const client = new Redis(breeSharedConfig.redis, logger);
const subscriber = new Redis(breeSharedConfig.redis, logger);
Expand Down Expand Up @@ -75,7 +77,6 @@ const AFFIXES = ['-backup', '-backup-wal', '-backup-shm'];

const mountDir = config.env === 'production' ? '/mnt' : tmpdir;

// eslint-disable-next-line complexity
(async () => {
await setupMongoose(logger);

Expand Down Expand Up @@ -184,168 +185,177 @@ const mountDir = config.env === 'production' ? '/mnt' : tmpdir;
}

// now iterate through all ids and update their sizes and send (or unset) quota alerts
for (const id of ids) {
// ensure ID is hex string
if (!mongoose.isObjectIdOrHexString(id)) continue;

try {
//
// attempt to vacuum database
// (if and only if the user was logged in via IMAP)
// (this fetches the password in-memory real-time)
// (similar to when we write to tmp storage)
//
await pMap(
ids,
async (id) => {
logger.info('cleanup', id);

// ensure ID is hex string
if (!mongoose.isObjectIdOrHexString(id)) return;

try {
client.publish('sqlite_auth_request', id);
// eslint-disable-next-line no-await-in-loop
const [, response] = await pEvent(subscriber, 'message', {
filter(args) {
const [channel, data] = args;
if (channel !== 'sqlite_auth_response' || !data) return;
try {
const d = JSON.parse(data);
return d.id === id;
} catch {}
},
multiArgs: true,
timeout: ms('10s')
});
const user = JSON.parse(response);
if (typeof user.password !== 'string') {
const err = new TypeError('User payload did not have password');
err.user = user;
err.id = id;
throw err;
//
// attempt to vacuum database
// (if and only if the user was logged in via IMAP)
// (this fetches the password in-memory real-time)
// (similar to when we write to tmp storage)
//
try {
client.publish('sqlite_auth_request', id);

const [, response] = await pEvent(subscriber, 'message', {
filter(args) {
const [channel, data] = args;
if (channel !== 'sqlite_auth_response' || !data) return;
try {
const d = JSON.parse(data);
return d.id === id;
} catch {}
},
multiArgs: true,
timeout: ms('3s')
});

const user = JSON.parse(response);
if (typeof user.password !== 'string') {
const err = new TypeError('User payload did not have password');
err.user = user;
err.id = id;
throw err;
}

await wsp.request(
{
action: 'vacuum',
timeout: ms('5m'),
session: { user }
},
0
);
} catch (err) {
logger.error(err);
}

// eslint-disable-next-line no-await-in-loop
// update `storage_used` for given alias

await wsp.request(
{
action: 'vacuum',
timeout: ms('5m'),
session: { user }
action: 'size',
timeout: ms('15s'),
alias_id: id
},
0
);
} catch (err) {
logger.error(err);
}

// update `storage_used` for given alias
// eslint-disable-next-line no-await-in-loop
await wsp.request(
{
action: 'size',
timeout: ms('15s'),
alias_id: id
},
0
);

// get total storage used for an alias (includes across all relevant domains/aliases)
// eslint-disable-next-line no-await-in-loop
const alias = await Aliases.findOne({ id });
// get total storage used for an alias (includes across all relevant domains/aliases)

if (!alias) return;
const alias = await Aliases.findOne({ id });

// if the alias did not have imap or it was not enabled
// then we can return early since the check is not useful
if (!alias.has_imap || !alias.is_enabled) return;
if (!alias) return;

// eslint-disable-next-line no-await-in-loop
const [storageUsed, maxQuotaPerAlias] = await Promise.all([
Aliases.getStorageUsed(alias),
Domains.getMaxQuota(alias.domain)
]);

const percentageUsed = Math.round(
(storageUsed / maxQuotaPerAlias) * 100
);

// find closest threshold
let threshold;
for (const percentage of [50, 60, 70, 80, 90, 100]) {
if (percentageUsed >= percentage) threshold = percentage;
}
// if the alias did not have imap or it was not enabled
// then we can return early since the check is not useful
if (!alias.has_imap || !alias.is_enabled) return;

// return early if no threshold found
if (!threshold) return;

// if user already received threshold notification
// and the notification was sent within the past 7 days
// then we can return early
if (
typeof alias.storage_thresholds_sent_at === 'object' &&
alias.storage_thresholds_sent_at[threshold.toString()] &&
_.isDate(alias.storage_thresholds_sent_at[threshold.toString()]) &&
new Date(
alias.storage_thresholds_sent_at[threshold.toString()]
).getTime() >= dayjs().subtract(1, 'week').toDate().getTime()
)
return;

if (typeof alias.storage_thresholds_sent_at !== 'object')
alias.storage_thresholds_sent_at = {};
const [storageUsed, maxQuotaPerAlias] = await Promise.all([
Aliases.getStorageUsed(alias),
Domains.getMaxQuota(alias.domain)
]);

// eslint-disable-next-line no-await-in-loop
const domain = await Domains.findById(alias.domain);
const percentageUsed = Math.round(
(storageUsed / maxQuotaPerAlias) * 100
);

if (!domain) return;
// find closest threshold
let threshold;
for (const percentage of [50, 60, 70, 80, 90, 100]) {
if (percentageUsed >= percentage) threshold = percentage;
}

// get recipients and the majority favored locale
// eslint-disable-next-line no-await-in-loop
const { to, locale } = await Domains.getToAndMajorityLocaleByDomain(
domain
);

// send the email to the user with threshold notification
const subject =
config.views.locals.emoji('warning') +
' ' +
i18n.translate('STORAGE_THRESHOLD_SUBJECT', locale, percentageUsed);

const message = i18n.translate(
'STORAGE_THRESHOLD_MESSAGE',
locale,
percentageUsed,
prettyBytes(storageUsed),
prettyBytes(maxQuotaPerAlias),
`${config.urls.web}/${locale}/my-account/billing`
);
// return early if no threshold found
if (!threshold) return;

// eslint-disable-next-line no-await-in-loop
await emailHelper({
template: 'alert',
message: {
to,
bcc: config.email.message.from,
subject
},
locals: {
message,
locale
}
});
// if user already received threshold notification
// and the notification was sent within the past 7 days
// then we can return early
if (
typeof alias.storage_thresholds_sent_at === 'object' &&
alias.storage_thresholds_sent_at[threshold.toString()] &&
_.isDate(
alias.storage_thresholds_sent_at[threshold.toString()]
) &&
new Date(
alias.storage_thresholds_sent_at[threshold.toString()]
).getTime() >= dayjs().subtract(1, 'week').toDate().getTime()
)
return;

// mark when the email was successfully sent/queued
alias.storage_thresholds_sent_at[threshold.toString()] = new Date();
alias.markModified('storage_thresholds_sent_at');
// eslint-disable-next-line no-await-in-loop
await alias.save();
} catch (err) {
logger.error(err);
// commented out as a safeguard
// easy way to cleanup non-production environments tmpdir folders
// if (
// config.env !== 'production' &&
// err.message === 'Alias does not exist'
// ) {
// await fs.promises.unlink(
// path.join(mountDir, config.defaultStoragePath, `${id}.sqlite`)
// );
// }
}
}
if (typeof alias.storage_thresholds_sent_at !== 'object')
alias.storage_thresholds_sent_at = {};

const domain = await Domains.findById(alias.domain);

if (!domain) return;

// get recipients and the majority favored locale

const { to, locale } = await Domains.getToAndMajorityLocaleByDomain(
domain
);

// send the email to the user with threshold notification
const subject =
config.views.locals.emoji('warning') +
' ' +
i18n.translate(
'STORAGE_THRESHOLD_SUBJECT',
locale,
percentageUsed
);

const message = i18n.translate(
'STORAGE_THRESHOLD_MESSAGE',
locale,
percentageUsed,
prettyBytes(storageUsed),
prettyBytes(maxQuotaPerAlias),
`${config.urls.web}/${locale}/my-account/billing`
);

await emailHelper({
template: 'alert',
message: {
to,
bcc: config.email.message.from,
subject
},
locals: {
message,
locale
}
});

// mark when the email was successfully sent/queued
alias.storage_thresholds_sent_at[threshold.toString()] = new Date();
alias.markModified('storage_thresholds_sent_at');

await alias.save();
} catch (err) {
logger.error(err);
// commented out as a safeguard
// easy way to cleanup non-production environments tmpdir folders
// if (
// config.env !== 'production' &&
// err.message === 'Alias does not exist'
// ) {
// await fs.promises.unlink(
// path.join(mountDir, config.defaultStoragePath, `${id}.sqlite`)
// );
// }
}
},
{ concurrency }
);
}
} catch (err) {
await logger.error(err);
Expand Down

0 comments on commit 59984ce

Please sign in to comment.