Skip to content

Commit ab2e4b6

Browse files
committed
feat(logs): paginate through logs after backoff
1 parent 928dbcb commit ab2e4b6

File tree

2 files changed

+138
-48
lines changed

2 files changed

+138
-48
lines changed

packages/serverless-api/src/api/logs.ts

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,39 @@ import { LogApiResource, LogList, Sid, LogFilters } from '../types';
55
import { TwilioServerlessApiClient } from '../client';
66
import { getPaginatedResource } from './utils/pagination';
77
import { ClientApiError } from '../utils/error';
8+
import { OptionsOfJSONResponseBody } from 'got';
89

910
const log = debug('twilio-serverless-api:logs');
1011

12+
function urlWithFilters(
13+
environmentSid: Sid,
14+
serviceSid: Sid,
15+
filters: LogFilters = {}
16+
): string {
17+
const pageSize = filters.pageSize || 50;
18+
const { functionSid, startDate, endDate, pageToken } = filters;
19+
let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`;
20+
if (typeof functionSid !== 'undefined') {
21+
url += `&FunctionSid=${functionSid}`;
22+
}
23+
if (typeof startDate !== 'undefined') {
24+
url += `&StartDate=${
25+
startDate instanceof Date ? startDate.toISOString() : startDate
26+
}`;
27+
}
28+
if (typeof endDate !== 'undefined') {
29+
url += `&EndDate=${
30+
endDate instanceof Date ? endDate.toISOString() : endDate
31+
}`;
32+
}
33+
if (typeof pageToken !== 'undefined') {
34+
url += `&PageToken=${pageToken}`;
35+
}
36+
return url;
37+
}
38+
1139
/**
12-
* Calls the API to retrieve a list of all assets
40+
* Calls the API to retrieve a list of all logs
1341
*
1442
* @param {Sid} environmentSid environment in which to get logs
1543
* @param {Sid} serviceSid service to look for logs
@@ -24,7 +52,7 @@ export async function listLogResources(
2452
try {
2553
return getPaginatedResource<LogList, LogApiResource>(
2654
client,
27-
`Services/${serviceSid}/Environments/${environmentSid}/Logs`
55+
urlWithFilters(environmentSid, serviceSid)
2856
);
2957
} catch (err) {
3058
log('%O', new ClientApiError(err));
@@ -33,7 +61,7 @@ export async function listLogResources(
3361
}
3462

3563
/**
36-
* Calls the API to retrieve a list of all assets
64+
* Calls the API to retrieve one page of a list of logs
3765
*
3866
* @param {Sid} environmentSid environment in which to get logs
3967
* @param {Sid} serviceSid service to look for logs
@@ -46,28 +74,10 @@ export async function listOnePageLogResources(
4674
client: TwilioServerlessApiClient,
4775
filters: LogFilters
4876
): Promise<LogApiResource[]> {
49-
const pageSize = filters.pageSize || 50;
50-
const { functionSid, startDate, endDate, pageToken } = filters;
77+
const url = urlWithFilters(environmentSid, serviceSid, filters);
5178
try {
52-
let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`;
53-
if (typeof functionSid !== 'undefined') {
54-
url += `&FunctionSid=${functionSid}`;
55-
}
56-
if (typeof startDate !== 'undefined') {
57-
url += `&StartDate=${
58-
startDate instanceof Date ? startDate.toISOString() : startDate
59-
}`;
60-
}
61-
if (typeof endDate !== 'undefined') {
62-
url += `&EndDate=${
63-
endDate instanceof Date ? endDate.toISOString() : endDate
64-
}`;
65-
}
66-
if (typeof pageToken !== 'undefined') {
67-
url += `&PageToken=${pageToken}`;
68-
}
6979
const resp = await client.request('get', url);
70-
const content = (resp.body as unknown) as LogList;
80+
const content = resp.body as unknown as LogList;
7181
return content.logs as LogApiResource[];
7282
} catch (err) {
7383
log('%O', new ClientApiError(err));
@@ -76,7 +86,41 @@ export async function listOnePageLogResources(
7686
}
7787

7888
/**
79-
* Calls the API to retrieve a list of all assets
89+
* Calls the API to retrieve a paginated list of logs
90+
*
91+
* @param {Sid} environmentSid environment in which to get logs
92+
* @param {Sid} serviceSid service to look for logs
93+
* @param {TwilioServerlessApiClient} client API client
94+
* @param {LogFilters} filters filters to apply to the request
95+
* @param {string} nextPageUrl if you have a next page url, use it
96+
* @returns {Promise<LogList>}
97+
*/
98+
export async function listPaginatedLogs(
99+
environmentSid: Sid,
100+
serviceSid: Sid,
101+
client: TwilioServerlessApiClient,
102+
filters: LogFilters = {},
103+
nextPageUrl?: string
104+
): Promise<LogList> {
105+
try {
106+
const opts: OptionsOfJSONResponseBody = { responseType: 'json' };
107+
let url = nextPageUrl;
108+
if (typeof url === 'undefined') {
109+
url = urlWithFilters(environmentSid, serviceSid, filters);
110+
}
111+
if (url.startsWith('http')) {
112+
opts.prefixUrl = '';
113+
}
114+
const resp = await client.request('get', url, opts);
115+
return resp.body as unknown as LogList;
116+
} catch (err) {
117+
log('%O', new ClientApiError(err));
118+
throw err;
119+
}
120+
}
121+
122+
/**
123+
* Calls the API to retrieve a single log resource
80124
*
81125
* @param {Sid} logSid SID of log to retrieve
82126
* @param {Sid} environmentSid environment in which to get logs
@@ -95,7 +139,7 @@ export async function getLog(
95139
'get',
96140
`Services/${serviceSid}/Environments/${environmentSid}/Logs/${logSid}`
97141
);
98-
return (resp.body as unknown) as LogApiResource;
142+
return resp.body as unknown as LogApiResource;
99143
} catch (err) {
100144
log('%O', new ClientApiError(err));
101145
throw err;

packages/serverless-api/src/streams/logs.ts

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Readable } from 'stream';
2-
import { listOnePageLogResources } from '../api/logs';
2+
import { listPaginatedLogs } from '../api/logs';
33
import { TwilioServerlessApiClient } from '../client';
44
import { Sid } from '../types';
55
import { LogsConfig } from '../types/logs';
@@ -22,6 +22,7 @@ export class LogsStream extends Readable {
2222
private _interval: NodeJS.Timeout | undefined;
2323
private _viewedSids: Set<Sid>;
2424
private _viewedLogs: Array<{ sid: Sid; dateCreated: Date }>;
25+
private _paginating: boolean;
2526

2627
constructor(
2728
private environmentSid: Sid,
@@ -39,6 +40,7 @@ export class LogsStream extends Readable {
3940
config.maxPollingFrequency || defaultMaxPollingFrequency;
4041
this._pollsWithoutResults = 0;
4142
this._pollingCacheSize = config.logCacheSize || defaultLogCacheSize;
43+
this._paginating = false;
4244
}
4345

4446
set pollingFrequency(frequency: number) {
@@ -53,7 +55,12 @@ export class LogsStream extends Readable {
5355

5456
async _poll() {
5557
try {
56-
const logs = await listOnePageLogResources(
58+
if (this._paginating) {
59+
// We are going back through older logs that have been missed between
60+
// polls, so don't start a new poll of the latest logs yet.
61+
return;
62+
}
63+
let logPage = await listPaginatedLogs(
5764
this.environmentSid,
5865
this.serviceSid,
5966
this.client,
@@ -62,32 +69,39 @@ export class LogsStream extends Readable {
6269
pageSize: this.config.limit,
6370
}
6471
);
65-
const unviewedLogs = logs.filter((log) => !this._viewedSids.has(log.sid));
72+
let logs = logPage.logs;
73+
let unviewedLogs = logs.filter((log) => !this._viewedSids.has(log.sid));
74+
if (this._viewedSids.size > 0) {
75+
// if we have seen some logs, we need to check if more than one page of
76+
// logs are new.
77+
while (
78+
unviewedLogs.length === logs.length &&
79+
logPage.meta.next_page_url
80+
) {
81+
// all of the logs are new, so we should get the next page
82+
this._paginating = true;
83+
logPage = await listPaginatedLogs(
84+
this.environmentSid,
85+
this.serviceSid,
86+
this.client,
87+
{},
88+
logPage.meta.next_page_url
89+
);
90+
unviewedLogs = unviewedLogs.concat(
91+
logPage.logs.filter((log) => !this._viewedSids.has(log.sid))
92+
);
93+
logs = logs.concat(logPage.logs);
94+
}
95+
}
6696
if (unviewedLogs.length > 0) {
67-
this._pollsWithoutResults = 0;
68-
this.pollingFrequency = this._initialPollingFrequency;
69-
log(
70-
`New log received. Now polling once every ${this._pollingFrequency} milliseconds.`
71-
);
97+
// We got new logs, make sure we are polling at the base frequency
98+
this._resetPollingFrequency();
7299
unviewedLogs.reverse().forEach((log) => {
73100
this.push(log);
74101
});
75102
} else {
76-
if (this._pollsWithoutResults < pollsBeforeBackOff) {
77-
this._pollsWithoutResults++;
78-
} else {
79-
if (this._pollingFrequency < this._maxPollingFrequency) {
80-
log(
81-
`No new logs for ${
82-
this._pollsWithoutResults * this._pollingFrequency
83-
} milliseconds. Now polling once every ${
84-
this._pollingFrequency * 2
85-
} milliseconds.`
86-
);
87-
this.pollingFrequency = this._pollingFrequency * 2;
88-
this._pollsWithoutResults = 0;
89-
}
90-
}
103+
// No new logs this time, so maybe back off polling
104+
this._backOffPolling();
91105
}
92106

93107
// The logs endpoint is not reliably returning logs in the same order
@@ -122,9 +136,13 @@ export class LogsStream extends Readable {
122136
// Finally we create a set of just SIDs to compare against.
123137
this._viewedSids = new Set(this._viewedLogs.map((log) => log.sid));
124138

139+
// If this is not tailing the logs, stop the stream.
125140
if (!this.config.tail) {
126141
this.push(null);
127142
}
143+
// If we were paginating through older resources, we can now allow the
144+
// next poll when it is triggered.
145+
this._paginating = false;
128146
} catch (err) {
129147
this.destroy(err);
130148
}
@@ -148,4 +166,32 @@ export class LogsStream extends Readable {
148166
this._interval = undefined;
149167
}
150168
}
169+
170+
private _resetPollingFrequency() {
171+
this._pollsWithoutResults = 0;
172+
if (this.pollingFrequency !== this._initialPollingFrequency) {
173+
this.pollingFrequency = this._initialPollingFrequency;
174+
log(
175+
`New log received. Now polling once every ${this._pollingFrequency} milliseconds.`
176+
);
177+
}
178+
}
179+
180+
private _backOffPolling() {
181+
if (this._pollsWithoutResults < pollsBeforeBackOff) {
182+
this._pollsWithoutResults++;
183+
} else {
184+
if (this._pollingFrequency < this._maxPollingFrequency) {
185+
log(
186+
`No new logs for ${
187+
this._pollsWithoutResults * this._pollingFrequency
188+
} milliseconds. Now polling once every ${
189+
this._pollingFrequency * 2
190+
} milliseconds.`
191+
);
192+
this.pollingFrequency = this._pollingFrequency * 2;
193+
this._pollsWithoutResults = 0;
194+
}
195+
}
196+
}
151197
}

0 commit comments

Comments
 (0)