Merge branch 'quay:master' into Developer-Quay
sivaramsingana authored Sep 10, 2024
2 parents 33bf8c4 + 6470831 commit b2b27a5
Showing 1 changed file with 21 additions and 35 deletions.
56 changes: 21 additions & 35 deletions util/migrate/allocator.py
@@ -5,16 +5,27 @@

from bintrees import RBTree

# Set up the logger for this specific module
logger = logging.getLogger(__name__)

# Read the DEBUGLOG environment variable
# Read the DEBUGLOG environment variable to check if debugging is enabled
debug_log = os.getenv("DEBUGLOG", "false").lower() == "true"

# Set the logging level based on DEBUGLOG
if debug_log:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
# Configure logger independently for this module
logger.setLevel(logging.DEBUG if debug_log else logging.INFO)

# Create a console handler for logging
console_handler = logging.StreamHandler()

# Define the logging format: timestamp, process id, level, logger name, and message
log_format = "%(asctime)s [%(process)d] [%(levelname)s] [%(name)s] %(message)s"
formatter = logging.Formatter(log_format, datefmt="%Y-%m-%d %H:%M:%S")

# Apply the format to the handler
console_handler.setFormatter(formatter)

# Add the handler to the logger
logger.addHandler(console_handler)
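# Illustrative only: with the handler above attached, a record from this module renders
# roughly as follows (timestamp, PID, and message are made-up sample values):
#   2024-09-10 14:03:07 [4711] [DEBUG] [util.migrate.allocator] Selected random hole 2 with 5 total holes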


class NoAvailableKeysError(ValueError):
@@ -39,7 +50,6 @@ def is_available(self, index):
if index >= self._max_index or index < self._min_index:
logger.debug("Index out of range")
return False

try:
prev_start, prev_length = self._slabs.floor_item(index)
logger.debug("Prev range: %s-%s", prev_start, prev_start + prev_length)
@@ -50,7 +60,6 @@ def is_available(self, index):
def mark_completed(self, start_index, past_last_index):
logger.debug("Marking the range completed: %s-%s", start_index, past_last_index)
num_completed = min(past_last_index, self._max_index) - max(start_index, self._min_index)

# Find the item directly before this and see if there is overlap
to_discard = set()
try:
@@ -59,27 +68,21 @@ def mark_completed(self, start_index, past_last_index):
if max_prev_completed >= start_index:
# we are going to merge with the range before us
logger.debug(
"Merging with the prev range: %s-%s",
prev_start,
prev_start + prev_length,
"Merging with the prev range: %s-%s", prev_start, prev_start + prev_length
)
to_discard.add(prev_start)
num_completed = max(num_completed - (max_prev_completed - start_index), 0)
start_index = prev_start
past_last_index = max(past_last_index, prev_start + prev_length)
except KeyError:
pass

# Find all keys between the start and last index and merge them into one block
for merge_start, merge_length in self._slabs.iter_items(start_index, past_last_index + 1):
if merge_start in to_discard:
logger.debug(
"Already merged with block %s-%s",
merge_start,
merge_start + merge_length,
"Already merged with block %s-%s", merge_start, merge_start + merge_length
)
continue

candidate_next_index = merge_start + merge_length
logger.debug("Merging with block %s-%s", merge_start, candidate_next_index)
num_completed -= merge_length - max(candidate_next_index - past_last_index, 0)
@@ -92,20 +95,16 @@ def mark_completed(self, start_index, past_last_index):
logger.debug("Discarding block and setting new max to: %s", start_index)
self._max_index = start_index
discard = True

if start_index <= self._min_index:
logger.debug("Discarding block and setting new min to: %s", past_last_index)
self._min_index = past_last_index
discard = True

if to_discard:
logger.debug("Discarding %s obsolete blocks", len(to_discard))
self._slabs.remove_items(to_discard)

if not discard:
logger.debug("Writing new block with range: %s-%s", start_index, past_last_index)
self._slabs.insert(start_index, past_last_index - start_index)

# Update the number of remaining items with the adjustments we've made
assert num_completed >= 0
self.num_remaining -= num_completed
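# Worked example with assumed values: given bounds 0-100 and a single completed slab
# 10-15, mark_completed(12, 20) finds 10-15 via floor_item, merges everything into one
# slab 10-20, and reduces num_remaining by the 5 newly completed keys (15-20).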
@@ -115,22 +114,18 @@ def get_block_start_index(self, block_size_estimate):
logger.debug("Total range: %s-%s", self._min_index, self._max_index)
if self._max_index <= self._min_index:
raise NoAvailableKeysError("All indexes have been marked completed")

num_holes = len(self._slabs) + 1
random_hole = random.randint(0, num_holes - 1)
logger.debug("Selected random hole %s with %s total holes", random_hole, num_holes)

hole_start = self._min_index
past_hole_end = self._max_index

# Now that we have picked a hole, we need to define the bounds
if random_hole > 0:
# There will be a slab before this hole, find where it ends
bound_entries = self._slabs.nsmallest(random_hole + 1)[-2:]
left_index, left_len = bound_entries[0]
logger.debug("Left range %s-%s", left_index, left_index + left_len)
hole_start = left_index + left_len

if len(bound_entries) > 1:
right_index, right_len = bound_entries[1]
logger.debug("Right range %s-%s", right_index, right_index + right_len)
@@ -139,7 +134,6 @@ def get_block_start_index(self, block_size_estimate):
right_index, right_len = self._slabs.nsmallest(1)[0]
logger.debug("Right range %s-%s", right_index, right_index + right_len)
past_hole_end, _ = self._slabs.nsmallest(1)[0]

# Now that we have our hole bounds, select a random block from [0:len - block_size_estimate]
logger.debug("Selecting from hole range: %s-%s", hole_start, past_hole_end)
rand_max_bound = max(hole_start, past_hole_end - block_size_estimate)
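# Worked example with assumed state: bounds 0-100 and completed slabs 20-30 and 50-55
# leave three holes (0-20, 30-50, 55-100). Picking random_hole=1 selects the middle hole,
# so hole_start=30, past_hole_end=50, and with block_size_estimate=10 the chosen block
# start can fall anywhere in 30-40.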
@@ -152,17 +146,14 @@ def yield_random_entries(
):
"""
This method will yield items from random blocks in the database.
We will track metadata about which keys are available for work, and we will complete the
backfill when there is no more work to be done. The method yields tuples of (candidate, Event),
and if the work was already done by another worker, the caller should set the event. Batch
candidates must have an "id" field which can be inspected.
"""

min_id = max(min_id, 0)
max_id = max(max_id, 1)
allocator = CompletedKeys(max_id + 1, min_id)

try:
while True:
start_index = allocator.get_block_start_index(batch_size)
@@ -172,7 +163,6 @@ def yield_random_entries(
.where(primary_key_field >= start_index, primary_key_field < end_index)
.order_by(primary_key_field)
)

if len(all_candidates) == 0:
logger.debug(
"No candidates, marking entire block completed %s-%s by worker %s",
@@ -182,7 +172,6 @@ def yield_random_entries(
)
allocator.mark_completed(start_index, end_index)
continue

logger.debug(
"Found %s candidates, processing block start: %d end: %d by worker %s",
len(all_candidates),
@@ -196,20 +185,17 @@ def yield_random_entries(
yield candidate, abort_early, allocator.num_remaining - batch_completed
batch_completed += 1
if abort_early.is_set():
logger.debug(
"Overlap with another worker, aborting by worker %s",
worker_name,
)
logger.debug("Overlap with another worker, aborting by worker %s", worker_name)
break

completed_through = candidate.id + 1

logger.debug(
"Marking id range as completed: %s-%s by worker %s",
start_index,
completed_through,
worker_name,
)
allocator.mark_completed(start_index, completed_through)

except NoAvailableKeysError:
logger.debug("No more work by worker %s", worker_name)
