Skip to content

Commit

Permalink
Init loggings
Browse files Browse the repository at this point in the history
  • Loading branch information
hr2904 committed Nov 28, 2024
1 parent 2e353ad commit 7e1a8ea
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 17 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ on:
- beta
- development
- master
- lineageondemand
- dg1942

jobs:
build:
Expand Down Expand Up @@ -64,7 +64,7 @@ jobs:
- name: Build with Maven
run: |
branch_name=${{ steps.get_branch.outputs.branch }}
if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]]
if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'dg1942' ]]
then
echo "build without dashboard"
chmod +x ./build.sh && ./build.sh build_without_dashboard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,33 +88,52 @@ public void run() {
}
while (shouldRun.get()) {
try {
LOG.info("Attempting to acquire distributed lock: {}", ATLAS_TASK_LOCK);

if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
LOG.info("Distributed lock {} not acquired. Retrying after {} ms", ATLAS_TASK_LOCK, AtlasConstants.TASK_WAIT_TIME_MS);
Thread.sleep(AtlasConstants.TASK_WAIT_TIME_MS);
continue;
}

LOG.info("Successfully acquired distributed lock: {}", ATLAS_TASK_LOCK);

LOG.info("Initializing task fetcher...");
TasksFetcher fetcher = new TasksFetcher(registry);

LOG.info("Starting task fetcher thread...");
Thread tasksFetcherThread = new Thread(fetcher);
tasksFetcherThread.start();

LOG.info("Waiting for task fetcher thread to complete...");
tasksFetcherThread.join();

List<AtlasTask> tasks = fetcher.getTasks();
if (CollectionUtils.isNotEmpty(tasks)) {
LOG.info("Fetched {} tasks for processing.", tasks.size());

final CountDownLatch latch = new CountDownLatch(tasks.size());
LOG.info("Submitting all tasks to the queue...");
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());

LOG.info("Submitted {} tasks to the queue. Waiting for tasks to complete...", tasks.size());
waitForTasksToComplete(latch);

LOG.info("All tasks processed successfully.");
} else {
LOG.info("No tasks fetched. Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}

LOG.info("Sleeping for {} ms before next poll...", pollInterval);
Thread.sleep(pollInterval);
} catch (InterruptedException interruptedException) {
LOG.error("TaskQueueWatcher: Interrupted: thread is terminated, new tasks will not be loaded into the queue until next restart");
LOG.error("TaskQueueWatcher: Interrupted. Thread is terminated. No new tasks will be loaded into the queue until next restart.", interruptedException);
break;
} catch (Exception e) {
LOG.error("TaskQueueWatcher: Exception occurred " + e.getMessage(), e);
LOG.error("TaskQueueWatcher: Exception occurred: {}", e.getMessage(), e);
} finally {
LOG.info("Releasing distributed lock: {}", ATLAS_TASK_LOCK);
redisService.releaseDistributedLock(ATLAS_TASK_LOCK);
}
}
Expand Down
42 changes: 30 additions & 12 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {

int size = 1000;
int from = 0;
int totalFetched = 0; // Tracks the total number of tasks fetched

LOG.info("Starting re-queue task search. Initial fetch size: {}, queue size: {}", size, queueSize);

IndexSearchParams indexSearchParams = new IndexSearchParams();

Expand All @@ -392,25 +395,29 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
Map<String, Object> dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList)));
dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))));
dsl.put("size", size);
int totalFetched = 0;

while (true) {
int fetched = 0;
int fetched = 0; // Tracks tasks fetched in this iteration
try {
if (totalFetched + size > queueSize) {
size = queueSize - totalFetched;
size = queueSize - totalFetched; // Adjust size to avoid exceeding queue size
LOG.info("Adjusted fetch size to {} based on queue size constraint.", size);
}

dsl.put("from", from);
dsl.put("size", size);
LOG.info("Query DSL updated for iteration: {}", dsl);

indexSearchParams.setDsl(dsl);

LOG.info("Querying Elasticsearch from offset {} with size {}", from, size);
AtlasIndexQuery indexQuery = graph.elasticsearchQuery(Constants.VERTEX_INDEX, indexSearchParams);

try {
indexQueryResult = indexQuery.vertices(indexSearchParams);
LOG.info("Query executed successfully for offset {} with size {}", from, size);
} catch (AtlasBaseException e) {
LOG.error("Failed to fetch pending/in-progress task vertices to re-que");
LOG.error("Failed to fetch pending/in-progress task vertices to re-queue. Exiting loop.", e);
e.printStackTrace();
break;
}
Expand All @@ -424,31 +431,42 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
if (vertex != null) {
AtlasTask atlasTask = toAtlasTask(vertex);
if (atlasTask.getStatus().equals(AtlasTask.Status.PENDING) ||
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS) ){
LOG.info(String.format("Fetched task from index search: %s", atlasTask.toString()));
atlasTask.getStatus().equals(AtlasTask.Status.IN_PROGRESS)) {
LOG.info("Fetched task from index search: {}", atlasTask);
ret.add(atlasTask);
}
else {
LOG.warn(String.format("There is a mismatch on tasks status between ES and Cassandra for guid: %s", atlasTask.getGuid()));
} else {
LOG.warn("Mismatch in task status between ES and Cassandra for guid: {}", atlasTask.getGuid());
}
} else {
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
LOG.warn("Null vertex encountered while re-queuing tasks at index {}", fetched);
}

fetched++;
}
LOG.info("Fetched {} tasks in this iteration.", fetched);
} else {
LOG.warn("Index query result is null for offset {} and size {}.", from, size);
}

totalFetched += fetched;
LOG.info("Total fetched so far: {}. Fetch size for this iteration: {}.", totalFetched, fetched);

from += size;
if (fetched < size || totalFetched >= queueSize) {
if (fetched < size) {
LOG.info("Breaking loop as fewer results ({}) than fetch size ({}) were returned.", fetched, size);
break;
}
if (totalFetched >= queueSize) {
LOG.info("Breaking loop as total fetched ({}) has reached queue size ({}).", totalFetched, queueSize);
break;
}
} catch (Exception e){
} catch (Exception e) {
LOG.error("Unexpected exception occurred. Exiting loop.", e);
break;
}
}

LOG.info("Re-queue task search completed. Total tasks fetched: {}.", totalFetched);
return ret;
}

Expand Down

0 comments on commit 7e1a8ea

Please sign in to comment.