Skip to content

Commit

Permalink
migrate celf write
Browse files Browse the repository at this point in the history
  • Loading branch information
lassewesth committed May 30, 2024
1 parent d226976 commit 08c22c6
Show file tree
Hide file tree
Showing 17 changed files with 223 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@

import org.neo4j.gds.algorithms.estimation.AlgorithmEstimator;
import org.neo4j.gds.applications.algorithms.machinery.MemoryEstimateResult;
import org.neo4j.gds.influenceMaximization.CELFMemoryEstimateDefinition;
import org.neo4j.gds.influenceMaximization.InfluenceMaximizationBaseConfig;
import org.neo4j.gds.pagerank.PageRankConfig;
import org.neo4j.gds.pagerank.PageRankMemoryEstimateDefinition;

import java.util.Optional;

public class CentralityAlgorithmsEstimateBusinessFacade {

private final AlgorithmEstimator algorithmEstimator;
Expand All @@ -38,19 +34,6 @@ public CentralityAlgorithmsEstimateBusinessFacade(
this.algorithmEstimator = algorithmEstimator;
}

public <C extends InfluenceMaximizationBaseConfig> MemoryEstimateResult celf(
Object graphNameOrConfiguration,
C configuration
) {

return algorithmEstimator.estimate(
graphNameOrConfiguration,
configuration,
Optional.empty(),
new CELFMemoryEstimateDefinition(configuration.toParameters())
);
}

public <C extends PageRankConfig> MemoryEstimateResult pageRank(
Object graphNameOrConfiguration,
C configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
import org.jetbrains.annotations.NotNull;
import org.neo4j.gds.algorithms.AlgorithmComputationResult;
import org.neo4j.gds.algorithms.NodePropertyWriteResult;
import org.neo4j.gds.algorithms.centrality.specificfields.CELFSpecificFields;
import org.neo4j.gds.algorithms.centrality.specificfields.CentralityStatisticsSpecificFields;
import org.neo4j.gds.algorithms.centrality.specificfields.PageRankSpecificFields;
import org.neo4j.gds.algorithms.runner.AlgorithmResultWithTiming;
import org.neo4j.gds.algorithms.runner.AlgorithmRunner;
import org.neo4j.gds.algorithms.writeservices.WriteNodePropertyService;
import org.neo4j.gds.api.ResultStore;
import org.neo4j.gds.config.AlgoBaseConfig;
import org.neo4j.gds.config.ArrowConnectionInfo;
import org.neo4j.gds.core.concurrency.Concurrency;
import org.neo4j.gds.core.concurrency.DefaultPool;
import org.neo4j.gds.influenceMaximization.CELFNodeProperties;
import org.neo4j.gds.influenceMaximization.InfluenceMaximizationWriteConfig;
import org.neo4j.gds.pagerank.PageRankResult;
import org.neo4j.gds.pagerank.PageRankWriteConfig;
import org.neo4j.gds.result.CentralityStatistics;
Expand Down Expand Up @@ -151,46 +147,6 @@ private NodePropertyWriteResult<PageRankSpecificFields> pageRankVariant(
}).orElseGet(() -> NodePropertyWriteResult.empty(PageRankSpecificFields.EMPTY, configuration));
}

public NodePropertyWriteResult<CELFSpecificFields> celf(
String graphName,
InfluenceMaximizationWriteConfig configuration
) {
// 1. Run the algorithm and time the execution
var intermediateResult = AlgorithmRunner.runWithTiming(
() -> centralityAlgorithmsFacade.celf(graphName, configuration)
);
var algorithmResult = intermediateResult.algorithmResult;

var writeResultBuilder = NodePropertyWriteResult.<CELFSpecificFields>builder()
.computeMillis(intermediateResult.computeMilliseconds)
.postProcessingMillis(0L)
.configuration(configuration);

algorithmResult.result().ifPresentOrElse(
result -> {
var nodeCount = algorithmResult.graph().nodeCount();
var nodeProperties = new CELFNodeProperties(result.seedSetNodes(), nodeCount);
var writeResult = writeNodePropertyService.write(
algorithmResult.graph(),
algorithmResult.graphStore(),
nodeProperties,
configuration.writeConcurrency(),
configuration.writeProperty(),
"CELFWrite",
configuration.arrowConnectionInfo(),
configuration.resolveResultStore(algorithmResult.resultStore()),
configuration.jobId()
);
writeResultBuilder.writeMillis(writeResult.writeMilliseconds());
writeResultBuilder.nodePropertiesWritten(writeResult.nodePropertiesWritten());
writeResultBuilder.algorithmSpecificFields(new CELFSpecificFields(result.totalSpread(), nodeCount));
},
() -> writeResultBuilder.algorithmSpecificFields(CELFSpecificFields.EMPTY)
);

return writeResultBuilder.build();
}

