Skip to content

Commit

Permalink
Merge pull request #9430 from s1ck/relationship-spliterator
Browse files Browse the repository at this point in the history
Implement AllRelationshipsSpliterator
  • Loading branch information
s1ck committed Jul 29, 2024
2 parents b764ada + 80fe94b commit 4d0cee8
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.core.huge;

import org.neo4j.gds.api.Graph;
import org.neo4j.gds.api.RelationshipCursor;
import org.neo4j.gds.api.RelationshipIterator;

import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;

public class AllRelationshipsSpliterator implements Spliterator<RelationshipCursor> {

private final RelationshipIterator relationshipIterator;
private final double fallbackValue;

// state
private long current;
private long limit;
private Iterator<RelationshipCursor> cursorIterator;

public AllRelationshipsSpliterator(Graph graph, double fallbackValue) {
this(graph, 0, graph.nodeCount(), fallbackValue);
}

private AllRelationshipsSpliterator(
RelationshipIterator relationshipIterator,
long fromNode,
long toNodeExclusive,
double fallbackValue
) {
this.relationshipIterator = relationshipIterator.concurrentCopy();
this.current = fromNode;
this.limit = toNodeExclusive;
this.fallbackValue = fallbackValue;
this.cursorIterator = this.relationshipIterator.streamRelationships(this.current, fallbackValue).iterator();
}

@Override
public boolean tryAdvance(Consumer<? super RelationshipCursor> action) {
boolean isAdvanced = advance(action);

while (!isAdvanced && hasRemaining()) {
this.current++;
this.cursorIterator = relationshipIterator.streamRelationships(current, this.fallbackValue).iterator();
isAdvanced = advance(action);
}

return isAdvanced || hasRemaining();
}

private boolean advance(Consumer<? super RelationshipCursor> action) {
if (this.cursorIterator.hasNext()) {
action.accept(this.cursorIterator.next());
return true;
}
return false;
}

private boolean hasRemaining() {
return this.current < this.limit - 1;
}

@Override
public Spliterator<RelationshipCursor> trySplit() {
long remaining = this.limit - this.current;
if (remaining < 2) {
return null;
}

long split = this.current + remaining / 2;
long newLimit = this.limit;
this.limit = split;

return new AllRelationshipsSpliterator(this.relationshipIterator, split, newLimit, this.fallbackValue);
}

@Override
public long estimateSize() {
return this.limit - this.current;
}

@Override
public int characteristics() {
return Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.SIZED;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.core.huge;

import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.constraints.IntRange;
import net.jqwik.api.constraints.LongRange;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.neo4j.gds.RelationshipType;
import org.neo4j.gds.TestSupport;
import org.neo4j.gds.api.Graph;
import org.neo4j.gds.beta.generator.PropertyProducer;
import org.neo4j.gds.beta.generator.RandomGraphGeneratorBuilder;
import org.neo4j.gds.beta.generator.RelationshipDistribution;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.StreamSupport;

import static org.assertj.core.api.Assertions.assertThat;

class AllRelationshipsSpliteratorTest {

private record Relationship(long target, double property) {
}

@Test
void tryAdvance() {
var graph = TestSupport.fromGdl("(a)-->(b), (b)-->(c), (c)-->(d)");

var spliterator = new AllRelationshipsSpliterator(graph, 1.0);

assertThat(spliterator.tryAdvance(relationshipCursor -> {})).isTrue();
assertThat(spliterator.tryAdvance(relationshipCursor -> {})).isTrue();
assertThat(spliterator.tryAdvance(relationshipCursor -> {})).isTrue();
assertThat(spliterator.tryAdvance(relationshipCursor -> {})).isFalse();
}

@Property(tries = 50)
void forEach(
@ForAll @LongRange(min = 10L, max = 10_000L) long nodeCount,
@ForAll @IntRange(min = 10, max = 100) int averageDegree,
@ForAll RelationshipDistribution relationshipDistribution,
@ForAll long seed,
@ForAll boolean parallel
) {
var graph = generateGraph(nodeCount, averageDegree, relationshipDistribution, seed);

var fallback = 42.0;
var expected = expectedAdjacencyList(graph, fallback);

var actual = new ConcurrentHashMap<Long, List<Relationship>>();
var allRelationshipsIterator = new AllRelationshipsSpliterator(graph, fallback);

StreamSupport.stream(allRelationshipsIterator, parallel).forEach(relationshipCursor -> actual
.computeIfAbsent(relationshipCursor.sourceId(), __ -> new ArrayList<>())
.add(new Relationship(
relationshipCursor.targetId(),
relationshipCursor.property()
)));

assertThat(actual).isEqualTo(expected);
}

@Property(tries = 50)
void iterator(
@ForAll @LongRange(min = 10L, max = 10_000L) long nodeCount,
@ForAll @IntRange(min = 10, max = 100) int averageDegree,
@ForAll RelationshipDistribution relationshipDistribution,
@ForAll long seed
) {
var graph = generateGraph(nodeCount, averageDegree, relationshipDistribution, seed);
var fallback = 42.0;
var expected = expectedAdjacencyList(graph, fallback);

var actual = new ConcurrentHashMap<Long, List<Relationship>>();
var allRelationshipsIterator = new AllRelationshipsSpliterator(graph, fallback);

var iterator = StreamSupport.stream(allRelationshipsIterator, true).iterator();

while (iterator.hasNext()) {
var relationshipCursor = iterator.next();
actual.computeIfAbsent(relationshipCursor.sourceId(), __ -> new ArrayList<>())
.add(new Relationship(
relationshipCursor.targetId(),
relationshipCursor.property()
));
}

assertThat(actual).isEqualTo(expected);
}

private static @NotNull HugeGraph generateGraph(
long nodeCount,
int averageDegree,
RelationshipDistribution relationshipDistribution,
long seed
) {
return new RandomGraphGeneratorBuilder()
.nodeCount(nodeCount)
.averageDegree(averageDegree)
.seed(seed)
.relationshipType(RelationshipType.ALL_RELATIONSHIPS)
.relationshipDistribution(relationshipDistribution)
.relationshipPropertyProducer(PropertyProducer.randomDouble("baz", 0, 100))
.build()
.generate();
}

private static @NotNull ConcurrentHashMap<Long, List<Relationship>> expectedAdjacencyList(
Graph graph,
double fallback
) {
var expected = new ConcurrentHashMap<Long, List<Relationship>>();
graph.forEachNode(node -> {
graph.forEachRelationship(node, fallback, (source, target, property) -> {
expected.computeIfAbsent(source, __ -> new ArrayList<>()).add(new Relationship(
target,
property
));
return true;
});
return true;
});
return expected;
}
}

0 comments on commit 4d0cee8

Please sign in to comment.