Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor release export to export release artefacts as separate S3 uploads #1617

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 57 additions & 48 deletions backend/src/services/mirroredModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,7 @@ export async function exportModel(
const s3Stream = new PassThrough()
zip.pipe(s3Stream)

if (config.modelMirror.export.kmsSignature.enabled) {
log.debug({ modelId, semvers }, 'Using signatures. Uploading to temporary S3 location first.')
uploadToTemporaryS3Location(modelId, semvers, s3Stream).then(() =>
copyToExportBucketWithSignatures(modelId, semvers, mirroredModelId, user.dn).catch((error) =>
log.error({ modelId, semvers, error }, 'Failed to upload export to export location with signatures'),
),
)
} else {
log.debug({ modelId, semvers }, 'Signatures not enabled. Uploading to export S3 location.')
uploadToExportS3Location(modelId, semvers, s3Stream, { modelId, mirroredModelId })
}
uploadToS3(`${modelId}.zip`, s3Stream, user.dn, { modelId, semvers }, { modelId, mirroredModelId })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how much this matters, but I like to avoid making anything asynchronous if it doesn't need to be, so you could include an `await` here.


try {
await addModelCardRevisionsToZip(user, model, zip)
Expand All @@ -77,7 +67,7 @@ export async function exportModel(
}
try {
if (semvers && semvers.length > 0) {
await addReleasesToZip(user, model, semvers, zip)
await addReleasesToZip(user, model, semvers, zip) // Adds releases to zip file
}
} catch (error) {
throw InternalError('Error when adding the release(s) to the zip file.', { error })
Expand Down Expand Up @@ -167,50 +157,68 @@ function parseModelCard(modelCardJson: string, mirroredModelId: string, sourceMo
return { modelCard }
}

/**
 * Uploads `stream` to the export bucket under `fileName`, going through the
 * signing path when KMS signatures are enabled in config.
 *
 * - Signatures enabled: the stream is first written to the temporary S3
 *   location, then copied to the export bucket with signatures attached.
 *   A failure of the copy step is logged and swallowed, not thrown.
 * - Signatures disabled: the stream is uploaded directly to the export bucket.
 *
 * The returned promise settles only once the upload chain has finished, so a
 * caller can await completion. The uploads still start synchronously when the
 * function is called, so callers that do not await see the same start time.
 *
 * NOTE(review): a caller that is still writing into `stream` (e.g. a zip being
 * piped while entries are appended) presumably must not await this before the
 * producer has finished, or the upload may never complete — confirm at call sites.
 *
 * @param fileName Object name used in both the temporary and export locations.
 * @param stream Content to upload.
 * @param exporter Identifier of the exporting user, passed on to the signing copy step.
 * @param logData Context fields attached to every log line emitted here.
 * @param metadata Optional object metadata forwarded to the export upload.
 */
async function uploadToS3(
  fileName: string,
  stream: Readable,
  exporter: string,
  logData: Record<string, unknown>,
  metadata?: Record<string, string>,
): Promise<void> {
  if (!config.modelMirror.export.kmsSignature.enabled) {
    log.debug(logData, 'Signatures not enabled. Uploading to export S3 location.')
    await uploadToExportS3Location(fileName, stream, logData, metadata)
    return
  }

  log.debug(logData, 'Using signatures. Uploading to temporary S3 location first.')
  await uploadToTemporaryS3Location(fileName, stream, logData)
  try {
    // The signed copy re-reads the object from the temporary location, so it
    // can only start after the temporary upload above has settled.
    await copyToExportBucketWithSignatures(fileName, exporter, logData, metadata)
  } catch (error) {
    log.error({ error, ...logData }, 'Failed to upload export to export location with signatures')
  }
}

async function copyToExportBucketWithSignatures(
modelId: string,
semvers: string[] | undefined,
mirroredModelId: string,
fileName: string,
exporter: string,
logData: Record<string, unknown>,
metadata?: Record<string, string>,
) {
let signatures = {}
log.debug({ modelId, semvers }, 'Getting stream from S3 to generate signatures.')
const streamForDigest = await getObjectFromTemporaryS3Location(modelId, semvers)
log.debug(logData, 'Getting stream from S3 to generate signatures.')
const streamForDigest = await getObjectFromTemporaryS3Location(fileName, logData)
const messageDigest = await generateDigest(streamForDigest)
log.debug({ modelId, semvers }, 'Generating signatures.')
log.debug(logData, 'Generating signatures.')
try {
signatures = await sign(messageDigest)
} catch (e) {
log.error({ modelId }, 'Error generating signature for export.')
log.error(logData, 'Error generating signature for export.')
throw e
}
log.debug({ modelId, semvers }, 'Successfully generated signatures')
log.debug({ modelId, semvers }, 'Getting stream from S3 to upload to export location.')
const streamToCopy = await getObjectFromTemporaryS3Location(modelId, semvers)
await uploadToExportS3Location(modelId, semvers, streamToCopy, {
modelId,
mirroredModelId,
log.debug(logData, 'Successfully generated signatures')
log.debug(logData, 'Getting stream from S3 to upload to export location.')
const streamToCopy = await getObjectFromTemporaryS3Location(fileName, logData)
await uploadToExportS3Location(fileName, streamToCopy, logData, {
exporter,
...signatures,
...metadata,
})
}

async function uploadToTemporaryS3Location(
modelId: string,
semvers: string[] | undefined,
fileName: string,
stream: Readable,
logData: Record<string, unknown>,
metadata?: Record<string, string>,
) {
const bucket = config.s3.buckets.uploads
const object = `exportQueue/${modelId}.zip`
const object = `exportQueue/${fileName}`
try {
await putObjectStream(bucket, object, stream, metadata)
log.debug(
{
bucket,
object,
modelId,
semvers,
...logData,
},
'Successfully uploaded export to temporary S3 location.',
)
Expand All @@ -219,26 +227,24 @@ async function uploadToTemporaryS3Location(
{
bucket,
object,
modelId,
semvers,
error,
...logData,
},
'Failed to export to temporary S3 location.',
)
}
}

async function getObjectFromTemporaryS3Location(modelId: string, semvers: string[] | undefined) {
async function getObjectFromTemporaryS3Location(fileName: string, logData: Record<string, unknown>) {
const bucket = config.s3.buckets.uploads
const object = `exportQueue/${modelId}.zip`
const object = `exportQueue/${fileName}`
try {
const stream = (await getObjectStream(bucket, object)).Body as Readable
log.debug(
{
bucket,
object,
modelId,
semvers,
...logData,
},
'Successfully retrieved stream from temporary S3 location.',
)
Expand All @@ -248,9 +254,8 @@ async function getObjectFromTemporaryS3Location(modelId: string, semvers: string
{
bucket,
object,
modelId,
semvers,
error,
...logData,
},
'Failed to retrieve stream from temporary S3 location.',
)
Expand All @@ -259,21 +264,19 @@ async function getObjectFromTemporaryS3Location(modelId: string, semvers: string
}

async function uploadToExportS3Location(
modelId: string,
semvers: string[] | undefined,
object: string,
stream: Readable,
logData: Record<string, unknown>,
metadata?: Record<string, string>,
) {
const bucket = config.modelMirror.export.bucket
const object = `${modelId}.zip`
try {
await putObjectStream(bucket, object, stream, metadata)
log.debug(
{
bucket,
object,
modelId,
semvers,
...logData,
},
'Successfully uploaded export to export S3 location.',
)
Expand All @@ -282,9 +285,8 @@ async function uploadToExportS3Location(
{
bucket,
object,
modelId,
semvers,
error,
...logData,
},
'Failed to export to export S3 location.',
)
Expand Down Expand Up @@ -323,9 +325,16 @@ async function addReleaseToZip(user: UserInterface, model: ModelDoc, release: Re
zip.append(JSON.stringify(release.toJSON()), { name: `${baseUri}/releaseDocument.json` })
for (const file of files) {
zip.append(JSON.stringify(file.toJSON()), { name: `${baseUri}/files/${file._id}/fileDocument.json` })
zip.append((await downloadFile(user, file._id)).Body as stream.Readable, {
name: `${baseUri}/files/${file._id}/fileContent`,
})
uploadToS3(
`${baseUri}/files/${file._id}/fileContent`,
(await downloadFile(user, file._id)).Body as stream.Readable,
user.dn,
{
modelId: model.id,
releaseId: release.id,
fileId: file.id,
},
)
}
} catch (error: any) {
throw InternalError('Error when generating the zip file.', { error })
Expand Down
Loading