diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index dd182e3040..93b34ea4e4 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -174,6 +174,19 @@ private Map runQueryWithLowLevelClient(String query) throws Atla } } + private Map runUpdateByQueryWithLowLevelClient(String query) throws AtlasBaseException { + try { + String responseString = performDirectUpdateByQuery(query); + + Map responseMap = AtlasType.fromJson(responseString, Map.class); + return responseMap; + + } catch (IOException e) { + LOG.error("Failed to execute direct query on ES {}", e.getMessage()); + throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); + } + } + private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("performAsyncDirectIndexQuery"); DirectIndexQueryResult result = null; @@ -444,6 +457,30 @@ private String performDirectIndexQuery(String query, boolean source) throws Atla return EntityUtils.toString(response.getEntity()); } + private String performDirectUpdateByQuery(String query) throws AtlasBaseException, IOException { + HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON); + String endPoint; + + endPoint = index + "/_update_by_query"; + + Request request = new Request("POST", endPoint); + request.setEntity(entity); + + Response response; + try { + response = lowLevelRestClient.performRequest(request); + } catch (ResponseException rex) { + if (rex.getResponse().getStatusLine().getStatusCode() == 404) { + LOG.warn(String.format("ES index with name %s not found", index)); + throw new AtlasBaseException(INDEX_NOT_FOUND, index); + } else { + throw new AtlasBaseException(String.format("Error in executing elastic query: %s", EntityUtils.toString(entity)), rex); + } + } + + return EntityUtils.toString(response.getEntity()); + } + private DirectIndexQueryResult getResultFromResponse(String responseString, boolean async) throws IOException { Map responseMap = AtlasType.fromJson(responseString, Map.class); return getResultFromResponse(responseMap.get("response")); @@ -495,6 +532,10 @@ public Map directIndexQuery(String query) throws AtlasBaseExcept return runQueryWithLowLevelClient(query); } + public Map directUpdateByQuery(String query) throws AtlasBaseException { + return runUpdateByQueryWithLowLevelClient(query); + } + @Override public Iterator> vertices() { SearchRequest searchRequest = getSearchRequest(index, sourceBuilder); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index 68dc122954..c4ccdc358e 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -87,40 +87,52 @@ public void run() { LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); } while (shouldRun.get()) { + LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing."); + TasksFetcher fetcher = new TasksFetcher(registry); try { + LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { + LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS); Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); continue; } - - TasksFetcher fetcher = new TasksFetcher(registry); - - Thread tasksFetcherThread = new Thread(fetcher); - tasksFetcherThread.start(); - tasksFetcherThread.join(); + LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK); List tasks = fetcher.getTasks(); + LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0); + if (CollectionUtils.isNotEmpty(tasks)) { final CountDownLatch latch = new CountDownLatch(tasks.size()); + LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size()); submitAll(tasks, latch); - LOG.info("Submitted {} tasks to the queue", tasks.size()); + + LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete."); waitForTasksToComplete(latch); + LOG.info("TaskQueueWatcher: All tasks have been processed."); } else { + LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } - fetcher.clearTasks(); + + LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { - LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart"); + LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException); break; } catch (Exception e) { - LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e); + LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e); } finally { + LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); + fetcher.clearTasks(); + LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher."); } } + + LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false."); } + private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException { if (latch.getCount() != 0) { LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount()); @@ -146,7 +158,7 @@ private void submitAll(List tasks, CountDownLatch latch) { } } - static class TasksFetcher implements Runnable { + static class TasksFetcher { private TaskRegistry registry; private List tasks = new ArrayList<>(); @@ -154,7 +166,6 @@ public TasksFetcher(TaskRegistry registry) { this.registry = registry; } - @Override public void run() { if (LOG.isDebugEnabled()) { LOG.debug("TasksFetcher: Fetching tasks for queuing"); @@ -164,6 +175,7 @@ public void run() { } public List getTasks() { + run(); return tasks; } diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 8c08a26121..014532c794 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.tasks; +import com.datastax.oss.driver.shaded.fasterxml.jackson.databind.ObjectMapper; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; @@ -31,10 +32,12 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.DirectIndexQueryResult; +import org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery; import org.apache.atlas.type.AtlasType; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections4.ListUtils; +import org.janusgraph.util.encoding.LongEncoding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -49,6 +52,7 @@ import java.util.List; import java.util.Arrays; import java.util.Map; +import java.util.LinkedHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -61,6 +65,7 @@ public class TaskRegistry { private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class); public static final int TASK_FETCH_BATCH_SIZE = 100; public static final List> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))); + public static final String JANUSGRAPH_VERTEX_INDEX = "janusgraph_vertex_index"; private AtlasGraph graph; private TaskService taskService; @@ -397,21 +402,24 @@ public List getTasksForReQueueIndexSearch() { int fetched = 0; try { if (totalFetched + size > queueSize) { - size = queueSize - totalFetched; + size = queueSize - totalFetched; // Adjust size to not exceed queue size + LOG.info("Adjusting fetch size to {} based on queue size constraint.", size); } dsl.put("from", from); dsl.put("size", size); + LOG.info("DSL Query for iteration: {}", dsl); indexSearchParams.setDsl(dsl); + LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size); AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams); try { indexQueryResult = indexQuery.vertices(indexSearchParams); + LOG.info("Query executed successfully for from: {} with size: {}", from, size); } catch (AtlasBaseException e) { - LOG.error("Failed to fetch pending/in-progress task vertices to re-que"); - e.printStackTrace(); + LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e); break; } @@ -423,35 +431,93 @@ public List getTasksForReQueueIndexSearch() { if (vertex != null) { AtlasTask atlasTask = toAtlasTask(vertex); + + LOG.info("Processing fetched task: {}", atlasTask); if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) || - atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){ - LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString())); + atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) { + LOG.info("Adding task to the result list: {}", atlasTask); ret.add(atlasTask); - } - else { - LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid())); + } else { + LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}", + atlasTask.getGuid(), atlasTask.getStatus()); + // Repair mismatched task + String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay())); + LOG.info("Repairing mismatched task with docId: {}", docId); + repairMismatchedTask(atlasTask, docId); } } else { - LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); + LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched); } fetched++; } + LOG.info("Fetched {} tasks in this iteration.", fetched); + } else { + LOG.warn("Index query result is null for from: {} and size: {}", from, size); } totalFetched += fetched; + LOG.info("Total tasks fetched so far: {}. Incrementing offset by {}.", totalFetched, size); + from += size; if (fetched < size || totalFetched >= queueSize) { + LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize); break; } - } catch (Exception e){ + } catch (Exception e) { + LOG.error("Exception occurred during task fetching process. Exiting loop.", e); break; } } + LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched); return ret; } + private void repairMismatchedTask(AtlasTask atlasTask, String docId) { + AtlasElasticsearchQuery indexQuery = null; + + try { + // Create a map for the fields to be updated + Map fieldsToUpdate = new HashMap<>(); + fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime()); + fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds()); + fieldsToUpdate.put("__task_status", atlasTask.getStatus().toString()); + fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime()); // Set current timestamp + + // Convert fieldsToUpdate map to JSON using Jackson + ObjectMapper objectMapper = new ObjectMapper(); + String fieldsToUpdateJson = objectMapper.writeValueAsString(fieldsToUpdate); + + // Construct the Elasticsearch update by query DSL + String queryDsl = "{" + + "\"script\": {" + + " \"source\": \"for (entry in params.fields.entrySet()) { ctx._source[entry.getKey()] = entry.getValue() }\"," + + " \"params\": {" + + " \"fields\": " + fieldsToUpdateJson + + " }" + + "}," + + "\"query\": {" + + " \"term\": {" + + " \"_id\": \"" + docId + "\"" + + " }" + + "}" + + "}"; + + // Execute the Elasticsearch query + indexQuery = (AtlasElasticsearchQuery) graph.elasticsearchQuery(JANUSGRAPH_VERTEX_INDEX); + Map result = indexQuery.directUpdateByQuery(queryDsl); + + if (result != null) { + LOG.info("Elasticsearch UpdateByQuery Result: " + result + "\nfor task : " + atlasTask.getGuid()); + } else { + LOG.info("No documents updated in Elasticsearch for guid: " + atlasTask.getGuid()); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + public void commit() { this.graph.commit(); }