Merge branch 'improvement/UTAPI-103/support_reindex_by_account' into tmp/octopus/w/8.1/improvement/UTAPI-103/support_reindex_by_account
bert-e committed Jun 12, 2024
2 parents 90beb29 + 0c5a7ce commit b335381
Showing 1 changed file with 18 additions and 27 deletions.
45 changes: 18 additions & 27 deletions lib/reindex/s3_bucketd.py
@@ -39,9 +39,9 @@ def get_options():
parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
group.add_argument("--account-file", default=None, help="file containing account canonical IDs", type=existing_file)
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
group.add_argument("--bucket-file", default=None, help="file containing bucket names", type=existing_file)
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)

options = parser.parse_args()
if options.bucket_file:
@@ -63,7 +63,7 @@ def inner(value):
def existing_file(path):
path = Path(path).resolve()
if not path.exists():
- raise argparse.ArgumentTypeError("File does not exist")
+ raise argparse.ArgumentTypeError("File does not exist: %s"%path)
return path

def chunks(iterable, size):
@@ -204,7 +204,7 @@ def get_bucket_md(self, name):
raise InvalidListing(name)
return Bucket(canonId, name, md.get('objectLockEnabled', False))

- def list_buckets(self, names = None, account = None):
+ def list_buckets(self, account=None):

def get_next_marker(p):
if p is None:
@@ -220,29 +220,20 @@ def get_next_marker(p):
if account is not None:
params['prefix'] = '%s..|..' % account

- seen_buckets = set()
-
for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = []
for result in payload.get('Contents', []):
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups(), False)
- if names is None or bucket.name in names:
-     # We need to get the attributes for each bucket to determine if it is locked
-     if self._only_latest_when_locked:
-         bucket_attrs = self._get_bucket_attributes(bucket.name)
-         object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
-         bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
-     buckets.append(bucket)
+ # We need to get the attributes for each bucket to determine if it is locked
+ if self._only_latest_when_locked:
+     bucket_attrs = self._get_bucket_attributes(bucket.name)
+     object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
+     bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
+ buckets.append(bucket)

if buckets:
yield buckets
- if names:
-     seen_buckets.update(b.name for b in buckets)
-     # Break if we have seen all the buckets we are looking for
-     if all(b in seen_buckets for b in names):
-         break


def list_mpus(self, bucket):
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
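For context, here is a minimal sketch of the prefix-based account filtering that the reworked list_buckets(account=...) above relies on. The USERS_BUCKET value, the list_bucket callable, and the helper name are assumptions for illustration; only the "<canonicalId>..|..<bucketName>" key layout and the prefix parameter come from the hunk above.

```python
import re

# Assumed value; the real USERS_BUCKET constant is defined elsewhere in s3_bucketd.py.
USERS_BUCKET = 'users..bucket'

# Keys in the users bucket look like "<canonicalId>..|..<bucketName>".
KEY_PATTERN = re.compile(r'(\w+)\.\.\|\.\.(\w+.*)')

def buckets_for_account(list_bucket, canonical_id):
    """Yield bucket names owned by canonical_id.

    list_bucket is assumed to behave like the client's _list_bucket method:
    a paginated listing call yielding (status_code, payload) tuples.
    """
    # Restricting the listing to the account's key prefix pushes the filtering
    # down to the metadata service instead of scanning every bucket and
    # filtering by name on the client side.
    params = {'prefix': '%s..|..' % canonical_id}
    for _, payload in list_bucket(USERS_BUCKET, **params):
        for entry in payload.get('Contents', []):
            match = KEY_PATTERN.match(entry['key'])
            if match:
                yield match.group(2)
```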
@@ -305,7 +296,7 @@ def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
total_size += size

except InvalidListing:
- _log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket.name, status_code))
+ _log.error('Invalid contents in listing. bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
return count, total_size

@@ -504,8 +495,8 @@ def log_report(resource, name, obj_count, total_size):
# Bucket reports can be updated as we get them
if options.dry_run:
for bucket, report in bucket_reports.items():
- _log.warning(
-     "DryRun: resource buckets [%s] will be not updated with obj_count %i and total_size %i" % (
+ _log.info(
+     "DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
bucket, report['obj_count'], report['total_size']
)
)
@@ -527,7 +518,7 @@ def log_report(resource, name, obj_count, total_size):

_log.info('Found %s stale buckets' % len(stale_buckets))
if options.dry_run:
_log.warning("DryRun: not updating stale buckets")
_log.info("DryRun: not updating stale buckets")
else:
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
@@ -544,8 +535,8 @@ def log_report(resource, name, obj_count, total_size):
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
if options.dry_run:
for userid, report in account_reports.items():
- _log.warning(
-     "DryRun: resource account [%s] will be not updated with obj_count %i and total_size %i" % (
+ _log.info(
+     "DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
userid, report['obj_count'], report['total_size']
)
)
@@ -561,7 +552,7 @@ def log_report(resource, name, obj_count, total_size):
if options.account:
for account in options.account:
if account in failed_accounts:
_log.error("No metrics updated for %s, one or more buckets failed" % account)
_log.error("No metrics updated for account %s, one or more buckets failed" % account)

# Include failed_accounts in observed_accounts to avoid clearing metrics
observed_accounts = failed_accounts.union(set(account_reports.keys()))
@@ -575,7 +566,7 @@ def log_report(resource, name, obj_count, total_size):

_log.info('Found %s stale accounts' % len(stale_accounts))
if options.dry_run:
_log.warning("DryRun: not updating stale accounts")
_log.info("DryRun: not updating stale accounts")
else:
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
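The stale-bucket and stale-account updates above batch their Redis writes through chunks() and a non-transactional pipeline. Below is a minimal sketch of that pattern, assuming the redis-py client; the chunk size, key names, and function names are illustrative, not Utapi's actual schema.

```python
import redis

# Assumed value; the real ACCOUNT_UPDATE_CHUNKSIZE constant is defined elsewhere in the script.
ACCOUNT_UPDATE_CHUNKSIZE = 100

def chunks(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

def zero_stale_accounts(redis_client, stale_accounts):
    for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
        # transaction=False still batches the commands into a single round trip,
        # but skips MULTI/EXEC queuing, which keeps load on redis lower.
        pipeline = redis_client.pipeline(transaction=False)
        for account in chunk:
            # Hypothetical key names, for illustration only.
            pipeline.set('s3:accounts:%s:storageUtilized:counter' % account, 0)
            pipeline.set('s3:accounts:%s:numberOfObjects:counter' % account, 0)
        pipeline.execute()

# Example: zero_stale_accounts(redis.Redis(host='localhost', port=6379), ['acct1', 'acct2'])
```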
