Skip to content

Commit

Permalink
Merge pull request #1031 from ie3-institute/ms/#1007-refactor-CsvFile…
Browse files Browse the repository at this point in the history
…Connector-and-CsvDataSource

Refactor `CsvFileConnector` and `CsvDataSource`.
  • Loading branch information
sebastian-peter committed Jun 13, 2024
2 parents b6b1508 + 97913de commit e392ecc
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 335 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- Improvements to the search for corner points in `IdCoordinateSource` [#1016](https://github.com/ie3-institute/PowerSystemDataModel/issues/1016)
- Refactor `CsvFileConnector` and `CsvDataSource` [#1007](https://github.com/ie3-institute/PowerSystemDataModel/issues/1007)


## [5.0.1] - 2024-03-07
Expand Down
140 changes: 15 additions & 125 deletions src/main/java/edu/ie3/datamodel/io/connectors/CsvFileConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,23 @@

import edu.ie3.datamodel.exceptions.ConnectorException;
import edu.ie3.datamodel.io.IoUtil;
import edu.ie3.datamodel.io.csv.*;
import edu.ie3.datamodel.io.naming.FileNamingStrategy;
import edu.ie3.datamodel.io.naming.TimeSeriesMetaInformation;
import edu.ie3.datamodel.io.naming.timeseries.ColumnScheme;
import edu.ie3.datamodel.io.naming.timeseries.IndividualTimeSeriesMetaInformation;
import edu.ie3.datamodel.io.csv.BufferedCsvWriter;
import edu.ie3.datamodel.io.csv.CsvFileDefinition;
import edu.ie3.datamodel.models.Entity;
import edu.ie3.datamodel.models.timeseries.TimeSeries;
import edu.ie3.datamodel.models.timeseries.TimeSeriesEntry;
import edu.ie3.datamodel.models.value.Value;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Provides the connector (here: buffered writer) for specific files to be used by a {@link
* edu.ie3.datamodel.io.sink.CsvFileSink}
* edu.ie3.datamodel.io.sink.CsvFileSink} or {@link edu.ie3.datamodel.io.source.csv.CsvDataSource}
*
* @version 0.1
* @since 19.03.20
Expand All @@ -39,27 +33,26 @@ public class CsvFileConnector implements DataConnector {

private final Map<Class<? extends Entity>, BufferedCsvWriter> entityWriters = new HashMap<>();
private final Map<UUID, BufferedCsvWriter> timeSeriesWriters = new HashMap<>();

private final FileNamingStrategy fileNamingStrategy;
private final Path baseDirectory;

private static final String FILE_ENDING = ".csv";

public CsvFileConnector(Path baseDirectory, FileNamingStrategy fileNamingStrategy) {
public CsvFileConnector(Path baseDirectory) {
this.baseDirectory = baseDirectory;
this.fileNamingStrategy = fileNamingStrategy;
}

/**
 * Provides the base directory this connector has been set up with.
 *
 * @return the base directory path
 */
public Path getBaseDirectory() {
  return this.baseDirectory;
}

public synchronized BufferedCsvWriter getOrInitWriter(
Class<? extends Entity> clz, String[] headerElements, String csvSep)
throws ConnectorException {
/* Try to the the right writer */
Class<? extends Entity> clz, CsvFileDefinition fileDefinition) throws ConnectorException {
/* Try to get the right writer */
BufferedCsvWriter predefinedWriter = entityWriters.get(clz);
if (predefinedWriter != null) return predefinedWriter;

/* If it is not available, build and register one */
try {
CsvFileDefinition fileDefinition = buildFileDefinition(clz, headerElements, csvSep);
BufferedCsvWriter newWriter = initWriter(baseDirectory, fileDefinition);

entityWriters.put(clz, newWriter);
Expand All @@ -71,15 +64,14 @@ public synchronized BufferedCsvWriter getOrInitWriter(
}

public synchronized <T extends TimeSeries<E, V>, E extends TimeSeriesEntry<V>, V extends Value>
BufferedCsvWriter getOrInitWriter(T timeSeries, String[] headerElements, String csvSep)
BufferedCsvWriter getOrInitWriter(T timeSeries, CsvFileDefinition fileDefinition)
throws ConnectorException {
/* Try to the the right writer */
/* Try to get the right writer */
BufferedCsvWriter predefinedWriter = timeSeriesWriters.get(timeSeries.getUuid());
if (predefinedWriter != null) return predefinedWriter;

/* If it is not available, build and register one */
try {
CsvFileDefinition fileDefinition = buildFileDefinition(timeSeries, headerElements, csvSep);
BufferedCsvWriter newWriter = initWriter(baseDirectory, fileDefinition);

timeSeriesWriters.put(timeSeries.getUuid(), newWriter);
Expand Down Expand Up @@ -131,8 +123,7 @@ public synchronized void closeTimeSeriesWriter(UUID uuid) throws IOException {
Optional<BufferedCsvWriter> maybeWriter = Optional.ofNullable(timeSeriesWriters.get(uuid));
if (maybeWriter.isPresent()) {
log.debug("Remove reference to time series writer for UUID '{}'.", uuid);
timeSeriesWriters.remove(uuid);
maybeWriter.get().close();
timeSeriesWriters.remove(uuid).close();
} else {
log.warn("No writer found for time series '{}'.", uuid);
}
Expand All @@ -149,8 +140,7 @@ public synchronized <C extends Entity> void closeEntityWriter(Class<C> clz) thro
Optional<BufferedCsvWriter> maybeWriter = Optional.ofNullable(entityWriters.get(clz));
if (maybeWriter.isPresent()) {
log.debug("Remove reference to entity writer for class '{}'.", clz);
entityWriters.remove(clz);
maybeWriter.get().close();
entityWriters.remove(clz).close();
} else {
log.warn("No writer found for class '{}'.", clz);
}
Expand All @@ -170,106 +160,6 @@ public BufferedReader initReader(Path filePath) throws FileNotFoundException {
new InputStreamReader(new FileInputStream(fullPath), StandardCharsets.UTF_8), 16384);
}

/**
* Collects the meta information of all individual time series files that are found by {@code
* getIndividualTimeSeriesFilePaths()}. Each path is handed to the file naming strategy to extract
* the meta information, which is then wrapped into a {@link
* CsvIndividualTimeSeriesMetaInformation}. The result can optionally be restricted to certain
* column schemes.
*
* @param columnSchemes the column schemes to keep. If {@code null} or empty, no filtering is
* applied and the meta information of all found time series is returned.
* @return A mapping from time series UUID to the individual time series meta information
*/
public Map<UUID, CsvIndividualTimeSeriesMetaInformation>
getCsvIndividualTimeSeriesMetaInformation(final ColumnScheme... columnSchemes) {
return getIndividualTimeSeriesFilePaths().parallelStream()
.map(
filePath -> {
/* Extract meta information from file path and enhance it with the file name as processed by removeFileNameEnding */
IndividualTimeSeriesMetaInformation metaInformation =
fileNamingStrategy.individualTimeSeriesMetaInformation(filePath.toString());
return new CsvIndividualTimeSeriesMetaInformation(
metaInformation, FileNamingStrategy.removeFileNameEnding(filePath.getFileName()));
})
/* Keep everything if no scheme has been requested, otherwise only entries with a matching scheme */
.filter(
metaInformation ->
columnSchemes == null
|| columnSchemes.length == 0
|| Stream.of(columnSchemes)
.anyMatch(scheme -> scheme.equals(metaInformation.getColumnScheme())))
/* Keyed by UUID. No merge function is given, hence toMap throws an IllegalStateException if two files yield the same UUID */
.collect(Collectors.toMap(TimeSeriesMetaInformation::getUuid, Function.identity()));
}

/**
* Walks the file tree below the base directory and collects all paths that, relativized against
* the base directory and passed through {@code FileNamingStrategy.removeFileNameEnding}, match
* the individual time series pattern of the file naming strategy.
*
* @return A set of paths to time series files, relative to the base directory. An empty set is
* returned, if walking the base directory fails with an {@link IOException}.
*/
private Set<Path> getIndividualTimeSeriesFilePaths() {
/* The stream returned by Files.walk holds open directories and is closed by try-with-resources */
try (Stream<Path> pathStream = Files.walk(baseDirectory)) {
return pathStream
.map(baseDirectory::relativize)
.filter(
path -> {
/* The pattern is matched against the relative path after removeFileNameEnding has been applied */
Path withoutEnding =
Path.of(FileNamingStrategy.removeFileNameEnding(path.toString()));
return fileNamingStrategy
.getIndividualTimeSeriesPattern()
.matcher(withoutEnding.toString())
.matches();
})
.collect(Collectors.toSet());
} catch (IOException e) {
/* The failure is only logged; callers receive an empty set instead of an exception */
log.error("Unable to determine time series files readers for time series.", e);
return Collections.emptySet();
}
}

/**
* Builds a new file definition consisting of file name, directory path, head line elements and
* column separator. File name and directory path are requested from the file naming strategy.
* If the strategy does not provide a directory path, an empty path is used instead.
*
* @param timeSeries Time series to derive naming information from
* @param headLineElements Array of head line elements
* @param csvSep Separator for csv columns
* @return A suitable file definition
* @throws ConnectorException If the file naming strategy does not provide an entity name for the
* given time series
*/
private <T extends TimeSeries<E, V>, E extends TimeSeriesEntry<V>, V extends Value>
CsvFileDefinition buildFileDefinition(T timeSeries, String[] headLineElements, String csvSep)
throws ConnectorException {
/* A missing directory path is not an error, fall back to an empty path */
Path directoryPath = fileNamingStrategy.getDirectoryPath(timeSeries).orElse(Path.of(""));
/* A missing entity name is an error, as no file name can be derived */
String fileName =
fileNamingStrategy
.getEntityName(timeSeries)
.orElseThrow(
() ->
new ConnectorException(
"Cannot determine the file name for time series '" + timeSeries + "'."));
return new CsvFileDefinition(fileName, directoryPath, headLineElements, csvSep);
}

/**
* Builds a new file definition consisting of file name, directory path, head line elements and
* column separator. File name and directory path are requested from the file naming strategy.
* If the strategy does not provide a directory path, an empty path is used instead.
*
* @param clz Class that is meant to be serialized into this file
* @param headLineElements Array of head line elements
* @param csvSep Separator for csv columns
* @return A suitable file definition
* @throws ConnectorException If the file naming strategy does not provide an entity name for the
* given class
*/
private CsvFileDefinition buildFileDefinition(
Class<? extends Entity> clz, String[] headLineElements, String csvSep)
throws ConnectorException {
/* A missing directory path is not an error, fall back to an empty path */
Path directoryPath = fileNamingStrategy.getDirectoryPath(clz).orElse(Path.of(""));
/* A missing entity name is an error, as no file name can be derived */
String fileName =
fileNamingStrategy
.getEntityName(clz)
.orElseThrow(
() ->
new ConnectorException(
"Cannot determine the file name for class '" + clz.getSimpleName() + "'."));
return new CsvFileDefinition(fileName, directoryPath, headLineElements, csvSep);
}

@Override
public void shutdown() {
Stream.of(entityWriters.values(), timeSeriesWriters.values())
Expand Down
67 changes: 67 additions & 0 deletions src/main/java/edu/ie3/datamodel/io/csv/CsvFileDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
*/
package edu.ie3.datamodel.io.csv;

import edu.ie3.datamodel.exceptions.FileException;
import edu.ie3.datamodel.io.naming.FileNamingStrategy;
import edu.ie3.datamodel.models.Entity;
import edu.ie3.datamodel.models.timeseries.TimeSeries;
import edu.ie3.datamodel.models.timeseries.TimeSeriesEntry;
import edu.ie3.datamodel.models.value.Value;
import edu.ie3.datamodel.utils.FileUtils;
import java.nio.file.Path;
import java.util.Arrays;
Expand All @@ -23,6 +29,67 @@ public CsvFileDefinition(
this(FileUtils.ofCsv(fileName, directoryPath), headLineElements, csvSep);
}

/**
* Builds a new file definition for an entity class. Entity name and directory path are requested
* from the given file naming strategy, combined via {@code FileUtils.ofCsv} and handed over to
* another constructor of this class together with the headline elements and the separator. If
* the strategy does not provide a directory path, an empty path is used instead.
*
* @param clz Class that is meant to be serialized into this file
* @param headLineElements Array of headline elements
* @param csvSep Separator for csv columns
* @param fileNamingStrategy that should be used
* @throws FileException If the file naming strategy does not provide an entity name for the
* given class
*/
public CsvFileDefinition(
Class<? extends Entity> clz,
String[] headLineElements,
String csvSep,
FileNamingStrategy fileNamingStrategy)
throws FileException {
this(
FileUtils.ofCsv(
fileNamingStrategy
.getEntityName(clz)
.orElseThrow(
() ->
new FileException(
"Cannot determine the file name for class '"
+ clz.getSimpleName()
+ "'.")),
fileNamingStrategy.getDirectoryPath(clz).orElse(Path.of(""))),
headLineElements,
csvSep);
}

/**
* Builds a new file definition for a time series. Entity name and directory path are requested
* from the given file naming strategy, combined via {@code FileUtils.ofCsv} and handed over to
* another constructor of this class together with the headline elements and the separator. If
* the strategy does not provide a directory path, an empty path is used instead.
*
* @param timeSeries Time series to derive naming information from
* @param headLineElements Array of headline elements
* @param csvSep Separator for csv columns
* @param fileNamingStrategy that should be used
* @throws FileException If the file naming strategy does not provide an entity name for the
* given time series
*/
public <T extends TimeSeries<E, V>, E extends TimeSeriesEntry<V>, V extends Value>
CsvFileDefinition(
T timeSeries,
String[] headLineElements,
String csvSep,
FileNamingStrategy fileNamingStrategy)
throws FileException {
this(
FileUtils.ofCsv(
fileNamingStrategy
.getEntityName(timeSeries)
.orElseThrow(
() ->
new FileException(
"Cannot determine the file name for time series '"
+ timeSeries
+ "'.")),
fileNamingStrategy.getDirectoryPath(timeSeries).orElse(Path.of(""))),
headLineElements,
csvSep);
}

/**
* @return The path to the file relative to a not explicitly defined base directory, including the
* file extension
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/edu/ie3/datamodel/io/sink/CsvFileSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import edu.ie3.datamodel.exceptions.*;
import edu.ie3.datamodel.io.connectors.CsvFileConnector;
import edu.ie3.datamodel.io.csv.BufferedCsvWriter;
import edu.ie3.datamodel.io.csv.CsvFileDefinition;
import edu.ie3.datamodel.io.extractor.Extractor;
import edu.ie3.datamodel.io.extractor.NestedEntity;
import edu.ie3.datamodel.io.naming.FileNamingStrategy;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class CsvFileSink implements InputDataSink, OutputDataSink {

private final CsvFileConnector connector;
private final ProcessorProvider processorProvider;

private final FileNamingStrategy fileNamingStrategy;
private final String csvSep;

public CsvFileSink(Path baseFolderPath) throws EntityProcessorException {
Expand Down Expand Up @@ -95,7 +96,8 @@ public CsvFileSink(
String csvSep) {
this.csvSep = csvSep;
this.processorProvider = processorProvider;
this.connector = new CsvFileConnector(baseFolderPath, fileNamingStrategy);
this.connector = new CsvFileConnector(baseFolderPath);
this.fileNamingStrategy = fileNamingStrategy;
}

@Override
Expand Down Expand Up @@ -246,13 +248,16 @@ public <E extends TimeSeriesEntry<V>, V extends Value> void persistTimeSeries(
try {
TimeSeriesProcessorKey key = new TimeSeriesProcessorKey(timeSeries);
String[] headerElements = csvHeaderElements(processorProvider.getHeaderElements(key));
BufferedCsvWriter writer = connector.getOrInitWriter(timeSeries, headerElements, csvSep);
BufferedCsvWriter writer =
connector.getOrInitWriter(
timeSeries,
new CsvFileDefinition(timeSeries, headerElements, csvSep, fileNamingStrategy));
persistTimeSeries(timeSeries, writer);
connector.closeTimeSeriesWriter(timeSeries.getUuid());
} catch (ProcessorProviderException e) {
log.error(
"Exception occurred during receiving of header elements. Cannot write this element.", e);
} catch (ConnectorException e) {
} catch (ConnectorException | FileException e) {
log.error("Exception occurred during acquisition of writer.", e);
} catch (IOException e) {
log.error("Exception occurred during closing of writer.", e);
Expand Down Expand Up @@ -292,12 +297,14 @@ private <C extends Entity> void write(C entity) {
processorProvider.handleEntity(entity).map(this::csvEntityFieldData).getOrThrow();
String[] headerElements = processorProvider.getHeaderElements(entity.getClass());
BufferedCsvWriter writer =
connector.getOrInitWriter(entity.getClass(), headerElements, csvSep);
connector.getOrInitWriter(
entity.getClass(),
new CsvFileDefinition(entity.getClass(), headerElements, csvSep, fileNamingStrategy));
writer.write(entityFieldData);
} catch (ProcessorProviderException e) {
log.error(
"Exception occurred during receiving of header elements. Cannot write this element.", e);
} catch (ConnectorException e) {
} catch (ConnectorException | FileException e) {
log.error("Exception occurred during retrieval of writer. Cannot write this element.", e);
} catch (IOException e) {
log.error("Exception occurred during writing of this element. Cannot write this element.", e);
Expand Down
Loading

0 comments on commit e392ecc

Please sign in to comment.