
DG-1942 Fix task fetching inconsistency causing excessive lock churn and memory heap buildup in pods #3800

Merged
merged 10 commits on Dec 2, 2024
@@ -174,6 +174,19 @@ private Map<String, Object> runQueryWithLowLevelClient(String query) throws Atla
}
}

private Map<String, LinkedHashMap> runUpdateByQueryWithLowLevelClient(String query) throws AtlasBaseException {
try {
String responseString = performDirectUpdateByQuery(query);

Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(responseString, Map.class);
return responseMap;

} catch (IOException e) {
LOG.error("Failed to execute direct query on ES {}", e.getMessage());
throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage());
}
}
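// Illustrative sketch, not part of this change: a successful _update_by_query
// response deserializes to keys such as "took", "total", "updated" and "failures",
// so a caller could verify that a repair actually touched a document. This helper is
// hypothetical and assumes the standard Elasticsearch response shape.
private boolean updatedAtLeastOneDocument(Map<String, LinkedHashMap> responseMap) {
    Object updated = responseMap == null ? null : responseMap.get("updated");
    return updated instanceof Number && ((Number) updated).intValue() > 0;
}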

private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("performAsyncDirectIndexQuery");
DirectIndexQueryResult result = null;
@@ -444,6 +457,30 @@ private String performDirectIndexQuery(String query, boolean source) throws Atla
return EntityUtils.toString(response.getEntity());
}

private String performDirectUpdateByQuery(String query) throws AtlasBaseException, IOException {
HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
String endPoint = index + "/_update_by_query";

Request request = new Request("POST", endPoint);
request.setEntity(entity);

Response response;
try {
response = lowLevelRestClient.performRequest(request);
} catch (ResponseException rex) {
if (rex.getResponse().getStatusLine().getStatusCode() == 404) {
LOG.warn("ES index with name {} not found", index);
throw new AtlasBaseException(INDEX_NOT_FOUND, index);
} else {
throw new AtlasBaseException(String.format("Error in executing elastic query: %s", EntityUtils.toString(entity)), rex);
}
}

return EntityUtils.toString(response.getEntity());
}
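// For reference: the method above amounts to POST {index}/_update_by_query with a JSON
// body, issued through the same low-level REST client as performDirectIndexQuery, and
// returns the raw response body as a string.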

private DirectIndexQueryResult getResultFromResponse(String responseString, boolean async) throws IOException {
Map<String, LinkedHashMap> responseMap = AtlasType.fromJson(responseString, Map.class);
return getResultFromResponse(responseMap.get("response"));
Expand Down Expand Up @@ -495,6 +532,10 @@ public Map<String, Object> directIndexQuery(String query) throws AtlasBaseExcept
return runQueryWithLowLevelClient(query);
}

public Map<String, LinkedHashMap> directUpdateByQuery(String query) throws AtlasBaseException {
return runUpdateByQueryWithLowLevelClient(query);
}
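// directUpdateByQuery is consumed by TaskRegistry.repairMismatchedTask(...) below to
// realign a task document in the vertex index with the status recorded in Cassandra.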

@Override
public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices() {
SearchRequest searchRequest = getSearchRequest(index, sourceBuilder);
@@ -87,40 +87,52 @@ public void run() {
LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
}
while (shouldRun.get()) {
LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing.");
TasksFetcher fetcher = new TasksFetcher(registry);
try {
LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK);
if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS);
Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS);
continue;
}

LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK);

List<AtlasTask> tasks = fetcher.getTasks();
LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0);

if (CollectionUtils.isNotEmpty(tasks)) {
final CountDownLatch latch = new CountDownLatch(tasks.size());
LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size());
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());

LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete.");
waitForTasksToComplete(latch);
LOG.info("TaskQueueWatcher: All tasks have been processed.");
} else {
LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}

LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval);
Thread.sleep(pollInterval);
} catch (InterruptedException interruptedException) {
LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart");
LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException);
break;
} catch (Exception e) {
LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e);
LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e);
} finally {
LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
fetcher.clearTasks();
LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher.");
}
}

LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false.");
}


private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException {
if (latch.getCount() != 0) {
LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount());
@@ -146,15 +158,14 @@ private void submitAll(List<AtlasTask> tasks, CountDownLatch latch) {
}
}

static class TasksFetcher {
private TaskRegistry registry;
private List<AtlasTask> tasks = new ArrayList<>();

public TasksFetcher(TaskRegistry registry) {
this.registry = registry;
}

public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("TasksFetcher: Fetching tasks for queuing");
@@ -164,6 +175,7 @@ public void run() {
}

public List<AtlasTask> getTasks() {
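// Fetch now runs inline on the calling watcher thread; the previous implementation
// spawned and joined a dedicated fetcher thread on every poll.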
run();
return tasks;
}

86 changes: 76 additions & 10 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.tasks;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
@@ -31,10 +32,12 @@
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.janusgraph.util.encoding.LongEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -49,6 +52,7 @@
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@@ -61,6 +65,7 @@ public class TaskRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
public static final int TASK_FETCH_BATCH_SIZE = 100;
public static final List<Map<String, Object>> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")));
public static final String JANUSGRAPH_VERTEX_INDEX = "janusgraph_vertex_index";

private AtlasGraph graph;
private TaskService taskService;
@@ -397,21 +402,24 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
int fetched = 0;
try {
if (totalFetched + size > queueSize) {
size = queueSize - totalFetched; // Adjust size to not exceed queue size
LOG.info("Adjusting fetch size to {} based on queue size constraint.", size);
}

dsl.put("from", from);
dsl.put("size", size);

LOG.info("DSL Query for iteration: {}", dsl);
indexSearchParams.setDsl(dsl);

LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size);
AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams);

try {
indexQueryResult = indexQuery.vertices(indexSearchParams);
LOG.info("Query executed successfully for from: {} with size: {}", from, size);
} catch (AtlasBaseException e) {
LOG.error("Failed to fetch pending/in-progress task vertices to re-que");
e.printStackTrace();
LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e);
break;
}

@@ -423,35 +431,93 @@

if (vertex != null) {
AtlasTask atlasTask = toAtlasTask(vertex);

LOG.info("Processing fetched task: {}", atlasTask);
if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) ||
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) {
LOG.info("Adding task to the result list: {}", atlasTask);
ret.add(atlasTask);
} else {
LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}",
atlasTask.getGuid(), atlasTask.getStatus());
// Repair mismatched task
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
LOG.info("Repairing mismatched task with docId: {}", docId);
repairMismatchedTask(atlasTask, docId);
}
} else {
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched);
}

fetched++;
}
LOG.info("Fetched {} tasks in this iteration.", fetched);
} else {
LOG.warn("Index query result is null for from: {} and size: {}", from, size);
}

totalFetched += fetched;
LOG.info("Total tasks fetched so far: {}. Incrementing offset by {}.", totalFetched, size);

from += size;
if (fetched < size || totalFetched >= queueSize) {
LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize);
break;
}
} catch (Exception e) {
LOG.error("Exception occurred during task fetching process. Exiting loop.", e);
break;
}
}

LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched);
return ret;
}
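// Illustrative sketch, not part of this change: the paging DSL iterated above is
// assembled before this hunk. Judging from the __task_* fields used in
// repairMismatchedTask below, each iteration issues a query of roughly this shape
// (the "terms" filter and status values are assumptions, not the verbatim query):
private static Map<String, Object> sampleRequeueDsl(int from, int size) {
    Map<String, Object> dsl = new HashMap<>();
    dsl.put("query", mapOf("terms", mapOf("__task_status", Arrays.asList("PENDING", "IN_PROGRESS"))));
    dsl.put("sort", SORT_ARRAY); // sort by task creation time, ascending
    dsl.put("from", from);
    dsl.put("size", size);
    return dsl;
}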

private void repairMismatchedTask(AtlasTask atlasTask, String docId) {
AtlasElasticsearchQuery indexQuery = null;

try {
// Create a map for the fields to be updated
Map<String, Object> fieldsToUpdate = new HashMap<>();
fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
fieldsToUpdate.put("__task_status", atlasTask.getStatus().toString());
fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime()); // Set current timestamp

// Convert fieldsToUpdate map to JSON using Jackson
ObjectMapper objectMapper = new ObjectMapper();
String fieldsToUpdateJson = objectMapper.writeValueAsString(fieldsToUpdate);

// Construct the Elasticsearch update by query DSL
String queryDsl = "{"
+ "\"script\": {"
+ " \"source\": \"for (entry in params.fields.entrySet()) { ctx._source[entry.getKey()] = entry.getValue() }\","
+ " \"params\": {"
+ " \"fields\": " + fieldsToUpdateJson
+ " }"
+ "},"
+ "\"query\": {"
+ " \"term\": {"
+ " \"_id\": \"" + docId + "\""
+ " }"
+ "}"
+ "}";

// Execute the Elasticsearch query
indexQuery = (AtlasElasticsearchQuery) graph.elasticsearchQuery(JANUSGRAPH_VERTEX_INDEX);
Map<String, LinkedHashMap> result = indexQuery.directUpdateByQuery(queryDsl);

if (result != null) {
LOG.info("Elasticsearch UpdateByQuery result: {} for task: {}", result, atlasTask.getGuid());
} else {
LOG.info("No documents updated in Elasticsearch for guid: {}", atlasTask.getGuid());
}
} catch (Exception e) {
LOG.error("Failed to repair mismatched task with guid: {}", atlasTask.getGuid(), e);
}
}

public void commit() {
this.graph.commit();
}