Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Restore support for autodiscovering credentials
Browse files Browse the repository at this point in the history
PR #228 removed support for autodiscovered credentials by replacing the same configuration
with a NoCredentials implementation.

This change introduces a new property, `gcs.credentials.default` which matches that
supported by the TieredStorageManager for configuring autodiscovery of default credentials.

When this property is set to `false`, or omitted entirely the NoCredentials option will be
used, but when it is defined and set to `true` it will autodiscover credentials.

Closes #291
  • Loading branch information
markallanson committed Jan 10, 2024
1 parent c971c32 commit afe80e2
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 20 deletions.
48 changes: 39 additions & 9 deletions src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -53,6 +54,7 @@ public final class GcsSinkConfig extends AivenCommonConfig {
public static final String GCS_CREDENTIALS_PATH_CONFIG = "gcs.credentials.path";
public static final String GCS_ENDPOINT_CONFIG = "gcs.endpoint";
public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default";
public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
public static final String GCS_USER_AGENT = "gcs.user.agent";
private static final String GROUP_FILE = "File";
Expand Down Expand Up @@ -114,16 +116,23 @@ private static void addGcsConfigGroup(final ConfigDef configDef) {
GCS_ENDPOINT_CONFIG);
configDef.define(GCS_CREDENTIALS_PATH_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW,
"The path to a GCP credentials file. "
+ "If not provided, the connector will try to detect the credentials automatically. "
+ "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\"",
+ "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\" "
+ "or \"" + GCS_CREDENTIALS_DEFAULT_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_PATH_CONFIG);

configDef.define(GCS_CREDENTIALS_JSON_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.LOW,
"GCP credentials as a JSON string. "
+ "If not provided, the connector will try to detect the credentials automatically. "
+ "Cannot be set together with \"" + GCS_CREDENTIALS_PATH_CONFIG + "\"",
+ "Cannot be set together with \"" + GCS_CREDENTIALS_PATH_CONFIG + "\" "
+ "or \"" + GCS_CREDENTIALS_DEFAULT_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_JSON_CONFIG);

configDef.define(GCS_CREDENTIALS_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW,
"Whether to connect using default the GCP SDK default credential discovery. When set to"
+ "null (the default) or false, will fall back to connecting with No Credentials."
+ "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\" "
+ "or \"" + GCS_CREDENTIALS_PATH_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG);

configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
"The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD
Expand Down Expand Up @@ -315,25 +324,46 @@ static Map<String, String> handleDeprecatedYyyyUppercase(final Map<String, Strin
private void validate() {
final String credentialsPath = getString(GCS_CREDENTIALS_PATH_CONFIG);
final Password credentialsJson = getPassword(GCS_CREDENTIALS_JSON_CONFIG);
if (credentialsPath != null && credentialsJson != null) {
final String msg = String.format("\"%s\" and \"%s\" are mutually exclusive options, but both are set.",
GCS_CREDENTIALS_PATH_CONFIG, GCS_CREDENTIALS_JSON_CONFIG);
throw new ConfigException(msg);
final Boolean defaultCredentials = getBoolean(GCS_CREDENTIALS_DEFAULT_CONFIG);

final long nonNulls = Stream.of(defaultCredentials, credentialsJson, credentialsPath)
.filter(Objects::nonNull).count();

// only validate non nulls here, since all nulls means falling back to the default "no credential" behavour.
if (nonNulls > 1) {
throw new ConfigException(
String.format(
"Only one of %s, %s, and %s can be non-null.",
GCS_CREDENTIALS_DEFAULT_CONFIG,
GCS_CREDENTIALS_JSON_CONFIG,
GCS_CREDENTIALS_PATH_CONFIG
)
);
}
}

public OAuth2Credentials getCredentials() {
final String credentialsPath = getString(GCS_CREDENTIALS_PATH_CONFIG);
final Password credentialsJsonPwd = getPassword(GCS_CREDENTIALS_JSON_CONFIG);
if (credentialsPath == null && credentialsJsonPwd == null) {
final Boolean defaultCredentials = getBoolean(GCS_CREDENTIALS_DEFAULT_CONFIG);

// if we've got no path, json and not configured to use default credentials, fall back to connecting without
// any credentials at all.
if (credentialsPath == null
&& credentialsJsonPwd == null
&& (defaultCredentials == null || !defaultCredentials)
) {
LOG.warn("No GCS credentials provided, trying to connect without credentials.");
return NoCredentials.getInstance();
}

try {
String credentialsJson = null;
if (credentialsJsonPwd != null) {
credentialsJson = credentialsJsonPwd.value();
}
// in the case where both path and json are empty the GoogleCredentialsBuilder will fall back to
// connecting with default credential discovery.
return GoogleCredentialsBuilder.build(credentialsPath, credentialsJson);
} catch (final Exception e) { // NOPMD broad exception catched
throw new ConfigException("Failed to create GCS credentials: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package io.aiven.kafka.connect.gcs.config;

import static io.aiven.kafka.connect.gcs.GcsSinkConfig.*;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -50,13 +53,21 @@
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import io.aiven.kafka.connect.gcs.GcsSinkConfig;
import io.aiven.kafka.connect.gcs.GoogleCredentialsBuilder;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.UserCredentials;
import com.google.cloud.NoCredentials;
import com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.threeten.bp.Duration;

/**
Expand Down Expand Up @@ -379,25 +390,89 @@ void gcsCredentialsJson() throws IOException {
assertEquals("test-client-secret", credentials.getClientSecret());
}

/**
* This test validates that the NoCredentials are used when default is specified as false. This behaviour
* mimics that of the Tiered Storage Manager.
*/
@Test
void gcsCredentialsExclusivity() throws IOException {
final Map<String, String> properties = new HashMap<>();
properties.put("gcs.bucket.name", "test-bucket");
void gcsCredentialsNoCredentialsWhenDefaultCredentialsFalse() {
final Map<String, String> properties = Map.of(
GCS_BUCKET_NAME_CONFIG, "test-bucket",
GCS_CREDENTIALS_DEFAULT_CONFIG, String.valueOf(false)
);

final URL credentialsResource = Thread.currentThread()
.getContextClassLoader()
.getResource("test_gcs_credentials.json");
final String credentialsJson = Resources.toString(credentialsResource, StandardCharsets.UTF_8);
properties.put("gcs.credentials.json", credentialsJson);
properties.put("gcs.credentials.path", credentialsResource.getPath());
assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);

final Credentials credentials = config.getCredentials();
assertEquals(NoCredentials.getInstance(), credentials);
}

/** Verifies that NoCredentials are used when no credential configurations is supplied. */
@Test
void gcsCredentialsNoCredentialsWhenNoCredentialsSupplied() {
final Map<String, String> properties = Map.of(GCS_BUCKET_NAME_CONFIG, "test-bucket");

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);

final Credentials credentials = config.getCredentials();
assertEquals(NoCredentials.getInstance(), credentials);
}

@Test
void gcsCredentialsDefault() {
final Map<String, String> properties = Map.of(
GCS_BUCKET_NAME_CONFIG, "test-bucket",
GCS_CREDENTIALS_DEFAULT_CONFIG, String.valueOf(true)
);

assertConfigDefValidationPasses(properties);

try (MockedStatic<GoogleCredentialsBuilder> mocked = mockStatic(GoogleCredentialsBuilder.class)) {
final GoogleCredentials googleCredentials = mock(GoogleCredentials.class);
mocked.when(() -> GoogleCredentialsBuilder.build(null, null)).thenReturn(googleCredentials);

final GcsSinkConfig config = new GcsSinkConfig(properties);

final OAuth2Credentials credentials = config.getCredentials();
assertEquals(googleCredentials, credentials);
}
}

@ParameterizedTest
@MethodSource("provideMoreThanOneNonNull")
void gcsCredentialsExclusivity(
final Boolean defaultCredentials, final String credentialsJson, final String credentialsPath
) {
final Map<String, String> properties = new HashMap<>();
properties.put(GCS_BUCKET_NAME_CONFIG, "test-bucket");
properties.put(GCS_CREDENTIALS_DEFAULT_CONFIG, defaultCredentials == null ? null : String.valueOf(defaultCredentials));
properties.put(GCS_CREDENTIALS_JSON_CONFIG, credentialsJson);
properties.put(GCS_CREDENTIALS_PATH_CONFIG, credentialsPath);

// Should pass here, because ConfigDef validation doesn't check interdependencies.
assertConfigDefValidationPasses(properties);

final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties));
assertEquals(
"\"gcs.credentials.path\" and \"gcs.credentials.json\" are mutually exclusive options, but both are set.",
throwable.getMessage());
"Only one of gcs.credentials.default, gcs.credentials.json, and gcs.credentials.path can be non-null.",
throwable.getMessage()
);
}

private static Stream<Arguments> provideMoreThanOneNonNull() {
return Stream.of(
Arguments.of(true, "json", "path"),
Arguments.of(false, "json", "path"),
Arguments.of(true, "json", null),
Arguments.of(false, "json", null),
Arguments.of(true, null, "path"),
Arguments.of(false, null, "path"),
Arguments.of(null, "json", "path")
);
}

@Test
Expand Down

0 comments on commit afe80e2

Please sign in to comment.