Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backups/mirror): filter backup to be transferred #7748

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions @xen-orchestra/backups/_runners/_vmRunners/FullRemote.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,39 @@ import { AbstractRemote } from './_AbstractRemote.mjs'
import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs'
import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs'
import { Task } from '../../Task.mjs'

export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {
return FullRemoteWriter
}

// Keep only the backups matching the job's `filter` predicate.
// Unlike delta backups (see IncrementalRemote), full backups are
// self-contained, so dropping individual entries cannot cut a chain.
_filterTransferList(transferList) {
return transferList.filter(this._filterPredicate)
}

async _run() {
const transferList = await this._computeTransferList(({ mode }) => mode === 'full')

if (transferList.length > 0) {
for (const metadata of transferList) {
const stream = await this._sourceRemoteAdapter.readFullVmBackup(metadata)
const sizeContainer = watchStreamSize(stream)
for (const metadata of transferList) {
const stream = await this._sourceRemoteAdapter.readFullVmBackup(metadata)
const sizeContainer = watchStreamSize(stream)

// @todo shouldn't transfer backup if it will be deleted by retention policy (higher retention on source than destination)
await this._callWriters(
writer =>
writer.run({
stream: forkStreamUnpipe(stream),
// stream will be forked and transformed, it's not safe to attach additionnal properties to it
streamLength: stream.length,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
sizeContainer,
}),
'writer.run()'
)
// for healthcheck
this._tags = metadata.vm.tags
}
} else {
Task.info('No new data to upload for this VM')
// @todo shouldn't transfer backup if it will be deleted by retention policy (higher retention on source than destination)
await this._callWriters(
writer =>
writer.run({
stream: forkStreamUnpipe(stream),
// stream will be forked and transformed, it's not safe to attach additionnal properties to it
streamLength: stream.length,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
sizeContainer,
}),
'writer.run()'
)
// for healthcheck
this._tags = metadata.vm.tags
}
}
}
87 changes: 46 additions & 41 deletions @xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import mapValues from 'lodash/mapValues.js'
import { AbstractRemote } from './_AbstractRemote.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { Task } from '../../Task.mjs'
import { Disposable } from 'promise-toolbox'
import { openVhd } from 'vhd-lib'
import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
Expand All @@ -16,6 +15,16 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {
return IncrementalRemoteWriter
}

// Delta backups form a chain: each backup depends on the previous one.
// If at least one backup in the list matches the job's filter, transfer
// the whole list so the delta chain is never cut; otherwise transfer nothing.
_filterTransferList(transferList) {
if (transferList.some(vmBackupMetadata => this._filterPredicate(vmBackupMetadata))) {
return transferList
}
return []
}

async _selectBaseVm(metadata) {
// for each disk , get the parent
const baseUuidToSrcVdi = new Map()
Expand Down Expand Up @@ -53,50 +62,46 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
async _run() {
const transferList = await this._computeTransferList(({ mode }) => mode === 'delta')

if (transferList.length > 0) {
for (const metadata of transferList) {
assert.strictEqual(metadata.mode, 'delta')
await this._selectBaseVm(metadata)
await this._callWriters(writer => writer.prepare({ isBase: metadata.isBase }), 'writer.prepare()')
const incrementalExport = await this._sourceRemoteAdapter.readIncrementalVmBackup(metadata, undefined, {
useChain: false,
})
for (const metadata of transferList) {
assert.strictEqual(metadata.mode, 'delta')
await this._selectBaseVm(metadata)
await this._callWriters(writer => writer.prepare({ isBase: metadata.isBase }), 'writer.prepare()')
const incrementalExport = await this._sourceRemoteAdapter.readIncrementalVmBackup(metadata, undefined, {
useChain: false,
})

const isVhdDifferencing = {}
const isVhdDifferencing = {}

await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => {
isVhdDifferencing[key] = await isVhdDifferencingDisk(stream)
})
await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => {
isVhdDifferencing[key] = await isVhdDifferencingDisk(stream)
})

incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream)
await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(incrementalExport),
isVhdDifferencing,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
}),
'writer.transfer()'
)
// this will update parent name with the needed alias
await this._callWriters(
writer =>
writer.updateUuidAndChain({
isVhdDifferencing,
timestamp: metadata.timestamp,
vdis: incrementalExport.vdis,
}),
'writer.updateUuidAndChain()'
)
incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream)
await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(incrementalExport),
isVhdDifferencing,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
}),
'writer.transfer()'
)
// this will update parent name with the needed alias
await this._callWriters(
writer =>
writer.updateUuidAndChain({
isVhdDifferencing,
timestamp: metadata.timestamp,
vdis: incrementalExport.vdis,
}),
'writer.updateUuidAndChain()'
)

await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
// for healthcheck
this._tags = metadata.vm.tags
}
} else {
Task.info('No new data to upload for this VM')
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
// for healthcheck
this._tags = metadata.vm.tags
}
}
}
Expand Down
63 changes: 51 additions & 12 deletions @xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import { asyncEach } from '@vates/async-each'
import groupBy from 'lodash/groupBy.js'

import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { Disposable } from 'promise-toolbox'
import { createPredicate } from 'value-matcher'

import { getVmBackupDir } from '../../_getVmBackupDir.mjs'

import { Abstract } from './_Abstract.mjs'
import { extractIdsFromSimplePattern } from '../../extractIdsFromSimplePattern.mjs'
import { Task } from '../../Task.mjs'

