Skip to content

Commit

Permalink
Improve MongoDB implementation and minor linting fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
GJFR committed Oct 22, 2024
1 parent 0a61592 commit 44b7f73
Show file tree
Hide file tree
Showing 24 changed files with 133 additions and 666 deletions.
16 changes: 6 additions & 10 deletions bci/database/mongo/binary_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ def fetch_binary_files(binary_executable_path: str, state: State) -> bool:
if MongoDB.binary_cache_limit <= 0:
return False

db = MongoDB.get_instance().db
files_collection = db['fs.files']
files_collection = MongoDB().get_collection('fs.files')

query = {
'file_type': 'binary',
Expand All @@ -58,7 +57,7 @@ def write_from_db(file_path: str, grid_file_id: str) -> None:
os.chmod(file_path, 0o744)

grid_cursor = files_collection.find(query)
fs = gridfs.GridFS(db)
fs = MongoDB().gridfs
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
for grid_doc in grid_cursor:
Expand Down Expand Up @@ -89,8 +88,7 @@ def store_binary_files(binary_executable_path: str, state: State) -> bool:
return False
BinaryCache.__remove_least_used_revision_binary_files()

db = MongoDB.get_instance().db
fs = gridfs.GridFS(db)
fs = MongoDB().gridfs
binary_folder_path = os.path.dirname(binary_executable_path)
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
Expand Down Expand Up @@ -122,8 +120,7 @@ def __count_cached_binaries(state_type: Optional[str] = None) -> int:
:param state_type: The type of the state.
:return: The number of cached binaries.
"""
db = MongoDB.get_instance()
files_collection = db.get_collection('fs.files')
files_collection = MongoDB().get_collection('fs.files')
if state_type:
query = {'file_type': 'binary', 'state_type': state_type}
else:
Expand All @@ -135,9 +132,8 @@ def __remove_least_used_revision_binary_files() -> None:
"""
Removes the least used revision binary files from the database.
"""
db = MongoDB.get_instance().db
fs = gridfs.GridFS(db)
files_collection = db.get_collection('fs.files')
fs = MongoDB().gridfs
files_collection = MongoDB().get_collection('fs.files')

grid_cursor = files_collection.find(
{'file_type': 'binary', 'state_type': 'revision'},
Expand Down
4 changes: 3 additions & 1 deletion bci/database/mongo/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def stop():


def __create_new_container(user: str, pw: str, db_name, db_host):
if (host_pwd := os.getenv('HOST_PWD', None)) is None:
raise AttributeError("Could not create container because of missing HOST_PWD environment variable")
docker_client = docker.from_env()
docker_client.containers.run(
'mongo:5.0.17',
Expand All @@ -56,7 +58,7 @@ def __create_new_container(user: str, pw: str, db_name, db_host):
detach=True,
remove=True,
labels=['bh_db'],
volumes=[os.path.join(os.getenv('HOST_PWD'), 'database/data') + ':/data/db'],
volumes=[os.path.join(host_pwd, 'database/data') + ':/data/db'],
ports={'27017/tcp': 27017},
environment={'MONGO_INITDB_ROOT_USERNAME': DEFAULT_ROOT_USER, 'MONGO_INITDB_ROOT_PASSWORD': DEFAULT_ROOT_PW},
)
Expand Down
163 changes: 86 additions & 77 deletions bci/database/mongo/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import logging
from datetime import datetime, timezone
from typing import Optional

from flatten_dict import flatten
from gridfs import GridFS
from pymongo import ASCENDING, MongoClient
from pymongo.collection import Collection
from pymongo.database import Database
from pymongo.errors import ServerSelectionTimeoutError

from bci.evaluations.logic import (
Expand All @@ -22,11 +25,19 @@

logger = logging.getLogger(__name__)

# pylint: disable=global-statement
CLIENT = None
DB = None

def singleton(class_):
    """
    Class decorator that turns ``class_`` into a lazily created singleton.

    The decorated name is rebound to a callable accessor. The first call
    constructs ``class_`` with the supplied arguments; every later call returns
    that same instance and ignores its arguments.

    Attribute lookups that the accessor cannot satisfy itself are forwarded to
    the wrapped class, so class-level attributes (for example
    ``MongoDB.binary_cache_limit``) stay reachable through the decorated name.
    The undecorated class is exposed as ``__wrapped__``.

    :param class_: The class to wrap.
    :return: A callable that returns the single shared instance of ``class_``.
    """
    instances = {}

    class _SingletonAccessor:
        """Callable stand-in for the decorated class."""

        __wrapped__ = class_

        def __call__(self, *args, **kwargs):
            # Construct on first use only; later arguments are ignored.
            if class_ not in instances:
                instances[class_] = class_(*args, **kwargs)
            return instances[class_]

        def __getattr__(self, name):
            # Only invoked when normal lookup on the accessor fails, so this
            # transparently exposes class attributes of the wrapped class.
            return getattr(class_, name)

        def __repr__(self):
            return f'<singleton accessor for {class_!r}>'

    return _SingletonAccessor()


@singleton
class MongoDB:
instance = None
binary_cache_limit = 0
Expand All @@ -37,21 +48,13 @@ class MongoDB:
}

def __init__(self):
self.client: MongoClient = CLIENT
self.db = DB

@classmethod
def get_instance(cls) -> MongoDB:
if cls.instance is None:
cls.instance = cls()
return cls.instance

@classmethod
def connect(cls, db_params: DatabaseParameters):
global CLIENT, DB
self.client: Optional[MongoClient] = None
self._db: Optional[Database] = None

def connect(self, db_params: DatabaseParameters) -> None:
assert db_params is not None

CLIENT = MongoClient(
self.client = MongoClient(
host=db_params.host,
port=27017,
username=db_params.username,
Expand All @@ -60,54 +63,68 @@ def connect(cls, db_params: DatabaseParameters):
retryWrites=False,
serverSelectionTimeoutMS=10000,
)
cls.binary_cache_limit = db_params.binary_cache_limit
logger.info(f'Binary cache limit set to {cls.binary_cache_limit}')
logger.info(f'Binary cache limit set to {db_params.binary_cache_limit}')
# Force connection to check whether MongoDB server is reachable
try:
CLIENT.server_info()
DB = CLIENT[db_params.database_name]
self.client.server_info()
self._db = self.client[db_params.database_name]
logger.info('Connected to database!')
except ServerSelectionTimeoutError as e:
logger.info('A timeout occurred while attempting to establish connection.', exc_info=True)
raise ServerException from e

# Initialize collections
MongoDB.__initialize_collections()
self.__initialize_collections()

def disconnect(self) -> None:
    """
    Close the MongoDB client, if there is one, and drop the client and database handles.

    Calling this without an active client is a no-op apart from resetting the handles,
    so it is safe to call repeatedly.
    """
    client = self.client
    try:
        if client:
            client.close()
    finally:
        # Reset even when close() raises, so no stale handle is kept around.
        self.client = None
        self._db = None

@staticmethod
def disconnect():
global CLIENT, DB
CLIENT.close()
CLIENT = None
DB = None
def __initialize_collections(self):
    """
    Create the collections and indexes used by this class when they do not exist yet.

    Covers the Chromium binary availability collection, the GridFS collections
    backing the binary cache ('fs.files' and 'fs.chunks') and the Firefox
    binary availability (revision cache) collection.

    :raises ServerException: If no database has been selected yet.
    """
    if self._db is None:
        # A bare `raise` without an active exception would itself fail with
        # "RuntimeError: No active exception to reraise".
        raise ServerException('Database server does not have a database')

    # Single round trip: every name below is checked exactly once, so the
    # snapshot cannot go stale within this method.
    existing = set(self._db.list_collection_names())

    for collection_name in ['chromium_binary_availability']:
        if collection_name not in existing:
            self._db.create_collection(collection_name)

    # Binary cache
    if 'fs.files' not in existing:
        # Create the 'fs.files' collection with indexes
        self._db.create_collection('fs.files')
        self._db['fs.files'].create_index(
            ['state_type', 'browser_name', 'state_index', 'relative_file_path'], unique=True
        )
    if 'fs.chunks' not in existing:
        # Create the 'fs.chunks' collection with zstd compression
        self._db.create_collection(
            'fs.chunks', storageEngine={'wiredTiger': {'configString': 'block_compressor=zstd'}}
        )
        self._db['fs.chunks'].create_index(['files_id', 'n'], unique=True)

    # Revision cache
    if 'firefox_binary_availability' not in existing:
        self._db.create_collection('firefox_binary_availability')
        self._db['firefox_binary_availability'].create_index([('revision_number', ASCENDING)])
        self._db['firefox_binary_availability'].create_index(['node'])

def get_collection(self, name: str, create_if_not_found: bool = False) -> Collection:
    """
    Return the collection called `name` from the connected database.

    :param name: Name of the collection to look up.
    :param create_if_not_found: When True, a missing collection is created and returned
        instead of raising.
    :return: The existing or newly created collection.
    :raises ServerException: If no database has been selected, or if the collection does
        not exist and `create_if_not_found` is False.
    """
    if self._db is None:
        raise ServerException('Database server does not have a database')
    if name in self._db.list_collection_names():
        return self._db[name]
    if create_if_not_found:
        return self._db.create_collection(name)
    raise ServerException(f"Could not find collection '{name}'")

def get_collection(self, name: str):
if name not in DB.list_collection_names():
logger.info(f"Collection '{name}' does not exist, creating it...")
DB.create_collection(name)
return DB[name]
@property
def gridfs(self) -> GridFS:
    """
    GridFS handle bound to the connected database.

    A new `GridFS` object is constructed on every access.

    :return: A `GridFS` instance for the current database.
    :raises ServerException: If no database has been selected yet.
    """
    if self._db is None:
        raise ServerException('Database server does not have a database')
    return GridFS(self._db)

def store_result(self, result: TestResult):
browser_config = result.params.browser_configuration
Expand Down Expand Up @@ -140,7 +157,7 @@ def store_result(self, result: TestResult):

collection.insert_one(document)

def get_result(self, params: TestParameters) -> TestResult:
def get_result(self, params: TestParameters) -> Optional[TestResult]:
collection = self.__get_data_collection(params)
query = self.__to_query(params)
document = collection.find_one(query)
Expand All @@ -150,6 +167,7 @@ def get_result(self, params: TestParameters) -> TestResult:
)
else:
logger.error(f'Could not find document for query {query}')
return None

def has_result(self, params: TestParameters) -> bool:
collection = self.__get_data_collection(params)
Expand Down Expand Up @@ -230,30 +248,23 @@ def __to_query(self, params: TestParameters) -> dict:

def __get_data_collection(self, test_params: TestParameters) -> Collection:
collection_name = test_params.database_collection
if collection_name not in self.db.list_collection_names():
return self.db.create_collection(collection_name)
return self.db[collection_name]
return self.get_collection(collection_name, create_if_not_found=True)

@staticmethod
def get_binary_availability_collection(browser_name: str):
collection_name = MongoDB.binary_availability_collection_names[browser_name]
if collection_name not in DB.list_collection_names():
raise AttributeError("Collection '%s' not found in database" % collection_name)
return DB[collection_name]
def get_binary_availability_collection(self, browser_name: str) -> Collection:
    """
    Return the binary availability collection for the given browser.

    The collection name is looked up in `binary_availability_collection_names`;
    the collection is created when it does not exist yet.

    :param browser_name: Key into `binary_availability_collection_names`.
    :return: The collection holding binary availability data for that browser.
    :raises KeyError: If `browser_name` has no entry in `binary_availability_collection_names`.
    """
    collection_name = self.binary_availability_collection_names[browser_name]
    return self.get_collection(collection_name, create_if_not_found=True)

# Caching of online binary availability

@staticmethod
def has_binary_available_online(browser: str, state: State):
collection = MongoDB.get_binary_availability_collection(browser)
def has_binary_available_online(self, browser: str, state: State):
    """
    Look up the cached online availability of the binary for `state`.

    :param browser: Browser name used to select the binary availability collection.
    :param state: State whose `to_dict()` representation is used as the lookup key.
    :return: The stored 'binary_online' value, or None when nothing is cached for this state.
    """
    collection = self.get_binary_availability_collection(browser)
    document = collection.find_one({'state': state.to_dict()})
    if document is None:
        return None
    return document['binary_online']

@staticmethod
def get_stored_binary_availability(browser):
collection = MongoDB.get_binary_availability_collection(browser)
def get_stored_binary_availability(self, browser):
collection = MongoDB().get_binary_availability_collection(browser)
result = collection.find(
{'binary_online': True},
{
Expand All @@ -265,9 +276,8 @@ def get_stored_binary_availability(browser):
result.sort('build_id', -1)
return result

@staticmethod
def get_complete_state_dict_from_binary_availability_cache(state: State) -> dict:
collection = MongoDB.get_binary_availability_collection(state.browser_name)
def get_complete_state_dict_from_binary_availability_cache(self, state: State) -> Optional[dict]:
collection = MongoDB().get_binary_availability_collection(state.browser_name)
# We have to flatten the state dictionary to ignore missing attributes.
state_dict = {'state': state.to_dict()}
query = flatten(state_dict, reducer='dot')
Expand All @@ -276,9 +286,10 @@ def get_complete_state_dict_from_binary_availability_cache(state: State) -> dict
return None
return document['state']

@staticmethod
def store_binary_availability_online_cache(browser: str, state: State, binary_online: bool, url: str = None):
collection = MongoDB.get_binary_availability_collection(browser)
def store_binary_availability_online_cache(
self, browser: str, state: State, binary_online: bool, url: Optional[str] = None
):
collection = MongoDB().get_binary_availability_collection(browser)
collection.update_one(
{'state': state.to_dict()},
{
Expand All @@ -292,9 +303,8 @@ def store_binary_availability_online_cache(browser: str, state: State, binary_on
upsert=True,
)

@staticmethod
def get_build_id_firefox(state: State):
collection = MongoDB.get_binary_availability_collection('firefox')
def get_build_id_firefox(self, state: State):
collection = MongoDB().get_binary_availability_collection('firefox')

result = collection.find_one({'state': state.to_dict()}, {'_id': False, 'build_id': 1})
# Result can only be None if the binary associated with the state_id is artisanal:
Expand All @@ -309,11 +319,11 @@ def get_documents_for_plotting(self, params: PlotParameters, releases: bool = Fa
'mech_group': params.mech_group,
'browser_config': params.browser_config,
'state.type': 'version' if releases else 'revision',
'extensions': {'$size': len(params.extensions) if params.extensions else 0},
'cli_options': {'$size': len(params.cli_options) if params.cli_options else 0}
}
query['extensions'] = {'$size': len(params.extensions) if params.extensions else 0}
if params.extensions:
query['extensions']['$all'] = params.extensions
query['cli_options'] = {'$size': len(params.cli_options) if params.cli_options else 0}
if params.cli_options:
query['cli_options']['$all'] = params.cli_options
if params.revision_number_range:
Expand All @@ -336,10 +346,9 @@ def get_documents_for_plotting(self, params: PlotParameters, releases: bool = Fa
)
return list(docs)

@staticmethod
def get_info() -> dict:
if CLIENT and CLIENT.address:
return {'type': 'mongo', 'host': CLIENT.address[0], 'connected': True}
def get_info(self) -> dict:
    """
    Describe the current database connection.

    :return: A dict with 'type' (always 'mongo'), 'host' (first element of the client's
        address, or None) and 'connected' (True only when a client exists and reports
        an address).
    """
    # Read the address once instead of twice.
    # NOTE(review): on a real MongoClient, `address` may involve server selection - confirm.
    address = self.client.address if self.client else None
    if address:
        return {'type': 'mongo', 'host': address[0], 'connected': True}
    return {'type': 'mongo', 'host': None, 'connected': False}

Expand Down
Loading

0 comments on commit 44b7f73

Please sign in to comment.