Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.nifi.processors.snowflake.snowpipe.InsertReport;

import java.io.IOException;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.ProxySelector;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
Expand Down Expand Up @@ -75,6 +77,25 @@ class SnowpipeIngestClient implements AutoCloseable {
final URI baseUri,
final String pipeName,
final RSAKeyAuthorizationProvider authorizationProvider
) {
this(baseUri, pipeName, authorizationProvider, null, null);
}

/**
* Snowpipe Ingest Client with proxy support
*
* @param baseUri Base URI for the Snowpipe REST API
* @param pipeName Fully qualified pipe name
* @param authorizationProvider RSA Key Authorization Provider for JWT authentication
* @param proxySelector Optional proxy selector; {@code null} uses the system default
* @param proxyAuthenticator Optional authenticator for proxy credentials; {@code null} when not required
*/
SnowpipeIngestClient(
final URI baseUri,
final String pipeName,
final RSAKeyAuthorizationProvider authorizationProvider,
final ProxySelector proxySelector,
final Authenticator proxyAuthenticator
) {
Objects.requireNonNull(baseUri, "Base URI required");
Objects.requireNonNull(pipeName, "Pipe Name required");
Expand All @@ -87,9 +108,16 @@ class SnowpipeIngestClient implements AutoCloseable {
this.insertReportUri = baseUri.resolve(insertReportPath);

this.authorizationProvider = authorizationProvider;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(CONNECT_TIMEOUT)
.build();

final HttpClient.Builder builder = HttpClient.newBuilder()
.connectTimeout(CONNECT_TIMEOUT);
if (proxySelector != null) {
builder.proxy(proxySelector);
}
if (proxyAuthenticator != null) {
builder.authenticator(proxyAuthenticator);
}
this.httpClient = builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
Expand All @@ -32,14 +34,24 @@
import org.apache.nifi.processors.snowflake.snowpipe.InsertFiles;
import org.apache.nifi.processors.snowflake.snowpipe.InsertReport;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
import org.apache.nifi.snowflake.service.util.AccountIdentifierFormatParameters;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;

import java.io.IOException;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
import java.net.URI;
import java.security.PrivateKey;
import java.security.interfaces.RSAPrivateCrtKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@Tags({"snowflake", "snowpipe", "database", "connection"})
Expand Down Expand Up @@ -124,6 +136,19 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr
.required(true)
.build();

// Only HTTP proxy type is supported. SOCKS is silently ignored by java.net.http.HttpClient.
// For authenticated proxies: the JDK disables Basic auth over CONNECT tunneling by default
// (jdk.http.auth.tunneling.disabledSchemes=Basic). To use proxy credentials with an HTTPS
// target like Snowflake, add -Djdk.http.auth.tunneling.disabledSchemes="" to bootstrap.conf.
static final PropertyDescriptor PROXY_CONFIGURATION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP_AUTH))
.description("Specifies the Proxy Configuration Controller Service to proxy network requests."
+ " Only HTTP proxy type is supported."
+ " For authenticated proxy with HTTPS targets, the JDK disables Basic authentication"
+ " over CONNECT tunneling by default. To enable it, add"
+ " -Djdk.http.auth.tunneling.disabledSchemes=\"\" to bootstrap.conf.")
.build();

static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
ACCOUNT_IDENTIFIER_FORMAT,
HOST_URL,
Expand All @@ -136,7 +161,8 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr
PRIVATE_KEY_SERVICE,
DATABASE,
SCHEMA,
PIPE
PIPE,
PROXY_CONFIGURATION
);

private static final String HTTPS_URI_FORMAT = "https://%s";
Expand All @@ -147,6 +173,13 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr

private static final char HYPHEN = '-';

@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
ProxyConfiguration.validateProxySpec(context, results, ProxySpec.HTTP_AUTH);
return results;
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
Expand All @@ -173,7 +206,11 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE

final URI baseUri = URI.create(HTTPS_URI_FORMAT.formatted(hostNormalized));
final RSAKeyAuthorizationProvider authorizationProvider = new RSAKeyAuthorizationProvider(account, user, rsaPrivateKey);
ingestClient = new SnowpipeIngestClient(baseUri, qualifiedPipeName, authorizationProvider);

final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
final ProxySelector proxySelector = buildProxySelector(proxyConfiguration);
final Authenticator proxyAuthenticator = buildProxyAuthenticator(proxyConfiguration);
ingestClient = new SnowpipeIngestClient(baseUri, qualifiedPipeName, authorizationProvider, proxySelector, proxyAuthenticator);
} else {
throw new InitializationException("RSA Private Key not provided");
}
Expand Down Expand Up @@ -213,6 +250,37 @@ public void migrateProperties(PropertyConfiguration config) {
config.renameProperty(SnowflakeProperties.OLD_SCHEMA_PROPERTY_NAME, SnowflakeProperties.SCHEMA.getName());
}

