Quads Loader + Docker compose:
- Use streams only
- Use sha256 images
JPugetGil committed Oct 1, 2024
1 parent fac925e commit 89d9252
Showing 2 changed files with 95 additions and 107 deletions.
6 changes: 3 additions & 3 deletions docker-compose.yml
@@ -1,6 +1,6 @@
services:
postgres:
image: postgres:16.2
image: postgres@sha256:4ec37d2a07a0067f176fdcc9d4bb633a5724d2cc4f892c7a2046d054bb6939e5
ports:
- "5432:5432"
environment:
@@ -17,7 +17,7 @@ services:
memory: ${BD_RAM_LIMITATION:-16Gb}

postgres-test:
image: postgres:16.2
image: postgres@sha256:4ec37d2a07a0067f176fdcc9d4bb633a5724d2cc4f892c7a2046d054bb6939e5
ports:
- "5433:5432"
environment:
@@ -33,7 +33,7 @@ services:
memory: ${BD_RAM_LIMITATION:-16Gb}

blazegraph:
image: vcity/blazegraph-cors
image: vcity/blazegraph-cors@sha256:c6f9556ca53ff01304557e349d2f10b3e121dae7230426f4c64fa42b2cbaf805
ports:
- "9999:8080"
container_name: blazegraph
196 changes: 92 additions & 104 deletions QuadImportService.java
@@ -5,23 +5,23 @@
import fr.vcity.converg.repository.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.riot.RDFLanguages;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.system.ErrorHandlerFactory;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.riot.system.StreamRDFBase;
import org.apache.jena.sparql.core.Quad;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.io.InputStream;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Service
@Slf4j
@@ -66,6 +66,10 @@ public record QuadValueType(TripleValueType tripleValueType, String namedGraph,
IVersionRepository versionRepository;
VersionedQuadComponent versionedQuadComponent;

List<QuadValueType> quadValueTypes = new ArrayList<>();
Set<String> namedGraphs = new HashSet<>();
List<TripleValueType> tripleValueTypes = new ArrayList<>();

public QuadImportService(
IFlatModelQuadRepository flatModelQuadRepository,
IFlatModelTripleRepository flatModelTripleRepository,
@@ -106,18 +110,12 @@ public Integer importModel(MultipartFile file) throws RiotException {

log.info("Current file: {}", file.getOriginalFilename());

try (InputStream inputStream = file.getInputStream()) {
DatasetGraph datasetGraph =
RDFParser.create()
.source(inputStream)
.lang(RDFLanguages.nameToLang(FilenameUtils.getExtension(file.getOriginalFilename())))
.errorHandler(ErrorHandlerFactory.errorHandlerStrict)
.toDatasetGraph();

try (InputStream inputStream = file.getInputStream()) {
Long start = System.nanoTime();

extractAndInsertVersionedNamedGraph(file, datasetGraph, version);
extractAndInsertQuads(datasetGraph, version);
getQuadsStreamRDF(inputStream, file.getOriginalFilename(), version.getIndexVersion())
.finish();

log.info("Saving quads to catalog");
rdfResourceRepository.flatModelQuadsToCatalog();
@@ -149,23 +147,10 @@ public void importMetadata(MultipartFile file) throws RiotException {
log.info("Current file: {}", file.getOriginalFilename());

try (InputStream inputStream = file.getInputStream()) {
DatasetGraph datasetGraph =
RDFParser.create()
.source(inputStream)
.lang(RDFLanguages.nameToLang(FilenameUtils.getExtension(file.getOriginalFilename())))
.errorHandler(ErrorHandlerFactory.errorHandlerStrict)
.toDatasetGraph();

Long start = System.nanoTime();

if (!datasetGraph.getDefaultGraph().isEmpty()) {
importDefaultModel(datasetGraph.getDefaultGraph());
}

for (Iterator<Node> i = datasetGraph.listGraphNodes(); i.hasNext(); ) {
var graphNode = i.next();
importDefaultModel(datasetGraph.getGraph(graphNode));
}
getTriplesStreamRDF(inputStream, file.getOriginalFilename())
.finish();

log.info("Saving triples to catalog");
rdfResourceRepository.flatModelTriplesToCatalog();
@@ -200,96 +185,99 @@ public void removeMetadata() {
}

/**
* Import RDF default model statements
* Save the triples in batch
*/
private void saveBatchTriples() {
metadataComponent.saveTriples(tripleValueTypes);
tripleValueTypes.clear();
}

/**
* Save the quads in batch
*/
private void saveBatchQuads() {
versionedQuadComponent.saveQuads(quadValueTypes);
quadValueTypes.clear();
}

/**
* Save the versioned named graph in batch
*
* @param graph The default graph
* @param filename The input filename
* @param version The version number
*/
private void importDefaultModel(Graph graph) {
List<TripleValueType> tripleValueTypes = new ArrayList<>();

graph.stream()
.forEach(triple -> {
tripleValueTypes.add(getTripleValueType(triple));

if (tripleValueTypes.size() == 50000) {
log.info("50000 records found. Executing batch save triples");
metadataComponent.saveTriples(tripleValueTypes);
tripleValueTypes.clear();
}
});

if (!tripleValueTypes.isEmpty()) {
metadataComponent.saveTriples(tripleValueTypes);
}
private void saveBatchVersionedNamedGraph(String filename, Integer version) {
versionedNamedGraphComponent.saveVersionedNamedGraph(namedGraphs.stream().toList(), filename, version);
namedGraphs.clear();
}
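
The three saveBatch* helpers above share the same accumulate-and-flush shape: records are collected in a field, flushed to the repository every 50,000 entries, and the remainder is flushed once parsing finishes. The following is a minimal generic sketch of that pattern only; the BatchBuffer name, the threshold parameter, and the usage shown afterwards are illustrative assumptions, not part of this commit.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Generic accumulate-and-flush buffer, equivalent in spirit to the saveBatch* helpers.
final class BatchBuffer<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int threshold;
    private final Consumer<List<T>> flushAction;

    BatchBuffer(int threshold, Consumer<List<T>> flushAction) {
        this.threshold = threshold;
        this.flushAction = flushAction;
    }

    // Adds one record and flushes automatically when the threshold is reached.
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Hands the buffered records to the flush action and clears the buffer.
    void flush() {
        if (!buffer.isEmpty()) {
            flushAction.accept(List.copyOf(buffer));
            buffer.clear();
        }
    }
}

A hypothetical caller could wire it as new BatchBuffer<QuadValueType>(50_000, versionedQuadComponent::saveQuads), add records from the parser callback, and call flush() once after parsing completes.
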

/**
* Extract and insert the quads
* Converts a stream into a stream of quads
*
* @param datasetGraph The datasetGraph
* @param version The version
* @param in The input stream of the dataset
* @return A stream of quads
*/
private void extractAndInsertQuads(DatasetGraph datasetGraph, Version version) {
List<QuadValueType> quadValueTypes = new ArrayList<>();

datasetGraph.stream()
.forEach(quad -> {
QuadValueType quadValueType = new QuadValueType(
getTripleValueType(
quad.asTriple()
),
quad.getGraph().getURI(),
version.getIndexVersion() - 1
);
quadValueTypes.add(quadValueType);

if (quadValueTypes.size() == 50000) {
log.info("50000 records found. Executing batch save quads");
versionedQuadComponent.saveQuads(quadValueTypes);
quadValueTypes.clear();
}
});

datasetGraph.getDefaultGraph()
.stream()
.forEach(triple -> {
QuadValueType quadValueType = new QuadValueType(getTripleValueType(triple), defaultGraphURI.getName(), version.getIndexVersion() - 1);
quadValueTypes.add(quadValueType);

if (quadValueTypes.size() == 50000) {
log.info("50000 records found. Executing batch save quads");
versionedQuadComponent.saveQuads(quadValueTypes);
quadValueTypes.clear();
}
});

if (!quadValueTypes.isEmpty()) {
versionedQuadComponent.saveQuads(quadValueTypes);
}
private StreamRDF getQuadsStreamRDF(InputStream in, String filename, Integer version) {
StreamRDF quadStreamRDF = new StreamRDFBase() {
@Override
public void quad(Quad quad) {
namedGraphs.add(quad.getGraph().getURI());
quadValueTypes.add(new QuadValueType(
getTripleValueType(quad.asTriple()),
quad.getGraph().getURI(),
version - 1
));

if (namedGraphs.size() == 50000) {
log.info("50000 named graph records found. Executing batch save named graph");
saveBatchVersionedNamedGraph(filename, version);
}

if (quadValueTypes.size() == 50000) {
log.info("50000 quads records found. Executing batch save quads");
saveBatchQuads();
}
}
};

RDFParser.create()
.source(in)
.lang(RDFLanguages.nameToLang(FilenameUtils.getExtension(filename)))
.parse(quadStreamRDF);

saveBatchVersionedNamedGraph(filename, version);
saveBatchQuads();

return quadStreamRDF;
}
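
For reference, here is a self-contained sketch of the push-based parsing that getQuadsStreamRDF relies on: RDFParser hands each statement to a StreamRDF callback as it is read, so nothing is materialized into a DatasetGraph. The class name and sample data below are illustrative, not part of the commit.

import org.apache.jena.graph.Node;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.system.StreamRDFBase;
import org.apache.jena.sparql.core.Quad;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

public class StreamingQuadExample {
    public static void main(String[] args) throws IOException {
        // Two made-up quads in two named graphs.
        String nquads =
                "<http://example.org/s> <http://example.org/p> <http://example.org/o1> <http://example.org/g1> .\n"
                + "<http://example.org/s> <http://example.org/p> <http://example.org/o2> <http://example.org/g2> .\n";

        Set<String> graphs = new HashSet<>();

        // Callback invoked once per parsed quad; no DatasetGraph is built.
        StreamRDFBase collector = new StreamRDFBase() {
            @Override
            public void quad(Quad quad) {
                Node graph = quad.getGraph();
                graphs.add(graph.getURI());
            }
        };

        try (InputStream in = new ByteArrayInputStream(nquads.getBytes(StandardCharsets.UTF_8))) {
            RDFParser.create()
                    .source(in)
                    .lang(Lang.NQUADS)
                    .parse(collector);
        }

        // Prints both graph URIs collected during the streaming parse.
        System.out.println("Named graphs seen: " + graphs);
    }
}

During parse(), the quad() callback fires once per statement, which is what allows the import to accumulate QuadValueType batches and named-graph URIs without holding the whole file in memory.
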

/**
* Extract and insert the versioned named graph
* Converts a stream into a stream of triples
*
* @param file The input file
* @param datasetGraph The datasetGraph
* @param version The version
* @param in The input stream of the dataset
* @return A stream of triples
*/
private void extractAndInsertVersionedNamedGraph(MultipartFile file, DatasetGraph datasetGraph, Version version) {
List<String> namedGraphs = new ArrayList<>();
private StreamRDF getTriplesStreamRDF(InputStream in, String filename) {
StreamRDF tripleStreamRDF = new StreamRDFBase() {
@Override
public void triple(Triple triple) {
tripleValueTypes.add(getTripleValueType(triple));

if (tripleValueTypes.size() == 50000) {
log.info("50000 triples records found. Executing batch save quads");
saveBatchTriples();
}
}
};

for (Iterator<Node> i = datasetGraph.listGraphNodes(); i.hasNext(); ) {
var namedModel = i.next();
namedGraphs.add(namedModel.getURI());
}
RDFParser.create()
.source(in)
.lang(RDFLanguages.nameToLang(FilenameUtils.getExtension(filename)))
.parse(tripleStreamRDF);

if (!datasetGraph.getDefaultGraph().isEmpty()) {
namedGraphs.add(defaultGraphURI.getName());
}
saveBatchTriples();

if (!namedGraphs.isEmpty()) {
versionedNamedGraphComponent.saveVersionedNamedGraph(namedGraphs, file.getOriginalFilename(), version.getIndexVersion());
}
return tripleStreamRDF;
}

