From 057364a7930ea99b331913902d2f9a868717628e Mon Sep 17 00:00:00 2001
From: Abram
Date: Wed, 17 Jul 2024 00:18:17 +0100
Subject: [PATCH 1/7] chore (backend): set async engine echo to True to view
 SQL queries and execution time in backend console

---
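Note: echo=True makes the async engine log every SQL statement and its
execution time through the sqlalchemy.engine logger, which is handy for
local debugging but very noisy outside of it. A minimal sketch of one way to
make the echo opt-in instead of hard-coded; the DATABASE_ECHO variable name
is hypothetical and used only for illustration here, it is not something
this patch introduces:

    import os

    from sqlalchemy.ext.asyncio import create_async_engine

    postgres_uri = os.environ.get("POSTGRES_URI")
    # SQL echo stays off unless explicitly enabled, e.g. DATABASE_ECHO=true
    sql_echo = os.environ.get("DATABASE_ECHO", "false").lower() == "true"

    engine = create_async_engine(url=postgres_uri, echo=sql_echo)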
 agenta-backend/agenta_backend/models/db_engine.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/agenta-backend/agenta_backend/models/db_engine.py b/agenta-backend/agenta_backend/models/db_engine.py
index f9379a969..e2b1c858d 100644
--- a/agenta-backend/agenta_backend/models/db_engine.py
+++ b/agenta-backend/agenta_backend/models/db_engine.py
@@ -97,7 +97,7 @@ def __init__(self) -> None:
         self.mode = os.environ.get("DATABASE_MODE", "v2")
         self.postgres_uri = os.environ.get("POSTGRES_URI")
         self.mongo_uri = os.environ.get("MONGODB_URI")
-        self.engine = create_async_engine(url=self.postgres_uri)  # type: ignore
+        self.engine = create_async_engine(url=self.postgres_uri, echo=True)  # type: ignore
         self.async_session_maker = async_sessionmaker(
             bind=self.engine, class_=AsyncSession, expire_on_commit=False
         )

From 0620577cc7e29830d3e29016bca5cbeddce86f4a Mon Sep 17 00:00:00 2001
From: Abram
Date: Wed, 17 Jul 2024 00:19:48 +0100
Subject: [PATCH 2/7] refactor (backend -> celery): remove redundant
 re-fetching of evaluation and evaluator config from DB in evaluate task
 function

---
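Note: the task already receives evaluation_id as an argument, so fetching the
EvaluationDB row again inside the task only to read its id back cost an extra
query per run (and the same held for re-fetching each evaluator config). The
core of the change, excerpted and simplified from the hunks below:

    # before: extra round-trip, then the id is converted straight back to str
    new_evaluation_db = loop.run_until_complete(
        fetch_evaluation_by_id(evaluation_id)
    )
    failed_evaluation_scenarios = loop.run_until_complete(
        check_if_evaluation_contains_failed_evaluation_scenarios(
            str(new_evaluation_db.id)
        )
    )

    # after: the id that was passed into the task is forwarded as-is
    failed_evaluation_scenarios = loop.run_until_complete(
        check_if_evaluation_contains_failed_evaluation_scenarios(evaluation_id)
    )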
 .../agenta_backend/tasks/evaluations.py      | 22 ++++++-------------
 1 file changed, 7 insertions(+), 15 deletions(-)

diff --git a/agenta-backend/agenta_backend/tasks/evaluations.py b/agenta-backend/agenta_backend/tasks/evaluations.py
index b3feade14..1a1e48032 100644
--- a/agenta-backend/agenta_backend/tasks/evaluations.py
+++ b/agenta-backend/agenta_backend/tasks/evaluations.py
@@ -113,9 +113,6 @@ def evaluate(
         ), f"App variant with id {variant_id} not found!"
         app_variant_parameters = app_variant_db.config_parameters
         testset_db = loop.run_until_complete(fetch_testset_by_id(testset_id))
-        new_evaluation_db = loop.run_until_complete(
-            fetch_evaluation_by_id(evaluation_id)
-        )
         evaluator_config_dbs = []
         for evaluator_config_id in evaluators_config_ids:
             evaluator_config = loop.run_until_complete(
@@ -195,7 +192,7 @@ def evaluate(
                 loop.run_until_complete(
                     create_new_evaluation_scenario(
                         user_id=str(app.user_id),
-                        evaluation=new_evaluation_db,
+                        evaluation_id=evaluation_id,
                         variant_id=variant_id,
                         inputs=inputs,
                         outputs=[
@@ -274,7 +271,7 @@ def evaluate(
             loop.run_until_complete(
                 create_new_evaluation_scenario(
                     user_id=str(app.user_id),
-                    evaluation=new_evaluation_db,
+                    evaluation_id=evaluation_id,
                     variant_id=variant_id,
                     inputs=inputs,
                     outputs=[
@@ -341,15 +338,11 @@ def evaluate(
         )
 
         loop.run_until_complete(
-            update_evaluation_with_aggregated_results(
-                str(new_evaluation_db.id), aggregated_results
-            )
+            update_evaluation_with_aggregated_results(evaluation_id, aggregated_results)
         )
 
         failed_evaluation_scenarios = loop.run_until_complete(
-            check_if_evaluation_contains_failed_evaluation_scenarios(
-                str(new_evaluation_db.id)
-            )
+            check_if_evaluation_contains_failed_evaluation_scenarios(evaluation_id)
         )
 
         evaluation_status = Result(
@@ -365,7 +358,7 @@ def evaluate(
 
         loop.run_until_complete(
             update_evaluation(
-                evaluation_id=str(new_evaluation_db.id),
+                evaluation_id=evaluation_id,
                 updates={"status": evaluation_status.model_dump()},
             )
         )
@@ -384,7 +377,7 @@ def evaluate(
                             message="Evaluation Aggregation Failed",
                             stacktrace=str(traceback.format_exc()),
                         ),
-                    )
+                    ).model_dump()
                 },
             )
         )
@@ -441,9 +434,8 @@ async def aggregate_evaluator_results(
             type="error", value=None, error=Error(message="Aggregation failed")
         )
 
-        evaluator_config = await fetch_evaluator_config(config_id)
        aggregated_result = AggregatedResult(
-            evaluator_config=str(evaluator_config.id),  # type: ignore
+            evaluator_config=config_id,
             result=result,
         )
        aggregated_results.append(aggregated_result)

From fee9d9864c8d12122a53fb0a8e2fee95ceceb639 Mon Sep 17 00:00:00 2001
From: Abram
Date: Wed, 17 Jul 2024 00:21:51 +0100
Subject: [PATCH 3/7] refactor (backend): improve SQL queries in SQLAlchemy by
 loading only the necessary fields and join-loading the required tables to
 eliminate N+1 queries

---
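Note: the converters previously issued one extra query per related row
(variant, variant revision, evaluator config, scenario results) while
serializing each evaluation, the classic N+1 pattern. Join-loading those
relationships up front lets the converters read plain attributes from objects
that are already in memory. A minimal sketch of the technique, assuming an
async session and the mapped classes already imported by db_manager.py; the
helper name list_evaluations_eagerly is hypothetical and only for
illustration:

    from sqlalchemy import select
    from sqlalchemy.orm import joinedload

    async def list_evaluations_eagerly(session, app_uuid):
        query = (
            select(EvaluationDB)
            .filter_by(app_id=app_uuid)
            .options(
                # load only the columns the converter actually reads
                joinedload(EvaluationDB.variant).load_only(AppVariantDB.variant_name),
                # pull aggregated results and their evaluator configs in the same query
                joinedload(EvaluationDB.aggregated_results).joinedload(
                    EvaluationAggregatedResultDB.evaluator_config
                ),
            )
        )
        result = await session.execute(query)
        # unique() is required when joined eager loading targets a collection
        return result.unique().scalars().all()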
 .../agenta_backend/models/converters.py      | 59 ++++--------
 .../routers/evaluation_router.py             |  2 +-
 .../agenta_backend/services/db_manager.py    | 92 +++++++------------
 3 files changed, 55 insertions(+), 98 deletions(-)

diff --git a/agenta-backend/agenta_backend/models/converters.py b/agenta-backend/agenta_backend/models/converters.py
index 72a0d174d..ff9acd8d2 100644
--- a/agenta-backend/agenta_backend/models/converters.py
+++ b/agenta-backend/agenta_backend/models/converters.py
@@ -119,16 +119,13 @@ async def human_evaluation_db_to_simple_evaluation_output(
 async def evaluation_db_to_pydantic(
     evaluation_db: EvaluationDB,
 ) -> Evaluation:
-    variant = await db_manager.get_app_variant_instance_by_id(
-        str(evaluation_db.variant_id)
+    variant_name = (
+        evaluation_db.variant.variant_name
+        if evaluation_db.variant.variant_name
+        else str(evaluation_db.variant_id)
     )
-    variant_name = variant.variant_name if variant else str(evaluation_db.variant_id)
-    variant_revision = await db_manager.get_app_variant_revision_by_id(
-        str(evaluation_db.variant_revision_id)
-    )
-    revision = str(variant_revision.revision)
-    aggregated_results = await aggregated_result_of_evaluation_to_pydantic(
-        str(evaluation_db.id)
+    aggregated_results = aggregated_result_of_evaluation_to_pydantic(
+        evaluation_db.aggregated_results
     )
 
     return Evaluation(
@@ -139,7 +136,7 @@
         status=evaluation_db.status,
         variant_ids=[str(evaluation_db.variant_id)],
         variant_revision_ids=[str(evaluation_db.variant_revision_id)],
-        revisions=[revision],
+        revisions=[str(evaluation_db.variant_revision.revision)],
         variant_names=[variant_name],
         testset_id=str(evaluation_db.testset_id),
         testset_name=evaluation_db.testset.name,
@@ -213,12 +210,11 @@ def human_evaluation_scenario_db_to_pydantic(
     )
 
 
-async def aggregated_result_of_evaluation_to_pydantic(evaluation_id: str) -> List[dict]:
+def aggregated_result_of_evaluation_to_pydantic(
+    evaluation_aggregated_results: List,
+) -> List[dict]:
     transformed_results = []
-    aggregated_results = await db_manager.fetch_eval_aggregated_results(
-        evaluation_id=evaluation_id
-    )
-    for aggregated_result in aggregated_results:
+    for aggregated_result in evaluation_aggregated_results:
         evaluator_config_dict = (
             {
                 "id": str(aggregated_result.evaluator_config.id),
@@ -242,27 +238,16 @@ async def aggregated_result_of_evaluation_to_pydantic(evaluation_id: str) -> Lis
     return transformed_results
 
 
-async def evaluation_scenarios_results_to_pydantic(
-    evaluation_scenario_id: str,
-) -> List[dict]:
-    scenario_results = await db_manager.fetch_evaluation_scenario_results(
-        evaluation_scenario_id
-    )
-    return [
+async def evaluation_scenario_db_to_pydantic(
+    evaluation_scenario_db: EvaluationScenarioDB, evaluation_id: str
+) -> EvaluationScenario:
+    scenario_results = [
         {
             "evaluator_config": str(scenario_result.evaluator_config_id),
             "result": scenario_result.result,
         }
-        for scenario_result in scenario_results
+        for scenario_result in evaluation_scenario_db.results
     ]
-
-
-async def evaluation_scenario_db_to_pydantic(
-    evaluation_scenario_db: EvaluationScenarioDB, evaluation_id: str
-) -> EvaluationScenario:
-    scenario_results = await evaluation_scenarios_results_to_pydantic(
-        str(evaluation_scenario_db.id)
-    )
     return EvaluationScenario(
         id=str(evaluation_scenario_db.id),
         evaluation_id=evaluation_id,
@@ -308,17 +293,11 @@ async def app_variant_db_to_output(app_variant_db: AppVariantDB) -> AppVariantRe
     if isinstance(app_variant_db.base_id, uuid.UUID) and isinstance(
         app_variant_db.base.deployment_id, uuid.UUID
     ):
-        deployment = await db_manager.get_deployment_by_id(
-            str(app_variant_db.base.deployment_id)
-        )
-        uri = deployment.uri
+        uri = app_variant_db.base.deployment.uri
     else:
-        deployment = None
         uri = None
 
-    logger.info(
-        f"uri: {uri} deployment: {str(app_variant_db.base.deployment_id)} {deployment}"
-    )
+    logger.info(f"uri: {uri} deployment: {str(app_variant_db.base.deployment_id)}")
     variant_response = AppVariantResponse(
         app_id=str(app_variant_db.app_id),
         app_name=str(app_variant_db.app.app_name),
@@ -329,7 +308,7 @@ async def app_variant_db_to_output(app_variant_db: AppVariantDB) -> AppVariantRe
         base_name=app_variant_db.base_name,  # type: ignore
         base_id=str(app_variant_db.base_id),
         config_name=app_variant_db.config_name,  # type: ignore
-        uri=uri,
+        uri=uri,  # type: ignore
         revision=app_variant_db.revision,  # type: ignore
     )
 
diff --git a/agenta-backend/agenta_backend/routers/evaluation_router.py b/agenta-backend/agenta_backend/routers/evaluation_router.py
index 3ebe17177..9aab90d03 100644
--- a/agenta-backend/agenta_backend/routers/evaluation_router.py
+++ b/agenta-backend/agenta_backend/routers/evaluation_router.py
@@ -221,7 +221,7 @@ async def fetch_evaluation_results(evaluation_id: str, request: Request):
         )
 
         results = await converters.aggregated_result_of_evaluation_to_pydantic(
-            str(evaluation.id)
+            evaluation.aggregated_results  # type: ignore
         )
         return {"results": results, "evaluation_id": evaluation_id}
     except Exception as exc:
diff --git a/agenta-backend/agenta_backend/services/db_manager.py b/agenta-backend/agenta_backend/services/db_manager.py
index 4b2e642df..691fccbd7 100644
--- a/agenta-backend/agenta_backend/services/db_manager.py
+++ b/agenta-backend/agenta_backend/services/db_manager.py
@@ -1123,7 +1123,10 @@ async def list_app_variants(app_id: str):
     async with db_engine.get_session() as session:
         result = await session.execute(
             select(AppVariantDB)
-            .options(joinedload(AppVariantDB.app), joinedload(AppVariantDB.base))
+            .options(
+                joinedload(AppVariantDB.app).load_only(AppDB.id, AppDB.app_name),  # type: ignore
+                joinedload(AppVariantDB.base).joinedload(VariantBaseDB.deployment).load_only(DeploymentDB.uri),  # type: ignore
+            )
             .filter_by(app_id=uuid.UUID(app_uuid))
         )
         app_variants = result.scalars().all()
@@ -1827,26 +1830,6 @@ async def get_app_variant_instance_by_id(variant_id: str) -> AppVariantDB:
         return app_variant_db
 
 
-async def get_app_variant_revision_by_id(
-    variant_revision_id: str,
-) -> AppVariantRevisionsDB:
-    """Get the app variant revision object from the database with the provided id.
-
-    Arguments:
-        variant_revision_id (str): The app variant revision unique identifier
-
-    Returns:
-        AppVariantDB: instance of app variant object
-    """
-
-    async with db_engine.get_session() as session:
-        result = await session.execute(
-            select(AppVariantRevisionsDB).filter_by(id=uuid.UUID(variant_revision_id))
-        )
-        variant_revision_db = result.scalars().first()
-        return variant_revision_db
-
-
 async def fetch_testset_by_id(testset_id: str) -> Optional[TestSetDB]:
     """Fetches a testset by its ID.
     Args:
@@ -1959,8 +1942,16 @@ async def fetch_evaluation_by_id(evaluation_id: str) -> Optional[EvaluationDB]:
             joinedload(EvaluationDB.user).load_only(UserDB.username),  # type: ignore
             joinedload(EvaluationDB.testset).load_only(TestSetDB.id, TestSetDB.name),  # type: ignore
         )
-        result = await session.execute(query)
-        evaluation = result.scalars().first()
+        result = await session.execute(
+            query.options(
+                joinedload(EvaluationDB.variant).load_only(AppVariantDB.id, AppVariantDB.variant_name),  # type: ignore
+                joinedload(EvaluationDB.variant_revision).load_only(AppVariantRevisionsDB.revision),  # type: ignore
+                joinedload(EvaluationDB.aggregated_results).joinedload(
+                    EvaluationAggregatedResultDB.evaluator_config
+                ),
+            )
+        )
+        evaluation = result.unique().scalars().first()
         return evaluation
 
 
@@ -2289,40 +2280,14 @@ async def fetch_evaluation_scenarios(evaluation_id: str):
 
     async with db_engine.get_session() as session:
         result = await session.execute(
-            select(EvaluationScenarioDB).filter_by(
-                evaluation_id=uuid.UUID(evaluation_id)
-            )
+            select(EvaluationScenarioDB)
+            .filter_by(evaluation_id=uuid.UUID(evaluation_id))
+            .options(joinedload(EvaluationScenarioDB.results))
         )
-        evaluation_scenarios = result.scalars().all()
+        evaluation_scenarios = result.unique().scalars().all()
         return evaluation_scenarios
 
 
-async def fetch_evaluation_scenario_results(evaluation_scenario_id: str):
-    """
-    Fetches evaluation scenario results.
-
-    Args:
-        evaluation_scenario_id (str): The evaluation scenario identifier
-
-    Returns:
-        The evaluation scenario results.
- """ - - async with db_engine.get_session() as session: - result = await session.execute( - select(EvaluationScenarioResultDB) - .options( - load_only( - EvaluationScenarioResultDB.evaluator_config_id, # type: ignore - EvaluationScenarioResultDB.result, # type: ignore - ) - ) - .filter_by(evaluation_scenario_id=uuid.UUID(evaluation_scenario_id)) - ) - scenario_results = result.scalars().all() - return scenario_results - - async def fetch_evaluation_scenario_by_id( evaluation_scenario_id: str, ) -> Optional[EvaluationScenarioDB]: @@ -2691,7 +2656,14 @@ async def create_new_evaluation( session.add(evaluation) await session.commit() await session.refresh( - evaluation, attribute_names=["user", "testset", "aggregated_results"] + evaluation, + attribute_names=[ + "user", + "testset", + "variant", + "variant_revision", + "aggregated_results", + ], ) return evaluation @@ -2718,7 +2690,13 @@ async def list_evaluations(app_id: str): ) result = await session.execute( - query.options(joinedload(EvaluationDB.aggregated_results)) + query.options( + joinedload(EvaluationDB.variant).load_only(AppVariantDB.id, AppVariantDB.variant_name), # type: ignore + joinedload(EvaluationDB.variant_revision).load_only(AppVariantRevisionsDB.revision), # type: ignore + joinedload(EvaluationDB.aggregated_results).joinedload( + EvaluationAggregatedResultDB.evaluator_config + ), + ) ) evaluations = result.unique().scalars().all() return evaluations @@ -2807,7 +2785,7 @@ async def delete_evaluations(evaluation_ids: List[str]) -> None: async def create_new_evaluation_scenario( user_id: str, - evaluation: EvaluationDB, + evaluation_id: str, variant_id: str, inputs: List[EvaluationScenarioInput], outputs: List[EvaluationScenarioOutput], @@ -2827,7 +2805,7 @@ async def create_new_evaluation_scenario( async with db_engine.get_session() as session: evaluation_scenario = EvaluationScenarioDB( user_id=uuid.UUID(user_id), - evaluation_id=evaluation.id, + evaluation_id=uuid.UUID(evaluation_id), variant_id=uuid.UUID(variant_id), inputs=[input.dict() for input in inputs], outputs=[output.dict() for output in outputs], From 4a07176825b73f86b63a2bfe1d38d17e4e81b79a Mon Sep 17 00:00:00 2001 From: Abram Date: Wed, 17 Jul 2024 08:50:44 +0100 Subject: [PATCH 4/7] fix (backend): resolve object list can't be used in 'await' expression error --- agenta-backend/agenta_backend/routers/evaluation_router.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/routers/evaluation_router.py b/agenta-backend/agenta_backend/routers/evaluation_router.py index 9aab90d03..db213c829 100644 --- a/agenta-backend/agenta_backend/routers/evaluation_router.py +++ b/agenta-backend/agenta_backend/routers/evaluation_router.py @@ -220,7 +220,7 @@ async def fetch_evaluation_results(evaluation_id: str, request: Request): status_code=403, ) - results = await converters.aggregated_result_of_evaluation_to_pydantic( + results = converters.aggregated_result_of_evaluation_to_pydantic( evaluation.aggregated_results # type: ignore ) return {"results": results, "evaluation_id": evaluation_id} From 40b1d150bbd1b6a62141813323a8379150b5f819 Mon Sep 17 00:00:00 2001 From: Abram Date: Wed, 24 Jul 2024 09:18:31 +0100 Subject: [PATCH 5/7] refactor (tests): handle floating-point precision issue by using 'pytest.approx' for the assertion --- agenta-backend/agenta_backend/tests/unit/test_evaluators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/tests/unit/test_evaluators.py 
 agenta-backend/agenta_backend/tests/unit/test_evaluators.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/agenta-backend/agenta_backend/tests/unit/test_evaluators.py b/agenta-backend/agenta_backend/tests/unit/test_evaluators.py
index fdde4ef45..4a2994bf9 100644
--- a/agenta-backend/agenta_backend/tests/unit/test_evaluators.py
+++ b/agenta-backend/agenta_backend/tests/unit/test_evaluators.py
@@ -281,7 +281,7 @@ def test_auto_semantic_similarity_match(
         settings_values,
         {"OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY")},
     )
-    assert round(result.value, 3) == expected_score
+    assert round(result.value, 3) == pytest.approx(expected_score, rel=1e-3)
 
 
 @pytest.mark.parametrize(

From 377f77397a890e0dcde6b3d28f534d7a7d2cd0d1 Mon Sep 17 00:00:00 2001
From: Abram
Date: Mon, 19 Aug 2024 01:16:56 +0100
Subject: [PATCH 6/7] refactor (backend): ensure that loaded relationships are
 of their respective types

---
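Note: wrapping the relationship attribute in of_type(...) pins the eager-load
path to a concrete mapped class, so chained options such as load_only(...)
are validated against that class rather than the generic relationship target.
A trimmed sketch of the pattern used throughout the hunks below, assuming the
model classes already imported by db_manager.py:

    from sqlalchemy.orm import joinedload

    # eager-load base -> deployment with explicit target types, fetching only uri
    variant_option = joinedload(
        AppVariantDB.base.of_type(VariantBaseDB)
    ).joinedload(
        VariantBaseDB.deployment.of_type(DeploymentDB)
    ).load_only(DeploymentDB.uri)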
 .../agenta_backend/services/db_manager.py | 36 +++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/agenta-backend/agenta_backend/services/db_manager.py b/agenta-backend/agenta_backend/services/db_manager.py
index e9a2d23e2..9ec74037f 100644
--- a/agenta-backend/agenta_backend/services/db_manager.py
+++ b/agenta-backend/agenta_backend/services/db_manager.py
@@ -209,7 +209,8 @@ async def fetch_app_variant_by_id(
     assert app_variant_id is not None, "app_variant_id cannot be None"
     async with db_engine.get_session() as session:
         base_query = select(AppVariantDB).options(
-            joinedload(AppVariantDB.base), joinedload(AppVariantDB.app)
+            joinedload(AppVariantDB.app.of_type(AppDB)).load_only(AppDB.id, AppDB.app_name),  # type: ignore
+            joinedload(AppVariantDB.base.of_type(VariantBaseDB)).joinedload(VariantBaseDB.deployment.of_type(DeploymentDB)).load_only(DeploymentDB.id, DeploymentDB.uri),  # type: ignore
         )
         if isCloudEE():
             query = base_query.options(
@@ -1125,8 +1126,8 @@ async def list_app_variants(app_id: str):
         result = await session.execute(
             select(AppVariantDB)
             .options(
-                joinedload(AppVariantDB.app).load_only(AppDB.id, AppDB.app_name),  # type: ignore
-                joinedload(AppVariantDB.base).joinedload(VariantBaseDB.deployment).load_only(DeploymentDB.uri),  # type: ignore
+                joinedload(AppVariantDB.app.of_type(AppDB)).load_only(AppDB.id, AppDB.app_name),  # type: ignore
+                joinedload(AppVariantDB.base.of_type(VariantBaseDB)).joinedload(VariantBaseDB.deployment.of_type(DeploymentDB)).load_only(DeploymentDB.uri),  # type: ignore
             )
             .filter_by(app_id=uuid.UUID(app_uuid))
         )
@@ -1824,7 +1825,10 @@ async def get_app_variant_instance_by_id(variant_id: str) -> AppVariantDB:
     async with db_engine.get_session() as session:
         result = await session.execute(
             select(AppVariantDB)
-            .options(joinedload(AppVariantDB.base), joinedload(AppVariantDB.app))
+            .options(
+                joinedload(AppVariantDB.app.of_type(AppDB)).load_only(AppDB.id, AppDB.app_name),  # type: ignore
+                joinedload(AppVariantDB.base.of_type(VariantBaseDB)).joinedload(VariantBaseDB.deployment.of_type(DeploymentDB)).load_only(DeploymentDB.uri),  # type: ignore
+            )
             .filter_by(id=uuid.UUID(variant_id))
         )
         app_variant_db = result.scalars().first()
@@ -1945,11 +1949,13 @@ async def fetch_evaluation_by_id(evaluation_id: str) -> Optional[EvaluationDB]:
         )
         result = await session.execute(
             query.options(
-                joinedload(EvaluationDB.variant).load_only(AppVariantDB.id, AppVariantDB.variant_name),  # type: ignore
-                joinedload(EvaluationDB.variant_revision).load_only(AppVariantRevisionsDB.revision),  # type: ignore
-                joinedload(EvaluationDB.aggregated_results).joinedload(
-                    EvaluationAggregatedResultDB.evaluator_config
-                ),
+                joinedload(EvaluationDB.variant.of_type(AppVariantDB)).load_only(AppVariantDB.id, AppVariantDB.variant_name),  # type: ignore
+                joinedload(EvaluationDB.variant_revision.of_type(AppVariantRevisionsDB)).load_only(AppVariantRevisionsDB.revision),  # type: ignore
+                joinedload(
+                    EvaluationDB.aggregated_results.of_type(
+                        EvaluationAggregatedResultDB
+                    )
+                ).joinedload(EvaluationAggregatedResultDB.evaluator_config),
             )
         )
         evaluation = result.unique().scalars().first()
@@ -2692,11 +2698,13 @@ async def list_evaluations(app_id: str):
         result = await session.execute(
             query.options(
-                joinedload(EvaluationDB.variant).load_only(AppVariantDB.id, AppVariantDB.variant_name),  # type: ignore
-                joinedload(EvaluationDB.variant_revision).load_only(AppVariantRevisionsDB.revision),  # type: ignore
-                joinedload(EvaluationDB.aggregated_results).joinedload(
-                    EvaluationAggregatedResultDB.evaluator_config
-                ),
+                joinedload(EvaluationDB.variant.of_type(AppVariantDB)).load_only(AppVariantDB.id, AppVariantDB.variant_name),  # type: ignore
+                joinedload(EvaluationDB.variant_revision.of_type(AppVariantRevisionsDB)).load_only(AppVariantRevisionsDB.revision),  # type: ignore
+                joinedload(
+                    EvaluationDB.aggregated_results.of_type(
+                        EvaluationAggregatedResultDB
+                    )
+                ).joinedload(EvaluationAggregatedResultDB.evaluator_config),
             )
         )
         evaluations = result.unique().scalars().all()
         return evaluations

From f3e9bd6c78af002057025a958be2e3e1d715de57 Mon Sep 17 00:00:00 2001
From: Abram
Date: Fri, 23 Aug 2024 17:04:38 +0100
Subject: [PATCH 7/7] chore (style): format codebase

---
 website/scripts/generate_cookbooks.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/website/scripts/generate_cookbooks.py b/website/scripts/generate_cookbooks.py
index b84cd2ee3..6b8a80922 100644
--- a/website/scripts/generate_cookbooks.py
+++ b/website/scripts/generate_cookbooks.py
@@ -5,6 +5,7 @@
 import os
 import sys
 
+
 def make_header(notebook_path):
     github_uri = "Agenta-AI/agenta/blob/main/cookbook"
     github_path = f"https://github.com/{github_uri}/{os.path.basename(notebook_path)}"
@@ -52,11 +53,11 @@ def export_notebook(notebook_path, output_path):
     title = convert_to_title_case(os.path.basename(notebook_path))
 
     # Add the title to the top of the markdown file
-    title_header = f"---\ntitle: \"{title}\"\n---\n\n"
-    
+    title_header = f'---\ntitle: "{title}"\n---\n\n'
+
     # Add the header below the title
     header = make_header(notebook_path)
-    
+
     # Combine the title, header, and the output markdown content
     output = title_header + header + output
 
@@ -103,4 +104,4 @@ def main(notebook_filename=None):
 if __name__ == "__main__":
     # Get the filename argument from the command line
     notebook_filename = sys.argv[1] if len(sys.argv) > 1 else None
-    main(notebook_filename)
\ No newline at end of file
+    main(notebook_filename)