diff --git a/docs/HTTP-batchsink.md b/docs/HTTP-batchsink.md index 66cae443..96178c0a 100644 --- a/docs/HTTP-batchsink.md +++ b/docs/HTTP-batchsink.md @@ -8,12 +8,28 @@ Sink plugin to send the messages from the pipeline to an external http endpoint. Properties ---------- -**url:** The URL to post data to. (Macro enabled) +**url:** The URL to post data to. Additionally, a placeholder like #columnName can be added to the URL that can be +substituted with column value at the runtime. E.g. https://customer-url/user/#user_id. Here user_id column should exist +in input schema. (Macro enabled) **method:** The HTTP request method. Defaults to POST. (Macro enabled) **batchSize:** Batch size. Defaults to 1. (Macro enabled) +**Write JSON As Array:** Whether to write the JSON as an array. Defaults to false. (Macro enabled) + +When set to true, the payload will be written as an array of JSON objects. +Example - If batch size is 2, then the payload will be `[{"key":"val"}, {"key":"val"}]` + +When false, the payload will be JSON objects separated by a delimiter. +Example - If batch size is 2, delimiter is "\n" , then the payload will be `{"key":"val"}\n{"key":"val"}` + +**Json Batch Key** Optional key to be used for wrapping json array as object +Leave empty for no wrapping of the array. Ignored if Write JSON As Array is set to false. (Macro Enabled) + +Example - If batch size is 2 and json batch key is "data", then the payload will be +`{"data": [{"key":"val"}, {"key":"val"}]}` instead of `[{"key":"val"}, {"key":"val"}]` + **messageFormat:** Format to send messsage in. Options are JSON, Form, Custom. Defaults to JSON. (Macro enabled) **body:** Optional custom message. This is required if the message format is set to 'Custom'. @@ -33,13 +49,49 @@ delimited by a colon (":") and each pair is delimited by a newline ("\n"). (Macr **disableSSLValidation:** If user enables SSL validation, they will be expected to add the certificate to the trustStore on each machine. Defaults to true. (Macro enabled) -**numRetries:** The number of times the request should be retried if the request fails. Defaults to 3. (Macro enabled) +### Error Handling + +**HTTP Errors Handling:** Defines the error handling strategy to use for certain HTTP response codes. +The left column contains a regular expression for HTTP status code. The right column contains an action which +is done in case of match. If HTTP status code matches multiple regular expressions, the first specified in mapping +is matched. + +Example: + +| HTTP Code Regexp | Error Handling | +| ----------------- |:-----------------------:| +| 2.. | Success | +| 401 | Retry and fail | +| 4.. | Fail | +| 5.. | Retry and send to error | +| .* | Fail | + +Note: pagination types "Link in response header", "Link in response body", "Token in response body" do not support +"Send to error", "Skip", "Retry and send to error", "Retry and skip" options. + +**Non-HTTP Error Handling:** Error handling strategy to use when the HTTP response cannot be transformed to an output record. +Possible values are: +Stop on error - Fails pipeline due to erroneous record. +Send to error - Sends erroneous record's text to error port and continues. +Skip on error - Ignores erroneous records. + +**Retry Policy:** Policy used to calculate delay between retries. Default Retry Policy is Exponential. + +**Linear Retry Interval:** Interval in seconds between retries. Is only used if retry policy is "linear". + +**Max Retry Duration:** Maximum time in seconds retries can take. Default value is 600 seconds (10 minute). **connectTimeout:** The time in milliseconds to wait for a connection. Set to 0 for infinite. Defaults to 60000 (1 minute). (Macro enabled) **readTimeout:** The time in milliseconds to wait for a read. Set to 0 for infinite. Defaults to 60000 (1 minute). (Macro enabled) -**failOnNon200Response** Whether to fail the pipeline on non-200 response from the http end point. Defaults to true. (Macro enabled) +### HTTP Proxy + +**Proxy URL:** Proxy URL. Must contain a protocol, address and port. + +**Username:** Proxy username. + +**Password:** Proxy password. Example ------- @@ -56,7 +108,6 @@ This example performs HTTP POST request to http://example.com/data. "charset": "UTF-8", "followRedirects": "true", "disableSSLValidation": "true", - "numRetries": 0, "connectTimeout": 60000, "readTimeout": 60000 } diff --git a/pom.xml b/pom.xml index 44f9d3ef..84877b2c 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ HTTP Plugins io.cdap http-plugins - 1.2.8-SNAPSHOT + 1.2.8 @@ -80,13 +80,14 @@ 3.9 1.12 2.8.5 + 0.4.0 2.3.0 4.5.9 - 2.3.0-SNAPSHOT + 2.10.0-SNAPSHOT 2.9.9 4.11 2.7.1 - 1.10.19 + 2.24.0 2.1.3 0.9 1.49 @@ -324,6 +325,17 @@ httpclient ${httpcomponents.version} + + com.google.auth + google-auth-library-oauth2-http + ${googleauth.version} + + + com.google.guava + guava + + + com.google.code.gson gson @@ -416,10 +428,22 @@ org.mockito - mockito-all + mockito-core ${mockito.version} test + + org.powermock + powermock-module-junit4 + 2.0.2 + test + + + org.powermock + powermock-api-mockito2 + 2.0.2 + test + com.github.tomakehurst wiremock diff --git a/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java b/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java new file mode 100644 index 00000000..02ca59f0 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java @@ -0,0 +1,396 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.common; + +import com.google.auth.oauth2.AccessToken; +import com.google.common.base.Strings; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; +import io.cdap.plugin.common.ReferencePluginConfig; +import io.cdap.plugin.http.common.http.AuthType; +import io.cdap.plugin.http.common.http.OAuthUtil; + +import java.io.File; +import java.util.Optional; +import javax.annotation.Nullable; + + +/** + * Base configuration for HTTP Source and HTTP Sink + */ +public abstract class BaseHttpConfig extends ReferencePluginConfig { + + public static final String PROPERTY_AUTH_TYPE = "authType"; + public static final String PROPERTY_OAUTH2_ENABLED = "oauth2Enabled"; + public static final String PROPERTY_AUTH_URL = "authUrl"; + public static final String PROPERTY_TOKEN_URL = "tokenUrl"; + public static final String PROPERTY_CLIENT_ID = "clientId"; + public static final String PROPERTY_CLIENT_SECRET = "clientSecret"; + public static final String PROPERTY_SCOPES = "scopes"; + public static final String PROPERTY_REFRESH_TOKEN = "refreshToken"; + public static final String PROPERTY_PROXY_URL = "proxyUrl"; + public static final String PROPERTY_PROXY_USERNAME = "proxyUsername"; + public static final String PROPERTY_PROXY_PASSWORD = "proxyPassword"; + + public static final String PROPERTY_AUTH_TYPE_LABEL = "Auth type"; + + public static final String PROPERTY_USERNAME = "username"; + + public static final String PROPERTY_PASSWORD = "password"; + + public static final String PROPERTY_NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType"; + + public static final String PROPERTY_NAME_SERVICE_ACCOUNT_FILE_PATH = "serviceAccountFilePath"; + + public static final String PROPERTY_NAME_SERVICE_ACCOUNT_JSON = "serviceAccountJSON"; + + public static final String PROPERTY_SERVICE_ACCOUNT_FILE_PATH = "filePath"; + + public static final String PROPERTY_SERVICE_ACCOUNT_JSON = "JSON"; + + public static final String PROPERTY_AUTO_DETECT_VALUE = "auto-detect"; + + public static final String PROPERTY_SERVICE_ACCOUNT_SCOPE = "serviceAccountScope"; + + @Name(PROPERTY_AUTH_TYPE) + @Description("Type of authentication used to submit request. \n" + + "OAuth2, Service account, Basic Authentication types are available.") + protected String authType; + + @Name(PROPERTY_OAUTH2_ENABLED) + @Description("If true, plugin will perform OAuth2 authentication.") + @Nullable + protected String oauth2Enabled; + + @Nullable + @Name(PROPERTY_AUTH_URL) + @Description("Endpoint for the authorization server used to retrieve the authorization code.") + @Macro + protected String authUrl; + + @Nullable + @Name(PROPERTY_TOKEN_URL) + @Description("Endpoint for the resource server, which exchanges the authorization code for an access token.") + @Macro + protected String tokenUrl; + + @Nullable + @Name(PROPERTY_PROXY_URL) + @Description("Proxy URL. Must contain a protocol, address and port.") + @Macro + protected String proxyUrl; + + @Nullable + @Name(PROPERTY_PROXY_USERNAME) + @Description("Proxy username.") + @Macro + protected String proxyUsername; + + @Nullable + @Name(PROPERTY_PROXY_PASSWORD) + @Description("Proxy password.") + @Macro + protected String proxyPassword; + + @Nullable + @Name(PROPERTY_CLIENT_ID) + @Description("Client identifier obtained during the Application registration process.") + @Macro + protected String clientId; + + @Nullable + @Name(PROPERTY_CLIENT_SECRET) + @Description("Client secret obtained during the Application registration process.") + @Macro + protected String clientSecret; + + @Nullable + @Name(PROPERTY_SCOPES) + @Description("Scope of the access request, which might have multiple space-separated values.") + @Macro + protected String scopes; + + @Nullable + @Name(PROPERTY_REFRESH_TOKEN) + @Description("Token used to receive accessToken, which is end product of OAuth2.") + @Macro + protected String refreshToken; + + @Nullable + @Name(PROPERTY_USERNAME) + @Description("Username for basic authentication.") + @Macro + protected String username; + + @Nullable + @Name(PROPERTY_PASSWORD) + @Description("Password for basic authentication.") + @Macro + protected String password; + + @Name(PROPERTY_NAME_SERVICE_ACCOUNT_TYPE) + @Description("Service account type, file path where the service account is located or the JSON content of the " + + "service account.") + @Nullable + protected String serviceAccountType; + + @Nullable + @Macro + @Name(PROPERTY_NAME_SERVICE_ACCOUNT_FILE_PATH) + @Description("Path on the local file system of the service account key used for authorization. " + + "Can be set to 'auto-detect' for getting service account from system variable. " + + "The file/system variable must be present on every node in the cluster. " + + "Service account json can be generated on Google Cloud " + + "Service Account page (https://console.cloud.google.com/iam-admin/serviceaccounts).") + protected String serviceAccountFilePath; + + @Name(PROPERTY_NAME_SERVICE_ACCOUNT_JSON) + @Description("Content of the service account file.") + @Nullable + @Macro + protected String serviceAccountJson; + + @Nullable + @Name(PROPERTY_SERVICE_ACCOUNT_SCOPE) + @Description("The additional Google credential scopes required to access entered url, " + + "cloud-platform is included by default, " + + "visit https://developers.google.com/identity/protocols/oauth2/scopes " + + "for more information.") + @Macro + protected String serviceAccountScope; + + public BaseHttpConfig(String referenceName) { + super(referenceName); + } + + public AuthType getAuthType() { + return AuthType.fromValue(authType); + } + + public String getAuthTypeString() { + return authType; + } + + public Boolean getOauth2Enabled() { + return Boolean.parseBoolean(oauth2Enabled); + } + + public String getOAuth2Enabled() { + return oauth2Enabled; + } + + @Nullable + public String getAuthUrl() { + return authUrl; + } + + @Nullable + public String getTokenUrl() { + return tokenUrl; + } + + @Nullable + public String getClientId() { + return clientId; + } + + @Nullable + public String getClientSecret() { + return clientSecret; + } + + @Nullable + public String getScopes() { + return scopes; + } + + @Nullable + public String getRefreshToken() { + return refreshToken; + } + + @Nullable + public String getUsername() { + return username; + } + + @Nullable + public String getPassword() { + return password; + } + + public void setServiceAccountType(String serviceAccountType) { + this.serviceAccountType = serviceAccountType; + } + + @Nullable + public String getServiceAccountType() { + return serviceAccountType; + } + + public void setServiceAccountJson(String serviceAccountJson) { + this.serviceAccountJson = serviceAccountJson; + } + + public void setAuthType(String authType) { + this.authType = authType; + } + + @Nullable + public String getServiceAccountJson() { + return serviceAccountJson; + } + + public void setServiceAccountFilePath(String serviceAccountFilePath) { + this.serviceAccountFilePath = serviceAccountFilePath; + } + + @Nullable + public String getServiceAccountFilePath() { + return serviceAccountFilePath; + } + + @Nullable + public String getServiceAccountScope() { + return serviceAccountScope; + } + + @Nullable + public Boolean isServiceAccountJson() { + String serviceAccountType = getServiceAccountType(); + return Strings.isNullOrEmpty(serviceAccountType) ? null : + serviceAccountType.equals(PROPERTY_SERVICE_ACCOUNT_JSON); + } + + @Nullable + public String getProxyUrl() { + return proxyUrl; + } + + @Nullable + public String getProxyUsername() { + return proxyUsername; + } + + @Nullable + public String getProxyPassword() { + return proxyPassword; + } + + @Nullable + public Boolean isServiceAccountFilePath() { + String serviceAccountType = getServiceAccountType(); + return Strings.isNullOrEmpty(serviceAccountType) ? null : + serviceAccountType.equals(PROPERTY_SERVICE_ACCOUNT_FILE_PATH); + } + + public boolean validateServiceAccount(FailureCollector collector) { + if (containsMacro(PROPERTY_NAME_SERVICE_ACCOUNT_FILE_PATH) || + containsMacro(PROPERTY_NAME_SERVICE_ACCOUNT_JSON)) { + return false; + } + final Boolean bServiceAccountFilePath = isServiceAccountFilePath(); + final Boolean bServiceAccountJson = isServiceAccountJson(); + + // we don't want the validation to fail because the VM used during the validation + // may be different from the VM used during runtime and may not have the Google Drive Api scope. + if (bServiceAccountFilePath && PROPERTY_AUTO_DETECT_VALUE.equalsIgnoreCase(serviceAccountFilePath)) { + return false; + } + + if (bServiceAccountFilePath != null && bServiceAccountFilePath) { + if (!PROPERTY_AUTO_DETECT_VALUE.equals(serviceAccountFilePath) && + !new File(serviceAccountFilePath).exists()) { + collector.addFailure("Service Account File Path is not available.", + "Please provide path to existing Service Account file.") + .withConfigProperty(PROPERTY_NAME_SERVICE_ACCOUNT_FILE_PATH); + } + } + if (bServiceAccountJson != null && bServiceAccountJson) { + if (!Optional.ofNullable(getServiceAccountJson()).isPresent()) { + collector.addFailure("Service Account JSON can not be empty.", + "Please provide Service Account JSON.") + .withConfigProperty(PROPERTY_NAME_SERVICE_ACCOUNT_JSON); + } + } + return collector.getValidationFailures().size() == 0; + } + + public void validate(FailureCollector failureCollector) { + // Validate OAuth2 properties + if (!containsMacro(PROPERTY_OAUTH2_ENABLED) && this.getOauth2Enabled()) { + String reasonOauth2 = "OAuth2 is enabled"; + assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2); + assertIsSet(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2); + assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2); + assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2); + } + + // Validate Authentication properties + AuthType authType = getAuthType(); + switch (authType) { + case OAUTH2: + String reasonOauth2 = "OAuth2 is enabled"; + if (!containsMacro(PROPERTY_TOKEN_URL)) { + assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2); + } + if (!containsMacro(PROPERTY_CLIENT_ID)) { + assertIsSet(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2); + } + if (!containsMacro((PROPERTY_CLIENT_SECRET))) { + assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2); + } + if (!containsMacro(PROPERTY_REFRESH_TOKEN)) { + assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2); + } + break; + case SERVICE_ACCOUNT: + String reasonSA = "Service Account is enabled"; + assertIsSet(getServiceAccountType(), PROPERTY_NAME_SERVICE_ACCOUNT_TYPE, reasonSA); + boolean propertiesAreValid = validateServiceAccount(failureCollector); + if (propertiesAreValid) { + try { + AccessToken accessToken = OAuthUtil.getAccessToken(this); + } catch (Exception e) { + failureCollector.addFailure("Unable to authenticate given service account info. ", + "Please make sure all infomation entered correctly") + .withStacktrace(e.getStackTrace()); + } + } + break; + case BASIC_AUTH: + String reasonBasicAuth = "Basic Authentication is enabled"; + if (!containsMacro(PROPERTY_USERNAME)) { + assertIsSet(getUsername(), PROPERTY_USERNAME, reasonBasicAuth); + } + if (!containsMacro(PROPERTY_PASSWORD)) { + assertIsSet(getPassword(), PROPERTY_PASSWORD, reasonBasicAuth); + } + break; + } + } + + public static void assertIsSet(Object propertyValue, String propertyName, String reason) { + if (propertyValue == null) { + throw new InvalidConfigPropertyException( + String.format("Property '%s' must be set, since %s", propertyName, reason), propertyName); + } + } +} diff --git a/src/main/java/io/cdap/plugin/http/common/exceptions/InvalidPropertyTypeException.java b/src/main/java/io/cdap/plugin/http/common/exceptions/InvalidPropertyTypeException.java new file mode 100644 index 00000000..20d9bdc6 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/common/exceptions/InvalidPropertyTypeException.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.http.common.exceptions; + +import java.util.List; + +/** + * Indicates illegal property. + */ +public class InvalidPropertyTypeException extends RuntimeException { + + public InvalidPropertyTypeException(String propertyLabel, String value, List allowedValues) { + super(String.format("'%s' is not a value for '%s' property. Allowed values are: '%s'.", value, propertyLabel, + String.join(", ", allowedValues))); + } +} + diff --git a/src/main/java/io/cdap/plugin/http/common/http/AuthType.java b/src/main/java/io/cdap/plugin/http/common/http/AuthType.java new file mode 100644 index 00000000..93121d90 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/common/http/AuthType.java @@ -0,0 +1,63 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.common.http; + + +import io.cdap.plugin.http.common.exceptions.InvalidPropertyTypeException; +import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Enum for different authentication methods. + */ +public enum AuthType { + NONE("none"), + OAUTH2("oAuth2"), + SERVICE_ACCOUNT("serviceAccount"), + BASIC_AUTH("basicAuth"); + + private final String value; + + AuthType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + /** + * Returns the AuthType. + * + * @param value the value is string type. + * @return The AuthType + */ + public static AuthType fromValue(String value) { + return Arrays.stream(AuthType.values()).filter(authtype -> authtype.getValue().equals(value)) + .findAny().orElseThrow(() -> new InvalidPropertyTypeException(BaseHttpSourceConfig.PROPERTY_AUTH_TYPE_LABEL, + value, getAllowedValues())); + } + + public static List getAllowedValues() { + return Arrays.stream(AuthType.values()).map(v -> v.getValue()) + .collect(Collectors.toList()); + } +} + diff --git a/src/main/java/io/cdap/plugin/http/common/http/HttpRequest.java b/src/main/java/io/cdap/plugin/http/common/http/HttpRequest.java new file mode 100644 index 00000000..f2be239c --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/common/http/HttpRequest.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.http.common.http; + +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; + +import java.net.URI; + +/** + * This class allows us to send body not only in POST/PUT but also in other requests. + */ +public class HttpRequest extends HttpEntityEnclosingRequestBase { + private final String methodName; + + public HttpRequest(URI uri, String methodName) { + super(); + this.setURI(uri); + this.methodName = methodName; + } + + @Override + public String getMethod() { + return methodName; + } +} diff --git a/src/main/java/io/cdap/plugin/http/common/http/OAuthUtil.java b/src/main/java/io/cdap/plugin/http/common/http/OAuthUtil.java new file mode 100644 index 00000000..251d80e4 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/common/http/OAuthUtil.java @@ -0,0 +1,183 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.http.common.http; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.gson.JsonElement; +import io.cdap.plugin.http.common.BaseHttpConfig; +import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.pagination.page.JSONUtil; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Date; +import javax.annotation.Nullable; + +/** + * A class which contains utilities to make OAuth2 specific calls. + */ +public class OAuthUtil { + + /** + * Get Authorization header based on the config parameters provided + * + * @param config + * @return + * @throws IOException while creating the AccessToken + */ + @Nullable + public static AccessToken getAccessToken(BaseHttpConfig config) throws IOException { + + // auth check + AuthType authType = config.getAuthType(); + + // backward compatibility + if (config.getOauth2Enabled()) { + authType = AuthType.OAUTH2; + } + + switch (authType) { + case SERVICE_ACCOUNT: + // get accessToken from service account + return OAuthUtil.getAccessTokenByServiceAccount(config); + case OAUTH2: + try (CloseableHttpClient client = HttpClients.createDefault()) { + return OAuthUtil.getAccessTokenByRefreshToken(client, config); + } + } + return null; + } + + /** + * Returns true only if the expiration time set in the accessToken is before the current time. + * @param accessToken AccessToken instance + * @return TRUE if expiration time < current system time + * FALSE if the accessToken is null + * FALSE if the accessToken does not contain an expirationTime + */ + public static boolean tokenExpired(AccessToken accessToken) { + if (accessToken != null) { + Date expiryTime = accessToken.getExpirationTime(); + if (expiryTime != null) { + if (Date.from(Instant.now()).after(expiryTime)) { + return true; + } + } + } + + return false; + } + + /** + * Get the Access Token using the Refresh Token. The AccessToken returned has a valid expiration time if the + * token URL used to create the token returned a valid expires_in detail in the response. + * + * @param httpclient + * @param config + * @return + * @throws IOException + */ + public static AccessToken getAccessTokenByRefreshToken(CloseableHttpClient httpclient, + BaseHttpConfig config) throws IOException { + URI uri; + try { + uri = new URIBuilder(config.getTokenUrl()) + .setParameter("client_id", config.getClientId()) + .setParameter("client_secret", config.getClientSecret()) + .setParameter("refresh_token", config.getRefreshToken()) + .setParameter("grant_type", "refresh_token") + .build(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Failed to build token URI for OAuth2", e); + } + + HttpPost httppost = new HttpPost(uri); + CloseableHttpResponse response = httpclient.execute(httppost); + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + + JsonElement accessTokenElement = JSONUtil.toJsonObject(responseString).get("access_token"); + if (accessTokenElement == null) { + throw new IllegalArgumentException("Access token not found"); + } + + JsonElement expiresInElement = JSONUtil.toJsonObject(responseString).get("expires_in"); + Date expiresInDate = null; + if (expiresInElement != null) { + long expiresAtMilliseconds = System.currentTimeMillis() + + (long) (expiresInElement.getAsInt() * 1000) - 60000L; + expiresInDate = new Date(expiresAtMilliseconds); + } + + return new AccessToken(accessTokenElement.getAsString(), expiresInDate); + } + + /** + * Get the Access Token using the Service Account details from the config object. The AccessToken returned has a + * valid ExpirationTime set in the returned AccessToken instance. + * @param config + * @return + * @throws IOException + */ + private static AccessToken getAccessTokenByServiceAccount(BaseHttpConfig config) throws IOException { + try { + GoogleCredentials credential; + ImmutableSet scopeSet = ImmutableSet.of("https://www.googleapis.com/auth/cloud-platform"); + if (config.getServiceAccountScope() != null) { + String[] scopes = config.getServiceAccountScope().split("\n"); + for (String scope: scopes) { + scopeSet = ImmutableSet.builder().addAll(scopeSet).add(scope).build(); + } + } + credential = getGoogleCredentials(config).createScoped(scopeSet); + return credential.refreshAccessToken(); + } catch (Exception e) { + throw new IllegalArgumentException( + "Failed to generate Credentials with the given Service Account information", e); + } + } + + private static GoogleCredentials getGoogleCredentials(BaseHttpConfig config) throws IOException { + GoogleCredentials credential; + if (config.isServiceAccountJson()) { + InputStream jsonInputStream = new ByteArrayInputStream(config.getServiceAccountJson() + .getBytes(StandardCharsets.UTF_8)); + credential = GoogleCredentials.fromStream(jsonInputStream); + } else if (config.isServiceAccountFilePath() && !Strings.isNullOrEmpty(config.getServiceAccountFilePath()) + && !BaseHttpSourceConfig.PROPERTY_AUTO_DETECT_VALUE.equals(config.getServiceAccountFilePath())) { + credential = GoogleCredentials.fromStream(new FileInputStream(config.getServiceAccountFilePath())); + } else { + credential = GoogleCredentials.getApplicationDefault(); + } + return credential; + } +} + diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java index 559e0e71..867d46be 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java @@ -18,6 +18,7 @@ import com.google.gson.Gson; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -25,18 +26,23 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import java.io.IOException; + /** * OutputFormat for HTTP writing */ public class HTTPOutputFormat extends OutputFormat { private static final Gson GSON = new Gson(); static final String CONFIG_KEY = "http.sink.config"; + static final String INPUT_SCHEMA_KEY = "http.sink.input.schema"; @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) { + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { Configuration hConf = context.getConfiguration(); HTTPSinkConfig config = GSON.fromJson(hConf.get(CONFIG_KEY), HTTPSinkConfig.class); - return new HTTPRecordWriter(config); + Schema inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY)); + return new HTTPRecordWriter(config, inputSchema); } @Override diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java index b5851e37..eeb89406 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java @@ -16,30 +16,57 @@ package io.cdap.plugin.http.sink.batch; +import com.google.auth.oauth2.AccessToken; +import com.google.common.base.Charsets; +import com.google.common.base.Strings; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; -import io.cdap.cdap.format.StructuredRecordStringConverter; +import io.cdap.plugin.http.common.http.HttpRequest; +import io.cdap.plugin.http.common.http.OAuthUtil; +import io.cdap.plugin.http.source.common.RetryPolicy; +import io.cdap.plugin.http.source.common.error.HttpErrorHandler; +import io.cdap.plugin.http.source.common.error.RetryableErrorHandling; + import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.message.BasicHeader; +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.awaitility.pollinterval.FixedPollInterval; +import org.awaitility.pollinterval.IterativePollInterval; +import org.awaitility.pollinterval.PollInterval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.OutputStream; import java.io.UnsupportedEncodingException; -import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.ProtocolException; +import java.net.URI; import java.net.URL; import java.net.URLEncoder; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.StringTokenizer; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; + import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; @@ -52,151 +79,67 @@ */ public class HTTPRecordWriter extends RecordWriter { private static final Logger LOG = LoggerFactory.getLogger(HTTPRecordWriter.class); - private static final String REGEX_HASHED_VAR = "#s*(\\w+)"; + private static final String REGEX_HASHED_VAR = "#(\\w+)"; + public static final String REQUEST_METHOD_POST = "POST"; + public static final String REQUEST_METHOD_PUT = "PUT"; + public static final String REQUEST_METHOD_DELETE = "DELETE"; private final HTTPSinkConfig config; - private StringBuilder messages = new StringBuilder(); + private final MessageBuffer messageBuffer; private String contentType; + private String url; + private String configURL; + private List placeHolderList; + private final Map headers; + + private AccessToken accessToken; + private final HttpErrorHandler httpErrorHandler; + private final PollInterval pollInterval; + private int httpStatusCode; + private static int retryCount; - HTTPRecordWriter(HTTPSinkConfig config) { + HTTPRecordWriter(HTTPSinkConfig config, Schema inputSchema) { + this.headers = config.getRequestHeadersMap(); this.config = config; + this.accessToken = null; + this.messageBuffer = new MessageBuffer( + config.getMessageFormat(), config.getJsonBatchKey(), config.shouldWriteJsonAsArray(), + config.getDelimiterForMessages(), config.getCharset(), config.getBody(), inputSchema + ); + this.httpErrorHandler = new HttpErrorHandler(config); + if (config.getRetryPolicy().equals(RetryPolicy.LINEAR)) { + pollInterval = FixedPollInterval.fixed(config.getLinearRetryInterval(), TimeUnit.SECONDS); + } else { + pollInterval = IterativePollInterval.iterative(duration -> duration.multiply(2), + Duration.FIVE_HUNDRED_MILLISECONDS); + } + url = config.getUrl(); + placeHolderList = getPlaceholderListFromURL(); } @Override public void write(StructuredRecord input, StructuredRecord unused) throws IOException { - String message = null; - if (config.getMethod().equals("POST") || config.getMethod().equals("PUT")) { - if (config.getMessageFormat().equals("JSON")) { - message = StructuredRecordStringConverter.toJsonString(input); - contentType = "application/json"; - } else if (config.getMessageFormat().equals("Form")) { - message = createFormMessage(input); - contentType = " application/x-www-form-urlencoded"; - } else if (config.getMessageFormat().equals("Custom")) { - message = createCustomMessage(config.getBody(), input); - contentType = " text/plain"; - } - messages.append(message).append(config.getDelimiterForMessages()); - } - StringTokenizer tokens = new StringTokenizer(messages.toString().trim(), config.getDelimiterForMessages()); - if (config.getBatchSize() == 1 || tokens.countTokens() == config.getBatchSize()) { - executeHTTPService(); + configURL = url; + if (config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PUT)) { + messageBuffer.add(input); } - } - @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - // Process remaining messages after batch executions. - if (!messages.toString().isEmpty()) { - try { - executeHTTPService(); - } catch (Exception e) { - throw new RuntimeException("Error while executing http request for remaining input messages " + - "after the batch execution. " + e); - } + if (config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_DELETE) + && !placeHolderList.isEmpty()) { + configURL = updateURLWithPlaceholderValue(input); } - } - private void executeHTTPService() throws IOException { - int responseCode; - int retries = 0; - IOException exception = null; - do { - HttpURLConnection conn = null; - Map headers = config.getRequestHeadersMap(); - try { - URL url = new URL(config.getUrl()); - conn = (HttpURLConnection) url.openConnection(); - if (conn instanceof HttpsURLConnection) { - //Disable SSLv3 - System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); - if (config.getDisableSSLValidation()) { - disableSSLValidation(); - } - } - conn.setRequestMethod(config.getMethod().toUpperCase()); - conn.setConnectTimeout(config.getConnectTimeout()); - conn.setReadTimeout(config.getReadTimeout()); - conn.setInstanceFollowRedirects(config.getFollowRedirects()); - conn.addRequestProperty("charset", config.getCharset()); - for (Map.Entry propertyEntry : headers.entrySet()) { - conn.addRequestProperty(propertyEntry.getKey(), propertyEntry.getValue()); - } - //Default contentType value would be added in the request properties if user has not added in the headers. - if (config.getMethod().equals("POST") || config.getMethod().equals("PUT")) { - if (!headers.containsKey("Content-Type")) { - conn.addRequestProperty("Content-Type", contentType); - } - } - if (messages.length() > 0) { - conn.setDoOutput(true); - try (OutputStream outputStream = conn.getOutputStream()) { - outputStream.write(messages.toString().trim().getBytes(config.getCharset())); - } - } - responseCode = conn.getResponseCode(); - messages.setLength(0); - if (config.getFailOnNon200Response() && !(responseCode >= 200 && responseCode < 300)) { - exception = new IOException("Received error response. Response code: " + responseCode); - } - break; - } catch (MalformedURLException | ProtocolException e) { - throw new IllegalStateException("Error opening url connection. Reason: " + e.getMessage(), e); - } catch (IOException e) { - LOG.warn("Error making {} request to url {} with headers {}.", config.getMethod(), config.getMethod(), headers); - exception = e; - } finally { - if (conn != null) { - conn.disconnect(); - } - } - retries++; - } while (retries < config.getNumRetries()); - if (exception != null) { - throw exception; + if (config.getBatchSize() == messageBuffer.size() || config.getMethod().equals(REQUEST_METHOD_DELETE)) { + flushMessageBuffer(); } } - private String createFormMessage(StructuredRecord input) { - boolean first = true; - String formMessage = null; - StringBuilder sb = new StringBuilder(""); - for (Schema.Field field : input.getSchema().getFields()) { - if (first) { - first = false; - } else { - sb.append("&"); - } - sb.append(field.getName()); - sb.append("="); - sb.append((String) input.get(field.getName())); - } - try { - formMessage = URLEncoder.encode(sb.toString(), config.getCharset()); - } catch (UnsupportedEncodingException e) { - throw new IllegalStateException("Error encoding Form message. Reason: " + e.getMessage(), e); - } - return formMessage; - } - - private String createCustomMessage(String body, StructuredRecord input) { - String customMessage = body; - Matcher matcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); - HashMap findReplaceMap = new HashMap(); - while (matcher.find()) { - if (input.get(matcher.group(1)) != null) { - findReplaceMap.put(matcher.group(1), (String) input.get(matcher.group(1))); - } else { - throw new IllegalArgumentException(String.format( - "Field %s doesnt exist in the input schema.", matcher.group(1))); - } - } - Matcher replaceMatcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); - while (replaceMatcher.find()) { - String val = replaceMatcher.group().replace("#", ""); - customMessage = (customMessage.replace(replaceMatcher.group(), findReplaceMap.get(val))); + @Override + public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + // Process remaining messages after batch executions. + if (!config.getMethod().equals(REQUEST_METHOD_DELETE)) { + flushMessageBuffer(); } - return customMessage; } private void disableSSLValidation() { @@ -227,4 +170,187 @@ public boolean verify(String hostname, SSLSession session) { }; HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); } + + private boolean executeHTTPServiceAndCheckStatusCode() throws IOException { + LOG.debug("HTTP Request Attempt No. : {}", ++retryCount); + CloseableHttpClient httpClient = createHttpClient(configURL); + + CloseableHttpResponse response = null; + try { + URL url = new URL(configURL); + HttpEntityEnclosingRequestBase request = new HttpRequest(URI.create(String.valueOf(url)), + config.getMethod()); + + if (url.getProtocol().equalsIgnoreCase("https")) { + // Disable SSLv3 + System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); + if (config.getDisableSSLValidation()) { + disableSSLValidation(); + } + } + + if (!messageBuffer.isEmpty()) { + String requestBodyString = messageBuffer.getMessage(); + if (requestBodyString != null) { + StringEntity requestBody = new StringEntity(requestBodyString, Charsets.UTF_8.toString()); + request.setEntity(requestBody); + } + } + + request.setHeaders(getRequestHeaders()); + + response = httpClient.execute(request); + + httpStatusCode = response.getStatusLine().getStatusCode(); + LOG.debug("Response HTTP Status code: {}", httpStatusCode); + + } catch (MalformedURLException | ProtocolException e) { + throw new IllegalStateException("Error opening url connection. Reason: " + e.getMessage(), e); + } catch (IOException e) { + LOG.warn("Error making {} request to url {}.", config.getMethod(), config.getUrl()); + } finally { + if (response != null) { + response.close(); + } + } + RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); + boolean shouldRetry = errorHandlingStrategy.shouldRetry(); + if (!shouldRetry) { + messageBuffer.clear(); + retryCount = 0; + } + return !shouldRetry; + } + + + public CloseableHttpClient createHttpClient(String pageUriStr) throws IOException { + HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); + + // set timeouts + Long connectTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getConnectTimeout()); + Long readTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getReadTimeout()); + RequestConfig.Builder requestBuilder = RequestConfig.custom(); + requestBuilder.setSocketTimeout(readTimeoutMillis.intValue()); + requestBuilder.setConnectTimeout(connectTimeoutMillis.intValue()); + requestBuilder.setConnectionRequestTimeout(connectTimeoutMillis.intValue()); + httpClientBuilder.setDefaultRequestConfig(requestBuilder.build()); + + // basic auth + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + if (!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) { + URI uri = URI.create(pageUriStr); + AuthScope authScope = new AuthScope(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme())); + credentialsProvider.setCredentials(authScope, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + } + + // proxy and proxy auth + if (!Strings.isNullOrEmpty(config.getProxyUrl())) { + HttpHost proxyHost = HttpHost.create(config.getProxyUrl()); + if (!Strings.isNullOrEmpty(config.getProxyUsername()) && !Strings.isNullOrEmpty(config.getProxyPassword())) { + credentialsProvider.setCredentials(new AuthScope(proxyHost), + new UsernamePasswordCredentials( + config.getProxyUsername(), config.getProxyPassword())); + } + httpClientBuilder.setProxy(proxyHost); + } + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + + return httpClientBuilder.build(); + } + + private Header[] getRequestHeaders() throws IOException { + ArrayList
clientHeaders = new ArrayList<>(); + + if (accessToken == null || OAuthUtil.tokenExpired(accessToken)) { + accessToken = OAuthUtil.getAccessToken(config); + } + + if (accessToken != null) { + Header authorizationHeader = getAuthorizationHeader(accessToken); + if (authorizationHeader != null) { + clientHeaders.add(authorizationHeader); + } + } + + headers.put("Request-Method", config.getMethod().toUpperCase()); + headers.put("Instance-Follow-Redirects", String.valueOf(config.getFollowRedirects())); + headers.put("charset", config.getCharset()); + + if (config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PUT)) { + if (!headers.containsKey("Content-Type")) { + headers.put("Content-Type", contentType); + } + } + + // set default headers + if (headers != null) { + for (Map.Entry headerEntry : this.headers.entrySet()) { + clientHeaders.add(new BasicHeader(headerEntry.getKey(), headerEntry.getValue())); + } + } + + return clientHeaders.toArray(new Header[clientHeaders.size()]); + } + + private Header getAuthorizationHeader(AccessToken accessToken) { + return new BasicHeader("Authorization", String.format("Bearer %s", accessToken.getTokenValue())); + } + + /** + * @return List of placeholders which should be replaced by actual value in the URL. + */ + private List getPlaceholderListFromURL() { + List placeholderList = new ArrayList<>(); + if (!(config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_DELETE))) { + return placeholderList; + } + Pattern pattern = Pattern.compile(REGEX_HASHED_VAR); + Matcher matcher = pattern.matcher(url); + while (matcher.find()) { + placeholderList.add(new PlaceholderBean(url, matcher.group(1))); + } + return placeholderList; // Return blank list if no match found + } + + private String updateURLWithPlaceholderValue(StructuredRecord inputRecord) { + try { + StringBuilder finalURLBuilder = new StringBuilder(url); + //Running a loop backwards so that it does not impact the start and end index for next record. + for (int i = placeHolderList.size() - 1; i >= 0; i--) { + PlaceholderBean key = placeHolderList.get(i); + String replacement = inputRecord.get(key.getPlaceHolderKey()); + if (replacement != null) { + String encodedReplacement = URLEncoder.encode(replacement, config.getCharset()); + finalURLBuilder.replace(key.getStartIndex(), key.getEndIndex(), encodedReplacement); + } + } + return finalURLBuilder.toString(); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException("Error encoding URL with placeholder value. Reason: " + e.getMessage(), e); + } + } + + /** + * Clears the message buffer if it is empty and the HTTP method is not 'DELETE'. + */ + private void flushMessageBuffer() { + if (messageBuffer.isEmpty() && !config.getMethod().equals(REQUEST_METHOD_DELETE)) { + return; + } + contentType = messageBuffer.getContentType(); + try { + Awaitility + .await().with() + .pollInterval(pollInterval) + .pollDelay(config.getReadTimeout() == null ? 0L : config.getReadTimeout(), TimeUnit.MILLISECONDS) + .timeout(config.getMaxRetryDuration(), TimeUnit.SECONDS) + .until(this::executeHTTPServiceAndCheckStatusCode); + } catch (Exception e) { + throw new RuntimeException("Error while executing http request for remaining input messages " + + "after the batch execution. " + e); + } + messageBuffer.clear(); + } + } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java index 1993c1e4..7f45b73d 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.sink.batch; +import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; @@ -23,13 +24,19 @@ import io.cdap.cdap.api.data.batch.Output; import io.cdap.cdap.api.data.batch.OutputFormatProvider; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.plugin.common.Asset; +import io.cdap.plugin.common.LineageRecorder; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Sink plugin to send the messages from the pipeline to an external http endpoint. @@ -47,8 +54,10 @@ public HTTPSink(HTTPSinkConfig config) { @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); - FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer(); + FailureCollector collector = stageConfigurer.getFailureCollector(); config.validate(collector); + config.validateSchema(stageConfigurer.getInputSchema(), collector); collector.getOrThrowException(); } @@ -56,9 +65,21 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { public void prepareRun(BatchSinkContext context) { FailureCollector collector = context.getFailureCollector(); config.validate(collector); + config.validateSchema(context.getInputSchema(), collector); collector.getOrThrowException(); - context.addOutput(Output.of(config.referenceName, new HTTPSink.HTTPOutputFormatProvider(config))); + Schema inputSchema = context.getInputSchema(); + Asset asset = Asset.builder(config.getReferenceNameOrNormalizedFQN()) + .setFqn(config.getUrl()).build(); + LineageRecorder lineageRecorder = new LineageRecorder(context, asset); + lineageRecorder.createExternalDataset(context.getInputSchema()); + List fields = inputSchema == null ? + Collections.emptyList() : + inputSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()); + lineageRecorder.recordWrite("Write", String.format("Wrote to HTTP '%s'", config.getUrl()), fields); + + context.addOutput(Output.of(config.getReferenceNameOrNormalizedFQN(), + new HTTPSink.HTTPOutputFormatProvider(config, inputSchema))); } /** @@ -67,9 +88,11 @@ public void prepareRun(BatchSinkContext context) { private static class HTTPOutputFormatProvider implements OutputFormatProvider { private static final Gson GSON = new Gson(); private final HTTPSinkConfig config; + private final Schema inputSchema; - HTTPOutputFormatProvider(HTTPSinkConfig config) { + HTTPOutputFormatProvider(HTTPSinkConfig config, Schema inputSchema) { this.config = config; + this.inputSchema = inputSchema; } @Override @@ -79,7 +102,8 @@ public String getOutputFormatClassName() { @Override public Map getOutputFormatConfiguration() { - return Collections.singletonMap("http.sink.config", GSON.toJson(config)); + return ImmutableMap.of("http.sink.config", GSON.toJson(config), + "http.sink.input.schema", inputSchema == null ? "" : inputSchema.toString()); } } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java index 8f909347..378f3bc8 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java @@ -22,25 +22,47 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; + + +import io.cdap.plugin.common.ReferenceNames; import io.cdap.plugin.common.ReferencePluginConfig; +import io.cdap.plugin.http.common.BaseHttpConfig; +import io.cdap.plugin.http.source.common.EnumWithValue; +import io.cdap.plugin.http.source.common.RetryPolicy; +import io.cdap.plugin.http.source.common.error.ErrorHandling; +import io.cdap.plugin.http.source.common.error.HttpErrorHandlerEntity; +import io.cdap.plugin.http.source.common.error.RetryableErrorHandling; +import io.cdap.plugin.http.source.common.http.MessageFormatType; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; import javax.ws.rs.HttpMethod; /** * Config class for {@link HTTPSink}. */ -public class HTTPSinkConfig extends ReferencePluginConfig { +public class HTTPSinkConfig extends BaseHttpConfig { public static final String URL = "url"; public static final String METHOD = "method"; public static final String BATCH_SIZE = "batchSize"; + public static final String WRITE_JSON_AS_ARRAY = "writeJsonAsArray"; + public static final String JSON_BATCH_KEY = "jsonBatchKey"; public static final String DELIMETER_FOR_MESSAGE = "delimiterForMessages"; public static final String MESSAGE_FORMAT = "messageFormat"; public static final String BODY = "body"; @@ -48,18 +70,24 @@ public class HTTPSinkConfig extends ReferencePluginConfig { public static final String CHARSET = "charset"; public static final String FOLLOW_REDIRECTS = "followRedirects"; public static final String DISABLE_SSL_VALIDATION = "disableSSLValidation"; - public static final String NUM_RETRIES = "numRetries"; + public static final String PROPERTY_HTTP_ERROR_HANDLING = "httpErrorsHandling"; + public static final String PROPERTY_ERROR_HANDLING = "errorHandling"; + public static final String PROPERTY_RETRY_POLICY = "retryPolicy"; + public static final String PROPERTY_LINEAR_RETRY_INTERVAL = "linearRetryInterval"; + public static final String PROPERTY_MAX_RETRY_DURATION = "maxRetryDuration"; public static final String CONNECTION_TIMEOUT = "connectTimeout"; public static final String READ_TIMEOUT = "readTimeout"; - public static final String FAIL_ON_NON_200_RESPONSE = "failOnNon200Response"; - private static final String KV_DELIMITER = ":"; private static final String DELIMITER = "\n"; + private static final String REGEX_HASHED_VAR = "#(\\w+)"; + private static final String PLACEHOLDER = "#"; private static final Set METHODS = ImmutableSet.of(HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT, HttpMethod.DELETE); @Name(URL) - @Description("The URL to post data to. (Macro Enabled)") + @Description("The URL to post data to. Additionally, a placeholder like #columnName can be added to the URL that " + + "can be substituted with column value at the runtime. E.g. https://customer-url/user/#user_id. Here user_id " + + "column should exist in input schema. (Macro Enabled)") @Macro private final String url; @@ -73,6 +101,19 @@ public class HTTPSinkConfig extends ReferencePluginConfig { @Macro private final Integer batchSize; + @Name(WRITE_JSON_AS_ARRAY) + @Nullable + @Description("Whether to write json as array. Defaults to false. (Macro Enabled)") + @Macro + private final Boolean writeJsonAsArray; + + @Name(JSON_BATCH_KEY) + @Nullable + @Description("Optional key to be used for wrapping json array as object. " + + "Leave empty for no wrapping of the array (Macro Enabled)") + @Macro + private final String jsonBatchKey; + @Name(DELIMETER_FOR_MESSAGE) @Nullable @Description("Delimiter for messages to be used while batching. Defaults to \"\\n\". (Macro Enabled)") @@ -116,11 +157,35 @@ public class HTTPSinkConfig extends ReferencePluginConfig { @Macro private final Boolean disableSSLValidation; - @Name(NUM_RETRIES) - @Description("The number of times the request should be retried if the request fails. Defaults to 3. " + - "(Macro enabled)") + @Nullable + @Name(PROPERTY_HTTP_ERROR_HANDLING) + @Description("Defines the error handling strategy to use for certain HTTP response codes." + + "The left column contains a regular expression for HTTP status code. The right column contains an action which" + + "is done in case of match. If HTTP status code matches multiple regular expressions, " + + "the first specified in mapping is matched.") + protected String httpErrorsHandling; + + @Nullable + @Name(PROPERTY_ERROR_HANDLING) + @Description("Error handling strategy to use when the HTTP response cannot be transformed to an output record.") + protected String errorHandling; + + @Nullable + @Name(PROPERTY_RETRY_POLICY) + @Description("Policy used to calculate delay between retries. Default Retry Policy is Exponential.") + protected String retryPolicy; + + @Nullable + @Name(PROPERTY_LINEAR_RETRY_INTERVAL) + @Description("Interval in seconds between retries. Is only used if retry policy is \"linear\".") @Macro - private final Integer numRetries; + protected Long linearRetryInterval; + + @Nullable + @Name(PROPERTY_MAX_RETRY_DURATION) + @Description("Maximum time in seconds retries can take. Default value is 600 seconds (10 minute).") + @Macro + protected Long maxRetryDuration; @Name(CONNECTION_TIMEOUT) @Description("Sets the connection timeout in milliseconds. Set to 0 for infinite. Default is 60000 (1 minute). " + @@ -136,17 +201,14 @@ public class HTTPSinkConfig extends ReferencePluginConfig { @Macro private final Integer readTimeout; - @Name(FAIL_ON_NON_200_RESPONSE) - @Description("Whether to fail the pipeline on non-200 response from the http end point. Defaults to true. " + - "(Macro enabled)") - @Macro - private final Boolean failOnNon200Response; - public HTTPSinkConfig(String referenceName, String url, String method, Integer batchSize, @Nullable String delimiterForMessages, String messageFormat, @Nullable String body, @Nullable String requestHeaders, String charset, - boolean followRedirects, boolean disableSSLValidation, @Nullable int numRetries, - @Nullable int readTimeout, @Nullable int connectTimeout, boolean failOnNon200Response) { + boolean followRedirects, boolean disableSSLValidation, @Nullable String httpErrorsHandling, + String errorHandling, String retryPolicy, @Nullable Long linearRetryInterval, + Long maxRetryDuration, @Nullable int readTimeout, @Nullable int connectTimeout, + String oauth2Enabled, String authType, @Nullable String jsonBatchKey, + Boolean writeJsonAsArray) { super(referenceName); this.url = url; this.method = method; @@ -158,10 +220,17 @@ public HTTPSinkConfig(String referenceName, String url, String method, Integer b this.charset = charset; this.followRedirects = followRedirects; this.disableSSLValidation = disableSSLValidation; - this.numRetries = numRetries; + this.httpErrorsHandling = httpErrorsHandling; + this.errorHandling = errorHandling; + this.retryPolicy = retryPolicy; + this.linearRetryInterval = linearRetryInterval; + this.maxRetryDuration = maxRetryDuration; this.readTimeout = readTimeout; this.connectTimeout = connectTimeout; - this.failOnNon200Response = failOnNon200Response; + this.jsonBatchKey = jsonBatchKey; + this.writeJsonAsArray = writeJsonAsArray; + this.oauth2Enabled = oauth2Enabled; + this.authType = authType; } private HTTPSinkConfig(Builder builder) { @@ -176,10 +245,12 @@ private HTTPSinkConfig(Builder builder) { charset = builder.charset; followRedirects = builder.followRedirects; disableSSLValidation = builder.disableSSLValidation; - numRetries = builder.numRetries; connectTimeout = builder.connectTimeout; readTimeout = builder.readTimeout; - failOnNon200Response = builder.failOnNon200Response; + jsonBatchKey = builder.jsonBatchKey; + writeJsonAsArray = builder.writeJsonAsArray; + oauth2Enabled = builder.oauth2Enabled; + authType = builder.authType; } public static Builder newBuilder() { @@ -193,16 +264,16 @@ public static Builder newBuilder(HTTPSinkConfig copy) { builder.method = copy.getMethod(); builder.batchSize = copy.getBatchSize(); builder.delimiterForMessages = copy.getDelimiterForMessages(); - builder.messageFormat = copy.getMessageFormat(); + builder.messageFormat = copy.getMessageFormat().getValue(); builder.body = copy.getBody(); builder.requestHeaders = copy.getRequestHeaders(); builder.charset = copy.getCharset(); builder.followRedirects = copy.getFollowRedirects(); builder.disableSSLValidation = copy.getDisableSSLValidation(); - builder.numRetries = copy.getNumRetries(); builder.connectTimeout = copy.getConnectTimeout(); builder.readTimeout = copy.getReadTimeout(); - builder.failOnNon200Response = copy.getFailOnNon200Response(); + builder.oauth2Enabled = copy.getOAuth2Enabled(); + builder.authType = copy.getAuthTypeString(); return builder; } @@ -218,13 +289,21 @@ public Integer getBatchSize() { return batchSize; } + public boolean shouldWriteJsonAsArray() { + return writeJsonAsArray != null && writeJsonAsArray; + } + + public String getJsonBatchKey() { + return jsonBatchKey; + } + @Nullable public String getDelimiterForMessages() { - return delimiterForMessages; + return Strings.isNullOrEmpty(delimiterForMessages) ? "\n" : delimiterForMessages; } - public String getMessageFormat() { - return messageFormat; + public MessageFormatType getMessageFormat() { + return MessageFormatType.valueOf(messageFormat.toUpperCase()); } @Nullable @@ -249,8 +328,41 @@ public Boolean getDisableSSLValidation() { return disableSSLValidation; } - public Integer getNumRetries() { - return numRetries; + @Nullable + public String getHttpErrorsHandling() { + return httpErrorsHandling; + } + + public ErrorHandling getErrorHandling() { + return getEnumValueByString(ErrorHandling.class, errorHandling, PROPERTY_ERROR_HANDLING); + } + + public RetryPolicy getRetryPolicy() { + if (retryPolicy == null) { + return RetryPolicy.EXPONENTIAL; + } + return getEnumValueByString(RetryPolicy.class, retryPolicy, PROPERTY_RETRY_POLICY); + } + + private static T + getEnumValueByString(Class enumClass, String stringValue, String propertyName) { + return Stream.of(enumClass.getEnumConstants()) + .filter(keyType -> keyType.getValue().equalsIgnoreCase(stringValue)) + .findAny() + .orElseThrow(() -> new InvalidConfigPropertyException( + String.format("Unsupported value for '%s': '%s'", propertyName, stringValue), propertyName)); + } + + @Nullable + public Long getLinearRetryInterval() { + return linearRetryInterval; + } + + public Long getMaxRetryDuration() { + if (maxRetryDuration == null) { + return 600L; + } + return maxRetryDuration; } @Nullable @@ -263,15 +375,59 @@ public Integer getReadTimeout() { return readTimeout; } - public Boolean getFailOnNon200Response() { - return failOnNon200Response; - } - public Map getRequestHeadersMap() { return convertHeadersToMap(requestHeaders); } + public Map getHeadersMap(String header) { + return convertHeadersToMap(header); + } + + public String getReferenceNameOrNormalizedFQN() { + return Strings.isNullOrEmpty(referenceName) ? ReferenceNames.normalizeFqn(url) : referenceName; + } + + public List getHttpErrorHandlingEntries() { + Map httpErrorsHandlingMap = getMapFromKeyValueString(httpErrorsHandling); + List results = new ArrayList<>(httpErrorsHandlingMap.size()); + + for (Map.Entry entry : httpErrorsHandlingMap.entrySet()) { + String regex = entry.getKey(); + try { + results.add(new HttpErrorHandlerEntity(Pattern.compile(regex), + getEnumValueByString(RetryableErrorHandling.class, + entry.getValue(), PROPERTY_HTTP_ERROR_HANDLING))); + } catch (PatternSyntaxException e) { + // We embed causing exception message into this one. Since this message is shown on UI when validation fails. + throw new InvalidConfigPropertyException( + String.format( + "Error handling regex '%s' is not valid. %s", regex, e.getMessage()), PROPERTY_HTTP_ERROR_HANDLING); + } + } + return results; + } + + public static Map getMapFromKeyValueString(String keyValueString) { + Map result = new LinkedHashMap<>(); + + if (Strings.isNullOrEmpty(keyValueString)) { + return result; + } + + String[] mappings = keyValueString.split(","); + for (String map : mappings) { + String[] columns = map.split(":"); + if (columns.length < 2) { //For scenario where either of key or value not provided + throw new IllegalArgumentException(String.format("Missing value for key %s", columns[0])); + } + result.put(columns[0], columns[1]); + } + return result; + } + public void validate(FailureCollector collector) { + super.validate(collector); + if (!containsMacro(URL)) { try { new URL(url); @@ -299,9 +455,14 @@ public void validate(FailureCollector collector) { .withConfigProperty(METHOD); } - if (!containsMacro(NUM_RETRIES) && numRetries < 0) { - collector.addFailure("Number of Retries cannot be a negative number.", null) - .withConfigProperty(NUM_RETRIES); + if (!containsMacro(BATCH_SIZE) && batchSize != null && batchSize < 1) { + collector.addFailure("Batch size must be greater than 0.", null) + .withConfigProperty(BATCH_SIZE); + } + + // Validate Linear Retry Interval + if (!containsMacro(PROPERTY_RETRY_POLICY) && getRetryPolicy() == RetryPolicy.LINEAR) { + assertIsSet(getLinearRetryInterval(), PROPERTY_LINEAR_RETRY_INTERVAL, "retry policy is linear"); } if (!containsMacro(READ_TIMEOUT) && Objects.nonNull(readTimeout) && readTimeout < 0) { @@ -314,6 +475,38 @@ public void validate(FailureCollector collector) { collector.addFailure("For Custom message format, message cannot be null.", null) .withConfigProperty(MESSAGE_FORMAT); } + + if (!containsMacro(PROPERTY_MAX_RETRY_DURATION) && Objects.nonNull(maxRetryDuration) && maxRetryDuration < 0) { + collector.addFailure("Max Retry Duration cannot be a negative number.", null) + .withConfigProperty(PROPERTY_MAX_RETRY_DURATION); + } + } + + public void validateSchema(@Nullable Schema schema, FailureCollector collector) { + if (schema == null) { + return; + } + List fields = schema.getFields(); + if (fields == null || fields.isEmpty()) { + collector.addFailure("Schema must contain at least one field", null); + throw collector.getOrThrowException(); + } + + if (containsMacro(URL) || containsMacro(METHOD)) { + return; + } + + if ((method.equals("PUT") || method.equals("DELETE")) && url.contains(PLACEHOLDER)) { + Pattern pattern = Pattern.compile(REGEX_HASHED_VAR); + Matcher matcher = pattern.matcher(url); + List fieldNames = fields.stream().map(field -> field.getName()).collect(Collectors.toList()); + while (matcher.find()) { + if (!fieldNames.contains(matcher.group(1))) { + collector.addFailure(String.format("Schema must contain '%s' field mentioned in the url", matcher.group(1)), + null).withConfigProperty(URL); + } + } + } } private Map convertHeadersToMap(String headersString) { @@ -339,6 +532,8 @@ public static final class Builder { private String url; private String method; private Integer batchSize; + private Boolean writeJsonAsArray; + private String jsonBatchKey; private String delimiterForMessages; private String messageFormat; private String body; @@ -346,10 +541,10 @@ public static final class Builder { private String charset; private Boolean followRedirects; private Boolean disableSSLValidation; - private Integer numRetries; private Integer connectTimeout; private Integer readTimeout; - private Boolean failOnNon200Response; + private String oauth2Enabled; + private String authType; private Builder() { } @@ -374,6 +569,16 @@ public Builder setBatchSize(Integer batchSize) { return this; } + public Builder setWriteJsonAsArray(Boolean writeJsonAsArray) { + this.writeJsonAsArray = writeJsonAsArray; + return this; + } + + public Builder setJsonBatchKey(String jsonBatchKey) { + this.jsonBatchKey = jsonBatchKey; + return this; + } + public Builder setDelimiterForMessages(String delimiterForMessages) { this.delimiterForMessages = delimiterForMessages; return this; @@ -409,11 +614,6 @@ public Builder setDisableSSLValidation(Boolean disableSSLValidation) { return this; } - public Builder setNumRetries(Integer numRetries) { - this.numRetries = numRetries; - return this; - } - public Builder setConnectTimeout(Integer connectTimeout) { this.connectTimeout = connectTimeout; return this; @@ -424,11 +624,6 @@ public Builder setReadTimeout(Integer readTimeout) { return this; } - public Builder setFailOnNon200Response(Boolean failOnNon200Response) { - this.failOnNon200Response = failOnNon200Response; - return this; - } - public HTTPSinkConfig build() { return new HTTPSinkConfig(this); } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java b/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java new file mode 100644 index 00000000..5f08ba45 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java @@ -0,0 +1,233 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.sink.batch; + +import com.google.common.base.Strings; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.format.StructuredRecordStringConverter; +import io.cdap.plugin.http.source.common.http.MessageFormatType; + + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * MessageBuffer is used to store the structured records in a buffer till the batch size is reached. + * Once the batch size is reached, the records are converted to the appropriate format and appended to the message. + * The message is then returned to the HTTPRecordWriter. + */ +public class MessageBuffer { + private static final String REGEX_HASHED_VAR = "#(\\w+)"; + private final List buffer; + private final String jsonBatchKey; + private final Boolean shouldWriteJsonAsArray; + private final String delimiterForMessages; + private final String charset; + private final String customMessageBody; + private final Function, String> messageFormatter; + private final String contentType; + private final Schema wrappedMessageSchema; + + + /** + * Constructor for MessageBuffer. + * + * @param messageFormat The format of the message. Can be JSON, FORM or CUSTOM. + * @param jsonBatchKey The key to be used for the JSON batch message. + * @param shouldWriteJsonAsArray Whether the JSON message should be written as an array. + * @param delimiterForMessages The delimiter to be used for messages. + * @param charset The charset to be used for the message. + * @param customMessageBody The custom message body to be used. + */ + public MessageBuffer( + MessageFormatType messageFormat, String jsonBatchKey, boolean shouldWriteJsonAsArray, + String delimiterForMessages, String charset, String customMessageBody, Schema inputSchema + ) { + this.jsonBatchKey = jsonBatchKey; + this.delimiterForMessages = delimiterForMessages; + this.charset = charset; + this.shouldWriteJsonAsArray = shouldWriteJsonAsArray; + this.customMessageBody = customMessageBody; + this.buffer = new ArrayList<>(); + switch (messageFormat) { + case JSON: + messageFormatter = this::formatAsJson; + contentType = "application/json"; + break; + case FORM: + messageFormatter = this::formatAsForm; + contentType = "application/x-www-form-urlencoded"; + break; + case CUSTOM: + messageFormatter = this::formatAsCustom; + contentType = "text/plain"; + break; + default: + throw new IllegalArgumentException("Invalid message format: " + messageFormat); + } + // A new StructuredRecord is created with the jsonBatchKey as the + // field name and the array of records as the value + Schema bufferRecordArraySchema = Schema.arrayOf(inputSchema); + wrappedMessageSchema = Schema.recordOf("wrapper", + Schema.Field.of(jsonBatchKey, bufferRecordArraySchema)); + } + + /** + * Adds a record to the buffer. + * + * @param record The record to be added. + */ + public void add(StructuredRecord record) { + buffer.add(record); + } + + /** + * Clears the buffer. + */ + public void clear() { + buffer.clear(); + } + + /** + * Returns the size of the buffer. + */ + public int size() { + return buffer.size(); + } + + /** + * Returns whether the buffer is empty. + */ + public boolean isEmpty() { + return buffer.isEmpty(); + } + + /** + * Returns the content type of the message. + */ + public String getContentType() { + return contentType; + } + + /** + * Converts the buffer to the appropriate format and returns the message. + */ + public String getMessage() throws IOException { + return messageFormatter.apply(buffer); + } + + private String formatAsJson(List buffer) { + try { + return formatAsJsonInternal(buffer); + } catch (IOException e) { + throw new IllegalStateException("Error formatting JSON message. Reason: " + e.getMessage(), e); + } + } + + private String formatAsJsonInternal(List buffer) throws IOException { + boolean useJsonBatchKey = !Strings.isNullOrEmpty(jsonBatchKey); + if (!shouldWriteJsonAsArray || !useJsonBatchKey) { + return getBufferAsJsonList(); + } + StructuredRecord wrappedMessageRecord = StructuredRecord.builder(wrappedMessageSchema) + .set(jsonBatchKey, buffer).build(); + return StructuredRecordStringConverter.toJsonString(wrappedMessageRecord); + } + + private String formatAsForm(List buffer) { + return buffer.stream() + .map(this::createFormMessage) + .collect(Collectors.joining(delimiterForMessages)); + } + + private String formatAsCustom(List buffer) { + return buffer.stream() + .map(this::createCustomMessage) + .collect(Collectors.joining(delimiterForMessages)); + } + + private String getBufferAsJsonList() throws IOException { + StringBuilder sb = new StringBuilder(); + String delimiter = shouldWriteJsonAsArray ? "," : delimiterForMessages; + if (shouldWriteJsonAsArray) { + sb.append("["); + } + for (StructuredRecord record : buffer) { + sb.append(StructuredRecordStringConverter.toJsonString(record)); + sb.append(delimiter); + } + if (!buffer.isEmpty()) { + sb.setLength(sb.length() - delimiter.length()); + } + if (shouldWriteJsonAsArray) { + sb.append("]"); + } + return sb.toString(); + } + + private String createFormMessage(StructuredRecord input) { + boolean first = true; + String formMessage = null; + StringBuilder sb = new StringBuilder(""); + for (Schema.Field field : input.getSchema().getFields()) { + if (first) { + first = false; + } else { + sb.append("&"); + } + sb.append(field.getName()); + sb.append("="); + sb.append((String) input.get(field.getName())); + } + try { + formMessage = URLEncoder.encode(sb.toString(), charset); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException("Error encoding Form message. Reason: " + e.getMessage(), e); + } + return formMessage; + } + + private String createCustomMessage(StructuredRecord input) { + String customMessage = customMessageBody; + Matcher matcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); + HashMap findReplaceMap = new HashMap(); + while (matcher.find()) { + if (input.get(matcher.group(1)) != null) { + findReplaceMap.put(matcher.group(1), (String) input.get(matcher.group(1))); + } else { + throw new IllegalArgumentException(String.format( + "Field %s doesnt exist in the input schema.", matcher.group(1))); + } + } + Matcher replaceMatcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); + while (replaceMatcher.find()) { + String val = replaceMatcher.group().replace("#", ""); + customMessage = (customMessage.replace(replaceMatcher.group(), findReplaceMap.get(val))); + } + return customMessage; + } + +} diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java b/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java new file mode 100644 index 00000000..1520f8e0 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java @@ -0,0 +1,47 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.sink.batch; + +/** + * This class stores the placeholder information to avoid performing string functions for each record. + */ +public class PlaceholderBean { + private static final String PLACEHOLDER_FORMAT = "#%s"; + private final String placeHolderKey; + private final String placeHolderKeyWithPrefix; + private final int startIndex; + private final int endIndex; + + public PlaceholderBean(String url, String placeHolderKey) { + this.placeHolderKey = placeHolderKey; + this.placeHolderKeyWithPrefix = String.format(PLACEHOLDER_FORMAT, placeHolderKey); + this.startIndex = url.indexOf(placeHolderKeyWithPrefix); + this.endIndex = startIndex + placeHolderKeyWithPrefix.length(); + } + + public String getPlaceHolderKey() { + return placeHolderKey; + } + + public int getStartIndex() { + return startIndex; + } + + public int getEndIndex() { + return endIndex; + } +} diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java index dc3be1b7..6c90f5cc 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java @@ -25,6 +25,7 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.InvalidEntry; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; @@ -59,7 +60,8 @@ public HttpBatchSource(HttpBatchSourceConfig config) { @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { - config.validate(); // validate when macros not yet substituted + FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + config.validate(failureCollector); // validate when macros not yet substituted config.validateSchema(); pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema()); @@ -67,7 +69,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { @Override public void prepareRun(BatchSourceContext context) { - config.validate(); // validate when macros are already substituted + FailureCollector failureCollector = context.getFailureCollector(); + config.validate(failureCollector); // validate when macros are already substituted config.validateSchema(); schema = config.getSchema(); diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java index 1b02eece..7f55e6b5 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java @@ -15,7 +15,29 @@ */ package io.cdap.plugin.http.source.batch; +import com.google.common.base.Strings; +import com.google.gson.JsonSyntaxException; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.http.common.http.AuthType; +import io.cdap.plugin.http.common.http.OAuthUtil; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpStatus; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; /** * Provides all the configurations required for configuring the {@link HttpBatchSource} plugin. @@ -24,4 +46,270 @@ public class HttpBatchSourceConfig extends BaseHttpSourceConfig { protected HttpBatchSourceConfig(String referenceName) { super(referenceName); } + + @Override + public void validate(FailureCollector failureCollector) { + super.validate(failureCollector); + validateCredentials(failureCollector); + } + + public void validateCredentials(FailureCollector collector) { + try { + if (getAuthType() == AuthType.OAUTH2) { + validateOAuth2Credentials(collector); + } else if (getAuthType() == AuthType.BASIC_AUTH) { + validateBasicAuthCredentials(collector); + } + } catch (IOException e) { + String errorMessage = "Unable to authenticate the given info : " + e.getMessage(); + collector.addFailure(errorMessage, null); + } + } + + private void validateOAuth2Credentials(FailureCollector collector) throws IOException { + if (!containsMacro(PROPERTY_CLIENT_ID) && !containsMacro(PROPERTY_CLIENT_SECRET) && + !containsMacro(PROPERTY_TOKEN_URL) && !containsMacro(PROPERTY_REFRESH_TOKEN) && + !containsMacro(PROPERTY_PROXY_PASSWORD) && !containsMacro(PROPERTY_PROXY_USERNAME) && + !containsMacro(PROPERTY_PROXY_URL)) { + HttpClientBuilder httpclientBuilder = HttpClients.custom(); + if (!Strings.isNullOrEmpty(getProxyUrl())) { + HttpHost proxyHost = HttpHost.create(getProxyUrl()); + if (!Strings.isNullOrEmpty(getProxyUsername()) && !Strings.isNullOrEmpty(getProxyPassword())) { + CredentialsProvider credsProvider = new BasicCredentialsProvider(); + credsProvider.setCredentials(new AuthScope(proxyHost), + new UsernamePasswordCredentials(getProxyUsername(), getProxyPassword())); + httpclientBuilder.setDefaultCredentialsProvider(credsProvider); + } + httpclientBuilder.setProxy(proxyHost); + } + + try (CloseableHttpClient closeableHttpClient = httpclientBuilder.build()) { + OAuthUtil.getAccessTokenByRefreshToken(closeableHttpClient, this); + } catch (JsonSyntaxException | HttpHostConnectException e) { + String errorMessage = "Error occurred during credential validation : " + e.getMessage(); + collector.addFailure(errorMessage, null); + } + } + } + + public void validateBasicAuthCredentials(FailureCollector collector) throws IOException { + try { + if (!containsMacro(PROPERTY_URL) && !containsMacro(PROPERTY_USERNAME) && !containsMacro(PROPERTY_PASSWORD) && + !containsMacro(PROPERTY_PROXY_USERNAME) && !containsMacro(PROPERTY_PROXY_PASSWORD) + && !containsMacro(PROPERTY_PROXY_URL)) { + HttpClient httpClient = new HttpClient(this); + validateBasicAuthResponse(collector, httpClient); + } + } catch (HttpHostConnectException e) { + String errorMessage = "Error occurred during credential validation : " + e.getMessage(); + collector.addFailure(errorMessage, "Please ensure that correct credentials are provided."); + } + } + + public void validateBasicAuthResponse(FailureCollector collector, HttpClient httpClient) throws IOException { + try (CloseableHttpResponse response = httpClient.executeHTTP(getUrl())) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != HttpStatus.SC_OK) { + HttpEntity entity = response.getEntity(); + if (entity != null) { + String errorResponse = EntityUtils.toString(entity, "UTF-8"); + String errorMessage = String.format("Credential validation request failed with Http Status code: '%d', " + + "Response: '%s'", statusCode, errorResponse); + collector.addFailure(errorMessage, "Please ensure that correct credentials are provided."); + } + } + } + } + + private HttpBatchSourceConfig(HttpBatchSourceConfigBuilder builder) { + super(builder.referenceName); + this.url = builder.url; + this.httpMethod = builder.httpMethod; + this.headers = builder.headers; + this.format = builder.format; + this.oauth2Enabled = builder.oauth2Enabled; + this.errorHandling = builder.errorHandling; + this.retryPolicy = builder.retryPolicy; + this.maxRetryDuration = builder.maxRetryDuration; + this.connectTimeout = builder.connectTimeout; + this.readTimeout = builder.readTimeout; + this.paginationType = builder.paginationType; + this.verifyHttps = builder.verifyHttps; + this.authType = builder.authType; + this.authUrl = builder.authUrl; + this.clientId = builder.clientId; + this.clientSecret = builder.clientSecret; + this.username = builder.username; + this.password = builder.password; + this.tokenUrl = builder.tokenUrl; + this.refreshToken = builder.refreshToken; + this.proxyUrl = builder.proxyUrl; + this.proxyUsername = builder.proxyUsername; + this.proxyPassword = builder.proxyPassword; + } + + public static HttpBatchSourceConfigBuilder builder() { + return new HttpBatchSourceConfigBuilder(); + } + + /** + * Builder for HttpBatchSourceConfig + */ + public static class HttpBatchSourceConfigBuilder { + + private String referenceName; + private String url; + private String httpMethod; + private String headers; + private String format; + private String oauth2Enabled; + private String errorHandling; + private String retryPolicy; + private Long maxRetryDuration; + private Integer connectTimeout; + private Integer readTimeout; + private String paginationType; + private String verifyHttps; + private String authType; + private String authUrl; + private String tokenUrl; + private String clientId; + private String clientSecret; + private String scopes; + private String refreshToken; + private String proxyUrl; + private String proxyUsername; + private String proxyPassword; + private String username; + private String password; + + + public HttpBatchSourceConfigBuilder setReferenceName (String referenceName) { + this.referenceName = referenceName; + return this; + } + public HttpBatchSourceConfigBuilder setAuthUrl(String authUrl) { + this.authUrl = authUrl; + return this; + } + + public HttpBatchSourceConfigBuilder setTokenUrl(String tokenUrl) { + this.tokenUrl = tokenUrl; + return this; + } + + public HttpBatchSourceConfigBuilder setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public HttpBatchSourceConfigBuilder setClientSecret(String clientSecret) { + this.clientSecret = clientSecret; + return this; + } + + public HttpBatchSourceConfigBuilder setScopes(String scopes) { + this.scopes = scopes; + return this; + } + + public HttpBatchSourceConfigBuilder setRefreshToken(String refreshToken) { + this.refreshToken = refreshToken; + return this; + } + + public HttpBatchSourceConfigBuilder setProxyUrl(String proxyUrl) { + this.proxyUrl = proxyUrl; + return this; + } + + public HttpBatchSourceConfigBuilder setProxyUsername(String proxyUsername) { + this.proxyUsername = proxyUsername; + return this; + } + + public HttpBatchSourceConfigBuilder setProxyPassword(String proxyPassword) { + this.proxyPassword = proxyPassword; + return this; + } + + public HttpBatchSourceConfigBuilder setUsername(String username) { + this.username = username; + return this; + } + + public HttpBatchSourceConfigBuilder setPassword(String password) { + this.password = password; + return this; + } + + public HttpBatchSourceConfigBuilder setUrl(String url) { + this.url = url; + return this; + } + + public HttpBatchSourceConfigBuilder setHttpMethod(String httpMethod) { + this.httpMethod = httpMethod; + return this; + } + + public HttpBatchSourceConfigBuilder setHeaders(String headers) { + this.headers = headers; + return this; + } + + public HttpBatchSourceConfigBuilder setFormat(String format) { + this.format = format; + return this; + } + + public HttpBatchSourceConfigBuilder setOauth2Enabled(String oauth2Enabled) { + this.oauth2Enabled = oauth2Enabled; + return this; + } + + public HttpBatchSourceConfigBuilder setErrorHandling(String errorHandling) { + this.errorHandling = errorHandling; + return this; + } + + public HttpBatchSourceConfigBuilder setRetryPolicy(String retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + public HttpBatchSourceConfigBuilder setMaxRetryDuration(Long maxRetryDuration) { + this.maxRetryDuration = maxRetryDuration; + return this; + } + + public HttpBatchSourceConfigBuilder setConnectTimeout(Integer connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + + public HttpBatchSourceConfigBuilder setReadTimeout(Integer readTimeout) { + this.readTimeout = readTimeout; + return this; + } + + public HttpBatchSourceConfigBuilder setPaginationType(String paginationType) { + this.paginationType = paginationType; + return this; + } + + public HttpBatchSourceConfigBuilder setVerifyHttps(String verifyHttps) { + this.verifyHttps = verifyHttps; + return this; + } + + public HttpBatchSourceConfigBuilder setAuthType(String authType) { + this.authType = authType; + return this; + } + + public HttpBatchSourceConfig build() { + return new HttpBatchSourceConfig(this); + } + } } diff --git a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java index 396cac63..05d6ac11 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java @@ -20,9 +20,12 @@ import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; import io.cdap.cdap.etl.api.validation.InvalidStageException; +import io.cdap.plugin.common.ReferenceNames; import io.cdap.plugin.common.ReferencePluginConfig; +import io.cdap.plugin.http.common.BaseHttpConfig; import io.cdap.plugin.http.source.common.error.ErrorHandling; import io.cdap.plugin.http.source.common.error.HttpErrorHandlerEntity; import io.cdap.plugin.http.source.common.error.RetryableErrorHandling; @@ -49,7 +52,7 @@ /** * Base configuration for HTTP Streaming and Batch plugins. */ -public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { +public abstract class BaseHttpSourceConfig extends BaseHttpConfig { public static final String PROPERTY_REFERENCE_NAME = "referenceName"; public static final String PROPERTY_URL = "url"; public static final String PROPERTY_HTTP_METHOD = "httpMethod"; @@ -59,11 +62,6 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { public static final String PROPERTY_RESULT_PATH = "resultPath"; public static final String PROPERTY_FIELDS_MAPPING = "fieldsMapping"; public static final String PROPERTY_CSV_SKIP_FIRST_ROW = "csvSkipFirstRow"; - public static final String PROPERTY_USERNAME = "username"; - public static final String PROPERTY_PASSWORD = "password"; - public static final String PROPERTY_PROXY_URL = "proxyUrl"; - public static final String PROPERTY_PROXY_USERNAME = "proxyUsername"; - public static final String PROPERTY_PROXY_PASSWORD = "proxyPassword"; public static final String PROPERTY_HTTP_ERROR_HANDLING = "httpErrorsHandling"; public static final String PROPERTY_ERROR_HANDLING = "errorHandling"; public static final String PROPERTY_RETRY_POLICY = "retryPolicy"; @@ -81,12 +79,6 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { public static final String PROPERTY_CUSTOM_PAGINATION_CODE = "customPaginationCode"; public static final String PROPERTY_WAIT_TIME_BETWEEN_PAGES = "waitTimeBetweenPages"; public static final String PROPERTY_OAUTH2_ENABLED = "oauth2Enabled"; - public static final String PROPERTY_AUTH_URL = "authUrl"; - public static final String PROPERTY_TOKEN_URL = "tokenUrl"; - public static final String PROPERTY_CLIENT_ID = "clientId"; - public static final String PROPERTY_CLIENT_SECRET = "clientSecret"; - public static final String PROPERTY_SCOPES = "scopes"; - public static final String PROPERTY_REFRESH_TOKEN = "refreshToken"; public static final String PROPERTY_VERIFY_HTTPS = "verifyHttps"; public static final String PROPERTY_KEYSTORE_FILE = "keystoreFile"; public static final String PROPERTY_KEYSTORE_TYPE = "keystoreType"; @@ -152,36 +144,6 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { @Macro protected String csvSkipFirstRow; - @Nullable - @Name(PROPERTY_USERNAME) - @Description("Username for basic authentication.") - @Macro - protected String username; - - @Nullable - @Name(PROPERTY_PASSWORD) - @Description("Password for basic authentication.") - @Macro - protected String password; - - @Nullable - @Name(PROPERTY_PROXY_URL) - @Description("Proxy URL. Must contain a protocol, address and port.") - @Macro - protected String proxyUrl; - - @Nullable - @Name(PROPERTY_PROXY_USERNAME) - @Description("Proxy username.") - @Macro - protected String proxyUsername; - - @Nullable - @Name(PROPERTY_PROXY_PASSWORD) - @Description("Proxy password.") - @Macro - protected String proxyPassword; - @Nullable @Name(PROPERTY_HTTP_ERROR_HANDLING) @Description("Defines the error handling strategy to use for certain HTTP response codes." + @@ -276,46 +238,6 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { @Macro protected Long waitTimeBetweenPages; - @Name(PROPERTY_OAUTH2_ENABLED) - @Description("If true, plugin will perform OAuth2 authentication.") - protected String oauth2Enabled; - - @Nullable - @Name(PROPERTY_AUTH_URL) - @Description("Endpoint for the authorization server used to retrieve the authorization code.") - @Macro - protected String authUrl; - - @Nullable - @Name(PROPERTY_TOKEN_URL) - @Description("Endpoint for the resource server, which exchanges the authorization code for an access token.") - @Macro - protected String tokenUrl; - - @Nullable - @Name(PROPERTY_CLIENT_ID) - @Description("Client identifier obtained during the Application registration process.") - @Macro - protected String clientId; - - @Nullable - @Name(PROPERTY_CLIENT_SECRET) - @Description("Client secret obtained during the Application registration process.") - @Macro - protected String clientSecret; - - @Nullable - @Name(PROPERTY_SCOPES) - @Description("Scope of the access request, which might have multiple space-separated values.") - @Macro - protected String scopes; - - @Nullable - @Name(PROPERTY_REFRESH_TOKEN) - @Description("Token used to receive accessToken, which is end product of OAuth2.") - @Macro - protected String refreshToken; - @Name(PROPERTY_VERIFY_HTTPS) @Description("If false, untrusted trust certificates (e.g. self signed), will not lead to an" + "error. Do not disable this in production environment on a network you do not entirely trust. " + @@ -384,9 +306,9 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { @Macro protected String cipherSuites; + @Nullable @Name(PROPERTY_SCHEMA) @Macro - @Nullable @Description("Output schema. Is required to be set.") protected String schema; @@ -430,31 +352,6 @@ public Boolean getCsvSkipFirstRow() { return Boolean.parseBoolean(csvSkipFirstRow); } - @Nullable - public String getUsername() { - return username; - } - - @Nullable - public String getPassword() { - return password; - } - - @Nullable - public String getProxyUrl() { - return proxyUrl; - } - - @Nullable - public String getProxyUsername() { - return proxyUsername; - } - - @Nullable - public String getProxyPassword() { - return proxyPassword; - } - @Nullable public String getHttpErrorsHandling() { return httpErrorsHandling; @@ -529,52 +426,31 @@ public Long getWaitTimeBetweenPages() { return waitTimeBetweenPages; } - public Boolean getOauth2Enabled() { - return Boolean.parseBoolean(oauth2Enabled); - } - - @Nullable - public String getAuthUrl() { - return authUrl; - } - - @Nullable - public String getTokenUrl() { - return tokenUrl; - } - - @Nullable - public String getClientId() { - return clientId; + public Boolean getVerifyHttps() { + return Boolean.parseBoolean(verifyHttps); } @Nullable - public String getClientSecret() { - return clientSecret; + public String getKeystoreFile() { + return keystoreFile; } @Nullable - public String getScopes() { - return scopes; + public KeyStoreType getKeystoreType() { + return getEnumValueByString(KeyStoreType.class, keystoreType, PROPERTY_KEYSTORE_TYPE); } - @Nullable - public String getRefreshToken() { - return refreshToken; - } - public Boolean getVerifyHttps() { - return Boolean.parseBoolean(verifyHttps); + public void setServiceAccountType(String serviceAccountType) { + this.serviceAccountType = serviceAccountType; } - @Nullable - public String getKeystoreFile() { - return keystoreFile; + public void setServiceAccountJson(String serviceAccountJson) { + this.serviceAccountJson = serviceAccountJson; } - @Nullable - public KeyStoreType getKeystoreType() { - return getEnumValueByString(KeyStoreType.class, keystoreType, PROPERTY_KEYSTORE_TYPE); + public void setServiceAccountFilePath(String serviceAccountFilePath) { + this.serviceAccountFilePath = serviceAccountFilePath; } @Nullable @@ -670,38 +546,44 @@ public List getTransportProtocolsList() { return getListFromString(transportProtocols); } - public void validate() { + public String getReferenceNameOrNormalizedFQN() { + return Strings.isNullOrEmpty(referenceName) ? ReferenceNames.normalizeFqn(url) : referenceName; + } + + public void validate(FailureCollector failureCollector) { + super.validate(failureCollector); + // Validate URL if (!containsMacro(PROPERTY_URL)) { try { // replace with placeholder with anything just during pagination new URI(getUrl().replaceAll(PAGINATION_INDEX_PLACEHOLDER_REGEX, "0")); - } catch (URISyntaxException e) { - throw new InvalidConfigPropertyException( - String.format("URL value is not valid: '%s'", getUrl()), e, PROPERTY_URL); - } - } - // Validate HTTP Error Handling Map - if (!containsMacro(PROPERTY_HTTP_ERROR_HANDLING)) { - List httpErrorsHandlingEntries = getHttpErrorHandlingEntries(); - boolean supportsSkippingPages = PaginationIteratorFactory - .createInstance(this, null).supportsSkippingPages(); - - if (!supportsSkippingPages) { - for (HttpErrorHandlerEntity httpErrorsHandlingEntry : httpErrorsHandlingEntries) { - ErrorHandling postRetryStrategy = httpErrorsHandlingEntry.getStrategy().getAfterRetryStrategy(); - if (postRetryStrategy.equals(ErrorHandling.SEND) || - postRetryStrategy.equals(ErrorHandling.SKIP)) { - throw new InvalidConfigPropertyException( - String.format("Error handling strategy '%s' is not support in combination with pagination type", - httpErrorsHandlingEntry.getStrategy(), getPaginationType()), PROPERTY_HTTP_ERROR_HANDLING); + // Validate HTTP Error Handling Map + if (!containsMacro(PROPERTY_HTTP_ERROR_HANDLING)) { + List httpErrorsHandlingEntries = getHttpErrorHandlingEntries(); + boolean supportsSkippingPages = PaginationIteratorFactory + .createInstance(this, null).supportsSkippingPages(); + + if (!supportsSkippingPages) { + for (HttpErrorHandlerEntity httpErrorsHandlingEntry : httpErrorsHandlingEntries) { + ErrorHandling postRetryStrategy = httpErrorsHandlingEntry.getStrategy().getAfterRetryStrategy(); + if (postRetryStrategy.equals(ErrorHandling.SEND) || + postRetryStrategy.equals(ErrorHandling.SKIP)) { + throw new InvalidConfigPropertyException( + String.format("Error handling strategy '%s' is not support in combination with pagination type", + httpErrorsHandlingEntry.getStrategy(), getPaginationType()), + PROPERTY_HTTP_ERROR_HANDLING); + } + } } } + } catch (URISyntaxException e) { + throw new InvalidConfigPropertyException( + String.format("URL value is not valid: '%s'", getUrl()), e, PROPERTY_URL); } } - // Validate Linear Retry Interval if (!containsMacro(PROPERTY_RETRY_POLICY) && getRetryPolicy() == RetryPolicy.LINEAR) { assertIsSet(getLinearRetryInterval(), PROPERTY_LINEAR_RETRY_INTERVAL, "retry policy is linear"); @@ -741,7 +623,7 @@ public void validate() { propertiesShouldBeNull.remove(PROPERTY_INDEX_INCREMENT)); propertiesShouldBeNull.remove(PROPERTY_MAX_INDEX); // can be both null and non null - if (!url.contains(PAGINATION_INDEX_PLACEHOLDER)) { + if (!containsMacro(PROPERTY_URL) && !url.contains(PAGINATION_INDEX_PLACEHOLDER)) { throw new InvalidConfigPropertyException( String.format("Url '%s' must contain '%s' placeholder when pagination type is '%s'", getUrl(), PAGINATION_INDEX_PLACEHOLDER, getPaginationType()), @@ -768,7 +650,6 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()), } } - // Validate format properties if (!containsMacro(PROPERTY_FORMAT)) { String reasonFormat = String.format("page format is '%s'", getFormat()); @@ -784,26 +665,6 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()), } } - // Validate OAuth2 properties - if (!containsMacro(PROPERTY_OAUTH2_ENABLED) && this.getOauth2Enabled()) { - String reasonOauth2 = "OAuth2 is enabled"; - if (!containsMacro(PROPERTY_AUTH_URL)) { - assertIsSet(getAuthUrl(), PROPERTY_AUTH_URL, reasonOauth2); - } - if (!containsMacro(PROPERTY_TOKEN_URL)) { - assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2); - } - if (!containsMacro(PROPERTY_CLIENT_ID)) { - assertIsSet(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2); - } - if (!containsMacro((PROPERTY_CLIENT_SECRET))) { - assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2); - } - if (!containsMacro(PROPERTY_REFRESH_TOKEN)) { - assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2); - } - } - if (!containsMacro(PROPERTY_VERIFY_HTTPS) && !getVerifyHttps()) { assertIsNotSet(getTrustStoreFile(), PROPERTY_TRUSTSTORE_FILE, String.format("trustore settings are ignored due to disabled %s", PROPERTY_VERIFY_HTTPS)); @@ -811,6 +672,11 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()), } public void validateSchema() { + Schema schema = getSchema(); + if (!containsMacro(PROPERTY_SCHEMA) && schema == null) { + throw new InvalidConfigPropertyException( + String.format("Output schema cannot be empty"), PROPERTY_SCHEMA); + } if (!containsMacro(PROPERTY_FORMAT)) { PageFormat format = getFormat(); @@ -880,6 +746,9 @@ public static Map getMapFromKeyValueString(String keyValueString String[] mappings = keyValueString.split(","); for (String map : mappings) { String[] columns = map.split(":"); + if (columns.length < 2) { //For scenario where either of key or value not provided + throw new IllegalArgumentException(String.format("Missing value for key %s", columns[0])); + } result.put(columns[0], columns[1]); } return result; diff --git a/src/main/java/io/cdap/plugin/http/source/common/error/HttpErrorHandler.java b/src/main/java/io/cdap/plugin/http/source/common/error/HttpErrorHandler.java index c6a802f3..b260abb3 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/error/HttpErrorHandler.java +++ b/src/main/java/io/cdap/plugin/http/source/common/error/HttpErrorHandler.java @@ -15,6 +15,7 @@ */ package io.cdap.plugin.http.source.common.error; +import io.cdap.plugin.http.sink.batch.HTTPSinkConfig; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +35,10 @@ public HttpErrorHandler(BaseHttpSourceConfig config) { this.httpErrorsHandlingEntries = config.getHttpErrorHandlingEntries(); } + public HttpErrorHandler(HTTPSinkConfig config) { + this.httpErrorsHandlingEntries = config.getHttpErrorHandlingEntries(); + } + public RetryableErrorHandling getErrorHandlingStrategy(int httpCode) { String httpCodeString = Integer.toString(httpCode); diff --git a/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java b/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java index 85853d7f..eb69617c 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java +++ b/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java @@ -17,6 +17,7 @@ import com.google.common.base.Charsets; import com.google.common.base.Strings; +import io.cdap.plugin.http.common.http.HttpRequest; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; import org.apache.http.Header; import org.apache.http.HttpHost; @@ -146,22 +147,4 @@ private CloseableHttpClient createHttpClient() throws IOException { return httpClientBuilder.build(); } - - /** - * This class allows us to send body not only in POST/PUT but also in other requests. - */ - private static class HttpRequest extends HttpEntityEnclosingRequestBase { - private final String methodName; - - HttpRequest(URI uri, String methodName) { - super(); - this.setURI(uri); - this.methodName = methodName; - } - - @Override - public String getMethod() { - return methodName; - } - } } diff --git a/src/main/java/io/cdap/plugin/http/source/common/http/MessageFormatType.java b/src/main/java/io/cdap/plugin/http/source/common/http/MessageFormatType.java new file mode 100644 index 00000000..5c60a39a --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/source/common/http/MessageFormatType.java @@ -0,0 +1,37 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.http.source.common.http; +import io.cdap.plugin.http.source.common.EnumWithValue; + +/** + * An enum which represent a type of message format of an HTTP request body. + */ +public enum MessageFormatType implements EnumWithValue { + + JSON, + FORM, + CUSTOM; + + @Override + public String getValue() { + return name(); + } + + @Override + public String toString() { + return this.getValue(); + } +} diff --git a/src/main/java/io/cdap/plugin/http/source/streaming/HttpStreamingSource.java b/src/main/java/io/cdap/plugin/http/source/streaming/HttpStreamingSource.java index 4d9367d1..16ac745c 100644 --- a/src/main/java/io/cdap/plugin/http/source/streaming/HttpStreamingSource.java +++ b/src/main/java/io/cdap/plugin/http/source/streaming/HttpStreamingSource.java @@ -21,6 +21,7 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.dataset.DatasetProperties; +import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.streaming.StreamingContext; import io.cdap.cdap.etl.api.streaming.StreamingSource; @@ -48,16 +49,18 @@ public HttpStreamingSource(HttpStreamingSourceConfig config) { @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); // Verify that reference name meets dataset id constraints IdUtils.validateId(config.referenceName); pipelineConfigurer.createDataset(config.referenceName, Constants.EXTERNAL_DATASET_TYPE, DatasetProperties.EMPTY); - config.validate(); // validate when macros are not substituted + config.validate(failureCollector); // validate when macros are not substituted config.validateSchema(); } @Override public JavaDStream getStream(StreamingContext context) { - config.validate(); // validate when macros are substituted + FailureCollector failureCollector = context.getFailureCollector(); + config.validate(failureCollector); // validate when macros are substituted config.validateSchema(); ClassTag tag = ClassTag$.MODULE$.apply(StructuredRecord.class); diff --git a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java index f9a3a92f..ecc3c9df 100644 --- a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfigTest.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.sink.batch; +import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.validation.CauseAttributes; import io.cdap.cdap.etl.api.validation.ValidationException; import io.cdap.cdap.etl.api.validation.ValidationFailure; @@ -44,10 +45,17 @@ public class HTTPSinkConfigTest { "UTF8", true, true, + "2..:Success,.*:Fail", + "stopOnError", + "exponential", + 30L, + 600L, 1, 1, - 1, - true + "false", + "none", + "results", + false ); @Test @@ -101,17 +109,6 @@ public void testInvalidMethod() { assertPropertyValidationFailed(failureCollector, HTTPSinkConfig.METHOD); } - @Test - public void testInvalidNumRetries() { - HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG) - .setNumRetries(-1) - .build(); - - MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); - config.validate(failureCollector); - assertPropertyValidationFailed(failureCollector, HTTPSinkConfig.NUM_RETRIES); - } - @Test public void testInvalidReadTimeout() { HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG) @@ -146,6 +143,50 @@ public void testHTTPSinkWithEmptyUrl() { collector.getOrThrowException(); } + @Test() + public void testValidInputSchema() { + Schema schema = Schema.recordOf("record", + Schema.Field.of("id", Schema.of(Schema.Type.LONG)), + Schema.Field.of("name", Schema.of(Schema.Type.STRING))); + HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG).build(); + MockFailureCollector collector = new MockFailureCollector("httpsinkwithvalidinputschema"); + config.validateSchema(schema, collector); + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + } + + @Test(expected = ValidationException.class) + public void testHTTPSinkWithNegativeBatchSize() { + HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG) + .setBatchSize(-1) + .build(); + + MockFailureCollector collector = new MockFailureCollector("httpsinkwithnegativebatchsize"); + config.validate(collector); + collector.getOrThrowException(); + } + + @Test(expected = ValidationException.class) + public void testHTTPSinkWithZeroBatchSize() { + HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG) + .setBatchSize(0) + .build(); + + MockFailureCollector collector = new MockFailureCollector("httpsinkwithzerobatchsize"); + config.validate(collector); + collector.getOrThrowException(); + } + + @Test + public void testHTTPSinkWithPositiveBatchSize() { + HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG) + .setBatchSize(42) + .build(); + + MockFailureCollector collector = new MockFailureCollector("httpsinkwithpositivebatchsize"); + config.validate(collector); + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + } + public static void assertPropertyValidationFailed(MockFailureCollector failureCollector, String paramName) { List failureList = failureCollector.getValidationFailures(); Assert.assertEquals(1, failureList.size()); diff --git a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java index b2b9f270..e3423a01 100644 --- a/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java +++ b/src/test/java/io/cdap/plugin/http/sink/batch/HTTPSinkTest.java @@ -123,12 +123,15 @@ public void testHTTPSink() throws Exception { .put("batchSize", "1") .put("referenceName", "HTTPSinkReference") .put("delimiterForMessages", "\n") - .put("numRetries", "3") .put("followRedirects", "true") .put("disableSSLValidation", "true") + .put("httpErrorsHandling", "2..:Success,.*:Fail") + .put("errorHandling", "stopOnError") + .put("retryPolicy", "exponential") + .put("maxRetryDuration", "600") .put("connectTimeout", "60000") .put("readTimeout", "60000") - .put("failOnNon200Response", "true") + .put("authType", "none") .build(); ETLStage sink = new ETLStage("HTTP", new ETLPlugin("HTTP", BatchSink.PLUGIN_TYPE, properties, null)); @@ -191,12 +194,15 @@ public void testHTTPSinkMacroUrl() throws Exception { .put("batchSize", "1") .put("referenceName", "HTTPSinkReference") .put("delimiterForMessages", "\n") - .put("numRetries", "3") .put("followRedirects", "true") .put("disableSSLValidation", "true") + .put("httpErrorsHandling", "2..:Success,.*:Fail") + .put("errorHandling", "stopOnError") + .put("retryPolicy", "exponential") + .put("maxRetryDuration", "600") .put("connectTimeout", "60000") .put("readTimeout", "60000") - .put("failOnNon200Response", "true") + .put("authType", "none") .build(); ImmutableMap runtimeProperties = diff --git a/src/test/java/io/cdap/plugin/http/sink/batch/MessageBufferTest.java b/src/test/java/io/cdap/plugin/http/sink/batch/MessageBufferTest.java new file mode 100644 index 00000000..ef0872a5 --- /dev/null +++ b/src/test/java/io/cdap/plugin/http/sink/batch/MessageBufferTest.java @@ -0,0 +1,389 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.sink.batch; + +import com.google.gson.stream.JsonWriter; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.StringWriter; + +/** + * Tests for {@link MessageBuffer} + */ +public class MessageBufferTest { + private static final HTTPSinkConfig VALID_CONFIG = new HTTPSinkConfig( + "test", + "http://localhost", + "GET", + 1, + ":", + "JSON", + "body", + "", + "UTF8", + true, + true, + "2..:Success,.*:Fail", + "stopOnError", + "exponential", + 30L, + 600L, + 1, + 1, + "false", + "none", + "results", + true); + MessageBuffer messageBuffer; + Schema dummySchema = Schema.recordOf("dummy", + Schema.Field.of("id", Schema.of(Schema.Type.INT)), + Schema.Field.of("name", Schema.of(Schema.Type.STRING)), + Schema.Field.of("country", Schema.of(Schema.Type.STRING))); + StructuredRecord[] dummyRecords; + String[] dummyRecordsJsonString; + StringWriter stringWriter; + JsonWriter jsonWriter; + + @Before + public void setUp() throws Exception { + stringWriter = new StringWriter(); + jsonWriter = new JsonWriter(stringWriter); + dummyRecords = new StructuredRecord[]{ + StructuredRecord.builder(dummySchema).set("id", 1).set("name", "John").set("country", "USA").build(), + StructuredRecord.builder(dummySchema).set("id", 2).set("name", "Jane").set("country", "Canada").build(), + StructuredRecord.builder(dummySchema).set("id", 3).set("name", "Jack").set("country", "USA").build(), + StructuredRecord.builder(dummySchema).set("id", 4).set("name", "Jill").set("country", "Canada").build(), + StructuredRecord.builder(dummySchema).set("id", 5).set("name", "Joe").set("country", "USA").build(), + }; + dummyRecordsJsonString = new String[]{ + "{\"id\":1,\"name\":\"John\",\"country\":\"USA\"}", + "{\"id\":2,\"name\":\"Jane\",\"country\":\"Canada\"}", + "{\"id\":3,\"name\":\"Jack\",\"country\":\"USA\"}", + "{\"id\":4,\"name\":\"Jill\",\"country\":\"Canada\"}", + "{\"id\":5,\"name\":\"Joe\",\"country\":\"USA\"}" + }; + } + + @Test + public void testAdding5RecordsWithBatchSize1() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(), + httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize1.getDelimiterForMessages(), + httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema + ); + for (StructuredRecord record : dummyRecords) { + messageBuffer.add(record); + } + Assert.assertEquals(dummyRecords.length, messageBuffer.size()); + } + + @Test + public void testAdding5RecordsWithBatchSize3() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize3 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(3).build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize3.getMessageFormat(), httpSinkConfigWithBatchSize3.getJsonBatchKey(), + httpSinkConfigWithBatchSize3.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize3.getDelimiterForMessages(), + httpSinkConfigWithBatchSize3.getCharset(), httpSinkConfigWithBatchSize3.getBody(), dummySchema + ); + for (StructuredRecord record : dummyRecords) { + messageBuffer.add(record); + } + Assert.assertEquals(dummyRecords.length, messageBuffer.size()); + } + + @Test + public void testClear() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(), + httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize1.getDelimiterForMessages(), + httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema + ); + for (StructuredRecord record : dummyRecords) { + messageBuffer.add(record); + } + Assert.assertEquals(dummyRecords.length, messageBuffer.size()); + messageBuffer.clear(); + Assert.assertEquals(0, messageBuffer.size()); + } + + @Test + public void testIsEmpty() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(), + httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize1.getDelimiterForMessages(), + httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema + ); + Assert.assertTrue(messageBuffer.isEmpty()); + } + + @Test + public void testIsEmptyAfterClear() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(), + httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize1.getDelimiterForMessages(), + httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema + ); + for (StructuredRecord record : dummyRecords) { + messageBuffer.add(record); + } + Assert.assertEquals(dummyRecords.length, messageBuffer.size()); + messageBuffer.clear(); + Assert.assertTrue(messageBuffer.isEmpty()); + } + + @Test + public void testGetContentTypeWithJsonFormat() throws Exception { + HTTPSinkConfig httpSinkConfigWithMessageFormatJson = HTTPSinkConfig.newBuilder(VALID_CONFIG) + .setMessageFormat("JSON").build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithMessageFormatJson.getMessageFormat(), + httpSinkConfigWithMessageFormatJson.getJsonBatchKey(), + httpSinkConfigWithMessageFormatJson.shouldWriteJsonAsArray(), + httpSinkConfigWithMessageFormatJson.getDelimiterForMessages(), + httpSinkConfigWithMessageFormatJson.getCharset(), httpSinkConfigWithMessageFormatJson.getBody(), dummySchema + ); + Assert.assertEquals("application/json", messageBuffer.getContentType()); + } + + @Test + public void testGetMessageWithBatchSize1() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1) + .setMessageFormat("JSON").build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(), + httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize1.getDelimiterForMessages(), + httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema + ); + + messageBuffer.add(dummyRecords[0]); + + Assert.assertEquals(dummyRecordsJsonString[0], messageBuffer.getMessage()); + } + + @Test + public void testGetMessageWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString = + HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(3).setMessageFormat("JSON").setWriteJsonAsArray(true) + .setJsonBatchKey("").build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getMessageFormat(), + httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getJsonBatchKey(), + httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getDelimiterForMessages(), + httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getCharset(), + httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getBody(), dummySchema + ); + + int batchSize = 3; + for (int i = 0; i < batchSize; i++) { + messageBuffer.add(dummyRecords[i]); + } + + Assert.assertEquals("[" + dummyRecordsJsonString[0] + "," + dummyRecordsJsonString[1] + "," + + dummyRecordsJsonString[2] + "]", messageBuffer.getMessage()); + } + + @Test + public void testGetMessageWithBatchSize2AndJsonArrayTrueAndWrapperKeyData() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData = + HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(2).setMessageFormat("JSON").setWriteJsonAsArray(true) + .setJsonBatchKey("data").build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getMessageFormat(), + httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getJsonBatchKey(), + httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getDelimiterForMessages(), + httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getCharset(), + httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getBody(), dummySchema + ); + + int batchSize = 2; + for (int i = 0; i < batchSize; i++) { + messageBuffer.add(dummyRecords[i]); + } + + jsonWriter.beginObject(); + jsonWriter.name("data"); + jsonWriter.beginArray(); + for (int i = 0; i < batchSize; i++) { + jsonWriter.jsonValue(dummyRecordsJsonString[i]); + } + jsonWriter.endArray(); + jsonWriter.endObject(); + + Assert.assertEquals(stringWriter.toString(), messageBuffer.getMessage()); + + } + + @Test + public void testGetMessageWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter() + throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter = + HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(4).setMessageFormat("JSON") + .setWriteJsonAsArray(false).setJsonBatchKey("").setDelimiterForMessages("|").build(); + messageBuffer = + new MessageBuffer( + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getMessageFormat(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getJsonBatchKey(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getDelimiterForMessages(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getCharset(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter.getBody(), + dummySchema + ); + + int batchSize = 4; + for (int i = 0; i < batchSize; i++) { + messageBuffer.add(dummyRecords[i]); + } + + Assert.assertEquals(dummyRecordsJsonString[0] + "|" + dummyRecordsJsonString[1] + "|" + + dummyRecordsJsonString[2] + "|" + dummyRecordsJsonString[3], messageBuffer.getMessage()); + } + + @Test + public void testGetMessageWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter() + throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter = + HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(4).setMessageFormat("JSON") + .setWriteJsonAsArray(false).setJsonBatchKey("data").setDelimiterForMessages("|").build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getMessageFormat(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getJsonBatchKey(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getDelimiterForMessages(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getCharset(), + httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getBody(), dummySchema + ); + + int batchSize = 4; + for (int i = 0; i < batchSize; i++) { + messageBuffer.add(dummyRecords[i]); + } + + Assert.assertEquals(dummyRecordsJsonString[0] + "|" + dummyRecordsJsonString[1] + "|" + + dummyRecordsJsonString[2] + "|" + dummyRecordsJsonString[3], messageBuffer.getMessage()); + } + + @Test + public void testGetMessageWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems = + HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(5).setMessageFormat("JSON").setWriteJsonAsArray(true) + .setJsonBatchKey("items").build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getMessageFormat(), + httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getJsonBatchKey(), + httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getDelimiterForMessages(), + httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getCharset(), + httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getBody(), dummySchema + ); + + int batchSize = 5; + for (int i = 0; i < batchSize; i++) { + messageBuffer.add(dummyRecords[i]); + } + + jsonWriter.beginObject(); + jsonWriter.name("items"); + jsonWriter.beginArray(); + for (String jsonRecord : dummyRecordsJsonString) { + jsonWriter.jsonValue(jsonRecord); + } + jsonWriter.endArray(); + jsonWriter.endObject(); + + Assert.assertEquals(stringWriter.toString(), messageBuffer.getMessage()); + } + + @Test + public void testGetMessageWithBatchSize1AndJsonArrayTrueAndWrapperKeyData() throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData = + HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).setMessageFormat("JSON").setWriteJsonAsArray(true) + .setJsonBatchKey("data").build(); + messageBuffer = new MessageBuffer( + httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getMessageFormat(), + httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getJsonBatchKey(), + httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getDelimiterForMessages(), + httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getCharset(), + httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getBody(), dummySchema + ); + + messageBuffer.add(dummyRecords[0]); + + jsonWriter.beginObject(); + jsonWriter.name("data"); + jsonWriter.beginArray(); + jsonWriter.jsonValue(dummyRecordsJsonString[0]); + jsonWriter.endArray(); + jsonWriter.endObject(); + + Assert.assertEquals(stringWriter.toString(), messageBuffer.getMessage()); + + } + + @Test + public void testGetMessageWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter() + throws Exception { + HTTPSinkConfig httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter = + HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(2).setMessageFormat("JSON") + .setWriteJsonAsArray(false).setJsonBatchKey("").setDelimiterForMessages(",").build(); + messageBuffer = + new MessageBuffer( + httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getMessageFormat(), + httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getJsonBatchKey(), + httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .shouldWriteJsonAsArray(), + httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getDelimiterForMessages(), + httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getCharset(), + httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter + .getBody(), dummySchema + ); + + int batchSize = 2; + for (int i = 0; i < batchSize; i++) { + messageBuffer.add(dummyRecords[i]); + } + + Assert.assertEquals(dummyRecordsJsonString[0] + "," + dummyRecordsJsonString[1], + messageBuffer.getMessage()); + } +} diff --git a/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java b/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java new file mode 100644 index 00000000..0f691169 --- /dev/null +++ b/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java @@ -0,0 +1,228 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.source.common; + +import com.google.auth.oauth2.AccessToken; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; +import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.http.common.http.OAuthUtil; +import io.cdap.plugin.http.source.batch.HttpBatchSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; +import io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator; +import io.cdap.plugin.http.source.common.pagination.PaginationIteratorFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.StatusLine; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.IOException; + +/** + * Unit tests for HttpBatchSourceConfig + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({PaginationIteratorFactory.class, HttpClientBuilder.class, HttpClients.class, OAuthUtil.class, + HttpHost.class, EntityUtils.class, HttpClient.class}) +@PowerMockIgnore("javax.management.*") +public class HttpBatchSourceConfigTest { + + @Mock + private HttpClient httpClient; + + @Mock + private CloseableHttpResponse response; + + @Mock + private StatusLine statusLine; + + @Mock + private HttpEntity entity; + + @Test(expected = IllegalArgumentException.class) + public void testMissingKeyValue() { + FailureCollector collector = new MockFailureCollector(); + HttpBatchSourceConfig config = HttpBatchSourceConfig.builder() + .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:") + .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY) + .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120) + .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").build(); + config.validate(collector); + } + + @Test(expected = InvalidConfigPropertyException.class) + public void testEmptySchemaKeyValue() { + HttpBatchSourceConfig config = HttpBatchSourceConfig.builder() + .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth") + .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY) + .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120) + .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").build(); + config.validateSchema(); + } + + @Test + public void testValidateOAuth2() throws Exception { + FailureCollector collector = new MockFailureCollector(); + HttpBatchSourceConfig config = HttpBatchSourceConfig.builder() + .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth") + .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY) + .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120) + .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("oAuth2").setClientId("id"). + setClientSecret("secret").setRefreshToken("token").setScopes("scope").setTokenUrl("https//:token").setRetryPolicy( + "exponential").build(); + PowerMockito.mockStatic(PaginationIteratorFactory.class); + BaseHttpPaginationIterator baseHttpPaginationIterator = Mockito.mock(BaseHttpPaginationIterator.class); + PowerMockito.when(PaginationIteratorFactory.createInstance(Mockito.any(), Mockito.any())) + .thenReturn(baseHttpPaginationIterator); + PowerMockito.when(baseHttpPaginationIterator.supportsSkippingPages()).thenReturn(true); + PowerMockito.mockStatic(HttpClients.class); + HttpClientBuilder httpClientBuilder = Mockito.mock(HttpClientBuilder.class); + Mockito.when(HttpClients.custom()).thenReturn(httpClientBuilder); + AccessToken accessToken = Mockito.mock(AccessToken.class); + Mockito.when(accessToken.getTokenValue()).thenReturn("1234"); + PowerMockito.mockStatic(OAuthUtil.class); + Mockito.when(OAuthUtil.getAccessTokenByRefreshToken(Mockito.any(), Mockito.any())).thenReturn(accessToken); + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + + @Test + public void testValidateOAuth2CredentialsWithProxy() throws IOException { + FailureCollector collector = new MockFailureCollector(); + FailureCollector collectorMock = new MockFailureCollector(); + HttpBatchSourceConfig config = HttpBatchSourceConfig.builder() + .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth") + .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY) + .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120) + .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("oAuth2").setClientId("id"). + setClientSecret("secret").setRefreshToken("token").setScopes("scope").setTokenUrl("https//:token").setRetryPolicy( + "exponential").setProxyUrl("https://proxy").setProxyUsername("proxyuser").setProxyPassword("proxypassword") + .build(); + HttpClientBuilder httpClientBuilder = Mockito.mock(HttpClientBuilder.class); + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + HttpHost proxy = PowerMockito.mock(HttpHost.class); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + httpClientBuilder.setProxy(proxy); + PowerMockito.mockStatic(HttpClients.class); + CloseableHttpClient closeableHttpClient = Mockito.mock(CloseableHttpClient.class); + Mockito.when(HttpClients.createDefault()).thenReturn(closeableHttpClient); + Mockito.when(HttpClients.custom()).thenReturn(httpClientBuilder); + Mockito.when(HttpClients.custom() + .setDefaultCredentialsProvider(credentialsProvider) + .setProxy(proxy) + .build()).thenReturn(closeableHttpClient); + AccessToken accessToken = Mockito.mock(AccessToken.class); + Mockito.when(accessToken.getTokenValue()).thenReturn("1234"); + PowerMockito.mockStatic(OAuthUtil.class); + Mockito.when(OAuthUtil.getAccessTokenByRefreshToken(Mockito.any(), Mockito.any())).thenReturn(accessToken); + config.validate(collectorMock); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateCredentialsOAuth2WithInvalidAccessTokenRequest() throws Exception { + FailureCollector collector = new MockFailureCollector(); + HttpBatchSourceConfig config = HttpBatchSourceConfig.builder() + .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth") + .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY) + .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120) + .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("oAuth2").setClientId("id"). + setClientSecret("secret").setRefreshToken("token").setScopes("scope").setTokenUrl("https//:token").setRetryPolicy( + "exponential").build(); + CloseableHttpClient httpClientMock = Mockito.mock(CloseableHttpClient.class); + CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); + Mockito.when(httpClientMock.execute(Mockito.any())).thenReturn(httpResponse); + HttpEntity entity = Mockito.mock(HttpEntity.class); + Mockito.when(httpResponse.getEntity()).thenReturn(entity); + PowerMockito.mockStatic(EntityUtils.class); + String response = " Error 404 (Not Found)!!1\n" + + " \n" + + "

