From 7e1a8eac977c8d207c77853887ee9a0ac379e850 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Thu, 28 Nov 2024 19:32:08 +0530 Subject: [PATCH] Init loggings --- .github/workflows/maven.yml | 4 +- .../apache/atlas/tasks/TaskQueueWatcher.java | 25 +++++++++-- .../org/apache/atlas/tasks/TaskRegistry.java | 42 +++++++++++++------ 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b5589..2a122c20c1 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -25,7 +25,7 @@ on: - beta - development - master - - lineageondemand + - dg1942 jobs: build: @@ -64,7 +64,7 @@ jobs: - name: Build with Maven run: | branch_name=${{ steps.get_branch.outputs.branch }} - if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]] + if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'dg1942' ]] then echo "build without dashboard" chmod +x ./build.sh && ./build.sh build_without_dashboard diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index 9214cbfbf9..b11f8440df 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -88,33 +88,52 @@ public void run() { } while (shouldRun.get()) { try { + LOG.info("Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK); + if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) { + LOG.info("Distributed lock {} not acquired. Retrying after {} ms", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS); Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS); continue; } + LOG.info("Successfully acquired distributed lock: {}", ATLAS_TASK_LOCK); + + LOG.info("Initializing task fetcher..."); TasksFetcher fetcher = new TasksFetcher(registry); + LOG.info("Starting task fetcher thread..."); Thread tasksFetcherThread = new Thread(fetcher); tasksFetcherThread.start(); + + LOG.info("Waiting for task fetcher thread to complete..."); tasksFetcherThread.join(); List tasks = fetcher.getTasks(); if (CollectionUtils.isNotEmpty(tasks)) { + LOG.info("Fetched {} tasks for processing.", tasks.size()); + final CountDownLatch latch = new CountDownLatch(tasks.size()); + LOG.info("Submitting all tasks to the queue..."); submitAll(tasks, latch); - LOG.info("Submitted {} tasks to the queue", tasks.size()); + + LOG.info("Submitted {} tasks to the queue. Waiting for tasks to complete...", tasks.size()); waitForTasksToComplete(latch); + + LOG.info("All tasks processed successfully."); } else { + LOG.info("No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } + + LOG.info("Sleeping for {} ms before next poll...", pollInterval); Thread.sleep(pollInterval); } catch (InterruptedException interruptedException) { - LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart"); + LOG.error("TaskQueueWatcher: Interrupted. Thread is terminated. No new tasks will be loaded into the queue until next restart.", interruptedException); break; } catch (Exception e) { - LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e); + LOG.error("TaskQueueWatcher: Exception occurred: {}", e.getMessage(), e); } finally { + LOG.info("Releasing distributed lock: {}", ATLAS_TASK_LOCK); redisService.releaseDistributedLock(ATLAS_TASK_LOCK); } } diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 8c08a26121..4b3f7db95b 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -382,6 +382,9 @@ public List getTasksForReQueueIndexSearch() { int size = 1000; int from = 0; + int totalFetched = 0; // Tracks the total number of tasks fetched + + LOG.info("Starting re-queue task search. Initial fetch size: {}, queue size: {}", size, queueSize); IndexSearchParams indexSearchParams = new IndexSearchParams(); @@ -392,25 +395,29 @@ public List getTasksForReQueueIndexSearch() { Map dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList))); dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")))); dsl.put("size", size); - int totalFetched = 0; + while (true) { - int fetched = 0; + int fetched = 0; // Tracks tasks fetched in this iteration try { if (totalFetched + size > queueSize) { - size = queueSize - totalFetched; + size = queueSize - totalFetched; // Adjust size to avoid exceeding queue size + LOG.info("Adjusted fetch size to {} based on queue size constraint.", size); } dsl.put("from", from); dsl.put("size", size); + LOG.info("Query DSL updated for iteration: {}", dsl); indexSearchParams.setDsl(dsl); + LOG.info("Querying Elasticsearch from offset {} with size {}", from, size); AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams); try { indexQueryResult = indexQuery.vertices(indexSearchParams); + LOG.info("Query executed successfully for offset {} with size {}", from, size); } catch (AtlasBaseException e) { - LOG.error("Failed to fetch pending/in-progress task vertices to re-que"); + LOG.error("Failed to fetch pending/in-progress task vertices to re-queue. Exiting loop.", e); e.printStackTrace(); break; } @@ -424,31 +431,42 @@ public List getTasksForReQueueIndexSearch() { if (vertex != null) { AtlasTask atlasTask = toAtlasTask(vertex); if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) || - atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){ - LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString())); + atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) { + LOG.info("Fetched task from index search: {}", atlasTask); ret.add(atlasTask); - } - else { - LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid())); + } else { + LOG.warn("Mismatch in task status between ES and Cassandra for guid: {}", atlasTask.getGuid()); } } else { - LOG.warn("Null vertex while re-queuing tasks at index {}", fetched); + LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched); } fetched++; } + LOG.info("Fetched {} tasks in this iteration.", fetched); + } else { + LOG.warn("Index query result is null for offset {} and size {}.", from, size); } totalFetched += fetched; + LOG.info("Total fetched so far: {}. Fetch size for this iteration: {}.", totalFetched, fetched); + from += size; - if (fetched < size || totalFetched >= queueSize) { + if (fetched < size) { + LOG.info("Breaking loop as fewer results ({}) than fetch size ({}) were returned.", fetched, size); + break; + } + if (totalFetched >= queueSize) { + LOG.info("Breaking loop as total fetched ({}) has reached queue size ({}).", totalFetched, queueSize); break; } - } catch (Exception e){ + } catch (Exception e) { + LOG.error("Unexpected exception occurred. Exiting loop.", e); break; } } + LOG.info("Re-queue task search completed. Total tasks fetched: {}.", totalFetched); return ret; }