/*
The Model.
Performs asynchronous BigQuery requests.
Uses query parameters so that the BigQuery engine treats user-supplied values
as data rather than executable SQL, preventing SQL injection. This adds
another layer of protection on top of the parameter validation performed by
both the client and the backend. The regular expressions used for that
validation must be either simple or constructed using a library that guards
against ReDoS (regular expression denial-of-service) attacks.
*/
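/*
Illustration only (not executed): a minimal sketch of the parameterized-query
mechanism used in fetchData() below. The `name` parameter and the variable
untrustedUserInput are hypothetical.

  const [job] = await bigquery.createQueryJob({
    params: { name: untrustedUserInput },
    query: "SELECT ... FROM `project.dataset.table` WHERE name = @name"
  });

The engine binds @name as a value, so the input can never alter the SQL text.
*/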
import { BigQuery, Query as BigQueryRequestBase } from "@google-cloud/bigquery";
import { Job, QueryResultsOptions } from "@google-cloud/bigquery/build/src/job";
import * as NodeCache from "node-cache";
import { logger } from "../../utils/logger";
import { CustomError } from "../../utils/error";
import {
IBigQueryData,
BigQueryRequest,
BigQueryRetrieval,
BigQueryRetrievalResult,
BigQueryRetrievalRow
} from "../types/BigQueryTypes";
import {
EnvVariables,
EnvConfig,
getCacheLimit as getCacheSizeLimit,
getDataLimit,
getDataLimitClient
} from "../../utils/misc";
import { PersistentStorageManager } from "../../utils/storage";
/*
Model configuration
*/
export class BigQueryModelConfig {
constructor(
// Daily limit on BigQuery data usage in MB per client address
private limitDailyClientMB = BigQueryModelConfig.s_quotaDailyClientMB,
// Daily limit on BigQuery data usage in MB per backend instance
private limitDailyInstanceMB = BigQueryModelConfig.s_quotaDailyInstanceMB,
// Environment config
readonly envConfig: EnvVariables = EnvConfig.getVariables()
) {
if (limitDailyClientMB <= 0 || limitDailyClientMB > BigQueryModelConfig.s_quotaDailyClientMB) {
throw new RangeError("Client data limit is invalid");
}
if (limitDailyInstanceMB <= 0 || limitDailyInstanceMB > BigQueryModelConfig.s_quotaDailyInstanceMB) {
throw new RangeError("Instance data limit is invalid");
}
}
public readonly setClientDailyLimit = (limit: number): void => {
if (typeof limit !== "number" || !Number.isInteger(limit)) {
throw new TypeError("Client data limit is not an integer");
}
if (limit <= 0 || limit > BigQueryModelConfig.s_quotaDailyClientMB) {
throw new RangeError("Client data limit is invalid");
}
this.limitDailyClientMB = limit;
}
public readonly getClientDailyLimit = (): number => {
return this.limitDailyClientMB;
}
public readonly getInstanceDailyLimit = (): number => {
return this.limitDailyInstanceMB;
}
// Default daily quota on BigQuery data usage in MB per client address
private static readonly s_quotaDailyClientMB = getDataLimitClient();
// Default daily quota on BigQuery data usage in MB per backend instance
private static readonly s_quotaDailyInstanceMB = getDataLimit();
}
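/*
Example configuration (illustrative values only; each argument must not
exceed the corresponding environment-driven quota, otherwise the constructor
throws a RangeError):

  BigQueryModel.Config = new BigQueryModelConfig(
    50,    // daily per-client limit, MB
    500    // daily per-instance limit, MB
  );
*/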
/*
Model interface.
Extends the data storage interface by adding data fetching capability.
*/
export interface IBigQueryFetcher extends IBigQueryData {
readonly fetch: (param: BigQueryRequest) => Promise<void>;
}
/*
Model implementation.
Usage:
1. Use the .Config setter once to set the configuration.
2. Use the .Factory getter one or more times to get an instance of the class.
3. Use the instance of the class to await .fetch().
4. Use the .Data getter to get either the fetched data or an Error object.
A usage sketch appears at the end of this file.
*/
export class BigQueryModel implements IBigQueryFetcher {
static initialize(): void {
BigQueryModel.s_cache.on("expired", BigQueryModel.handleCacheExpiry);
PersistentStorageManager.ReadLimitCounters(BigQueryModel.s_cache);
}
static set Config(config: BigQueryModelConfig) {
BigQueryModel.s_config = config;
}
static get Factory(): BigQueryModel {
const ret = new BigQueryModel();
// do some extra work that the model might require
return ret;
}
public async fetch(bqRequest: BigQueryRequest): Promise<void> {
this.m_bqRequest = bqRequest;
if (BigQueryModel.s_cache.getStats().keys > BigQueryModel.s_limitCache) {
this.m_queryResult = new CustomError(503, BigQueryModel.s_errBusy, false, false);
logger.warn({ message: `Client ${bqRequest.clientAddress} rejected due to cache limit` });
return;
}
const dataUsage = this.getDataUsage(bqRequest.clientAddress);
// Check data usage per client
if (dataUsage.client_data > BigQueryModel.s_config!.getClientDailyLimit()) {
this.m_queryResult = new CustomError(509, BigQueryModel.s_errLimitClient, false, false);
return;
}
// Check data usage by the instance of BigQueryModel class
if (dataUsage.instance_data > BigQueryModel.s_config!.getInstanceDailyLimit()) {
this.m_queryResult = new CustomError(509, BigQueryModel.s_errLimitInstance, false, false);
logger.warn({ message: `Client ${bqRequest.clientAddress} request denied due to backend reaching its daily data limit` });
return;
}
await this.fetchData();
this.deduplicateRetrieval(bqRequest.DeduplicationTimeDiff);
}
get Data(): BigQueryRetrieval {
return this.m_queryResult;
}
public getData(): BigQueryRetrieval {
return this.m_queryResult;
}
/********************** private methods and data ************************/
private constructor() {
if (!BigQueryModel.s_config) {
throw new Error("BigQueryModelConfig is undefined");
}
}
private async fetchData(): Promise<void> {
const bqRequest = this.m_bqRequest as BigQueryRequest;
try {
let jobId = "";
let job: Job | undefined;
if (bqRequest.JobId) {
jobId = bqRequest.JobId;
job = BigQueryModel.s_cache.get(jobId);
if (!job) {
job = this.m_bigquery.job(jobId);
job && BigQueryModel.s_cache.set(jobId, job);
}
} else {
const requestOptions = {
...this.m_requestOptions,
maxResults: bqRequest.RowCount,
params: this.getRequestParams(),
query: this.getQuerySql(),
useQueryCache: bqRequest.useQueryCache,
};
logger.info({ message: `Query: ${requestOptions.query}` });
const [jobCreated,] = await this.m_bigquery.createQueryJob(requestOptions);
if (jobCreated && jobCreated.id) {
jobId = jobCreated.id;
job = jobCreated;
BigQueryModel.s_cache.set(jobId, job);
} else {
throw new Error(BigQueryModel.s_errLaunch);
}
}
if (!job) {
this.m_queryResult = new CustomError(408, BigQueryModel.s_errStale, false, false);
logger.warn({ message: `Client ${bqRequest.clientAddress} has sent stale request` });
return;
}
const resultOptions = {
...this.m_resultOptions,
maxResults: bqRequest.RowCount,
pageToken: bqRequest.PageToken,
};
const data: ReadonlyArray<any> = await job.getQueryResults(resultOptions);
const rows: ReadonlyArray<BigQueryRetrievalRow> = data[0];
const { jobComplete, totalRows, totalBytesProcessed, pageToken } = data[2];
this.m_queryResult = jobComplete ?
new BigQueryRetrievalResult(rows,
jobComplete,
totalRows,
rows.length,
totalBytesProcessed,
pageToken,
pageToken ? jobId : undefined) :
new Error("Backend database query did not complete");
jobComplete && this.adjustDataUsage(bqRequest.clientAddress, totalBytesProcessed);
} catch (err) {
const errorMsg = err instanceof Error ? err.message : (
"Exception: <" +
Object.keys(err).map((key) => `${key}: ${err[key] ?? "no data"}`).join("\n") +
">"
);
logger.error({ message: `Backend data retrieval failed, error: ${errorMsg}` });
this.m_queryResult = new Error(BigQueryModel.s_errMsg);
}
}
// A sample routine that demonstrates post-processing for cases where the
// logic could not be implemented in the SQL statement itself.
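// For example, with deduplicationTimeDiff = 2 (seconds), adjacent rows that
// BigQueryRetrievalRow.isDuplicate treats as equal within that time window
// are collapsed into one row. (isDuplicate is defined in BigQueryTypes; the
// exact comparison described here is an assumption.)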
private deduplicateRetrieval(deduplicationTimeDiff: number) {
if (this.m_queryResult instanceof Error) {
return;
}
const data = this.m_queryResult as BigQueryRetrievalResult;
if (data.jobComplete === false || data.totalRows === 0) {
return;
}
if (!data.rows || data.rows.length === 0) {
return;
}
if (deduplicationTimeDiff === 0) {
return; // client requested deduplication to be disabled
}
const outRows = new Array<BigQueryRetrievalRow>();
data.rows.reduce((acc, row, idx) => {
if (idx === 0) {
acc.push(row);
} else {
const row1 = acc[acc.length - 1];
BigQueryRetrievalRow.isDuplicate(row1, row, deduplicationTimeDiff) || acc.push(row);
}
return acc;
}, outRows);
data.rows = outRows;
}
private static handleCacheExpiry = (cache_key: string, value: any) => {
if (!cache_key) {
return;
}
if (cache_key.startsWith(BigQueryModel.s_limitPrefix)) {
PersistentStorageManager.WriteLimitCounters(
[cache_key, undefined],
0
);
return;
}
const job = value as Job;
try {
job.cancel().then(
() => logger.info({ message: `Cancelled stale BigQuery job ${job.id}` }),
() => logger.warn({ message: `Failed to cancel stale BigQuery job ${job.id}` })
);
} catch (err) {
const errorMsg = err instanceof Error ? err.message : (
"Exception: <" +
Object.keys(err).map((key) => `${key}: ${err[key] ?? "no data"}`).join("\n") +
">"
);
logger.warn({ message: `Failed to cancel stale BigQuery job, error: ${errorMsg}` });
}
}
private getDatasourceName(): string {
const str1 = `\`${BigQueryModel.s_config!.envConfig.gcpProjectId}`;
const str2 = `.${BigQueryModel.s_config!.envConfig.bqDatasetName}`;
const str3 = `.${BigQueryModel.s_config!.envConfig.bqTableName}\``;
return str1 + str2 + str3;
}
private getRequestParams() {
const bqRequest = this.m_bqRequest as BigQueryRequest;
return {
...(bqRequest.name && { name: bqRequest.name }),
...(bqRequest.language && { language: bqRequest.language }),
};
}
private getQuerySql(): string {
const bqRequest = this.m_bqRequest as BigQueryRequest;
const replacePatterns = {
_between_: bqRequest.SqlTimeClause(),
_datasource_: this.getDatasourceName(),
_params_: bqRequest.SqlWhereClause(),
};
return this.m_query.replace(/_datasource_|_between_|_params_/g,
match => replacePatterns[match as keyof typeof replacePatterns]);
}
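/*
Illustration only: after substitution the query might read as below. The
exact clauses come from BigQueryRequest.SqlTimeClause() and SqlWhereClause(),
defined elsewhere, so the WHERE clause shown is an assumption.

  SELECT ... FROM `my-project.my_dataset.my_table`
  WHERE created_time BETWEEN @date_start AND @date_end AND name = @name
  ORDER BY DateTime ASC
*/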
// TODO Use durable cache
private getDataUsage(clientAddress: string): {
// Client address
client_key: string,
// amount of BigQuery data used by the client
client_data: number,
// amount of data used by the instance of BigQueryModel class.
instance_data: number
} {
if (!clientAddress) {
const errMsg = "BigQueryModel.getDataUsage - missing clientAddress";
logger.error({ message: errMsg });
throw new Error(errMsg);
}
const clientKey = BigQueryModel.s_limitPrefix + clientAddress;
const cacheData = BigQueryModel.s_cache.mget([clientKey, BigQueryModel.s_limitInstance]);
const clientData = typeof cacheData[clientKey] === "number" ?
cacheData[clientKey] as number : 0;
const instanceData = typeof cacheData[BigQueryModel.s_limitInstance] === "number" ?
cacheData[BigQueryModel.s_limitInstance] as number : 0;
return { client_key: clientKey, client_data: clientData, instance_data: instanceData };
}
// TODO Use durable cache
private adjustDataUsage(clientAddress: string, cntBytes: number) {
if (cntBytes === 0) {
return;
}
const cntBytesMB = Math.ceil(cntBytes / (1024 * 1024));
// BigQuery on-demand pricing bills a minimum of 10 MB per query,
// so count at least that much against the quotas.
const bqThresholdMB = 10;
const cntBytesProcessedMB = Math.max(cntBytesMB, bqThresholdMB);
const { client_key, client_data, instance_data } = this.getDataUsage(clientAddress);
const ret = BigQueryModel.s_cache.mset([
{
key: client_key,
ttl: BigQueryModel.s_limitCleanupInterval,
val: client_data + cntBytesProcessedMB,
},
{
key: BigQueryModel.s_limitInstance,
ttl: BigQueryModel.s_limitCleanupInterval,
val: instance_data + cntBytesProcessedMB,
}
]);
if (!ret) {
const errMsg = "Failed to store data usage in the cache";
logger.error({ message: errMsg });
throw new Error(errMsg);
}
PersistentStorageManager.WriteLimitCounters(
[client_key, BigQueryModel.s_limitInstance],
cntBytesProcessedMB
);
}
private m_bqRequest?: BigQueryRequest = undefined;
private m_queryResult: BigQueryRetrieval = new Error("Backend database query not attempted");
private m_query = `#standardSQL
SELECT STRING(TIMESTAMP_TRUNC(created_time, SECOND)) as DateTime,
SUBSTR(repository_name, 1, 40) as Name,
SUBSTR(repository_language, 1, 25) as Language,
repository_size as Size,
repository_homepage as Homepage,
SUBSTR(actor_attributes_login, 1, 20) as Login,
SUBSTR(repository_owner, 1, 25) as Owner
FROM _datasource_
WHERE created_time _between_ _params_
ORDER BY DateTime ASC`;
private readonly m_requestOptions: BigQueryRequestBase = {
destination: undefined,
location: "US",
maxResults: undefined,
pageToken: undefined,
params: undefined,
query: undefined,
jobTimeoutMs: 10000,
useQueryCache: true,
};
private readonly m_resultOptions: QueryResultsOptions = {
autoPaginate: true,
maxApiCalls: undefined,
maxResults: undefined,
pageToken: undefined,
startIndex: undefined,
timeoutMs: 15000,
};
private readonly m_bigquery = new BigQuery(
{
autoRetry: true,
keyFilename: BigQueryModel.s_config!.envConfig.keyFilePath,
maxRetries: 5,
projectId: BigQueryModel.s_config!.envConfig.gcpProjectId,
}
);
private static s_config?: BigQueryModelConfig = undefined;
private static readonly s_errMsg = "Failed to query the backend database. Please retry later. If the problem persists, contact Support.";
private static readonly s_errStale = "Stale backend query";
private static readonly s_errBusy = "The server is busy";
private static readonly s_errLaunch = "Failed to launch BigQuery job to perform a query";
private static readonly s_errLimitClient = "The daily limit of database queries has been reached. Please contact Support if you feel this limit is inadequate.";
private static readonly s_errLimitInstance = "Temporarily unable to query the backend database. Please contact Support.";
private static readonly s_limitPrefix = "bqlimit_";
private static readonly s_limitInstance = "bqlimit_instance";
private static readonly s_JobCleanupInterval = 1200;
private static readonly s_limitCleanupInterval = 3600 * 24;
private static readonly s_limitCache = getCacheSizeLimit();
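// A single in-process cache serves two purposes: it holds BigQuery Job
// handles (default TTL s_JobCleanupInterval = 20 min; on expiry
// handleCacheExpiry cancels the job) and the per-client/per-instance
// data-usage counters (keys prefixed with s_limitPrefix, TTL
// s_limitCleanupInterval = 24 h; on expiry the persisted counter is reset).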
private static readonly s_cache = new NodeCache({
checkperiod: 600,
deleteOnExpire: true,
stdTTL: BigQueryModel.s_JobCleanupInterval,
useClones: false
});
}
BigQueryModel.initialize();
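/*
A minimal usage sketch, assuming the caller (e.g. an Express-style route
handler, not shown here) has already built a validated BigQueryRequest.
This function is illustrative and is not invoked anywhere in this module.
*/
export async function runSampleQuery(bqRequest: BigQueryRequest): Promise<BigQueryRetrieval> {
  // 1. Set the configuration once (here with the defaults).
  BigQueryModel.Config = new BigQueryModelConfig();
  // 2. Get an instance of the class.
  const model = BigQueryModel.Factory;
  // 3. Await the fetch; failures are captured in .Data rather than thrown.
  await model.fetch(bqRequest);
  // 4. The result is either the fetched data or an Error object.
  return model.Data;
}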