Skip to content

Commit

Permalink
GZIP-pakkaus
Browse files Browse the repository at this point in the history
  • Loading branch information
augustk committed Jan 16, 2025
1 parent 82b713b commit 5bc4e68
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public static void main(String[] args) throws Exception {
System.getenv("DB_PASSWORD"),
System.getenv("OVARA_LAMPI_SIIRTAJA_BUCKET"),
System.getenv("LAMPI_S3_BUCKET"),
Regions.EU_WEST_1.getName());
Regions.EU_WEST_1.getName(),
"fulldump/ovara/v1/");

LampiSiirtajaService service = new LampiSiirtajaService(config);
service.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ public record Config(
String postgresPassword,
String ovaraS3Bucket,
String lampiS3Bucket,
String awsRegion) {}
String awsRegion,
String lampiKeyPrefix) {}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@
import com.google.common.collect.Iterators;
import fi.oph.opintopolku.ovara.config.Config;
import fi.oph.opintopolku.ovara.io.MultiInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.*;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.zip.GZIPOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -25,7 +22,6 @@ public class LampiS3Transfer {
private static final Logger LOG = LoggerFactory.getLogger(LampiS3Transfer.class);

private static final int UPLOAD_PART_SIZE = 99 * 1024 * 1024;
private static final ExecutorService executor = Executors.newSingleThreadExecutor();

private final Config config;
private final AmazonS3 ovaraS3Client;
Expand All @@ -45,6 +41,24 @@ private Supplier<S3ObjectInputStream> constructSupplier(String downloadFilename)
ovaraS3Client.getObject(config.ovaraS3Bucket(), downloadFilename).getObjectContent();
}

/**
 * Reads {@code in} until end of stream, GZIP-compresses the bytes and writes the compressed
 * data to {@code out}.
 *
 * <p>An attempt is always made to close {@code out} before this method returns, whether or not
 * compression succeeded; a failure of that final close is logged rather than thrown. {@code in}
 * is not closed here and stays the caller's responsibility.
 *
 * <p>NOTE(review): when copying fails part-way, the GZIP stream and {@code out} are still closed
 * in the normal way. A consumer reading the other end of a pipe may therefore see a regular end
 * of stream and be unable to tell a failed compression from a finished one. Confirm that the
 * caller detects a failure of the compressing thread before treating the data as complete.
 *
 * @param out sink for the compressed bytes; closed by this method
 * @param in source of the uncompressed bytes; read to its end, left open
 * @throws UncheckedIOException if reading {@code in}, writing to {@code out} or finishing the
 *     GZIP stream fails; the original {@link IOException} is kept as the cause
 */
public void startGZIPCompressing(OutputStream out, InputStream in) {
  try (GZIPOutputStream gOut = new GZIPOutputStream(out)) {
    // Standard-library copy loop; equivalent to reading into a buffer until read() returns -1.
    in.transferTo(gOut);
  } catch (IOException ex) {
    throw new UncheckedIOException(ex);
  } finally {
    // Closing gOut normally closes out as well. This extra close covers the case where the
    // GZIPOutputStream constructor itself throws, so try-with-resources never owned the stream.
    try {
      out.close();
    } catch (Exception e) {
      LOG.error("GZIP-pakkauksen streamin sulkeminen epäonnistui", e);
    }
  }
}

private PartETag submitTaskForUploading(
String uploadFilename, ByteArrayInputStream inputStream, boolean isFinalPart) {
int eachPartId = uploadPartId.incrementAndGet();
Expand Down Expand Up @@ -73,7 +87,8 @@ private PartETag submitTaskForUploading(
return uploadResult.getPartETag();
}

public String transferToLampi(String filename, int numberOfFiles) throws Exception {
public String transferToLampi(String filename, String uploadFilename, int numberOfFiles)
throws Exception {

LOG.info(
"Aloitetaan tiedoston {} lähettäminen Lammen S3-ämpäriin joka on {} palassa",
Expand All @@ -82,8 +97,9 @@ public String transferToLampi(String filename, int numberOfFiles) throws Excepti

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentType("text/csv");
metadata.setContentEncoding("gzip");
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(config.lampiS3Bucket(), filename)
new InitiateMultipartUploadRequest(config.lampiS3Bucket(), uploadFilename)
.withObjectMetadata(metadata);
InitiateMultipartUploadResult initResult = lampiS3Client.initiateMultipartUpload(initRequest);

Expand All @@ -102,14 +118,21 @@ public String transferToLampi(String filename, int numberOfFiles) throws Excepti
Enumeration<Supplier<S3ObjectInputStream>> streams =
Iterators.asEnumeration(streamsFList.iterator());

InputStream inputStream = new MultiInputStream(streams);
InputStream multiInputStream = new MultiInputStream(streams);

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
PipedInputStream pipedInputStream = new PipedInputStream();
pipedInputStream.connect(pipedOutputStream);

Thread thread = new Thread(() -> startGZIPCompressing(pipedOutputStream, multiInputStream));
thread.start();

int bytesRead, bytesAdded = 0;
byte[] data = new byte[UPLOAD_PART_SIZE];
ByteArrayOutputStream bufferOutputStream = new ByteArrayOutputStream();
List<PartETag> parts = new ArrayList<>();

while ((bytesRead = inputStream.read(data, 0, data.length)) != -1) {
while ((bytesRead = pipedInputStream.read(data, 0, data.length)) != -1) {
bufferOutputStream.write(data, 0, bytesRead);

if (bytesAdded < UPLOAD_PART_SIZE) {
Expand All @@ -118,19 +141,19 @@ public String transferToLampi(String filename, int numberOfFiles) throws Excepti
}
PartETag partETag =
submitTaskForUploading(
filename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), false);
uploadFilename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), false);
parts.add(partETag);
bufferOutputStream.reset(); // flush the bufferOutputStream
bytesAdded = 0; // reset the bytes added to 0
}

PartETag partETag =
submitTaskForUploading(
filename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), true);
uploadFilename, new ByteArrayInputStream(bufferOutputStream.toByteArray()), true);
parts.add(partETag);

CompleteMultipartUploadRequest completeRequest =
new CompleteMultipartUploadRequest(config.lampiS3Bucket(), filename, uploadId, parts);
new CompleteMultipartUploadRequest(config.lampiS3Bucket(), uploadFilename, uploadId, parts);
CompleteMultipartUploadResult completeMultipartUploadResult =
lampiS3Client.completeMultipartUpload(completeRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ public void run() throws Exception {
int numberOfFiles = result.getValue1().getFiles_uploaded();

String filename = String.format("%s.csv", tableName);
String uploadFilename =
String.format("%s%s.gz", config.lampiKeyPrefix(), filename);

LampiS3Transfer transfer = new LampiS3Transfer(config);

try {
String versionId = transfer.transferToLampi(filename, numberOfFiles);
manifestItems.add(new ManifestItem(filename, versionId));
String versionId =
transfer.transferToLampi(filename, uploadFilename, numberOfFiles);
manifestItems.add(new ManifestItem(uploadFilename, versionId));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 5bc4e68

Please sign in to comment.