Skip to content

Commit

Permalink
optimize streaming response for NodeHttpServer (#4354)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Jan 28, 2025
1 parent 8f6006a commit 62934fc
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 20 deletions.
5 changes: 5 additions & 0 deletions .changeset/rare-planets-obey.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform-node": patch
---

optimize streaming response for NodeHttpServer
34 changes: 14 additions & 20 deletions packages/platform-node/src/internal/httpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type * as Multipart from "@effect/platform/Multipart"
import type * as Path from "@effect/platform/Path"
import * as Socket from "@effect/platform/Socket"
import type * as Cause from "effect/Cause"
import * as Chunk from "effect/Chunk"
import * as Config from "effect/Config"
import * as Effect from "effect/Effect"
import * as FiberSet from "effect/FiberSet"
Expand All @@ -33,7 +34,6 @@ import { pipeline } from "node:stream/promises"
import * as WS from "ws"
import * as NodeContext from "../NodeContext.js"
import * as NodeHttpClient from "../NodeHttpClient.js"
import * as NodeSink from "../NodeSink.js"
import { HttpIncomingMessageImpl } from "./httpIncomingMessage.js"
import * as internalPlatform from "./httpPlatform.js"

Expand Down Expand Up @@ -451,27 +451,21 @@ const handleResponse = (request: ServerRequest.HttpServerRequest, response: Serv
}
case "Stream": {
nodeResponse.writeHead(response.status, headers)
return Stream.run(
Stream.mapError(
body.stream,
(cause) =>
new Error.ResponseError({
request,
response,
reason: "Decode",
cause
})
return body.stream.pipe(
Stream.runForEachChunk((chunk) =>
Effect.async<void>((resume) => {
const array = Chunk.toReadonlyArray(chunk)
for (let i = 0; i < array.length - 1; i++) {
nodeResponse.write(array[i])
}
nodeResponse.write(array[array.length - 1], () => resume(Effect.void))
})
),
NodeSink.fromWritable(() => nodeResponse, (cause) =>
new Error.ResponseError({
request,
response,
reason: "Decode",
cause
}))
).pipe(
Effect.interruptible,
Effect.tapErrorCause(handleCause(nodeResponse))
Effect.matchCause({
onSuccess: () => nodeResponse.end(),
onFailure: handleCause(nodeResponse)
})
)
}
}
Expand Down

0 comments on commit 62934fc

Please sign in to comment.