private ProxySelector buildProxySelector(final ProxyConfiguration proxyConfiguration) {
final Proxy proxy = proxyConfiguration.createProxy();
return new ProxySelector() {
@Override
public List<Proxy> select(final URI uri) {
return List.of(proxy);
}

@Override
public void connectFailed(final URI uri, final SocketAddress sa, final IOException ioe) {
}
};
}

private Authenticator buildProxyAuthenticator(final ProxyConfiguration proxyConfiguration) {
if (!proxyConfiguration.hasCredential()) {
return null;
}
final String proxyUser = proxyConfiguration.getProxyUserName();
final String proxyPassword = proxyConfiguration.getProxyUserPassword();
return new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
if (getRequestorType() == RequestorType.PROXY) {
return new PasswordAuthentication(proxyUser, proxyPassword.toCharArray());
}
return super.getPasswordAuthentication();
}
};
}

private AccountIdentifierFormatParameters getAccountIdentifierFormatParameters(ConfigurationContext context) {
return new AccountIdentifierFormatParameters(
context.getProperty(HOST_URL).evaluateAttributeExpressions().getValue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
import java.net.URI;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
Expand Down Expand Up @@ -264,6 +269,49 @@ void testGetInsertReportErrorResponse() {
assertTrue(exception.getMessage().contains(String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR)));
}

@Test
void testInsertFilesViaProxy() throws InterruptedException, NoSuchAlgorithmException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses an HTTP base URI and a null authenticator, so it does not cover HTTPS tunneling or proxy credentials, which are the parts most likely to break. Can we add coverage for the authenticated proxy and HTTPS cases?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the proxy test, it now points the base URI to a different host (http://fake-snowflake.example.com) while the proxy selector points to MockWebServer. When Java's HttpClient routes through an HTTP proxy, it sends the request target in absolute form, so we assert the recorded request starts with http://fake-snowflake.example.com. This proves the proxy selector is actually being used. For the HTTPS + credentials case as mentioned in the comment above, this hits the JDK limitation with CONNECT tunneling and would need a full TLS proxy setup to test properly, which is more of an integration test scenario. The property description now documents what users need to configure to make it work.

// MockWebServer acts as the HTTP proxy. The base URI points to a distinct fake host
// so that the request only reaches MockWebServer if the proxy selector is honoured.
// When Java's HttpClient routes a request through an HTTP proxy it sends the target
// in absolute form (e.g. "http://fake-snowflake.example.com/v1/..."), which lets us
// assert that the proxy path was actually exercised.
mockWebServer.enqueue(new MockResponse.Builder()
.code(HttpURLConnection.HTTP_OK)
.addHeader(CONTENT_TYPE_HEADER, APPLICATION_JSON)
.body(INSERT_FILES_SUCCESS_RESPONSE)
.build());

final InetSocketAddress proxyAddress = new InetSocketAddress(mockWebServer.getHostName(), mockWebServer.getPort());
final Proxy proxy = new Proxy(Proxy.Type.HTTP, proxyAddress);
final ProxySelector proxySelector = new ProxySelector() {
@Override
public List<Proxy> select(final URI uri) {
return List.of(proxy);
}

@Override
public void connectFailed(final URI uri, final SocketAddress sa, final IOException ioe) {
}
};

// Intentionally different from the proxy address to prove routing goes via the proxy
final URI targetBaseUri = URI.create("http://fake-snowflake.example.com");
final RSAPrivateCrtKey privateKey = generatePrivateKey();
final RSAKeyAuthorizationProvider authProvider = new RSAKeyAuthorizationProvider(ACCOUNT, USER, privateKey);

try (final SnowpipeIngestClient proxyClient = new SnowpipeIngestClient(targetBaseUri, PIPE_NAME, authProvider, proxySelector, null)) {
proxyClient.insertFiles(new InsertFiles(List.of(new InsertFile(STAGED_FILE_PATH))));
}

final RecordedRequest request = mockWebServer.takeRequest();
// Absolute-form target confirms the request was routed through the configured proxy
final String target = request.getTarget();
assertNotNull(target);
assertTrue(target.startsWith("http://fake-snowflake.example.com"), "Expected absolute-form proxy target but got: " + target);
assertNotNull(request.getHeaders().get(AUTHORIZATION_HEADER));
}

private static RSAPrivateCrtKey generatePrivateKey() throws NoSuchAlgorithmException {
final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance(KEY_ALGORITHM);
final KeyPair keyPair = keyPairGenerator.generateKeyPair();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ void testMigrateProperties() {
assertEquals(expectedRenamed, propertiesRenamed);
}

@Test
void testProxyConfigurationServicePropertyPresent() {
final StandardSnowflakeIngestManagerProviderService service = new StandardSnowflakeIngestManagerProviderService();
final boolean hasProxyProperty = service.getSupportedPropertyDescriptors()
.stream()
.anyMatch(pd -> pd.getName().equals(StandardSnowflakeIngestManagerProviderService.PROXY_CONFIGURATION.getName()));
assertTrue(hasProxyProperty, "Proxy Configuration Service property should be present");
}

@Test
void testInsertFilesRequest() throws Exception {
mockWebServer.enqueue(new MockResponse.Builder()
Expand Down
Loading