404. That’s an error.\n"; + + Mockito.when(EntityUtils.toString(entity, "UTF-8")).thenReturn(response); + PowerMockito.mockStatic(PaginationIteratorFactory.class); + BaseHttpPaginationIterator baseHttpPaginationIterator = Mockito.mock(BaseHttpPaginationIterator.class); + PowerMockito.when(PaginationIteratorFactory.createInstance(Mockito.any(), Mockito.any())) + .thenReturn(baseHttpPaginationIterator); + PowerMockito.when(baseHttpPaginationIterator.supportsSkippingPages()).thenReturn(true); + PowerMockito.mockStatic(HttpClients.class); + HttpClientBuilder httpClientBuilder = Mockito.mock(HttpClientBuilder.class); + Mockito.when(HttpClients.custom()).thenReturn(httpClientBuilder); + Mockito.when(httpClientBuilder.build()).thenReturn(httpClientMock); + try { + config.validate(collector); + } catch (IllegalStateException e) { + Assert.assertEquals(1, collector.getValidationFailures().size()); + } + } + + @Test + public void testBasicAuthWithValidResponse() throws IOException { + FailureCollector failureCollector = new MockFailureCollector(); + HttpBatchSourceConfig config = HttpBatchSourceConfig.builder() + .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth") + .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY) + .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120) + .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("basicAuth").setUsername( + "username").setPassword("password").setRetryPolicy( + "exponential").build(); + Mockito.when(httpClient.executeHTTP(Mockito.any())).thenReturn(response); + Mockito.when(response.getStatusLine()).thenReturn(statusLine); + Mockito.when(statusLine.getStatusCode()).thenReturn(200); + config.validateBasicAuthResponse(failureCollector, httpClient); + Assert.assertEquals(0, failureCollector.getValidationFailures().size()); + } + + @Test + public void testValidConfigWithInvalidResponse() throws IOException { + FailureCollector failureCollector = new MockFailureCollector(); + HttpBatchSourceConfig config = HttpBatchSourceConfig.builder() + .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth") + .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY) + .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120) + .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("basicAuth").setUsername( + "username").setPassword("password").setRetryPolicy( + "exponential").build(); + Mockito.when(httpClient.executeHTTP(Mockito.any())).thenReturn(response); + Mockito.when(response.getStatusLine()).thenReturn(statusLine); + Mockito.when(statusLine.getStatusCode()).thenReturn(400); + Mockito.when(response.getEntity()).thenReturn(entity); + config.validateBasicAuthResponse(failureCollector, httpClient); + Assert.assertEquals(1, failureCollector.getValidationFailures().size()); + Assert.assertEquals("Credential validation request failed with Http Status code: '400', Response: 'null'", + failureCollector + .getValidationFailures().get(0).getMessage()); + } + +} diff --git a/widgets/HTTP-batchsink.json b/widgets/HTTP-batchsink.json index 7dac9024..d6665244 100644 --- a/widgets/HTTP-batchsink.json +++ b/widgets/HTTP-batchsink.json @@ -38,6 +38,30 @@ "default": "1" } }, + { + "name": "writeJsonAsArray", + "label": "Write JSON As Array", + "widget-type": "toggle", + "widget-attributes": { + "on": { + "value": "true", + "label": "True" + }, + "off": { + "value": "false", + "label": "False" + }, + "default": "false" + } + }, + { + "widget-type": "textbox", + "label": "Json Batch Key", + "name": "jsonBatchKey", + "widget-attributes": { + "default": "" + } + }, { "widget-type": "select", "label": "Message Format", @@ -114,27 +138,6 @@ "default": "true" } }, - { - "widget-type": "select", - "label": "Number of Retries", - "name": "numRetries", - "widget-attributes": { - "values": [ - "3", - "0", - "1", - "2", - "4", - "5", - "6", - "7", - "8", - "9", - "10" - ], - "default": "3" - } - }, { "widget-type": "textbox", "label": "Connection Timeout (milliseconds)", @@ -152,19 +155,382 @@ } }, { - "widget-type": "select", - "label": "Fail Pipeline On Non-200 Response ?", - "name": "failOnNon200Response", + "widget-type": "keyvalue-dropdown", + "label": "HTTP Errors Handling", + "name": "httpErrorsHandling", "widget-attributes": { - "values": [ - "true", - "false" + "default": "2..:Success,.*:Fail", + "showDelimiter": "false", + "dropdownOptions": [ + "Success", + "Fail", + "Skip", + "Send to error", + "Retry and fail", + "Retry and skip", + "Retry and send to error" ], - "default": "true" + "key-placeholder": "HTTP Status Code Regex" + } + }, + { + "widget-type": "radio-group", + "label": "Non-HTTP Error Handling", + "name": "errorHandling", + "widget-attributes": { + "layout": "inline", + "default": "stopOnError", + "options": [ + { + "id": "stopOnError", + "label": "Stop on error" + }, + { + "id": "sendToError", + "label": "Send to error" + }, + { + "id": "skipOnError", + "label": "Skip on error" + } + ] + } + }, + { + "widget-type": "radio-group", + "label": "Retry Policy", + "name": "retryPolicy", + "widget-attributes": { + "layout": "inline", + "default": "exponential", + "options": [ + { + "id": "exponential", + "label": "Exponential" + }, + { + "id": "linear", + "label": "Linear" + } + ] + } + }, + { + "widget-type": "number", + "label": "Linear Retry Interval", + "name": "linearRetryInterval", + "widget-attributes": { + "min": "0", + "default": "30" + } + }, + { + "widget-type": "number", + "label": "Max Retry Duration", + "name": "maxRetryDuration", + "widget-attributes": { + "min": "0", + "default": "600" } } ] + } , + { + "label": "Authentication", + "properties": [ + { + "widget-type": "radio-group", + "label": "Authentication Type", + "name": "authType", + "widget-attributes": { + "layout": "inline", + "default": "none", + "options": [ + { + "id": "none", + "label": "None" + }, + { + "id": "oAuth2", + "label": "OAuth2" + }, + { + "id": "serviceAccount", + "label": "Service account" + }, + { + "id": "basicAuth", + "label": "Basic Authentication" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Auth URL", + "name": "authUrl" + }, + { + "widget-type": "textbox", + "label": "Token URL", + "name": "tokenUrl" + }, + { + "widget-type": "textbox", + "label": "Client ID", + "name": "clientId" + }, + { + "widget-type": "password", + "label": "Client Secret", + "name": "clientSecret" + }, + { + "widget-type": "textbox", + "label": "Scopes", + "name": "scopes" + }, + { + "widget-type": "textbox", + "label": "Refresh Token", + "name": "refreshToken" + }, + { + "name": "serviceAccountType", + "label": "Service Account Type", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "JSON", + "label": "JSON" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Service Account File Path", + "name": "serviceAccountFilePath", + "widget-attributes": { + "default": "auto-detect" + } + }, + { + "widget-type": "textbox", + "label": "Service Account JSON", + "name": "serviceAccountJSON" + }, + { + "widget-type": "textarea", + "label": "Service Account Scope", + "name": "serviceAccountScope" + }, + { + "widget-type": "textbox", + "label": "Username", + "name": "username" + }, + { + "widget-type": "password", + "label": "Password", + "name": "password" + }, + { + "widget-type": "hidden", + "label": "OAuth2 Enabled", + "name": "oauth2Enabled", + "widget-attributes": { + "default": "false", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } + } + ] + }, + { + "label": "HTTP Proxy", + "properties": [ + { + "widget-type": "textbox", + "label": "Proxy URL", + "name": "proxyUrl" + }, + { + "widget-type": "textbox", + "label": "Username", + "name": "proxyUsername" + }, + { + "widget-type": "password", + "label": "Password", + "name": "proxyPassword" + } + ] } ], - "outputs": [] + "outputs": [], + "filters": [ + { + "name": "Linear Retry Interval", + "condition": { + "property": "retryPolicy", + "operator": "equal to", + "value": "linear" + }, + "show": [ + { + "name": "linearRetryInterval", + "type": "property" + } + ] + }, + { + "name": "Should Write Json As Array", + "condition": { + "expression": "messageFormat == 'JSON'" + }, + "show": [ + { + "type": "property", + "name": "writeJsonAsArray" + } + ] + }, + { + "name": "JsonBatchKey", + "condition": { + "expression": "messageFormat == 'JSON'" + }, + "show": [ + { + "type": "property", + "name": "jsonBatchKey" + } + ] + }, + { + "name": "Proxy authentication", + "condition": { + "property": "proxyUrl", + "operator": "exists" + }, + "show": [ + { + "name": "proxyUsername", + "type": "property" + }, + { + "name": "proxyPassword", + "type": "property" + } + ] + }, + { + "name": "Authenticate with Basic Auth", + "condition": { + "property": "authType", + "operator": "equal to", + "value": "basicAuth" + }, + "show": [ + { + "name": "username", + "type": "property" + }, + { + "name": "password", + "type": "property" + } + ] + }, + { + "name": "Authenticate with OAuth2", + "condition": { + "property": "authType", + "operator": "equal to", + "value": "oAuth2" + }, + "show": [ + { + "name": "authUrl", + "type": "property" + }, + { + "name": "tokenUrl", + "type": "property" + }, + { + "name": "clientId", + "type": "property" + }, + { + "name": "clientSecret", + "type": "property" + }, + { + "name": "scopes", + "type": "property" + }, + { + "name": "refreshToken", + "type": "property" + } + ] + }, + { + "name": "Authenticate with service account", + "condition": { + "property": "authType", + "operator": "equal to", + "value": "serviceAccount" + }, + "show": [ + { + "name": "serviceAccountType", + "type": "property" + }, + { + "name": "serviceAccountScope", + "type": "property" + } + ] + }, + { + "name": "ServiceAuthenticationTypeFilePath", + "condition": { + "expression": "authType == 'serviceAccount' && serviceAccountType == 'filePath'" + }, + "show": [ + { + "type": "property", + "name": "serviceAccountFilePath" + } + ] + }, + { + "name": "ServiceAuthenticationTypeJSON", + "condition": { + "expression": "authType == 'serviceAccount' && serviceAccountType == 'JSON'" + }, + "show": [ + { + "type": "property", + "name": "serviceAccountJSON" + } + ] + } + ] } diff --git a/widgets/HTTP-batchsource.json b/widgets/HTTP-batchsource.json index 6c053781..1f142ae8 100644 --- a/widgets/HTTP-batchsource.json +++ b/widgets/HTTP-batchsource.json @@ -103,22 +103,33 @@ ] }, { - "label": "OAuth2", + "label": "Authentication", "properties": [ { - "widget-type": "toggle", - "label": "OAuth2 Enabled", - "name": "oauth2Enabled", + "widget-type": "radio-group", + "label": "Authentication Type", + "name": "authType", "widget-attributes": { - "default": "false", - "on": { - "label": "True", - "value": "true" - }, - "off": { - "label": "False", - "value": "false" - } + "layout": "inline", + "default": "none", + "options": [ + { + "id": "none", + "label": "None" + }, + { + "id": "oAuth2", + "label": "OAuth2" + }, + { + "id": "serviceAccount", + "label": "Service account" + }, + { + "id": "basicAuth", + "label": "Basic Authentication" + } + ] } }, { @@ -150,12 +161,44 @@ "widget-type": "textbox", "label": "Refresh Token", "name": "refreshToken" - } - ] - }, - { - "label": "Basic Authentication", - "properties": [ + }, + { + "name": "serviceAccountType", + "label": "Service Account Type", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "JSON", + "label": "JSON" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Service Account File Path", + "name": "serviceAccountFilePath", + "widget-attributes": { + "default": "auto-detect" + } + }, + { + "widget-type": "textbox", + "label": "Service Account JSON", + "name": "serviceAccountJSON" + }, + { + "widget-type": "textarea", + "label": "Service Account Scope", + "name": "serviceAccountScope" + }, { "widget-type": "textbox", "label": "Username", @@ -165,6 +208,22 @@ "widget-type": "password", "label": "Password", "name": "password" + }, + { + "widget-type": "hidden", + "label": "OAuth2 Enabled", + "name": "oauth2Enabled", + "widget-attributes": { + "default": "false", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } } ] }, @@ -503,7 +562,7 @@ "name": "Proxy authentication", "condition": { "property": "proxyUrl", - "operator": "exists", + "operator": "exists" }, "show": [ { @@ -599,11 +658,11 @@ ] }, { - "name": "OAuth 2 disabled", + "name": "Authenticate with Basic Auth", "condition": { - "property": "oauth2Enabled", + "property": "authType", "operator": "equal to", - "value": "false" + "value": "basicAuth" }, "show": [ { @@ -617,11 +676,11 @@ ] }, { - "name": "OAuth 2 enabled", + "name": "Authenticate with OAuth2", "condition": { - "property": "oauth2Enabled", + "property": "authType", "operator": "equal to", - "value": "true" + "value": "oAuth2" }, "show": [ { @@ -650,6 +709,48 @@ } ] }, + { + "name": "Authenticate with service account", + "condition": { + "property": "authType", + "operator": "equal to", + "value": "serviceAccount" + }, + "show": [ + { + "name": "serviceAccountType", + "type": "property" + }, + { + "name": "serviceAccountScope", + "type": "property" + } + ] + }, + { + "name": "ServiceAuthenticationTypeFilePath", + "condition": { + "expression": "authType == 'serviceAccount' && serviceAccountType == 'filePath'" + }, + "show": [ + { + "type": "property", + "name": "serviceAccountFilePath" + } + ] + }, + { + "name": "ServiceAuthenticationTypeJSON", + "condition": { + "expression": "authType == 'serviceAccount' && serviceAccountType == 'JSON'" + }, + "show": [ + { + "type": "property", + "name": "serviceAccountJSON" + } + ] + }, { "name": "JSON/XML Formatting", "condition": { diff --git a/widgets/HTTP-streamingsource.json b/widgets/HTTP-streamingsource.json index e7abdb68..01a77c65 100644 --- a/widgets/HTTP-streamingsource.json +++ b/widgets/HTTP-streamingsource.json @@ -108,22 +108,33 @@ ] }, { - "label": "OAuth2", + "label": "Authentication", "properties": [ { - "widget-type": "toggle", - "label": "OAuth2 Enabled", - "name": "oauth2Enabled", + "widget-type": "radio-group", + "label": "Authentication Type", + "name": "authType", "widget-attributes": { - "default": "false", - "on": { - "label": "True", - "value": "true" - }, - "off": { - "label": "False", - "value": "false" - } + "layout": "inline", + "default": "none", + "options": [ + { + "id": "none", + "label": "None" + }, + { + "id": "oAuth2", + "label": "OAuth2" + }, + { + "id": "serviceAccount", + "label": "Service account" + }, + { + "id": "basicAuth", + "label": "Basic Authentication" + } + ] } }, { @@ -155,12 +166,44 @@ "widget-type": "textbox", "label": "Refresh Token", "name": "refreshToken" - } - ] - }, - { - "label": "Basic Authentication", - "properties": [ + }, + { + "name": "serviceAccountType", + "label": "Service Account Type", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "JSON", + "label": "JSON" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Service Account File Path", + "name": "serviceAccountFilePath", + "widget-attributes": { + "default": "auto-detect" + } + }, + { + "widget-type": "textbox", + "label": "Service Account JSON", + "name": "serviceAccountJSON" + }, + { + "widget-type": "textarea", + "label": "Service Account Scope", + "name": "serviceAccountScope" + }, { "widget-type": "textbox", "label": "Username", @@ -170,6 +213,22 @@ "widget-type": "password", "label": "Password", "name": "password" + }, + { + "widget-type": "hidden", + "label": "OAuth2 Enabled", + "name": "oauth2Enabled", + "widget-attributes": { + "default": "false", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } } ] },