Use experimental streams for interop with NodeJS Writables and Readables #382

Draft: wants to merge 4 commits into master (changes from 3 commits shown)
134 changes: 134 additions & 0 deletions packages/node/src/Stream/Experimental/index.ts
@@ -0,0 +1,134 @@
/**
 * ets_tracing: off
 */

import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
import * as T from "@effect-ts/core/Effect"
import * as S from "@effect-ts/core/Effect/Experimental/Stream"
import * as Sink from "@effect-ts/core/Effect/Experimental/Stream/Sink"
import * as M from "@effect-ts/core/Effect/Managed"
import { pipe } from "@effect-ts/core/Function"
import type * as stream from "stream"

import * as Byte from "../../Byte"

export class ReadableError {
  readonly _tag = "ReadableError"
  constructor(readonly error: Error) {}
}

/**
 * Captures a Node `Readable`, converting it into a `Stream`. The size of the
 * chunks emitted by the stream can be configured with the `bufferSize`
 * parameter.
 *
 * Note: the `Readable` should not have an encoding set, so that it yields
 * `Buffer`s; calling this with a `Readable` that has an encoding set will
 * `Die`.
 */
export function streamFromReadable(
  r: () => stream.Readable,
  bufferSize: number = S.DEFAULT_CHUNK_SIZE
): S.Stream<unknown, ReadableError, Byte.Byte> {
  return pipe(
    T.succeedWith(r),
    T.tap((sr) =>
      sr.readableEncoding != null
        ? T.dieMessage(
            `stream.Readable encoding set to ${sr.readableEncoding} cannot be used to produce Buffer`
          )
        : T.unit
    ),
    S.acquireReleaseWith((sr) =>
      T.succeedWith(() => {
        sr.destroy()
      })
    ),
    S.chain((sr) =>
      S.async<unknown, ReadableError, Byte.Byte>((emit) => {
        sr.on("readable", () => {
          let buffer: Buffer | null = null

          while ((buffer = sr.read(bufferSize)) !== null) {
            emit.chunk(Byte.chunk(buffer))
          }
        })
        sr.on("end", () => {
          emit.end()
        })
        sr.on("error", (err) => {
          emit.fail(new ReadableError(err))
        })
      })
    )
  )
}
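
// Usage sketch (illustrative only, not part of this PR's diff): collect an
// entire file into one Buffer by combining `streamFromReadable` with
// `runBuffer` (defined further down in this module). The `fs` import and the
// file path are assumed.
//
//   import * as fs from "fs"
//
//   const contents: T.Effect<unknown, ReadableError, Buffer> = pipe(
//     streamFromReadable(() => fs.createReadStream("./data.txt")),
//     runBuffer
//   )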

export class WritableError {
  readonly _tag = "WritableError"
  constructor(readonly error: Error) {}
}

/**
 * Uses the provided NodeJS `Writable` stream to create a `Sink` that consumes
 * byte chunks and writes them to the `Writable` stream. The sink will yield
 * the count of bytes written.
 *
 * The `Writable` stream will be automatically closed after the stream
 * finishes or an error occurs.
 */
export function sinkFromWritable(w: () => stream.Writable) {
  return Sink.unwrapManaged(
    pipe(
      T.succeedWith(w),
      M.makeExit((sw) =>
        T.succeedWith(() => {
          sw.destroy()
        })
      ),
      M.map((sw) =>
        Sink.foldLeftChunksEffect(0, (bytesWritten, byteChunk: C.Chunk<Byte.Byte>) =>
          T.effectAsync<unknown, WritableError, number>((resume) => {
            sw.write(Byte.buffer(byteChunk), (err) => {
              if (err) {
                resume(T.fail(new WritableError(err)))
              } else {
                resume(T.succeed(bytesWritten + byteChunk.length))
              }
            })
          })
        )
      )
    )
  )
}
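
// Usage sketch (illustrative only, not part of this PR's diff): copy one file
// into another by draining a `streamFromReadable` stream into a
// `sinkFromWritable` sink. The `fs` import and both file paths are assumed.
//
//   import * as fs from "fs"
//
//   // Yields the total number of bytes written to "./out.txt".
//   const bytesCopied = pipe(
//     streamFromReadable(() => fs.createReadStream("./in.txt")),
//     S.run(sinkFromWritable(() => fs.createWriteStream("./out.txt")))
//   )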

export class TransformError {
  readonly _tag = "TransformError"
  constructor(readonly error: Error) {}
}

/**
 * A sink that collects all of its inputs into a `Buffer`.
 */
export function collectBuffer<E>(): Sink.Sink<
  unknown,
  E,
  Byte.Byte,
  E,
  unknown,
  Buffer
> {
  return Sink.map_(
    Sink.foldLeftChunks(C.empty<Byte.Byte>(), (s, i: C.Chunk<Byte.Byte>) =>
      C.concat_(s, i)
    ),
    Byte.buffer
  )
}

/**
 * Runs the stream and collects all of its elements to a buffer.
 */
export function runBuffer<R, E>(
  self: S.Stream<R, E, Byte.Byte>
): T.Effect<R, E, Buffer> {
  return S.run_(self, collectBuffer())
}
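
// Usage sketch (illustrative only, not part of this PR's diff): `runBuffer` is
// shorthand for running a byte stream into `collectBuffer`; `someByteStream`
// below is a hypothetical S.Stream<unknown, E, Byte.Byte>.
//
//   const viaSink = S.run_(someByteStream, collectBuffer())
//   const viaHelper = runBuffer(someByteStream) // equivalent
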
54 changes: 54 additions & 0 deletions packages/node/test/stream.experimental.test.ts
@@ -0,0 +1,54 @@
import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
import * as T from "@effect-ts/core/Effect"
import * as S from "@effect-ts/core/Effect/Experimental/Stream"
import * as Ref from "@effect-ts/core/Effect/Ref"
import { pipe } from "@effect-ts/core/Function"
import * as fs from "fs"
import * as path from "path"
import * as stream from "stream"

import * as Byte from "../src/Byte"
import * as NS from "../src/Stream/Experimental"

describe("Node Stream", () => {
  it("should build an Effect-TS Stream from a NodeJS stream.Readable", async () => {
    const res = await pipe(
      NS.streamFromReadable(() =>
        fs.createReadStream(path.join(__dirname, "fix/data.txt"))
      ),
      NS.runBuffer,
      T.runPromise
    )

    expect(res.toString("utf-8")).toEqual("a, b, c")
  })

  it("should build an Effect-TS Sink from a NodeJS stream.Writable", async () => {
    const mockStream = new stream.PassThrough()
    let output: C.Chunk<Byte.Byte> = C.empty()

    mockStream.on("data", (chunk) => {
      output = C.concat_(output, Byte.chunk(chunk))
    })

    const res = await pipe(
      T.do,
      T.bind("bytesWritten", () =>
        pipe(
          Ref.makeRef(0),
          T.map((ref) =>
            S.repeatEffect(T.delay(10)(Ref.updateAndGet_(ref, (n) => n + 1)))
          ),
          S.unwrap,
          S.take(5),
          S.map(Byte.byte),
          S.run(NS.sinkFromWritable(() => mockStream))
        )
      ),
      T.runPromise
    )

    expect(res.bytesWritten).toEqual(5)
    expect(C.toArray(output)).toEqual([1, 2, 3, 4, 5])
  })
})