diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/GcsLogs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/GcsLogs.java index f17d642f211..8eef08e1686 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/GcsLogs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/GcsLogs.java @@ -61,9 +61,7 @@ static File getFile(final Storage gcsClient, final LogConfigs configs, final Str final var os = new FileOutputStream(tmpOutputFile); LOGGER.debug("Start getting GCS objects."); // Objects are returned in lexicographical order. - for (final Blob blob : blobs.iterateAll()) { - blob.downloadTo(os); - } + blobs.iterateAll().forEach(blob -> blob.downloadTo(os)); os.close(); LOGGER.debug("Done retrieving GCS logs: {}.", logPath); return tmpOutputFile; @@ -76,38 +74,42 @@ public List tailCloudLog(final LogConfigs configs, final String logPath, LOGGER.debug("Start GCS list request."); - final Page blobs = gcsClient.list( - configs.getStorageConfigs().getGcsConfig().getBucketName(), - Storage.BlobListOption.prefix(logPath)); - final var ascendingTimestampBlobs = new ArrayList(); - for (final Blob blob : blobs.iterateAll()) { - ascendingTimestampBlobs.add(blob); - } - final var descendingTimestampBlobs = Lists.reverse(ascendingTimestampBlobs); + gcsClient.list( + configs.getStorageConfigs().getGcsConfig().getBucketName(), + Storage.BlobListOption.prefix(logPath)) + .iterateAll() + .forEach(ascendingTimestampBlobs::add); final var lines = new ArrayList(); - int linesRead = 0; LOGGER.debug("Start getting GCS objects."); - while (linesRead <= numLines && !descendingTimestampBlobs.isEmpty()) { - final var poppedBlob = descendingTimestampBlobs.remove(0); - try (final var inMemoryData = new ByteArrayOutputStream()) { - poppedBlob.downloadTo(inMemoryData); - final var currFileLines = inMemoryData.toString(StandardCharsets.UTF_8).split("\n"); - final List currFileLinesReversed = Lists.reverse(List.of(currFileLines)); - for (final var line : currFileLinesReversed) { - if (linesRead == numLines) { - break; - } - lines.add(0, line); - linesRead++; + final var inMemoryData = new ByteArrayOutputStream(); + // iterate through blobs in descending order (oldest first) + for (var i = ascendingTimestampBlobs.size() - 1; i >= 0; i--) { + inMemoryData.reset(); + + final var blob = ascendingTimestampBlobs.get(i); + blob.downloadTo(inMemoryData); + + final String[] currFileLines = inMemoryData.toString(StandardCharsets.UTF_8).split("\n"); + // Iterate through the lines in reverse order. This ensures we keep the newer messages over the + // older messages if we hit the numLines limit. + for (var j = currFileLines.length - 1; j >= 0; j--) { + lines.add(currFileLines[j]); + if (lines.size() >= numLines) { + break; } } + + if (lines.size() >= numLines) { + break; + } } LOGGER.debug("Done retrieving GCS logs: {}.", logPath); - return lines; + // finally reverse the lines so they're returned in ascending order + return Lists.reverse(lines); } @Override @@ -117,9 +119,7 @@ public void deleteLogs(final LogConfigs configs, final String logPath) { LOGGER.debug("Start GCS list and delete request."); final Page blobs = gcsClient.list(configs.getStorageConfigs().getGcsConfig().getBucketName(), Storage.BlobListOption.prefix(logPath)); - for (final Blob blob : blobs.iterateAll()) { - blob.delete(BlobSourceOption.generationMatch()); - } + blobs.iterateAll().forEach(blob -> blob.delete(BlobSourceOption.generationMatch())); LOGGER.debug("Finished all deletes."); } @@ -130,4 +130,12 @@ private Storage getOrCreateGcsClient() { return gcs; } + /** + * This method exists only for unit testing purposes. + */ + @VisibleForTesting + static void resetGcs() { + gcs = null; + } + } diff --git a/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/GcsLogsIntTest.java b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/GcsLogsIntTest.java new file mode 100644 index 00000000000..d81e53e2753 --- /dev/null +++ b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/GcsLogsIntTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.helpers; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.cloud.storage.Storage; +import io.airbyte.config.EnvConfigs; +import io.airbyte.config.storage.CloudStorageConfigs; +import io.airbyte.config.storage.DefaultGcsClientFactory; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Slf4j +@Tag("logger-client") +class GcsLogsIntTest { + + private static Storage getClientFactory() { + return new DefaultGcsClientFactory(new CloudStorageConfigs.GcsConfig( + System.getenv(LogClientSingleton.GCS_LOG_BUCKET), + System.getenv(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS))).get(); + } + + /** + * The test files here were generated by {@link S3LogsTest}. See that class for more information. + * + * Generate enough files to force pagination and confirm all data is read. + */ + @Test + void testRetrieveAllLogs() throws IOException { + final File data; + data = GcsLogs.getFile(getClientFactory(), (new EnvConfigs()).getLogConfigs(), "paginate", 6); + final var retrieved = new ArrayList(); + Files.lines(data.toPath()).forEach(retrieved::add); + + final var expected = List.of("Line 0", "Line 1", "Line 2", "Line 3", "Line 4", "Line 5", "Line 6", "Line 7", "Line 8"); + + assertEquals(expected, retrieved); + + } + + /** + * The test files for this test have been pre-generated and uploaded into the bucket folder. The + * folder contains the following files with these contents: + *
  • first-file.txt - Line 1, Line 2, Line 3
  • + *
  • second-file.txt - Line 4, Line 5, Line 6
  • + *
  • third-file.txt - Line 7, Line 8, Line 9
  • + */ + @Test + void testTail() throws IOException { + final var data = new GcsLogs(GcsLogsIntTest::getClientFactory).tailCloudLog((new EnvConfigs()).getLogConfigs(), "tail", 6); + + final var expected = List.of("Line 4", "Line 5", "Line 6", "Line 7", "Line 8", "Line 9"); + assertEquals(data, expected); + } + +} diff --git a/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/GcsLogsTest.java b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/GcsLogsTest.java index 2eee0cb235b..d35d24b23b2 100644 --- a/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/GcsLogsTest.java +++ b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/GcsLogsTest.java @@ -4,62 +4,208 @@ package io.airbyte.config.helpers; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.google.api.gax.paging.Page; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Blob.BlobSourceOption; import com.google.cloud.storage.Storage; -import io.airbyte.config.EnvConfigs; +import com.google.common.io.Files; import io.airbyte.config.storage.CloudStorageConfigs; -import io.airbyte.config.storage.DefaultGcsClientFactory; -import java.io.File; +import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig; import java.io.IOException; -import java.nio.file.Files; -import java.util.ArrayList; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.List; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Tag; +import java.util.function.Consumer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; -@Slf4j -@Tag("logger-client") +/** + * GcsLogTest are unit tests that know a little too much about the internal workings of the GcsLogs + * class. Ideally this would integrate into some kind of GCS localstack or testcontainers. However, + * there doesn't appear to be a GCS localstack option and testcontainers does not have GCS Bucket + * support. + */ +@SuppressWarnings("PMD.AvoidDuplicateLiterals") class GcsLogsTest { - private static Storage getClientFactory() { - return new DefaultGcsClientFactory(new CloudStorageConfigs.GcsConfig( - System.getenv(LogClientSingleton.GCS_LOG_BUCKET), - System.getenv(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS))).get(); + private static final String bucketName = "bucket"; + private static final String logPath = "/log/path"; + @Mock + Storage storage; + @Mock + LogConfigs logConfigs; + @Mock + CloudStorageConfigs cloudStorageConfigs; + @Mock + GcsConfig gcsConfig; + @Mock + Page page; + @Mock + Iterable iterable; + + private AutoCloseable closeable; + + @BeforeEach + void setup() { + closeable = MockitoAnnotations.openMocks(this); + when(logConfigs.getStorageConfigs()).thenReturn(cloudStorageConfigs); + when(cloudStorageConfigs.getGcsConfig()).thenReturn(gcsConfig); + when(gcsConfig.getBucketName()).thenReturn(bucketName); + } + + @AfterEach + void teardown() throws Exception { + GcsLogs.resetGcs(); + closeable.close(); } - /** - * The test files here were generated by {@link S3LogsTest}. See that class for more information. - * - * Generate enough files to force pagination and confirm all data is read. - */ @Test - void testRetrieveAllLogs() throws IOException { - final File data; - data = GcsLogs.getFile(getClientFactory(), (new EnvConfigs()).getLogConfigs(), "paginate", 6); - final var retrieved = new ArrayList(); - Files.lines(data.toPath()).forEach(retrieved::add); + void testTailCloudLog() throws IOException { + final var blob1 = mock(Blob.class); + final var blob2 = mock(Blob.class); + final var blob3 = mock(Blob.class); + + // Ensure the Blob mocks write to the outputstream that is passed to their downloadTo method. + // The first blob will contain the file contents: + // line 1 + // line 2 + // line 3 + // the second blob will contain the file contents: + // line 4 + // line 5 + // line 6 + // the third, and final, blob will contain the file contents: + // line 7 + // line 8 + // line 9 + doAnswer(i -> { + ((OutputStream) i.getArgument(0)).write("line 1\nline 2\nline 3\n".getBytes(StandardCharsets.UTF_8)); + return null; + }).when(blob1).downloadTo(Mockito.any(OutputStream.class)); - final var expected = List.of("Line 0", "Line 1", "Line 2", "Line 3", "Line 4", "Line 5", "Line 6", "Line 7", "Line 8"); + doAnswer(i -> { + ((OutputStream) i.getArgument(0)).write("line 4\nline 5\nline 6\n".getBytes(StandardCharsets.UTF_8)); + return null; + }).when(blob2).downloadTo(Mockito.any(OutputStream.class)); - assertEquals(expected, retrieved); + doAnswer(i -> { + ((OutputStream) i.getArgument(0)).write("line 7\nline 8\nline 9\n".getBytes(StandardCharsets.UTF_8)); + return null; + }).when(blob3).downloadTo(Mockito.any(OutputStream.class)); + when(storage.list(bucketName, Storage.BlobListOption.prefix(logPath))).thenReturn(page); + when(page.iterateAll()).thenReturn(iterable); + // Ensure the mock iterable's forEach method returns all three mocked blobs. + doAnswer(i -> { + ((Consumer) i.getArgument(0)).accept(blob1); + ((Consumer) i.getArgument(0)).accept(blob2); + ((Consumer) i.getArgument(0)).accept(blob3); + return null; + }).when(iterable).forEach(Mockito.any(Consumer.class)); + + final var gcsLogs = new GcsLogs(() -> storage); + + assertEquals(List.of("line 4", "line 5", "line 6", "line 7", "line 8", "line 9"), + gcsLogs.tailCloudLog(logConfigs, logPath, 6), + "the last 6 items should have been returned in the correct order"); + + assertEquals(List.of("line 9"), + gcsLogs.tailCloudLog(logConfigs, logPath, 1), + "the last item should have been returned in the correct order"); + + assertEquals(List.of("line 1", "line 2", "line 3", "line 4", "line 5", "line 6", "line 7", "line 8", "line 9"), + gcsLogs.tailCloudLog(logConfigs, logPath, 1000), + "all 9 items should have been returned in the correct order"); } - /** - * The test files for this test have been pre-generated and uploaded into the bucket folder. The - * folder contains the following files with these contents: - *
  • first-file.txt - Line 1, Line 2, Line 3
  • - *
  • second-file.txt - Line 4, Line 5, Line 6
  • - *
  • third-file.txt - Line 7, Line 8, Line 9
  • - */ @Test - void testTail() throws IOException { - final var data = new GcsLogs(GcsLogsTest::getClientFactory).tailCloudLog((new EnvConfigs()).getLogConfigs(), "tail", 6); + void testDeleteLogs() { + final var blob1 = mock(Blob.class); + final var blob2 = mock(Blob.class); + final var blob3 = mock(Blob.class); + + when(storage.list(bucketName, Storage.BlobListOption.prefix(logPath))).thenReturn(page); + when(page.iterateAll()).thenReturn(iterable); + // Ensure the mock iterable's forEach method returns all three mocked blobs. + doAnswer(i -> { + ((Consumer) i.getArgument(0)).accept(blob1); + ((Consumer) i.getArgument(0)).accept(blob2); + ((Consumer) i.getArgument(0)).accept(blob3); + return null; + }).when(iterable).forEach(Mockito.any(Consumer.class)); + + final var gcsLogs = new GcsLogs(() -> storage); + gcsLogs.deleteLogs(logConfigs, logPath); + // each Blob should have delete called on it + verify(blob1).delete(BlobSourceOption.generationMatch()); + verify(blob2).delete(BlobSourceOption.generationMatch()); + verify(blob3).delete(BlobSourceOption.generationMatch()); + } + + @Test + void testDownloadCloudLog() throws IOException { + final var blob1 = mock(Blob.class); + final var blob2 = mock(Blob.class); + final var blob3 = mock(Blob.class); + + // Ensure the Blob mocks write to the outputstream that is passed to their downloadTo method. + // The first blob will contain the file contents: + // line 1 + // line 2 + // line 3 + // the second blob will contain the file contents: + // line 4 + // line 5 + // line 6 + // the third, and final, blob will contain the file contents: + // line 7 + // line 8 + // line 9 + doAnswer(i -> { + ((OutputStream) i.getArgument(0)).write("line 1\nline 2\nline 3\n".getBytes(StandardCharsets.UTF_8)); + return null; + }).when(blob1).downloadTo(Mockito.any(OutputStream.class)); + + doAnswer(i -> { + ((OutputStream) i.getArgument(0)).write("line 4\nline 5\nline 6\n".getBytes(StandardCharsets.UTF_8)); + return null; + }).when(blob2).downloadTo(Mockito.any(OutputStream.class)); + + doAnswer(i -> { + ((OutputStream) i.getArgument(0)).write("line 7\nline 8\nline 9\n".getBytes(StandardCharsets.UTF_8)); + return null; + }).when(blob3).downloadTo(Mockito.any(OutputStream.class)); + + when(storage.list( + bucketName, + Storage.BlobListOption.prefix(logPath), + Storage.BlobListOption.pageSize(LogClientSingleton.DEFAULT_PAGE_SIZE))).thenReturn(page); + + when(page.iterateAll()).thenReturn(iterable); + // Ensure the mock iterable's forEach method returns all three mocked blobs. + doAnswer(i -> { + ((Consumer) i.getArgument(0)).accept(blob1); + ((Consumer) i.getArgument(0)).accept(blob2); + ((Consumer) i.getArgument(0)).accept(blob3); + return null; + }).when(iterable).forEach(Mockito.any(Consumer.class)); + + final var gcsLogs = new GcsLogs(() -> storage); + final var logs = gcsLogs.downloadCloudLog(logConfigs, logPath); + assertNotNull(logs, "log must not be null"); - final var expected = List.of("Line 4", "Line 5", "Line 6", "Line 7", "Line 8", "Line 9"); - assertEquals(data, expected); + final var expected = List.of("line 1", "line 2", "line 3", "line 4", "line 5", "line 6", "line 7", "line 8", "line 9"); + assertEquals(expected, Files.readLines(logs, StandardCharsets.UTF_8)); } } diff --git a/tools/bin/cloud_storage_logging_test.sh b/tools/bin/cloud_storage_logging_test.sh index 8d7b776354b..6e437758a7a 100755 --- a/tools/bin/cloud_storage_logging_test.sh +++ b/tools/bin/cloud_storage_logging_test.sh @@ -25,7 +25,7 @@ export GCS_LOG_BUCKET=airbyte-kube-integration-logging-test # Run the logging test first since the same client is used in the log4j2 integration test. echo "Running log client tests.." -./gradlew :airbyte-config:models:logClientsIntegrationTest --scan +./gradlew :airbyte-config:config-models:logClientsIntegrationTest --scan echo "Running cloud storage tests.." ./gradlew :airbyte-workers:cloudStorageIntegrationTest --scan