Commit 95d6101

chore: Optimize garbage collection for Topology instance (#33)
Extract the crawl iteration run into a separate method. This way the local `topology` instance is cleaned up from memory immediately after the crawl. Before this PR the `topology` instance was kept in memory until the start of the next iteration. This happens because the method is async; in a non-async method the `topology` would have been cleaned up earlier (already when execution exits the `try-catch` scope, as it is a local variable there).

## Other changes

Also simplified the handling of `subscribeGate`. There is no need to pass it as an argument, as it is a field of `Crawler`.
1 parent 3b7408f commit 95d6101
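To make the rationale concrete, here is a minimal, hypothetical sketch of the before/after shapes (names such as `fetchLargeResult` and `runIteration` are illustrative, not the actual Crawler code). In the first shape the large local belongs to the loop's own async frame, so per the reasoning above it can stay reachable across the inter-iteration wait; in the second it belongs only to the helper's frame and goes out of scope as soon as the helper resolves:

```typescript
// Stand-in for crawlTopology(): allocates a large object.
async function fetchLargeResult(): Promise<Uint8Array> {
    return new Uint8Array(100 * 1024 * 1024)
}

function wait(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms))
}

// Before: `result` is a local of the long-lived async loop body, so the
// suspended frame of this function may keep it reachable while wait() is
// still pending.
async function runBefore(iterations: number, delayMs: number): Promise<void> {
    for (let i = 0; i < iterations; i++) {
        try {
            const result = await fetchLargeResult()
            console.log(`iteration ${i}: ${result.length} bytes`)
        } catch (err) {
            console.error('Error', err)
        }
        await wait(delayMs) // `result` may still be retained here
    }
}

// After: the large local lives only in the helper's own frame and becomes
// unreachable as soon as runIteration() resolves.
async function runIteration(i: number): Promise<void> {
    try {
        const result = await fetchLargeResult()
        console.log(`iteration ${i}: ${result.length} bytes`)
    } catch (err) {
        console.error('Error', err)
    }
}

async function runAfter(iterations: number, delayMs: number): Promise<void> {
    for (let i = 0; i < iterations; i++) {
        await runIteration(i) // nothing from the iteration outlives this call
        await wait(delayMs)
    }
}

runAfter(3, 1000).catch(console.error)
```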

File tree

1 file changed: +23 −21 lines


src/crawler/Crawler.ts

Lines changed: 23 additions & 21 deletions
```diff
@@ -135,20 +135,7 @@ export class Crawler {
         this.client.on('streamCreated', this.onStreamCreated)
         let iterationIndex = 0
         while (true) {
-            try {
-                const topology = await crawlTopology(
-                    networkNodeFacade,
-                    this.client.getEntryPoints(),
-                    (nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer.neighbors,
-                    `full-${Date.now()}`
-                )
-                await this.nodeRepository.replaceNetworkTopology(topology)
-                await this.analyzeContractStreams(topology, this.subscribeGate)
-            } catch (e) {
-                logger.error('Error', { err: e })
-                await wait(RECOVERY_DELAY)
-            }
-            logger.info('Crawl iteration completed')
+            await this.runCrawlIteration(networkNodeFacade)
             if ((iterationCount === undefined) || (iterationIndex < iterationCount - 1)) {
                 await wait(this.config.crawler.iterationDelay)
                 iterationIndex++
@@ -158,9 +145,25 @@
         }
     }
 
+    private async runCrawlIteration(networkNodeFacade: NetworkNodeFacade): Promise<void> {
+        try {
+            const topology = await crawlTopology(
+                networkNodeFacade,
+                this.client.getEntryPoints(),
+                (nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer.neighbors,
+                `full-${Date.now()}`
+            )
+            await this.nodeRepository.replaceNetworkTopology(topology)
+            await this.analyzeContractStreams(topology)
+        } catch (e) {
+            logger.error('Error', { err: e })
+            await wait(RECOVERY_DELAY)
+        }
+        logger.info('Crawl iteration completed')
+    }
+
     private async analyzeContractStreams(
-        topology: Topology,
-        subscribeGate: SubscribeGate
+        topology: Topology
     ): Promise<void> {
         // wrap this.client.getAllStreams() with retry because in streamr-docker-dev environment
         // the graph-node dependency may not be available immediately after the service has
@@ -176,7 +179,7 @@
         const workedThreadLimit = pLimit(MAX_SUBSCRIPTION_COUNT)
         await Promise.all(sortedContractStreams.map((stream: Stream) => {
             return workedThreadLimit(async () => {
-                await this.analyzeStream(stream.id, await stream.getMetadata(), topology, subscribeGate)
+                await this.analyzeStream(stream.id, await stream.getMetadata(), topology)
             })
         }))
 
@@ -186,8 +189,7 @@
     private async analyzeStream(
         id: StreamID,
         metadata: StreamMetadata,
-        topology: Topology,
-        subscribeGate: SubscribeGate
+        topology: Topology
     ): Promise<void> {
         logger.info(`Analyze ${id}`)
         const peersByPartition = new Map<number, Set<DhtAddress>>
@@ -204,7 +206,7 @@
                 [...peersByPartition.keys()],
                 isPublicStream(subscriberCount),
                 await this.client.getNetworkNodeFacade(),
-                subscribeGate,
+                this.subscribeGate!,
                 this.config
             )
             : { messagesPerSecond: 0, bytesPerSecond: 0 }
@@ -275,7 +277,7 @@
                 return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor))).flat()
             }, `stream-${payload.streamId}-${Date.now()}`)
             // TODO could add new nodes and neighbors to NodeRepository?
-            await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
+            await this.analyzeStream(payload.streamId, payload.metadata, topology)
         } catch (e: any) {
             logger.error(`Failed to handle new stream ${payload.streamId}`, e)
         }
```
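The `subscribeGate` simplification is visible in the last four hunks: instead of threading the gate through `analyzeContractStreams` and `analyzeStream` as a parameter, the code now reads the `subscribeGate` field at the point of use. A hypothetical reduction of the pattern (illustrative names, not the real classes):

```typescript
// A value that is already a field does not need to be threaded through the
// call chain as a parameter.
class Gate {}

class CrawlerSketch {
    // Optional because it is only assigned once crawling starts.
    private gate?: Gate

    async start(): Promise<void> {
        this.gate = new Gate()
        await this.analyze()
    }

    // Before: analyze(gate: Gate), with every caller forwarding this.gate.
    // After: read the field where it is needed. The non-null assertion
    // mirrors `this.subscribeGate!` in the diff and assumes start() has run.
    private async analyze(): Promise<void> {
        const gate = this.gate!
        console.log('analyzing with', gate)
    }
}

new CrawlerSketch().start().catch(console.error)
```

Note that the non-null assertion is not a new risk introduced by this commit: the pre-existing `onStreamCreated` path already passed `this.subscribeGate!`, so the same initialization assumption was already being made at the call sites.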
