Skip to content

Commit

Permalink
Connector builder server: Remove list endpoint (#8403)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Reuter committed Aug 21, 2023
1 parent 271babb commit 5c0c404
Show file tree
Hide file tree
Showing 13 changed files with 5 additions and 351 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@
import io.airbyte.connector_builder.api.model.generated.ResolveManifestRequestBody;
import io.airbyte.connector_builder.api.model.generated.StreamRead;
import io.airbyte.connector_builder.api.model.generated.StreamReadRequestBody;
import io.airbyte.connector_builder.api.model.generated.StreamsListRead;
import io.airbyte.connector_builder.api.model.generated.StreamsListRequestBody;
import io.airbyte.connector_builder.handlers.HealthHandler;
import io.airbyte.connector_builder.handlers.ResolveManifestHandler;
import io.airbyte.connector_builder.handlers.StreamHandler;
import io.airbyte.connector_builder.handlers.StreamsHandler;
import io.micronaut.context.annotation.Context;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
Expand All @@ -39,16 +36,13 @@ public class ConnectorBuilderController implements V1Api {
private final HealthHandler healthHandler;
private final StreamHandler streamHandler;
private final ResolveManifestHandler resolveManifestHandler;
private final StreamsHandler streamsHandler;

public ConnectorBuilderController(final HealthHandler healthHandler,
final ResolveManifestHandler resolveManifestHandler,
final StreamHandler streamHandler,
final StreamsHandler streamsHandler) {
final StreamHandler streamHandler) {
this.healthHandler = healthHandler;
this.streamHandler = streamHandler;
this.resolveManifestHandler = resolveManifestHandler;
this.streamsHandler = streamsHandler;
}

@Override
Expand All @@ -60,15 +54,6 @@ public HealthCheckRead getHealthCheck() {
return healthHandler.getHealthCheck();
}

@Override
@Post(uri = "/streams/list",
produces = MediaType.APPLICATION_JSON)
@Secured({AUTHENTICATED_USER})
@ExecuteOn(TaskExecutors.IO)
public StreamsListRead listStreams(final StreamsListRequestBody streamsListRequestBody) {
return streamsHandler.listStreams(streamsListRequestBody);
}

@Override
@Post(uri = "/stream/read",
produces = MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ResolveManifest resolveManifest(
resolveManifestRequestBody.getWorkspaceId(), resolveManifestRequestBody.getProjectId());
return this.requester.resolveManifest(resolveManifestRequestBody.getManifest());
} catch (final IOException exc) {
LOGGER.error("Error handling list_streams request.", exc);
LOGGER.error("Error handling resolve_manifest request.", exc);
throw new ConnectorBuilderException("Error handling resolve_manifest request.", exc);
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.connector_builder.api.model.generated.ResolveManifest;
import io.airbyte.connector_builder.api.model.generated.StreamRead;
import io.airbyte.connector_builder.api.model.generated.StreamsListRead;
import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException;
import io.airbyte.connector_builder.exceptions.CdkProcessException;
import io.airbyte.connector_builder.exceptions.ConnectorBuilderException;
import java.io.IOException;

Expand All @@ -21,8 +19,6 @@ public interface AirbyteCdkRequester {
ResolveManifest resolveManifest(final JsonNode manifest)
throws IOException, AirbyteCdkInvalidInputException, ConnectorBuilderException;

StreamsListRead listStreams(final JsonNode manifest, JsonNode config) throws IOException, AirbyteCdkInvalidInputException, CdkProcessException;

StreamRead readStream(final JsonNode manifest, final JsonNode config, final String stream, final Integer recordLimit)
throws IOException, AirbyteCdkInvalidInputException, ConnectorBuilderException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
import io.airbyte.connector_builder.api.model.generated.StreamRead;
import io.airbyte.connector_builder.api.model.generated.StreamReadAuxiliaryRequestsInner;
import io.airbyte.connector_builder.api.model.generated.StreamReadSlicesInner;
import io.airbyte.connector_builder.api.model.generated.StreamsListRead;
import io.airbyte.connector_builder.api.model.generated.StreamsListReadStreamsInner;
import io.airbyte.connector_builder.command_runner.SynchronousCdkCommandRunner;
import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException;
import io.airbyte.connector_builder.exceptions.CdkProcessException;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.List;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,7 +37,6 @@ public class AirbyteCdkRequesterImpl implements AirbyteCdkRequester {
private static final String manifestKey = "__injected_declarative_manifest";
private static final String recordLimitKey = "max_records";
private static final String resolveManifestCommand = "resolve_manifest";
private static final String listStreamsCommand = "list_streams";
private static final String readStreamCommand = "test_read";
private static final String catalogTemplate = """
{
Expand Down Expand Up @@ -107,34 +103,6 @@ public ResolveManifest resolveManifest(final JsonNode manifest)
return new ResolveManifest().manifest(record.getData().get("manifest"));
}

@Override
@Trace(operationName = TracingHelper.CONNECTOR_BUILDER_OPERATION_NAME)
public StreamsListRead listStreams(final JsonNode manifest, final JsonNode config)
throws IOException, AirbyteCdkInvalidInputException, CdkProcessException {
return new StreamsListRead().streams(
StreamSupport.stream(request(manifest, config, listStreamsCommand).getData().get("streams").spliterator(), false).map(this::adaptStream)
.toList());
}

private StreamsListReadStreamsInner adaptStream(final JsonNode stream) {
if (isNull(stream, "name")) {
throw new AirbyteCdkInvalidInputException(String.format(
"Unexpected fatal error: streams are expected to have field 'name' but could not find it in %s. Please open a GitHub issue with Airbyte",
stream));
}
if (isNull(stream, "url")) {
throw new AirbyteCdkInvalidInputException(String.format(
"Unexpected fatal error: streams are expected to have field 'url' but could not find it in %s. Please open a GitHub issue with Airbyte",
stream));
}

return new StreamsListReadStreamsInner().name(stream.get("name").asText()).url(stream.get("url").asText());
}

private static boolean isNull(final JsonNode jsonNode, final String fieldName) {
return jsonNode.get(fieldName) == null || jsonNode.get(fieldName).isNull();
}

/**
* Launch a CDK process responsible for handling requests.
*/
Expand Down
63 changes: 0 additions & 63 deletions airbyte-connector-builder-server/src/main/openapi/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,6 @@ paths:
$ref: "#/components/responses/ExceptionResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/streams/list:
post:
summary: List all streams present in the connector manifest, along with their specific request URLs
operationId: listStreams
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/StreamsListRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/StreamsListRead"
"400":
$ref: "#/components/responses/ExceptionResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/manifest/resolve:
post:
summary: Given a JSON manifest, returns a JSON manifest with all of the $refs and $parameters resolved and flattened
Expand Down Expand Up @@ -251,48 +230,6 @@ components:
# --- Commenting out for now since they do not work with our orval openapi client generator ---
# AirbyteProtocol:
# $ref: ../../../../airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml
StreamsListRequestBody:
type: object
required:
- config
- manifest
properties:
config:
$ref: "#/components/schemas/ConnectorConfig"
manifest:
$ref: "#/components/schemas/ConnectorManifest"
workspace_id:
type: string
description: ID of the workspace where the request is coming from
project_id:
type: string
description: ID of the project where the request is coming from
StreamsListRead:
type: object
required:
- streams
properties:
streams:
type: array
items:
type: object
description: The stream names present in the connector manifest
required:
- name
- url
properties:
name:
type: string
description: The name of the stream
url:
type: string
description: The URL to which read requests will be made for this stream
# --- Potential addition for a later phase ---
# slices:
# type: array
# description: list of slices that will be retrieved for this stream
# items:
# type: object
ResolveManifestRequestBody:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ micronaut:
access-logger:
enabled: ${HTTP_ACCESS_LOG_ENABLED:true}
endpoints:
v1/list/streams:
enabled: true
sensitive: true
v1/manifest_template:
enable: true
sensitive: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import io.airbyte.connector_builder.api.model.generated.ResolveManifestRequestBody;
import io.airbyte.connector_builder.api.model.generated.StreamRead;
import io.airbyte.connector_builder.api.model.generated.StreamReadRequestBody;
import io.airbyte.connector_builder.api.model.generated.StreamsListRead;
import io.airbyte.connector_builder.api.model.generated.StreamsListRequestBody;
import io.airbyte.connector_builder.command_runner.MockSynchronousPythonCdkCommandRunner;
import io.airbyte.connector_builder.command_runner.SynchronousCdkCommandRunner;
import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException;
Expand All @@ -28,7 +26,6 @@
import io.airbyte.connector_builder.handlers.HealthHandler;
import io.airbyte.connector_builder.handlers.ResolveManifestHandler;
import io.airbyte.connector_builder.handlers.StreamHandler;
import io.airbyte.connector_builder.handlers.StreamsHandler;
import io.airbyte.connector_builder.requester.AirbyteCdkRequesterImpl;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.VersionedAirbyteStreamFactory;
Expand Down Expand Up @@ -62,7 +59,6 @@ class ConnectorBuilderControllerIntegrationTest {
private HealthHandler healthHandler;
static String cdkException;
static String streamRead;
static String streamsList;
static String recordManifestResolve;
static String traceManifestResolve;
static JsonNode validManifest;
Expand All @@ -81,7 +77,6 @@ public static void setUpClass() throws IOException {
final String relativeDir = "src/test/java/io/airbyte/connector_builder/fixtures";
validManifest = new ObjectMapper().readTree(readContents(String.format("%s/ValidManifest.json", relativeDir)));
streamRead = readContents(String.format("%s/RecordStreamRead.json", relativeDir));
streamsList = readContents(String.format("%s/RecordStreamsList.json", relativeDir));
recordManifestResolve = readContents(String.format("%s/RecordManifestResolve.json", relativeDir));
traceManifestResolve = readContents(String.format("%s/TraceManifestResolve.json", relativeDir));
cdkException = readContents(String.format("%s/CdkException.txt", relativeDir));
Expand All @@ -101,8 +96,7 @@ ConnectorBuilderController createControllerWithSynchronousRunner(
final SynchronousCdkCommandRunner commandRunner = new MockSynchronousPythonCdkCommandRunner(
this.writer, this.streamFactory, shouldThrow, exitCode, inputStream, errorStream, outputStream);
final AirbyteCdkRequesterImpl requester = new AirbyteCdkRequesterImpl(commandRunner);
return new ConnectorBuilderController(this.healthHandler, new ResolveManifestHandler(requester), new StreamHandler(requester),
new StreamsHandler(requester));
return new ConnectorBuilderController(this.healthHandler, new ResolveManifestHandler(requester), new StreamHandler(requester));
}

@Test
Expand All @@ -122,20 +116,6 @@ void givenTraceMessageWhenStreamReadThenThrowException() {
() -> controller.readStream(new StreamReadRequestBody().config(A_CONFIG).manifest(A_MANIFEST).stream(A_STREAM)));
}

@Test
void testStreamsList() {
final ConnectorBuilderController controller = givenAirbyteCdkReturnMessage(streamsList);
final StreamsListRead streamsListRead = controller.listStreams(new StreamsListRequestBody().config(A_CONFIG).manifest(A_MANIFEST));
assertTrue(streamsListRead.getStreams().size() > 0);
}

@Test
void givenTraceMessageWhenStreamsListThenThrowException() {
final ConnectorBuilderController controller = givenAirbyteCdkReturnMessage(traceManifestResolve);
Assertions.assertThrows(AirbyteCdkInvalidInputException.class,
() -> controller.listStreams(new StreamsListRequestBody().config(A_CONFIG).manifest(A_MANIFEST)));
}

@Test
void testResolveManifestSuccess() {
final InputStream stream = new ByteArrayInputStream(
Expand Down
Loading

0 comments on commit 5c0c404

Please sign in to comment.