From be801da3340a2a94d50b3083c5e5e2ab3a71b7a3 Mon Sep 17 00:00:00 2001 From: Suman Das <59254445+sumandas0@users.noreply.github.com> Date: Fri, 29 Nov 2024 13:11:42 +0530 Subject: [PATCH 01/10] feat: remove redundant thread, add commit for task status --- .../org/apache/atlas/tasks/TaskQueueWatcher.java | 14 ++++---------- .../java/org/apache/atlas/tasks/TaskRegistry.java | 3 +++ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index 68dc122954..435741a7ec 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -87,18 +87,12 @@ public void run() { LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); } while (shouldRun.get()) { + TasksFetcher fetcher = new TasksFetcher(registry); try { if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); continue; } - - TasksFetcher fetcher = new TasksFetcher(registry); - - Thread tasksFetcherThread = new Thread(fetcher); - tasksFetcherThread.start(); - tasksFetcherThread.join(); - List tasks = fetcher.getTasks(); if (CollectionUtils.isNotEmpty(tasks)) { final CountDownLatch latch = new CountDownLatch(tasks.size()); @@ -108,7 +102,6 @@ public void run() { } else { redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } - fetcher.clearTasks(); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart"); @@ -117,6 +110,7 @@ public void run() { LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e); } finally { redisService.releaseDistributedLock(ATLAS_TASK_LOCK); + fetcher.clearTasks(); } } } @@ -146,7 +140,7 @@ private void submitAll(List tasks, CountDownLatch latch) { } } - static class TasksFetcher implements Runnable { + static class TasksFetcher { private TaskRegistry registry; private List tasks = new ArrayList<>(); @@ -154,7 +148,6 @@ public TasksFetcher(TaskRegistry registry) { this.registry = registry; } - @Override public void run() { if (LOG.isDebugEnabled()) { LOG.debug("TasksFetcher: Fetching tasks for queuing"); @@ -164,6 +157,7 @@ public void run() { } public List getTasks() { + run(); return tasks; } diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 8c08a26121..bffa4c8672 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -430,6 +430,7 @@ public List getTasksForReQueueIndexSearch() { } else { LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid())); + vertex.setProperty(Constants.TASK_STATUS, atlasTask.getStatus().toString()); } } else { LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); @@ -446,6 +447,8 @@ public List getTasksForReQueueIndexSearch() { } } catch (Exception e){ break; + } finally { + graph.commit(); } } From 0befe573eea428d377dcb66836745a2681a8a3d4 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Fri, 29 Nov 2024 16:11:57 +0530 Subject: [PATCH 02/10] Added repair method for Task ES Docs mismatch --- .../janus/AtlasElasticsearchQuery.java | 41 +++++++++++++++ .../org/apache/atlas/tasks/TaskRegistry.java | 52 +++++++++++++++++-- 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index dd182e3040..93b34ea4e4 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -174,6 +174,19 @@ private Map runQueryWithLowLevelClient(String query) throws Atla } } + private Map runUpdateByQueryWithLowLevelClient(String query) throws AtlasBaseException { + try { + String responseString = performDirectUpdateByQuery(query); + + Map responseMap = AtlasType.fromJson(responseString, Map.class); + return responseMap; + + } catch (IOException e) { + LOG.error("Failed to execute direct query on ES {}", e.getMessage()); + throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED, e.getMessage()); + } + } + private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("performAsyncDirectIndexQuery"); DirectIndexQueryResult result = null; @@ -444,6 +457,30 @@ private String performDirectIndexQuery(String query, boolean source) throws Atla return EntityUtils.toString(response.getEntity()); } + private String performDirectUpdateByQuery(String query) throws AtlasBaseException, IOException { + HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON); + String endPoint; + + endPoint = index + "/_update_by_query"; + + Request request = new Request("POST", endPoint); + request.setEntity(entity); + + Response response; + try { + response = lowLevelRestClient.performRequest(request); + } catch (ResponseException rex) { + if (rex.getResponse().getStatusLine().getStatusCode() == 404) { + LOG.warn(String.format("ES index with name %s not found", index)); + throw new AtlasBaseException(INDEX_NOT_FOUND, index); + } else { + throw new AtlasBaseException(String.format("Error in executing elastic query: %s", EntityUtils.toString(entity)), rex); + } + } + + return EntityUtils.toString(response.getEntity()); + } + private DirectIndexQueryResult getResultFromResponse(String responseString, boolean async) throws IOException { Map responseMap = AtlasType.fromJson(responseString, Map.class); return getResultFromResponse(responseMap.get("response")); @@ -495,6 +532,10 @@ public Map directIndexQuery(String query) throws AtlasBaseExcept return runQueryWithLowLevelClient(query); } + public Map directUpdateByQuery(String query) throws AtlasBaseException { + return runUpdateByQueryWithLowLevelClient(query); + } + @Override public Iterator> vertices() { SearchRequest searchRequest = getSearchRequest(index, sourceBuilder); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index bffa4c8672..5f0285bb36 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.tasks; +import com.datastax.oss.driver.shaded.fasterxml.jackson.databind.ObjectMapper; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; @@ -31,6 +32,7 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.DirectIndexQueryResult; +import org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery; import org.apache.atlas.type.AtlasType; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -49,6 +51,7 @@ import java.util.List; import java.util.Arrays; import java.util.Map; +import java.util.LinkedHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -61,6 +64,7 @@ public class TaskRegistry { private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class); public static final int TASK_FETCH_BATCH_SIZE = 100; public static final List> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))); + public static final String JANUSGRAPH_VERTEX_INDEX = "janusgraph_vertex_index"; private AtlasGraph graph; private TaskService taskService; @@ -430,7 +434,7 @@ public List getTasksForReQueueIndexSearch() { } else { LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid())); - vertex.setProperty(Constants.TASK_STATUS, atlasTask.getStatus().toString()); + repairMismatchedTask(atlasTask); } } else { LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); @@ -447,14 +451,56 @@ public List getTasksForReQueueIndexSearch() { } } catch (Exception e){ break; - } finally { - graph.commit(); } } return ret; } + private void repairMismatchedTask(AtlasTask atlasTask) { + AtlasElasticsearchQuery indexQuery = null; + + try { + // Create a map for the fields to be updated + Map fieldsToUpdate = new HashMap<>(); + fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime()); + fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds()); + fieldsToUpdate.put("__task_status", atlasTask.getStatus().toString()); + fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime()); // Set current timestamp + + // Convert fieldsToUpdate map to JSON using Jackson + ObjectMapper objectMapper = new ObjectMapper(); + String fieldsToUpdateJson = objectMapper.writeValueAsString(fieldsToUpdate); + + // Construct the Elasticsearch update by query DSL + String queryDsl = "{" + + "\"script\": {" + + " \"source\": \"for (entry in params.fields.entrySet()) { ctx._source[entry.getKey()] = entry.getValue() }\"," + + " \"params\": {" + + " \"fields\": " + fieldsToUpdateJson + + " }" + + "}," + + "\"query\": {" + + " \"match\": {" + + " \"__task_guid\": \"" + atlasTask.getGuid() + "\"" + + " }" + + "}" + + "}"; + + // Execute the Elasticsearch query + indexQuery = (AtlasElasticsearchQuery) graph.elasticsearchQuery(JANUSGRAPH_VERTEX_INDEX); + Map result = indexQuery.directUpdateByQuery(queryDsl); + + if (result != null) { + LOG.info("Elasticsearch UpdateByQuery Result: " + result); + } else { + LOG.info("No documents updated in Elasticsearch for guid: " + atlasTask.getGuid()); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + public void commit() { this.graph.commit(); } From 3b2e7d3600a55216f921055f5f207ccf119d9dcb Mon Sep 17 00:00:00 2001 From: hr2904 Date: Fri, 29 Nov 2024 16:13:02 +0530 Subject: [PATCH 03/10] Added branch for auto-image creation github action --- .github/workflows/maven.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b5589..ee574c61aa 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -25,7 +25,7 @@ on: - beta - development - master - - lineageondemand + - plt529 jobs: build: @@ -64,7 +64,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'plt529' ]] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard From 7878bafe663ed7d27f7a12658b569c5fd38a4f83 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Fri, 29 Nov 2024 16:19:34 +0530 Subject: [PATCH 04/10] Added branch for auto-image creation github action --- .github/workflows/maven.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index ee574c61aa..9a7174a37d 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -25,7 +25,7 @@ on: - beta - development - master - - plt529 + - plt529test jobs: build: @@ -64,7 +64,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'plt529' ]] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'plt529test' ]] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard From 9729887c9997b65f74b520b20ba5ccc7fe7cb3f0 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Fri, 29 Nov 2024 18:38:31 +0530 Subject: [PATCH 05/10] Added a doc targetting fix. instead of using guid using doc_id. --- .../apache/atlas/tasks/TaskQueueWatcher.java | 28 +++++++++++++++---- .../org/apache/atlas/tasks/TaskRegistry.java | 10 ++++--- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index 435741a7ec..a1360938f6 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -78,43 +78,61 @@ public void shutdown() { public void run() { boolean isMaintenanceMode = AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean(); if (isMaintenanceMode) { - LOG.info("TaskQueueWatcher: Maintenance mode is enabled, new tasks will not be loaded into the queue until next restart"); + LOG.info("TaskQueueWatcher: Maintenance mode is enabled. New tasks will not be loaded into the queue until next restart."); return; } shouldRun.set(true); if (LOG.isDebugEnabled()) { - LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); + LOG.debug("TaskQueueWatcher: Starting thread {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); } while (shouldRun.get()) { + LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing."); TasksFetcher fetcher = new TasksFetcher(registry); try { + LOG.debug("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { + LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS); Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); continue; } + LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK); + List tasks = fetcher.getTasks(); + LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0); + if (CollectionUtils.isNotEmpty(tasks)) { final CountDownLatch latch = new CountDownLatch(tasks.size()); + LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size()); submitAll(tasks, latch); - LOG.info("Submitted {} tasks to the queue", tasks.size()); + + LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete."); waitForTasksToComplete(latch); + LOG.info("TaskQueueWatcher: All tasks have been processed."); } else { + LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } + + LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { - LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart"); + LOG.error("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException); break; } catch (Exception e) { - LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e); + LOG.error("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e); } finally { + LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); fetcher.clearTasks(); + LOG.debug("TaskQueueWatcher: Cleared tasks from the fetcher."); } } + + LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false."); } + private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException { if (latch.getCount() != 0) { LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount()); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 5f0285bb36..a642e81605 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -37,6 +37,7 @@ import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections4.ListUtils; +import org.janusgraph.util.encoding.LongEncoding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -434,7 +435,8 @@ public List getTasksForReQueueIndexSearch() { } else { LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid())); - repairMismatchedTask(atlasTask); + String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay())); + repairMismatchedTask(atlasTask, docId); } } else { LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); @@ -457,7 +459,7 @@ public List getTasksForReQueueIndexSearch() { return ret; } - private void repairMismatchedTask(AtlasTask atlasTask) { + private void repairMismatchedTask(AtlasTask atlasTask, String docId) { AtlasElasticsearchQuery indexQuery = null; try { @@ -481,8 +483,8 @@ private void repairMismatchedTask(AtlasTask atlasTask) { + " }" + "}," + "\"query\": {" - + " \"match\": {" - + " \"__task_guid\": \"" + atlasTask.getGuid() + "\"" + + " \"term\": {" + + " \"_id\": \"" + docId + "\"" + " }" + "}" + "}"; From 5920f1af6ca9ef920d0a79e001035d65aeb7889b Mon Sep 17 00:00:00 2001 From: hr2904 Date: Sat, 30 Nov 2024 02:45:53 +0530 Subject: [PATCH 06/10] Fixed logging --- .../java/org/apache/atlas/tasks/TaskQueueWatcher.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index a1360938f6..f6165a639d 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -90,7 +90,7 @@ public void run() { LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing."); TasksFetcher fetcher = new TasksFetcher(registry); try { - LOG.debug("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); + LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS); Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); @@ -117,15 +117,15 @@ public void run() { LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { - LOG.error("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException); + LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException); break; } catch (Exception e) { - LOG.error("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e); + LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e); } finally { LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); fetcher.clearTasks(); - LOG.debug("TaskQueueWatcher: Cleared tasks from the fetcher."); + LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher."); } } From 340fd369e1b04d64a5fc745d27ff1a1e75f101f0 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Sat, 30 Nov 2024 03:10:45 +0530 Subject: [PATCH 07/10] Fixed logging #2 --- .../org/apache/atlas/tasks/TaskRegistry.java | 52 +++++++++++++------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index a642e81605..8ddda944e5 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -385,38 +385,47 @@ public List getTasksForReQueueIndexSearch() { DirectIndexQueryResult indexQueryResult = null; List ret = new ArrayList<>(); - int size = 1000; - int from = 0; + int size = 1000; // Batch size for fetching tasks + int from = 0; // Pagination offset + int totalFetched = 0; // Tracks the total number of tasks fetched + + LOG.info("Starting fetch of PENDING and IN_PROGRESS tasks. Queue size limit: {}", queueSize); IndexSearchParams indexSearchParams = new IndexSearchParams(); - List statusClauseList = new ArrayList(); + // Build query for PENDING and IN_PROGRESS tasks + List> statusClauseList = new ArrayList<>(); statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString()))); statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.PENDING.toString()))); - Map dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList))); + Map dsl = mapOf( + "query", mapOf("bool", mapOf("should", statusClauseList)) + ); dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")))); dsl.put("size", size); - int totalFetched = 0; + while (true) { int fetched = 0; try { if (totalFetched + size > queueSize) { - size = queueSize - totalFetched; + size = queueSize - totalFetched; // Adjust size to not exceed queue size + LOG.info("Adjusting fetch size to {} based on queue size constraint.", size); } dsl.put("from", from); dsl.put("size", size); + LOG.debug("DSL Query for iteration: {}", dsl); indexSearchParams.setDsl(dsl); + LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size); AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams); try { indexQueryResult = indexQuery.vertices(indexSearchParams); + LOG.info("Query executed successfully for from: {} with size: {}", from, size); } catch (AtlasBaseException e) { - LOG.error("Failed to fetch pending/in-progress task vertices to re-que"); - e.printStackTrace(); + LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e); break; } @@ -428,34 +437,47 @@ public List getTasksForReQueueIndexSearch() { if (vertex != null) { AtlasTask atlasTask = toAtlasTask(vertex); + + LOG.debug("Processing fetched task: {}", atlasTask); if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) || - atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){ - LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString())); + atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) { + LOG.info("Adding task to the result list: {}", atlasTask); ret.add(atlasTask); - } - else { - LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid())); + } else { + LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}", + atlasTask.getGuid(), atlasTask.getStatus()); + + // Repair mismatched task String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay())); + LOG.info("Repairing mismatched task with docId: {}", docId); repairMismatchedTask(atlasTask, docId); } } else { - LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); + LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched); } fetched++; } + LOG.info("Fetched {} tasks in this iteration.", fetched); + } else { + LOG.warn("Index query result is null for from: {} and size: {}", from, size); } totalFetched += fetched; + LOG.info("Total tasks fetched so far: {}. Incrementing offset by {}.", totalFetched, size); + from += size; if (fetched < size || totalFetched >= queueSize) { + LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize); break; } - } catch (Exception e){ + } catch (Exception e) { + LOG.error("Exception occurred during task fetching process. Exiting loop.", e); break; } } + LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched); return ret; } From c88de8e631e070573a94665f0cf085e8a2a963ad Mon Sep 17 00:00:00 2001 From: hr2904 Date: Mon, 2 Dec 2024 10:51:56 +0530 Subject: [PATCH 08/10] Refatoring code for PR --- .../apache/atlas/tasks/TaskQueueWatcher.java | 26 ++--------- .../org/apache/atlas/tasks/TaskRegistry.java | 46 +++++-------------- 2 files changed, 17 insertions(+), 55 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index f6165a639d..c212aad09e 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -78,61 +78,45 @@ public void shutdown() { public void run() { boolean isMaintenanceMode = AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean(); if (isMaintenanceMode) { - LOG.info("TaskQueueWatcher: Maintenance mode is enabled. New tasks will not be loaded into the queue until next restart."); + LOG.info("TaskQueueWatcher: Maintenance mode is enabled, new tasks will not be loaded into the queue until next restart"); return; } shouldRun.set(true); if (LOG.isDebugEnabled()) { - LOG.debug("TaskQueueWatcher: Starting thread {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); + LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); } while (shouldRun.get()) { - LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing."); TasksFetcher fetcher = new TasksFetcher(registry); try { - LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { - LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS); Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); continue; } LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK); List tasks = fetcher.getTasks(); - LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0); - if (CollectionUtils.isNotEmpty(tasks)) { final CountDownLatch latch = new CountDownLatch(tasks.size()); - LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size()); submitAll(tasks, latch); - - LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete."); + LOG.info("Submitted {} tasks to the queue", tasks.size()); waitForTasksToComplete(latch); - LOG.info("TaskQueueWatcher: All tasks have been processed."); } else { - LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } - - LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { - LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException); + LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart"); break; } catch (Exception e) { - LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e); + LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e); } finally { - LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); fetcher.clearTasks(); - LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher."); } } - - LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false."); } - private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException { if (latch.getCount() != 0) { LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount()); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 8ddda944e5..d405a900c6 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -385,47 +385,38 @@ public List getTasksForReQueueIndexSearch() { DirectIndexQueryResult indexQueryResult = null; List ret = new ArrayList<>(); - int size = 1000; // Batch size for fetching tasks - int from = 0; // Pagination offset - int totalFetched = 0; // Tracks the total number of tasks fetched - - LOG.info("Starting fetch of PENDING and IN_PROGRESS tasks. Queue size limit: {}", queueSize); + int size = 1000; + int from = 0; IndexSearchParams indexSearchParams = new IndexSearchParams(); - // Build query for PENDING and IN_PROGRESS tasks - List> statusClauseList = new ArrayList<>(); + List statusClauseList = new ArrayList(); statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString()))); statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.PENDING.toString()))); - Map dsl = mapOf( - "query", mapOf("bool", mapOf("should", statusClauseList)) - ); + Map dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList))); dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")))); dsl.put("size", size); - + int totalFetched = 0; while (true) { int fetched = 0; try { if (totalFetched + size > queueSize) { - size = queueSize - totalFetched; // Adjust size to not exceed queue size - LOG.info("Adjusting fetch size to {} based on queue size constraint.", size); + size = queueSize - totalFetched; } dsl.put("from", from); dsl.put("size", size); - LOG.debug("DSL Query for iteration: {}", dsl); indexSearchParams.setDsl(dsl); - LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size); AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams); try { indexQueryResult = indexQuery.vertices(indexSearchParams); - LOG.info("Query executed successfully for from: {} with size: {}", from, size); } catch (AtlasBaseException e) { - LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e); + LOG.error("Failed to fetch pending/in-progress task vertices to re-que"); + e.printStackTrace(); break; } @@ -437,47 +428,34 @@ public List getTasksForReQueueIndexSearch() { if (vertex != null) { AtlasTask atlasTask = toAtlasTask(vertex); - - LOG.debug("Processing fetched task: {}", atlasTask); if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) || - atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) { - LOG.info("Adding task to the result list: {}", atlasTask); + atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){ + LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString())); ret.add(atlasTask); } else { LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}", atlasTask.getGuid(), atlasTask.getStatus()); - - // Repair mismatched task String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay())); - LOG.info("Repairing mismatched task with docId: {}", docId); repairMismatchedTask(atlasTask, docId); } } else { - LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched); + LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); } fetched++; } - LOG.info("Fetched {} tasks in this iteration.", fetched); - } else { - LOG.warn("Index query result is null for from: {} and size: {}", from, size); } totalFetched += fetched; - LOG.info("Total tasks fetched so far: {}. Incrementing offset by {}.", totalFetched, size); - from += size; if (fetched < size || totalFetched >= queueSize) { - LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize); break; } - } catch (Exception e) { - LOG.error("Exception occurred during task fetching process. Exiting loop.", e); + } catch (Exception e){ break; } } - LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched); return ret; } From 0da6164de4ac8af03611c9108faa175c8b0937da Mon Sep 17 00:00:00 2001 From: hr2904 Date: Mon, 2 Dec 2024 10:58:29 +0530 Subject: [PATCH 09/10] Refatoring code for PR --- .github/workflows/maven.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 9a7174a37d..f8a09b5589 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -25,7 +25,7 @@ on: - beta - development - master - - plt529test + - lineageondemand jobs: build: @@ -64,7 +64,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'plt529test' ]] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard From 5047e77ce984ad3cbd634cbd5df92a79a8206e7f Mon Sep 17 00:00:00 2001 From: hr2904 Date: Mon, 2 Dec 2024 12:46:01 +0530 Subject: [PATCH 10/10] Added more logs. --- .../apache/atlas/tasks/TaskQueueWatcher.java | 22 +++++++++++-- .../org/apache/atlas/tasks/TaskRegistry.java | 31 ++++++++++++++----- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index c212aad09e..c4ccdc358e 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -87,36 +87,52 @@ public void run() { LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); } while (shouldRun.get()) { + LOG.info("TaskQueueWatcher: Starting a new iteration of task fetching and processing."); TasksFetcher fetcher = new TasksFetcher(registry); try { + LOG.info("TaskQueueWatcher: Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { + LOG.warn("TaskQueueWatcher: Could not acquire lock: {}. Retrying after {} ms.", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS); Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); continue; } LOG.info("TaskQueueWatcher: Acquired distributed lock: {}", ATLAS_TASK_LOCK); List tasks = fetcher.getTasks(); + LOG.info("TaskQueueWatcher: Fetched {} tasks for processing.", CollectionUtils.isNotEmpty(tasks) ? tasks.size() : 0); + if (CollectionUtils.isNotEmpty(tasks)) { final CountDownLatch latch = new CountDownLatch(tasks.size()); + LOG.info("TaskQueueWatcher: Submitting {} tasks to the queue.", tasks.size()); submitAll(tasks, latch); - LOG.info("Submitted {} tasks to the queue", tasks.size()); + + LOG.info("TaskQueueWatcher: Waiting for submitted tasks to complete."); waitForTasksToComplete(latch); + LOG.info("TaskQueueWatcher: All tasks have been processed."); } else { + LOG.info("TaskQueueWatcher: No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } + + LOG.info("TaskQueueWatcher: Sleeping for {} ms before the next poll.", pollInterval); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { - LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart"); + LOG.info("TaskQueueWatcher: Interrupted. Thread is terminating. New tasks will not be loaded into the queue until next restart.", interruptedException); break; } catch (Exception e) { - LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e); + LOG.info("TaskQueueWatcher: Exception occurred during task processing: {}", e.getMessage(), e); } finally { + LOG.info("TaskQueueWatcher: Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); fetcher.clearTasks(); + LOG.info("TaskQueueWatcher: Cleared tasks from the fetcher."); } } + + LOG.info("TaskQueueWatcher: Thread has stopped. shouldRun is now set to false."); } + private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException { if (latch.getCount() != 0) { LOG.info("TaskQueueWatcher: Waiting on Latch, current count: {}", latch.getCount()); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index d405a900c6..014532c794 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -402,21 +402,24 @@ public List getTasksForReQueueIndexSearch() { int fetched = 0; try { if (totalFetched + size > queueSize) { - size = queueSize - totalFetched; + size = queueSize - totalFetched; // Adjust size to not exceed queue size + LOG.info("Adjusting fetch size to {} based on queue size constraint.", size); } dsl.put("from", from); dsl.put("size", size); + LOG.info("DSL Query for iteration: {}", dsl); indexSearchParams.setDsl(dsl); + LOG.info("Executing Elasticsearch query with from: {} and size: {}", from, size); AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams); try { indexQueryResult = indexQuery.vertices(indexSearchParams); + LOG.info("Query executed successfully for from: {} with size: {}", from, size); } catch (AtlasBaseException e) { - LOG.error("Failed to fetch pending/in-progress task vertices to re-que"); - e.printStackTrace(); + LOG.error("Failed to fetch PENDING/IN_PROGRESS task vertices. Exiting loop.", e); break; } @@ -428,34 +431,46 @@ public List getTasksForReQueueIndexSearch() { if (vertex != null) { AtlasTask atlasTask = toAtlasTask(vertex); + + LOG.info("Processing fetched task: {}", atlasTask); if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) || - atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){ - LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString())); + atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) { + LOG.info("Adding task to the result list: {}", atlasTask); ret.add(atlasTask); } else { LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}", atlasTask.getGuid(), atlasTask.getStatus()); + // Repair mismatched task String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay())); + LOG.info("Repairing mismatched task with docId: {}", docId); repairMismatchedTask(atlasTask, docId); } } else { - LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); + LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched); } fetched++; } + LOG.info("Fetched {} tasks in this iteration.", fetched); + } else { + LOG.warn("Index query result is null for from: {} and size: {}", from, size); } totalFetched += fetched; + LOG.info("Total tasks fetched so far: {}. Incrementing offset by {}.", totalFetched, size); + from += size; if (fetched < size || totalFetched >= queueSize) { + LOG.info("Breaking loop. Fetched fewer tasks ({}) than requested size ({}) or reached queue size limit ({}).", fetched, size, queueSize); break; } - } catch (Exception e){ + } catch (Exception e) { + LOG.error("Exception occurred during task fetching process. Exiting loop.", e); break; } } + LOG.info("Fetch process completed. Total tasks fetched: {}.", totalFetched); return ret; } @@ -494,7 +509,7 @@ private void repairMismatchedTask(AtlasTask atlasTask, String docId) { Map result = indexQuery.directUpdateByQuery(queryDsl); if (result != null) { - LOG.info("Elasticsearch UpdateByQuery Result: " + result); + LOG.info("Elasticsearch UpdateByQuery Result: " + result + "\nfor task : " + atlasTask.getGuid()); } else { LOG.info("No documents updated in Elasticsearch for guid: " + atlasTask.getGuid()); }