
feat: wire up direct uploads with local file provider #12643

Merged · 9 commits · Jun 10, 2025
9 changes: 9 additions & 0 deletions .changeset/bright-parents-know.md
@@ -0,0 +1,9 @@
---
"@medusajs/file-local": patch
"@medusajs/core-flows": patch
"@medusajs/js-sdk": patch
"@medusajs/dashboard": patch
"integration-tests-http": patch
---

feat: wire up direct uploads with local file provider
@@ -605,6 +605,22 @@ medusaIntegrationTestRunner({
'Invalid column name(s) "Product field"'
)
})

it("should handle error when the source file does not exists", async () => {
const { body, meta } = getUploadReq({
name: "test.csv",
key: "test.csv",
size: 0,
})

const batchJobRes = await api
.post("/admin/products/imports", body, meta)
.catch((e) => e)

expect(batchJobRes.response.data.message).toEqual(
"An unknown error occurred."
)
})
})
},
})
@@ -55,7 +55,7 @@ medusaIntegrationTestRunner({
extension: "csv",
mime_type: "text/csv",
size: file.size,
url: expect.stringContaining(response.data.filename),
url: "/admin/uploads",
})
)
expect(response.status).toEqual(200)
2 changes: 1 addition & 1 deletion packages/admin/dashboard/src/hooks/api/products.tsx
Expand Up @@ -400,7 +400,7 @@ export const useImportProducts = (
>
) => {
return useMutation({
mutationFn: (payload) => sdk.admin.product.import(payload),
mutationFn: (payload) => sdk.admin.product.createImport(payload),
onSuccess: (data, variables, context) => {
options?.onSuccess?.(data, variables, context)
},
@@ -14,6 +14,8 @@ export type NormalizeProductCsvV1StepInput = string

export const normalizeCsvToChunksStepId = "normalize-product-csv-to-chunks"

type Chunk = { id: string; toCreate: number; toUpdate: number }

/**
* Processes a chunk of products by writing them to a file. Later the
* file will be processed after the import has been confirmed.
@@ -23,7 +25,7 @@ async function processChunk(
fileKey: string,
csvRows: ReturnType<(typeof CSVNormalizer)["preProcess"]>[],
currentRowNumber: number
) {
): Promise<Chunk> {
const normalizer = new CSVNormalizer(csvRows)
const products = normalizer.proccess(currentRowNumber)

@@ -76,7 +78,7 @@ async function createChunks(
file: IFileModuleService,
fileKey: string,
stream: Parser
) {
): Promise<Chunk[]> {
/**
* The row under process
*/
@@ -97,7 +99,7 @@ async function createChunks(
* Validated chunks that have been written with the file
* provider
*/
const chunks: { id: string; toCreate: number; toUpdate: number }[] = []
const chunks: Chunk[] = []

/**
* Currently collected rows to be processed as one chunk
@@ -192,36 +194,51 @@ export const normalizeCsvToChunksStep = createStep(
export const normalizeCsvToChunksStep = createStep(
normalizeCsvToChunksStepId,
async (fileKey: NormalizeProductCsvV1StepInput, { container }) => {
const file = container.resolve(Modules.FILE)
const contents = await file.getDownloadStream(fileKey)
const chunks = await createChunks(
file,
fileKey,
contents.pipe(
parse({
return new Promise<
StepResponse<{
chunks: Chunk[]
summary: Omit<Chunk, "id">
}>
>(async (resolve, reject) => {
try {
const file = container.resolve(Modules.FILE)
const contents = await file.getDownloadStream(fileKey)
const transformer = parse({
columns: true,
skip_empty_lines: true,
})
)
)

const summary = chunks.reduce<{ toCreate: number; toUpdate: number }>(
(result, chunk) => {
result.toCreate = result.toCreate + chunk.toCreate
result.toUpdate = result.toUpdate + chunk.toUpdate
return result
},
{ toCreate: 0, toUpdate: 0 }
)
contents.on("error", reject)

/**
* Delete CSV file once we have the chunks
*/
await file.deleteFiles(fileKey)
const chunks = await createChunks(
file,
fileKey,
contents.pipe(transformer)
)

return new StepResponse({
chunks,
summary,
const summary = chunks.reduce<{ toCreate: number; toUpdate: number }>(
(result, chunk) => {
result.toCreate = result.toCreate + chunk.toCreate
result.toUpdate = result.toUpdate + chunk.toUpdate
return result
},
{ toCreate: 0, toUpdate: 0 }
)

/**
* Delete CSV file once we have the chunks
*/
await file.deleteFiles(fileKey)

resolve(
new StepResponse({
chunks,
summary,
})
)
} catch (error) {
reject(error)
}
})
}
)
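
The Promise wrapper above is what lets stream failures reject the step: without `contents.on("error", reject)`, an "error" event from the download stream (e.g. the missing source file exercised by the new integration test) would never reject the awaited pipeline. A reduced sketch of the pattern, with hypothetical names:

```ts
import { Readable } from "stream"

// Reduced illustration (not the actual step): attach an "error" listener
// that rejects, so a failing download stream surfaces as a rejected
// promise instead of an unhandled "error" event.
function consumeStream(stream: Readable): Promise<Buffer> {
  return new Promise((resolve, reject) => {
    const parts: Buffer[] = []
    stream.on("error", reject)
    stream.on("data", (part: Buffer) => parts.push(part))
    stream.on("end", () => resolve(Buffer.concat(parts)))
  })
}
```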
@@ -12,7 +12,10 @@ export const processImportChunksStepId = "process-import-chunks"
* const data = parseProductCsvStep("products.csv")
*/
export const processImportChunksStep = createStep(
processImportChunksStepId,
{
name: processImportChunksStepId,
async: true,
Contributor Author:

If I do not convert this step to async, then the imports confirm endpoint waits until this step is done (which takes minutes with larger uploads).

@carlos-r-l-rodrigues can you confirm if my assumption is correct?

Contributor:

When composing the workflow, since the step that waits for confirmation is async, the workflow will automatically be flagged as async. So here, the async flag is just optional.

Contributor Author:

Okay. But I noticed that without this flag, the endpoint waits for the workflow to finish. Maybe what I need is background execution.

Contributor:

> Okay. But I noticed that without this flag, the endpoint waits for the workflow to finish. Maybe what I need is background execution.

This shouldn't happen. When there is one async step, `workflow.run` will return as soon as it gets to this step. When that step is finished, it will continue in another process.

Member:

Programmatically speaking, we do not await when we execute an async step. If you want, we can take a look together to be sure of what is happening.
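
For illustration, a minimal sketch of the behavior discussed in this thread, assuming Medusa's workflows SDK (the route-handler wiring is hypothetical):

```ts
import { createStep, createWorkflow } from "@medusajs/framework/workflows-sdk"

// An async step: the engine does not await it inline. Its result is
// signaled later (e.g. from a confirm endpoint), so `run` resolves as
// soon as execution reaches this step.
const waitConfirmationStep = createStep(
  { name: "wait-confirmation", async: true },
  async () => {}
)

const demoImportWorkflow = createWorkflow("demo-import", () => {
  waitConfirmationStep()
})

// Hypothetical route handler: return the transaction id so the client
// can confirm the import later.
// const { transaction } = await demoImportWorkflow(req.scope).run({ input: {} })
// res.json({ transaction_id: transaction.transactionId })
```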

},
async (input: { chunks: { id: string }[] }, { container }) => {
const file = container.resolve(Modules.FILE)

42 changes: 36 additions & 6 deletions packages/core/js-sdk/src/admin/product.ts
@@ -1,5 +1,5 @@
import { HttpTypes, SelectParams } from "@medusajs/types"
import { Client } from "../client"
import { Client, FetchError } from "../client"
import { ClientHeaders } from "../types"

export class Product {
@@ -114,10 +114,40 @@ export class Product {
* special headers in this request, since external services like S3 will
* give a CORS error.
*/
await fetch(response.url, {
method: "PUT",
body: body.file,
})
if (
response.url.startsWith("http://") ||
response.url.startsWith("https://")
) {
const uploadResponse = await fetch(response.url, {
method: "PUT",
body: body.file,
})
if (uploadResponse.status >= 400) {
throw new FetchError(
uploadResponse.statusText,
uploadResponse.statusText,
uploadResponse.status
)
}
} else {
const form = new FormData()
form.append("files", body.file)

const localUploadResponse = await this.client.fetch<{
files: HttpTypes.AdminUploadFile
}>("admin/uploads", {
method: "POST",
headers: {
...headers,
// Let the browser determine the content type.
"content-type": null,
},
body: form,
query,
})

response.filename = localUploadResponse.files[0].id
}

/**
* Perform products import using the uploaded file name
@@ -164,7 +194,7 @@ export class Product {
headers?: ClientHeaders
) {
return await this.client.fetch<{}>(
`/admin/products/import/${transactionId}/confirm`,
`/admin/products/imports/${transactionId}/confirm`,
{
method: "POST",
headers,
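
For reference, a sketch of the resulting client-side flow. `createImport` appears in this diff; the `confirmImport` name and the response shape are assumptions:

```ts
// Assumed flow: upload + dry run, then confirm once the summary looks right.
const { transaction_id } = await sdk.admin.product.createImport({
  file, // a File picked by the user; sent via PUT (cloud) or /admin/uploads (local)
})

await sdk.admin.product.confirmImport(transaction_id)
```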
@@ -138,16 +138,6 @@ export class LocalFileService extends AbstractFileProviderService {
* Returns the pre-signed URL that the client (frontend) can use to trigger
* a file upload. In this case, the Medusa backend will implement the
* "/upload" endpoint to perform the file upload.
*
* Since, we do not want the client to perform link detection on the frontend
* and then prepare a different kind of request for cloud providers and different
* request for the local server, we will have to make these URLs self sufficient.
*
* What is a self sufficient URL
*
* - There should be no need to specify the MIME type or filename separately in request body (cloud providers don't allow it).
* - There should be no need to pass auth headers like cookies. Again cloud providers
* won't allow it and will likely result in a CORS error.
*/
async getPresignedUploadUrl(
fileData: FileTypes.ProviderGetPresignedUploadUrlDTO
@@ -159,18 +149,8 @@
)
}

const uploadUrl = new URL(
"upload",
`${this.backendUrl_.replace(/\/$/, "")}/`
)

uploadUrl.searchParams.set("filename", fileData.filename)
if (fileData.mimeType) {
uploadUrl.searchParams.set("type", fileData.mimeType)
}

return {
url: uploadUrl.toString(),
url: "/admin/uploads",
key: fileData.filename,
}
}
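
A minimal sketch of how a caller might branch on the returned URL, mirroring the js-sdk change above (`provider` and `csvFile` are assumed to be in scope):

```ts
const { url } = await provider.getPresignedUploadUrl({ filename: "products.csv" })

if (/^https?:\/\//.test(url)) {
  // Absolute URL: a cloud provider; upload directly with a plain PUT.
  await fetch(url, { method: "PUT", body: csvFile })
} else {
  // Relative URL (e.g. "/admin/uploads"): fall back to a multipart
  // upload against the Medusa backend.
  const form = new FormData()
  form.append("files", csvFile)
  await fetch(url, { method: "POST", body: form })
}
```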