<RESULT, CONFIG extends AlgoBaseConfig, ASF extends CentralityStatisticsSpecificFields> NodePropertyWriteResult<ASF> writeToDatabase(
AlgorithmComputationResult<RESULT> algorithmResult,
CONFIG configuration,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public NodePropertiesWritten execute(
configuration,
configuration,
BetweennessCentrality,
result,
jobId
jobId,
result.nodePropertyValues()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.applications.algorithms.centrality;

import org.neo4j.gds.api.Graph;
import org.neo4j.gds.api.GraphStore;
import org.neo4j.gds.api.ResultStore;
import org.neo4j.gds.applications.algorithms.machinery.MutateOrWriteStep;
import org.neo4j.gds.applications.algorithms.metadata.NodePropertiesWritten;
import org.neo4j.gds.core.utils.progress.JobId;
import org.neo4j.gds.influenceMaximization.CELFNodeProperties;
import org.neo4j.gds.influenceMaximization.CELFResult;
import org.neo4j.gds.influenceMaximization.InfluenceMaximizationWriteConfig;

import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.CELF;

class CelfWriteStep implements MutateOrWriteStep<CELFResult, NodePropertiesWritten> {
private final WriteToDatabase writeToDatabase;
private final InfluenceMaximizationWriteConfig configuration;

CelfWriteStep(WriteToDatabase writeToDatabase, InfluenceMaximizationWriteConfig configuration) {
this.writeToDatabase = writeToDatabase;
this.configuration = configuration;
}

@Override
public NodePropertiesWritten execute(
Graph graph,
GraphStore graphStore,
ResultStore resultStore,
CELFResult result,
JobId jobId
) {
var nodePropertyValues = new CELFNodeProperties(result.seedSetNodes(), graph.nodeCount());

return writeToDatabase.perform(
graph,
graphStore,
resultStore,
configuration,
configuration,
CELF,
jobId,
nodePropertyValues
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
import org.neo4j.gds.degree.DegreeCentralityWriteConfig;
import org.neo4j.gds.harmonic.HarmonicCentralityWriteConfig;
import org.neo4j.gds.harmonic.HarmonicResult;
import org.neo4j.gds.influenceMaximization.CELFResult;
import org.neo4j.gds.influenceMaximization.InfluenceMaximizationWriteConfig;
import org.neo4j.gds.logging.Log;

import java.util.Optional;

import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.BetweennessCentrality;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.CELF;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.ClosenessCentrality;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.DegreeCentrality;
import static org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking.HarmonicCentrality;
Expand Down Expand Up @@ -94,6 +97,24 @@ public <RESULT> RESULT betweennessCentrality(
);
}

public <CONFIGURATION extends InfluenceMaximizationWriteConfig, RESULT> RESULT celf(
GraphName graphName,
CONFIGURATION configuration,
ResultBuilder<CONFIGURATION, CELFResult, RESULT, NodePropertiesWritten> resultBuilder
) {
var writeStep = new CelfWriteStep(writeToDatabase, configuration);

return algorithmProcessingTemplate.processAlgorithm(
graphName,
configuration,
CELF,
() -> estimationFacade.celf(configuration),
graph -> centralityAlgorithms.celf(graph, configuration),
Optional.of(writeStep),
resultBuilder
);
}

public <RESULT> RESULT closenessCentrality(
GraphName graphName,
ClosenessCentralityWriteConfig configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public NodePropertiesWritten execute(
configuration,
configuration,
ClosenessCentrality,
result,
jobId
jobId,
result.nodePropertyValues()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public NodePropertiesWritten execute(
configuration,
configuration,
DegreeCentrality,
result,
jobId
jobId,
result.nodePropertyValues()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public NodePropertiesWritten execute(
configuration,
configuration,
HarmonicCentrality,
result,
jobId
jobId,
result.nodePropertyValues()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
*/
package org.neo4j.gds.applications.algorithms.centrality;

import org.neo4j.gds.algorithms.centrality.CentralityAlgorithmResult;
import org.neo4j.gds.algorithms.writeservices.WriteNodePropertyService;
import org.neo4j.gds.api.Graph;
import org.neo4j.gds.api.GraphStore;
import org.neo4j.gds.api.ResultStore;
import org.neo4j.gds.api.properties.nodes.NodePropertyValues;
import org.neo4j.gds.applications.algorithms.metadata.LabelForProgressTracking;
import org.neo4j.gds.applications.algorithms.metadata.NodePropertiesWritten;
import org.neo4j.gds.config.WriteConfig;
Expand All @@ -44,13 +44,13 @@ NodePropertiesWritten perform(
WriteConfig writeConfiguration,
WritePropertyConfig writePropertyConfiguration,
LabelForProgressTracking label,
CentralityAlgorithmResult result,
JobId jobId
JobId jobId,
NodePropertyValues nodePropertyValues
) {
var writeNodePropertyResult = writeNodePropertyService.write(
graph,
graphStore,
result.nodePropertyValues(),
nodePropertyValues,
writeConfiguration.writeConcurrency(),
writePropertyConfiguration.writeProperty(),
label.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.neo4j.gds.influenceMaximization;

import org.neo4j.gds.procedures.GraphDataScienceProcedures;
import org.neo4j.gds.procedures.centrality.celf.CELFWriteResult;
import org.neo4j.gds.procedures.algorithms.centrality.CELFWriteResult;
import org.neo4j.gds.applications.algorithms.machinery.MemoryEstimateResult;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
Expand All @@ -46,7 +46,7 @@ public Stream<CELFWriteResult> write(
@Name(value = "graphName") String graphName,
@Name(value = "configuration", defaultValue = "{}") Map<String, Object> configuration
) {
return facade.centrality().celfWrite(graphName, configuration);
return facade.algorithms().centrality().celfWrite(graphName, configuration);
}

@Procedure(name = "gds.influenceMaximization.celf.write.estimate", mode = READ)
Expand All @@ -55,7 +55,7 @@ public Stream<MemoryEstimateResult> estimate(
@Name(value = "graphNameOrConfiguration") Object graphNameOrConfiguration,
@Name(value = "algoConfiguration") Map<String, Object> algoConfiguration
) {
return facade.centrality().celfWriteEstimate(graphNameOrConfiguration, algoConfiguration);
return facade.algorithms().centrality().celfWriteEstimate(graphNameOrConfiguration, algoConfiguration);
}

@Procedure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.neo4j.gds.executor.ExecutionContext;
import org.neo4j.gds.executor.GdsCallable;
import org.neo4j.gds.procedures.algorithms.configuration.NewConfigFunction;
import org.neo4j.gds.procedures.centrality.celf.CELFWriteResult;
import org.neo4j.gds.procedures.algorithms.centrality.CELFWriteResult;

import java.util.stream.Stream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.procedures.centrality.celf;
package org.neo4j.gds.procedures.algorithms.centrality;

import org.neo4j.gds.applications.algorithms.machinery.AlgorithmProcessingTimings;
import org.neo4j.gds.result.AbstractResultBuilder;

import java.util.Map;
Expand Down Expand Up @@ -51,6 +52,17 @@ public static Builder builder() {
return new Builder();
}

static CELFWriteResult emptyFrom(AlgorithmProcessingTimings timings, Map<String, Object> configurationMap) {
return new CELFWriteResult(
timings.mutateOrWriteMillis,
0,
timings.computeMillis,
0,
0,
configurationMap
);
}

public static class Builder extends AbstractResultBuilder<CELFWriteResult> {
private double totalSpread;

Expand Down
Loading

0 comments on commit 08c22c6

Please sign in to comment.