11use super::*;
2+ use futures::stream::{self, StreamExt};
23use parking_lot::Mutex;
34use std::{collections::HashMap, sync::Arc};
45
@@ -33,6 +34,7 @@ pub struct Collector<'a, C: ConnectionTrait> {
3334 discovered: DiscoveredTracker,
3435 relationships: &'a HashSet<Relationship>,
3536 connection: &'a C,
37+ concurrency: usize,
3638}
3739
3840impl<'a, C: ConnectionTrait> Collector<'a, C> {
@@ -47,6 +49,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
4749 discovered: self.discovered.clone(),
4850 relationships: self.relationships,
4951 connection: self.connection,
52+ concurrency: self.concurrency,
5053 }
5154 }
5255
@@ -61,6 +64,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
6164 depth: u64,
6265 relationships: &'a HashSet<Relationship>,
6366 connection: &'a C,
67+ concurrency: usize,
6468 ) -> Self {
6569 Self {
6670 graph_cache,
@@ -72,6 +76,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
7276 discovered: Default::default(),
7377 relationships,
7478 connection,
79+ concurrency,
7580 }
7681 }
7782
@@ -100,6 +105,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
100105 discovered: self.discovered.clone(),
101106 relationships: self.relationships,
102107 connection: self.connection,
108+ concurrency: self.concurrency,
103109 }
104110 }
105111
@@ -160,74 +166,83 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
160166 let current_sbom_id = ¤t_node.sbom_id;
161167 let current_sbom_uuid = *current_sbom_id;
162168 let current_node_id = ¤t_node.node_id;
163- let mut resolved_external_nodes: Vec<Node> = vec![];
169+
164170 let find_sbom_externals = resolve_rh_external_sbom_ancestors(
165171 current_sbom_uuid,
166172 current_node.node_id.clone().to_string(),
167173 self.connection,
168174 )
169175 .await;
170176
171- for sbom_external_node in find_sbom_externals {
172- if &sbom_external_node.sbom_id == current_sbom_id {
173- continue;
174- }
175- // check this is a valid external relationship
176- match sbom_external_node::Entity::find()
177- .filter(
178- sbom_external_node::Column::SbomId
179- .eq(sbom_external_node.clone().sbom_id),
180- )
181- .filter(
182- sbom_external_node::Column::ExternalNodeRef
183- .eq(sbom_external_node.clone().node_id),
184- )
185- .one(self.connection)
186- .await
187- {
188- Ok(Some(matched)) => {
189- // get the external sbom graph
190- let Some(external_graph) =
191- self.graph_cache.clone().get(&matched.sbom_id.to_string())
192- else {
193- log::warn!(
194- "external sbom graph {:?} not found in graph cache",
195- &matched.sbom_id.to_string()
196- );
177+ let resolved_external_nodes: Vec<Node> = stream::iter(find_sbom_externals)
178+ .map(|sbom_external_node| {
179+ let collector = self.clone();
180+ async move {
181+ if &sbom_external_node.sbom_id == current_sbom_id {
197182 return None;
198- };
199- // find the node in retrieved external graph
200- let Some(external_node_index) = external_graph
201- .node_indices()
202- .find(|&node| external_graph[node].node_id.eq(&matched.node_id))
203- else {
204- log::warn!(
205- "Node with ID {current_node_id} not found in external sbom"
206- );
207- continue;
208- };
209- // recurse into those external sbom nodes and save
210- resolved_external_nodes.extend(
211- Box::pin(
212- self.clone()
213- .with(external_graph.as_ref(), external_node_index)
214- .collect_graph(),
183+ }
184+ // check this is a valid external relationship
185+ match sbom_external_node::Entity::find()
186+ .filter(
187+ sbom_external_node::Column::SbomId
188+ .eq(sbom_external_node.clone().sbom_id),
215189 )
216- .await,
217- );
218- }
219- Err(_) => {
220- log::warn!("Problem looking up sbom external node");
221- continue;
222- }
223- _ => {
224- log::debug!(
225- "not external sbom sbom_external_node {sbom_external_node:?}"
226- );
227- continue;
190+ .filter(
191+ sbom_external_node::Column::ExternalNodeRef
192+ .eq(sbom_external_node.clone().node_id),
193+ )
194+ .one(collector.connection)
195+ .await
196+ {
197+ Ok(Some(matched)) => {
198+ // get the external sbom graph
199+ let Some(external_graph) =
200+ collector.graph_cache.clone().get(&matched.sbom_id.to_string())
201+ else {
202+ log::warn!(
203+ "external sbom graph {:?} not found in graph cache",
204+ &matched.sbom_id.to_string()
205+ );
206+ return None;
207+ };
208+ // find the node in retrieved external graph
209+ let Some(external_node_index) = external_graph
210+ .node_indices()
211+ .find(|&node| {
212+ external_graph[node].node_id.eq(&matched.node_id)
213+ })
214+ else {
215+ log::warn!(
216+ "Node with ID {current_node_id} not found in external sbom"
217+ );
218+ return None;
219+ };
220+ // recurse into those external sbom nodes and save
221+ Some(
222+ collector
223+ .with(external_graph.as_ref(), external_node_index)
224+ .collect_graph()
225+ .await,
226+ )
227+ }
228+ Err(_) => {
229+ log::warn!("Problem looking up sbom external node");
230+ None
231+ }
232+ _ => {
233+ log::debug!(
234+ "not external sbom sbom_external_node {sbom_external_node:?}"
235+ );
236+ None
237+ }
238+ }
228239 }
229- }
230- }
240+ })
241+ .buffer_unordered(self.concurrency)
242+ .filter_map(|nodes| async move { nodes })
243+ .flat_map(stream::iter)
244+ .collect::<Vec<_>>()
245+ .await;
231246
232247 let mut result = self.collect_graph().await;
233248 if !resolved_external_nodes.is_empty() {
@@ -240,44 +255,49 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
240255 }
241256
242257 pub async fn collect_graph(&self) -> Vec<Node> {
243- let mut result = vec![];
244258 log::debug!("Collecting graph for {:?}", self.node);
245- for edge in self.graph.edges_directed(self.node, self.direction) {
246- log::debug!("edge {edge:?}");
247-
248- // we only recurse in one direction
249- let (ancestor, descendent, package_node) = match self.direction {
250- Direction::Incoming => (
251- Box::pin(self.continue_node(edge.source()).collect()).await,
252- None,
253- self.graph.node_weight(edge.source()),
254- ),
255- Direction::Outgoing => (
256- None,
257- Box::pin(self.continue_node(edge.target()).collect()).await,
258- self.graph.node_weight(edge.target()),
259- ),
260- };
261259
262- let relationship = edge.weight();
260+ stream::iter(self.graph.edges_directed(self.node, self.direction))
261+ .map(|edge| async move {
262+ log::debug!("edge {edge:?}");
263263
264- if !self.relationships.is_empty() && !self.relationships.contains(relationship) {
265- // if we have entries, and no match, continue with the next
266- continue;
267- }
264+ // we only recurse in one direction
265+ // Depending on the direction, we collect ancestors or descendants
266+ let (ancestor, descendent, package_node) = match self.direction {
267+ // If the direction is incoming, we are collecting ancestors.
268+ // We recursively call `collect` for the source of the edge.
269+ Direction::Incoming => (
270+ self.continue_node(edge.source()).collect().await,
271+ None,
272+ self.graph.node_weight(edge.source()),
273+ ),
274+ // If the direction is outgoing, we are collecting descendants.
275+ // We recursively call `collect` for the target of the edge.
276+ Direction::Outgoing => (
277+ None,
278+ self.continue_node(edge.target()).collect().await,
279+ self.graph.node_weight(edge.target()),
280+ ),
281+ };
268282
269- let Some(package_node) = package_node else {
270- continue;
271- };
283+ let relationship = edge.weight();
272284
273- result.push(Node {
274- base: BaseSummary::from(package_node),
275- relationship: Some(*relationship),
276- ancestors: ancestor,
277- descendants: descendent,
278- });
279- }
285+ if !self.relationships.is_empty() && !self.relationships.contains(relationship) {
286+ // if we have entries, and no match, continue with the next
287+ return None;
288+ }
280289
281- result
290+ // Create a new `Node` and add it to the result
291+ Some(Node {
292+ base: BaseSummary::from(package_node?),
293+ relationship: Some(*relationship),
294+ ancestors: ancestor,
295+ descendants: descendent,
296+ })
297+ })
298+ .buffer_unordered(self.concurrency)
299+ .filter_map(|node| async move { node })
300+ .collect::<Vec<_>>()
301+ .await
282302 }
283303}
0 commit comments