Skip to content

Commit

Permalink
Merge pull request #8957 from DarthMax/separate_graph_import_and_cypher_aggregation
Browse files Browse the repository at this point in the history

Separate GraphImport and cypher aggregation
  • Loading branch information
DarthMax authored Apr 23, 2024
2 parents a341f23 + e1783da commit 73db64b
Show file tree
Hide file tree
Showing 8 changed files with 396 additions and 77 deletions.
3 changes: 3 additions & 0 deletions cypher-aggregation/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ dependencies {
implementation project(':config-api')
implementation project(':core')
implementation project(':graph-schema-api')
implementation project(':neo4j-kernel-adapter-api')
implementation project(':proc-common')
implementation project(':progress-tracking')
implementation project(':string-formatting')
implementation project(':triplet-graph-builder')


implementation group: 'org.openjdk.jol', name: 'jol-core', version: ver.'jol'
implementation group: 'org.opencypher', name: 'cypher-javacc-parser-9.0', version: ver.'opencypher-front-end', transitive: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.neo4j.gds.compat.CompatUserAggregator;
import org.neo4j.gds.core.ConfigKeyValidation;
import org.neo4j.gds.core.loading.Capabilities.WriteMode;
import org.neo4j.gds.core.loading.GraphStoreCatalog;
import org.neo4j.gds.core.loading.LazyIdMapBuilder;
import org.neo4j.gds.core.loading.construction.NodeLabelToken;
import org.neo4j.gds.core.loading.construction.NodeLabelTokens;
import org.neo4j.gds.core.loading.construction.PropertyValues;
Expand Down Expand Up @@ -158,22 +160,51 @@ private GraphImporter initGraphData(TextValue graphName, AnyValue config) {
try {
data = this.importer;
if (data == null) {
this.importer = data = GraphImporter.of(
graphName,
this.username,
this.queryProvider.executingQuery().orElse(""),
this.databaseId,
config,
this.writeMode,
PropertyState.PERSISTENT
);
this.importer = data = createGraphImporter(graphName, config);
}
return data;
} finally {
this.lock.unlock();
}
}

/**
 * Builds the {@link GraphImporter} for this aggregation from the supplied graph name and configuration.
 * <p>
 * The graph name is validated before the configuration is parsed. A configuration value that is not a
 * {@link MapValue} is replaced by {@code MapValue.EMPTY}.
 *
 * @param graphNameValue name of the graph to create
 * @param configMap      configuration value; only a {@code MapValue} is used as-is
 * @return a new importer backed by a newly created id map builder
 * @throws IllegalArgumentException if {@code validateGraphName} finds the graph name already in the catalog
 */
private GraphImporter createGraphImporter(
    TextValue graphNameValue,
    AnyValue configMap
) {
    var name = graphNameValue.stringValue();
    var executingQuery = this.queryProvider.executingQuery().orElse("");

    validateGraphName(name, this.username, this.databaseId);

    // Fall back to an empty map when the caller passed something that is not a map.
    MapValue configValues;
    if (configMap instanceof MapValue) {
        configValues = (MapValue) configMap;
    } else {
        configValues = MapValue.EMPTY;
    }

    var aggregationConfig = GraphProjectFromCypherAggregationConfig.of(
        this.username,
        name,
        executingQuery,
        configValues
    );

    var lazyIdMapBuilder = idMapBuilder(aggregationConfig.readConcurrency());

    return new GraphImporter(
        aggregationConfig,
        aggregationConfig.undirectedRelationshipTypes(),
        aggregationConfig.inverseIndexedRelationshipTypes(),
        lazyIdMapBuilder,
        this.writeMode,
        executingQuery
    );
}

/**
 * Creates the {@link LazyIdMapBuilder} handed to the importer, always using
 * {@link PropertyState#PERSISTENT} as its last constructor argument.
 *
 * @param readConcurrency concurrency value passed through to the builder
 * @return a new builder instance
 */
private static LazyIdMapBuilder idMapBuilder(int readConcurrency) {
    var persistentState = PropertyState.PERSISTENT;
    return new LazyIdMapBuilder(readConcurrency, true, true, persistentState);
}

/**
 * Rejects a graph name that the {@link GraphStoreCatalog} already knows for the given user and database.
 *
 * @param graphName  name to check
 * @param username   user the lookup is made for
 * @param databaseId database the lookup is made for
 * @throws IllegalArgumentException if the catalog reports that the graph exists
 */
private static void validateGraphName(String graphName, String username, DatabaseId databaseId) {
    boolean alreadyExists = GraphStoreCatalog.exists(username, databaseId, graphName);
    if (!alreadyExists) {
        return;
    }
    throw new IllegalArgumentException("Graph " + graphName + " already exists");
}

private static NodeLabelToken labelsConfig(String nodeLabelKey, @NotNull MapValue nodesConfig) {
var nodeLabelsEntry = nodesConfig.get(nodeLabelKey);
return tryLabelsConfig(nodeLabelsEntry, nodeLabelKey);
Expand Down Expand Up @@ -243,6 +274,7 @@ public AnyValue result() throws ProcedureException {
.databaseId(this.databaseId)
.databaseLocation(DatabaseLocation.LOCAL)
.build();

this.result = importer.result(
databaseInfo,
this.progressTimer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private static Stream<? extends Member> resultFieldsFromCustomType(Class<?> resu

/**
 * Streams the fields of {@code resultClass} that pass
 * {@code ProcedureSyntaxAutoChecker::includeFieldInResult}.
 * <p>
 * {@code Class#getFields()} returns public fields only, so the record-aware variant switches to
 * {@code getDeclaredFields()} when {@code resultClass.isRecord()} is true.
 * <p>
 * NOTE(review): this span carries both {@code .stream(...)} lines of a one-line change (the
 * {@code getFields()}-only form and the record-aware form); presumably only the record-aware
 * line remains in the actual file. Confirm against the repository.
 */
private static Stream<Field> resultFieldsFromClassFields(Class<?> resultClass) {
return Arrays
.stream(resultClass.getFields())
.stream(resultClass.isRecord() ? resultClass.getDeclaredFields() : resultClass.getFields())
.filter(ProcedureSyntaxAutoChecker::includeFieldInResult);
}

Expand Down
3 changes: 3 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ project(':csv').projectDir = file('io/csv')
include('cypher-aggregation')
project(':cypher-aggregation').projectDir = file('cypher-aggregation')

include('triplet-graph-builder')
project(':triplet-graph-builder').projectDir = file('triplet-graph-builder')

include('legacy-cypher-projection')
project(':legacy-cypher-projection').projectDir = file('legacy-cypher-projection')

Expand Down
51 changes: 51 additions & 0 deletions triplet-graph-builder/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
apply plugin: 'java-library'

// Human-readable name of this module (triplet-graph-builder), not of cypher-aggregation.
description = 'Neo4j Graph Data Science :: Triplet Graph Builder'
group = 'org.neo4j.gds'

// Dependencies of the triplet-graph-builder module.
dependencies {
// Annotation processors: project-local ones first, then third-party ones.
annotationProcessor project(':annotations')
annotationProcessor project(':config-generator')

annotationProcessor group: 'io.soabase.record-builder', name: 'record-builder-processor', version: ver.'record-builder'
annotationProcessor group: 'org.immutables', name: 'builder', version: ver.'immutables'
annotationProcessor group: 'org.immutables', name: 'value', version: ver.'immutables'
annotationProcessor group: 'org.neo4j', name: 'annotations', version: ver.'neo4j'

// Compile-only artifacts.
// NOTE(review): the record-builder entry below is the processor artifact; confirm whether an
// annotations-only artifact was intended for compileOnly.
compileOnly group: 'io.soabase.record-builder', name: 'record-builder-processor', version: ver.'record-builder'
compileOnly group: 'org.immutables', name: 'value-annotations', version: ver.'immutables'
compileOnly group: 'org.jetbrains', name: 'annotations', version: ver.'jetbrains-annotations'

// Each artifact name returned by neodeps() is added compile-only at the configured Neo4j
// version, without its transitive dependencies.
neodeps().each {
compileOnly(group: 'org.neo4j', name: it, version: ver.'neo4j') {
transitive = false
}
}

// Project modules used by the main source set.
implementation project(':annotations')
implementation project(':config-api')
implementation project(':core')
implementation project(':graph-schema-api')
implementation project(':proc-common')
implementation project(':progress-tracking')
implementation project(':string-formatting')

// Third-party libraries; the openCypher parser is added without its transitive dependencies.
implementation group: 'org.openjdk.jol', name: 'jol-core', version: ver.'jol'
implementation group: 'org.opencypher', name: 'cypher-javacc-parser-9.0', version: ver.'opencypher-front-end', transitive: false
implementation group: 'org.hdrhistogram', name: 'HdrHistogram', version: ver.'HdrHistogram'

// Test-only annotation processing and compile-time annotations.
testAnnotationProcessor project(':annotations')

testCompileOnly group: 'org.immutables', name: 'value-annotations', version: ver.'immutables'
testCompileOnly group: 'org.immutables', name: 'builder', version: ver.'immutables'
testCompileOnly group: 'org.jetbrains', name: 'annotations', version: ver.'jetbrains-annotations'

// Test-only project modules and libraries.
testImplementation project(':executor')
testImplementation project(':test-utils')
testImplementation project(':proc-catalog')
testImplementation project(':proc-community')

testImplementation group: 'org.neo4j', name: 'neo4j-cypher-dsl', version: ver.'cypher-dsl'

testImplementation project(':opengds-extension')
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,16 @@
*/
package org.neo4j.gds.projection;

import org.neo4j.gds.annotation.CustomProcedure;
import org.neo4j.gds.annotation.GenerateBuilder;

import java.util.Map;

/**
 * Summary returned by {@code GraphImporter#result}: the graph name, node and relationship counts,
 * the projection time in milliseconds, the configuration map and the query.
 * <p>
 * NOTE(review): this span interleaves two versions of the record from a diff view: one whose
 * components carry {@code @CustomProcedure.ResultField} and which exposes a static
 * {@code builder()}, and one with plain components and an empty body. Confirm which form is
 * current before relying on either.
 */
@CustomProcedure.ResultType
@GenerateBuilder
public record AggregationResult(
@CustomProcedure.ResultField String graphName,
@CustomProcedure.ResultField long nodeCount,
@CustomProcedure.ResultField long relationshipCount,
@CustomProcedure.ResultField long projectMillis,
@CustomProcedure.ResultField Map<String, Object> configuration,
@CustomProcedure.ResultField String query
) {
public static AggregationResultBuilder builder() {
return AggregationResultBuilder.builder();
}
}
String graphName,
long nodeCount,
long relationshipCount,
long projectMillis,
Map<String, Object> configuration,
String query
){}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@

import org.jetbrains.annotations.Nullable;
import org.neo4j.gds.RelationshipType;
import org.neo4j.gds.api.DatabaseId;
import org.neo4j.gds.api.DatabaseInfo;
import org.neo4j.gds.api.DefaultValue;
import org.neo4j.gds.api.PropertyState;
import org.neo4j.gds.api.compress.AdjacencyCompressor;
import org.neo4j.gds.api.schema.ImmutableMutableGraphSchema;
import org.neo4j.gds.api.schema.MutableGraphSchema;
Expand All @@ -45,15 +43,13 @@
import org.neo4j.gds.core.loading.construction.PropertyValues;
import org.neo4j.gds.core.loading.construction.RelationshipsBuilder;
import org.neo4j.gds.core.utils.ProgressTimer;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.TextValue;
import org.neo4j.values.virtual.MapValue;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.stream.Collectors.toMap;
import static org.neo4j.gds.Orientation.NATURAL;
import static org.neo4j.gds.Orientation.UNDIRECTED;

Expand Down Expand Up @@ -89,47 +85,6 @@ public GraphImporter(
this.graphSchemaBuilder = MutableGraphSchema.builder();
}

/**
 * Creates a {@link GraphImporter} from the raw procedure arguments.
 * <p>
 * The graph name is validated first. A {@code configMap} that is not a {@link MapValue} is
 * replaced by {@code MapValue.EMPTY} before the configuration is parsed.
 *
 * @param graphNameValue name of the graph to create
 * @param username       user the graph is created for
 * @param query          query string stored in the configuration and passed to the importer
 * @param databaseId     database used for the graph name check
 * @param configMap      configuration value; only a {@code MapValue} is used as-is
 * @param writeMode      write mode passed to the importer
 * @param propertyState  property state passed to the id map builder
 * @return a new importer
 * @throws IllegalArgumentException if {@code validateGraphName} finds the graph name already in the catalog
 */
static GraphImporter of(
    TextValue graphNameValue,
    String username,
    String query,
    DatabaseId databaseId,
    AnyValue configMap,
    WriteMode writeMode,
    PropertyState propertyState
) {
    var name = graphNameValue.stringValue();

    validateGraphName(name, username, databaseId);

    // Fall back to an empty map when the caller passed something that is not a map.
    MapValue configValues;
    if (configMap instanceof MapValue) {
        configValues = (MapValue) configMap;
    } else {
        configValues = MapValue.EMPTY;
    }

    var aggregationConfig = GraphProjectFromCypherAggregationConfig.of(
        username,
        name,
        query,
        configValues
    );

    var lazyIdMapBuilder = idMapBuilder(aggregationConfig.readConcurrency(), propertyState);

    return new GraphImporter(
        aggregationConfig,
        aggregationConfig.undirectedRelationshipTypes(),
        aggregationConfig.inverseIndexedRelationshipTypes(),
        lazyIdMapBuilder,
        writeMode,
        query
    );
}

/**
 * Throws when the {@link GraphStoreCatalog} already holds a graph of this name for the given
 * user and database; otherwise does nothing.
 *
 * @throws IllegalArgumentException if the catalog reports that the graph exists
 */
private static void validateGraphName(String graphName, String username, DatabaseId databaseId) {
    boolean nameTaken = GraphStoreCatalog.exists(username, databaseId, graphName);
    if (!nameTaken) {
        return;
    }
    throw new IllegalArgumentException("Graph " + graphName + " already exists");
}

/**
 * Creates the {@link LazyIdMapBuilder} for an importer.
 *
 * @param readConcurrency concurrency value passed through to the builder
 * @param propertyState   property state passed through to the builder
 * @return a new builder instance
 */
private static LazyIdMapBuilder idMapBuilder(int readConcurrency, PropertyState propertyState) {
    var lazyIdMapBuilder = new LazyIdMapBuilder(readConcurrency, true, true, propertyState);
    return lazyIdMapBuilder;
}

public void update(
long sourceNode,
long targetNode,
Expand All @@ -149,10 +104,9 @@ public void update(
if (this.relImporters.containsKey(relationshipType)) {
relImporter = this.relImporters.get(relationshipType);
} else {
var finalRelationshipProperties = relationshipProperties;
relImporter = this.relImporters.computeIfAbsent(
relationshipType,
type -> newRelImporter(type, finalRelationshipProperties)
type -> newRelImporter(type, relationshipProperties)
);
}

Expand Down Expand Up @@ -187,9 +141,9 @@ public AggregationResult result(
) {
var graphName = config.graphName();

// in case something else has written something with the same graph name
// validate again before doing the heavier graph building
validateGraphName(config.graphName(), config.username(), databaseInfo.databaseId());
if (GraphStoreCatalog.exists(config.username(), databaseInfo.databaseId(), graphName)) {
throw new IllegalArgumentException("Graph " + graphName + " already exists");
}

this.idMapBuilder.prepareForFlush();

Expand All @@ -211,15 +165,16 @@ public AggregationResult result(

var projectMillis = timer.stop().getDuration();

return AggregationResult.builder()
return AggregationResultBuilder.builder()
.graphName(graphName)
.nodeCount(graphStore.nodeCount())
.relationshipCount(graphStore.relationshipCount())
.projectMillis(projectMillis)
.addConfiguration(this.config.asProcedureResultConfigurationField()
.configuration(this.config.asProcedureResultConfigurationField()
.entrySet()
.stream()
.filter(e -> e.getValue() != null))
.filter(e -> e.getValue() != null)
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue)))
.query(this.query)
.build();
}
Expand Down
Loading

0 comments on commit 73db64b

Please sign in to comment.