runtimeProperties =
diff --git a/src/test/java/io/cdap/plugin/http/sink/batch/MessageBufferTest.java b/src/test/java/io/cdap/plugin/http/sink/batch/MessageBufferTest.java
new file mode 100644
index 00000000..ef0872a5
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/http/sink/batch/MessageBufferTest.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.http.sink.batch;
+
+import com.google.gson.stream.JsonWriter;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.StringWriter;
+
+/**
+ * Tests for {@link MessageBuffer}
+ */
+public class MessageBufferTest {
+ private static final HTTPSinkConfig VALID_CONFIG = new HTTPSinkConfig(
+ "test",
+ "http://localhost",
+ "GET",
+ 1,
+ ":",
+ "JSON",
+ "body",
+ "",
+ "UTF8",
+ true,
+ true,
+ "2..:Success,.*:Fail",
+ "stopOnError",
+ "exponential",
+ 30L,
+ 600L,
+ 1,
+ 1,
+ "false",
+ "none",
+ "results",
+ true);
+ MessageBuffer messageBuffer;
+ Schema dummySchema = Schema.recordOf("dummy",
+ Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+ Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("country", Schema.of(Schema.Type.STRING)));
+ StructuredRecord[] dummyRecords;
+ String[] dummyRecordsJsonString;
+ StringWriter stringWriter;
+ JsonWriter jsonWriter;
+
+ @Before
+ public void setUp() throws Exception {
+ stringWriter = new StringWriter();
+ jsonWriter = new JsonWriter(stringWriter);
+ dummyRecords = new StructuredRecord[]{
+ StructuredRecord.builder(dummySchema).set("id", 1).set("name", "John").set("country", "USA").build(),
+ StructuredRecord.builder(dummySchema).set("id", 2).set("name", "Jane").set("country", "Canada").build(),
+ StructuredRecord.builder(dummySchema).set("id", 3).set("name", "Jack").set("country", "USA").build(),
+ StructuredRecord.builder(dummySchema).set("id", 4).set("name", "Jill").set("country", "Canada").build(),
+ StructuredRecord.builder(dummySchema).set("id", 5).set("name", "Joe").set("country", "USA").build(),
+ };
+ dummyRecordsJsonString = new String[]{
+ "{\"id\":1,\"name\":\"John\",\"country\":\"USA\"}",
+ "{\"id\":2,\"name\":\"Jane\",\"country\":\"Canada\"}",
+ "{\"id\":3,\"name\":\"Jack\",\"country\":\"USA\"}",
+ "{\"id\":4,\"name\":\"Jill\",\"country\":\"Canada\"}",
+ "{\"id\":5,\"name\":\"Joe\",\"country\":\"USA\"}"
+ };
+ }
+
+ @Test
+ public void testAdding5RecordsWithBatchSize1() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize1.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema
+ );
+ for (StructuredRecord record : dummyRecords) {
+ messageBuffer.add(record);
+ }
+ Assert.assertEquals(dummyRecords.length, messageBuffer.size());
+ }
+
+ @Test
+ public void testAdding5RecordsWithBatchSize3() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize3 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(3).build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize3.getMessageFormat(), httpSinkConfigWithBatchSize3.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize3.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize3.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize3.getCharset(), httpSinkConfigWithBatchSize3.getBody(), dummySchema
+ );
+ for (StructuredRecord record : dummyRecords) {
+ messageBuffer.add(record);
+ }
+ Assert.assertEquals(dummyRecords.length, messageBuffer.size());
+ }
+
+ @Test
+ public void testClear() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize1.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema
+ );
+ for (StructuredRecord record : dummyRecords) {
+ messageBuffer.add(record);
+ }
+ Assert.assertEquals(dummyRecords.length, messageBuffer.size());
+ messageBuffer.clear();
+ Assert.assertEquals(0, messageBuffer.size());
+ }
+
+ @Test
+ public void testIsEmpty() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize1.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema
+ );
+ Assert.assertTrue(messageBuffer.isEmpty());
+ }
+
+ @Test
+ public void testIsEmptyAfterClear() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize1.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema
+ );
+ for (StructuredRecord record : dummyRecords) {
+ messageBuffer.add(record);
+ }
+ Assert.assertEquals(dummyRecords.length, messageBuffer.size());
+ messageBuffer.clear();
+ Assert.assertTrue(messageBuffer.isEmpty());
+ }
+
+ @Test
+ public void testGetContentTypeWithJsonFormat() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithMessageFormatJson = HTTPSinkConfig.newBuilder(VALID_CONFIG)
+ .setMessageFormat("JSON").build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithMessageFormatJson.getMessageFormat(),
+ httpSinkConfigWithMessageFormatJson.getJsonBatchKey(),
+ httpSinkConfigWithMessageFormatJson.shouldWriteJsonAsArray(),
+ httpSinkConfigWithMessageFormatJson.getDelimiterForMessages(),
+ httpSinkConfigWithMessageFormatJson.getCharset(), httpSinkConfigWithMessageFormatJson.getBody(), dummySchema
+ );
+ Assert.assertEquals("application/json", messageBuffer.getContentType());
+ }
+
+ @Test
+ public void testGetMessageWithBatchSize1() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize1 = HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1)
+ .setMessageFormat("JSON").build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize1.getMessageFormat(), httpSinkConfigWithBatchSize1.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize1.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize1.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize1.getCharset(), httpSinkConfigWithBatchSize1.getBody(), dummySchema
+ );
+
+ messageBuffer.add(dummyRecords[0]);
+
+ Assert.assertEquals(dummyRecordsJsonString[0], messageBuffer.getMessage());
+ }
+
+ @Test
+ public void testGetMessageWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString =
+ HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(3).setMessageFormat("JSON").setWriteJsonAsArray(true)
+ .setJsonBatchKey("").build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getMessageFormat(),
+ httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getCharset(),
+ httpSinkConfigWithBatchSize3AndJsonArrayTrueAndWrapperKeyEmptyString.getBody(), dummySchema
+ );
+
+ int batchSize = 3;
+ for (int i = 0; i < batchSize; i++) {
+ messageBuffer.add(dummyRecords[i]);
+ }
+
+ Assert.assertEquals("[" + dummyRecordsJsonString[0] + "," + dummyRecordsJsonString[1] + "," +
+ dummyRecordsJsonString[2] + "]", messageBuffer.getMessage());
+ }
+
+ @Test
+ public void testGetMessageWithBatchSize2AndJsonArrayTrueAndWrapperKeyData() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData =
+ HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(2).setMessageFormat("JSON").setWriteJsonAsArray(true)
+ .setJsonBatchKey("data").build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getMessageFormat(),
+ httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getCharset(),
+ httpSinkConfigWithBatchSize2AndJsonArrayTrueAndWrapperKeyData.getBody(), dummySchema
+ );
+
+ int batchSize = 2;
+ for (int i = 0; i < batchSize; i++) {
+ messageBuffer.add(dummyRecords[i]);
+ }
+
+ jsonWriter.beginObject();
+ jsonWriter.name("data");
+ jsonWriter.beginArray();
+ for (int i = 0; i < batchSize; i++) {
+ jsonWriter.jsonValue(dummyRecordsJsonString[i]);
+ }
+ jsonWriter.endArray();
+ jsonWriter.endObject();
+
+ Assert.assertEquals(stringWriter.toString(), messageBuffer.getMessage());
+
+ }
+
+ @Test
+ public void testGetMessageWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter()
+ throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter =
+ HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(4).setMessageFormat("JSON")
+ .setWriteJsonAsArray(false).setJsonBatchKey("").setDelimiterForMessages("|").build();
+ messageBuffer =
+ new MessageBuffer(
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getMessageFormat(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getJsonBatchKey(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getCharset(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter.getBody(),
+ dummySchema
+ );
+
+ int batchSize = 4;
+ for (int i = 0; i < batchSize; i++) {
+ messageBuffer.add(dummyRecords[i]);
+ }
+
+ Assert.assertEquals(dummyRecordsJsonString[0] + "|" + dummyRecordsJsonString[1] + "|" +
+ dummyRecordsJsonString[2] + "|" + dummyRecordsJsonString[3], messageBuffer.getMessage());
+ }
+
+ @Test
+ public void testGetMessageWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter()
+ throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter =
+ HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(4).setMessageFormat("JSON")
+ .setWriteJsonAsArray(false).setJsonBatchKey("data").setDelimiterForMessages("|").build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getMessageFormat(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getCharset(),
+ httpSinkConfigWithBatchSize4AndJsonArrayFalseAndWrapperKeyDataAndCustomDelimiter.getBody(), dummySchema
+ );
+
+ int batchSize = 4;
+ for (int i = 0; i < batchSize; i++) {
+ messageBuffer.add(dummyRecords[i]);
+ }
+
+ Assert.assertEquals(dummyRecordsJsonString[0] + "|" + dummyRecordsJsonString[1] + "|" +
+ dummyRecordsJsonString[2] + "|" + dummyRecordsJsonString[3], messageBuffer.getMessage());
+ }
+
+ @Test
+ public void testGetMessageWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems =
+ HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(5).setMessageFormat("JSON").setWriteJsonAsArray(true)
+ .setJsonBatchKey("items").build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getMessageFormat(),
+ httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getCharset(),
+ httpSinkConfigWithBatchSize5AndJsonArrayTrueAndWrapperKeyItems.getBody(), dummySchema
+ );
+
+ int batchSize = 5;
+ for (int i = 0; i < batchSize; i++) {
+ messageBuffer.add(dummyRecords[i]);
+ }
+
+ jsonWriter.beginObject();
+ jsonWriter.name("items");
+ jsonWriter.beginArray();
+ for (String jsonRecord : dummyRecordsJsonString) {
+ jsonWriter.jsonValue(jsonRecord);
+ }
+ jsonWriter.endArray();
+ jsonWriter.endObject();
+
+ Assert.assertEquals(stringWriter.toString(), messageBuffer.getMessage());
+ }
+
+ @Test
+ public void testGetMessageWithBatchSize1AndJsonArrayTrueAndWrapperKeyData() throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData =
+ HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(1).setMessageFormat("JSON").setWriteJsonAsArray(true)
+ .setJsonBatchKey("data").build();
+ messageBuffer = new MessageBuffer(
+ httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getMessageFormat(),
+ httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getJsonBatchKey(),
+ httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getCharset(),
+ httpSinkConfigWithBatchSize1AndJsonArrayTrueAndWrapperKeyData.getBody(), dummySchema
+ );
+
+ messageBuffer.add(dummyRecords[0]);
+
+ jsonWriter.beginObject();
+ jsonWriter.name("data");
+ jsonWriter.beginArray();
+ jsonWriter.jsonValue(dummyRecordsJsonString[0]);
+ jsonWriter.endArray();
+ jsonWriter.endObject();
+
+ Assert.assertEquals(stringWriter.toString(), messageBuffer.getMessage());
+
+ }
+
+ @Test
+ public void testGetMessageWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter()
+ throws Exception {
+ HTTPSinkConfig httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter =
+ HTTPSinkConfig.newBuilder(VALID_CONFIG).setBatchSize(2).setMessageFormat("JSON")
+ .setWriteJsonAsArray(false).setJsonBatchKey("").setDelimiterForMessages(",").build();
+ messageBuffer =
+ new MessageBuffer(
+ httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getMessageFormat(),
+ httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getJsonBatchKey(),
+ httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .shouldWriteJsonAsArray(),
+ httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getDelimiterForMessages(),
+ httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getCharset(),
+ httpSinkConfigWithBatchSize2AndJsonArrayFalseAndWrapperKeyEmptyStringAndCustomDelimiter
+ .getBody(), dummySchema
+ );
+
+ int batchSize = 2;
+ for (int i = 0; i < batchSize; i++) {
+ messageBuffer.add(dummyRecords[i]);
+ }
+
+ Assert.assertEquals(dummyRecordsJsonString[0] + "," + dummyRecordsJsonString[1],
+ messageBuffer.getMessage());
+ }
+}
diff --git a/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java b/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java
new file mode 100644
index 00000000..0f691169
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright © 2022 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.http.source.common;
+
+import com.google.auth.oauth2.AccessToken;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
+import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
+import io.cdap.plugin.http.common.http.OAuthUtil;
+import io.cdap.plugin.http.source.batch.HttpBatchSourceConfig;
+import io.cdap.plugin.http.source.common.http.HttpClient;
+import io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator;
+import io.cdap.plugin.http.source.common.pagination.PaginationIteratorFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.StatusLine;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+
+/**
+ * Unit tests for HttpBatchSourceConfig
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({PaginationIteratorFactory.class, HttpClientBuilder.class, HttpClients.class, OAuthUtil.class,
+ HttpHost.class, EntityUtils.class, HttpClient.class})
+@PowerMockIgnore("javax.management.*")
+public class HttpBatchSourceConfigTest {
+
+ @Mock
+ private HttpClient httpClient;
+
+ @Mock
+ private CloseableHttpResponse response;
+
+ @Mock
+ private StatusLine statusLine;
+
+ @Mock
+ private HttpEntity entity;
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMissingKeyValue() {
+ FailureCollector collector = new MockFailureCollector();
+ HttpBatchSourceConfig config = HttpBatchSourceConfig.builder()
+ .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:")
+ .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY)
+ .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120)
+ .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").build();
+ config.validate(collector);
+ }
+
+ @Test(expected = InvalidConfigPropertyException.class)
+ public void testEmptySchemaKeyValue() {
+ HttpBatchSourceConfig config = HttpBatchSourceConfig.builder()
+ .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth")
+ .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY)
+ .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120)
+ .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").build();
+ config.validateSchema();
+ }
+
+ @Test
+ public void testValidateOAuth2() throws Exception {
+ FailureCollector collector = new MockFailureCollector();
+ HttpBatchSourceConfig config = HttpBatchSourceConfig.builder()
+ .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth")
+ .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY)
+ .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120)
+ .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("oAuth2").setClientId("id").
+ setClientSecret("secret").setRefreshToken("token").setScopes("scope").setTokenUrl("https//:token").setRetryPolicy(
+ "exponential").build();
+ PowerMockito.mockStatic(PaginationIteratorFactory.class);
+ BaseHttpPaginationIterator baseHttpPaginationIterator = Mockito.mock(BaseHttpPaginationIterator.class);
+ PowerMockito.when(PaginationIteratorFactory.createInstance(Mockito.any(), Mockito.any()))
+ .thenReturn(baseHttpPaginationIterator);
+ PowerMockito.when(baseHttpPaginationIterator.supportsSkippingPages()).thenReturn(true);
+ PowerMockito.mockStatic(HttpClients.class);
+ HttpClientBuilder httpClientBuilder = Mockito.mock(HttpClientBuilder.class);
+ Mockito.when(HttpClients.custom()).thenReturn(httpClientBuilder);
+ AccessToken accessToken = Mockito.mock(AccessToken.class);
+ Mockito.when(accessToken.getTokenValue()).thenReturn("1234");
+ PowerMockito.mockStatic(OAuthUtil.class);
+ Mockito.when(OAuthUtil.getAccessTokenByRefreshToken(Mockito.any(), Mockito.any())).thenReturn(accessToken);
+ config.validate(collector);
+ Assert.assertEquals(0, collector.getValidationFailures().size());
+ }
+
+
+ @Test
+ public void testValidateOAuth2CredentialsWithProxy() throws IOException {
+ FailureCollector collector = new MockFailureCollector();
+ FailureCollector collectorMock = new MockFailureCollector();
+ HttpBatchSourceConfig config = HttpBatchSourceConfig.builder()
+ .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth")
+ .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY)
+ .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120)
+ .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("oAuth2").setClientId("id").
+ setClientSecret("secret").setRefreshToken("token").setScopes("scope").setTokenUrl("https//:token").setRetryPolicy(
+ "exponential").setProxyUrl("https://proxy").setProxyUsername("proxyuser").setProxyPassword("proxypassword")
+ .build();
+ HttpClientBuilder httpClientBuilder = Mockito.mock(HttpClientBuilder.class);
+ CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class);
+ HttpHost proxy = PowerMockito.mock(HttpHost.class);
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ httpClientBuilder.setProxy(proxy);
+ PowerMockito.mockStatic(HttpClients.class);
+ CloseableHttpClient closeableHttpClient = Mockito.mock(CloseableHttpClient.class);
+ Mockito.when(HttpClients.createDefault()).thenReturn(closeableHttpClient);
+ Mockito.when(HttpClients.custom()).thenReturn(httpClientBuilder);
+ Mockito.when(HttpClients.custom()
+ .setDefaultCredentialsProvider(credentialsProvider)
+ .setProxy(proxy)
+ .build()).thenReturn(closeableHttpClient);
+ AccessToken accessToken = Mockito.mock(AccessToken.class);
+ Mockito.when(accessToken.getTokenValue()).thenReturn("1234");
+ PowerMockito.mockStatic(OAuthUtil.class);
+ Mockito.when(OAuthUtil.getAccessTokenByRefreshToken(Mockito.any(), Mockito.any())).thenReturn(accessToken);
+ config.validate(collectorMock);
+ Assert.assertEquals(0, collector.getValidationFailures().size());
+ }
+
+ @Test
+ public void testValidateCredentialsOAuth2WithInvalidAccessTokenRequest() throws Exception {
+ FailureCollector collector = new MockFailureCollector();
+ HttpBatchSourceConfig config = HttpBatchSourceConfig.builder()
+ .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth")
+ .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY)
+ .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120)
+ .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("oAuth2").setClientId("id").
+ setClientSecret("secret").setRefreshToken("token").setScopes("scope").setTokenUrl("https//:token").setRetryPolicy(
+ "exponential").build();
+ CloseableHttpClient httpClientMock = Mockito.mock(CloseableHttpClient.class);
+ CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class);
+ Mockito.when(httpClientMock.execute(Mockito.any())).thenReturn(httpResponse);
+ HttpEntity entity = Mockito.mock(HttpEntity.class);
+ Mockito.when(httpResponse.getEntity()).thenReturn(entity);
+ PowerMockito.mockStatic(EntityUtils.class);
+ String response = " Error 404 (Not Found)!!1\n" +
+ " \n" +
+ " 404. That’s an error.\n";
+
+ Mockito.when(EntityUtils.toString(entity, "UTF-8")).thenReturn(response);
+ PowerMockito.mockStatic(PaginationIteratorFactory.class);
+ BaseHttpPaginationIterator baseHttpPaginationIterator = Mockito.mock(BaseHttpPaginationIterator.class);
+ PowerMockito.when(PaginationIteratorFactory.createInstance(Mockito.any(), Mockito.any()))
+ .thenReturn(baseHttpPaginationIterator);
+ PowerMockito.when(baseHttpPaginationIterator.supportsSkippingPages()).thenReturn(true);
+ PowerMockito.mockStatic(HttpClients.class);
+ HttpClientBuilder httpClientBuilder = Mockito.mock(HttpClientBuilder.class);
+ Mockito.when(HttpClients.custom()).thenReturn(httpClientBuilder);
+ Mockito.when(httpClientBuilder.build()).thenReturn(httpClientMock);
+ try {
+ config.validate(collector);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(1, collector.getValidationFailures().size());
+ }
+ }
+
+ @Test
+ public void testBasicAuthWithValidResponse() throws IOException {
+ FailureCollector failureCollector = new MockFailureCollector();
+ HttpBatchSourceConfig config = HttpBatchSourceConfig.builder()
+ .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth")
+ .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY)
+ .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120)
+ .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("basicAuth").setUsername(
+ "username").setPassword("password").setRetryPolicy(
+ "exponential").build();
+ Mockito.when(httpClient.executeHTTP(Mockito.any())).thenReturn(response);
+ Mockito.when(response.getStatusLine()).thenReturn(statusLine);
+ Mockito.when(statusLine.getStatusCode()).thenReturn(200);
+ config.validateBasicAuthResponse(failureCollector, httpClient);
+ Assert.assertEquals(0, failureCollector.getValidationFailures().size());
+ }
+
+ @Test
+ public void testValidConfigWithInvalidResponse() throws IOException {
+ FailureCollector failureCollector = new MockFailureCollector();
+ HttpBatchSourceConfig config = HttpBatchSourceConfig.builder()
+ .setReferenceName("test").setUrl("http://localhost").setHttpMethod("GET").setHeaders("Auth:auth")
+ .setFormat("JSON").setAuthType("none").setErrorHandling(StringUtils.EMPTY)
+ .setRetryPolicy(StringUtils.EMPTY).setMaxRetryDuration(600L).setConnectTimeout(120)
+ .setReadTimeout(120).setPaginationType("NONE").setVerifyHttps("true").setAuthType("basicAuth").setUsername(
+ "username").setPassword("password").setRetryPolicy(
+ "exponential").build();
+ Mockito.when(httpClient.executeHTTP(Mockito.any())).thenReturn(response);
+ Mockito.when(response.getStatusLine()).thenReturn(statusLine);
+ Mockito.when(statusLine.getStatusCode()).thenReturn(400);
+ Mockito.when(response.getEntity()).thenReturn(entity);
+ config.validateBasicAuthResponse(failureCollector, httpClient);
+ Assert.assertEquals(1, failureCollector.getValidationFailures().size());
+ Assert.assertEquals("Credential validation request failed with Http Status code: '400', Response: 'null'",
+ failureCollector
+ .getValidationFailures().get(0).getMessage());
+ }
+
+}
diff --git a/widgets/HTTP-batchsink.json b/widgets/HTTP-batchsink.json
index 7dac9024..d6665244 100644
--- a/widgets/HTTP-batchsink.json
+++ b/widgets/HTTP-batchsink.json
@@ -38,6 +38,30 @@
"default": "1"
}
},
+ {
+ "name": "writeJsonAsArray",
+ "label": "Write JSON As Array",
+ "widget-type": "toggle",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "True"
+ },
+ "off": {
+ "value": "false",
+ "label": "False"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Json Batch Key",
+ "name": "jsonBatchKey",
+ "widget-attributes": {
+ "default": ""
+ }
+ },
{
"widget-type": "select",
"label": "Message Format",
@@ -114,27 +138,6 @@
"default": "true"
}
},
- {
- "widget-type": "select",
- "label": "Number of Retries",
- "name": "numRetries",
- "widget-attributes": {
- "values": [
- "3",
- "0",
- "1",
- "2",
- "4",
- "5",
- "6",
- "7",
- "8",
- "9",
- "10"
- ],
- "default": "3"
- }
- },
{
"widget-type": "textbox",
"label": "Connection Timeout (milliseconds)",
@@ -152,19 +155,382 @@
}
},
{
- "widget-type": "select",
- "label": "Fail Pipeline On Non-200 Response ?",
- "name": "failOnNon200Response",
+ "widget-type": "keyvalue-dropdown",
+ "label": "HTTP Errors Handling",
+ "name": "httpErrorsHandling",
"widget-attributes": {
- "values": [
- "true",
- "false"
+ "default": "2..:Success,.*:Fail",
+ "showDelimiter": "false",
+ "dropdownOptions": [
+ "Success",
+ "Fail",
+ "Skip",
+ "Send to error",
+ "Retry and fail",
+ "Retry and skip",
+ "Retry and send to error"
],
- "default": "true"
+ "key-placeholder": "HTTP Status Code Regex"
+ }
+ },
+ {
+ "widget-type": "radio-group",
+ "label": "Non-HTTP Error Handling",
+ "name": "errorHandling",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "stopOnError",
+ "options": [
+ {
+ "id": "stopOnError",
+ "label": "Stop on error"
+ },
+ {
+ "id": "sendToError",
+ "label": "Send to error"
+ },
+ {
+ "id": "skipOnError",
+ "label": "Skip on error"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "radio-group",
+ "label": "Retry Policy",
+ "name": "retryPolicy",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "exponential",
+ "options": [
+ {
+ "id": "exponential",
+ "label": "Exponential"
+ },
+ {
+ "id": "linear",
+ "label": "Linear"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Linear Retry Interval",
+ "name": "linearRetryInterval",
+ "widget-attributes": {
+ "min": "0",
+ "default": "30"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Max Retry Duration",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "min": "0",
+ "default": "600"
}
}
]
+ } ,
+ {
+ "label": "Authentication",
+ "properties": [
+ {
+ "widget-type": "radio-group",
+ "label": "Authentication Type",
+ "name": "authType",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "none",
+ "options": [
+ {
+ "id": "none",
+ "label": "None"
+ },
+ {
+ "id": "oAuth2",
+ "label": "OAuth2"
+ },
+ {
+ "id": "serviceAccount",
+ "label": "Service account"
+ },
+ {
+ "id": "basicAuth",
+ "label": "Basic Authentication"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Auth URL",
+ "name": "authUrl"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Token URL",
+ "name": "tokenUrl"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Client ID",
+ "name": "clientId"
+ },
+ {
+ "widget-type": "password",
+ "label": "Client Secret",
+ "name": "clientSecret"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Scopes",
+ "name": "scopes"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Refresh Token",
+ "name": "refreshToken"
+ },
+ {
+ "name": "serviceAccountType",
+ "label": "Service Account Type",
+ "widget-type": "radio-group",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "filePath",
+ "options": [
+ {
+ "id": "filePath",
+ "label": "File Path"
+ },
+ {
+ "id": "JSON",
+ "label": "JSON"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account File Path",
+ "name": "serviceAccountFilePath",
+ "widget-attributes": {
+ "default": "auto-detect"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account JSON",
+ "name": "serviceAccountJSON"
+ },
+ {
+ "widget-type": "textarea",
+ "label": "Service Account Scope",
+ "name": "serviceAccountScope"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "username"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ },
+ {
+ "widget-type": "hidden",
+ "label": "OAuth2 Enabled",
+ "name": "oauth2Enabled",
+ "widget-attributes": {
+ "default": "false",
+ "on": {
+ "label": "True",
+ "value": "true"
+ },
+ "off": {
+ "label": "False",
+ "value": "false"
+ }
+ }
+ }
+ ]
+ },
+ {
+ "label": "HTTP Proxy",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Proxy URL",
+ "name": "proxyUrl"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "proxyUsername"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "proxyPassword"
+ }
+ ]
}
],
- "outputs": []
+ "outputs": [],
+ "filters": [
+ {
+ "name": "Linear Retry Interval",
+ "condition": {
+ "property": "retryPolicy",
+ "operator": "equal to",
+ "value": "linear"
+ },
+ "show": [
+ {
+ "name": "linearRetryInterval",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "Should Write Json As Array",
+ "condition": {
+ "expression": "messageFormat == 'JSON'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "writeJsonAsArray"
+ }
+ ]
+ },
+ {
+ "name": "JsonBatchKey",
+ "condition": {
+ "expression": "messageFormat == 'JSON'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "jsonBatchKey"
+ }
+ ]
+ },
+ {
+ "name": "Proxy authentication",
+ "condition": {
+ "property": "proxyUrl",
+ "operator": "exists"
+ },
+ "show": [
+ {
+ "name": "proxyUsername",
+ "type": "property"
+ },
+ {
+ "name": "proxyPassword",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "Authenticate with Basic Auth",
+ "condition": {
+ "property": "authType",
+ "operator": "equal to",
+ "value": "basicAuth"
+ },
+ "show": [
+ {
+ "name": "username",
+ "type": "property"
+ },
+ {
+ "name": "password",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "Authenticate with OAuth2",
+ "condition": {
+ "property": "authType",
+ "operator": "equal to",
+ "value": "oAuth2"
+ },
+ "show": [
+ {
+ "name": "authUrl",
+ "type": "property"
+ },
+ {
+ "name": "tokenUrl",
+ "type": "property"
+ },
+ {
+ "name": "clientId",
+ "type": "property"
+ },
+ {
+ "name": "clientSecret",
+ "type": "property"
+ },
+ {
+ "name": "scopes",
+ "type": "property"
+ },
+ {
+ "name": "refreshToken",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "Authenticate with service account",
+ "condition": {
+ "property": "authType",
+ "operator": "equal to",
+ "value": "serviceAccount"
+ },
+ "show": [
+ {
+ "name": "serviceAccountType",
+ "type": "property"
+ },
+ {
+ "name": "serviceAccountScope",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "ServiceAuthenticationTypeFilePath",
+ "condition": {
+ "expression": "authType == 'serviceAccount' && serviceAccountType == 'filePath'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "serviceAccountFilePath"
+ }
+ ]
+ },
+ {
+ "name": "ServiceAuthenticationTypeJSON",
+ "condition": {
+ "expression": "authType == 'serviceAccount' && serviceAccountType == 'JSON'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "serviceAccountJSON"
+ }
+ ]
+ }
+ ]
}
diff --git a/widgets/HTTP-batchsource.json b/widgets/HTTP-batchsource.json
index 6c053781..1f142ae8 100644
--- a/widgets/HTTP-batchsource.json
+++ b/widgets/HTTP-batchsource.json
@@ -103,22 +103,33 @@
]
},
{
- "label": "OAuth2",
+ "label": "Authentication",
"properties": [
{
- "widget-type": "toggle",
- "label": "OAuth2 Enabled",
- "name": "oauth2Enabled",
+ "widget-type": "radio-group",
+ "label": "Authentication Type",
+ "name": "authType",
"widget-attributes": {
- "default": "false",
- "on": {
- "label": "True",
- "value": "true"
- },
- "off": {
- "label": "False",
- "value": "false"
- }
+ "layout": "inline",
+ "default": "none",
+ "options": [
+ {
+ "id": "none",
+ "label": "None"
+ },
+ {
+ "id": "oAuth2",
+ "label": "OAuth2"
+ },
+ {
+ "id": "serviceAccount",
+ "label": "Service account"
+ },
+ {
+ "id": "basicAuth",
+ "label": "Basic Authentication"
+ }
+ ]
}
},
{
@@ -150,12 +161,44 @@
"widget-type": "textbox",
"label": "Refresh Token",
"name": "refreshToken"
- }
- ]
- },
- {
- "label": "Basic Authentication",
- "properties": [
+ },
+ {
+ "name": "serviceAccountType",
+ "label": "Service Account Type",
+ "widget-type": "radio-group",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "filePath",
+ "options": [
+ {
+ "id": "filePath",
+ "label": "File Path"
+ },
+ {
+ "id": "JSON",
+ "label": "JSON"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account File Path",
+ "name": "serviceAccountFilePath",
+ "widget-attributes": {
+ "default": "auto-detect"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account JSON",
+ "name": "serviceAccountJSON"
+ },
+ {
+ "widget-type": "textarea",
+ "label": "Service Account Scope",
+ "name": "serviceAccountScope"
+ },
{
"widget-type": "textbox",
"label": "Username",
@@ -165,6 +208,22 @@
"widget-type": "password",
"label": "Password",
"name": "password"
+ },
+ {
+ "widget-type": "hidden",
+ "label": "OAuth2 Enabled",
+ "name": "oauth2Enabled",
+ "widget-attributes": {
+ "default": "false",
+ "on": {
+ "label": "True",
+ "value": "true"
+ },
+ "off": {
+ "label": "False",
+ "value": "false"
+ }
+ }
}
]
},
@@ -503,7 +562,7 @@
"name": "Proxy authentication",
"condition": {
"property": "proxyUrl",
- "operator": "exists",
+ "operator": "exists"
},
"show": [
{
@@ -599,11 +658,11 @@
]
},
{
- "name": "OAuth 2 disabled",
+ "name": "Authenticate with Basic Auth",
"condition": {
- "property": "oauth2Enabled",
+ "property": "authType",
"operator": "equal to",
- "value": "false"
+ "value": "basicAuth"
},
"show": [
{
@@ -617,11 +676,11 @@
]
},
{
- "name": "OAuth 2 enabled",
+ "name": "Authenticate with OAuth2",
"condition": {
- "property": "oauth2Enabled",
+ "property": "authType",
"operator": "equal to",
- "value": "true"
+ "value": "oAuth2"
},
"show": [
{
@@ -650,6 +709,48 @@
}
]
},
+ {
+ "name": "Authenticate with service account",
+ "condition": {
+ "property": "authType",
+ "operator": "equal to",
+ "value": "serviceAccount"
+ },
+ "show": [
+ {
+ "name": "serviceAccountType",
+ "type": "property"
+ },
+ {
+ "name": "serviceAccountScope",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "ServiceAuthenticationTypeFilePath",
+ "condition": {
+ "expression": "authType == 'serviceAccount' && serviceAccountType == 'filePath'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "serviceAccountFilePath"
+ }
+ ]
+ },
+ {
+ "name": "ServiceAuthenticationTypeJSON",
+ "condition": {
+ "expression": "authType == 'serviceAccount' && serviceAccountType == 'JSON'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "serviceAccountJSON"
+ }
+ ]
+ },
{
"name": "JSON/XML Formatting",
"condition": {
diff --git a/widgets/HTTP-streamingsource.json b/widgets/HTTP-streamingsource.json
index e7abdb68..01a77c65 100644
--- a/widgets/HTTP-streamingsource.json
+++ b/widgets/HTTP-streamingsource.json
@@ -108,22 +108,33 @@
]
},
{
- "label": "OAuth2",
+ "label": "Authentication",
"properties": [
{
- "widget-type": "toggle",
- "label": "OAuth2 Enabled",
- "name": "oauth2Enabled",
+ "widget-type": "radio-group",
+ "label": "Authentication Type",
+ "name": "authType",
"widget-attributes": {
- "default": "false",
- "on": {
- "label": "True",
- "value": "true"
- },
- "off": {
- "label": "False",
- "value": "false"
- }
+ "layout": "inline",
+ "default": "none",
+ "options": [
+ {
+ "id": "none",
+ "label": "None"
+ },
+ {
+ "id": "oAuth2",
+ "label": "OAuth2"
+ },
+ {
+ "id": "serviceAccount",
+ "label": "Service account"
+ },
+ {
+ "id": "basicAuth",
+ "label": "Basic Authentication"
+ }
+ ]
}
},
{
@@ -155,12 +166,44 @@
"widget-type": "textbox",
"label": "Refresh Token",
"name": "refreshToken"
- }
- ]
- },
- {
- "label": "Basic Authentication",
- "properties": [
+ },
+ {
+ "name": "serviceAccountType",
+ "label": "Service Account Type",
+ "widget-type": "radio-group",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "filePath",
+ "options": [
+ {
+ "id": "filePath",
+ "label": "File Path"
+ },
+ {
+ "id": "JSON",
+ "label": "JSON"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account File Path",
+ "name": "serviceAccountFilePath",
+ "widget-attributes": {
+ "default": "auto-detect"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account JSON",
+ "name": "serviceAccountJSON"
+ },
+ {
+ "widget-type": "textarea",
+ "label": "Service Account Scope",
+ "name": "serviceAccountScope"
+ },
{
"widget-type": "textbox",
"label": "Username",
@@ -170,6 +213,22 @@
"widget-type": "password",
"label": "Password",
"name": "password"
+ },
+ {
+ "widget-type": "hidden",
+ "label": "OAuth2 Enabled",
+ "name": "oauth2Enabled",
+ "widget-attributes": {
+ "default": "false",
+ "on": {
+ "label": "True",
+ "value": "true"
+ },
+ "off": {
+ "label": "False",
+ "value": "false"
+ }
+ }
}
]
},