Skip to content

Commit

Permalink
Non-blocking save asset metadata. (microsoft#910)
Browse files Browse the repository at this point in the history
* Non-blocking save asset metadata.

* Implement queue map.

* Add withQueueMap decorator.
  • Loading branch information
simotw authored Apr 9, 2021
1 parent 592e844 commit ffeb09d
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 7 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"compile": "tsc",
"build": "node ./scripts/dump_git_info.js && react-scripts build",
"react-start": "node ./scripts/dump_git_info.js && react-scripts start",
"test": "react-scripts test --env=jsdom --silent",
"test": "react-scripts test --env=jsdom",
"eject": "react-scripts eject",
"webpack:dev": "webpack --config ./config/webpack.dev.js",
"webpack:prod": "webpack --config ./config/webpack.prod.js",
Expand Down Expand Up @@ -108,6 +108,7 @@
"enzyme-adapter-react-16": "^1.15.1",
"eslint-utils": "^1.4.3",
"foreman": "^3.0.1",
"jest-enzyme": "^7.1.2",
"kind-of": "^6.0.3",
"mime": "^2.4.6",
"minimist": "^1.2.2",
Expand Down
16 changes: 16 additions & 0 deletions src/common/queueMap/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export type Args = any[];

export interface IQueue {
queue: Args[];
isLooping: boolean;
promise?: Promise<void>;
}

export class Queue implements IQueue {
queue: Args[];
isLooping: boolean;
constructor() {
this.queue = [];
this.isLooping = false;
}
}
114 changes: 114 additions & 0 deletions src/common/queueMap/queueMap.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import QueueMap from "./queueMap";

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

describe("QueueMap", () => {
test("dequeueUntilLast", () => {
const queueMap = new QueueMap();
const queueId = "1";
const a = ["a", 1];
const b = ["b", 2];
queueMap.enque(queueId, a);
queueMap.enque(queueId, b);
queueMap.dequeueUntilLast(queueId);
const { queue } = queueMap.getQueueById(queueId);
expect([b]).toEqual(queue);
})
test("call enque while looping items in the queue", async () => {
const queueMap = new QueueMap();
const mockWrite = jest.fn();
const queueId = "1";
const sleepThenReturn = ms => async (...params) => {
await mockWrite(...params);
await sleep(ms);
}
const a = ["a", 1];
const b = ["b", 2];
const c = ["c", 3];
const d = ["d", 4];
const expected = [b, d]
queueMap.enque(queueId, a);
queueMap.enque(queueId, b);
queueMap.on(queueId, sleepThenReturn(1000), params => params);
queueMap.enque(queueId, c);
queueMap.enque(queueId, d);
await sleep(2000);
expect(mockWrite.mock.calls.length).toBe(2);
expect([mockWrite.mock.calls[0], mockWrite.mock.calls[1]]).toEqual(expected);
})
test("prevent call on twice.", async () => {
const queueMap = new QueueMap();
const queueId = "1";
const mockWrite = jest.fn();
const sleepThenReturn = ms => async (...params) => {
await mockWrite(...params);
await sleep(ms);
}
const a = ["a", 1];
const b = ["b", 2];
const c = ["c", 3];
const d = ["d", 4];
const expected = [b, d]
queueMap.enque(queueId, a);
queueMap.enque(queueId, b);
queueMap.on(queueId, sleepThenReturn(1000), params => params);
queueMap.enque(queueId, c);
queueMap.on(queueId, sleepThenReturn(1000), params => params);
queueMap.enque(queueId, d);
await sleep(2000);
expect(mockWrite.mock.calls.length).toBe(2);
expect([mockWrite.mock.calls[0], mockWrite.mock.calls[1]]).toEqual(expected);
})
test("read last element.", async () => {
const queueMap = new QueueMap();
const queueId = "1";
const f = jest.fn();
const sleepThenReturn = ms => async (...params) => {
await f(...params);
await sleep(ms);
}
const a = ["a", 1];
const b = ["b", 2];
const c = ["c", 3];
const d = ["d", 4];
queueMap.enque(queueId, a);
queueMap.enque(queueId, b);
queueMap.on(queueId, sleepThenReturn(1000), params => params);
queueMap.enque(queueId, c);
queueMap.enque(queueId, d);
expect(queueMap.getLast(queueId)).toEqual(d);
})
test("delete after write finished", async () => {
const mockCallback = jest.fn();
const mockWrite = jest.fn();
const queueMap = new QueueMap();
const queueId = "1";
const mockAsync = ms => async (...params) => {
await mockWrite(...params);
await sleep(ms);
}
const a = ["a", 1];
const b = ["b", 2];
const c = ["c", 3];
const d = ["d", 4];
queueMap.enque(queueId, a);
queueMap.enque(queueId, b);
queueMap.on(queueId, mockAsync(1000));
queueMap.enque(queueId, c);
queueMap.enque(queueId, d);
const args = [a, b];
queueMap.callAfterLoop(queueId, mockCallback, args);
await sleep(3000);
expect(mockCallback.mock.calls.length).toBe(1);
expect(mockCallback.mock.calls[0]).toEqual(args);
})
test("can call callback finished", async () => {
const mockCallback = jest.fn();
const queueMap = new QueueMap();
const queueId = "1";
queueMap.callAfterLoop(queueId, mockCallback);
expect(mockCallback.mock.calls.length).toBe(1);
})
})
123 changes: 123 additions & 0 deletions src/common/queueMap/queueMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { Args, IQueue, Queue } from "./queue";

interface IQueueMap {
[id: string]: IQueue;
}

export default class QueueMap {
queueById: IQueueMap;
constructor() {
this.queueById = {};
}

/**
* Get a copy of IQueueMap from QueueMap
* @return QueueMap - IQueueMap
*/
getMap = (): IQueueMap => {
return { ...this.queueById };
}

/**
* Get the IQueue by id. Create a new IQueue while get null.
* @param id - id of the queue
* @return IQueue
*/
getQueueById = (id: string): IQueue => {
if (!this.queueById.hasOwnProperty(id)) {
this.queueById[id] = new Queue();
}
return this.queueById[id];
}

/**
* Find a queue by id, then enqueue an object into the queue.
* @param id - id of the queue
* @param args - list of argument
*/
enque = (id: string, args: Args) => {
const { queue } = this.getQueueById(id);
queue.push(args);
}

/**
* @param id - id of the queue
* @return - dequeued object
*/
dequeue = (id: string): Args => {
const { queue } = this.getQueueById(id);
return queue.shift();
}

/**
* Find a queue by id then dequeue. Then clear objects before the last one.
* @param id - id of the queue
* @return - dequeue object
*/
dequeueUntilLast = (id: string): Args => {
let ret = [];
const { queue } = this.getQueueById(id);
while (queue.length > 1) {
ret = queue.shift();
}
return ret;
}

/** Find and return the last element in the queue
* @param id - id of the queue
* @return last element in the queue
*/
getLast = (id: string): Args => {
const { queue } = this.getQueueById(id);
if (queue.length) {
return queue[queue.length - 1];
}
return [];
}

/**
* loop to use last element as parameters to call async method.
* will prevent this function call while the queue is already looping by another function.
* @param id - id of the queue
* @param method - async method to call
* @param paramsHandler - process dequeue object to method parameters
* @param errorHandler - handle async method error
*/
on = (id: string, method: (...args: any[]) => void, paramsHandler = (params) => params, errorHandler = console.error) => {
const q = this.getQueueById(id);
const loop = async () => {
q.isLooping = true;
while (q.queue.length) {
this.dequeueUntilLast(id);
const args = this.getLast(id);
const params = args.map(paramsHandler);
try {
await method(...params);
} catch (err) {
errorHandler(err);
}
this.dequeue(id);
}
q.isLooping = false;
}
if (q.isLooping === false) {
q.promise = loop();
}
}

/**
* call the callback function after loop finished
* @param id - id of the queue
* @param callback - callback after loop finished
* @param args - callback arguments
*/
callAfterLoop = async (id: string, callback: (...args: any[]) => void, args: Args = []) => {
const q = this.getQueueById(id);
if (q.promise) {
await q.promise;
}
await callback(...args);
}
}

export const queueMap = new QueueMap();
44 changes: 44 additions & 0 deletions src/common/queueMap/withQueueMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { IStorageProvider } from "../../providers/storage/storageProviderFactory";
import { constants } from "../constants";
import { queueMap } from "./queueMap";

// tslint:disable-next-line
export function withQueueMap<T extends { new(...args: any[]): IStorageProvider }>(constructor: T) {
return class extends constructor {
isQueuedFile = (filePath: string = ""): boolean => {
return filePath.endsWith(constants.labelFileExtension);
}

writeText = async (filePath: string, contents: string): Promise<void> => {
const parentWriteText = super.writeText.bind(this);
if (this.isQueuedFile(filePath)) {
queueMap.enque(filePath, [filePath, contents]);
queueMap.on(filePath, parentWriteText);
return;
}
return await parentWriteText(filePath, contents);
}

readText = async (filePath: string, ignoreNotFound?: boolean): Promise<string> => {
const parentReadText = super.readText.bind(this);
if (this.isQueuedFile(filePath)) {
const args = queueMap.getLast(filePath);
if (args.length >= 2) {
const contents = args[1] || "";
return (async () => contents)()
}
}
return parentReadText(filePath, ignoreNotFound);
}

deleteFile = async (filePath: string, ignoreNotFound?: boolean, ignoreForbidden?: boolean) => {
// Expect this function is not called too often or may cause race with readText.
const parentDeleteFile = super.deleteFile.bind(this);
if (this.isQueuedFile(filePath)) {
await queueMap.callAfterLoop(filePath, parentDeleteFile, [filePath, ignoreNotFound, ignoreForbidden])
return;
}
parentDeleteFile(filePath, ignoreNotFound, ignoreForbidden);
}
}
}
2 changes: 2 additions & 0 deletions src/providers/storage/azureBlobStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { AppError, AssetState, AssetType, ErrorCode, IAsset, StorageType, ILabel
import { throwUnhandledRejectionForEdge } from "../../react/components/common/errorHandler/errorHandler";
import { AssetService } from "../../services/assetService";
import { IStorageProvider } from "./storageProviderFactory";
import {withQueueMap} from "../../common/queueMap/withQueueMap"

/**
* Options for Azure Cloud Storage
Expand All @@ -21,6 +22,7 @@ export interface IAzureCloudStorageOptions {
/**
* Storage Provider for Azure Blob Storage
*/
@withQueueMap
export class AzureBlobStorage implements IStorageProvider {
/**
* Storage type
Expand Down
6 changes: 3 additions & 3 deletions src/redux/actions/projectActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,9 @@ export function saveAssetMetadata(

return async (dispatch: Dispatch) => {
const assetService = new AssetService(project);
const savedMetadata = await assetService.save(newAssetMetadata);
dispatch(saveAssetMetadataAction(savedMetadata));
return { ...savedMetadata };
assetService.save(newAssetMetadata);
dispatch(saveAssetMetadataAction(newAssetMetadata));
return { ...newAssetMetadata };
};
}

Expand Down
3 changes: 1 addition & 2 deletions src/services/assetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ export class AssetService {
this.project.sourceConnection.providerOptions,
);
}

return this.storageProviderInstance;
}

Expand Down Expand Up @@ -745,7 +744,7 @@ export class AssetService {
* @param project to get assets and connect to file system.
* @returns updated project
*/
public static checkAndUpdateSchema = async(project: IProject): Promise<IProject> => {
public static checkAndUpdateSchema = async (project: IProject): Promise<IProject> => {
let shouldAssetsUpdate = false;
let updatedProject;
const { assets } = project;
Expand Down
Loading

0 comments on commit ffeb09d

Please sign in to comment.