Skip to content

Commit

Permalink
attempt to be more memory efficient when fetching logs (#8380)
Browse files Browse the repository at this point in the history
  • Loading branch information
colesnodgrass committed Aug 17, 2023
1 parent 56831cc commit 46ac990
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ static File getFile(final Storage gcsClient, final LogConfigs configs, final Str
final var os = new FileOutputStream(tmpOutputFile);
LOGGER.debug("Start getting GCS objects.");
// Objects are returned in lexicographical order.
for (final Blob blob : blobs.iterateAll()) {
blob.downloadTo(os);
}
blobs.iterateAll().forEach(blob -> blob.downloadTo(os));
os.close();
LOGGER.debug("Done retrieving GCS logs: {}.", logPath);
return tmpOutputFile;
Expand All @@ -76,38 +74,42 @@ public List<String> tailCloudLog(final LogConfigs configs, final String logPath,

LOGGER.debug("Start GCS list request.");

final Page<Blob> blobs = gcsClient.list(
configs.getStorageConfigs().getGcsConfig().getBucketName(),
Storage.BlobListOption.prefix(logPath));

final var ascendingTimestampBlobs = new ArrayList<Blob>();
for (final Blob blob : blobs.iterateAll()) {
ascendingTimestampBlobs.add(blob);
}
final var descendingTimestampBlobs = Lists.reverse(ascendingTimestampBlobs);
gcsClient.list(
configs.getStorageConfigs().getGcsConfig().getBucketName(),
Storage.BlobListOption.prefix(logPath))
.iterateAll()
.forEach(ascendingTimestampBlobs::add);

final var lines = new ArrayList<String>();
int linesRead = 0;

LOGGER.debug("Start getting GCS objects.");
while (linesRead <= numLines && !descendingTimestampBlobs.isEmpty()) {
final var poppedBlob = descendingTimestampBlobs.remove(0);
try (final var inMemoryData = new ByteArrayOutputStream()) {
poppedBlob.downloadTo(inMemoryData);
final var currFileLines = inMemoryData.toString(StandardCharsets.UTF_8).split("\n");
final List<String> currFileLinesReversed = Lists.reverse(List.of(currFileLines));
for (final var line : currFileLinesReversed) {
if (linesRead == numLines) {
break;
}
lines.add(0, line);
linesRead++;
final var inMemoryData = new ByteArrayOutputStream();
// iterate through blobs in descending order (oldest first)
for (var i = ascendingTimestampBlobs.size() - 1; i >= 0; i--) {
inMemoryData.reset();

final var blob = ascendingTimestampBlobs.get(i);
blob.downloadTo(inMemoryData);

final String[] currFileLines = inMemoryData.toString(StandardCharsets.UTF_8).split("\n");
// Iterate through the lines in reverse order. This ensures we keep the newer messages over the
// older messages if we hit the numLines limit.
for (var j = currFileLines.length - 1; j >= 0; j--) {
lines.add(currFileLines[j]);
if (lines.size() >= numLines) {
break;
}
}

if (lines.size() >= numLines) {
break;
}
}

LOGGER.debug("Done retrieving GCS logs: {}.", logPath);
return lines;
// finally reverse the lines so they're returned in ascending order
return Lists.reverse(lines);
}

@Override
Expand All @@ -117,9 +119,7 @@ public void deleteLogs(final LogConfigs configs, final String logPath) {

LOGGER.debug("Start GCS list and delete request.");
final Page<Blob> blobs = gcsClient.list(configs.getStorageConfigs().getGcsConfig().getBucketName(), Storage.BlobListOption.prefix(logPath));
for (final Blob blob : blobs.iterateAll()) {
blob.delete(BlobSourceOption.generationMatch());
}
blobs.iterateAll().forEach(blob -> blob.delete(BlobSourceOption.generationMatch()));
LOGGER.debug("Finished all deletes.");
}

Expand All @@ -130,4 +130,12 @@ private Storage getOrCreateGcsClient() {
return gcs;
}

/**
* This method exists only for unit testing purposes.
*/
@VisibleForTesting
static void resetGcs() {
gcs = null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.helpers;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.cloud.storage.Storage;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
import io.airbyte.config.storage.DefaultGcsClientFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Slf4j
@Tag("logger-client")
class GcsLogsIntTest {

private static Storage getClientFactory() {
return new DefaultGcsClientFactory(new CloudStorageConfigs.GcsConfig(
System.getenv(LogClientSingleton.GCS_LOG_BUCKET),
System.getenv(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS))).get();
}

/**
* The test files here were generated by {@link S3LogsTest}. See that class for more information.
*
* Generate enough files to force pagination and confirm all data is read.
*/
@Test
void testRetrieveAllLogs() throws IOException {
final File data;
data = GcsLogs.getFile(getClientFactory(), (new EnvConfigs()).getLogConfigs(), "paginate", 6);
final var retrieved = new ArrayList<String>();
Files.lines(data.toPath()).forEach(retrieved::add);

final var expected = List.of("Line 0", "Line 1", "Line 2", "Line 3", "Line 4", "Line 5", "Line 6", "Line 7", "Line 8");

assertEquals(expected, retrieved);

}

/**
* The test files for this test have been pre-generated and uploaded into the bucket folder. The
* folder contains the following files with these contents:
* <li>first-file.txt - Line 1, Line 2, Line 3</li>
* <li>second-file.txt - Line 4, Line 5, Line 6</li>
* <li>third-file.txt - Line 7, Line 8, Line 9</li>
*/
@Test
void testTail() throws IOException {
final var data = new GcsLogs(GcsLogsIntTest::getClientFactory).tailCloudLog((new EnvConfigs()).getLogConfigs(), "tail", 6);

final var expected = List.of("Line 4", "Line 5", "Line 6", "Line 7", "Line 8", "Line 9");
assertEquals(data, expected);
}

}
Loading

0 comments on commit 46ac990

Please sign in to comment.