From f737511ad93a912abe582ff89e47db378980b825 Mon Sep 17 00:00:00 2001 From: aakrem Date: Fri, 7 Jun 2024 15:38:58 +0200 Subject: [PATCH 01/34] initial migration --- .../migrations/mongo_to_postgres/migration.py | 436 ++++++++++++++++++ .../migrations/mongo_to_postgres/utils.py | 140 ++++++ 2 files changed, 576 insertions(+) create mode 100644 agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py create mode 100644 agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py new file mode 100644 index 000000000..ce55e1b5e --- /dev/null +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -0,0 +1,436 @@ +import os +import asyncio +from datetime import datetime, timezone + +from pymongo import MongoClient +from bson import DBRef +from sqlalchemy import text +from sqlalchemy.dialects.postgresql import UUID +import uuid_utils.compat as uuid + +# Assuming agenta_backend.models.db_models contains your SQLAlchemy models +from agenta_backend.models.db_models import ( + Base, + UserDB, + ImageDB, + AppDB, + DeploymentDB, + VariantBaseDB, + AppVariantDB, + AppVariantRevisionsDB, + AppEnvironmentDB, + AppEnvironmentRevisionDB, + TemplateDB, + TestSetDB, + EvaluatorConfigDB, + HumanEvaluationDB, + HumanEvaluationScenarioDB, + EvaluationDB, + EvaluationScenarioDB, + IDsMappingDB, +) + +from agenta_backend.migrations.mongo_to_postgres.utils import ( + drop_all_tables, + create_all_tables, + print_migration_report, + store_mapping, + get_mapped_uuid, + generate_uuid, + get_datetime, + migrate_collection, +) + +from agenta_backend.models.shared_models import TemplateType + +tables = [ + UserDB, + ImageDB, + AppDB, + DeploymentDB, + VariantBaseDB, + AppVariantDB, + AppVariantRevisionsDB, + AppEnvironmentDB, + AppEnvironmentRevisionDB, + TemplateDB, + TestSetDB, + 
EvaluatorConfigDB, + HumanEvaluationDB, + HumanEvaluationScenarioDB, + EvaluationDB, + EvaluationScenarioDB, + IDsMappingDB, +] + + +async def transform_user(user): + user_uuid = generate_uuid() + await store_mapping("users", user["_id"], user_uuid) + return { + "id": user_uuid, + "uid": user["uid"], + "username": user["username"], + "email": user["email"], + "created_at": get_datetime(user.get("created_at")), + "updated_at": get_datetime(user.get("updated_at")), + } + + +async def transform_image(image): + user_uuid = await get_mapped_uuid( + image["user"].id if isinstance(image["user"], DBRef) else image["user"] + ) + image_uuid = generate_uuid() + await store_mapping("docker_images", image["_id"], image_uuid) + return { + "id": image_uuid, + "type": image["type"], + "template_uri": image.get("template_uri"), + "docker_id": image.get("docker_id"), + "tags": image.get("tags"), + "deletable": image.get("deletable", True), + "user_id": user_uuid, + "created_at": get_datetime(image.get("created_at")), + "updated_at": get_datetime(image.get("updated_at")), + } + + +async def transform_app(app): + user_uuid = await get_mapped_uuid(app["user"].id) + app_uuid = generate_uuid() + await store_mapping("app_db", app["_id"], app_uuid) + return { + "id": app_uuid, + "app_name": app["app_name"], + "user_id": user_uuid, + "created_at": get_datetime(app.get("created_at")), + "updated_at": get_datetime(app.get("updated_at")), + } + + +async def transform_deployment(deployment): + app_uuid = await get_mapped_uuid(deployment["app"].id) + user_uuid = await get_mapped_uuid(deployment["user"].id) + deployment_uuid = generate_uuid() + await store_mapping("deployments", deployment["_id"], deployment_uuid) + return { + "id": deployment_uuid, + "app_id": app_uuid, + "user_id": user_uuid, + "container_name": deployment.get("container_name"), + "container_id": deployment.get("container_id"), + "uri": deployment.get("uri"), + "status": deployment["status"], + "created_at": 
get_datetime(deployment.get("created_at")), + "updated_at": get_datetime(deployment.get("updated_at")), + } + + +async def transform_variant_base(base): + app_uuid = await get_mapped_uuid(base["app"].id) + user_uuid = await get_mapped_uuid(base["user"].id) + image_uuid = await get_mapped_uuid(base["image"].id) + deployment_uuid = base["deployment"] and await get_mapped_uuid(base["deployment"]) + base_uuid = generate_uuid() + await store_mapping("bases", base["_id"], base_uuid) + return { + "id": base_uuid, + "app_id": app_uuid, + "user_id": user_uuid, + "base_name": base["base_name"], + "image_id": image_uuid, + "deployment_id": deployment_uuid, + "created_at": get_datetime(base.get("created_at")), + "updated_at": get_datetime(base.get("updated_at")), + } + + +async def transform_app_variant(variant): + app_uuid = await get_mapped_uuid(variant["app"].id) + image_uuid = await get_mapped_uuid(variant["image"].id) + user_uuid = await get_mapped_uuid(variant["user"].id) + modified_by_uuid = await get_mapped_uuid(variant["modified_by"].id) + base_uuid = await get_mapped_uuid(variant["base"].id) + variant_uuid = generate_uuid() + await store_mapping("app_variants", variant["_id"], variant_uuid) + return { + "id": variant_uuid, + "app_id": app_uuid, + "variant_name": variant["variant_name"], + "revision": variant["revision"], + "image_id": image_uuid, + "user_id": user_uuid, + "modified_by_id": modified_by_uuid, + "base_name": variant.get("base_name"), + "base_id": base_uuid, + "config_name": variant["config_name"], + "config_parameters": variant["config"], + "created_at": get_datetime(variant.get("created_at")), + "updated_at": get_datetime(variant.get("updated_at")), + } + + +async def transform_app_variant_revision(revision): + variant_uuid = await get_mapped_uuid(revision["variant"].id) + modified_by_uuid = await get_mapped_uuid(revision["modified_by"].id) + base_uuid = await get_mapped_uuid(revision["base"].id) + revision_uuid = generate_uuid() + await 
store_mapping("app_variant_revisions", revision["_id"], revision_uuid) + return { + "id": revision_uuid, + "variant_id": variant_uuid, + "revision": revision["revision"], + "modified_by_id": modified_by_uuid, + "base_id": base_uuid, + "config_name": revision["config"]["config_name"], + "config_parameters": revision["config"]["parameters"], + "created_at": get_datetime(revision["created_at"]), + "updated_at": get_datetime(revision["updated_at"]), + } + + +async def transform_app_environment(environment): + app_uuid = await get_mapped_uuid(environment["app"].id) + user_uuid = await get_mapped_uuid(environment["user"].id) + variant_uuid = await get_mapped_uuid(environment["deployed_app_variant"]) + revision_uuid = await get_mapped_uuid(environment["deployed_app_variant_revision"]) + deployment_uuid = await get_mapped_uuid(environment["deployment"]) + environment_uuid = generate_uuid() + await store_mapping("environments", environment["_id"], environment_uuid) + return { + "id": environment_uuid, + "app_id": app_uuid, + "name": environment["name"], + "user_id": user_uuid, + "revision": environment["revision"], + "deployed_app_variant_id": variant_uuid, + "deployed_app_variant_revision_id": revision_uuid, + "deployment_id": deployment_uuid, + "created_at": get_datetime(environment.get("created_at")), + } + + +async def transform_app_environment_revision(revision): + environment_uuid = await get_mapped_uuid(revision["environment"].id) + modified_by_uuid = await get_mapped_uuid(revision["modified_by"].id) + variant_revision_uuid = await get_mapped_uuid( + revision["deployed_app_variant_revision"] + ) + deployment_uuid = await get_mapped_uuid(revision["deployment"]) + revision_uuid = generate_uuid() + await store_mapping("environments_revisions", revision["_id"], revision_uuid) + return { + "id": revision_uuid, + "environment_id": environment_uuid, + "revision": revision["revision"], + "modified_by_id": modified_by_uuid, + "deployed_app_variant_revision_id": 
variant_revision_uuid, + "deployment_id": deployment_uuid, + "created_at": get_datetime(revision["created_at"]), + } + + +async def transform_template(template): + template_uuid = generate_uuid() + await store_mapping("templates", template["_id"], template_uuid) + + # Ensure type is correctly mapped to TemplateType enum + template_type = ( + TemplateType(template["type"]) if "type" in template else TemplateType.IMAGE + ) + + return { + "id": template_uuid, + "type": template_type, + "template_uri": template.get("template_uri"), + "tag_id": template.get("tag_id"), + "name": template["name"], + "repo_name": template.get("repo_name"), + "title": template["title"], + "description": template["description"], + "size": template.get("size"), + "digest": template.get("digest"), + "last_pushed": get_datetime(template.get("last_pushed")), + } + + +async def transform_test_set(test_set): + app_uuid = await get_mapped_uuid(test_set["app"].id) + user_uuid = await get_mapped_uuid(test_set["user"].id) + test_set_uuid = generate_uuid() + await store_mapping("testsets", test_set["_id"], test_set_uuid) + return { + "id": test_set_uuid, + "name": test_set["name"], + "app_id": app_uuid, + "csvdata": test_set["csvdata"], + "user_id": user_uuid, + "created_at": get_datetime(test_set.get("created_at")), + "updated_at": get_datetime(test_set.get("updated_at")), + } + + +async def transform_evaluator_config(config): + evaluation_uuid = await get_mapped_uuid(config["evaluation"].id) + scenario_uuid = await get_mapped_uuid(config["evaluation_scenario"].id) + app_uuid = await get_mapped_uuid(config["app"].id) + user_uuid = await get_mapped_uuid(config["user"].id) + config_uuid = generate_uuid() + await store_mapping("evaluators_configs", config["_id"], config_uuid) + return { + "id": config_uuid, + "evaluation_id": evaluation_uuid, + "evaluation_scenario_id": scenario_uuid, + "app_id": app_uuid, + "user_id": user_uuid, + "name": config["name"], + "evaluator_key": config["evaluator_key"], + 
"settings_values": config["settings_values"], + "created_at": get_datetime(config.get("created_at")), + "updated_at": get_datetime(config.get("updated_at")), + } + + +async def transform_human_evaluation(evaluation): + app_uuid = await get_mapped_uuid(evaluation["app"].id) + user_uuid = await get_mapped_uuid(evaluation["user"].id) + test_set_uuid = await get_mapped_uuid(evaluation["testset"].id) + variant_uuid = await get_mapped_uuid(evaluation["variants"][0]) + revision_uuid = await get_mapped_uuid(evaluation["variants_revisions"][0]) + evaluation_uuid = generate_uuid() + await store_mapping("human_evaluations", evaluation["_id"], evaluation_uuid) + return { + "id": evaluation_uuid, + "app_id": app_uuid, + "user_id": user_uuid, + "status": evaluation["status"], + "evaluation_type": evaluation["evaluation_type"], + "variant_id": variant_uuid, + "variant_revision_id": revision_uuid, + "testset_id": test_set_uuid, + "created_at": get_datetime(evaluation.get("created_at")), + "updated_at": get_datetime(evaluation.get("updated_at")), + } + + +async def transform_human_evaluation_scenario(scenario): + user_uuid = await get_mapped_uuid(scenario["user"].id) + evaluation_uuid = await get_mapped_uuid(scenario["evaluation"].id) + scenario_uuid = generate_uuid() + await store_mapping("human_evaluations_scenarios", scenario["_id"], scenario_uuid) + return { + "id": scenario_uuid, + "user_id": user_uuid, + "evaluation_id": evaluation_uuid, + "inputs": scenario["inputs"], + "outputs": scenario["outputs"], + "vote": scenario.get("vote"), + "score": scenario.get("score"), + "correct_answer": scenario.get("correct_answer"), + "created_at": get_datetime(scenario.get("created_at")), + "updated_at": get_datetime(scenario.get("updated_at")), + "is_pinned": scenario.get("is_pinned"), + "note": scenario.get("note"), + } + + +async def transform_evaluation(evaluation): + app_uuid = await get_mapped_uuid(evaluation["app"].id) + user_uuid = await get_mapped_uuid(evaluation["user"].id) + 
test_set_uuid = await get_mapped_uuid(evaluation["testset"].id) + variant_uuid = await get_mapped_uuid(evaluation["variant"]) + revision_uuid = await get_mapped_uuid(evaluation["variant_revision"]) + evaluation_uuid = generate_uuid() + await store_mapping("evaluations", evaluation["_id"], evaluation_uuid) + return { + "id": evaluation_uuid, + "app_id": app_uuid, + "user_id": user_uuid, + "status": evaluation["status"], + "testset_id": test_set_uuid, + "variant_id": variant_uuid, + "variant_revision_id": revision_uuid, + "aggregated_results": evaluation["aggregated_results"], + "average_cost": evaluation["average_cost"], + "total_cost": evaluation["total_cost"], + "average_latency": evaluation["average_latency"], + "created_at": get_datetime(evaluation.get("created_at")), + "updated_at": get_datetime(evaluation.get("updated_at")), + } + + +async def transform_evaluation_scenario(scenario): + user_uuid = await get_mapped_uuid(scenario["user"].id) + evaluation_uuid = await get_mapped_uuid(scenario["evaluation"].id) + variant_uuid = await get_mapped_uuid(scenario["variant_id"]) + scenario_uuid = generate_uuid() + await store_mapping("evaluation_scenarios", scenario["_id"], scenario_uuid) + return { + "id": scenario_uuid, + "user_id": user_uuid, + "evaluation_id": evaluation_uuid, + "variant_id": variant_uuid, + "inputs": scenario["inputs"], + "outputs": scenario["outputs"], + "correct_answers": scenario.get("correct_answers"), + "is_pinned": scenario.get("is_pinned"), + "note": scenario.get("note"), + "results": scenario["results"], + "latency": scenario.get("latency"), + "cost": scenario.get("cost"), + "created_at": get_datetime(scenario.get("created_at")), + "updated_at": get_datetime(scenario.get("updated_at")), + } + + +async def main(): + try: + await drop_all_tables() + await create_all_tables(tables=tables) + await migrate_collection("users", UserDB, transform_user) + await migrate_collection("docker_images", ImageDB, transform_image) + await 
migrate_collection("app_db", AppDB, transform_app) + await migrate_collection("deployments", DeploymentDB, transform_deployment) + await migrate_collection("bases", VariantBaseDB, transform_variant_base) + await migrate_collection("app_variants", AppVariantDB, transform_app_variant) + await migrate_collection( + "app_variant_revisions", + AppVariantRevisionsDB, + transform_app_variant_revision, + ) + await migrate_collection( + "environments", AppEnvironmentDB, transform_app_environment + ) + await migrate_collection( + "environments_revisions", + AppEnvironmentRevisionDB, + transform_app_environment_revision, + ) + await migrate_collection("templates", TemplateDB, transform_template) + await migrate_collection("testsets", TestSetDB, transform_test_set) + await migrate_collection( + "evaluators_configs", EvaluatorConfigDB, transform_evaluator_config + ) + await migrate_collection( + "human_evaluations", HumanEvaluationDB, transform_human_evaluation + ) + await migrate_collection( + "human_evaluations_scenarios", + HumanEvaluationScenarioDB, + transform_human_evaluation_scenario, + ) + await migrate_collection("evaluations", EvaluationDB, transform_evaluation) + await migrate_collection( + "evaluation_scenarios", EvaluationScenarioDB, transform_evaluation_scenario + ) + print("Migration completed successfully.") + except Exception as e: + print(f"\n====================== Error ======================\n") + print(f"Error occurred: {e}") + finally: + print_migration_report() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py new file mode 100644 index 000000000..5951651a8 --- /dev/null +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -0,0 +1,140 @@ +import os +import asyncio +from datetime import datetime, timezone + +from pymongo import MongoClient +from bson import ObjectId, DBRef +from 
sqlalchemy import MetaData, Column, String, DateTime, text, create_engine +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +import uuid_utils.compat as uuid +from sqlalchemy.future import select + + +from agenta_backend.models.db_engine import db_engine + +from agenta_backend.models.db_models import ( + IDsMappingDB, + Base, +) + +BATCH_SIZE = 1000 + +# MongoDB connection +MONGO_URI = os.environ.get("MONGODB_URI") +DATABASE_MODE = os.environ.get("DATABASE_MODE") +mongo_client = MongoClient(MONGO_URI) +mongo_db_name = f"agenta_{DATABASE_MODE}" +mongo_db = mongo_client[mongo_db_name] + +migration_report = {} + + +async def drop_all_tables(): + """Drop all tables in the database.""" + async with db_engine.engine.begin() as conn: + await conn.run_sync(Base.metadata.reflect) + await conn.run_sync(Base.metadata.drop_all) + + +async def create_all_tables(tables): + """Create all tables in the database.""" + async with db_engine.engine.begin() as conn: + for table in tables: + print(f"====================== Creating table for {table.__name__}") + await conn.run_sync(table.metadata.create_all) + print("All tables dropped and created.") + + +async def store_mapping(table_name, mongo_id, uuid): + """Store the mapping of MongoDB ObjectId to UUID in the mapping table.""" + async with db_engine.get_session() as session: + async with session.begin(): + mapping = IDsMappingDB( + table_name=table_name, objectid=str(mongo_id), uuid=uuid + ) + session.add(mapping) + + +async def get_mapped_uuid(mongo_id): + """Retrieve the mapped UUID for a given MongoDB ObjectId.""" + async with db_engine.get_session() as session: + async with session.begin(): + stmt = select(IDsMappingDB.uuid).filter( + IDsMappingDB.objectid == str(mongo_id) + ) + result = await session.execute(stmt) + row = result.first() + return row[0] if row else None + + +def get_datetime(value): + """Helper function to handle 
datetime fields.""" + if isinstance(value, str): + return datetime.fromisoformat(value.replace("Z", "+00:00")) + return value if value else datetime.now(timezone.utc) + + +def generate_uuid(): + """Generate a new UUID.""" + return uuid.uuid7() + + +def update_migration_report(collection_name, total_docs, migrated_docs): + migration_report[collection_name] = {"total": total_docs, "migrated": migrated_docs} + + +def print_migration_report(): + print("\n====================== Migration Report ======================") + + # Headers + headers = ["Table", "Total in MongoDB", "Migrated to PostgreSQL"] + + # Determine the maximum lengths for each column including headers + max_table_length = max( + len(headers[0]), max(len(table) for table in migration_report.keys()) + ) + max_total_length = max( + len(headers[1]), + max(len(str(counts["total"])) for counts in migration_report.values()), + ) + max_migrated_length = max( + len(headers[2]), + max(len(str(counts["migrated"])) for counts in migration_report.values()), + ) + + # Set the header and divider with appropriate padding + table_header = f"| {headers[0].ljust(max_table_length)} | {headers[1].ljust(max_total_length)} | {headers[2].ljust(max_migrated_length)} |" + table_divider = f"|{'-' * (max_table_length + 2)}|{'-' * (max_total_length + 2)}|{'-' * (max_migrated_length + 2)}|" + + print(table_header) + print(table_divider) + + for table, counts in migration_report.items(): + table_row = f"| {table.ljust(max_table_length)} | {str(counts['total']).ljust(max_total_length)} | {str(counts['migrated']).ljust(max_migrated_length)} |" + print(table_row) + + +async def migrate_collection(collection_name, model_class, transformation_func): + """General function to migrate a collection to a SQL table.""" + print( + f"\n====================== Migrating {collection_name}... 
======================\n" + ) + total_docs = mongo_db[collection_name].count_documents({}) + migrated_docs = 0 + async with db_engine.get_session() as session: + async with session.begin(): + for skip in range(0, total_docs, BATCH_SIZE): + batch = await asyncio.get_event_loop().run_in_executor( + None, + lambda: list( + mongo_db[collection_name].find().skip(skip).limit(BATCH_SIZE) + ), + ) + for document in batch: + transformed_document = await transformation_func(document) + session.add(model_class(**transformed_document)) + migrated_docs += 1 + await session.commit() + update_migration_report(collection_name, total_docs, migrated_docs) From 73e6677067c2abe82daeba05f64d30b99cc42261 Mon Sep 17 00:00:00 2001 From: aakrem Date: Fri, 7 Jun 2024 22:08:58 +0200 Subject: [PATCH 02/34] fix evaluations --- .../migrations/mongo_to_postgres/migration.py | 81 ++++++++++++++++--- .../migrations/mongo_to_postgres/utils.py | 17 +++- 2 files changed, 85 insertions(+), 13 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index ce55e1b5e..cd56d3dab 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -1,3 +1,4 @@ +import json import os import asyncio from datetime import datetime, timezone @@ -10,6 +11,7 @@ # Assuming agenta_backend.models.db_models contains your SQLAlchemy models from agenta_backend.models.db_models import ( + EvaluationAggregatedResultDB, Base, UserDB, ImageDB, @@ -28,6 +30,8 @@ EvaluationDB, EvaluationScenarioDB, IDsMappingDB, + EvaluationEvaluatorConfigDB, + EvaluationScenarioResultDB, ) from agenta_backend.migrations.mongo_to_postgres.utils import ( @@ -61,6 +65,8 @@ EvaluationDB, EvaluationScenarioDB, IDsMappingDB, + EvaluationEvaluatorConfigDB, + EvaluationScenarioResultDB, ] @@ -272,16 +278,12 @@ async def 
transform_test_set(test_set): async def transform_evaluator_config(config): - evaluation_uuid = await get_mapped_uuid(config["evaluation"].id) - scenario_uuid = await get_mapped_uuid(config["evaluation_scenario"].id) app_uuid = await get_mapped_uuid(config["app"].id) user_uuid = await get_mapped_uuid(config["user"].id) config_uuid = generate_uuid() await store_mapping("evaluators_configs", config["_id"], config_uuid) return { "id": config_uuid, - "evaluation_id": evaluation_uuid, - "evaluation_scenario_id": scenario_uuid, "app_id": app_uuid, "user_id": user_uuid, "name": config["name"], @@ -335,6 +337,40 @@ async def transform_human_evaluation_scenario(scenario): } +async def convert_aggregated_results(results, evaluation_id): + """Convert evaluator_config ObjectIds in aggregated_results to UUIDs and structure them.""" + aggregated_results = [] + for result in results: + evaluator_config_uuid = await get_mapped_uuid(result["evaluator_config"]) + result_uuid = generate_uuid() + aggregated_results.append( + { + "id": result_uuid, + "evaluation_id": evaluation_id, + "evaluator_config_id": evaluator_config_uuid, + "result": result["result"], + } + ) + return aggregated_results + + +async def convert_scenario_aggregated_results(results, scenario_id): + """Convert evaluator_config ObjectIds in scenario aggregated_results to UUIDs and structure them.""" + scenario_aggregated_results = [] + for result in results: + evaluator_config_uuid = await get_mapped_uuid(result["evaluator_config"]) + result_uuid = generate_uuid() + scenario_aggregated_results.append( + { + "id": result_uuid, + "evaluation_scenario_id": scenario_id, + "evaluator_config_id": evaluator_config_uuid, + "result": result["result"], + } + ) + return scenario_aggregated_results + + async def transform_evaluation(evaluation): app_uuid = await get_mapped_uuid(evaluation["app"].id) user_uuid = await get_mapped_uuid(evaluation["user"].id) @@ -342,8 +378,10 @@ async def transform_evaluation(evaluation): 
variant_uuid = await get_mapped_uuid(evaluation["variant"]) revision_uuid = await get_mapped_uuid(evaluation["variant_revision"]) evaluation_uuid = generate_uuid() + await store_mapping("evaluations", evaluation["_id"], evaluation_uuid) - return { + + transformed_evaluation = { "id": evaluation_uuid, "app_id": app_uuid, "user_id": user_uuid, @@ -351,7 +389,6 @@ async def transform_evaluation(evaluation): "testset_id": test_set_uuid, "variant_id": variant_uuid, "variant_revision_id": revision_uuid, - "aggregated_results": evaluation["aggregated_results"], "average_cost": evaluation["average_cost"], "total_cost": evaluation["total_cost"], "average_latency": evaluation["average_latency"], @@ -359,14 +396,22 @@ async def transform_evaluation(evaluation): "updated_at": get_datetime(evaluation.get("updated_at")), } + aggregated_results = await convert_aggregated_results( + evaluation["aggregated_results"], evaluation_uuid + ) + + return transformed_evaluation, aggregated_results + async def transform_evaluation_scenario(scenario): user_uuid = await get_mapped_uuid(scenario["user"].id) evaluation_uuid = await get_mapped_uuid(scenario["evaluation"].id) variant_uuid = await get_mapped_uuid(scenario["variant_id"]) scenario_uuid = generate_uuid() + await store_mapping("evaluation_scenarios", scenario["_id"], scenario_uuid) - return { + + transformed_scenario = { "id": scenario_uuid, "user_id": user_uuid, "evaluation_id": evaluation_uuid, @@ -376,13 +421,20 @@ async def transform_evaluation_scenario(scenario): "correct_answers": scenario.get("correct_answers"), "is_pinned": scenario.get("is_pinned"), "note": scenario.get("note"), - "results": scenario["results"], "latency": scenario.get("latency"), "cost": scenario.get("cost"), "created_at": get_datetime(scenario.get("created_at")), "updated_at": get_datetime(scenario.get("updated_at")), } + aggregated_results = [] + if "results" in scenario: + aggregated_results = await convert_scenario_aggregated_results( + 
scenario["results"], scenario_uuid + ) + + return transformed_scenario, aggregated_results + async def main(): try: @@ -420,10 +472,19 @@ async def main(): HumanEvaluationScenarioDB, transform_human_evaluation_scenario, ) - await migrate_collection("evaluations", EvaluationDB, transform_evaluation) await migrate_collection( - "evaluation_scenarios", EvaluationScenarioDB, transform_evaluation_scenario + "new_evaluations", + EvaluationDB, + transform_evaluation, + EvaluationAggregatedResultDB, ) + await migrate_collection( + "new_evaluation_scenarios", + EvaluationScenarioDB, + transform_evaluation_scenario, + EvaluationScenarioResultDB, + ) + print("Migration completed successfully.") except Exception as e: print(f"\n====================== Error ======================\n") diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index 5951651a8..bc41d81dd 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -116,7 +116,9 @@ def print_migration_report(): print(table_row) -async def migrate_collection(collection_name, model_class, transformation_func): +async def migrate_collection( + collection_name, model_class, transformation_func, association_model=None +): """General function to migrate a collection to a SQL table.""" print( f"\n====================== Migrating {collection_name}... 
======================\n" @@ -133,8 +135,17 @@ async def migrate_collection(collection_name, model_class, transformation_func): ), ) for document in batch: - transformed_document = await transformation_func(document) - session.add(model_class(**transformed_document)) + if association_model: + ( + transformed_document, + associated_entities, + ) = await transformation_func(document) + session.add(model_class(**transformed_document)) + for assoc_entity in associated_entities: + session.add(association_model(**assoc_entity)) + else: + transformed_document = await transformation_func(document) + session.add(model_class(**transformed_document)) migrated_docs += 1 await session.commit() update_migration_report(collection_name, total_docs, migrated_docs) From e3aba63bf2ec877a49d20a89a6a5e085c3a8bbc0 Mon Sep 17 00:00:00 2001 From: aakrem Date: Fri, 14 Jun 2024 08:03:31 +0200 Subject: [PATCH 03/34] small improvements --- .../migrations/mongo_to_postgres/utils.py | 27 ++++++++++++------- .../agenta_backend/models/db_models.py | 9 ++++++- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index bc41d81dd..2e64dedf2 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -49,12 +49,14 @@ async def create_all_tables(tables): async def store_mapping(table_name, mongo_id, uuid): """Store the mapping of MongoDB ObjectId to UUID in the mapping table.""" + id = generate_uuid() async with db_engine.get_session() as session: async with session.begin(): mapping = IDsMappingDB( - table_name=table_name, objectid=str(mongo_id), uuid=uuid + id=id, table_name=table_name, objectid=str(mongo_id), uuid=uuid ) session.add(mapping) + await session.commit() async def get_mapped_uuid(mongo_id): @@ -91,6 +93,10 @@ def print_migration_report(): # 
Headers headers = ["Table", "Total in MongoDB", "Migrated to PostgreSQL"] + if not migration_report: + print("No data available in the migration report.") + return + # Determine the maximum lengths for each column including headers max_table_length = max( len(headers[0]), max(len(table) for table in migration_report.keys()) @@ -125,15 +131,16 @@ async def migrate_collection( ) total_docs = mongo_db[collection_name].count_documents({}) migrated_docs = 0 + async with db_engine.get_session() as session: - async with session.begin(): - for skip in range(0, total_docs, BATCH_SIZE): - batch = await asyncio.get_event_loop().run_in_executor( - None, - lambda: list( - mongo_db[collection_name].find().skip(skip).limit(BATCH_SIZE) - ), - ) + for skip in range(0, total_docs, BATCH_SIZE): + batch = await asyncio.get_event_loop().run_in_executor( + None, + lambda: list( + mongo_db[collection_name].find().skip(skip).limit(BATCH_SIZE) + ), + ) + async with session.begin(): for document in batch: if association_model: ( @@ -147,5 +154,5 @@ async def migrate_collection( transformed_document = await transformation_func(document) session.add(model_class(**transformed_document)) migrated_docs += 1 - await session.commit() + await session.commit() update_migration_report(collection_name, total_docs, migrated_docs) diff --git a/agenta-backend/agenta_backend/models/db_models.py b/agenta-backend/agenta_backend/models/db_models.py index af2bea84a..51a45b13c 100644 --- a/agenta-backend/agenta_backend/models/db_models.py +++ b/agenta-backend/agenta_backend/models/db_models.py @@ -523,6 +523,13 @@ class EvaluationScenarioDB(Base): class IDsMappingDB(Base): __tablename__ = "ids_mapping" + id = Column( + UUID(as_uuid=True), + primary_key=True, + default=uuid.uuid7, + unique=True, + nullable=False, + ) table_name = Column(String, nullable=False) - objectid = Column(String, primary_key=True) + objectid = Column(String, nullable=False) uuid = Column(UUID(as_uuid=True), nullable=False) From 
04075eb301e51e6b5067378c835f899fa1989586 Mon Sep 17 00:00:00 2001 From: aakrem Date: Fri, 21 Jun 2024 13:03:20 +0200 Subject: [PATCH 04/34] fix human evaluations --- .../migrations/mongo_to_postgres/migration.py | 47 ++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index cd56d3dab..952925681 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -11,27 +11,27 @@ # Assuming agenta_backend.models.db_models contains your SQLAlchemy models from agenta_backend.models.db_models import ( - EvaluationAggregatedResultDB, - Base, UserDB, ImageDB, AppDB, DeploymentDB, VariantBaseDB, AppVariantDB, - AppVariantRevisionsDB, AppEnvironmentDB, AppEnvironmentRevisionDB, TemplateDB, TestSetDB, EvaluatorConfigDB, HumanEvaluationDB, + HumanEvaluationVariantDB, HumanEvaluationScenarioDB, + EvaluationAggregatedResultDB, + EvaluationScenarioResultDB, EvaluationDB, + EvaluationEvaluatorConfigDB, EvaluationScenarioDB, IDsMappingDB, - EvaluationEvaluatorConfigDB, - EvaluationScenarioResultDB, + AppVariantRevisionsDB, ) from agenta_backend.migrations.mongo_to_postgres.utils import ( @@ -294,27 +294,49 @@ async def transform_evaluator_config(config): } +async def convert_human_evaluations_associated_variants( + variants, variants_revisions, evaluation_id +): + """Convert variant and revision ObjectIds to UUIDs and structure them.""" + associated_variants = [] + for variant_id, revision_id in zip(variants, variants_revisions): + variant_uuid = await get_mapped_uuid(variant_id) + revision_uuid = await get_mapped_uuid(revision_id) + associated_variants.append( + { + "human_evaluation_id": evaluation_id, + "variant_id": variant_uuid, + "variant_revision_id": revision_uuid, + } + ) + return associated_variants + 
+ async def transform_human_evaluation(evaluation): app_uuid = await get_mapped_uuid(evaluation["app"].id) user_uuid = await get_mapped_uuid(evaluation["user"].id) test_set_uuid = await get_mapped_uuid(evaluation["testset"].id) - variant_uuid = await get_mapped_uuid(evaluation["variants"][0]) - revision_uuid = await get_mapped_uuid(evaluation["variants_revisions"][0]) evaluation_uuid = generate_uuid() + await store_mapping("human_evaluations", evaluation["_id"], evaluation_uuid) - return { + + transformed_evaluation = { "id": evaluation_uuid, "app_id": app_uuid, "user_id": user_uuid, "status": evaluation["status"], "evaluation_type": evaluation["evaluation_type"], - "variant_id": variant_uuid, - "variant_revision_id": revision_uuid, "testset_id": test_set_uuid, "created_at": get_datetime(evaluation.get("created_at")), "updated_at": get_datetime(evaluation.get("updated_at")), } + associated_variants = await convert_human_evaluations_associated_variants( + evaluation["variants"], evaluation["variants_revisions"], evaluation_uuid + ) + + return transformed_evaluation, associated_variants + async def transform_human_evaluation_scenario(scenario): user_uuid = await get_mapped_uuid(scenario["user"].id) @@ -465,7 +487,10 @@ async def main(): "evaluators_configs", EvaluatorConfigDB, transform_evaluator_config ) await migrate_collection( - "human_evaluations", HumanEvaluationDB, transform_human_evaluation + "human_evaluations", + HumanEvaluationDB, + transform_human_evaluation, + HumanEvaluationVariantDB, ) await migrate_collection( "human_evaluations_scenarios", From f93ba1d6812667422a61fab29e8ffef249b355cd Mon Sep 17 00:00:00 2001 From: aakrem Date: Fri, 21 Jun 2024 13:04:14 +0200 Subject: [PATCH 05/34] add human evaluations variant --- .../migrations/mongo_to_postgres/utils.py | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py 
b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index 2e64dedf2..abe759acb 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -11,7 +11,6 @@ import uuid_utils.compat as uuid from sqlalchemy.future import select - from agenta_backend.models.db_engine import db_engine from agenta_backend.models.db_models import ( @@ -35,7 +34,9 @@ async def drop_all_tables(): """Drop all tables in the database.""" async with db_engine.engine.begin() as conn: await conn.run_sync(Base.metadata.reflect) - await conn.run_sync(Base.metadata.drop_all) + # Drop all tables with CASCADE option + for table in reversed(Base.metadata.sorted_tables): + await conn.execute(text(f"DROP TABLE IF EXISTS {table.name} CASCADE")) async def create_all_tables(tables): @@ -51,24 +52,20 @@ async def store_mapping(table_name, mongo_id, uuid): """Store the mapping of MongoDB ObjectId to UUID in the mapping table.""" id = generate_uuid() async with db_engine.get_session() as session: - async with session.begin(): - mapping = IDsMappingDB( - id=id, table_name=table_name, objectid=str(mongo_id), uuid=uuid - ) - session.add(mapping) + mapping = IDsMappingDB( + id=id, table_name=table_name, objectid=str(mongo_id), uuid=uuid + ) + session.add(mapping) await session.commit() async def get_mapped_uuid(mongo_id): """Retrieve the mapped UUID for a given MongoDB ObjectId.""" async with db_engine.get_session() as session: - async with session.begin(): - stmt = select(IDsMappingDB.uuid).filter( - IDsMappingDB.objectid == str(mongo_id) - ) - result = await session.execute(stmt) - row = result.first() - return row[0] if row else None + stmt = select(IDsMappingDB.uuid).filter(IDsMappingDB.objectid == str(mongo_id)) + result = await session.execute(stmt) + row = result.first() + return row[0] if row else None def get_datetime(value): @@ -140,19 +137,19 @@ async def migrate_collection( 
mongo_db[collection_name].find().skip(skip).limit(BATCH_SIZE) ), ) - async with session.begin(): - for document in batch: - if association_model: - ( - transformed_document, - associated_entities, - ) = await transformation_func(document) - session.add(model_class(**transformed_document)) - for assoc_entity in associated_entities: - session.add(association_model(**assoc_entity)) - else: - transformed_document = await transformation_func(document) - session.add(model_class(**transformed_document)) - migrated_docs += 1 - await session.commit() + for document in batch: + if association_model: + ( + transformed_document, + associated_entities, + ) = await transformation_func(document) + session.add(model_class(**transformed_document)) + for assoc_entity in associated_entities: + session.add(association_model(**assoc_entity)) + else: + transformed_document = await transformation_func(document) + session.add(model_class(**transformed_document)) + await session.commit() + migrated_docs += 1 + update_migration_report(collection_name, total_docs, migrated_docs) From 405a1f6631b30e0c47145d7c1045afab5ccd6f84 Mon Sep 17 00:00:00 2001 From: aakrem Date: Sun, 23 Jun 2024 16:01:01 +0200 Subject: [PATCH 06/34] add db name and create db on postgres startup if doesn't exist --- docker-assets/postgres/init-db.sql | 1 + docker-compose.yml | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) create mode 100644 docker-assets/postgres/init-db.sql diff --git a/docker-assets/postgres/init-db.sql b/docker-assets/postgres/init-db.sql new file mode 100644 index 000000000..9bd429417 --- /dev/null +++ b/docker-assets/postgres/init-db.sql @@ -0,0 +1 @@ +CREATE DATABASE agenta_oss; diff --git a/docker-compose.yml b/docker-compose.yml index 59babfa63..c65c10268 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,7 +15,7 @@ services: build: ./agenta-backend environment: - MONGODB_URI=mongodb://username:password@mongo:27017 - - 
POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/agenta_oss - REDIS_URL=redis://redis:6379/0 - ENVIRONMENT=development - DATABASE_MODE=v2 @@ -146,7 +146,7 @@ services: command: > watchmedo auto-restart --directory=./agenta_backend --pattern=*.py --recursive -- celery -A agenta_backend.main.celery_app worker --concurrency=1 --loglevel=INFO environment: - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/agenta_oss - MONGODB_URI=mongodb://username:password@mongo:27017 - REDIS_URL=redis://redis:6379/0 - CELERY_BROKER_URL=amqp://guest@rabbitmq// @@ -172,12 +172,14 @@ services: environment: POSTGRES_USER: username POSTGRES_PASSWORD: password + POSTGRES_DB: agenta_oss ports: - "5432:5432" networks: - agenta-network volumes: - - postgresdb-data:/var/lib/postgresql/data/ + - postgresdb-data:/var/lib/postgresql/data/ + - ./docker-assets/postgres/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 10s @@ -194,12 +196,13 @@ services: PGADMIN_SERVER_PORT: 5432 PGADMIN_SERVER_USER: "username" PGADMIN_SERVER_PASSWORD: "password" + PGADMIN_SERVER_DB: agenta_oss ports: - "5050:80" networks: - agenta-network volumes: - - pgadmin-data:/var/lib/pgadmin + - pgadmin-data:/var/lib/pgadmin depends_on: postgres: condition: service_healthy @@ -213,4 +216,4 @@ volumes: redis_data: nextjs_cache: postgresdb-data: - pgadmin-data: \ No newline at end of file + pgadmin-data: From c96d25a347415027e56ee2d61612cd1eed5368ac Mon Sep 17 00:00:00 2001 From: aakrem Date: Mon, 24 Jun 2024 12:21:55 +0200 Subject: [PATCH 07/34] remove comment from chatgpt --- .../agenta_backend/migrations/mongo_to_postgres/migration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py 
b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index 952925681..8784e808f 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -9,7 +9,6 @@ from sqlalchemy.dialects.postgresql import UUID import uuid_utils.compat as uuid -# Assuming agenta_backend.models.db_models contains your SQLAlchemy models from agenta_backend.models.db_models import ( UserDB, ImageDB, From 1417acaa77a362232ab62c3b4454b94e81db323b Mon Sep 17 00:00:00 2001 From: aakrem Date: Mon, 24 Jun 2024 12:28:01 +0200 Subject: [PATCH 08/34] add separated messages --- .../agenta_backend/migrations/mongo_to_postgres/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index abe759acb..ba83cb026 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -37,6 +37,7 @@ async def drop_all_tables(): # Drop all tables with CASCADE option for table in reversed(Base.metadata.sorted_tables): await conn.execute(text(f"DROP TABLE IF EXISTS {table.name} CASCADE")) + print("All tables are dropped.") async def create_all_tables(tables): @@ -45,7 +46,7 @@ async def create_all_tables(tables): for table in tables: print(f"====================== Creating table for {table.__name__}") await conn.run_sync(table.metadata.create_all) - print("All tables dropped and created.") + print("All tables are created.") async def store_mapping(table_name, mongo_id, uuid): From 808439c5be4967e91454a07addb8a496fd94f8ff Mon Sep 17 00:00:00 2001 From: aakrem Date: Mon, 24 Jun 2024 15:45:06 +0200 Subject: [PATCH 09/34] add printing traceback --- .../agenta_backend/migrations/mongo_to_postgres/migration.py | 3 +++ 1 file changed, 3 insertions(+) diff 
--git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index 8784e808f..ba525c106 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -511,8 +511,11 @@ async def main(): print("Migration completed successfully.") except Exception as e: + import traceback + print(f"\n====================== Error ======================\n") print(f"Error occurred: {e}") + traceback.print_exc() finally: print_migration_report() From 316fa202ceb45de30c983dd08b1c7f4eb2b2369e Mon Sep 17 00:00:00 2001 From: aakrem Date: Mon, 24 Jun 2024 16:27:36 +0200 Subject: [PATCH 10/34] improve migration output --- .../migrations/mongo_to_postgres/migration.py | 2 +- .../migrations/mongo_to_postgres/utils.py | 21 ++++++++++++------- agenta-backend/poetry.lock | 5 +++-- agenta-backend/pyproject.toml | 1 + 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index ba525c106..9a7826406 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -508,7 +508,7 @@ async def main(): transform_evaluation_scenario, EvaluationScenarioResultDB, ) - + print("\n ========================================================") print("Migration completed successfully.") except Exception as e: import traceback diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index ba83cb026..08dab2a26 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -1,6 +1,7 @@ 
import os import asyncio from datetime import datetime, timezone +from tqdm import tqdm from pymongo import MongoClient from bson import ObjectId, DBRef @@ -37,16 +38,16 @@ async def drop_all_tables(): # Drop all tables with CASCADE option for table in reversed(Base.metadata.sorted_tables): await conn.execute(text(f"DROP TABLE IF EXISTS {table.name} CASCADE")) - print("All tables are dropped.") + print("\n====================== All tables are dropped.\n") async def create_all_tables(tables): """Create all tables in the database.""" async with db_engine.engine.begin() as conn: for table in tables: - print(f"====================== Creating table for {table.__name__}") + print(f"Creating table for {table.__name__}") await conn.run_sync(table.metadata.create_all) - print("All tables are created.") + print("\n====================== All tables are created.\n") async def store_mapping(table_name, mongo_id, uuid): @@ -86,7 +87,9 @@ def update_migration_report(collection_name, total_docs, migrated_docs): def print_migration_report(): - print("\n====================== Migration Report ======================") + print( + "\n ============================ Migration Report ============================" + ) # Headers headers = ["Table", "Total in MongoDB", "Migrated to PostgreSQL"] @@ -124,14 +127,16 @@ async def migrate_collection( collection_name, model_class, transformation_func, association_model=None ): """General function to migrate a collection to a SQL table.""" - print( - f"\n====================== Migrating {collection_name}... 
======================\n" - ) + print(f"\n") total_docs = mongo_db[collection_name].count_documents({}) migrated_docs = 0 async with db_engine.get_session() as session: - for skip in range(0, total_docs, BATCH_SIZE): + for skip in tqdm( + range(0, total_docs, BATCH_SIZE), + total=(total_docs - 1) // BATCH_SIZE + 1, + desc=f"Migrating: {collection_name}", + ): batch = await asyncio.get_event_loop().run_in_executor( None, lambda: list( diff --git a/agenta-backend/poetry.lock b/agenta-backend/poetry.lock index 20bb34137..16b80ad6c 100644 --- a/agenta-backend/poetry.lock +++ b/agenta-backend/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "aioboto3" @@ -2700,6 +2700,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -3899,4 +3900,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.9" 
-content-hash = "7118cded062bfcd960f08cc272d4eab330222cc1e51e9e03e7b728931642e0b2" +content-hash = "27f39a7d172ca89900f8feb0fd495ddc55977f205571823a19842417976be8c2" diff --git a/agenta-backend/pyproject.toml b/agenta-backend/pyproject.toml index d9a714281..cf624168c 100644 --- a/agenta-backend/pyproject.toml +++ b/agenta-backend/pyproject.toml @@ -39,6 +39,7 @@ asyncpg = "^0.29.0" psycopg2-binary = "^2.9.9" uuid-utils = "^0.7.0" sqlalchemy-json = "^0.7.0" +tqdm = "^4.66.4" [tool.poetry.group.dev.dependencies] pytest = "^7.3.1" From 92145d3cd10ef7c632c208fc4d85b25f6ca6e271 Mon Sep 17 00:00:00 2001 From: aakrem Date: Mon, 24 Jun 2024 16:47:30 +0200 Subject: [PATCH 11/34] add filtering by table --- .../migrations/mongo_to_postgres/migration.py | 107 ++++++++++-------- .../migrations/mongo_to_postgres/utils.py | 9 +- 2 files changed, 66 insertions(+), 50 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index 9a7826406..b2fd594e5 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -84,7 +84,7 @@ async def transform_user(user): async def transform_image(image): user_uuid = await get_mapped_uuid( - image["user"].id if isinstance(image["user"], DBRef) else image["user"] + "users", image["user"].id if isinstance(image["user"], DBRef) else image["user"] ) image_uuid = generate_uuid() await store_mapping("docker_images", image["_id"], image_uuid) @@ -102,7 +102,7 @@ async def transform_image(image): async def transform_app(app): - user_uuid = await get_mapped_uuid(app["user"].id) + user_uuid = await get_mapped_uuid("users", app["user"].id) app_uuid = generate_uuid() await store_mapping("app_db", app["_id"], app_uuid) return { @@ -115,8 +115,8 @@ async def transform_app(app): async def transform_deployment(deployment): - app_uuid = await 
get_mapped_uuid(deployment["app"].id) - user_uuid = await get_mapped_uuid(deployment["user"].id) + app_uuid = await get_mapped_uuid("app_db", deployment["app"].id) + user_uuid = await get_mapped_uuid("users", deployment["user"].id) deployment_uuid = generate_uuid() await store_mapping("deployments", deployment["_id"], deployment_uuid) return { @@ -133,10 +133,12 @@ async def transform_deployment(deployment): async def transform_variant_base(base): - app_uuid = await get_mapped_uuid(base["app"].id) - user_uuid = await get_mapped_uuid(base["user"].id) - image_uuid = await get_mapped_uuid(base["image"].id) - deployment_uuid = base["deployment"] and await get_mapped_uuid(base["deployment"]) + app_uuid = await get_mapped_uuid("app_db", base["app"].id) + user_uuid = await get_mapped_uuid("users", base["user"].id) + image_uuid = await get_mapped_uuid("docker_images", base["image"].id) + deployment_uuid = base["deployment"] and await get_mapped_uuid( + "deployments", base["deployment"] + ) base_uuid = generate_uuid() await store_mapping("bases", base["_id"], base_uuid) return { @@ -152,11 +154,11 @@ async def transform_variant_base(base): async def transform_app_variant(variant): - app_uuid = await get_mapped_uuid(variant["app"].id) - image_uuid = await get_mapped_uuid(variant["image"].id) - user_uuid = await get_mapped_uuid(variant["user"].id) - modified_by_uuid = await get_mapped_uuid(variant["modified_by"].id) - base_uuid = await get_mapped_uuid(variant["base"].id) + app_uuid = await get_mapped_uuid("app_db", variant["app"].id) + image_uuid = await get_mapped_uuid("docker_images", variant["image"].id) + user_uuid = await get_mapped_uuid("users", variant["user"].id) + modified_by_uuid = await get_mapped_uuid("users", variant["modified_by"].id) + base_uuid = await get_mapped_uuid("bases", variant["base"].id) variant_uuid = generate_uuid() await store_mapping("app_variants", variant["_id"], variant_uuid) return { @@ -177,9 +179,9 @@ async def 
transform_app_variant(variant): async def transform_app_variant_revision(revision): - variant_uuid = await get_mapped_uuid(revision["variant"].id) - modified_by_uuid = await get_mapped_uuid(revision["modified_by"].id) - base_uuid = await get_mapped_uuid(revision["base"].id) + variant_uuid = await get_mapped_uuid("app_variants", revision["variant"].id) + modified_by_uuid = await get_mapped_uuid("users", revision["modified_by"].id) + base_uuid = await get_mapped_uuid("bases", revision["base"].id) revision_uuid = generate_uuid() await store_mapping("app_variant_revisions", revision["_id"], revision_uuid) return { @@ -196,11 +198,15 @@ async def transform_app_variant_revision(revision): async def transform_app_environment(environment): - app_uuid = await get_mapped_uuid(environment["app"].id) - user_uuid = await get_mapped_uuid(environment["user"].id) - variant_uuid = await get_mapped_uuid(environment["deployed_app_variant"]) - revision_uuid = await get_mapped_uuid(environment["deployed_app_variant_revision"]) - deployment_uuid = await get_mapped_uuid(environment["deployment"]) + app_uuid = await get_mapped_uuid("app_db", environment["app"].id) + user_uuid = await get_mapped_uuid("users", environment["user"].id) + variant_uuid = await get_mapped_uuid( + "app_variants", environment["deployed_app_variant"] + ) + revision_uuid = await get_mapped_uuid( + "app_variant_revisions", environment["deployed_app_variant_revision"] + ) + deployment_uuid = await get_mapped_uuid("deployments", environment["deployment"]) environment_uuid = generate_uuid() await store_mapping("environments", environment["_id"], environment_uuid) return { @@ -217,12 +223,12 @@ async def transform_app_environment(environment): async def transform_app_environment_revision(revision): - environment_uuid = await get_mapped_uuid(revision["environment"].id) - modified_by_uuid = await get_mapped_uuid(revision["modified_by"].id) + environment_uuid = await get_mapped_uuid("environments", 
revision["environment"].id) + modified_by_uuid = await get_mapped_uuid("users", revision["modified_by"].id) variant_revision_uuid = await get_mapped_uuid( - revision["deployed_app_variant_revision"] + "app_variant_revisions", revision["deployed_app_variant_revision"] ) - deployment_uuid = await get_mapped_uuid(revision["deployment"]) + deployment_uuid = await get_mapped_uuid("deployments", revision["deployment"]) revision_uuid = generate_uuid() await store_mapping("environments_revisions", revision["_id"], revision_uuid) return { @@ -240,7 +246,6 @@ async def transform_template(template): template_uuid = generate_uuid() await store_mapping("templates", template["_id"], template_uuid) - # Ensure type is correctly mapped to TemplateType enum template_type = ( TemplateType(template["type"]) if "type" in template else TemplateType.IMAGE ) @@ -261,8 +266,8 @@ async def transform_template(template): async def transform_test_set(test_set): - app_uuid = await get_mapped_uuid(test_set["app"].id) - user_uuid = await get_mapped_uuid(test_set["user"].id) + app_uuid = await get_mapped_uuid("app_db", test_set["app"].id) + user_uuid = await get_mapped_uuid("users", test_set["user"].id) test_set_uuid = generate_uuid() await store_mapping("testsets", test_set["_id"], test_set_uuid) return { @@ -277,8 +282,8 @@ async def transform_test_set(test_set): async def transform_evaluator_config(config): - app_uuid = await get_mapped_uuid(config["app"].id) - user_uuid = await get_mapped_uuid(config["user"].id) + app_uuid = await get_mapped_uuid("app_db", config["app"].id) + user_uuid = await get_mapped_uuid("users", config["user"].id) config_uuid = generate_uuid() await store_mapping("evaluators_configs", config["_id"], config_uuid) return { @@ -299,8 +304,8 @@ async def convert_human_evaluations_associated_variants( """Convert variant and revision ObjectIds to UUIDs and structure them.""" associated_variants = [] for variant_id, revision_id in zip(variants, variants_revisions): - 
variant_uuid = await get_mapped_uuid(variant_id) - revision_uuid = await get_mapped_uuid(revision_id) + variant_uuid = await get_mapped_uuid("app_variants", variant_id) + revision_uuid = await get_mapped_uuid("app_variant_revisions", revision_id) associated_variants.append( { "human_evaluation_id": evaluation_id, @@ -312,9 +317,9 @@ async def convert_human_evaluations_associated_variants( async def transform_human_evaluation(evaluation): - app_uuid = await get_mapped_uuid(evaluation["app"].id) - user_uuid = await get_mapped_uuid(evaluation["user"].id) - test_set_uuid = await get_mapped_uuid(evaluation["testset"].id) + app_uuid = await get_mapped_uuid("app_db", evaluation["app"].id) + user_uuid = await get_mapped_uuid("users", evaluation["user"].id) + test_set_uuid = await get_mapped_uuid("testsets", evaluation["testset"].id) evaluation_uuid = generate_uuid() await store_mapping("human_evaluations", evaluation["_id"], evaluation_uuid) @@ -338,8 +343,10 @@ async def transform_human_evaluation(evaluation): async def transform_human_evaluation_scenario(scenario): - user_uuid = await get_mapped_uuid(scenario["user"].id) - evaluation_uuid = await get_mapped_uuid(scenario["evaluation"].id) + user_uuid = await get_mapped_uuid("users", scenario["user"].id) + evaluation_uuid = await get_mapped_uuid( + "human_evaluations", scenario["evaluation"].id + ) scenario_uuid = generate_uuid() await store_mapping("human_evaluations_scenarios", scenario["_id"], scenario_uuid) return { @@ -362,7 +369,9 @@ async def convert_aggregated_results(results, evaluation_id): """Convert evaluator_config ObjectIds in aggregated_results to UUIDs and structure them.""" aggregated_results = [] for result in results: - evaluator_config_uuid = await get_mapped_uuid(result["evaluator_config"]) + evaluator_config_uuid = await get_mapped_uuid( + "evaluators_configs", result["evaluator_config"] + ) result_uuid = generate_uuid() aggregated_results.append( { @@ -379,7 +388,9 @@ async def 
convert_scenario_aggregated_results(results, scenario_id): """Convert evaluator_config ObjectIds in scenario aggregated_results to UUIDs and structure them.""" scenario_aggregated_results = [] for result in results: - evaluator_config_uuid = await get_mapped_uuid(result["evaluator_config"]) + evaluator_config_uuid = await get_mapped_uuid( + "evaluators_configs", result["evaluator_config"] + ) result_uuid = generate_uuid() scenario_aggregated_results.append( { @@ -393,11 +404,13 @@ async def convert_scenario_aggregated_results(results, scenario_id): async def transform_evaluation(evaluation): - app_uuid = await get_mapped_uuid(evaluation["app"].id) - user_uuid = await get_mapped_uuid(evaluation["user"].id) - test_set_uuid = await get_mapped_uuid(evaluation["testset"].id) - variant_uuid = await get_mapped_uuid(evaluation["variant"]) - revision_uuid = await get_mapped_uuid(evaluation["variant_revision"]) + app_uuid = await get_mapped_uuid("app_db", evaluation["app"].id) + user_uuid = await get_mapped_uuid("users", evaluation["user"].id) + test_set_uuid = await get_mapped_uuid("testsets", evaluation["testset"].id) + variant_uuid = await get_mapped_uuid("app_variants", evaluation["variant"]) + revision_uuid = await get_mapped_uuid( + "app_variant_revisions", evaluation["variant_revision"] + ) evaluation_uuid = generate_uuid() await store_mapping("evaluations", evaluation["_id"], evaluation_uuid) @@ -425,9 +438,9 @@ async def transform_evaluation(evaluation): async def transform_evaluation_scenario(scenario): - user_uuid = await get_mapped_uuid(scenario["user"].id) - evaluation_uuid = await get_mapped_uuid(scenario["evaluation"].id) - variant_uuid = await get_mapped_uuid(scenario["variant_id"]) + user_uuid = await get_mapped_uuid("users", scenario["user"].id) + evaluation_uuid = await get_mapped_uuid("evaluations", scenario["evaluation"].id) + variant_uuid = await get_mapped_uuid("app_variants", scenario["variant_id"]) scenario_uuid = generate_uuid() await 
store_mapping("evaluation_scenarios", scenario["_id"], scenario_uuid) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index 08dab2a26..45e74355d 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -61,10 +61,13 @@ async def store_mapping(table_name, mongo_id, uuid): await session.commit() -async def get_mapped_uuid(mongo_id): - """Retrieve the mapped UUID for a given MongoDB ObjectId.""" +async def get_mapped_uuid(table_name, mongo_id): + """Retrieve the mapped UUID for a given MongoDB ObjectId and table name.""" async with db_engine.get_session() as session: - stmt = select(IDsMappingDB.uuid).filter(IDsMappingDB.objectid == str(mongo_id)) + stmt = select(IDsMappingDB.uuid).filter( + IDsMappingDB.table_name == table_name, + IDsMappingDB.objectid == str(mongo_id), + ) result = await session.execute(stmt) row = result.first() return row[0] if row else None From 74ba503b314fc6792c46d8bb39cc6a50b64f4b94 Mon Sep 17 00:00:00 2001 From: aakrem Date: Mon, 24 Jun 2024 16:52:01 +0200 Subject: [PATCH 12/34] replace id with _id --- .../agenta_backend/migrations/mongo_to_postgres/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index 45e74355d..c9c55702b 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -52,10 +52,10 @@ async def create_all_tables(tables): async def store_mapping(table_name, mongo_id, uuid): """Store the mapping of MongoDB ObjectId to UUID in the mapping table.""" - id = generate_uuid() + id_ = generate_uuid() async with db_engine.get_session() as session: mapping = IDsMappingDB( - 
id=id, table_name=table_name, objectid=str(mongo_id), uuid=uuid + id=id_, table_name=table_name, objectid=str(mongo_id), uuid=uuid ) session.add(mapping) await session.commit() From 4c48c403b7a2caa42e73dde64c02002001623631 Mon Sep 17 00:00:00 2001 From: aakrem Date: Mon, 24 Jun 2024 17:18:48 +0200 Subject: [PATCH 13/34] fix base --- .../agenta_backend/migrations/mongo_to_postgres/utils.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index c9c55702b..ce994d54d 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -14,10 +14,8 @@ from agenta_backend.models.db_engine import db_engine -from agenta_backend.models.db_models import ( - IDsMappingDB, - Base, -) +from agenta_backend.models.db_models import IDsMappingDB +from agenta_backend.models.base import Base BATCH_SIZE = 1000 From a950c4008c94cbcb5dd13544cc18f1904fb62d7e Mon Sep 17 00:00:00 2001 From: aakrem Date: Tue, 25 Jun 2024 08:54:43 +0200 Subject: [PATCH 14/34] add assertion to check length of variants and variants_revisions --- .../agenta_backend/migrations/mongo_to_postgres/migration.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index b2fd594e5..894773293 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -303,6 +303,10 @@ async def convert_human_evaluations_associated_variants( ): """Convert variant and revision ObjectIds to UUIDs and structure them.""" associated_variants = [] + assert len(variants) == len( + variants_revisions + ), "variants and variants_revisions must have 
the same length" + for variant_id, revision_id in zip(variants, variants_revisions): variant_uuid = await get_mapped_uuid("app_variants", variant_id) revision_uuid = await get_mapped_uuid("app_variant_revisions", revision_id) From 1d201ab5329f244915c11b349b2daff06ac0040c Mon Sep 17 00:00:00 2001 From: aakrem Date: Tue, 25 Jun 2024 11:14:46 +0200 Subject: [PATCH 15/34] add migration steps --- .../agenta_backend/migrations/mongo_to_postgres/README.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 agenta-backend/agenta_backend/migrations/mongo_to_postgres/README.md diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/README.md b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/README.md new file mode 100644 index 000000000..0c6f6c4b8 --- /dev/null +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/README.md @@ -0,0 +1,6 @@ +```bash +docker ps +docker exec -it {backend-container-id} bash +cd /app/agenta_backend/migrations/mongo_to_postgres +python3 migration.py +``` \ No newline at end of file From 92fd40d742f63e8946cb843cb5ef89f969b37c9f Mon Sep 17 00:00:00 2001 From: aakrem Date: Tue, 25 Jun 2024 13:46:11 +0200 Subject: [PATCH 16/34] fix deployed_app_variant_revision --- .../migrations/mongo_to_postgres/migration.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index 894773293..04780119e 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -203,9 +203,14 @@ async def transform_app_environment(environment): variant_uuid = await get_mapped_uuid( "app_variants", environment["deployed_app_variant"] ) - revision_uuid = await get_mapped_uuid( - "app_variant_revisions", environment["deployed_app_variant_revision"] - ) + + if 
environment["deployed_app_variant_revision"] is None: + revision_uuid = None + else: + revision_uuid = await get_mapped_uuid( + "app_variant_revisions", environment["deployed_app_variant_revision"].id + ) + deployment_uuid = await get_mapped_uuid("deployments", environment["deployment"]) environment_uuid = generate_uuid() await store_mapping("environments", environment["_id"], environment_uuid) From 7d016c3f61a2b395be60e440bec1e4b83af7da90 Mon Sep 17 00:00:00 2001 From: Mahmoud Mabrouk Date: Tue, 25 Jun 2024 16:29:35 +0200 Subject: [PATCH 17/34] fix(tool): [bug] Incorrect Configuration for Variants fix migration fixed migation for app variant parameters Closes: [bug] Incorrect Configuration for Variants --- .../agenta_backend/migrations/mongo_to_postgres/migration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index 04780119e..a41a84b2b 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -172,7 +172,7 @@ async def transform_app_variant(variant): "base_name": variant.get("base_name"), "base_id": base_uuid, "config_name": variant["config_name"], - "config_parameters": variant["config"], + "config_parameters": variant["config"]["parameters"], "created_at": get_datetime(variant.get("created_at")), "updated_at": get_datetime(variant.get("updated_at")), } From 582b60d0b3fd27ea7b6fd37a89ca22069eaed167 Mon Sep 17 00:00:00 2001 From: aakrem Date: Tue, 25 Jun 2024 18:16:00 +0200 Subject: [PATCH 18/34] fix human evaluation variants results --- .../agenta_backend/migrations/mongo_to_postgres/migration.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py 
b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index a41a84b2b..6d65bf071 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -356,7 +356,9 @@ async def transform_human_evaluation_scenario(scenario): evaluation_uuid = await get_mapped_uuid( "human_evaluations", scenario["evaluation"].id ) + variant_uuid = str(await get_mapped_uuid("app_variants", scenario["vote"])) scenario_uuid = generate_uuid() + await store_mapping("human_evaluations_scenarios", scenario["_id"], scenario_uuid) return { "id": scenario_uuid, @@ -364,7 +366,7 @@ async def transform_human_evaluation_scenario(scenario): "evaluation_id": evaluation_uuid, "inputs": scenario["inputs"], "outputs": scenario["outputs"], - "vote": scenario.get("vote"), + "vote": variant_uuid, "score": scenario.get("score"), "correct_answer": scenario.get("correct_answer"), "created_at": get_datetime(scenario.get("created_at")), From b396da1b01d82cd299b1c5e6cdcdecc8bfe9390f Mon Sep 17 00:00:00 2001 From: aakrem Date: Tue, 25 Jun 2024 18:25:16 +0200 Subject: [PATCH 19/34] fix tqdm progress length --- .../agenta_backend/migrations/mongo_to_postgres/migration.py | 2 +- .../agenta_backend/migrations/mongo_to_postgres/utils.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py index 6d65bf071..d2d595873 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py @@ -532,7 +532,7 @@ async def main(): transform_evaluation_scenario, EvaluationScenarioResultDB, ) - print("\n ========================================================") + print("\n========================================================") print("Migration completed 
successfully.") except Exception as e: import traceback diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index ce994d54d..ac9f80e76 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -137,6 +137,7 @@ async def migrate_collection( range(0, total_docs, BATCH_SIZE), total=(total_docs - 1) // BATCH_SIZE + 1, desc=f"Migrating: {collection_name}", + ncols=85, ): batch = await asyncio.get_event_loop().run_in_executor( None, From 0c6520fdf9cf4f2188a5dd1297eede6158bb066b Mon Sep 17 00:00:00 2001 From: aakrem Date: Wed, 26 Jun 2024 10:48:38 +0200 Subject: [PATCH 20/34] move all previous migrations to a separate folder --- .../v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py | 0 .../20240131132738_create_app_environment_revision.py | 0 .../v0_11_0_to_v0_12_0/20240126100524_models_revamp.py | 0 .../v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py | 0 .../20240509122536_evaluation_scenario_correct_answer.py | 0 .../v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py | 0 .../{ => mongo}/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py | 0 .../v0_7_0_to_v0_8_0/20240110001454_initial_migration.py | 0 .../20240110132547_create_exact_match_evaluator_config.py | 0 .../v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py | 0 .../20240112120721_evaluation_scenarios_revamp.py | 0 .../20240112120740_human_a_b_evaluation_scenarios.py | 0 .../20240112120800_human_single_model_evaluation_scenarios.py | 0 .../20240113131802_new_evaluation_results_aggregation.py | 0 .../20240113204909_change_odmantic_reference_to_link.py | 0 .../v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py | 0 .../20240130133544_create_app_variant_revision.py | 0 .../v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py | 0 
.../v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py | 0 .../v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py | 0 .../20240124225808_evaluation_scenario_error_handling.py | 0 21 files changed, 0 insertions(+), 0 deletions(-) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py (100%) rename agenta-backend/agenta_backend/migrations/{ => 
mongo}/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py (100%) rename agenta-backend/agenta_backend/migrations/{ => mongo}/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py (100%) diff --git a/agenta-backend/agenta_backend/migrations/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py b/agenta-backend/agenta_backend/migrations/mongo/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py diff --git a/agenta-backend/agenta_backend/migrations/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py b/agenta-backend/agenta_backend/migrations/mongo/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py diff --git 
a/agenta-backend/agenta_backend/migrations/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py b/agenta-backend/agenta_backend/migrations/mongo/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py diff --git a/agenta-backend/agenta_backend/migrations/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py b/agenta-backend/agenta_backend/migrations/mongo/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py diff --git a/agenta-backend/agenta_backend/migrations/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py b/agenta-backend/agenta_backend/migrations/mongo/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py diff --git a/agenta-backend/agenta_backend/migrations/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py b/agenta-backend/agenta_backend/migrations/mongo/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py diff --git a/agenta-backend/agenta_backend/migrations/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py 
b/agenta-backend/agenta_backend/migrations/mongo/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py similarity index 100% 
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py 
b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py diff --git a/agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py b/agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py diff --git a/agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py b/agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py diff --git a/agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py b/agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py diff --git a/agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py 
b/agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py diff --git a/agenta-backend/agenta_backend/migrations/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py b/agenta-backend/agenta_backend/migrations/mongo/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py diff --git a/agenta-backend/agenta_backend/migrations/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py b/agenta-backend/agenta_backend/migrations/mongo/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py rename to agenta-backend/agenta_backend/migrations/mongo/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py From 1002e642ec4aead5f0bd1f1700302e230aba10c8 Mon Sep 17 00:00:00 2001 From: aakrem Date: Wed, 26 Jun 2024 10:49:24 +0200 Subject: [PATCH 21/34] add backup --- agenta-backend/agenta_backend/migrations/{ => mongo}/backup.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename agenta-backend/agenta_backend/migrations/{ => mongo}/backup.py (100%) diff --git a/agenta-backend/agenta_backend/migrations/backup.py b/agenta-backend/agenta_backend/migrations/mongo/backup.py similarity index 100% rename from agenta-backend/agenta_backend/migrations/backup.py rename to agenta-backend/agenta_backend/migrations/mongo/backup.py From 
49c8f9dd0f49319675c9e3c8feb10fcc70b346d9 Mon Sep 17 00:00:00 2001 From: aakrem Date: Thu, 27 Jun 2024 11:30:20 +0200 Subject: [PATCH 22/34] add assertion to prevent duplicated values --- .../migrations/mongo_to_postgres/utils.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index ac9f80e76..38ff517f2 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -11,7 +11,7 @@ from sqlalchemy.orm import sessionmaker import uuid_utils.compat as uuid from sqlalchemy.future import select - +from sqlalchemy.exc import NoResultFound from agenta_backend.models.db_engine import db_engine from agenta_backend.models.db_models import IDsMappingDB @@ -59,7 +59,7 @@ async def store_mapping(table_name, mongo_id, uuid): await session.commit() -async def get_mapped_uuid(table_name, mongo_id): +async def get_mapped_uuid(table_name, mongo_id, print_result=False): """Retrieve the mapped UUID for a given MongoDB ObjectId and table name.""" async with db_engine.get_session() as session: stmt = select(IDsMappingDB.uuid).filter( @@ -67,8 +67,11 @@ async def get_mapped_uuid(table_name, mongo_id): IDsMappingDB.objectid == str(mongo_id), ) result = await session.execute(stmt) - row = result.first() - return row[0] if row else None + try: + row = result.one() + except NoResultFound: + return None + return row[0] def get_datetime(value): From fca96ea34a988697786831d6b87f40fea6403f25 Mon Sep 17 00:00:00 2001 From: aakrem Date: Sun, 30 Jun 2024 16:23:17 +0200 Subject: [PATCH 23/34] support skipping documents in case of duplicated values --- .../migrations/mongo_to_postgres/utils.py | 68 +++++++++++++------ 1 file changed, 46 insertions(+), 22 deletions(-) diff --git 
a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index 38ff517f2..04dc8a08e 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -1,5 +1,6 @@ import os import asyncio +import asyncpg from datetime import datetime, timezone from tqdm import tqdm @@ -13,6 +14,7 @@ from sqlalchemy.future import select from sqlalchemy.exc import NoResultFound from agenta_backend.models.db_engine import db_engine +from sqlalchemy.exc import IntegrityError from agenta_backend.models.db_models import IDsMappingDB from agenta_backend.models.base import Base @@ -59,7 +61,7 @@ async def store_mapping(table_name, mongo_id, uuid): await session.commit() -async def get_mapped_uuid(table_name, mongo_id, print_result=False): +async def get_mapped_uuid(table_name, mongo_id): """Retrieve the mapped UUID for a given MongoDB ObjectId and table name.""" async with db_engine.get_session() as session: stmt = select(IDsMappingDB.uuid).filter( @@ -86,8 +88,12 @@ def generate_uuid(): return uuid.uuid7() -def update_migration_report(collection_name, total_docs, migrated_docs): - migration_report[collection_name] = {"total": total_docs, "migrated": migrated_docs} +def update_migration_report(collection_name, total_docs, migrated_docs, skipped_docs): + migration_report[collection_name] = { + "total": total_docs, + "migrated": migrated_docs, + "skipped": skipped_docs, + } def print_migration_report(): @@ -96,7 +102,7 @@ def print_migration_report(): ) # Headers - headers = ["Table", "Total in MongoDB", "Migrated to PostgreSQL"] + headers = ["Table", "Total in MongoDB", "Migrated to PostgreSQL", "Skipped"] if not migration_report: print("No data available in the migration report.") @@ -114,18 +120,25 @@ def print_migration_report(): len(headers[2]), max(len(str(counts["migrated"])) for counts in 
migration_report.values()), ) + max_skipped_length = max( + len(headers[3]), + max(len(str(counts.get("skipped", 0))) for counts in migration_report.values()), + ) # Set the header and divider with appropriate padding - table_header = f"| {headers[0].ljust(max_table_length)} | {headers[1].ljust(max_total_length)} | {headers[2].ljust(max_migrated_length)} |" - table_divider = f"|{'-' * (max_table_length + 2)}|{'-' * (max_total_length + 2)}|{'-' * (max_migrated_length + 2)}|" + table_header = f"| {headers[0].ljust(max_table_length)} | {headers[1].ljust(max_total_length)} | {headers[2].ljust(max_migrated_length)} | {headers[3].ljust(max_skipped_length)} |" + table_divider = f"|{'-' * (max_table_length + 2)}|{'-' * (max_total_length + 2)}|{'-' * (max_migrated_length + 2)}|{'-' * (max_skipped_length + 2)}|" print(table_header) print(table_divider) for table, counts in migration_report.items(): - table_row = f"| {table.ljust(max_table_length)} | {str(counts['total']).ljust(max_total_length)} | {str(counts['migrated']).ljust(max_migrated_length)} |" + skipped = counts.get("skipped", 0) + table_row = f"| {table.ljust(max_table_length)} | {str(counts['total']).ljust(max_total_length)} | {str(counts['migrated']).ljust(max_migrated_length)} | {str(skipped).ljust(max_skipped_length)} |" print(table_row) + print(table_divider) + async def migrate_collection( collection_name, model_class, transformation_func, association_model=None @@ -134,6 +147,7 @@ async def migrate_collection( print(f"\n") total_docs = mongo_db[collection_name].count_documents({}) migrated_docs = 0 + skipped_docs = 0 async with db_engine.get_session() as session: for skip in tqdm( @@ -149,18 +163,28 @@ async def migrate_collection( ), ) for document in batch: - if association_model: - ( - transformed_document, - associated_entities, - ) = await transformation_func(document) - session.add(model_class(**transformed_document)) - for assoc_entity in associated_entities: - 
session.add(association_model(**assoc_entity)) - else: - transformed_document = await transformation_func(document) - session.add(model_class(**transformed_document)) - await session.commit() - migrated_docs += 1 - - update_migration_report(collection_name, total_docs, migrated_docs) + try: + if association_model: + ( + transformed_document, + associated_entities, + ) = await transformation_func(document) + session.add(model_class(**transformed_document)) + for assoc_entity in associated_entities: + session.add(association_model(**assoc_entity)) + else: + transformed_document = await transformation_func(document) + session.add(model_class(**transformed_document)) + await session.commit() + migrated_docs += 1 + except (asyncpg.exceptions.UniqueViolationError, IntegrityError) as e: + await session.rollback() + print(f"\nSkipping duplicate document in {collection_name}: {e}\n") + skipped_docs += 1 + pass + except Exception as e: + print(f"Error migrating document in {collection_name}: {e}") + print(f"Failing migration for collection: {collection_name}") + raise + + update_migration_report(collection_name, total_docs, migrated_docs, skipped_docs) From 3fa794be21877e16cef71f05e8c1272319cb3242 Mon Sep 17 00:00:00 2001 From: aakrem Date: Sun, 30 Jun 2024 19:19:46 +0200 Subject: [PATCH 24/34] separate db engine for migration --- .../migrations/mongo_to_postgres/db_engine.py | 152 ++++++++++++++++++ .../migrations/mongo_to_postgres/utils.py | 2 +- 2 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 agenta-backend/agenta_backend/migrations/mongo_to_postgres/db_engine.py diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/db_engine.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/db_engine.py new file mode 100644 index 000000000..30aa53262 --- /dev/null +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/db_engine.py @@ -0,0 +1,152 @@ +import os +import logging +from asyncio import current_task +from typing 
import AsyncGenerator +from contextlib import asynccontextmanager + +from sqlalchemy.ext.asyncio import ( + AsyncSession, + create_async_engine, + async_sessionmaker, + async_scoped_session, +) + +from agenta_backend.utils.common import isCloudEE + +if isCloudEE(): + from agenta_backend.commons.observability.models.db import SpanDB + from agenta_backend.commons.models.db_models import ( + APIKeyDB, + WorkspaceDB, + OrganizationDB, + AppDB_ as AppDB, + UserDB_ as UserDB, + ImageDB_ as ImageDB, + TestSetDB_ as TestSetDB, + AppVariantDB_ as AppVariantDB, + EvaluationDB_ as EvaluationDB, + DeploymentDB_ as DeploymentDB, + VariantBaseDB_ as VariantBaseDB, + AppEnvironmentDB_ as AppEnvironmentDB, + AppEnvironmentRevisionDB_ as AppEnvironmentRevisionDB, + EvaluatorConfigDB_ as EvaluatorConfigDB, + HumanEvaluationDB_ as HumanEvaluationDB, + EvaluationScenarioDB_ as EvaluationScenarioDB, + HumanEvaluationScenarioDB_ as HumanEvaluationScenarioDB, + ) +else: + from agenta_backend.models.db_models import ( + AppDB, + UserDB, + ImageDB, + TestSetDB, + EvaluationDB, + DeploymentDB, + AppVariantDB, + VariantBaseDB, + AppEnvironmentDB, + AppEnvironmentRevisionDB, + EvaluatorConfigDB, + HumanEvaluationDB, + EvaluationScenarioDB, + HumanEvaluationScenarioDB, + ) + +from agenta_backend.models.db_models import ( + TemplateDB, + AppVariantRevisionsDB, +) + +models = [ + AppDB, + UserDB, + ImageDB, + TestSetDB, + TemplateDB, + AppVariantDB, + DeploymentDB, + EvaluationDB, + VariantBaseDB, + AppEnvironmentDB, + AppEnvironmentRevisionDB, + EvaluatorConfigDB, + HumanEvaluationDB, + EvaluationScenarioDB, + AppVariantRevisionsDB, + HumanEvaluationScenarioDB, +] + +if isCloudEE(): + models.extend([OrganizationDB, WorkspaceDB, APIKeyDB]) # type: ignore + + +# Configure and set logging level +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class DBEngine: + """ + Database engine to initialize SQLAlchemy and return the engine based on mode. 
+ """ + + def __init__(self) -> None: + self.mode = os.environ.get("DATABASE_MODE", "v2") + self.db_url = f"{os.environ.get('POSTGRES_URI')}" + self.engine = create_async_engine(url=self.db_url) + self.async_session_maker = async_sessionmaker( + bind=self.engine, class_=AsyncSession, expire_on_commit=False + ) + self.async_session = async_scoped_session( + session_factory=self.async_session_maker, scopefunc=current_task + ) + + async def init_db(self): + """ + Initialize the database based on the mode and create all tables. + """ + async with self.engine.begin() as conn: + # Drop all existing tables (if needed) + # await conn.run_sync(Base.metadata.drop_all) + # Create tables + for model in models: + await conn.run_sync(model.metadata.create_all) + logger.info(f"Using {self.mode} database...") + + async def remove_db(self) -> None: + """ + Remove the database based on the mode. + """ + async with self.engine.begin() as conn: + for model in models: + await conn.run_sync(model.metadata.drop_all) + + @asynccontextmanager + async def get_session(self) -> AsyncGenerator[AsyncSession, None]: + session = self.async_session() + try: + yield session + except Exception as e: + await session.rollback() + raise e + finally: + await session.close() + + async def close(self): + """ + Closes and dispose all the connections using the engine. 
+ + :raises Exception: if engine is initialized + """ + + if self.engine is None: + raise Exception("DBEngine is not initialized") + + await self.engine.dispose() + + self.engine = None + self.async_session_maker = None + self.async_session = None + + +db_engine = DBEngine() diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index 04dc8a08e..90bf2fbc9 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -13,7 +13,7 @@ import uuid_utils.compat as uuid from sqlalchemy.future import select from sqlalchemy.exc import NoResultFound -from agenta_backend.models.db_engine import db_engine +from agenta_backend.migrations.mongo_to_postgres.db_engine import db_engine from sqlalchemy.exc import IntegrityError from agenta_backend.models.db_models import IDsMappingDB From abc39ab93e9ccc503a82097cff0a2e4c0c811275 Mon Sep 17 00:00:00 2001 From: aakrem Date: Mon, 1 Jul 2024 20:08:10 +0200 Subject: [PATCH 25/34] small refactoring --- .../migrations/mongo_to_postgres/mongo_db_engine.py | 13 +++++++++++++ .../migrations/mongo_to_postgres/utils.py | 9 ++------- 2 files changed, 15 insertions(+), 7 deletions(-) create mode 100644 agenta-backend/agenta_backend/migrations/mongo_to_postgres/mongo_db_engine.py diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/mongo_db_engine.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/mongo_db_engine.py new file mode 100644 index 000000000..828a9753b --- /dev/null +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/mongo_db_engine.py @@ -0,0 +1,13 @@ +import os +from pymongo import MongoClient + +# MongoDB connection +MONGO_URI = os.environ.get("MONGODB_URI") +DATABASE_MODE = os.environ.get("DATABASE_MODE") +mongo_client = MongoClient(MONGO_URI) +mongo_db_name = f"agenta_{DATABASE_MODE}" +mongo_db = 
mongo_client[mongo_db_name] + + +def get_mongo_db(): + return mongo_db diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py index 90bf2fbc9..c4f5a0a0c 100644 --- a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py @@ -4,7 +4,6 @@ from datetime import datetime, timezone from tqdm import tqdm -from pymongo import MongoClient from bson import ObjectId, DBRef from sqlalchemy import MetaData, Column, String, DateTime, text, create_engine from sqlalchemy.dialects.postgresql import UUID @@ -18,15 +17,11 @@ from agenta_backend.models.db_models import IDsMappingDB from agenta_backend.models.base import Base +from agenta_backend.migrations.mongo_to_postgres.mongo_db_engine import get_mongo_db BATCH_SIZE = 1000 -# MongoDB connection -MONGO_URI = os.environ.get("MONGODB_URI") -DATABASE_MODE = os.environ.get("DATABASE_MODE") -mongo_client = MongoClient(MONGO_URI) -mongo_db_name = f"agenta_{DATABASE_MODE}" -mongo_db = mongo_client[mongo_db_name] +mongo_db = get_mongo_db() migration_report = {} From e37497361ab04124a31350cbb73398f3121c5f52 Mon Sep 17 00:00:00 2001 From: Abram Date: Mon, 1 Jul 2024 19:24:38 +0100 Subject: [PATCH 26/34] refactor (docs): moved and renamed migration.mdx to migration folder under self-host --- .../{migration.mdx => migration/migration-to-mongodb.mdx} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename docs/self-host/{migration.mdx => migration/migration-to-mongodb.mdx} (97%) diff --git a/docs/self-host/migration.mdx b/docs/self-host/migration/migration-to-mongodb.mdx similarity index 97% rename from docs/self-host/migration.mdx rename to docs/self-host/migration/migration-to-mongodb.mdx index 75d272cfd..d382b8558 100644 --- a/docs/self-host/migration.mdx +++ b/docs/self-host/migration/migration-to-mongodb.mdx @@ -1,5 +1,5 @@ --- -title: Migration +title: 
Migration to MongoDB (Deprecated) description: 'This is a step-by-step guide for upgrading to the latest version of Agenta' --- From f114e42f10fbc1071f33be4a43a89caf4f9957d8 Mon Sep 17 00:00:00 2001 From: Abram Date: Mon, 1 Jul 2024 19:24:56 +0100 Subject: [PATCH 27/34] feat (docs): created documentation for Postgres migration --- .../migration/migration-to-postgres.mdx | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 docs/self-host/migration/migration-to-postgres.mdx diff --git a/docs/self-host/migration/migration-to-postgres.mdx b/docs/self-host/migration/migration-to-postgres.mdx new file mode 100644 index 000000000..82c1227a4 --- /dev/null +++ b/docs/self-host/migration/migration-to-postgres.mdx @@ -0,0 +1,46 @@ +--- +title: Migration to PostgreSQL +description: 'This is a step-by-step instructions for migrating Agenta to the newly released PostgreSQL version.' --- + +This guide provides step-by-step instructions for migrating your Agenta instance from MongoDB to the newly released PostgreSQL + +> ⚠️ As of version 0.11.0, Agenta is transitioning from MongoDB to PostgreSQL. Users need to migrate their MongoDB databases to this latest version, as this will be the only version receiving feature updates and patches. + +**Table of content:** + - [Prepare for Migration](#prepare-for-migration) + - [Start the Migration](#start-the-migration) + - [Post Migration](#post-migration) +
+ +### Prepare for Migration + +Before starting the migration, ensure that you have backed up your production data. + +While the migration will not modify any data in your MongoDB instance, it is highly recommended that you create a [backup](https://www.mongodb.com/docs/manual/tutorial/backup-and-restore-tools/) of your database in the MongoDB instance before running the migration script. This ensures you have a recovery point in case of any issues. + +### Start the Migration + +1. Start the local instance of Agenta, and ensure that both MongoDB and Postgres instances are active. +2. Use the following commands to initiate the migration: +```bash +docker ps +``` +The above command will list the running docker containers that you have. Copy the backend container id and execute bash. + +```bash +docker exec -it {backend-container-id} bash +``` + +Next, navigate to the `mongo_to_postgres` folder to execute the migration script. + +```bash +cd /app/agenta_backend/migrations/mongo_to_postgres +python3 migration.py +``` + +### Post Migration + +After completing the migration, ensure you check the data integrity in PostgreSQL by accessing Agenta on the web and verifying that your data is intact and everything works fine. + +In the event that you encounter issues and need to revert the migration, rest assured that your data in the MongoDB instance is still intact. All you need to do to revert is to check out the last commit you were on before the PostgreSQL migration and create a Github [issue](https://github.com/Agenta-AI/agenta/issues/new?assignees=&labels=postgres,bug,Backend&projects=&template=bug_report.md&title=[Bug]+) describing the problem you encountered. 
From 9d4b31ce66df3834197dc5528c1fcdc60250241e Mon Sep 17 00:00:00 2001 From: Abram Date: Mon, 1 Jul 2024 19:30:23 +0100 Subject: [PATCH 28/34] minor refactor (docs): replace tag with 'version' --- docs/self-host/migration/migration-to-postgres.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/self-host/migration/migration-to-postgres.mdx b/docs/self-host/migration/migration-to-postgres.mdx index 82c1227a4..2d47e151c 100644 --- a/docs/self-host/migration/migration-to-postgres.mdx +++ b/docs/self-host/migration/migration-to-postgres.mdx @@ -3,7 +3,7 @@ title: Migration to PostgreSQL description: 'This is a step-by-step instructions for migrating Agenta to the newly released PostgreSQL version.' --- -This guide provides step-by-step instructions for migrating your Agenta instance from MongoDB to the newly released PostgreSQL +This guide provides step-by-step instructions for migrating your Agenta instance from MongoDB to the newly released PostgreSQL version. > ⚠️ As of version 0.11.0, Agenta is transitioning from MongoDB to PostgreSQL. Users need to migrate their MongoDB databases to this latest version, as this will be the only version receiving feature updates and patches. 
From 573ca2a680c9fa10f6c851272043b45469686103 Mon Sep 17 00:00:00 2001 From: Abram Date: Tue, 2 Jul 2024 11:58:48 +0100 Subject: [PATCH 29/34] refactor (backend): replace deprecated legacy query api .one_or_none() to .first() --- .../agenta_backend/services/db_manager.py | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/agenta-backend/agenta_backend/services/db_manager.py b/agenta-backend/agenta_backend/services/db_manager.py index ffd153911..1bdca299f 100644 --- a/agenta-backend/agenta_backend/services/db_manager.py +++ b/agenta-backend/agenta_backend/services/db_manager.py @@ -166,7 +166,7 @@ async def get_image_by_id(image_id: str) -> ImageDB: result = await session.execute( select(ImageDB).filter_by(id=uuid.UUID(image_id)) ) - image = result.scalars().one_or_none() + image = result.scalars().first() return image @@ -187,7 +187,7 @@ async def fetch_app_by_id(app_id: str) -> AppDB: ) result = await session.execute(base_query) - app = result.unique().scalars().one_or_none() + app = result.unique().scalars().first() return app @@ -222,7 +222,7 @@ async def fetch_app_variant_by_id( ) result = await session.execute(query.filter_by(id=uuid.UUID(app_variant_id))) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() return app_variant @@ -242,7 +242,7 @@ async def fetch_app_variant_by_base_id(base_id: str) -> Optional[AppVariantDB]: result = await session.execute( select(AppVariantDB).filter_by(base_id=uuid.UUID(base_id)) ) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() return app_variant @@ -268,7 +268,7 @@ async def fetch_app_variant_by_base_id_and_config_name( base_id=uuid.UUID(base_id), config_name=config_name ) ) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() return app_variant @@ -294,7 +294,7 @@ async def fetch_app_variant_revision_by_variant( variant_id=uuid.UUID(app_variant_id), revision=revision ) ) - 
app_variant_revision = result.scalars().one_or_none() + app_variant_revision = result.scalars().first() if app_variant_revision is None: raise Exception( f"app variant revision for app_variant {app_variant_id} and revision {revision} not found" @@ -320,7 +320,7 @@ async def fetch_base_by_id(base_id: str) -> Optional[VariantBaseDB]: ) .filter_by(id=uuid.UUID(base_id)) ) - base = result.scalars().one_or_none() + base = result.scalars().first() return base @@ -343,7 +343,7 @@ async def fetch_app_variant_by_name_and_appid( variant_name=variant_name, app_id=uuid.UUID(app_id) ) ) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() return app_variant @@ -684,7 +684,7 @@ async def get_deployment_by_id( result = await session.execute( select(DeploymentDB).filter_by(id=uuid.UUID(deployment_id)) ) - deployment = result.scalars().one_or_none() + deployment = result.scalars().first() return deployment @@ -702,7 +702,7 @@ async def get_deployment_by_appid(app_id: str) -> DeploymentDB: result = await session.execute( select(DeploymentDB).filter_by(app_id=uuid.UUID(app_id)) ) - deployment = result.scalars().one_or_none() + deployment = result.scalars().first() logger.debug(f"deployment: {deployment}") return deployment @@ -781,7 +781,7 @@ async def get_user(user_uid: str) -> UserDB: async with db_engine.get_session() as session: result = await session.execute(select(UserDB).filter_by(uid=user_uid)) - user = result.scalars().one_or_none() + user = result.scalars().first() if user is None and isCloudEE(): raise Exception("Please login or signup") @@ -814,7 +814,7 @@ async def get_user_with_id(user_id: str): async with db_engine.get_session() as session: result = await session.execute(select(UserDB).filter_by(id=uuid.UUID(user_id))) - user = result.scalars().one_or_none() + user = result.scalars().first() if user is None: logger.error("Failed to get user with id") raise Exception("Error while getting user") @@ -844,7 +844,7 @@ async def 
get_user_with_email(email: str): async with db_engine.get_session() as session: result = await session.execute(select(UserDB).filter_by(email=email)) - user = result.scalars().one_or_none() + user = result.scalars().first() return user @@ -893,7 +893,7 @@ async def get_orga_image_instance_by_docker_id( ) result = await session.execute(query) - image = result.scalars().one_or_none() + image = result.scalars().first() return image @@ -931,7 +931,7 @@ async def get_orga_image_instance_by_uri( ) result = await session.execute(query) - image = result.scalars().one_or_none() + image = result.scalars().first() return image @@ -947,7 +947,7 @@ async def get_app_instance_by_id(app_id: str) -> AppDB: async with db_engine.get_session() as session: result = await session.execute(select(AppDB).filter_by(id=uuid.UUID(app_id))) - app = result.scalars().one_or_none() + app = result.scalars().first() return app @@ -1158,7 +1158,7 @@ async def remove_deployment(deployment_id: str): result = await session.execute( select(DeploymentDB).filter_by(id=uuid.UUID(deployment_id)) ) - deployment = result.scalars().one_or_none() + deployment = result.scalars().first() if not deployment: raise NoResultFound(f"Deployment with {deployment_id} not found") @@ -1254,7 +1254,7 @@ async def deploy_to_environment( app_id=app_variant_db.app_id, name=environment_name ) ) - environment_db = result.scalars().one_or_none() + environment_db = result.scalars().first() if environment_db is None: raise ValueError(f"Environment {environment_name} not found") @@ -1298,7 +1298,7 @@ async def fetch_app_environment_by_name_and_appid( joinedload(AppEnvironmentDB.deployed_app_variant.of_type(AppVariantDB)), # type: ignore ) result = await session.execute(query) - app_environment = result.scalars().one_or_none() + app_environment = result.scalars().first() return app_environment @@ -1318,7 +1318,7 @@ async def fetch_app_variant_revision_by_id( result = await session.execute( 
select(AppVariantRevisionsDB).filter_by(id=uuid.UUID(variant_revision_id)) ) - app_revision = result.scalars().one_or_none() + app_revision = result.scalars().first() return app_revision @@ -1359,7 +1359,7 @@ async def fetch_app_environment_revision(revision_id: str) -> AppEnvironmentRevi result = await session.execute( select(AppEnvironmentRevisionDB).filter_by(id=uuid.UUID(revision_id)) ) - environment_revision = result.scalars().one_or_none() + environment_revision = result.scalars().first() return environment_revision @@ -1398,7 +1398,7 @@ async def update_app_environment_deployed_variant_revision( id=uuid.UUID(deployed_variant_revision) ) ) - app_variant_revision = result.scalars().one_or_none() + app_variant_revision = result.scalars().first() if app_variant_revision is None: raise Exception( f"App variant revision {deployed_variant_revision} not found" @@ -1407,7 +1407,7 @@ async def update_app_environment_deployed_variant_revision( app_environment_result = await session.execute( select(AppEnvironmentDB).filter_by(id=uuid.UUID(app_environment_id)) ) - app_environment = app_environment_result.scalars().one_or_none() + app_environment = app_environment_result.scalars().first() app_environment.deployed_app_variant_revision_id = app_variant_revision.id # type: ignore await session.commit() @@ -1600,7 +1600,7 @@ async def fetch_app_variant_revision(app_variant: str, revision_number: int): ) # type: ignore ) result = await session.execute(query) - app_variant_revisions = result.scalars().one_or_none() + app_variant_revisions = result.scalars().first() return app_variant_revisions @@ -1645,7 +1645,7 @@ async def remove_image(image: ImageDB): async with db_engine.get_session() as session: result = await session.execute(select(ImageDB).filter_by(id=image.id)) - image = result.scalars().one_or_none() + image = result.scalars().first() await session.delete(image) await session.commit() @@ -1739,7 +1739,7 @@ async def remove_base_from_db(base_id: str): result = await 
session.execute( select(VariantBaseDB).filter_by(id=uuid.UUID(base_id)) ) - base = result.scalars().one_or_none() + base = result.scalars().first() if not base: raise NoResultFound(f"Base with id {base_id} not found") @@ -1764,7 +1764,7 @@ async def remove_app_by_id(app_id: str): assert app_id is not None, "app_id cannot be None" async with db_engine.get_session() as session: result = await session.execute(select(AppDB).filter_by(id=uuid.UUID(app_id))) - app_db = result.scalars().one_or_none() + app_db = result.scalars().first() if not app_db: raise NoResultFound(f"App with id {app_id} not found") @@ -1792,7 +1792,7 @@ async def update_variant_parameters( result = await session.execute( select(AppVariantDB).filter_by(id=uuid.UUID(app_variant_id)) ) - app_variant_db = result.scalars().one_or_none() + app_variant_db = result.scalars().first() if not app_variant_db: raise NoResultFound(f"App variant with id {app_variant_id} not found") @@ -1836,7 +1836,7 @@ async def get_app_variant_instance_by_id(variant_id: str) -> AppVariantDB: .options(joinedload(AppVariantDB.base), joinedload(AppVariantDB.app)) .filter_by(id=uuid.UUID(variant_id)) ) - app_variant_db = result.scalars().one_or_none() + app_variant_db = result.scalars().first() return app_variant_db @@ -1856,7 +1856,7 @@ async def get_app_variant_revision_by_id( result = await session.execute( select(AppVariantRevisionsDB).filter_by(id=uuid.UUID(variant_revision_id)) ) - variant_revision_db = result.scalars().one_or_none() + variant_revision_db = result.scalars().first() return variant_revision_db @@ -1878,7 +1878,7 @@ async def fetch_testset_by_id(testset_id: str) -> Optional[TestSetDB]: async with db_engine.get_session() as session: result = await session.execute(select(TestSetDB).filter_by(id=testset_uuid)) - testset = result.scalars().one_or_none() + testset = result.scalars().first() return testset @@ -1921,7 +1921,7 @@ async def update_testset(testset_id: str, values_to_update: dict) -> None: result = await 
session.execute( select(TestSetDB).filter_by(id=uuid.UUID(testset_id)) ) - testset = result.scalars().one_or_none() + testset = result.scalars().first() # Validate keys in values_to_update and update attributes valid_keys = [key for key in values_to_update.keys() if hasattr(testset, key)] @@ -1973,7 +1973,7 @@ async def fetch_evaluation_by_id(evaluation_id: str) -> Optional[EvaluationDB]: joinedload(EvaluationDB.testset).load_only(TestSetDB.id, TestSetDB.name), # type: ignore ) result = await session.execute(query) - evaluation = result.scalars().one_or_none() + evaluation = result.scalars().first() return evaluation @@ -2145,7 +2145,7 @@ async def fetch_human_evaluation_by_id( joinedload(HumanEvaluationDB.testset).load_only(TestSetDB.id, TestSetDB.name), # type: ignore ) result = await session.execute(query) - evaluation = result.scalars().one_or_none() + evaluation = result.scalars().first() return evaluation @@ -2164,7 +2164,7 @@ async def update_human_evaluation(evaluation_id: str, values_to_update: dict): result = await session.execute( select(HumanEvaluationDB).filter_by(id=uuid.UUID(evaluation_id)) ) - human_evaluation = result.scalars().one_or_none() + human_evaluation = result.scalars().first() if not human_evaluation: raise NoResultFound(f"Human evaluation with id {evaluation_id} not found") @@ -2188,7 +2188,7 @@ async def delete_human_evaluation(evaluation_id: str): result = await session.execute( select(HumanEvaluationDB).filter_by(id=uuid.UUID(evaluation_id)) ) - evaluation = result.scalars().one_or_none() + evaluation = result.scalars().first() if not evaluation: raise NoResultFound(f"Human evaluation with id {evaluation_id} not found") @@ -2250,7 +2250,7 @@ async def update_human_evaluation_scenario( id=uuid.UUID(evaluation_scenario_id) ) ) - human_evaluation_scenario = result.scalars().one_or_none() + human_evaluation_scenario = result.scalars().first() if not human_evaluation_scenario: raise NoResultFound( f"Human evaluation scenario with id 
{evaluation_scenario_id} not found" @@ -2347,7 +2347,7 @@ async def fetch_evaluation_scenario_by_id( result = await session.execute( select(EvaluationScenarioDB).filter_by(id=uuid.UUID(evaluation_scenario_id)) ) - evaluation_scenario = result.scalars().one_or_none() + evaluation_scenario = result.scalars().first() return evaluation_scenario @@ -2368,7 +2368,7 @@ async def fetch_human_evaluation_scenario_by_id( id=uuid.UUID(evaluation_scenario_id) ) ) - evaluation_scenario = result.scalars().one_or_none() + evaluation_scenario = result.scalars().first() return evaluation_scenario @@ -2389,7 +2389,7 @@ async def fetch_human_evaluation_scenario_by_evaluation_id( evaluation_id=evaluation.id # type: ignore ) ) - human_eval_scenario = result.scalars().one_or_none() + human_eval_scenario = result.scalars().first() return human_eval_scenario @@ -2433,7 +2433,7 @@ async def add_template(**kwargs: dict) -> str: result = await session.execute( select(TemplateDB).filter_by(tag_id=kwargs["tag_id"]) ) - existing_template = result.scalars().one_or_none() + existing_template = result.scalars().first() if existing_template is None: db_template = TemplateDB(**kwargs) @@ -2513,7 +2513,7 @@ async def get_template(template_id: str) -> TemplateDB: result = await session.execute( select(TemplateDB).filter_by(id=uuid.UUID(template_id)) ) - template_db = result.scalars().one_or_none() + template_db = result.scalars().first() return template_db @@ -2570,7 +2570,7 @@ async def update_base( result = await session.execute( select(VariantBaseDB).filter_by(id=uuid.UUID(base_id)) ) - base = result.scalars().one_or_none() + base = result.scalars().first() for key, value in kwargs.items(): if hasattr(base, key): setattr(base, key, value) @@ -2607,7 +2607,7 @@ async def update_app_variant( result = await session.execute( select(AppVariantDB).filter_by(id=uuid.UUID(app_variant_id)) ) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() if not app_variant: raise 
NoResultFound(f"App variant with id {app_variant_id} not found") @@ -2657,7 +2657,7 @@ async def fetch_app_by_name_and_parameters( query = base_query.join(UserDB).filter(UserDB.uid == user_uid) result = await session.execute(query) - app_db = result.unique().scalars().one_or_none() + app_db = result.unique().scalars().first() return app_db @@ -2939,7 +2939,7 @@ async def fetch_evaluator_config(evaluator_config_id: str): result = await session.execute( select(EvaluatorConfigDB).filter_by(id=uuid.UUID(evaluator_config_id)) ) - evaluator_config = result.scalars().one_or_none() + evaluator_config = result.scalars().first() return evaluator_config @@ -2988,7 +2988,7 @@ async def fetch_evaluator_config_by_appId( app_id=uuid.UUID(app_id), evaluator_key=evaluator_name ) ) - evaluator_config = result.scalars().one_or_none() + evaluator_config = result.scalars().first() return evaluator_config @@ -3039,7 +3039,7 @@ async def update_evaluator_config( result = await session.execute( select(EvaluatorConfigDB).filter_by(id=uuid.UUID(evaluator_config_id)) ) - evaluator_config = result.scalars().one_or_none() + evaluator_config = result.scalars().first() if not evaluator_config: raise NoResultFound( f"Evaluator config with id {evaluator_config_id} not found" @@ -3064,7 +3064,7 @@ async def delete_evaluator_config(evaluator_config_id: str) -> bool: result = await session.execute( select(EvaluatorConfigDB).filter_by(id=uuid.UUID(evaluator_config_id)) ) - evaluator_config = result.scalars().one_or_none() + evaluator_config = result.scalars().first() if evaluator_config is None: raise NoResultFound( f"Evaluator config with id {evaluator_config_id} not found" @@ -3094,7 +3094,7 @@ async def update_evaluation( result = await session.execute( select(EvaluationDB).filter_by(id=uuid.UUID(evaluation_id)) ) - evaluation = result.scalars().one_or_none() + evaluation = result.scalars().first() for key, value in updates.items(): if hasattr(evaluation, key): setattr(evaluation, key, value) From 
747b5be39db10bdb76dcff031590c1baedc5550f Mon Sep 17 00:00:00 2001 From: Abram Date: Tue, 2 Jul 2024 17:36:32 +0100 Subject: [PATCH 30/34] refactor (tools): set db name in postgres_uri to backend and celery_worker compose services --- docker-compose.test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.test.yml b/docker-compose.test.yml index fbacfd81c..b4b0770b8 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -15,7 +15,7 @@ services: build: ./agenta-backend container_name: agenta-backend-test environment: - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/agenta_test - REDIS_URL=redis://redis:6379/0 - ENVIRONMENT=${ENVIRONMENT} - BARE_DOMAIN_NAME=localhost @@ -111,7 +111,7 @@ services: command: > watchmedo auto-restart --directory=./agenta_backend --pattern=*.py --recursive -- celery -A agenta_backend.main.celery_app worker --concurrency=1 --loglevel=INFO environment: - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/agenta_test - REDIS_URL=redis://redis:6379/0 - ENVIRONMENT=${ENVIRONMENT} - CELERY_BROKER_URL=amqp://guest@rabbitmq// From 3a05bcd7659f9ef1f56aeffe99247a1116bd68ce Mon Sep 17 00:00:00 2001 From: Abram Date: Wed, 3 Jul 2024 06:41:56 +0100 Subject: [PATCH 31/34] refactor (backend): move initialization of async sqlalchemy engine to __init__ method --- .../agenta_backend/models/db_engine.py | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/agenta-backend/agenta_backend/models/db_engine.py b/agenta-backend/agenta_backend/models/db_engine.py index 361bff503..f9379a969 100644 --- a/agenta-backend/agenta_backend/models/db_engine.py +++ b/agenta-backend/agenta_backend/models/db_engine.py @@ -95,8 +95,15 @@ class DBEngine: def __init__(self) -> None: self.mode = 
os.environ.get("DATABASE_MODE", "v2") - self.postgres_uri = os.environ.get("POSTGRES_URI", None) + self.postgres_uri = os.environ.get("POSTGRES_URI") self.mongo_uri = os.environ.get("MONGODB_URI") + self.engine = create_async_engine(url=self.postgres_uri) # type: ignore + self.async_session_maker = async_sessionmaker( + bind=self.engine, class_=AsyncSession, expire_on_commit=False + ) + self.async_session = async_scoped_session( + session_factory=self.async_session_maker, scopefunc=current_task + ) async def initialize_async_postgres(self): """ @@ -106,14 +113,6 @@ async def initialize_async_postgres(self): if not self.postgres_uri: raise ValueError("Postgres URI cannot be None.") - self.engine = create_async_engine(self.postgres_uri) - self.async_session_maker = async_sessionmaker( - bind=self.engine, class_=AsyncSession, expire_on_commit=False - ) - self.async_session = async_scoped_session( - session_factory=self.async_session_maker, scopefunc=current_task - ) - async with self.engine.begin() as conn: # Drop and create tables if needed for model in models: @@ -184,9 +183,5 @@ async def close(self): await self.engine.dispose() - self.engine = None - self.async_session_maker = None - self.async_session = None - db_engine = DBEngine() From 8dd92a24f83d5184ce1cda5bf613e775b0a3797d Mon Sep 17 00:00:00 2001 From: Abram Date: Wed, 3 Jul 2024 06:43:00 +0100 Subject: [PATCH 32/34] minor refactor (tests): initialize db_engine and ensure that database is dropped before closing pool connection --- agenta-backend/agenta_backend/tests/conftest.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/agenta-backend/agenta_backend/tests/conftest.py b/agenta-backend/agenta_backend/tests/conftest.py index e78d0f2b4..943fcb6e4 100644 --- a/agenta-backend/agenta_backend/tests/conftest.py +++ b/agenta-backend/agenta_backend/tests/conftest.py @@ -16,10 +16,11 @@ def event_loop(): res._close = res.close # type: ignore # Initialize database and create tables - 
res.run_until_complete(DBEngine().init_db()) + db_engine = DBEngine() + res.run_until_complete(db_engine.init_db()) yield res - res.run_until_complete(DBEngine().close()) # close connections to database - res.run_until_complete(DBEngine().remove_db()) # drop database + res.run_until_complete(db_engine.close()) # close connections to database + res.run_until_complete(db_engine.remove_db()) # drop database res._close() # close event loop # type: ignore From 31247252b19fb18bfdb1c64ae44550aabd965cf4 Mon Sep 17 00:00:00 2001 From: Abram Date: Wed, 3 Jul 2024 06:46:09 +0100 Subject: [PATCH 33/34] minor refactor (tests): ensure that test db is dropped before closing engine connection --- agenta-backend/agenta_backend/tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/tests/conftest.py b/agenta-backend/agenta_backend/tests/conftest.py index 943fcb6e4..c603898e2 100644 --- a/agenta-backend/agenta_backend/tests/conftest.py +++ b/agenta-backend/agenta_backend/tests/conftest.py @@ -21,6 +21,6 @@ def event_loop(): yield res - res.run_until_complete(db_engine.close()) # close connections to database res.run_until_complete(db_engine.remove_db()) # drop database + res.run_until_complete(db_engine.close()) # close connections to database res._close() # close event loop # type: ignore From a8c05516b6afb7f017f55a3aa237ca5f295901ee Mon Sep 17 00:00:00 2001 From: Abram Date: Wed, 3 Jul 2024 07:08:24 +0100 Subject: [PATCH 34/34] minor refactor (tools): revert back to using default database created --- docker-compose.test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.test.yml b/docker-compose.test.yml index b4b0770b8..fbacfd81c 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -15,7 +15,7 @@ services: build: ./agenta-backend container_name: agenta-backend-test environment: - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/agenta_test + - 
POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 - REDIS_URL=redis://redis:6379/0 - ENVIRONMENT=${ENVIRONMENT} - BARE_DOMAIN_NAME=localhost @@ -111,7 +111,7 @@ services: command: > watchmedo auto-restart --directory=./agenta_backend --pattern=*.py --recursive -- celery -A agenta_backend.main.celery_app worker --concurrency=1 --loglevel=INFO environment: - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/agenta_test + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 - REDIS_URL=redis://redis:6379/0 - ENVIRONMENT=${ENVIRONMENT} - CELERY_BROKER_URL=amqp://guest@rabbitmq//