From 9729887c9997b65f74b520b20ba5ccc7fe7cb3f0 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Fri, 29 Nov 2024 18:38:31 +0530 Subject: [PATCH] Added a doc targetting fix. instead of using guid using doc_id. --- .../apache/atlas/tasks/TaskQueueWatcher.java | 28 +++++++++++++++---- .../org/apache/atlas/tasks/TaskRegistry.java | 10 ++++--- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index 435741a7ec..a1360938f6 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -78,43 +78,61 @@ public void shutdown() { public void run() { boolean isMaintenanceMode = AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean(); if (isMaintenanceMode) { - LOG.info("TaskQueueWatcher: Maintenance mode is enabled, new tasks will not be loaded into the queue until next restart"); + LOG.info("TaskQueueWatcher: Maintenance mode is enabled. New tasks will not be loaded into the queue until next restart."); return; } shouldRun.set(true); if (LOG.isDebugEnabled()) { - LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); + LOG.debug("TaskQueueWatcher: Starting thread {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); } while (shouldRun.get()) { + LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing."); TasksFetcher fetcher = new TasksFetcher(registry); try { + LOG.debug("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { + LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS); Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); continue; } + LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK); + List tasks = fetcher.getTasks(); + LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0); + if (CollectionUtils.isNotEmpty(tasks)) { final CountDownLatch latch = new CountDownLatch(tasks.size()); + LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size()); submitAll(tasks, latch); - LOG.info("Submitted {} tasks to the queue", tasks.size()); + + LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete."); waitForTasksToComplete(latch); + LOG.info("TaskQueueWatcher: All tasks have been processed."); } else { + LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } + + LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { - LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart"); + LOG.error("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException); break; } catch (Exception e) { - LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e); + LOG.error("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e); } finally { + LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); fetcher.clearTasks(); + LOG.debug("TaskQueueWatcher: Cleared tasks from the fetcher."); } } + + LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false."); } + private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException { if (latch.getCount() != 0) { LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount()); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 5f0285bb36..a642e81605 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -37,6 +37,7 @@ import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections4.ListUtils; +import org.janusgraph.util.encoding.LongEncoding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -434,7 +435,8 @@ public List getTasksForReQueueIndexSearch() { } else { LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid())); - repairMismatchedTask(atlasTask); + String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay())); + repairMismatchedTask(atlasTask, docId); } } else { LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); @@ -457,7 +459,7 @@ public List getTasksForReQueueIndexSearch() { return ret; } - private void repairMismatchedTask(AtlasTask atlasTask) { + private void repairMismatchedTask(AtlasTask atlasTask, String docId) { AtlasElasticsearchQuery indexQuery = null; try { @@ -481,8 +483,8 @@ private void repairMismatchedTask(AtlasTask atlasTask) { + " }" + "}," + "\"query\": {" - + " \"match\": {" - + " \"__task_guid\": \"" + atlasTask.getGuid() + "\"" + + " \"term\": {" + + " \"_id\": \"" + docId + "\"" + " }" + "}" + "}";