Skip to content

Commit

Permalink
limit bulkInsert and trimming bigger array insert
Browse files Browse the repository at this point in the history
  • Loading branch information
rjmasikome committed Oct 24, 2018
1 parent d145963 commit c760352
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 2 deletions.
4 changes: 4 additions & 0 deletions example/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,14 @@ const sleep = (ms: any) => {
},
], 5);

debug(`TTL For ${rowKey}:`, await myInstance.ttl(rowKey, "newColumn"));

await myInstance.multiSet(rowKey, {testColumn: "hello", anotherColumn: "yes"});
await myInstance.multiSet(rowKey, {testColumn: "hello", anotherColumn: "yes"}, 5);
await myInstance.multiSet(rowKey, {testColumn: "hello", anotherColumn: "no"}, 7);

debug(`TTL For ${rowKey}#testColumn:`, await myInstance.ttl(rowKey, "testColumn"));

await myInstance.increase(rowKey, "numberColumn");
await myInstance.increase(rowKey, "numberColumn");
await myInstance.decrease(rowKey, "numberColumn");
Expand Down
68 changes: 66 additions & 2 deletions lib/BigtableClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ const DEFAULT_CLUSTER_COUNT = 3;
const DEFAULT_MURMUR_SEED = 0;
const DEFAULT_TTL_BATCHSIZE = 250;

const DEFAULT_INSERT_BULK_LIMIT = 3500;
const DEFAULT_INSERT_BULK_WITH_TTL_LIMIT = 1000;

const DEFAULT_COLUMN = "value";
const DEFAULT_COLUMN_FAMILY = "default";
const COUNTS = "counts";
Expand All @@ -43,6 +46,8 @@ export class BigtableClient extends EventEmitter {
private maxJitterMs: number;
private isInitialized: boolean;
private murmurSeed: number;
private insertBulkLimit: number;
private insertBulkLimitTTL: number;

constructor(
config: BigtableClientConfig,
Expand All @@ -53,6 +58,8 @@ export class BigtableClient extends EventEmitter {
clusterCount?: number,
murmurSeed?: number,
ttlBatchSize?: number,
insertBulkLimit?: number,
insertBulkLimitTTL?: number,
) {
super();

Expand All @@ -63,6 +70,8 @@ export class BigtableClient extends EventEmitter {
this.clusterCount = clusterCount || DEFAULT_CLUSTER_COUNT;
this.murmurSeed = murmurSeed || DEFAULT_MURMUR_SEED;
this.ttlBatchSize = ttlBatchSize || DEFAULT_TTL_BATCHSIZE;
this.insertBulkLimit = insertBulkLimit || DEFAULT_INSERT_BULK_LIMIT;
this.insertBulkLimitTTL = insertBulkLimitTTL || DEFAULT_INSERT_BULK_WITH_TTL_LIMIT;
this.config = config;
this.isInitialized = false;
this.job = new JobTTLEvent(this, this.intervalInMs);
Expand Down Expand Up @@ -208,8 +217,8 @@ export class BigtableClient extends EventEmitter {
},
}));

// Data to be inserted as ttl row in metadata table
const ttlData: Bigtable.TableInsertFormat[] = insertDataFull
// Data to be inserted as a ttl row in the metadata table, reduced to one object per key to minimize mutations
const ttlDataObj = insertDataFull
.filter((insertSingleData) => insertSingleData.ttlKey)
.map((insertSingleData) => ({
key: insertSingleData.ttlKey,
Expand All @@ -218,6 +227,24 @@ export class BigtableClient extends EventEmitter {
[insertSingleData.fullQualifier]: insertSingleData.ttl || ttlBulk,
},
},
}))
.reduce((cummulator, currObj) => {

if (!cummulator[currObj.key]) {
cummulator[currObj.key] = currObj.data;
return cummulator;
}

cummulator[currObj.key] = Object.assign({}, cummulator[currObj.key], currObj.data);
return cummulator;
}, {} as any);

// Return the correct schema before insertion
const ttlData: Bigtable.TableInsertFormat[] = Object
.keys(ttlDataObj)
.map((ttlDataKey: string) => ({
key: ttlDataKey,
data: ttlDataObj[ttlDataKey],
}));

await Promise.all([
Expand Down Expand Up @@ -729,6 +756,21 @@ export class BigtableClient extends EventEmitter {
},
}));

const insertDataWithTTLExists = insertData
.map((insertSingleData) => insertSingleData.ttl)
.filter((insertSingleData) => !!insertSingleData)
.length || ttl;

const insertDataLength = insertData.length;

if (insertDataLength > this.insertBulkLimit) {
throw new Error(`Bulk insert limit exceeded, please insert less than ${this.insertBulkLimit} cells`);
}

if (insertDataWithTTLExists && insertDataLength > this.insertBulkLimitTTL) {
throw new Error(`Bulk insert limit with TTL exceeded, please insert less than ${this.insertBulkLimitTTL} cells`);
}

// Push all promises into single Promise.all
await Promise.all([
this.upsertTTLOnBulk(insertData, ttl),
Expand Down Expand Up @@ -938,6 +980,28 @@ export class BigtableClient extends EventEmitter {
return counts || 0;
}

/**
* Get a TTL of a cell
* @param rowKey
* @param column
*/
public async ttl(rowKey: string, column?: string): Promise<any> {

const columnIdentifier = column || this.defaultColumn;
debug("Checking ttl for", `row: ${rowKey}`, `column: ${columnIdentifier}`);

const fullQualifier = `${this.cfName}#${rowKey}#${column}`;
const ttlKey = await this.retrieve(this.tableTTLReference, this.cfNameTTLReference, fullQualifier, "ttlKey");

if (!ttlKey) {
return -1;
}

const remainingTime = (Number(ttlKey.split("#")[2]) - Date.now()) / 1000;

return Number(remainingTime.toFixed());
}

public close(retry?: boolean) {

debug("Closing job..", this.config.name);
Expand Down
2 changes: 2 additions & 0 deletions lib/BigtableFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export class BigtableFactory {
this.config.clusterCount,
this.config.murmurSeed,
this.config.ttlBatchSize,
this.config.insertBulkLimit,
this.config.insertBulkLimitTTL,
);

await bigtableClient.init();
Expand Down
2 changes: 2 additions & 0 deletions lib/interfaces/BigtableConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export interface BigtableFactoryConfig {
clusterCount?: number;
murmurSeed?: number;
ttlBatchSize?: number;
insertBulkLimit?: number;
insertBulkLimitTTL?: number;
}

export interface BigtableClientConfig {
Expand Down

0 comments on commit c760352

Please sign in to comment.