Skip to content

Commit

Permalink
add config to disable count on the table and bump
Browse files Browse the repository at this point in the history
  • Loading branch information
rjmasikome committed Oct 16, 2019
1 parent 355a967 commit 6ab517e
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 94 deletions.
194 changes: 107 additions & 87 deletions lib/BigtableClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export class BigtableClient extends EventEmitter {
private murmurSeed: number;
private insertBulkLimit: number;
private insertBulkLimitTTL: number;
private enableCount: boolean;

public cfName!: string;
public tableMetadata!: Bigtable.Table;
Expand Down Expand Up @@ -73,6 +74,8 @@ export class BigtableClient extends EventEmitter {
this.insertBulkLimitTTL = insertBulkLimitTTL || DEFAULT_INSERT_BULK_WITH_TTL_LIMIT;
this.config = config;
this.isInitialized = false;
this.enableCount = typeof config.enableCount === "undefined" ? true : config.enableCount;

this.job = new JobTTLEvent(this, this.intervalInMs);
}

Expand Down Expand Up @@ -139,7 +142,7 @@ export class BigtableClient extends EventEmitter {
* @param columnNames as column identifier
* @param ttlRowkey to get uniform ttlRowKey
*/
private async getMutateArrayForBulk(rowKey: string, ttl: number, columnNames: string[], ttlRowKey: string) {
private async deletePreviousTTL(rowKey: string, ttl: number, columnNames: string[], ttlRowKey: string) {

const ttlData: Bigtable.GenericObject = {};
const referenceKeys: string[] = [];
Expand All @@ -163,11 +166,19 @@ export class BigtableClient extends EventEmitter {
});

await this.deleteReferenceKeys(referenceKeys);
const promises: Array<Promise<any>> = [];

return {
ttlData,
ttlReferenceData,
};
if (ttlReferenceData) {
promises.push(this.tableTTLReference.insert(ttlReferenceData));
}

if (ttlData) {
promises.push(
this.insert(this.tableMetadata, this.cfNameMetadata, ttlRowKey, ttlData),
);
}

await Promise.all(promises);
}

/**
Expand Down Expand Up @@ -367,7 +378,7 @@ export class BigtableClient extends EventEmitter {
let rowGet = null;

try {
rowGet = await row.get(filter ? {filter} : undefined);
rowGet = await row.get(filter ? {filter} as object : undefined);
} catch (error) {

if (!error.message.startsWith("Unknown row")) {
Expand Down Expand Up @@ -560,7 +571,7 @@ export class BigtableClient extends EventEmitter {
debug("Multi-adding cells for", this.config.name, rowKey, ttl);

const row = this.table.row(rowKey + "");
const insertPromises: Array<Promise<any>> = [];
const promises: Array<Promise<any>> = [];
const columnNames = Object.keys(data);

if (!columnNames.length) {
Expand All @@ -573,17 +584,9 @@ export class BigtableClient extends EventEmitter {

if (ttl) {
const ttlRowKey = this.getTTLRowKey(ttl);
const { ttlData, ttlReferenceData } = await this.getMutateArrayForBulk(rowKey, ttl, columnNames, ttlRowKey);

if (ttlReferenceData) {
insertPromises.push(this.tableTTLReference.insert(ttlReferenceData));
}

if (ttlData) {
insertPromises.push(
this.insert(this.tableMetadata, this.cfNameMetadata, ttlRowKey, ttlData),
);
}
promises.push(
this.deletePreviousTTL(rowKey, ttl, columnNames, ttlRowKey),
);
}

const rules = columnNames
Expand All @@ -604,12 +607,12 @@ export class BigtableClient extends EventEmitter {
) as Bigtable.RowRule[];

if (rules.length > 0) {
insertPromises.push(
promises.push(
row.createRules(rules),
);
}

return Promise.all(insertPromises);
return Promise.all(promises);
}

/**
Expand All @@ -627,7 +630,7 @@ export class BigtableClient extends EventEmitter {
[columnName]: value,
};

const insertPromises: Array<Promise<any>> = [];
const promises: Array<Promise<any>> = [];

if (ttl) {
const ttlRowKey = this.getTTLRowKey(ttl);
Expand All @@ -647,25 +650,27 @@ export class BigtableClient extends EventEmitter {
[columnQualifier]: ttl,
};

insertPromises.push(
promises.push(
this.insert(this.tableTTLReference, this.cfNameTTLReference, columnQualifier, { ttlKey: ttlRowKey }),
this.insert(this.tableMetadata, this.cfNameMetadata, ttlRowKey, ttlData),
);
}

const rowExists = await this.table.row(rowKey + "").exists();
if (!rowExists || !rowExists[0]) {
insertPromises.push(
this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, 1),
);
if (this.enableCount) {
const rowExists = await this.table.row(rowKey + "").exists();
if (!rowExists || !rowExists[0]) {
promises.push(
this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, 1),
);
}
}

insertPromises.push(
promises.push(
this.insert(this.table, this.cfName, rowKey, data),
);

return Promise.all(insertPromises);
return Promise.all(promises);
}

/**
Expand Down Expand Up @@ -718,13 +723,14 @@ export class BigtableClient extends EventEmitter {

const etl = (result: any) => result.id ? true : false;

const rowExists = await this.scanCellsInternal(this.table, options, etl);

// Would be negative int if some rows are empty
const difference = rowExists.length - distinctRows.length;
if (this.enableCount) {
const rowExists = await this.scanCellsInternal(this.table, options, etl);
// Would be negative int if some rows are empty
const difference = rowExists.length - distinctRows.length;

if (difference !== 0) {
await this.tableMetadata.row(COUNTS).increment(`${this.cfNameMetadata}:${COUNTS}`, difference);
if (difference !== 0) {
await this.tableMetadata.row(COUNTS).increment(`${this.cfNameMetadata}:${COUNTS}`, difference);
}
}
}
}
Expand All @@ -747,10 +753,12 @@ export class BigtableClient extends EventEmitter {

await row.deleteCells([`${this.cfName}:${columnName}`]);

const rowExists = await this.table.row(rowKey + "").exists();
if (!rowExists || !rowExists[0]) {
await this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, -1);
if (this.enableCount) {
const rowExists = await this.table.row(rowKey + "").exists();
if (!rowExists || !rowExists[0]) {
await this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, -1);
}
}
}
/**
Expand Down Expand Up @@ -791,12 +799,19 @@ export class BigtableClient extends EventEmitter {
throw new Error(`Bulk insert limit with TTL exceeded, please insert less than ${this.insertBulkLimitTTL} cells`);
}

// Push all promises into single Promise.all
await Promise.all([
const promises: Array<Promise<any>> = [
this.upsertTTLOnBulk(insertData, ttl),
this.upsertCountOnBulk(insertData),
this.table.insert(insertRules),
]);
];

if (this.enableCount) {
promises.push(
this.upsertCountOnBulk(insertData),
);
}

// Push all promises into single Promise.all
await Promise.all(promises);
}

/**
Expand All @@ -809,7 +824,7 @@ export class BigtableClient extends EventEmitter {

debug("Running multi-set for", this.config.name, rowKey, ttl);

const insertPromises: Array<Promise<any>> = [];
const promises: Array<Promise<any>> = [];
const columnNames = Object.keys(columnsObject);

if (!columnNames.length) {
Expand All @@ -818,32 +833,26 @@ export class BigtableClient extends EventEmitter {

if (ttl) {
const ttlRowKey = this.getTTLRowKey(ttl);
const { ttlData, ttlReferenceData } = await this.getMutateArrayForBulk(rowKey, ttl, columnNames, ttlRowKey);

if (ttlReferenceData) {
insertPromises.push(this.tableTTLReference.insert(ttlReferenceData));
}
promises.push(
this.deletePreviousTTL(rowKey, ttl, columnNames, ttlRowKey),
);
}

if (ttlData) {
insertPromises.push(
this.insert(this.tableMetadata, this.cfNameMetadata, ttlRowKey, ttlData),
if (this.enableCount) {
const rowExists = await this.table.row(rowKey + "").exists();
if (!rowExists || !rowExists[0]) {
promises.push(
this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, 1),
);
}
}

const rowExists = await this.table.row(rowKey + "").exists();
if (!rowExists || !rowExists[0]) {
insertPromises.push(
this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, 1),
);
}

insertPromises.push(
promises.push(
this.insert(this.table, this.cfName, rowKey, columnsObject),
);

return Promise.all(insertPromises);
return Promise.all(promises);
}

/**
Expand All @@ -868,11 +877,17 @@ export class BigtableClient extends EventEmitter {
debug("Deleting row for", this.config.name, rowKey);

const row = this.table.row(rowKey);

return Promise.all([
this.tableMetadata.row(COUNTS).increment(`${this.cfNameMetadata}:${COUNTS}`, -1),
const promises: Array<Promise<any>> = [
row.delete(),
]);
];

if (this.enableCount) {
promises.push(
this.tableMetadata.row(COUNTS).increment(`${this.cfNameMetadata}:${COUNTS}`, -1),
);
}

return Promise.all(promises);
}

/**
Expand All @@ -892,7 +907,7 @@ export class BigtableClient extends EventEmitter {
const columnName = column || this.defaultColumn || "";
const row = this.table.row(rowKey);

const insertPromises: Array<Promise<any>> = [];
const promises: Array<Promise<any>> = [];

if (ttl) {
const ttlRowKey = this.getTTLRowKey(ttl);
Expand All @@ -912,25 +927,27 @@ export class BigtableClient extends EventEmitter {
[columnQualifier]: ttl,
};

insertPromises.push(
promises.push(
this.insert(this.tableTTLReference, this.cfNameTTLReference, columnQualifier, { ttlKey: ttlRowKey }),
this.insert(this.tableMetadata, this.cfNameMetadata, ttlRowKey, ttlData),
);
}

const rowExists = await row.exists();
if (!rowExists || !rowExists[0]) {
insertPromises.push(
this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, 1),
);
if (this.enableCount) {
const rowExists = await row.exists();
if (!rowExists || !rowExists[0]) {
promises.push(
this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, 1),
);
}
}

insertPromises.push(
promises.push(
row.increment(`${this.cfName}:${columnName}`, 1),
);

return Promise.all(insertPromises);
return Promise.all(promises);
}

/**
Expand All @@ -950,7 +967,7 @@ export class BigtableClient extends EventEmitter {
const columnName = column || this.defaultColumn || "";
const row = this.table.row(rowKey);

const insertPromises: Array<Promise<any>> = [];
const promises: Array<Promise<any>> = [];

if (ttl) {
const ttlRowKey = this.getTTLRowKey(ttl);
Expand All @@ -970,31 +987,34 @@ export class BigtableClient extends EventEmitter {
[columnQualifier]: ttl,
};

insertPromises.push(
promises.push(
this.insert(this.tableTTLReference, this.cfNameTTLReference, columnQualifier, { ttlKey: ttlRowKey }),
this.insert(this.tableMetadata, this.cfNameMetadata, ttlRowKey, ttlData),
);
}

const rowExists = await row.exists();
if (!rowExists || !rowExists[0]) {
insertPromises.push(
this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, 1),
);
if (this.enableCount) {
const rowExists = await row.exists();
if (!rowExists || !rowExists[0]) {
promises.push(
this.tableMetadata.row(COUNTS)
.increment(`${this.cfNameMetadata}:${COUNTS}`, 1),
);
}
}

insertPromises.push(
promises.push(
row.increment(`${this.cfName}:${columnName}`, -1),
);

return Promise.all(insertPromises);
return Promise.all(promises);
}

/**
* Get a count of a table
*/
public async count(): Promise<any> {

debug("Checking count for", this.config.name);
const counts = await this.retrieve(this.tableMetadata, this.cfNameMetadata, COUNTS, COUNTS);
return counts || 0;
Expand Down
1 change: 1 addition & 0 deletions lib/interfaces/BigtableConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface BigtableClientConfig {
defaultValue?: string;
maxVersions?: number;
maxAgeSecond?: number;
enableCount?: boolean;
}

export interface RuleColumnFamily {
Expand Down
Loading

0 comments on commit 6ab517e

Please sign in to comment.