diff --git a/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/App.java b/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/App.java index 524919f..e87fe63 100644 --- a/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/App.java +++ b/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/App.java @@ -22,7 +22,8 @@ public static void main(String[] args) throws Exception { System.getenv("DB_PASSWORD"), System.getenv("OVARA_LAMPI_SIIRTAJA_BUCKET"), System.getenv("LAMPI_S3_BUCKET"), - Regions.EU_WEST_1.getName()); + Regions.EU_WEST_1.getName(), + "fulldump/ovara/v1/"); LampiSiirtajaService service = new LampiSiirtajaService(config); service.run(); diff --git a/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/config/Config.java b/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/config/Config.java index 65f6f5c..5dcf057 100644 --- a/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/config/Config.java +++ b/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/config/Config.java @@ -7,4 +7,5 @@ public record Config( String postgresPassword, String ovaraS3Bucket, String lampiS3Bucket, - String awsRegion) {} + String awsRegion, + String lampiKeyPrefix) {} diff --git a/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/s3/LampiS3Transfer.java b/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/s3/LampiS3Transfer.java index 529b94c..f88783c 100644 --- a/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/s3/LampiS3Transfer.java +++ b/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/s3/LampiS3Transfer.java @@ -6,17 +6,14 @@ import com.google.common.collect.Iterators; import fi.oph.opintopolku.ovara.config.Config; import fi.oph.opintopolku.ovara.io.MultiInputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.InputStream; +import java.io.*; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.IntStream; +import java.util.zip.GZIPOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +22,6 @@ public class LampiS3Transfer { private static final Logger LOG = LoggerFactory.getLogger(LampiS3Transfer.class); private static final int UPLOAD_PART_SIZE = 99 * 1024 * 1024; - private static final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Config config; private final AmazonS3 ovaraS3Client; @@ -45,6 +41,24 @@ private Supplier constructSupplier(String downloadFilename) ovaraS3Client.getObject(config.ovaraS3Bucket(), downloadFilename).getObjectContent(); } + public void startGZIPCompressing(OutputStream out, InputStream in) { + try (GZIPOutputStream gOut = new GZIPOutputStream(out)) { + byte[] buffer = new byte[10240]; + int len; + while ((len = in.read(buffer)) != -1) { + gOut.write(buffer, 0, len); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } finally { + try { + out.close(); + } catch (Exception e) { + LOG.error("GZIP-pakkauksen streamin sulkeminen epäonnistui", e); + } + } + } + private PartETag submitTaskForUploading( String uploadFilename, ByteArrayInputStream inputStream, boolean isFinalPart) { int eachPartId = uploadPartId.incrementAndGet(); @@ -73,7 +87,8 @@ private PartETag submitTaskForUploading( return uploadResult.getPartETag(); } - public String transferToLampi(String filename, int numberOfFiles) throws Exception { + public String transferToLampi(String filename, String uploadFilename, int numberOfFiles) + throws Exception { LOG.info( "Aloitetaan tiedoston {} lähettäminen Lammen S3-ämpäriin joka on {} palassa", @@ -82,8 +97,9 @@ public String transferToLampi(String filename, int numberOfFiles) throws Excepti ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentType("text/csv"); + metadata.setContentEncoding("gzip"); InitiateMultipartUploadRequest initRequest = - new InitiateMultipartUploadRequest(config.lampiS3Bucket(), filename) + new InitiateMultipartUploadRequest(config.lampiS3Bucket(), uploadFilename) .withObjectMetadata(metadata); InitiateMultipartUploadResult initResult = lampiS3Client.initiateMultipartUpload(initRequest); @@ -102,14 +118,21 @@ public String transferToLampi(String filename, int numberOfFiles) throws Excepti Enumeration> streams = Iterators.asEnumeration(streamsFList.iterator()); - InputStream inputStream = new MultiInputStream(streams); + InputStream multiInputStream = new MultiInputStream(streams); + + final PipedOutputStream pipedOutputStream = new PipedOutputStream(); + PipedInputStream pipedInputStream = new PipedInputStream(); + pipedInputStream.connect(pipedOutputStream); + + Thread thread = new Thread(() -> startGZIPCompressing(pipedOutputStream, multiInputStream)); + thread.start(); int bytesRead, bytesAdded = 0; byte[] data = new byte[UPLOAD_PART_SIZE]; ByteArrayOutputStream bufferOutputStream = new ByteArrayOutputStream(); List parts = new ArrayList<>(); - while ((bytesRead = inputStream.read(data, 0, data.length)) != -1) { + while ((bytesRead = pipedInputStream.read(data, 0, data.length)) != -1) { bufferOutputStream.write(data, 0, bytesRead); if (bytesAdded < UPLOAD_PART_SIZE) { @@ -118,7 +141,7 @@ public String transferToLampi(String filename, int numberOfFiles) throws Excepti } PartETag partETag = submitTaskForUploading( - filename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), false); + uploadFilename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), false); parts.add(partETag); bufferOutputStream.reset(); // flush the bufferOutputStream bytesAdded = 0; // reset the bytes added to 0 @@ -126,11 +149,11 @@ public String transferToLampi(String filename, int numberOfFiles) throws Excepti PartETag partETag = submitTaskForUploading( - filename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), true); + uploadFilename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), true); parts.add(partETag); CompleteMultipartUploadRequest completeRequest = - new CompleteMultipartUploadRequest(config.lampiS3Bucket(), filename, uploadId, parts); + new CompleteMultipartUploadRequest(config.lampiS3Bucket(), uploadFilename, uploadId, parts); CompleteMultipartUploadResult completeMultipartUploadResult = lampiS3Client.completeMultipartUpload(completeRequest); diff --git a/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/service/LampiSiirtajaService.java b/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/service/LampiSiirtajaService.java index f66ade3..d010e3d 100644 --- a/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/service/LampiSiirtajaService.java +++ b/lampi-siirtaja-container/lampi-siirtaja/src/main/java/fi/oph/opintopolku/ovara/service/LampiSiirtajaService.java @@ -44,12 +44,15 @@ public void run() throws Exception { int numberOfFiles = result.getValue1().getFiles_uploaded(); String filename = String.format("%s.csv", tableName); + String uploadFilename = + String.format("%s%s.gz", config.lampiKeyPrefix(), filename); LampiS3Transfer transfer = new LampiS3Transfer(config); try { - String versionId = transfer.transferToLampi(filename, numberOfFiles); - manifestItems.add(new ManifestItem(filename, versionId)); + String versionId = + transfer.transferToLampi(filename, uploadFilename, numberOfFiles); + manifestItems.add(new ManifestItem(uploadFilename, versionId)); } catch (Exception e) { throw new RuntimeException(e); }