Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Aug 29, 2024
1 parent 89fcf0a commit 45e2ff4
Show file tree
Hide file tree
Showing 27 changed files with 3,066 additions and 777 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"name" : "flowconfigs",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowconfigs",
"schema" : "org.apache.gobblin.service.FlowConfig",
"doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsResource",
"collection" : {
"identifier" : {
"name" : "id",
"type" : "org.apache.gobblin.service.FlowId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
"supports" : [ "create", "delete", "get", "update" ],
"methods" : [ {
"method" : "create",
"doc" : "Create a flow configuration that the service will forward to execution instances for execution"
}, {
"method" : "get",
"doc" : "Retrieve the flow configuration with the given key"
}, {
"method" : "update",
"doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist."
}, {
"method" : "delete",
"doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
} ],
"entity" : {
"path" : "/flowconfigs/{id}"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
{
"annotations" : {
"deprecated" : { }
},
"name" : "flowstatuses",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowstatuses",
"schema" : "org.apache.gobblin.service.FlowStatus",
"doc" : "Resource for handling flow status requests. Deprecated, use {@link FlowExecutionResource}\n\ngenerated from: org.apache.gobblin.service.FlowStatusResource",
"doc" : "Resource for handling flow status requests\n\ngenerated from: org.apache.gobblin.service.FlowStatusResource",
"collection" : {
"identifier" : {
"name" : "id",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
{
"models" : [ {
"type" : "record",
"name" : "EmptyRecord",
"namespace" : "com.linkedin.restli.common",
"doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request<EmptyRecord> to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
"fields" : [ ],
"validate" : {
"com.linkedin.restli.common.EmptyRecordValidator" : { }
}
}, {
"type" : "record",
"name" : "FlowConfig",
"namespace" : "org.apache.gobblin.service",
"doc" : "Defines a flow configuration that can be compiled into Gobblin jobs",
"fields" : [ {
"name" : "id",
"type" : {
"type" : "record",
"name" : "FlowId",
"doc" : "Identifier for a Gobblin as a Service flow",
"fields" : [ {
"name" : "flowName",
"type" : "string",
"doc" : "Name of the flow",
"validate" : {
"strlen" : {
"max" : 128,
"min" : 1
}
}
}, {
"name" : "flowGroup",
"type" : "string",
"doc" : "Group of the flow. This defines the namespace for the flow.",
"validate" : {
"strlen" : {
"max" : 128,
"min" : 1
}
}
} ]
},
"doc" : "Identifier for the flow"
}, {
"name" : "schedule",
"type" : {
"type" : "record",
"name" : "Schedule",
"doc" : "Attributes for defining a job schedule",
"fields" : [ {
"name" : "cronSchedule",
"type" : "string",
"doc" : "Schedule for flow in cron format",
"validate" : {
"org.apache.gobblin.service.validator.CronValidator" : { }
}
}, {
"name" : "runImmediately",
"type" : "boolean",
"doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
"default" : false
} ]
},
"doc" : "Optional schedule for when to execution the flow. If a schedule is not specified then the flow is executed immediately.",
"optional" : true
}, {
"name" : "templateUris",
"type" : "string",
"doc" : "Comma separated list of URIs for templates used in the flow. The template location is defined by the multiproduct that packages the template.",
"validate" : {
"org.apache.gobblin.service.validator.TemplateUriValidator" : { }
}
}, {
"name" : "explain",
"type" : "boolean",
"doc" : "Return the compiled flow as a string. If enabled, the flow is not added.",
"default" : false
}, {
"name" : "owningGroup",
"type" : "string",
"doc" : "Optional string name of group that the requester belongs to for group ownership of flows.",
"optional" : true
}, {
"name" : "properties",
"type" : {
"type" : "map",
"values" : "string"
},
"doc" : "Properties for the flow. These properties are passed to the compiled Gobblin jobs."
} ]
}, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.Schedule" ],
"schema" : {
"name" : "flowconfigs",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowconfigs",
"schema" : "org.apache.gobblin.service.FlowConfig",
"doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsResource",
"collection" : {
"identifier" : {
"name" : "id",
"type" : "org.apache.gobblin.service.FlowId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
"supports" : [ "create", "delete", "get", "update" ],
"methods" : [ {
"method" : "create",
"doc" : "Create a flow configuration that the service will forward to execution instances for execution"
}, {
"method" : "get",
"doc" : "Retrieve the flow configuration with the given key"
}, {
"method" : "update",
"doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist."
}, {
"method" : "delete",
"doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
} ],
"entity" : {
"path" : "/flowconfigs/{id}"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,11 @@
"deprecated" : "Use FlowExecution instead"
}, "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.Issue", "org.apache.gobblin.service.IssueSeverity", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus", "org.apache.gobblin.service.Timestamp" ],
"schema" : {
"annotations" : {
"deprecated" : { }
},
"name" : "flowstatuses",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowstatuses",
"schema" : "org.apache.gobblin.service.FlowStatus",
"doc" : "Resource for handling flow status requests. Deprecated, use {@link FlowExecutionResource}\n\ngenerated from: org.apache.gobblin.service.FlowStatusResource",
"doc" : "Resource for handling flow status requests\n\ngenerated from: org.apache.gobblin.service.FlowStatusResource",
"collection" : {
"identifier" : {
"name" : "id",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.common.util.None;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.CreateIdRequest;
import com.linkedin.restli.client.DeleteRequest;
import com.linkedin.restli.client.GetRequest;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.ResponseFuture;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.UpdateRequest;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.IdResponse;


/**
* Flow Configuration client for REST flow configuration server
*/
public class FlowConfigClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FlowConfigClient.class);

private Optional<HttpClientFactory> _httpClientFactory;
private Optional<RestClient> _restClient;
private final FlowconfigsRequestBuilders _flowconfigsRequestBuilders;
public static final String DELETE_STATE_STORE_KEY = "delete.state.store";

/**
* Construct a {@link FlowConfigClient} to communicate with http flow config server at URI serverUri
* @param serverUri address and port of the REST server
*/
public FlowConfigClient(String serverUri) {
this(serverUri, Collections.emptyMap());
}

public FlowConfigClient(String serverUri, Map<String, String> properties) {
LOG.debug("FlowConfigClient with serverUri " + serverUri);

_httpClientFactory = Optional.of(new HttpClientFactory());
Client r2Client = new TransportClientAdapter(_httpClientFactory.get().getClient(properties));
_restClient = Optional.of(new RestClient(r2Client, serverUri));

_flowconfigsRequestBuilders = createRequestBuilders();
}

/**
* Construct a {@link FlowConfigClient} to communicate with http flow config server at URI serverUri
* @param restClient restClient to send restli request
*/
public FlowConfigClient(RestClient restClient) {
LOG.debug("FlowConfigClient with restClient " + restClient);

_httpClientFactory = Optional.absent();
_restClient = Optional.of(restClient);

_flowconfigsRequestBuilders = createRequestBuilders();
}

// Clients using different service name can override this method
// RequestBuilders decide the name of the service requests go to.
protected FlowconfigsRequestBuilders createRequestBuilders() {
return new FlowconfigsRequestBuilders();
}

/**
* Create a flow configuration
* @param flowConfig flow configuration attributes
* @throws RemoteInvocationException
*/
public void createFlowConfig(FlowConfig flowConfig)
throws RemoteInvocationException {
LOG.debug("createFlowConfig with groupName " + flowConfig.getId().getFlowGroup() + " flowName " +
flowConfig.getId().getFlowName());

CreateIdRequest<ComplexResourceKey<FlowId, EmptyRecord>, FlowConfig> request =
_flowconfigsRequestBuilders.create().input(flowConfig).build();
ResponseFuture<IdResponse<ComplexResourceKey<FlowId, EmptyRecord>>> flowConfigResponseFuture =
_restClient.get().sendRequest(request);

flowConfigResponseFuture.getResponse();
}

/**
* Update a flow configuration
* @param flowConfig flow configuration attributes
* @throws RemoteInvocationException
*/
public void updateFlowConfig(FlowConfig flowConfig)
throws RemoteInvocationException {
LOG.debug("updateFlowConfig with groupName " + flowConfig.getId().getFlowGroup() + " flowName " +
flowConfig.getId().getFlowName());

FlowId flowId = new FlowId().setFlowGroup(flowConfig.getId().getFlowGroup())
.setFlowName(flowConfig.getId().getFlowName());

UpdateRequest<FlowConfig> updateRequest =
_flowconfigsRequestBuilders.update().id(new ComplexResourceKey<>(flowId, new EmptyRecord()))
.input(flowConfig).build();

ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(updateRequest);

response.getResponse();
}

/**
* Get a flow configuration
* @param flowId identifier of flow configuration to get
* @return a {@link FlowConfig} with the flow configuration
* @throws RemoteInvocationException
*/
public FlowConfig getFlowConfig(FlowId flowId)
throws RemoteInvocationException {
LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName());

GetRequest<FlowConfig> getRequest = _flowconfigsRequestBuilders.get()
.id(new ComplexResourceKey<>(flowId, new EmptyRecord())).build();

Response<FlowConfig> response =
_restClient.get().sendRequest(getRequest).getResponse();
return response.getEntity();
}

/**
* Delete a flow configuration
* @param flowId identifier of flow configuration to delete
* @throws RemoteInvocationException
*/
public void deleteFlowConfig(FlowId flowId)
throws RemoteInvocationException {
LOG.debug("deleteFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
flowId.getFlowName());

DeleteRequest<FlowConfig> deleteRequest = _flowconfigsRequestBuilders.delete()
.id(new ComplexResourceKey<>(flowId, new EmptyRecord())).build();
ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);

response.getResponse();
}

/**
* Delete a flow configuration
* @param flowId identifier of flow configuration to delete
* @throws RemoteInvocationException
*/
public void deleteFlowConfigWithStateStore(FlowId flowId)
throws RemoteInvocationException {
LOG.debug("deleteFlowConfig and state store with groupName " + flowId.getFlowGroup() + " flowName " +
flowId.getFlowName());

DeleteRequest<FlowConfig> deleteRequest = _flowconfigsRequestBuilders.delete()
.id(new ComplexResourceKey<>(flowId, new EmptyRecord())).setHeader(DELETE_STATE_STORE_KEY, Boolean.TRUE.toString()).build();
ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);

response.getResponse();
}

@Override
public void close()
throws IOException {
if (_restClient.isPresent()) {
_restClient.get().shutdown(new FutureCallback<None>());
}

if (_httpClientFactory.isPresent()) {
_httpClientFactory.get().shutdown(new FutureCallback<None>());
}
}
}
Loading

0 comments on commit 45e2ff4

Please sign in to comment.