Skip to content

Commit

Permalink
Merge pull request kruize#1369 from khansaad/filter-abstraction
Browse files Browse the repository at this point in the history
Add Time Range Filter for Bulk API
  • Loading branch information
dinogun authored Nov 15, 2024
2 parents 4bd9baa + 5dd63e9 commit 6143696
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 34 deletions.
47 changes: 34 additions & 13 deletions design/BulkAPI.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ progress of the job.
}
}
},
"time_range": {},
"time_range": {
"start": "2024-11-01T00:00:00.000Z",
"end": "2024-11-15T23:59:59.000Z"
},
"datasource": "Cbank1Xyz",
"experiment_types": [
"container",
Expand Down Expand Up @@ -82,6 +85,24 @@ progress of the job.
"job_id": "123e4567-e89b-12d3-a456-426614174000"
}
```
### Different payload parameters examples

#### 1. **Request Payload with `time_range` specified:**

This object allows users to specify the duration for which they want to query data and receive recommendations. It consists of the following fields:

- **`start`**: The starting timestamp of the query duration in ISO 8601 format (`YYYY-MM-DDTHH:mm:ss.sssZ`).
- **`end`**: The ending timestamp of the query duration in ISO 8601 format (`YYYY-MM-DDTHH:mm:ss.sssZ`).

The specified time range determines the period over which the data is analyzed to provide recommendations at the container or namespace level. Ensure that:
- Both `start` and `end` are valid timestamps.
- The `start` timestamp precedes the `end` timestamp.

#### 2. **Request Payload with `exclude` filter specified:**
TBA

#### 3. **Request Payload with `include` filter specified:**
TBA

### GET Request:

Expand Down Expand Up @@ -216,28 +237,28 @@ resource optimization in Kubernetes environments. Below is a breakdown of the JS

- Each object in the `experiments` array has the following structure:

| Field | Type | Description |
|-------------------------|--------------|--------------------------------------------------------------------------|
| `name` | `string` | Name of the experiment, typically indicating a service name and deployment context. |
| `notification` | `object` | Notifications specific to this experiment (if any). |
| `recommendation` | `object` | Recommendation status and notifications specific to this experiment. |
| Field | Type | Description |
|-------------------|----------|-------------------------------------------------------------------------------------|
| `name` | `string` | Name of the experiment, typically indicating a service name and deployment context. |
| `notifications` | `object` | Notifications specific to this experiment (if any). |
| `recommendations` | `object` | Recommendation status and notifications specific to this experiment. |

#### Recommendation Object

The `recommendation` field within each experiment provides information about recommendation processing status and
The `recommendations` field within each experiment provides information about recommendation processing status and
errors (if any).

| Field | Type | Description |
|-------------------------|--------------|--------------------------------------------------------------------------|
| `status` | `string` | Status of the recommendation (e.g., `"unprocessed"`, `"processed"`, `"processing"`, `"failed"`). |
| `notification` | `object` | Notifications related to recommendation processing. |
| Field | Type | Description |
|-----------------|----------|--------------------------------------------------------------------------------------------------|
| `status` | `string` | Status of the recommendation (e.g., `"unprocessed"`, `"processed"`, `"processing"`, `"failed"`). |
| `notifications` | `object` | Notifications related to recommendation processing. |

#### Notification Object

Both the `notification` and `recommendation.notification` fields may contain error messages or warnings as follows:
Both the `notifications` and `recommendations.notifications` fields may contain error messages or warnings as follows:

| Field | Type | Description |
|-------------------------|--------------|----------------------------------------------------------------------------|
|-------------------------|--------------|----------------------------------------------------------------------------|
| `type` | `string` | Type of notification (e.g., `"info"`,`"error"`, `"warning"`). |
| `message` | `string` | Description of the notification message. |
| `code` | `integer` | HTTP-like code indicating the type of error (e.g., `400` for bad request). |
Expand Down
21 changes: 11 additions & 10 deletions src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void run() {
if (null != datasource) {
JSONObject daterange = processDateRange(this.bulkInput.getTime_range());
if (null != daterange)
metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, (Long) daterange.get("start_time"), (Long) daterange.get("end_time"), (Integer) daterange.get("steps"));
metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, (Long) daterange.get(START_TIME), (Long) daterange.get(END_TIME), (Integer) daterange.get(STEPS));
else {
metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, 0, 0, 0);
}
Expand Down Expand Up @@ -233,7 +233,7 @@ public void run() {
}
} catch (IOException e) {
LOGGER.error(e.getMessage());
jobData.setStatus("FAILED");
jobData.setStatus(FAILED);
jobData.setEndTime(Instant.now());
BulkJobStatus.Notification notification;
if (e instanceof SocketTimeoutException) {
Expand All @@ -248,7 +248,7 @@ public void run() {
} catch (Exception e) {
LOGGER.error(e.getMessage());
e.printStackTrace();
jobData.setStatus("FAILED");
jobData.setStatus(FAILED);
jobData.setEndTime(Instant.now());
jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR), new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR));
}
Expand Down Expand Up @@ -298,10 +298,10 @@ private String getLabels(BulkInput.FilterWrapper filter) {
includeLabelsBuilder.append(key).append("=").append("\"" + value + "\"").append(",")
);
// Remove trailing comma
if (includeLabelsBuilder.length() > 0) {
if (!includeLabelsBuilder.isEmpty()) {
includeLabelsBuilder.setLength(includeLabelsBuilder.length() - 1);
}
LOGGER.debug("Include Labels: " + includeLabelsBuilder);
LOGGER.debug("Include Labels: {}", includeLabelsBuilder);
uniqueKey = includeLabelsBuilder.toString();
}
}
Expand All @@ -313,10 +313,11 @@ private String getLabels(BulkInput.FilterWrapper filter) {
}

private JSONObject processDateRange(BulkInput.TimeRange timeRange) {
//TODO: add validations for the time range
JSONObject dateRange = null;
if (null != timeRange && timeRange.getStart() != null && timeRange.getEnd() != null) {
String intervalEndTimeStr = timeRange.getStart();
String intervalStartTimeStr = timeRange.getEnd();
String intervalStartTimeStr = timeRange.getStart();
String intervalEndTimeStr = timeRange.getEnd();
long interval_end_time_epoc = 0;
long interval_start_time_epoc = 0;
LocalDateTime localDateTime = LocalDateTime.parse(intervalEndTimeStr, DateTimeFormatter.ofPattern(KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT));
Expand All @@ -327,9 +328,9 @@ private JSONObject processDateRange(BulkInput.TimeRange timeRange) {
Timestamp interval_start_time = Timestamp.from(localDateTime.toInstant(ZoneOffset.UTC));
int steps = CREATE_EXPERIMENT_CONFIG_BEAN.getMeasurementDuration() * KruizeConstants.TimeConv.NO_OF_SECONDS_PER_MINUTE; // todo fetch experiment recommendations setting measurement
dateRange = new JSONObject();
dateRange.put("start_time", interval_start_time_epoc);
dateRange.put("end_time", interval_end_time_epoc);
dateRange.put("steps", steps);
dateRange.put(START_TIME, interval_start_time_epoc);
dateRange.put(END_TIME, interval_end_time_epoc);
dateRange.put(STEPS, steps);
}
return dateRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,31 @@
*******************************************************************************/
package com.autotune.common.datasource;

import com.autotune.analyzer.exceptions.FetchMetricsError;
import com.autotune.common.data.dataSourceMetadata.*;
import com.autotune.common.data.dataSourceQueries.PromQLDataSourceQueries;
import com.autotune.utils.GenericRestApiClient;
import com.autotune.utils.KruizeConstants;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.io.IOException;
import java.net.URLEncoder;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;

import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.CHARACTER_ENCODING;

/**
* DataSourceMetadataOperator is an abstraction with CRUD operations to manage DataSourceMetadataInfo Object
* representing JSON for a given data source
Expand Down Expand Up @@ -168,19 +180,20 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
String containerQuery = PromQLDataSourceQueries.CONTAINER_QUERY;
if (null != uniqueKey && !uniqueKey.isEmpty()) {
LOGGER.debug("uniquekey: {}", uniqueKey);
namespaceQuery = namespaceQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
workloadQuery = workloadQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
containerQuery = containerQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
namespaceQuery = namespaceQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "," + uniqueKey);
workloadQuery = workloadQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "," + uniqueKey);
containerQuery = containerQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "," + uniqueKey);
} else {
namespaceQuery = namespaceQuery.replace("ADDITIONAL_LABEL", "");
workloadQuery = workloadQuery.replace("ADDITIONAL_LABEL", "");
containerQuery = containerQuery.replace("ADDITIONAL_LABEL", "");
namespaceQuery = namespaceQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "");
workloadQuery = workloadQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "");
containerQuery = containerQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "");
}
LOGGER.info("namespaceQuery: {}", namespaceQuery);
LOGGER.info("workloadQuery: {}", workloadQuery);
LOGGER.info("containerQuery: {}", containerQuery);

JsonArray namespacesDataResultArray = op.getResultArrayForQuery(dataSourceInfo, namespaceQuery);
JsonArray namespacesDataResultArray = fetchQueryResults(dataSourceInfo, namespaceQuery, startTime, endTime, steps);
LOGGER.debug("namespacesDataResultArray: {}", namespacesDataResultArray);
if (!op.validateResultArray(namespacesDataResultArray)) {
dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, null);
} else {
Expand All @@ -201,8 +214,8 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
* TODO - get workload metadata for a given namespace
*/
HashMap<String, HashMap<String, DataSourceWorkload>> datasourceWorkloads = new HashMap<>();
JsonArray workloadDataResultArray = op.getResultArrayForQuery(dataSourceInfo,
workloadQuery);
JsonArray workloadDataResultArray = fetchQueryResults(dataSourceInfo, workloadQuery, startTime, endTime, steps);
LOGGER.debug("workloadDataResultArray: {}", workloadDataResultArray);

if (op.validateResultArray(workloadDataResultArray)) {
datasourceWorkloads = dataSourceDetailsHelper.getWorkloadInfo(workloadDataResultArray);
Expand All @@ -220,8 +233,8 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
* TODO - get container metadata for a given workload
*/
HashMap<String, HashMap<String, DataSourceContainer>> datasourceContainers = new HashMap<>();
JsonArray containerDataResultArray = op.getResultArrayForQuery(dataSourceInfo,
containerQuery);
JsonArray containerDataResultArray = fetchQueryResults(dataSourceInfo, containerQuery, startTime, endTime, steps);

LOGGER.debug("containerDataResultArray: {}", containerDataResultArray);

if (op.validateResultArray(containerDataResultArray)) {
Expand All @@ -235,4 +248,19 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
return null;

}

private JsonArray fetchQueryResults(DataSourceInfo dataSourceInfo, String query, long startTime, long endTime, int steps) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
GenericRestApiClient client = new GenericRestApiClient(dataSourceInfo);
String metricsUrl = String.format(KruizeConstants.DataSourceConstants.DATASOURCE_ENDPOINT_WITH_QUERY,
dataSourceInfo.getUrl(),
URLEncoder.encode(query, CHARACTER_ENCODING),
startTime,
endTime,
steps);
LOGGER.debug("MetricsUrl: {}", metricsUrl);
client.setBaseURL(metricsUrl);
JSONObject genericJsonObject = client.fetchMetricsJson(KruizeConstants.APIMessages.GET, "");
JsonObject jsonObject = new Gson().fromJson(genericJsonObject.toString(), JsonObject.class);
return jsonObject.getAsJsonObject(KruizeConstants.JSONKeys.DATA).getAsJsonArray(KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.RESULT);
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/autotune/utils/KruizeConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,11 @@ public static final class KRUIZE_BULK_API {
public static final String FAILED = "FAILED";
public static final String LIMIT_MESSAGE = "The number of experiments exceeds %s.";
public static final String NOTHING = "Nothing to do.";
public static final String START_TIME = "start_time";
public static final String END_TIME = "end_time";
public static final String STEPS = "steps";
public static final String ADDITIONAL_LABEL = "ADDITIONAL_LABEL";

// TODO : Bulk API Create Experiments defaults
public static final CreateExperimentConfigBean CREATE_EXPERIMENT_CONFIG_BEAN;

Expand Down

0 comments on commit 6143696

Please sign in to comment.