diff --git a/test/integration/bucket_cleaner.py b/test/integration/bucket_cleaner.py
index e1b1e5783..d54f2156b 100644
--- a/test/integration/bucket_cleaner.py
+++ b/test/integration/bucket_cleaner.py
@@ -8,11 +8,17 @@
 #
 ######################################################################
+import re
+import time
+
 from typing import Optional
 
+from b2sdk.bucket import Bucket
+from b2sdk.file_lock import NO_RETENTION_FILE_SETTING, LegalHold, RetentionMode
+from b2sdk.utils import current_time_millis
 from b2sdk.v2 import *
 
-from .helpers import GENERAL_BUCKET_NAME_PREFIX, BUCKET_CREATED_AT_MILLIS, authorize
+from .helpers import BUCKET_CREATED_AT_MILLIS, GENERAL_BUCKET_NAME_PREFIX, authorize
 
 ONE_HOUR_MILLIS = 60 * 60 * 1000
@@ -88,3 +94,61 @@ def cleanup_buckets(self):
             else:
                 print('Removing bucket:', bucket.name)
                 b2_api.delete_bucket(bucket)
+
+
+def _cleanup_old_buckets(raw_api, api_url, account_auth_token, account_id, bucket_list_dict):
+    for bucket_dict in bucket_list_dict['buckets']:
+        bucket_id = bucket_dict['bucketId']
+        bucket_name = bucket_dict['bucketName']
+        if _should_delete_bucket(bucket_name):
+            print('cleaning up old bucket: ' + bucket_name)
+            _clean_and_delete_bucket(
+                raw_api,
+                api_url,
+                account_auth_token,
+                account_id,
+                bucket_id,
+            )
+
+
+def _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id):
+    # Delete the files. This test never creates more than a few files,
+    # so one call to list_file_versions should get them all.
+    versions_dict = raw_api.list_file_versions(api_url, account_auth_token, bucket_id)
+    for version_dict in versions_dict['files']:
+        file_id = version_dict['fileId']
+        file_name = version_dict['fileName']
+        action = version_dict['action']
+        if action in ['hide', 'upload']:
+            print('b2_delete_file', file_name, action)
+            if action == 'upload' and version_dict[
+                'fileRetention'] and version_dict['fileRetention']['value']['mode'] is not None:
+                raw_api.update_file_retention(
+                    api_url,
+                    account_auth_token,
+                    file_id,
+                    file_name,
+                    NO_RETENTION_FILE_SETTING,
+                    bypass_governance=True
+                )
+            raw_api.delete_file_version(api_url, account_auth_token, file_id, file_name)
+        else:
+            print('b2_cancel_large_file', file_name)
+            raw_api.cancel_large_file(api_url, account_auth_token, file_id)
+
+    # Delete the bucket
+    print('b2_delete_bucket', bucket_id)
+    raw_api.delete_bucket(api_url, account_auth_token, account_id, bucket_id)
+
+
+def _should_delete_bucket(bucket_name):
+    # Bucket names for this test look like: test-raw-api-c7b22d0b0ad7-1460060364-5670.
+    # Other buckets should not be deleted.
+    match = re.match(r'^test-raw-api-[a-f0-9]+-([0-9]+)-([0-9]+)', bucket_name)
+    if match is None:
+        return False
+
+    # Is it more than an hour old?
+    bucket_time = int(match.group(1))
+    now = time.time()
+    return bucket_time + 3600 <= now
diff --git a/test/integration/conftest.py b/test/integration/conftest.py
index 7b2c2d316..c5940759e 100644
--- a/test/integration/conftest.py
+++ b/test/integration/conftest.py
@@ -8,8 +8,20 @@
 #
 ######################################################################
+from io import BytesIO
+from os import environ
+from random import randint
+from time import time
+
 import pytest
 
+from b2sdk.b2http import B2Http
+from b2sdk.encryption.setting import EncryptionAlgorithm, EncryptionMode, EncryptionSetting
+from b2sdk.raw_api import ALL_CAPABILITIES, REALM_URLS, B2RawHTTPApi
+from b2sdk.utils import hex_sha1_of_stream
+
+from .bucket_cleaner import _clean_and_delete_bucket
+
 
 def pytest_addoption(parser):
     """Add a flag for not cleaning up old buckets"""
@@ -20,6 +32,144 @@ def pytest_addoption(parser):
     )
 
 
-@pytest.fixture
+@pytest.fixture(scope='session')
 def dont_cleanup_old_buckets(request):
     return request.config.getoption("--dont-cleanup-old-buckets")
+
+
+@pytest.fixture(scope='session')
+def application_key_id() -> str:
+    key_id = environ.get('B2_TEST_APPLICATION_KEY_ID')
+    assert key_id
+    return key_id
+
+
+@pytest.fixture(scope='session')
+def application_key() -> str:
+    key = environ.get('B2_TEST_APPLICATION_KEY')
+    assert key
+    return key
+
+
+@pytest.fixture(scope='module')
+def raw_api() -> B2RawHTTPApi:
+    return B2RawHTTPApi(B2Http())
+
+
+@pytest.fixture(scope='session')
+def realm_url() -> str:
+    realm = environ.get('B2_TEST_ENVIRONMENT', 'production')
+    return REALM_URLS.get(realm, realm)
+
+
+@pytest.fixture(scope='module')
+def auth_dict(raw_api, application_key, application_key_id, realm_url) -> dict:
+    result = raw_api.authorize_account(realm_url, application_key_id, application_key)
+
+    missing_capabilities = set(ALL_CAPABILITIES) - \
+        {'readBuckets', 'listAllBucketNames'} - set(result['allowed']['capabilities'])
+    assert not missing_capabilities, \
+        f'Seems like a non-full key. Missing capabilities: {missing_capabilities}'
+
+    return result
+
+
+@pytest.fixture(scope='module')
+def account_id(auth_dict) -> str:
+    return auth_dict['accountId']
+
+
+@pytest.fixture(scope='module')
+def account_auth_token(auth_dict) -> str:
+    return auth_dict['authorizationToken']
+
+
+@pytest.fixture(scope='module')
+def api_url(auth_dict) -> str:
+    return auth_dict['apiUrl']
+
+
+@pytest.fixture(scope='module')
+def download_url(auth_dict) -> str:
+    return auth_dict['downloadUrl']
+
+
+@pytest.fixture(scope='module')
+def bucket_dict(raw_api, api_url, account_auth_token, account_id) -> dict:
+    # include the account ID in the bucket name to be
+    # sure it doesn't collide with bucket names from
+    # other accounts.
+    bucket_name = f'test-raw-api-{account_id}-{time():.0f}-{randint(1000, 9999)}'
+
+    bucket = raw_api.create_bucket(
+        api_url,
+        account_auth_token,
+        account_id,
+        bucket_name,
+        'allPublic',
+        is_file_lock_enabled=True,
+    )
+    yield bucket
+
+    _clean_and_delete_bucket(
+        raw_api,
+        api_url,
+        account_auth_token,
+        account_id,
+        bucket['bucketId'],
+    )
+
+
+@pytest.fixture(scope='module')
+def bucket_id(bucket_dict) -> str:
+    return bucket_dict['bucketId']
+
+
+@pytest.fixture(scope='module')
+def bucket_name(bucket_dict) -> str:
+    return bucket_dict['bucketName']
+
+
+@pytest.fixture(scope='module')
+def upload_url_dict(raw_api, api_url, account_auth_token, bucket_id) -> dict:
+    return raw_api.get_upload_url(api_url, account_auth_token, bucket_id)
+
+
+TEST_FILE_NAME = 'test.txt'
+TEST_FILE_CONTENTS = b'hello world'
+
+
+@pytest.fixture(scope='module')
+def file_dict(raw_api, upload_url_dict, sse_b2_aes) -> dict:
+    return raw_api.upload_file(
+        upload_url_dict['uploadUrl'],
+        upload_url_dict['authorizationToken'],
+        TEST_FILE_NAME,
+        len(TEST_FILE_CONTENTS),
+        'text/plain',
+        hex_sha1_of_stream(BytesIO(TEST_FILE_CONTENTS), len(TEST_FILE_CONTENTS)),
+        {'color': 'blue'},
+        BytesIO(TEST_FILE_CONTENTS),
+        server_side_encryption=sse_b2_aes,
+    )
+
+
+@pytest.fixture(scope='module')
+def file_id(file_dict) -> str:
+    return file_dict['fileId']
+
+
+@pytest.fixture(scope='module')
+def download_auth_dict(raw_api, api_url, account_auth_token, bucket_id) -> dict:
+    return raw_api.get_download_authorization(
+        api_url,
+        account_auth_token,
+        bucket_id,
+        TEST_FILE_NAME[:-2],
+        12345,
+    )
+
+
+@pytest.fixture(scope='module')
+def download_auth_token(download_auth_dict) -> str:
+    return download_auth_dict['authorizationToken']
diff --git a/test/integration/test_raw_api.py b/test/integration/test_raw_api.py
index 798a62fbf..ecdc0f49c 100644
--- a/test/integration/test_raw_api.py
+++ b/test/integration/test_raw_api.py
@@ -8,101 +8,51 @@
 #
 ######################################################################
 import io
-import os
-import random
-import re
-import sys
-import time
-import traceback
+
+from random import randint
+from time import time
 
 import pytest
 
-from b2sdk.b2http import B2Http
-from b2sdk.encryption.setting import EncryptionAlgorithm, EncryptionMode, EncryptionSetting
-from b2sdk.replication.setting import ReplicationConfiguration, ReplicationSourceConfiguration, ReplicationRule, ReplicationDestinationConfiguration
+from b2sdk.encryption.setting import SSE_B2_AES, SSE_NONE
+from b2sdk.file_lock import BucketRetentionSetting, RetentionMode, RetentionPeriod
+from b2sdk.replication.setting import ReplicationConfiguration, ReplicationDestinationConfiguration, ReplicationRule, ReplicationSourceConfiguration
 from b2sdk.replication.types import ReplicationStatus
-from b2sdk.file_lock import BucketRetentionSetting, NO_RETENTION_FILE_SETTING, RetentionMode, RetentionPeriod
-from b2sdk.raw_api import ALL_CAPABILITIES, B2RawHTTPApi, REALM_URLS
 from b2sdk.utils import hex_sha1_of_stream
 
+from .bucket_cleaner import _clean_and_delete_bucket, _cleanup_old_buckets
+from .conftest import TEST_FILE_CONTENTS, TEST_FILE_NAME
-
-# TODO: rewrite to separate test cases
-def test_raw_api(dont_cleanup_old_buckets):
-    """
-    Exercise the code in B2RawHTTPApi by making each call once, just
-    to make sure the parameters are passed in, and the result is
-    passed back.
-
-    The goal is to be a complete test of B2RawHTTPApi, so the tests for
-    the rest of the code can use the simulator.
-
-    Prints to stdout if things go wrong.
-
-    :return: 0 on success, non-zero on failure
-    """
-    application_key_id = os.environ.get('B2_TEST_APPLICATION_KEY_ID')
-    if application_key_id is None:
-        pytest.fail('B2_TEST_APPLICATION_KEY_ID is not set.')
-
-    application_key = os.environ.get('B2_TEST_APPLICATION_KEY')
-    if application_key is None:
-        pytest.fail('B2_TEST_APPLICATION_KEY is not set.')
-
-    print()
-
-    try:
-        raw_api = B2RawHTTPApi(B2Http())
-        raw_api_test_helper(raw_api, not dont_cleanup_old_buckets)
-    except Exception:
-        traceback.print_exc(file=sys.stdout)
-        pytest.fail('test_raw_api failed')
-
-
-def authorize_raw_api(raw_api):
-    application_key_id = os.environ.get('B2_TEST_APPLICATION_KEY_ID')
-    if application_key_id is None:
-        print('B2_TEST_APPLICATION_KEY_ID is not set.', file=sys.stderr)
-        sys.exit(1)
-
-    application_key = os.environ.get('B2_TEST_APPLICATION_KEY')
-    if application_key is None:
-        print('B2_TEST_APPLICATION_KEY is not set.', file=sys.stderr)
-        sys.exit(1)
-
-    realm = os.environ.get('B2_TEST_ENVIRONMENT', 'production')
-    realm_url = REALM_URLS.get(realm, realm)
-    auth_dict = raw_api.authorize_account(realm_url, application_key_id, application_key)
-    return auth_dict
-
-
-def raw_api_test_helper(raw_api, should_cleanup_old_buckets):
-    """
-    Try each of the calls to the raw api. Raise an
-    exception if anything goes wrong.
-
-    This uses a Backblaze account that is just for this test.
-    The account uses the free level of service, which should
-    be enough to run this test a reasonable number of times
-    each day. If somebody abuses the account for other things,
-    this test will break and we'll have to do something about
-    it.
-    """
-    # b2_authorize_account
-    print('b2_authorize_account')
-    auth_dict = authorize_raw_api(raw_api)
-    missing_capabilities = set(ALL_CAPABILITIES) - set(['readBuckets']
-                                                      ) - set(auth_dict['allowed']['capabilities'])
-    assert not missing_capabilities, 'it appears that the raw_api integration test is being run with a non-full key. Missing capabilities: %s' % (
-        missing_capabilities,
-    )
-    account_id = auth_dict['accountId']
-    account_auth_token = auth_dict['authorizationToken']
-    api_url = auth_dict['apiUrl']
-    download_url = auth_dict['downloadUrl']
+"""
+Try each of the calls to the raw api.
+
+This uses a Backblaze account that is just for this test.
+The account uses the free level of service, which should
+be enough to run this test a reasonable number of times
+each day. If somebody abuses the account for other things,
+this test will break and we'll have to do something about
+it.
+""" + + +@pytest.fixture(scope='module', autouse=True) +def cleanup_buckets(raw_api, account_id, account_auth_token, api_url, dont_cleanup_old_buckets): + """ Remove all "stale" test buckets """ + + if dont_cleanup_old_buckets: + return + + bucket_list_dict = raw_api.list_buckets(api_url, account_auth_token, account_id) + + _cleanup_old_buckets(raw_api, api_url, account_auth_token, account_id, bucket_list_dict) - # b2_create_key - print('b2_create_key') + +def test_auth(auth_dict): + pass # `auth_dict` implicitly invokes testing + + +def test_keys(raw_api, api_url, account_auth_token, account_id): key_dict = raw_api.create_key( api_url, account_auth_token, @@ -113,65 +63,52 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): None, None, ) - - # b2_list_keys - print('b2_list_keys') raw_api.list_keys(api_url, account_auth_token, account_id, 10) - - # b2_delete_key - print('b2_delete_key') raw_api.delete_key(api_url, account_auth_token, key_dict['applicationKeyId']) - # b2_create_bucket, with a unique bucket name - # Include the account ID in the bucket name to be - # sure it doesn't collide with bucket names from - # other accounts. - print('b2_create_bucket') - bucket_name = 'test-raw-api-%s-%d-%d' % ( - account_id, int(time.time()), random.randint(1000, 9999) - ) - # very verbose http debug - #import http.client; http.client.HTTPConnection.debuglevel = 1 +def test_bucket_creation(bucket_dict): + pass # `bucket_dict` implicitly invokes testing - bucket_dict = raw_api.create_bucket( - api_url, - account_auth_token, - account_id, - bucket_name, - 'allPublic', - is_file_lock_enabled=True, - ) - bucket_id = bucket_dict['bucketId'] - first_bucket_revision = bucket_dict['revision'] - ################################# - print('b2 / replication') +class TestReplication: + @pytest.fixture(scope='class') + def source_key_id(self, raw_api, api_url, account_auth_token, account_id): + key_dict = raw_api.create_key( + api_url, + account_auth_token, + account_id, + ['listBuckets', 'listFiles', 'readFiles'], + 'testReplicationSourceKey', + None, + None, + None, + ) + key_id = key_dict['applicationKeyId'] + yield key_id + raw_api.delete_key( + api_url, + account_auth_token, + key_id, + ) - # 1) create source key (read permissions) - replication_source_key_dict = raw_api.create_key( + @pytest.fixture(scope='class') + def source_bucket_dict( + self, + raw_api, api_url, account_auth_token, account_id, - ['listBuckets', 'listFiles', 'readFiles'], - 'testReplicationSourceKey', - None, - None, - None, - ) - replication_source_key = replication_source_key_dict['applicationKeyId'] - - # 2) create source bucket with replication to destination - existing bucket - try: - # in order to test replication, we need to create a second bucket - replication_source_bucket_name = 'test-raw-api-%s-%d-%d' % ( - account_id, int(time.time()), random.randint(1000, 9999) - ) - replication_source_bucket_dict = raw_api.create_bucket( + source_key_id, + bucket_id, + bucket_name, + ): + bucket_name = f'test-raw-api-{account_id}-{time():.0f}-{randint(1000, 9999)}' + bucket = raw_api.create_bucket( api_url, account_auth_token, account_id, - replication_source_bucket_name, + bucket_name, 'allPublic', is_file_lock_enabled=True, replication=ReplicationConfiguration( @@ -183,12 +120,52 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): name='test-rule', ), ], - source_application_key_id=replication_source_key, + source_application_key_id=source_key_id, ), ), ) - assert 'replicationConfiguration' in 
-        assert replication_source_bucket_dict['replicationConfiguration'] == {
+        yield bucket
+        _clean_and_delete_bucket(
+            raw_api,
+            api_url,
+            account_auth_token,
+            account_id,
+            bucket['bucketId'],
+        )
+
+    @pytest.fixture(scope='class')
+    def destination_key_id(self, raw_api, api_url, account_auth_token, account_id):
+        key_dict = raw_api.create_key(
+            api_url,
+            account_auth_token,
+            account_id,
+            ['listBuckets', 'listFiles', 'writeFiles'],
+            'testReplicationDestinationKey',
+            None,
+            None,
+            None,
+        )
+        key_id = key_dict['applicationKeyId']
+        yield key_id
+        raw_api.delete_key(
+            api_url,
+            account_auth_token,
+            key_id,
+        )
+
+    def test_replication(
+        self,
+        raw_api,
+        api_url,
+        account_auth_token,
+        account_id,
+        source_key_id,
+        source_bucket_dict,
+        bucket_id,
+        destination_key_id,
+    ):
+        assert 'replicationConfiguration' in source_bucket_dict
+        assert source_bucket_dict['replicationConfiguration'] == {
            'isClientAuthorizedToRead': True,
            'value':
                {
@@ -205,51 +182,33 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets):
                                        "replicationRuleName": "test-rule"
                                    },
                                ],
-                            "sourceApplicationKeyId": replication_source_key,
+                            "sourceApplicationKeyId": source_key_id,
                        },
                    "asReplicationDestination": None,
                },
        }
 
-        # 3) upload test file and check replication status
+        # upload test file and check replication status
        upload_url_dict = raw_api.get_upload_url(
            api_url,
            account_auth_token,
-            replication_source_bucket_dict['bucketId'],
+            source_bucket_dict['bucketId'],
        )
 
-        file_contents = b'hello world'
        file_dict = raw_api.upload_file(
            upload_url_dict['uploadUrl'],
            upload_url_dict['authorizationToken'],
            'test.txt',
-            len(file_contents),
+            len(TEST_FILE_CONTENTS),
            'text/plain',
-            hex_sha1_of_stream(io.BytesIO(file_contents), len(file_contents)),
+            hex_sha1_of_stream(io.BytesIO(TEST_FILE_CONTENTS), len(TEST_FILE_CONTENTS)),
            {'color': 'blue'},
-            io.BytesIO(file_contents),
+            io.BytesIO(TEST_FILE_CONTENTS),
        )
        assert ReplicationStatus[file_dict['replicationStatus'].upper()
                                ] == ReplicationStatus.PENDING
 
-    finally:
-        raw_api.delete_key(api_url, account_auth_token, replication_source_key)
-
-    # 4) create destination key (write permissions)
-    replication_destination_key_dict = raw_api.create_key(
-        api_url,
-        account_auth_token,
-        account_id,
-        ['listBuckets', 'listFiles', 'writeFiles'],
-        'testReplicationDestinationKey',
-        None,
-        None,
-        None,
-    )
-    replication_destination_key = replication_destination_key_dict['applicationKeyId']
-
-    # 5) update destination bucket to receive updates
-    try:
+        # update destination bucket to receive updates
        bucket_dict = raw_api.update_bucket(
            api_url,
            account_auth_token,
@@ -258,9 +217,7 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets):
            'allPublic',
            replication=ReplicationConfiguration(
                as_replication_destination=ReplicationDestinationConfiguration(
-                    source_to_destination_key_mapping={
-                        replication_source_key: replication_destination_key,
-                    },
+                    source_to_destination_key_mapping={source_key_id: destination_key_id},
                ),
            ),
        )
@@ -270,59 +227,40 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets):
            {
                'asReplicationDestination':
                    {
-                        'sourceToDestinationKeyMapping':
-                            {
-                                replication_source_key: replication_destination_key,
-                            },
+                        'sourceToDestinationKeyMapping': {
+                            source_key_id: destination_key_id
+                        },
                    },
                'asReplicationSource': None,
            },
        }
 
-    finally:
-        raw_api.delete_key(
+
+        bucket_dict = raw_api.update_bucket(
            api_url,
            account_auth_token,
-            replication_destination_key_dict['applicationKeyId'],
+            account_id,
+            bucket_id,
+            'allPublic',
+            replication=ReplicationConfiguration(),
        )
+        assert bucket_dict['replicationConfiguration'] == {
+            'isClientAuthorizedToRead': True,
+            'value': None,
+        }
 
-    # 6) cleanup: disable replication for destination and remove source
-    bucket_dict = raw_api.update_bucket(
-        api_url,
-        account_auth_token,
-        account_id,
-        bucket_id,
-        'allPublic',
-        replication=ReplicationConfiguration(),
-    )
-    assert bucket_dict['replicationConfiguration'] == {
-        'isClientAuthorizedToRead': True,
-        'value': None,
-    }
-    _clean_and_delete_bucket(
-        raw_api,
-        api_url,
-        account_auth_token,
-        account_id,
-        replication_source_bucket_dict['bucketId'],
-    )
-
-    #################
-    print('b2_update_bucket')
-    sse_b2_aes = EncryptionSetting(
-        mode=EncryptionMode.SSE_B2,
-        algorithm=EncryptionAlgorithm.AES256,
-    )
-    sse_none = EncryptionSetting(mode=EncryptionMode.NONE)
+def test_update_bucket(
+    raw_api, api_url, account_auth_token, account_id, bucket_id,
+):
     for encryption_setting, default_retention in [
         (
-            sse_none,
+            SSE_NONE,
             BucketRetentionSetting(mode=RetentionMode.GOVERNANCE, period=RetentionPeriod(days=1))
         ),
-        (sse_b2_aes, None),
-        (sse_b2_aes, BucketRetentionSetting(RetentionMode.NONE)),
+        (SSE_B2_AES, None),
+        (SSE_B2_AES, BucketRetentionSetting(RetentionMode.NONE)),
     ]:
-        bucket_dict = raw_api.update_bucket(
+        raw_api.update_bucket(
            api_url,
            account_auth_token,
            account_id,
@@ -332,182 +270,192 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets):
            default_retention=default_retention,
        )
 
-    # b2_list_buckets
-    print('b2_list_buckets')
-    bucket_list_dict = raw_api.list_buckets(api_url, account_auth_token, account_id)
-    #print(bucket_list_dict)
-
-    # b2_get_upload_url
-    print('b2_get_upload_url')
-    upload_url_dict = raw_api.get_upload_url(api_url, account_auth_token, bucket_id)
-    upload_url = upload_url_dict['uploadUrl']
-    upload_auth_token = upload_url_dict['authorizationToken']
-
-    # b2_upload_file
-    print('b2_upload_file')
-    file_name = 'test.txt'
-    file_contents = b'hello world'
-    file_sha1 = hex_sha1_of_stream(io.BytesIO(file_contents), len(file_contents))
-    file_dict = raw_api.upload_file(
-        upload_url,
-        upload_auth_token,
-        file_name,
-        len(file_contents),
-        'text/plain',
-        file_sha1,
-        {'color': 'blue'},
-        io.BytesIO(file_contents),
-        server_side_encryption=sse_b2_aes,
-    )
-    file_id = file_dict['fileId']
+def test_list_buckets(raw_api, api_url, account_auth_token, account_id):
+    raw_api.list_buckets(api_url, account_auth_token, account_id)
+
+
+def test_upload_url(upload_url_dict):
+    pass
+
 
-    # b2_list_file_versions
-    print('b2_list_file_versions')
+def test_file_upload(file_dict):
+    pass
+
+
+def test_list_file_versions(raw_api, api_url, account_auth_token, bucket_id, file_dict):
     list_versions_dict = raw_api.list_file_versions(api_url, account_auth_token, bucket_id)
-    assert [file_name] == [f_dict['fileName'] for f_dict in list_versions_dict['files']]
+    assert [TEST_FILE_NAME] == [f_dict['fileName'] for f_dict in list_versions_dict['files']]
 
-    # b2_download_file_by_id with auth
-    print('b2_download_file_by_id (auth)')
-    url = raw_api.get_download_url_by_id(download_url, file_id)
+
+def test_download_file_by_id_with_auth(
+    raw_api, download_url, file_dict, account_auth_token,
+):
+    url = raw_api.get_download_url_by_id(download_url, file_dict['fileId'])
     with raw_api.download_file_from_url(account_auth_token, url) as response:
-        data = next(response.iter_content(chunk_size=len(file_contents)))
-        assert data == file_contents, data
+        data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS)))
+        assert data == TEST_FILE_CONTENTS, data
+
 
-    # b2_download_file_by_id no auth
-    print('b2_download_file_by_id (no auth)')
-    url = raw_api.get_download_url_by_id(download_url, file_id)
+def test_download_file_by_id_no_auth(raw_api, download_url, file_dict):
+    url = raw_api.get_download_url_by_id(download_url, file_dict['fileId'])
     with raw_api.download_file_from_url(None, url) as response:
-        data = next(response.iter_content(chunk_size=len(file_contents)))
-        assert data == file_contents, data
+        data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS)))
+        assert data == TEST_FILE_CONTENTS, data
 
-    # b2_download_file_by_name with auth
-    print('b2_download_file_by_name (auth)')
-    url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name)
+
+def test_download_file_by_name_with_auth(
+    raw_api, download_url, bucket_name, account_auth_token, file_dict
+):
+    url = raw_api.get_download_url_by_name(download_url, bucket_name, TEST_FILE_NAME)
     with raw_api.download_file_from_url(account_auth_token, url) as response:
-        data = next(response.iter_content(chunk_size=len(file_contents)))
-        assert data == file_contents, data
+        data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS)))
+        assert data == TEST_FILE_CONTENTS, data
+
 
-    # b2_download_file_by_name no auth
-    print('b2_download_file_by_name (no auth)')
-    url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name)
+def test_download_file_by_name_no_auth(
+    raw_api, download_url, bucket_name, file_dict
+):
+    url = raw_api.get_download_url_by_name(download_url, bucket_name, TEST_FILE_NAME)
     with raw_api.download_file_from_url(None, url) as response:
-        data = next(response.iter_content(chunk_size=len(file_contents)))
-        assert data == file_contents, data
+        data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS)))
+        assert data == TEST_FILE_CONTENTS, data
 
-    # b2_get_download_authorization
-    print('b2_get_download_authorization')
-    download_auth = raw_api.get_download_authorization(
-        api_url, account_auth_token, bucket_id, file_name[:-2], 12345
-    )
-    download_auth_token = download_auth['authorizationToken']
 
-    # b2_download_file_by_name with download auth
-    print('b2_download_file_by_name (download auth)')
-    url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name)
+def test_get_download_authorization(download_auth_dict):
+    pass
+
+
+def test_download_file_by_name_download_auth(
+    download_auth_dict, raw_api, download_url, bucket_name, file_dict
+):
+    download_auth_token = download_auth_dict['authorizationToken']
+    url = raw_api.get_download_url_by_name(download_url, bucket_name, TEST_FILE_NAME)
     with raw_api.download_file_from_url(download_auth_token, url) as response:
-        data = next(response.iter_content(chunk_size=len(file_contents)))
-        assert data == file_contents, data
+        data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS)))
+        assert data == TEST_FILE_CONTENTS, data
+
 
-    # b2_list_file_names
-    print('b2_list_file_names')
+def test_list_file_names(raw_api, api_url, account_auth_token, bucket_id, file_dict):
     list_names_dict = raw_api.list_file_names(api_url, account_auth_token, bucket_id)
-    assert [file_name] == [f_dict['fileName'] for f_dict in list_names_dict['files']]
+    assert [TEST_FILE_NAME] == [f_dict['fileName'] for f_dict in list_names_dict['files']]
+
 
-    # b2_list_file_names (start, count)
-    print('b2_list_file_names (start, count)')
+def test_list_file_names_start_count(raw_api, api_url, account_auth_token, bucket_id, file_dict):
     list_names_dict = raw_api.list_file_names(
-        api_url, account_auth_token, bucket_id, start_file_name=file_name, max_file_count=5
+        api_url, account_auth_token, bucket_id, start_file_name=TEST_FILE_NAME, max_file_count=5
     )
-    assert [file_name] == [f_dict['fileName'] for f_dict in list_names_dict['files']]
+    assert [TEST_FILE_NAME] == [f_dict['fileName'] for f_dict in list_names_dict['files']]
 
-    # b2_copy_file
-    print('b2_copy_file')
+
+def test_copy_file(raw_api, api_url, account_auth_token, file_id):
     copy_file_name = 'test_copy.txt'
     raw_api.copy_file(api_url, account_auth_token, file_id, copy_file_name)
 
-    # b2_get_file_info_by_id
-    print('b2_get_file_info_by_id')
+
+def test_get_file_info_by_id(raw_api, api_url, account_auth_token, file_id):
     file_info_dict = raw_api.get_file_info_by_id(api_url, account_auth_token, file_id)
-    assert file_info_dict['fileName'] == file_name
+    assert file_info_dict['fileName'] == TEST_FILE_NAME
 
-    # b2_get_file_info_by_name
-    print('b2_get_file_info_by_name (no auth)')
-    info_headers = raw_api.get_file_info_by_name(download_url, None, bucket_name, file_name)
+
+def test_get_file_info_by_name_no_auth(
+    raw_api, api_url, account_id, bucket_name, file_id, download_url
+):
+    info_headers = raw_api.get_file_info_by_name(download_url, None, bucket_name, TEST_FILE_NAME)
     assert info_headers['x-bz-file-id'] == file_id
 
-    # b2_get_file_info_by_name
-    print('b2_get_file_info_by_name (auth)')
+
+def test_get_file_info_by_name_with_auth(
+    raw_api, download_url, account_auth_token, bucket_name, file_id
+):
     info_headers = raw_api.get_file_info_by_name(
-        download_url, account_auth_token, bucket_name, file_name
+        download_url, account_auth_token, bucket_name, TEST_FILE_NAME
     )
     assert info_headers['x-bz-file-id'] == file_id
 
-    # b2_get_file_info_by_name
-    print('b2_get_file_info_by_name (download auth)')
+
+def test_get_file_info_by_name_download_auth(
+    raw_api, download_url, download_auth_token, bucket_name, file_id
+):
     info_headers = raw_api.get_file_info_by_name(
-        download_url, download_auth_token, bucket_name, file_name
+        download_url, download_auth_token, bucket_name, TEST_FILE_NAME
     )
     assert info_headers['x-bz-file-id'] == file_id
 
-    # b2_hide_file
-    print('b2_hide_file')
-    raw_api.hide_file(api_url, account_auth_token, bucket_id, file_name)
-
-    # b2_start_large_file
-    print('b2_start_large_file')
-    file_info = {'color': 'red'}
-    large_info = raw_api.start_large_file(
-        api_url,
-        account_auth_token,
-        bucket_id,
-        file_name,
-        'text/plain',
-        file_info,
-        server_side_encryption=sse_b2_aes,
-    )
-    large_file_id = large_info['fileId']
-
-    # b2_get_upload_part_url
-    print('b2_get_upload_part_url')
-    upload_part_dict = raw_api.get_upload_part_url(api_url, account_auth_token, large_file_id)
-    upload_part_url = upload_part_dict['uploadUrl']
-    upload_path_auth = upload_part_dict['authorizationToken']
-
-    # b2_upload_part
-    print('b2_upload_part')
-    part_contents = b'hello part'
-    part_sha1 = hex_sha1_of_stream(io.BytesIO(part_contents), len(part_contents))
-    raw_api.upload_part(
-        upload_part_url, upload_path_auth, 1, len(part_contents), part_sha1,
-        io.BytesIO(part_contents)
-    )
+def test_hide_file(raw_api, api_url, account_auth_token, bucket_id, file_dict):
+    raw_api.hide_file(api_url, account_auth_token, bucket_id, TEST_FILE_NAME)
+
+
+class TestLargeFile:
+    @pytest.fixture(scope='class')
+    def file_info(self) -> dict:
+        return {'color': 'red'}
+
+    @pytest.fixture(scope='class')
+    def large_file_id(
+        self, raw_api, api_url, account_auth_token, bucket_id, file_info, file_dict
+    ) -> str:
+        large_info = raw_api.start_large_file(
+            api_url,
+            account_auth_token,
+            bucket_id,
+            TEST_FILE_NAME,
+            'text/plain',
+            file_info,
+            server_side_encryption=SSE_B2_AES,
+        )
+        return large_info['fileId']
+
+    @pytest.fixture(scope='class')
+    def part_contents(self) -> bytes:
+        return b'hello part'
+
+    @pytest.fixture(scope='class')
+    def part_sha1(self, part_contents) -> bytes:
+        return hex_sha1_of_stream(io.BytesIO(part_contents), len(part_contents))
+
+    def test_upload_part(
+        self, raw_api, api_url, account_auth_token, large_file_id, part_contents,
+        part_sha1
+    ):
+        upload_part_dict = raw_api.get_upload_part_url(api_url, account_auth_token, large_file_id)
+        upload_part_url = upload_part_dict['uploadUrl']
+        upload_path_auth = upload_part_dict['authorizationToken']
+
+        raw_api.upload_part(
+            upload_part_url, upload_path_auth, 1, len(part_contents), part_sha1,
+            io.BytesIO(part_contents)
+        )
+
+    def test_copy_part(self, raw_api, api_url, account_auth_token, file_id, large_file_id):
+        raw_api.copy_part(api_url, account_auth_token, file_id, large_file_id, 2, (0, 5))
+
+    def test_list_parts(self, raw_api, api_url, account_auth_token, large_file_id):
+        parts_response = raw_api.list_parts(api_url, account_auth_token, large_file_id, 1, 100)
+        assert [1, 2] == [part['partNumber'] for part in parts_response['parts']]
+
+    def test_list_unfinished_large_files(
+        self, raw_api, api_url, account_auth_token, bucket_id, file_info, file_dict
+    ):
+        unfinished_list = raw_api.list_unfinished_large_files(
+            api_url, account_auth_token, bucket_id
+        )
+        assert [TEST_FILE_NAME] == [f_dict['fileName'] for f_dict in unfinished_list['files']]
+        assert file_info == unfinished_list['files'][0]['fileInfo']
+
+    def test_finish_large_file(
+        self, raw_api, api_url, account_auth_token, large_file_id, part_sha1
+    ):
+        with pytest.raises(Exception, match='large files must have at least 2 parts'):
+            raw_api.finish_large_file(api_url, account_auth_token, large_file_id, [part_sha1])
 
-    # b2_copy_part
-    print('b2_copy_part')
-    raw_api.copy_part(api_url, account_auth_token, file_id, large_file_id, 2, (0, 5))
-
-    # b2_list_parts
-    print('b2_list_parts')
-    parts_response = raw_api.list_parts(api_url, account_auth_token, large_file_id, 1, 100)
-    assert [1, 2] == [part['partNumber'] for part in parts_response['parts']]
-
-    # b2_list_unfinished_large_files
-    unfinished_list = raw_api.list_unfinished_large_files(api_url, account_auth_token, bucket_id)
-    assert [file_name] == [f_dict['fileName'] for f_dict in unfinished_list['files']]
-    assert file_info == unfinished_list['files'][0]['fileInfo']
-
-    # b2_finish_large_file
-    print('b2_finish_large_file')
-    try:
-        raw_api.finish_large_file(api_url, account_auth_token, large_file_id, [part_sha1])
-        raise Exception('finish should have failed')
-    except Exception as e:
-        assert 'large files must have at least 2 parts' in str(e)
     # TODO: make another attempt to finish but this time successfully
 
-    # b2_update_bucket
-    print('b2_update_bucket')
+
+def test_update_bucket_revision(
+    raw_api, api_url, account_auth_token, bucket_dict, bucket_id, account_id
+):
     updated_bucket = raw_api.update_bucket(
        api_url,
        account_auth_token,
@@ -519,78 +467,4 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets):
            mode=RetentionMode.GOVERNANCE, period=RetentionPeriod(days=1)
        ),
    )
-    assert first_bucket_revision < updated_bucket['revision']
-
-    # Clean up this test.
-    _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id)
-
-    if should_cleanup_old_buckets:
-        # Clean up from old tests. Empty and delete any buckets more than an hour old.
-        _cleanup_old_buckets(raw_api, auth_dict, bucket_list_dict)
-
-
-def cleanup_old_buckets():
-    raw_api = B2RawHTTPApi(B2Http())
-    auth_dict = authorize_raw_api(raw_api)
-    bucket_list_dict = raw_api.list_buckets(
-        auth_dict['apiUrl'], auth_dict['authorizationToken'], auth_dict['accountId']
-    )
-    _cleanup_old_buckets(raw_api, auth_dict, bucket_list_dict)
-
-
-def _cleanup_old_buckets(raw_api, auth_dict, bucket_list_dict):
-    for bucket_dict in bucket_list_dict['buckets']:
-        bucket_id = bucket_dict['bucketId']
-        bucket_name = bucket_dict['bucketName']
-        if _should_delete_bucket(bucket_name):
-            print('cleaning up old bucket: ' + bucket_name)
-            _clean_and_delete_bucket(
-                raw_api,
-                auth_dict['apiUrl'],
-                auth_dict['authorizationToken'],
-                auth_dict['accountId'],
-                bucket_id,
-            )
-
-
-def _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id):
-    # Delete the files. This test never creates more than a few files,
-    # so one call to list_file_versions should get them all.
-    versions_dict = raw_api.list_file_versions(api_url, account_auth_token, bucket_id)
-    for version_dict in versions_dict['files']:
-        file_id = version_dict['fileId']
-        file_name = version_dict['fileName']
-        action = version_dict['action']
-        if action in ['hide', 'upload']:
-            print('b2_delete_file', file_name, action)
-            if action == 'upload' and version_dict[
-                'fileRetention'] and version_dict['fileRetention']['value']['mode'] is not None:
-                raw_api.update_file_retention(
-                    api_url,
-                    account_auth_token,
-                    file_id,
-                    file_name,
-                    NO_RETENTION_FILE_SETTING,
-                    bypass_governance=True
-                )
-            raw_api.delete_file_version(api_url, account_auth_token, file_id, file_name)
-        else:
-            print('b2_cancel_large_file', file_name)
-            raw_api.cancel_large_file(api_url, account_auth_token, file_id)
-
-    # Delete the bucket
-    print('b2_delete_bucket', bucket_id)
-    raw_api.delete_bucket(api_url, account_auth_token, account_id, bucket_id)
-
-
-def _should_delete_bucket(bucket_name):
-    # Bucket names for this test look like: c7b22d0b0ad7-1460060364-5670
-    # Other buckets should not be deleted.
-    match = re.match(r'^test-raw-api-[a-f0-9]+-([0-9]+)-([0-9]+)', bucket_name)
-    if match is None:
-        return False
-
-    # Is it more than an hour old?
-    bucket_time = int(match.group(1))
-    now = time.time()
-    return bucket_time + 3600 <= now
+    assert bucket_dict['revision'] < updated_bucket['revision']
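Note: the new conftest.py `file_dict` fixture requests an `sse_b2_aes` fixture that is not part of these hunks, so it is presumably defined elsewhere in conftest.py. A minimal sketch of what such a fixture could look like, assuming it simply wraps the same SSE-B2 encryption setting the removed `raw_api_test_helper` built inline (the fixture body and scope here are assumptions, not part of this change; returning the ready-made `SSE_B2_AES` constant imported in test_raw_api.py would work equally well):

import pytest

from b2sdk.encryption.setting import EncryptionAlgorithm, EncryptionMode, EncryptionSetting


@pytest.fixture(scope='module')
def sse_b2_aes() -> EncryptionSetting:
    # SSE-B2 with AES-256, mirroring the setting the old helper constructed inline.
    return EncryptionSetting(
        mode=EncryptionMode.SSE_B2,
        algorithm=EncryptionAlgorithm.AES256,
    )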
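The diff also removes the standalone `cleanup_old_buckets()` entry point from test_raw_api.py, while `_cleanup_old_buckets` in bucket_cleaner.py now takes `api_url`, `account_auth_token` and `account_id` explicitly instead of an `auth_dict`. A hedged sketch of an equivalent standalone call against the new signature, built only from calls shown elsewhere in this change (the function name below is illustrative):

from os import environ

from b2sdk.b2http import B2Http
from b2sdk.raw_api import REALM_URLS, B2RawHTTPApi

from .bucket_cleaner import _cleanup_old_buckets


def cleanup_old_buckets_standalone():
    # Authorize the same way the conftest fixtures do.
    raw_api = B2RawHTTPApi(B2Http())
    realm = environ.get('B2_TEST_ENVIRONMENT', 'production')
    realm_url = REALM_URLS.get(realm, realm)
    auth_dict = raw_api.authorize_account(
        realm_url,
        environ['B2_TEST_APPLICATION_KEY_ID'],
        environ['B2_TEST_APPLICATION_KEY'],
    )
    # List all buckets, then delete the stale test buckets.
    bucket_list_dict = raw_api.list_buckets(
        auth_dict['apiUrl'], auth_dict['authorizationToken'], auth_dict['accountId']
    )
    _cleanup_old_buckets(
        raw_api,
        auth_dict['apiUrl'],
        auth_dict['authorizationToken'],
        auth_dict['accountId'],
        bucket_list_dict,
    )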