diff --git a/@xen-orchestra/backups/_runners/_vmRunners/FullRemote.mjs b/@xen-orchestra/backups/_runners/_vmRunners/FullRemote.mjs index c24e04540ee..4ac4a178fea 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/FullRemote.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/FullRemote.mjs @@ -2,39 +2,39 @@ import { AbstractRemote } from './_AbstractRemote.mjs' import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs' import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs' import { watchStreamSize } from '../../_watchStreamSize.mjs' -import { Task } from '../../Task.mjs' export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote { _getRemoteWriter() { return FullRemoteWriter } + + _filterTransferList(transferList) { + return transferList.filter(this._filterPredicate) + } + async _run() { const transferList = await this._computeTransferList(({ mode }) => mode === 'full') - if (transferList.length > 0) { - for (const metadata of transferList) { - const stream = await this._sourceRemoteAdapter.readFullVmBackup(metadata) - const sizeContainer = watchStreamSize(stream) + for (const metadata of transferList) { + const stream = await this._sourceRemoteAdapter.readFullVmBackup(metadata) + const sizeContainer = watchStreamSize(stream) - // @todo shouldn't transfer backup if it will be deleted by retention policy (higher retention on source than destination) - await this._callWriters( - writer => - writer.run({ - stream: forkStreamUnpipe(stream), - // stream will be forked and transformed, it's not safe to attach additionnal properties to it - streamLength: stream.length, - timestamp: metadata.timestamp, - vm: metadata.vm, - vmSnapshot: metadata.vmSnapshot, - sizeContainer, - }), - 'writer.run()' - ) - // for healthcheck - this._tags = metadata.vm.tags - } - } else { - Task.info('No new data to upload for this VM') + // @todo shouldn't transfer backup if it will be deleted by retention policy (higher retention on source than 
destination) + await this._callWriters( + writer => + writer.run({ + stream: forkStreamUnpipe(stream), + // stream will be forked and transformed, it's not safe to attach additional properties to it + streamLength: stream.length, + timestamp: metadata.timestamp, + vm: metadata.vm, + vmSnapshot: metadata.vmSnapshot, + sizeContainer, + }), + 'writer.run()' + ) + // for healthcheck + this._tags = metadata.vm.tags } } } diff --git a/@xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs b/@xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs index 754f52379bb..70d8c0f7819 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs @@ -7,7 +7,6 @@ import mapValues from 'lodash/mapValues.js' import { AbstractRemote } from './_AbstractRemote.mjs' import { forkDeltaExport } from './_forkDeltaExport.mjs' import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs' -import { Task } from '../../Task.mjs' import { Disposable } from 'promise-toolbox' import { openVhd } from 'vhd-lib' import { getVmBackupDir } from '../../_getVmBackupDir.mjs' @@ -16,6 +15,16 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote { _getRemoteWriter() { return IncrementalRemoteWriter } + + // we'll transfer the full list if at least one backup should be transferred + // to ensure we don't cut the delta chain + _filterTransferList(transferList) { + if (transferList.some(vmBackupMetadata => this._filterPredicate(vmBackupMetadata))) { + return transferList + } + return [] + } + async _selectBaseVm(metadata) { // for each disk , get the parent const baseUuidToSrcVdi = new Map() @@ -53,50 +62,46 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote { async _run() { const transferList = await this._computeTransferList(({ mode }) => mode === 'delta') - if (transferList.length > 0) { - for (const metadata of transferList) { - 
assert.strictEqual(metadata.mode, 'delta') - await this._selectBaseVm(metadata) - await this._callWriters(writer => writer.prepare({ isBase: metadata.isBase }), 'writer.prepare()') - const incrementalExport = await this._sourceRemoteAdapter.readIncrementalVmBackup(metadata, undefined, { - useChain: false, - }) + for (const metadata of transferList) { + assert.strictEqual(metadata.mode, 'delta') + await this._selectBaseVm(metadata) + await this._callWriters(writer => writer.prepare({ isBase: metadata.isBase }), 'writer.prepare()') + const incrementalExport = await this._sourceRemoteAdapter.readIncrementalVmBackup(metadata, undefined, { + useChain: false, + }) - const isVhdDifferencing = {} + const isVhdDifferencing = {} - await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => { - isVhdDifferencing[key] = await isVhdDifferencingDisk(stream) - }) + await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => { + isVhdDifferencing[key] = await isVhdDifferencingDisk(stream) + }) - incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream) - await this._callWriters( - writer => - writer.transfer({ - deltaExport: forkDeltaExport(incrementalExport), - isVhdDifferencing, - timestamp: metadata.timestamp, - vm: metadata.vm, - vmSnapshot: metadata.vmSnapshot, - }), - 'writer.transfer()' - ) - // this will update parent name with the needed alias - await this._callWriters( - writer => - writer.updateUuidAndChain({ - isVhdDifferencing, - timestamp: metadata.timestamp, - vdis: incrementalExport.vdis, - }), - 'writer.updateUuidAndChain()' - ) + incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream) + await this._callWriters( + writer => + writer.transfer({ + deltaExport: forkDeltaExport(incrementalExport), + isVhdDifferencing, + timestamp: metadata.timestamp, + vm: metadata.vm, + vmSnapshot: metadata.vmSnapshot, + }), + 'writer.transfer()' + ) + // this will update 
parent name with the needed alias + await this._callWriters( + writer => + writer.updateUuidAndChain({ + isVhdDifferencing, + timestamp: metadata.timestamp, + vdis: incrementalExport.vdis, + }), + 'writer.updateUuidAndChain()' + ) - await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()') - // for healthcheck - this._tags = metadata.vm.tags - } - } else { - Task.info('No new data to upload for this VM') + await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()') + // for healthcheck + this._tags = metadata.vm.tags } } } diff --git a/@xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs b/@xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs index b6cdc3633d5..1ae1a10243f 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs @@ -1,14 +1,18 @@ -import { asyncEach } from '@vates/async-each' +import groupBy from 'lodash/groupBy.js' + import { decorateMethodsWith } from '@vates/decorate-with' import { defer } from 'golike-defer' import { Disposable } from 'promise-toolbox' +import { createPredicate } from 'value-matcher' import { getVmBackupDir } from '../../_getVmBackupDir.mjs' import { Abstract } from './_Abstract.mjs' import { extractIdsFromSimplePattern } from '../../extractIdsFromSimplePattern.mjs' +import { Task } from '../../Task.mjs' export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstract { + _filterPredicate constructor({ config, job, @@ -57,39 +61,74 @@ export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstrac }) ) }) + const { filter } = job + if (filter === undefined) { + this._filterPredicate = () => true + } else { + this._filterPredicate = createPredicate(filter) + } } - async _computeTransferList(predicate) { - const vmBackups = await this._sourceRemoteAdapter.listVmBackups(this._vmUuid, predicate) + async #computeTransferListPerJob(sourceBackups, remotesBackups) { const 
localMetada = new Map() - Object.values(vmBackups).forEach(metadata => { + sourceBackups.forEach(metadata => { const timestamp = metadata.timestamp localMetada.set(timestamp, metadata) }) - const nbRemotes = Object.keys(this.remoteAdapters).length + const nbRemotes = remotesBackups.length const remoteMetadatas = {} - await asyncEach(Object.values(this.remoteAdapters), async remoteAdapter => { - const remoteMetadata = await remoteAdapter.listVmBackups(this._vmUuid, predicate) - remoteMetadata.forEach(metadata => { + remotesBackups.forEach(async remoteBackups => { + remoteBackups.forEach(metadata => { const timestamp = metadata.timestamp remoteMetadatas[timestamp] = (remoteMetadatas[timestamp] ?? 0) + 1 }) }) - let chain = [] + let transferList = [] const timestamps = [...localMetada.keys()] timestamps.sort() for (const timestamp of timestamps) { if (remoteMetadatas[timestamp] !== nbRemotes) { // this backup is not present in all the remote // should be retransfered if not found later - chain.push(localMetada.get(timestamp)) + transferList.push(localMetada.get(timestamp)) } else { // backup is present in local and remote : the chain has already been transferred - chain = [] + transferList = [] + } + } + if (transferList.length > 0) { + const filteredTransferList = this._filterTransferList(transferList) + if (filteredTransferList.length > 0) { + return filteredTransferList + } else { + Task.info('This VM is excluded by the job filter') + return [] } + } else { + Task.info('No new data to upload for this VM') } - return chain + + return [] + } + + /** + * + * @param {*} vmPredicate a callback checking if backup is eligible for transfer. 
This filter MUST NOT cut delta chains + * @returns + */ + async _computeTransferList(vmPredicate) { + const sourceBackups = Object.values(await this._sourceRemoteAdapter.listVmBackups(this._vmUuid, vmPredicate)) + const remotesBackups = await Promise.all( + Object.values(this.remoteAdapters).map(remoteAdapter => remoteAdapter.listVmBackups(this._vmUuid, vmPredicate)) + ) + const sourceBackupByJobId = groupBy(sourceBackups, 'jobId') + const transferByJobs = await Promise.all( + Object.values(sourceBackupByJobId).map(vmBackupsByJob => + this.#computeTransferListPerJob(vmBackupsByJob, remotesBackups) + ) + ) + return transferByJobs.flat(1) } async run($defer) { diff --git a/@xen-orchestra/backups/package.json b/@xen-orchestra/backups/package.json index d0392835f7d..8dbc03a23c0 100644 --- a/@xen-orchestra/backups/package.json +++ b/@xen-orchestra/backups/package.json @@ -46,6 +46,7 @@ "proper-lockfile": "^4.1.2", "tar": "^6.1.15", "uuid": "^9.0.0", + "value-matcher": "^0.2.0", "vhd-lib": "^4.11.0", "xen-api": "^4.2.0", "yazl": "^2.5.1" diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index 94c2c590c4f..0606b563e55 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -13,6 +13,7 @@ - [OTP] Key can be copied to clipboard because some clients cannot use the QR code - [Plugin/perf-alert] Add a toggle to exclude selected items (PR [#7911](https://github.com/vatesfr/xen-orchestra/pull/7911)) +- [Backup/Mirror] Filter the VM that must be mirrored [#7748](https://github.com/vatesfr/xen-orchestra/issues/7748) (PR [#7941](https://github.com/vatesfr/xen-orchestra/pull/7941)) ### Bug fixes @@ -37,7 +38,9 @@ +- @xen-orchestra/backups minor - @xen-orchestra/web-core patch +- xo-server minor - xo-server-perf-alert minor - xo-web minor diff --git a/packages/xo-server/src/api/mirror-backup.mjs b/packages/xo-server/src/api/mirror-backup.mjs index 4beb94d025c..308ca08f29c 100644 --- a/packages/xo-server/src/api/mirror-backup.mjs +++ 
b/packages/xo-server/src/api/mirror-backup.mjs @@ -21,6 +21,32 @@ const SCHEMA_SETTINGS = { optional: true, } +// a filter property is allowed +// for now it only supports filtering by VM uuid +const MIRROR_BACKUP_FILTER = { + type: 'object', + nullable: true, + optional: true, + properties: { + vm: { + properties: { + uuid: { + type: 'object', + properties: { + __or: { + type: 'array', + items: { + type: 'string', + minItems: 1, + }, + }, + }, + }, + }, + }, +} + export function createJob({ schedules, ...job }) { return this.createBackupNgJob('mirrorBackup', job, schedules).then(({ id }) => id) } @@ -48,6 +74,7 @@ createJob.params = { type: 'object', optional: true, }, + filter: MIRROR_BACKUP_FILTER, settings: SCHEMA_SETTINGS, } @@ -91,6 +118,7 @@ editJob.params = { type: 'object', optional: true, }, + filter: MIRROR_BACKUP_FILTER, settings: SCHEMA_SETTINGS, }