diff --git a/journey/k8s.ts b/journey/k8s.ts index 05fb0d7d9..479c27d9c 100644 --- a/journey/k8s.ts +++ b/journey/k8s.ts @@ -28,6 +28,14 @@ export async function waitForDeploymentReady(namespace: string, name: string) { } } +export async function noWaitPeprStoreKey(name: string, matchKey: string) { + const store = await K8s(PeprStore).InNamespace("pepr-system").Get(name); + if (store.data[matchKey]) { + return store.data[matchKey]; + } +} + + export async function waitForPeprStoreKey(name: string, matchKey: string) { try { const store = await K8s(PeprStore).InNamespace("pepr-system").Get(name); diff --git a/journey/pepr-deploy.ts b/journey/pepr-deploy.ts index 75afc874c..de90f3f03 100644 --- a/journey/pepr-deploy.ts +++ b/journey/pepr-deploy.ts @@ -1,15 +1,15 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: 2023-Present The Pepr Authors -import { beforeAll, jest, afterAll, describe, expect, it } from "@jest/globals"; -import { execSync, spawnSync, spawn, ChildProcess } from "child_process"; +import { describe, expect, it } from "@jest/globals"; +import { execSync, spawnSync, spawn } from "child_process"; import { K8s, kind } from "kubernetes-fluent-client"; import { resolve } from "path"; - import { destroyModule } from "../src/lib/assets/destroy"; import { cwd } from "./entrypoint.test"; import { deleteConfigMap, + noWaitPeprStoreKey, waitForConfigMap, waitForDeploymentReady, waitForNamespace, @@ -23,7 +23,18 @@ export function peprDeploy() { // Purge the Pepr module from the cluster before running the tests destroyModule("pepr-static-test"); + it("should deploy the Pepr controller into the test cluster", async () => { + // Apply the store crd and pepr-system ns + await applyStoreCRD(); + + // Apply the store + await applyLegacyStoreResource(); + + /* + * when controller starts up, it will migrate the store + * and later on the keys will be tested to validate the migration + */ execSync("npx pepr deploy -i pepr:dev --confirm", { cwd, stdio: "inherit" }); // Wait for the deployments to be ready @@ -50,7 +61,7 @@ export function peprDeploy() { it("npx pepr monitor should display validation results to console", async () => { await testValidate(); - + const cmd = ['pepr', 'monitor', 'static-test'] const proc = spawn('npx', cmd, { shell: true }) @@ -68,7 +79,7 @@ export function peprDeploy() { proc.stderr.destroy() } }) - + proc.on('exit', () => state.done = true); await until(() => state.done) @@ -164,6 +175,7 @@ function testIgnore() { expect(cm.metadata?.labels?.["pepr"]).toBeUndefined(); }); } + async function testValidate() { // Apply the sample yaml for the HelloPepr capability const applyOut = spawnSync("kubectl apply -f hello-pepr.samples.yaml", { @@ -260,6 +272,7 @@ function testMutate() { }); } + function testStore() { it("should create the PeprStore", async () => { const resp = await waitForPeprStoreKey("pepr-static-test-store", "__pepr_do_not_delete__"); @@ -267,15 +280,49 @@ function testStore() { }); it("should write the correct data to the PeprStore", async () => { - const key1 = await waitForPeprStoreKey("pepr-static-test-store", "hello-pepr-example-1"); + const key1 = await waitForPeprStoreKey("pepr-static-test-store", `hello-pepr-v2-example-1`); expect(key1).toBe("was-here"); - const key2 = await waitForPeprStoreKey("pepr-static-test-store", "hello-pepr-example-1-data"); + // Should have been migrated and removed + const nullKey1 = await noWaitPeprStoreKey("pepr-static-test-store", `hello-pepr-example-1`); + expect(nullKey1).toBeUndefined(); + + const key2 = await waitForPeprStoreKey("pepr-static-test-store", `hello-pepr-v2-example-1-data`); expect(key2).toBe(JSON.stringify({ key: "ex-1-val" })); + + // Should have been migrated and removed + const nullKey2 = await noWaitPeprStoreKey("pepr-static-test-store", `hello-pepr-example-1-data`); + expect(nullKey1).toBeUndefined(); }); it("should write the correct data to the PeprStore from a Watch Action", async () => { - const key = await waitForPeprStoreKey("pepr-static-test-store", "hello-pepr-watch-data"); + const key = await waitForPeprStoreKey("pepr-static-test-store", `hello-pepr-v2-watch-data`); expect(key).toBe("This data was stored by a Watch Action."); }); } + + + +async function applyStoreCRD() { + // Apply the store crd + const appliedStoreCRD = spawnSync("kubectl apply -f journey/resources/pepr-store-crd.yaml", { + shell: true, // Run command in a shell + encoding: "utf-8", // Encode result as string + cwd: resolve(cwd, ".."), + }); + const { stdout } = appliedStoreCRD; + + expect(stdout).toContain("customresourcedefinition.apiextensions.k8s.io/peprstores.pepr.dev"); +} + +async function applyLegacyStoreResource() { + // Apply the store + const appliedStore = spawnSync("kubectl apply -f journey/resources/non-migrated-peprstore.yaml", { + shell: true, // Run command in a shell + encoding: "utf-8", // Encode result as string + cwd: resolve(cwd, ".."), + }); + const { stdout } = appliedStore; + + expect(stdout).toContain("peprstore.pepr.dev/pepr-static-test-store"); +} diff --git a/journey/pepr-dev.ts b/journey/pepr-dev.ts index b7615133b..806a9b4da 100644 --- a/journey/pepr-dev.ts +++ b/journey/pepr-dev.ts @@ -6,7 +6,6 @@ import { ChildProcessWithoutNullStreams, spawn } from "child_process"; import { Agent } from "https"; import { fetch } from "kubernetes-fluent-client"; import { RequestInit } from "node-fetch"; - import { cwd } from "./entrypoint.test"; import { sleep } from "./k8s"; @@ -25,8 +24,8 @@ let expectedLines = [ "Validate Action configured for CREATE", "Server listening on port 3000", "Controller startup complete", - `"hello-pepr-example-1-data": "{\\"key\\":\\"ex-1-val\\"}"`, - `"hello-pepr-watch-data": "This data was stored by a Watch Action."`, + `"hello-pepr-v2-example-1-data": "{\\"key\\":\\"ex-1-val\\"}"`, + `"hello-pepr-v2-watch-data": "This data was stored by a Watch Action."`, ]; export function peprDev() { diff --git a/journey/resources/clusterrole.yaml b/journey/resources/clusterrole.yaml index 02fa801fb..b08644bc8 100644 --- a/journey/resources/clusterrole.yaml +++ b/journey/resources/clusterrole.yaml @@ -1,3 +1,4 @@ +--- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: diff --git a/journey/resources/non-migrated-peprstore.yaml b/journey/resources/non-migrated-peprstore.yaml new file mode 100644 index 000000000..346ba5b13 --- /dev/null +++ b/journey/resources/non-migrated-peprstore.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: pepr.dev/v1 +data: + __pepr_do_not_delete__: k-thx-bye + hello-pepr-example-1: was-here + hello-pepr-example-1-data: '{"key":"ex-1-val"}' + hello-pepr-watch-data: "This data was stored by a Watch Action." +kind: PeprStore +metadata: + name: pepr-static-test-store + namespace: pepr-system + diff --git a/journey/resources/pepr-store-crd.yaml b/journey/resources/pepr-store-crd.yaml new file mode 100644 index 000000000..4390a6cd8 --- /dev/null +++ b/journey/resources/pepr-store-crd.yaml @@ -0,0 +1,36 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: peprstores.pepr.dev +spec: + conversion: + strategy: None + group: pepr.dev + names: + kind: PeprStore + listKind: PeprStoreList + plural: peprstores + singular: peprstore + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + properties: + data: + additionalProperties: + type: string + type: object + type: object + served: true + storage: true +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + kubernetes.io/metadata.name: pepr-system + name: pepr-system +spec: {} + diff --git a/package-lock.json b/package-lock.json index ddeffc2ee..4150a7cbf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "@types/ramda": "0.30.1", "express": "4.19.2", "fast-json-patch": "3.1.1", + "json-pointer": "^0.6.2", "kubernetes-fluent-client": "2.6.5", "pino": "9.3.1", "pino-pretty": "11.2.1", @@ -28,6 +29,7 @@ "@jest/globals": "29.7.0", "@types/eslint": "9.6.0", "@types/express": "4.17.21", + "@types/json-pointer": "^1.0.34", "@types/node": "18.x.x", "@types/node-forge": "1.3.11", "@types/prompts": "2.4.9", @@ -2530,6 +2532,13 @@ "resolved": "https://registry.npmjs.org/@types/js-yaml/-/js-yaml-4.0.9.tgz", "integrity": "sha512-k4MGaQl5TGo/iipqb2UDG2UwjXziSWkh0uysQelTlJpX1qGlpUZYm8PnO4DxG1qBomtJUdYJ6qR6xdIah10JLg==" }, + "node_modules/@types/json-pointer": { + "version": "1.0.34", + "resolved": "https://registry.npmjs.org/@types/json-pointer/-/json-pointer-1.0.34.tgz", + "integrity": "sha512-JRnWcxzXSaLei98xgw1B7vAeBVOrkyw0+Rt9j1QoJrczE78OpHsyQC8GNbuhw+/2vxxDe58QvWnngS86CoIbRg==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/json-schema": { "version": "7.0.15", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.15.tgz", @@ -4600,6 +4609,12 @@ "integrity": "sha512-36yxDn5H7OFZQla0/jFJmbIKTdZAQHngCedGxiMmpNfEZM0sdEeT+WczLQrjK6D7o2aiyLYDnkw0R3JK0Qv1RQ==", "peer": true }, + "node_modules/foreach": { + "version": "2.0.6", + "resolved": "https://registry.npmjs.org/foreach/-/foreach-2.0.6.tgz", + "integrity": "sha512-k6GAGDyqLe9JaebCsFCoudPPWfihKu8pylYXRlqP1J7ms39iPoTtk2fviNglIeQEwdh0bQeKJ01ZPyuyQvKzwg==", + "license": "MIT" + }, "node_modules/foreground-child": { "version": "3.2.1", "resolved": "https://registry.npmjs.org/foreground-child/-/foreground-child-3.2.1.tgz", @@ -5927,6 +5942,15 @@ "integrity": "sha512-xyFwyhro/JEof6Ghe2iz2NcXoj2sloNsWr/XsERDK/oiPCfaNhl5ONfp+jQdAZRQQ0IJWNzH9zIZF7li91kh2w==", "dev": true }, + "node_modules/json-pointer": { + "version": "0.6.2", + "resolved": "https://registry.npmjs.org/json-pointer/-/json-pointer-0.6.2.tgz", + "integrity": "sha512-vLWcKbOaXlO+jvRy4qNd+TI1QUPZzfJj1tpJ3vAXDych5XJf93ftpUKe5pKCrzyIIwgBJcOcCVRUfqQP25afBw==", + "license": "MIT", + "dependencies": { + "foreach": "^2.0.4" + } + }, "node_modules/json-schema-traverse": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", diff --git a/package.json b/package.json index d13d7951c..2868d8141 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "@types/ramda": "0.30.1", "express": "4.19.2", "fast-json-patch": "3.1.1", + "json-pointer": "^0.6.2", "kubernetes-fluent-client": "2.6.5", "pino": "9.3.1", "pino-pretty": "11.2.1", @@ -48,6 +49,7 @@ "@jest/globals": "29.7.0", "@types/eslint": "9.6.0", "@types/express": "4.17.21", + "@types/json-pointer": "^1.0.34", "@types/node": "18.x.x", "@types/node-forge": "1.3.11", "@types/prompts": "2.4.9", diff --git a/src/lib/controller/store.ts b/src/lib/controller/store.ts index 9439feb86..b495cab5a 100644 --- a/src/lib/controller/store.ts +++ b/src/lib/controller/store.ts @@ -61,8 +61,8 @@ export class PeprControllerStore { K8s(PeprStore) .InNamespace(namespace) .Get(this.#name) - // If the get succeeds, setup the watch - .then(this.#setupWatch) + // If the get succeeds, migrate and setup the watch + .then(async (store: PeprStore) => await this.#migrateAndSetupWatch(store)) // Otherwise, create the resource .catch(this.#createStoreResource), Math.random() * 3000, @@ -74,6 +74,91 @@ export class PeprControllerStore { watcher.start().catch(e => Log.error(e, "Error starting Pepr store watch")); }; + #migrateAndSetupWatch = async (store: PeprStore) => { + Log.debug(store, "Pepr Store migration"); + const data: DataStore = store.data || {}; + const migrateCache: Record = {}; + + // Send the cached updates to the cluster + const flushCache = async () => { + const indexes = Object.keys(migrateCache); + const payload = Object.values(migrateCache); + + // Loop over each key in the cache and delete it to avoid collisions with other sender calls + for (const idx of indexes) { + delete migrateCache[idx]; + } + + try { + // Send the patch to the cluster + await K8s(PeprStore, { namespace, name: this.#name }).Patch(payload); + } catch (err) { + Log.error(err, "Pepr store update failure"); + + if (err.status === 422) { + Object.keys(migrateCache).forEach(key => delete migrateCache[key]); + } else { + // On failure to update, re-add the operations to the cache to be retried + for (const idx of indexes) { + migrateCache[idx] = payload[Number(idx)]; + } + } + } + }; + + const fillCache = (name: string, op: DataOp, key: string[], val?: string) => { + if (op === "add") { + // adjust the path for the capability + const path = `/data/${name}-v2-${key}`; + const value = val || ""; + const cacheIdx = [op, path, value].join(":"); + + // Add the operation to the cache + migrateCache[cacheIdx] = { op, path, value }; + + return; + } + + if (op === "remove") { + if (key.length < 1) { + throw new Error(`Key is required for REMOVE operation`); + } + + for (const k of key) { + const path = `/data/${name}-${k}`; + const cacheIdx = [op, path].join(":"); + + // Add the operation to the cache + migrateCache[cacheIdx] = { op, path }; + } + + return; + } + + // If we get here, the operation is not supported + throw new Error(`Unsupported operation: ${op}`); + }; + + for (const name of Object.keys(this.#stores)) { + // Get the prefix offset for the keys + const offset = `${name}-`.length; + + // Loop over each key in the store + for (const key of Object.keys(data)) { + // Match on the capability name as a prefix for non v2 keys + if (startsWith(name, key) && !startsWith(`${name}-v2`, key)) { + // populate migrate cache + fillCache(name, "remove", [key.slice(offset)], data[key]); + fillCache(name, "add", [key.slice(offset)], data[key]); + } + } + + // await K8s(PeprStore, { namespace, name: this.#name }).Patch(payload); + } + await flushCache(); + this.#setupWatch(); + }; + #receive = (store: PeprStore) => { Log.debug(store, "Pepr Store update"); @@ -121,6 +206,7 @@ export class PeprControllerStore { // Load the sendCache with patch operations const fillCache = (op: DataOp, key: string[], val?: string) => { if (op === "add") { + // adjust the path for the capability const path = `/data/${capabilityName}-${key}`; const value = val || ""; const cacheIdx = [op, path, value].join(":"); diff --git a/src/lib/errors.test.ts b/src/lib/errors.test.ts index 7c94fb5f1..10a0a9698 100644 --- a/src/lib/errors.test.ts +++ b/src/lib/errors.test.ts @@ -9,7 +9,6 @@ describe("ValidateError Fuzz Testing", () => { test("should only accept predefined error values", () => { fc.assert( fc.property(fc.string(), error => { - console.log("error", error); if (ErrorList.includes(error)) { expect(() => ValidateError(error)).not.toThrow(); } else { diff --git a/src/lib/storage.test.ts b/src/lib/storage.test.ts index 7d4650e58..e963d912e 100644 --- a/src/lib/storage.test.ts +++ b/src/lib/storage.test.ts @@ -2,9 +2,91 @@ // SPDX-FileCopyrightText: 2023-Present The Pepr Authors import { beforeEach, describe, expect, it, jest } from "@jest/globals"; +import { DataStore, Storage, v2StoreKey, stripV2Prefix } from "./storage"; +import fc from "fast-check"; + +describe("stripV2Prefix", () => { + it("should remove the v2 prefix", () => { + const keys = ["v2-key1", "v2-key2", "v2-key3", "v2-key4", "v2-key5"]; + const results = ["key1", "key2", "key3", "key4", "key5"]; + + for (let i = 0; i < keys.length; i++) { + const result = stripV2Prefix(keys[i]); + expect(result).toEqual(results[i]); + } + }); +}); +describe("v2StoreKey", () => { + it("should prefix the key with v2", () => { + const keys = ["key1", "key2", "key3", "key4", "key5"]; + const results = ["v2-key1", "v2-key2", "v2-key3", "v2-key4", "v2-key5"]; + + for (let i = 0; i < keys.length; i++) { + const result = v2StoreKey(keys[i]); + expect(result).toEqual(results[i]); + } + }); +}); +describe("Storage with fuzzing and property-based tests", () => { + let storage: Storage; -import { DataStore, Storage } from "./storage"; + beforeEach(() => { + storage = new Storage(); + storage.registerSender(jest.fn()); + }); + + it("should correctly set and retrieve items", () => { + fc.assert( + fc.property(fc.string(), fc.string(), (key, value) => { + storage.setItem(key, value); + const mockData: DataStore = { [v2StoreKey(key)]: value }; + storage.receive(mockData); + if (value === "") { + expect(storage.getItem(key)).toBeNull(); + } else { + expect(storage.getItem(key)).toEqual(value); + } + }), + { numRuns: 100 }, + ); + }); + it("should return null for non-existing items", () => { + fc.assert( + fc.property(fc.string(), key => { + expect(storage.getItem(key)).toBeNull(); + }), + { numRuns: 100 }, + ); + }); + + it("should correctly remove items", () => { + fc.assert( + fc.property(fc.string(), fc.string(), (key, value) => { + storage.setItem(key, value); + storage.removeItem(key); + expect(storage.getItem(key)).toBeNull(); + }), + { numRuns: 100 }, + ); + }); + + it("should ensure all set items are v2-coded internally", () => { + fc.assert( + fc.property(fc.string(), fc.string(), (key, value) => { + storage.setItem(key, value); + const mockData: DataStore = { [v2StoreKey(key)]: value }; + storage.receive(mockData); + if (value === "") { + expect(storage.getItem(key)).toBeNull(); + } else { + expect(storage.getItem(key)).toEqual(value); + } + }), + { numRuns: 100 }, + ); + }); +}); describe("Storage", () => { let storage: Storage; @@ -15,21 +97,22 @@ describe("Storage", () => { it("should set an item", () => { const mockSender = jest.fn(); storage.registerSender(mockSender); + const key = "key1"; + storage.setItem(key, "value1"); - storage.setItem("key1", "value1"); - - expect(mockSender).toHaveBeenCalledWith("add", ["key1"], "value1"); + expect(mockSender).toHaveBeenCalledWith("add", [v2StoreKey(key)], "value1"); }); it("should set an item and wait", () => { const mockSender = jest.fn(); storage.registerSender(mockSender); jest.useFakeTimers(); + const key = "key1"; // asserting on sender invocation rather than Promise so no need to wait - void storage.setItemAndWait("key1", "value1"); + void storage.setItemAndWait(key, "value1"); - expect(mockSender).toHaveBeenCalledWith("add", ["key1"], "value1"); + expect(mockSender).toHaveBeenCalledWith("add", [v2StoreKey(key)], "value1"); jest.useRealTimers(); }); @@ -38,20 +121,21 @@ describe("Storage", () => { storage.registerSender(mockSender); jest.useFakeTimers(); + const key = "key1"; // asserting on sender invocation rather than Promise so no need to wait - void storage.removeItemAndWait("key1"); + void storage.removeItemAndWait(key); - expect(mockSender).toHaveBeenCalledWith("remove", ["key1"], undefined); + expect(mockSender).toHaveBeenCalledWith("remove", [v2StoreKey(key)], undefined); jest.useRealTimers(); }); it("should remove an item", () => { const mockSender = jest.fn(); storage.registerSender(mockSender); + const key = "key1"; + storage.removeItem(key); - storage.removeItem("key1"); - - expect(mockSender).toHaveBeenCalledWith("remove", ["key1"], undefined); + expect(mockSender).toHaveBeenCalledWith("remove", [v2StoreKey(key)], undefined); }); it("should clear all items", () => { @@ -65,12 +149,16 @@ describe("Storage", () => { }); it("should get an item", () => { - const mockData: DataStore = { key1: "value1" }; - storage.receive(mockData); + const keys = ["key1", "!", "!", "pepr", "https://google.com", "sftp://here:22", "!"]; + const results = ["value1", null, "!", "was-here", "3f7dd007-568f-4f4a-bbac-2e6bfff93860", "your-machine", " "]; - const value = storage.getItem("key1"); + keys.map((key, i) => { + const mockData: DataStore = { [v2StoreKey(key)]: results[i]! }; - expect(value).toEqual("value1"); + storage.receive(mockData); + const value = storage.getItem(keys[i]); + expect(value).toEqual(results[i]); + }); }); it("should return null for non-existing item", () => { diff --git a/src/lib/storage.ts b/src/lib/storage.ts index 6877ce8d3..e607f62e7 100644 --- a/src/lib/storage.ts +++ b/src/lib/storage.ts @@ -3,7 +3,7 @@ import { clone } from "ramda"; import Log from "./logger"; - +import pointer from "json-pointer"; export type DataOp = "add" | "remove"; export type DataStore = Record; export type DataSender = (op: DataOp, keys: string[], value?: string) => void; @@ -11,6 +11,15 @@ export type DataReceiver = (data: DataStore) => void; export type Unsubscribe = () => void; const MAX_WAIT_TIME = 15000; +const STORE_VERSION_PREFIX = "v2"; + +export function v2StoreKey(key: string) { + return `${STORE_VERSION_PREFIX}-${pointer.escape(key)}`; +} + +export function stripV2Prefix(key: string) { + return key.replace(/^v2-/, ""); +} export interface PeprStore { /** * Returns the current value associated with the given key, or null if the given key does not exist. @@ -60,6 +69,7 @@ export interface PeprStore { * * The API is similar to the [Storage API](https://developer.mozilla.org/docs/Web/API/Storage) */ + export class Storage implements PeprStore { #store: DataStore = {}; #send!: DataSender; @@ -85,8 +95,11 @@ export class Storage implements PeprStore { }; getItem = (key: string) => { - // Return null if the value is the empty string - return this.#store[key] || null; + const result = this.#store[v2StoreKey(key)] || null; + if (result !== null && typeof result !== "function" && typeof result !== "object") { + return result; + } + return null; }; clear = () => { @@ -94,11 +107,11 @@ export class Storage implements PeprStore { }; removeItem = (key: string) => { - this.#dispatchUpdate("remove", [key]); + this.#dispatchUpdate("remove", [v2StoreKey(key)]); }; setItem = (key: string, value: string) => { - this.#dispatchUpdate("add", [key], value); + this.#dispatchUpdate("add", [v2StoreKey(key)], value); }; /** @@ -110,10 +123,10 @@ export class Storage implements PeprStore { * @returns */ setItemAndWait = (key: string, value: string) => { - this.#dispatchUpdate("add", [key], value); + this.#dispatchUpdate("add", [v2StoreKey(key)], value); return new Promise((resolve, reject) => { const unsubscribe = this.subscribe(data => { - if (data[key] === value) { + if (data[`${v2StoreKey(key)}`] === value) { unsubscribe(); resolve(); } @@ -135,10 +148,10 @@ export class Storage implements PeprStore { * @returns */ removeItemAndWait = (key: string) => { - this.#dispatchUpdate("remove", [key]); + this.#dispatchUpdate("remove", [v2StoreKey(key)]); return new Promise((resolve, reject) => { const unsubscribe = this.subscribe(data => { - if (!Object.hasOwn(data, key)) { + if (!Object.hasOwn(data, `${v2StoreKey(key)}`)) { unsubscribe(); resolve(); }