Skip to content

Commit

Permalink
Rebased on top of mvp-demo, addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Saad Khan <[email protected]>
  • Loading branch information
khansaad committed Oct 10, 2024
1 parent 18fc290 commit ccc9c78
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 118 deletions.
166 changes: 63 additions & 103 deletions src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,27 @@
package com.autotune.analyzer.workerimpl;


import com.autotune.analyzer.kruizeObject.CreateExperimentConfigBean;
import com.autotune.analyzer.exceptions.FetchMetricsError;
import com.autotune.analyzer.kruizeObject.RecommendationSettings;
import com.autotune.analyzer.serviceObjects.*;
import com.autotune.analyzer.utils.AnalyzerConstants;
import com.autotune.common.data.dataSourceMetadata.*;
import com.autotune.common.datasource.DataSourceInfo;
import com.autotune.common.datasource.DataSourceManager;
import com.autotune.common.k8sObjects.TrialSettings;
import com.autotune.common.utils.CommonUtils;
import com.autotune.operator.KruizeDeploymentInfo;
import com.autotune.utils.GenericRestApiClient;
import com.autotune.utils.KruizeConstants;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.autotune.utils.Utils;
import org.json.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -118,7 +115,7 @@ public void run() {
jobData.setStatus(COMPLETED);
jobData.setMessage(NOTHING);
} else {
Map<String, CreateExperimentAPIObject> createExperimentAPIObjectMap = getExperimentMap(metadataInfo); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type
Map<String, CreateExperimentAPIObject> createExperimentAPIObjectMap = getExperimentMap(metadataInfo, datasource); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type
jobData.setTotal_experiments(createExperimentAPIObjectMap.size());
jobData.setProcessed_experiments(0);
if (jobData.getTotal_experiments() > KruizeDeploymentInfo.BULK_API_LIMIT) {
Expand All @@ -140,29 +137,17 @@ public void run() {
jobData.getData().getRecommendations().getData().setUnprocessed(
appendExperiments(recommendationData.getUnprocessed(), experiment_name)
);

URL url;
HttpURLConnection connection = null;
int statusCode = 0;
// send request to generateRecommendations API
GenericRestApiClient apiClient = new GenericRestApiClient(datasource);
apiClient.setBaseURL(String.format(KruizeDeploymentInfo.recommendations_url, experiment_name));
int responseCode = 0;
try {
url = new URL(String.format(KruizeDeploymentInfo.recommendations_url, experiment_name));
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");

recommendationData.moveToProgress(experiment_name);

statusCode = connection.getResponseCode();
} catch (IOException e) {
LOGGER.error(e.getMessage());

recommendationData.moveToFailed(experiment_name);

throw new RuntimeException(e);
} finally {
if (null != connection) connection.disconnect();
responseCode = apiClient.callKruizeAPI(null);
LOGGER.info("API Response code: {}", responseCode);
} catch (Exception | FetchMetricsError e) {
e.printStackTrace();
}
if (statusCode == HttpURLConnection.HTTP_CREATED) {

if (responseCode == HttpURLConnection.HTTP_CREATED) {
recommendationData.moveToCompleted(experiment_name);
jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1);

Expand Down Expand Up @@ -193,7 +178,7 @@ public void run() {
}


Map<String, CreateExperimentAPIObject> getExperimentMap(DataSourceMetadataInfo metadataInfo) throws IOException {
Map<String, CreateExperimentAPIObject> getExperimentMap(DataSourceMetadataInfo metadataInfo, DataSourceInfo datasource) throws Exception {
Map<String, CreateExperimentAPIObject> createExperimentAPIObjectMap = new HashMap<>();
Collection<DataSource> dataSourceCollection = metadataInfo.getDataSourceHashMap().values();
for (DataSource ds : dataSourceCollection) {
Expand All @@ -212,18 +197,24 @@ Map<String, CreateExperimentAPIObject> getExperimentMap(DataSourceMetadataInfo m
+ namespace.getDataSourceNamespaceName() + "|" + dsw.getDataSourceWorkloadName() + "("
+ dsw.getDataSourceWorkloadType() + ")" + "|" + dc.getDataSourceContainerName();
// create JSON to be passed in the createExperimentAPI
List<CreateExperimentAPIObject> createExperimentAPIObjectList = new ArrayList<>();
String createExpJSON = prepareCreateExperimentJSONInput(dc, dsc, dsw, namespace,
experiment_name, this.bulkInput.getDatasource());
experiment_name, this.bulkInput.getDatasource(), createExperimentAPIObjectList);
// send request to createExperiment API for experiment creation
int responseCode = sendCreateExperimentRequest(createExpJSON);
GenericRestApiClient apiClient = new GenericRestApiClient(datasource);
apiClient.setBaseURL(KruizeDeploymentInfo.experiments_url);
int responseCode;
try {
responseCode = apiClient.callKruizeAPI(createExpJSON);
LOGGER.info("API Response code: {}", responseCode);
} catch (Exception | FetchMetricsError e) {
e.printStackTrace();
continue;
}
// if the experiment is successfully created, add it in the map
if (responseCode == HttpURLConnection.HTTP_CREATED) {
// Parse and extract the first JSON object from the array
JsonObject jsonObject = JsonParser.parseString(createExpJSON).getAsJsonArray().get(0).getAsJsonObject();
// Convert JSON object to CreateExperimentAPIObject
CreateExperimentAPIObject createExperimentAPIObject = new Gson().fromJson(jsonObject, CreateExperimentAPIObject.class);
// Add to the map using the experiment_name
createExperimentAPIObjectMap.put(jsonObject.get("experiment_name").getAsString(), createExperimentAPIObject);
createExperimentAPIObjectMap.put(experiment_name, createExperimentAPIObjectList.get(0));
}
}
}
Expand Down Expand Up @@ -287,85 +278,54 @@ private JSONObject processDateRange(BulkInput.TimeRange timeRange) {


/**
* @param dc DataSourceContainer object to get the container details
* @param dsc DataSourceCluster object to get the cluster details
* @param dsw DataSourceWorkload object to get the workload details
* @param namespace DataSourceNamespace object to get the namespace details
* @param datasource Datasource name to be set
* @param dc DataSourceContainer object to get the container details
* @param dsc DataSourceCluster object to get the cluster details
* @param dsw DataSourceWorkload object to get the workload details
* @param namespace DataSourceNamespace object to get the namespace details
* @param datasource Datasource name to be set
* @param createExperimentAPIObjects
* @return Json string to be sent to the createExperimentAPI for experiment creation
* @throws JsonProcessingException
*/
private String prepareCreateExperimentJSONInput(DataSourceContainer dc, DataSourceCluster dsc, DataSourceWorkload dsw,
DataSourceNamespace namespace, String experiment_name, String datasource) throws IOException {

CreateExperimentConfigBean createExperimentConfigBean = CREATE_EXPERIMENT_CONFIG_BEAN;
// Experiment name
createExperimentConfigBean.setExperiment_name(experiment_name);
// Datasource
createExperimentConfigBean.setDatasource(datasource);
// Cluster name
createExperimentConfigBean.setCluster_name(dsc.getDataSourceClusterName());
// Kubernetes objects
DataSourceNamespace namespace, String experiment_name, String datasource, List<CreateExperimentAPIObject> createExperimentAPIObjects) throws IOException {

CreateExperimentAPIObject createExperimentAPIObject = new CreateExperimentAPIObject();
createExperimentAPIObject.setMode(CREATE_EXPERIMENT_CONFIG_BEAN.getMode());
createExperimentAPIObject.setTargetCluster(CREATE_EXPERIMENT_CONFIG_BEAN.getTarget_cluster());
createExperimentAPIObject.setApiVersion(CREATE_EXPERIMENT_CONFIG_BEAN.getVersion());
createExperimentAPIObject.setExperimentName(experiment_name);
createExperimentAPIObject.setDatasource(this.bulkInput.getDatasource());
createExperimentAPIObject.setClusterName(dsc.getDataSourceClusterName());
createExperimentAPIObject.setPerformanceProfile(CREATE_EXPERIMENT_CONFIG_BEAN.getPerformance_profile());
List<KubernetesAPIObject> kubernetesAPIObjectList = new ArrayList<>();
KubernetesAPIObject kubernetesAPIObject = new KubernetesAPIObject();
kubernetesAPIObject.setType(dsw.getDataSourceWorkloadType());
ContainerAPIObject cao = new ContainerAPIObject(dc.getDataSourceContainerName(),
dc.getDataSourceContainerImageName(), null, null);
kubernetesAPIObject.setContainerAPIObjects(Arrays.asList(cao));
kubernetesAPIObject.setName(dsw.getDataSourceWorkloadName());
kubernetesAPIObject.setType(dsw.getDataSourceWorkloadType());
kubernetesAPIObject.setNamespace(namespace.getDataSourceNamespaceName());
// Containers
ContainerAPIObject containerAPIObject = new ContainerAPIObject(dc.getDataSourceContainerName(),
dc.getDataSourceContainerImageName(), null, null);
// Add container to the Kubernetes object
kubernetesAPIObject.setContainerAPIObjects(Arrays.asList(containerAPIObject));
kubernetesAPIObjectList.add(kubernetesAPIObject);
// Add the Kubernetes objects to the createExperimentConfigBean
createExperimentConfigBean.setKubernetes_objects(kubernetesAPIObjectList);
createExperimentAPIObject.setKubernetesObjects(kubernetesAPIObjectList);
RecommendationSettings rs = new RecommendationSettings();
rs.setThreshold(CREATE_EXPERIMENT_CONFIG_BEAN.getThreshold());
createExperimentAPIObject.setRecommendationSettings(rs);
TrialSettings trialSettings = new TrialSettings();
trialSettings.setMeasurement_durationMinutes(CREATE_EXPERIMENT_CONFIG_BEAN.getMeasurementDurationStr());
createExperimentAPIObject.setTrialSettings(trialSettings);

// list to hold CreateExperimentConfigBean objects
List<CreateExperimentConfigBean> createExperimentConfigBeanList = new ArrayList<>();
createExperimentAPIObject.setExperiment_id(Utils.generateID(createExperimentAPIObject.toString()));
createExperimentAPIObject.setStatus(AnalyzerConstants.ExperimentStatus.IN_PROGRESS);
createExperimentAPIObject.setExperimentType(AnalyzerConstants.ExperimentTypes.CONTAINER_EXPERIMENT);

// Add CreateExperimentConfigBean object to the list
createExperimentConfigBeanList.add(createExperimentConfigBean);
createExperimentAPIObjects.add(createExperimentAPIObject);

// Convert to JSON
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(createExperimentConfigBeanList);
LOGGER.debug("CreateExp JSON: {}", json);
String json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(createExperimentAPIObjects);
LOGGER.info("CreateExp JSON: {}", json);
return json;
}

public static int sendCreateExperimentRequest(String jsonInput) throws IOException {
URL url;
try {
url = new URL(KruizeDeploymentInfo.experiments_url);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
HttpURLConnection connection;
try {
connection = (HttpURLConnection) url.openConnection();
} catch (IOException e) {
LOGGER.error(e.getMessage());
throw new RuntimeException(e);
}
try {
connection.setRequestMethod("POST");
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json; utf-8");
connection.setRequestProperty("Accept", "application/json");
connection.setDoOutput(true);
} catch (ProtocolException e) {
LOGGER.error(e.getMessage());
throw new RuntimeException(e);
}
// Write JSON input to the connection output stream
try(OutputStream os = connection.getOutputStream()) {
byte[] input = jsonInput.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
}

// Check for the response code to verify if request was successful
int responseCode = connection.getResponseCode();
System.out.println("POST Response Code : " + responseCode);
return responseCode;
}
}
Loading

0 comments on commit ccc9c78

Please sign in to comment.