export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstract {
_filterPredicate
constructor({
config,
job,
Expand Down Expand Up @@ -57,39 +61,74 @@ export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstrac
})
)
})
const { filter } = job
if (filter === undefined) {
this._filterPredicate = () => true
} else {
this._filterPredicate = createPredicate(filter)
}
}

async _computeTransferList(predicate) {
const vmBackups = await this._sourceRemoteAdapter.listVmBackups(this._vmUuid, predicate)
async #computeTransferListPerJob(sourceBackups, remotesBackups) {
const localMetada = new Map()
Object.values(vmBackups).forEach(metadata => {
sourceBackups.forEach(metadata => {
const timestamp = metadata.timestamp
localMetada.set(timestamp, metadata)
})
const nbRemotes = Object.keys(this.remoteAdapters).length
const nbRemotes = remotesBackups.length
const remoteMetadatas = {}
await asyncEach(Object.values(this.remoteAdapters), async remoteAdapter => {
const remoteMetadata = await remoteAdapter.listVmBackups(this._vmUuid, predicate)
remoteMetadata.forEach(metadata => {
remotesBackups.forEach(async remoteBackups => {
remoteBackups.forEach(metadata => {
const timestamp = metadata.timestamp
remoteMetadatas[timestamp] = (remoteMetadatas[timestamp] ?? 0) + 1
})
})

let chain = []
let transferList = []
const timestamps = [...localMetada.keys()]
timestamps.sort()
for (const timestamp of timestamps) {
if (remoteMetadatas[timestamp] !== nbRemotes) {
// this backup is not present in all the remote
// should be retransfered if not found later
chain.push(localMetada.get(timestamp))
transferList.push(localMetada.get(timestamp))
} else {
// backup is present in local and remote : the chain has already been transferred
chain = []
transferList = []
}
}
if (transferList.length > 0) {
const filteredTransferList = this._filterTransferList(transferList)
if (filteredTransferList.length > 0) {
return filteredTransferList
} else {
Task.info('This VM is excluded by the job filter')
return []
}
} else {
Task.info('No new data to upload for this VM')
}
return chain

return []
}

/**
*
* @param {*} vmPredicate a callback checking if backup is eligible for transfer. This filter MUST NOT cut delta chains
* @returns
*/
async _computeTransferList(vmPredicate) {
const sourceBackups = Object.values(await this._sourceRemoteAdapter.listVmBackups(this._vmUuid, vmPredicate))
const remotesBackups = await Promise.all(
Object.values(this.remoteAdapters).map(remoteAdapter => remoteAdapter.listVmBackups(this._vmUuid, vmPredicate))
)
const sourceBackupByJobId = groupBy(sourceBackups, 'jobId')
const transferByJobs = await Promise.all(
Object.values(sourceBackupByJobId).map(vmBackupsByJob =>
this.#computeTransferListPerJob(vmBackupsByJob, remotesBackups)
)
)
return transferByJobs.flat(1)
}

async run($defer) {
Expand Down
1 change: 1 addition & 0 deletions @xen-orchestra/backups/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"proper-lockfile": "^4.1.2",
"tar": "^6.1.15",
"uuid": "^9.0.0",
"value-matcher": "^0.2.0",
"vhd-lib": "^4.11.0",
"xen-api": "^4.2.0",
"yazl": "^2.5.1"
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

- [OTP] Key can be copied to clipboard because some clients cannot use the QR code
- [Plugin/perf-alert] Add a toggle to exclude selected items (PR [#7911](https://github.com/vatesfr/xen-orchestra/pull/7911))
- [Backup/Mirror] Filter the VM that must be mirrored [#7748](https://github.com/vatesfr/xen-orchestra/issues/7748) (PR [#7941](https://github.com/vatesfr/xen-orchestra/pull/7941))
julien-f marked this conversation as resolved.
Show resolved Hide resolved

### Bug fixes

Expand All @@ -37,7 +38,9 @@

<!--packages-start-->

- @xen-orchestra/backups minor
- @xen-orchestra/web-core patch
- xo-server minor
- xo-server-perf-alert minor
- xo-web minor
julien-f marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
28 changes: 28 additions & 0 deletions packages/xo-server/src/api/mirror-backup.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,32 @@ const SCHEMA_SETTINGS = {
optional: true,
}

// Schema for the optional `filter` property of a mirror-backup job.
// For now it only supports filtering by VM uuid, expressed as a
// value-matcher pattern: { vm: { uuid: { __or: [uuid, …] } } }
const MIRROR_BACKUP_FILTER = {
  type: 'object',
  nullable: true,
  optional: true,
  properties: {
    vm: {
      properties: {
        uuid: {
          type: 'object',
          properties: {
            __or: {
              type: 'array',
              // at least one uuid must be listed — `minItems` constrains the
              // array itself (it was previously misplaced inside `items`,
              // where JSON Schema ignores it for string values)
              minItems: 1,
              items: {
                type: 'string',
              },
            },
          },
        },
      },
    },
  },
}

export function createJob({ schedules, ...job }) {
return this.createBackupNgJob('mirrorBackup', job, schedules).then(({ id }) => id)
}
Expand Down Expand Up @@ -48,6 +74,7 @@ createJob.params = {
type: 'object',
optional: true,
},
filter: MIRROR_BACKUP_FILTER,
settings: SCHEMA_SETTINGS,
}

Expand Down Expand Up @@ -91,6 +118,7 @@ editJob.params = {
type: 'object',
optional: true,
},
filter: MIRROR_BACKUP_FILTER,
settings: SCHEMA_SETTINGS,
}

Expand Down
Loading