Commit

wip: pruning features of file tree serialization

aryanjassal committed Aug 30, 2024
1 parent 83a10a6 commit 5aaae26
Showing 2 changed files with 106 additions and 201 deletions.
278 changes: 78 additions & 200 deletions src/vaults/fileTree.ts
@@ -342,65 +342,43 @@ async function* encodeContent(
 function serializerStreamFactory(
   fs: FileSystem | FileSystemReadable,
   treeGen: AsyncGenerator<TreeNode, void, void>,
-  yieldContents: boolean = true,
 ): ReadableStream<Uint8Array> {
-  const files: Array<[number, string]> = [];
-  let treeDataGen: AsyncGenerator<TreeNode, void, void> | undefined = treeGen;
-  let contentsGen: AsyncGenerator<Uint8Array, void, void> | undefined =
-    undefined;
-  // Will get the next content chunk or return undefined if there is no more data to send
+  let contentsGen: AsyncGenerator<Uint8Array, void, void> | undefined;
+  let fileNode: TreeNode | undefined;
+
+  async function getNextFileNode(): Promise<TreeNode | undefined> {
+    while (true) {
+      const result = await treeGen.next();
+      if (result.done) return undefined;
+      if (result.value.type === 'FILE') return result.value;
+      // If it's not a file, keep iterating
+    }
+  }
   async function getNextContentChunk(): Promise<Uint8Array | undefined> {
-    if (!yieldContents) return undefined;
     while (true) {
       if (contentsGen == null) {
-        const next = files.shift();
-        // No more files means we're done
-        if (next == null) return undefined;
-        const [iNode, path] = next;
-        contentsGen = encodeContent(fs, path, iNode);
+        fileNode = await getNextFileNode();
+        if (fileNode == null) return undefined;
+        contentsGen = encodeContent(fs, fileNode.path, fileNode.iNode);
       }
-      const result = await contentsGen.next();
-      if (!result.done) return result.value;
-      else contentsGen = undefined;
+      const contentChunk = await contentsGen.next();
+      if (!contentChunk.done) return contentChunk.value;
+      contentsGen = undefined;
     }
   }
   async function cleanup(reason: unknown) {
-    await treeDataGen?.throw(reason).catch(() => {});
+    await treeGen?.throw(reason).catch(() => {});
     await contentsGen?.throw(reason).catch(() => {});
   }
   return new ReadableStream<Uint8Array>({
-    start: (controller) => {
-      controller.enqueue(generateGenericHeader({ type: HeaderType.TREE }));
-    },
     pull: async (controller) => {
       try {
-        if (treeDataGen != null) {
-          const result = await treeGen.next();
-          if (!result.done) {
-            // If a file, add to the file list to encode contents later
-            if (result.value.type === 'FILE') {
-              files.push([result.value.iNode, result.value.path]);
-            }
-            // Normal tree nodes are just serialized and converted to `UInt8Array`
-            const jsonSerialized = JSON.stringify(result.value);
-            controller.enqueue(
-              vaultsUtils.bufferToUint8ArrayCopyless(
-                Buffer.from(jsonSerialized, 'utf-8'),
-              ),
-            );
-          } else {
-            const treeDoneMessage = JSON.stringify({ type: 'DONE' });
-            controller.enqueue(
-              vaultsUtils.bufferToUint8ArrayCopyless(
-                Buffer.from(treeDoneMessage, 'utf-8'),
-              ),
-            );
-            treeDataGen = undefined;
-          }
-        } else {
-          const contentDataChunk = await getNextContentChunk();
-          if (contentDataChunk == null) return controller.close();
-          controller.enqueue(contentDataChunk);
+        const contentChunk = await getNextContentChunk();
+        if (contentChunk == null) {
+          return controller.close();
+        }
+        else {
+          controller.enqueue(contentChunk);
         }
       } catch (e) {
         await cleanup(e);

@@ -469,84 +447,17 @@ function parseTreeNode(data: unknown): asserts data is TreeNode {
  */
 function parserTransformStreamFactory(): TransformStream<
   Uint8Array,
-  TreeNode | ContentNode | Uint8Array
+  string | ContentNode | Uint8Array
 > {
   let workingBuffer: Uint8Array = new Uint8Array(0);
-  let phase: 'START' | 'TREE' | 'CONTENT' = 'START';
-  let jsonParser: JSONParser | undefined = undefined;
-  let lastChunk: Uint8Array | undefined;
   let contentLength: bigint | undefined = undefined;
-  const enterTreeState = (
-    controller: TransformStreamDefaultController<
-      TreeNode | ContentNode | Uint8Array
-    >,
-    initialChunk: Uint8Array,
-  ) => {
-    let done = false;
-    phase = 'TREE';
-    workingBuffer = new Uint8Array(0);
-    // Setting up the JSON stream parser
-    jsonParser = new JSONParser({
-      separator: '',
-      paths: ['$'],
-    });
-    const handleEnd = (e?: unknown) => {
-      if (e != null && !(done && e instanceof TokenizerError)) {
-        controller.error(e);
-        return;
-      }
-      if (e instanceof TokenizerError) {
-        // Extracting error position.
-        const match = e.message.match(/at position "(.*)" in state/);
-        if (match == null) {
-          controller.error(
-            new utilsErrors.ErrorUtilsUndefinedBehaviour(
-              'failed to match for buffer index',
-            ),
-          );
-          return;
-        }
-        const bufferIndex = parseInt(match[1]);
-        if (isNaN(bufferIndex)) {
-          controller.error(
-            new utilsErrors.ErrorUtilsUndefinedBehaviour(
-              'failed to parse buffer index',
-            ),
-          );
-          return;
-        }
-        if (lastChunk == null) {
-          controller.error(
-            new utilsErrors.ErrorUtilsUndefinedBehaviour(
-              'lastChunk was undefined',
-            ),
-          );
-          return;
-        }
-        workingBuffer = lastChunk.subarray(bufferIndex);
-      }
-      jsonParser = undefined;
-    };
-    jsonParser.onEnd = handleEnd;
-    jsonParser.onError = handleEnd;
-    jsonParser.onValue = (value) => {
-      const message = value.value;
-      if (isDoneMessage(message)) {
-        done = true;
-        jsonParser?.end();
-        phase = 'CONTENT';
-        return;
-      }
-      parseTreeNode(message);
-      controller.enqueue(message);
-    };
-    jsonParser.write(initialChunk);
-  };
-  /* Check if any chunks have been processed. If the stream is being flushed
-   * without processing any chunks, then something went wrong with the stream.
-   */
   let processedChunks: boolean = false;
-  return new TransformStream<Uint8Array, TreeNode | ContentNode | Uint8Array>({
+
+  return new TransformStream<Uint8Array, ContentNode | Uint8Array | string>({
+    /**
+     * Check if any chunks have been processed. If the stream is being flushed
+     * without processing any chunks, then something went wrong with the stream.
+     */
     flush: (controller) => {
       if (!processedChunks) {
         controller.error(

@@ -556,91 +467,58 @@ function parserTransformStreamFactory(): TransformStream<
     },
     transform: (chunk, controller) => {
       if (chunk.byteLength > 0) processedChunks = true;
-      switch (phase) {
-        case 'START': {
-          workingBuffer = vaultsUtils.uint8ArrayConcat([workingBuffer, chunk]);
-          // Start phase expects a TREE header to indicate start of TREE data
-          const { data, remainder } = parseGenericHeader(workingBuffer);
-          if (data == null) {
-            // Wait for more data
-            workingBuffer = remainder;
-            return;
-          }
-          if (data.type !== HeaderType.TREE) {
-            controller.error(
-              new validationErrors.ErrorParse(
-                `expected TREE header, got "${HeaderType[data.type]}"`,
-              ),
-            );
-            return;
-          }
-          // We have the tree header, so we switch to tree mode
-          enterTreeState(controller, remainder);
-          lastChunk = remainder;
-          return;
-        }
-        case 'TREE':
-          {
-            // Tree needs to parse a JSON stream
-            lastChunk = chunk;
-            jsonParser?.write(chunk);
-          }
-          return;
-        case 'CONTENT':
-          {
-            workingBuffer = vaultsUtils.uint8ArrayConcat([
-              workingBuffer,
-              chunk,
-            ]);
-            if (contentLength == null) {
-              const genericHeader = parseGenericHeader(workingBuffer);
-              if (genericHeader.data == null) return;
-              if (genericHeader.data.type === HeaderType.TREE) {
-                enterTreeState(controller, genericHeader.remainder);
-                lastChunk = genericHeader.remainder;
-                return;
-              }
-              if (genericHeader.data.type !== HeaderType.CONTENT) {
-                controller.error(
-                  new validationErrors.ErrorParse(
-                    `expected CONTENT or TREE message, got "${genericHeader.data.type}"`,
-                  ),
-                );
-                return;
-              }
-              const contentHeader = parseContentHeader(genericHeader.remainder);
-              if (contentHeader.data == null) return;
-
-              const { dataSize, iNode } = contentHeader.data;
-              controller.enqueue({ type: 'CONTENT', dataSize, iNode });
-              contentLength = dataSize;
-              workingBuffer = contentHeader.remainder;
-            }
-            // We yield the whole buffer, or split it for the next header
-            if (workingBuffer.byteLength === 0) return;
-            if (workingBuffer.byteLength <= contentLength) {
-              contentLength -= BigInt(workingBuffer.byteLength);
-              controller.enqueue(workingBuffer);
-              workingBuffer = new Uint8Array(0);
-              if (contentLength === 0n) contentLength = undefined;
-              return;
-            } else {
-              controller.enqueue(
-                workingBuffer.subarray(0, Number(contentLength)),
-              );
-              workingBuffer = workingBuffer.subarray(Number(contentLength));
-              contentLength = undefined;
-            }
-          }
-          return;
-        default:
-          controller.error(
-            new utilsErrors.ErrorUtilsUndefinedBehaviour(
-              `invalid state "${phase}"`,
-            ),
-          );
-          return;
-      }
+      workingBuffer = vaultsUtils.uint8ArrayConcat([
+        workingBuffer,
+        chunk,
+      ]);
+      if (contentLength == null) {
+        const genericHeader = parseGenericHeader(workingBuffer);
+        if (genericHeader.data == null) return;
+        if (genericHeader.data.type !== HeaderType.CONTENT) {
+          controller.error(
+            new validationErrors.ErrorParse(
+              `expected CONTENT message, got "${genericHeader.data.type}"`,
+            ),
+          );
+          return;
+        }
+        const contentHeader = parseContentHeader(genericHeader.remainder);
+        if (contentHeader.data == null) return;
+
+        const { dataSize, iNode } = contentHeader.data;
+        controller.enqueue({ type: 'CONTENT', dataSize, iNode });
+        contentLength = dataSize;
+        workingBuffer = contentHeader.remainder;
+      }
+      // We yield the whole buffer, or split it for the next header
+      if (workingBuffer.byteLength === 0) return;
+      if (workingBuffer.byteLength <= contentLength) {
+        contentLength -= BigInt(workingBuffer.byteLength);
+        const fileContents = new TextDecoder().decode(workingBuffer); // newcode
+        controller.enqueue(fileContents); // newcode
+        // controller.enqueue(workingBuffer);
+        workingBuffer = new Uint8Array(0);
+        if (contentLength === 0n) contentLength = undefined;
+        // return;
+      } else {
+        // controller.enqueue(
+        //   workingBuffer.subarray(0, Number(contentLength)),
+        // );
+        const contentChunk = workingBuffer.subarray(0, Number(contentLength)); // new
+        const contentString = new TextDecoder().decode(contentChunk); // new
+        controller.enqueue(contentString); // new
+        workingBuffer = workingBuffer.subarray(Number(contentLength));
+        contentLength = undefined;
+      }
+      // return;
+      // default:
+      //   controller.error(
+      //     new utilsErrors.ErrorUtilsUndefinedBehaviour(
+      //       `invalid state "${phase}"`,
+      //     ),
+      //   );
+      //   return;
+      // }
     },
   });
 }

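Note: a minimal sketch of how the pruned pipeline is expected to compose after this commit. The serializer now emits only CONTENT-framed file data (tree nodes are no longer serialized as JSON), and the parser emits a CONTENT header object followed by the decoded string contents of each file. The globWalk options mirror the test below; the surrounding setup (an fs instance, top-level await) is illustrative and not part of the commit.

import * as fileTree from '@/vaults/fileTree';

// Walk only files; tree and stat serialization has been pruned
const fileTreeGen = fileTree.globWalk({
  fs,
  yieldStats: false,
  yieldRoot: false,
  yieldFiles: true,
  yieldParents: false,
  yieldDirectories: false,
});
// Serialize file contents, then parse the byte stream straight back
const parsedStream = fileTree
  .serializerStreamFactory(fs, fileTreeGen)
  .pipeThrough(fileTree.parserTransformStreamFactory());
for await (const message of parsedStream) {
  if (typeof message === 'string') {
    // Decoded file contents (may arrive as several chunks per file)
  } else {
    // A content header: { type: 'CONTENT', dataSize, iNode }
  }
}
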
29 changes: 28 additions & 1 deletion tests/vaults/fileTree.test.ts
@@ -4,7 +4,7 @@ import os from 'os';
 import path from 'path';
 import { ReadableStream } from 'stream/web';
 import { test } from '@fast-check/jest';
-import fc from 'fast-check';
+import fc, { uint8Array } from 'fast-check';
 import * as fileTree from '@/vaults/fileTree';
 import * as vaultsTestUtils from './utils';

@@ -718,5 +718,32 @@ describe('fileTree', () => {
   // TODO: tests for
   //  - empty files
   //  - files larger than content chunks
+
+  // TEST: DEBUG
+  test('view serializer', async () => {
+    const fileTreeGen = fileTree.globWalk({
+      fs,
+      yieldStats: false,
+      yieldRoot: false,
+      yieldFiles: true,
+      yieldParents: false,
+      yieldDirectories: false,
+    });
+    let serializedStream = fileTree.serializerStreamFactory(fs, fileTreeGen);
+    // const data: Array<Uint8Array> = [];
+    // for await (const d of serializedStream) {
+    //   data.push(d);
+    // }
+    // console.log(data.map((v) => Buffer.from(v as Uint8Array).toString()));
+
+    // serializedStream = fileTree.serializerStreamFactory(fs, fileTreeGen);
+    const parserTransform = fileTree.parserTransformStreamFactory();
+    const outputStream = serializedStream.pipeThrough(parserTransform);
+    const output: Array<unknown> = [];
+    for await (const d of outputStream) {
+      output.push(d);
+    }
+    console.log(output);
+  });
 });
 });
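As a starting point for the TODO cases above, a sketch of a round-trip test for a file larger than a single content chunk. The 1 MiB payload, the dataDir temp directory, and the basePath option are assumptions for illustration; the actual chunk size used by encodeContent is not visible in this diff.

test('parses files larger than one content chunk', async () => {
  // Assumed: `dataDir` is a temp directory created in test setup, as in other tests
  const bigData = 'x'.repeat(1024 * 1024); // assumed to span multiple content chunks
  await fs.promises.writeFile(path.join(dataDir, 'big.txt'), bigData);
  const fileTreeGen = fileTree.globWalk({
    fs,
    basePath: dataDir, // assumed option name
    yieldStats: false,
    yieldRoot: false,
    yieldFiles: true,
    yieldParents: false,
    yieldDirectories: false,
  });
  const outputStream = fileTree
    .serializerStreamFactory(fs, fileTreeGen)
    .pipeThrough(fileTree.parserTransformStreamFactory());
  let contents = '';
  for await (const message of outputStream) {
    // Concatenate only the decoded string chunks, skipping CONTENT header objects
    if (typeof message === 'string') contents += message;
  }
  expect(contents).toBe(bigData);
});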
