diff --git a/agenta-backend/agenta_backend/migrations/backup.py b/agenta-backend/agenta_backend/migrations/mongo/backup.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/backup.py
rename to agenta-backend/agenta_backend/migrations/mongo/backup.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py b/agenta-backend/agenta_backend/migrations/mongo/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_10_0_to_v0_11_0/20240131130415_updating_app_environment.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py b/agenta-backend/agenta_backend/migrations/mongo/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_10_0_to_v0_11_0/20240131132738_create_app_environment_revision.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py b/agenta-backend/agenta_backend/migrations/mongo/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_11_0_to_v0_12_0/20240126100524_models_revamp.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py b/agenta-backend/agenta_backend/migrations/mongo/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_11_0_to_v0_12_0/20240126144938_drop_organization_model.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py b/agenta-backend/agenta_backend/migrations/mongo/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_12_0_to_v0_16_0/20240509122536_evaluation_scenario_correct_answer.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py b/agenta-backend/agenta_backend/migrations/mongo/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_16_0_1_to_v0_16_0_2/20240603145957_code_evaluators.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py b/agenta-backend/agenta_backend/migrations/mongo/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_16_0_to_v0_16_0_1/20240602150517_evaluators.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110001454_initial_migration.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110132547_create_exact_match_evaluator_config.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240110165900_evaluations_revamp.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120721_evaluation_scenarios_revamp.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120740_human_a_b_evaluation_scenarios.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240112120800_human_single_model_evaluation_scenarios.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240113131802_new_evaluation_results_aggregation.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py b/agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_7_0_to_v0_8_0/20240113204909_change_odmantic_reference_to_link.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py b/agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133518_updating_app_variant.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py b/agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133544_create_app_variant_revision.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py b/agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130133603_updating_app_environment.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py b/agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_8_0_to_v0_9_0/20240130140202_connection_evaluation.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py b/agenta-backend/agenta_backend/migrations/mongo/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_9_0_to_v10_0_0/20240124174954_evaluation_error_handling.py
diff --git a/agenta-backend/agenta_backend/migrations/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py b/agenta-backend/agenta_backend/migrations/mongo/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py
similarity index 100%
rename from agenta-backend/agenta_backend/migrations/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py
rename to agenta-backend/agenta_backend/migrations/mongo/v0_9_0_to_v10_0_0/20240124225808_evaluation_scenario_error_handling.py
diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/README.md b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/README.md
new file mode 100644
index 000000000..0c6f6c4b8
--- /dev/null
+++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/README.md
@@ -0,0 +1,6 @@
+```bash
+docker ps
+docker exec -it {backend-container-id} bash
+cd /app/agenta_backend/migrations/mongo_to_postgres
+python3 migration.py
+```
\ No newline at end of file
diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/db_engine.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/db_engine.py
new file mode 100644
index 000000000..30aa53262
--- /dev/null
+++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/db_engine.py
@@ -0,0 +1,152 @@
+import os
+import logging
+from asyncio import current_task
+from typing import AsyncGenerator
+from contextlib import asynccontextmanager
+
+from sqlalchemy.ext.asyncio import (
+    AsyncSession,
+    create_async_engine,
+    async_sessionmaker,
+    async_scoped_session,
+)
+
+from agenta_backend.utils.common import isCloudEE
+
+if isCloudEE():
+    from agenta_backend.commons.observability.models.db import SpanDB
+    from agenta_backend.commons.models.db_models import (
+        APIKeyDB,
+        WorkspaceDB,
+        OrganizationDB,
+        AppDB_ as AppDB,
+        UserDB_ as UserDB,
+        ImageDB_ as ImageDB,
+        TestSetDB_ as TestSetDB,
+        AppVariantDB_ as AppVariantDB,
+        EvaluationDB_ as EvaluationDB,
+        DeploymentDB_ as DeploymentDB,
+        VariantBaseDB_ as VariantBaseDB,
+        AppEnvironmentDB_ as AppEnvironmentDB,
+        AppEnvironmentRevisionDB_ as AppEnvironmentRevisionDB,
+        EvaluatorConfigDB_ as EvaluatorConfigDB,
+        HumanEvaluationDB_ as HumanEvaluationDB,
+        EvaluationScenarioDB_ as EvaluationScenarioDB,
+        HumanEvaluationScenarioDB_ as HumanEvaluationScenarioDB,
+    )
+else:
+    from agenta_backend.models.db_models import (
+        AppDB,
+        UserDB,
+        ImageDB,
+        TestSetDB,
+        EvaluationDB,
+        DeploymentDB,
+        AppVariantDB,
+        VariantBaseDB,
+        AppEnvironmentDB,
+        AppEnvironmentRevisionDB,
+        EvaluatorConfigDB,
+        HumanEvaluationDB,
+        EvaluationScenarioDB,
+        HumanEvaluationScenarioDB,
+    )
+
+from agenta_backend.models.db_models import (
+    TemplateDB,
+    AppVariantRevisionsDB,
+)
+
+models = [
+    AppDB,
+    UserDB,
+    ImageDB,
+    TestSetDB,
+    TemplateDB,
+    AppVariantDB,
+    DeploymentDB,
+    EvaluationDB,
+    VariantBaseDB,
+    AppEnvironmentDB,
+    AppEnvironmentRevisionDB,
+    EvaluatorConfigDB,
+    HumanEvaluationDB,
+    EvaluationScenarioDB,
+    AppVariantRevisionsDB,
+    HumanEvaluationScenarioDB,
+]
+
+if isCloudEE():
+    models.extend([OrganizationDB, WorkspaceDB, APIKeyDB])  # type: ignore
+
+
+# Configure and set logging level
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+
+class DBEngine:
+    """
+    Database engine to initialize SQLAlchemy and return the engine based on mode.
+    """
+
+    def __init__(self) -> None:
+        self.mode = os.environ.get("DATABASE_MODE", "v2")
+        self.db_url = f"{os.environ.get('POSTGRES_URI')}"
+        self.engine = create_async_engine(url=self.db_url)
+        self.async_session_maker = async_sessionmaker(
+            bind=self.engine, class_=AsyncSession, expire_on_commit=False
+        )
+        self.async_session = async_scoped_session(
+            session_factory=self.async_session_maker, scopefunc=current_task
+        )
+
+    async def init_db(self):
+        """
+        Initialize the database based on the mode and create all tables.
+        """
+        async with self.engine.begin() as conn:
+            # Drop all existing tables (if needed)
+            # await conn.run_sync(Base.metadata.drop_all)
+            # Create tables
+            for model in models:
+                await conn.run_sync(model.metadata.create_all)
+        logger.info(f"Using {self.mode} database...")
+
+    async def remove_db(self) -> None:
+        """
+        Remove the database based on the mode.
+        """
+        async with self.engine.begin() as conn:
+            for model in models:
+                await conn.run_sync(model.metadata.drop_all)
+
+    @asynccontextmanager
+    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
+        session = self.async_session()
+        try:
+            yield session
+        except Exception as e:
+            await session.rollback()
+            raise e
+        finally:
+            await session.close()
+
+    async def close(self):
+        """
+        Closes and dispose all the connections using the engine.
+
+        :raises Exception: if engine is initialized
+        """
+
+        if self.engine is None:
+            raise Exception("DBEngine is not initialized")
+
+        await self.engine.dispose()
+
+        self.engine = None
+        self.async_session_maker = None
+        self.async_session = None
+
+
+db_engine = DBEngine()
diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py
new file mode 100644
index 000000000..d2d595873
--- /dev/null
+++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/migration.py
@@ -0,0 +1,548 @@
+import json
+import os
+import asyncio
+from datetime import datetime, timezone
+
+from pymongo import MongoClient
+from bson import DBRef
+from sqlalchemy import text
+from sqlalchemy.dialects.postgresql import UUID
+import uuid_utils.compat as uuid
+
+from agenta_backend.models.db_models import (
+    UserDB,
+    ImageDB,
+    AppDB,
+    DeploymentDB,
+    VariantBaseDB,
+    AppVariantDB,
+    AppEnvironmentDB,
+    AppEnvironmentRevisionDB,
+    TemplateDB,
+    TestSetDB,
+    EvaluatorConfigDB,
+    HumanEvaluationDB,
+    HumanEvaluationVariantDB,
+    HumanEvaluationScenarioDB,
+    EvaluationAggregatedResultDB,
+    EvaluationScenarioResultDB,
+    EvaluationDB,
+    EvaluationEvaluatorConfigDB,
+    EvaluationScenarioDB,
+    IDsMappingDB,
+    AppVariantRevisionsDB,
+)
+
+from agenta_backend.migrations.mongo_to_postgres.utils import (
+    drop_all_tables,
+    create_all_tables,
+    print_migration_report,
+    store_mapping,
+    get_mapped_uuid,
+    generate_uuid,
+    get_datetime,
+    migrate_collection,
+)
+
+from agenta_backend.models.shared_models import TemplateType
+
+tables = [
+    UserDB,
+    ImageDB,
+    AppDB,
+    DeploymentDB,
+    VariantBaseDB,
+    AppVariantDB,
+    AppVariantRevisionsDB,
+    AppEnvironmentDB,
+    AppEnvironmentRevisionDB,
+    TemplateDB,
+    TestSetDB,
+    EvaluatorConfigDB,
+    HumanEvaluationDB,
+    HumanEvaluationScenarioDB,
+    EvaluationDB,
+    EvaluationScenarioDB,
+    IDsMappingDB,
+    EvaluationEvaluatorConfigDB,
+    EvaluationScenarioResultDB,
+]
+
+
+async def transform_user(user):
+    user_uuid = generate_uuid()
+    await store_mapping("users", user["_id"], user_uuid)
+    return {
+        "id": user_uuid,
+        "uid": user["uid"],
+        "username": user["username"],
+        "email": user["email"],
+        "created_at": get_datetime(user.get("created_at")),
+        "updated_at": get_datetime(user.get("updated_at")),
+    }
+
+
+async def transform_image(image):
+    user_uuid = await get_mapped_uuid(
+        "users", image["user"].id if isinstance(image["user"], DBRef) else image["user"]
+    )
+    image_uuid = generate_uuid()
+    await store_mapping("docker_images", image["_id"], image_uuid)
+    return {
+        "id": image_uuid,
+        "type": image["type"],
+        "template_uri": image.get("template_uri"),
+        "docker_id": image.get("docker_id"),
+        "tags": image.get("tags"),
+        "deletable": image.get("deletable", True),
+        "user_id": user_uuid,
+        "created_at": get_datetime(image.get("created_at")),
+        "updated_at": get_datetime(image.get("updated_at")),
+    }
+
+
+async def transform_app(app):
+    user_uuid = await get_mapped_uuid("users", app["user"].id)
+    app_uuid = generate_uuid()
+    await store_mapping("app_db", app["_id"], app_uuid)
+    return {
+        "id": app_uuid,
+        "app_name": app["app_name"],
+        "user_id": user_uuid,
+        "created_at": get_datetime(app.get("created_at")),
+        "updated_at": get_datetime(app.get("updated_at")),
+    }
+
+
+async def transform_deployment(deployment):
+    app_uuid = await get_mapped_uuid("app_db", deployment["app"].id)
+    user_uuid = await get_mapped_uuid("users", deployment["user"].id)
+    deployment_uuid = generate_uuid()
+    await store_mapping("deployments", deployment["_id"], deployment_uuid)
+    return {
+        "id": deployment_uuid,
+        "app_id": app_uuid,
+        "user_id": user_uuid,
+        "container_name": deployment.get("container_name"),
+        "container_id": deployment.get("container_id"),
+        "uri": deployment.get("uri"),
+        "status": deployment["status"],
+        "created_at": get_datetime(deployment.get("created_at")),
+        "updated_at": get_datetime(deployment.get("updated_at")),
+    }
+
+
+async def transform_variant_base(base):
+    app_uuid = await get_mapped_uuid("app_db", base["app"].id)
+    user_uuid = await get_mapped_uuid("users", base["user"].id)
+    image_uuid = await get_mapped_uuid("docker_images", base["image"].id)
+    deployment_uuid = base["deployment"] and await get_mapped_uuid(
+        "deployments", base["deployment"]
+    )
+    base_uuid = generate_uuid()
+    await store_mapping("bases", base["_id"], base_uuid)
+    return {
+        "id": base_uuid,
+        "app_id": app_uuid,
+        "user_id": user_uuid,
+        "base_name": base["base_name"],
+        "image_id": image_uuid,
+        "deployment_id": deployment_uuid,
+        "created_at": get_datetime(base.get("created_at")),
+        "updated_at": get_datetime(base.get("updated_at")),
+    }
+
+
+async def transform_app_variant(variant):
+    app_uuid = await get_mapped_uuid("app_db", variant["app"].id)
+    image_uuid = await get_mapped_uuid("docker_images", variant["image"].id)
+    user_uuid = await get_mapped_uuid("users", variant["user"].id)
+    modified_by_uuid = await get_mapped_uuid("users", variant["modified_by"].id)
+    base_uuid = await get_mapped_uuid("bases", variant["base"].id)
+    variant_uuid = generate_uuid()
+    await store_mapping("app_variants", variant["_id"], variant_uuid)
+    return {
+        "id": variant_uuid,
+        "app_id": app_uuid,
+        "variant_name": variant["variant_name"],
+        "revision": variant["revision"],
+        "image_id": image_uuid,
+        "user_id": user_uuid,
+        "modified_by_id": modified_by_uuid,
+        "base_name": variant.get("base_name"),
+        "base_id": base_uuid,
+        "config_name": variant["config_name"],
+        "config_parameters": variant["config"]["parameters"],
+        "created_at": get_datetime(variant.get("created_at")),
+        "updated_at": get_datetime(variant.get("updated_at")),
+    }
+
+
+async def transform_app_variant_revision(revision):
+    variant_uuid = await get_mapped_uuid("app_variants", revision["variant"].id)
+    modified_by_uuid = await get_mapped_uuid("users", revision["modified_by"].id)
+    base_uuid = await get_mapped_uuid("bases", revision["base"].id)
+    revision_uuid = generate_uuid()
+    await store_mapping("app_variant_revisions", revision["_id"], revision_uuid)
+    return {
+        "id": revision_uuid,
+        "variant_id": variant_uuid,
+        "revision": revision["revision"],
+        "modified_by_id": modified_by_uuid,
+        "base_id": base_uuid,
+        "config_name": revision["config"]["config_name"],
+        "config_parameters": revision["config"]["parameters"],
+        "created_at": get_datetime(revision["created_at"]),
+        "updated_at": get_datetime(revision["updated_at"]),
+    }
+
+
+async def transform_app_environment(environment):
+    app_uuid = await get_mapped_uuid("app_db", environment["app"].id)
+    user_uuid = await get_mapped_uuid("users", environment["user"].id)
+    variant_uuid = await get_mapped_uuid(
+        "app_variants", environment["deployed_app_variant"]
+    )
+
+    if environment["deployed_app_variant_revision"] is None:
+        revision_uuid = None
+    else:
+        revision_uuid = await get_mapped_uuid(
+            "app_variant_revisions", environment["deployed_app_variant_revision"].id
+        )
+
+    deployment_uuid = await get_mapped_uuid("deployments", environment["deployment"])
+    environment_uuid = generate_uuid()
+    await store_mapping("environments", environment["_id"], environment_uuid)
+    return {
+        "id": environment_uuid,
+        "app_id": app_uuid,
+        "name": environment["name"],
+        "user_id": user_uuid,
+        "revision": environment["revision"],
+        "deployed_app_variant_id": variant_uuid,
+        "deployed_app_variant_revision_id": revision_uuid,
+        "deployment_id": deployment_uuid,
+        "created_at": get_datetime(environment.get("created_at")),
+    }
+
+
+async def transform_app_environment_revision(revision):
+    environment_uuid = await get_mapped_uuid("environments", revision["environment"].id)
+    modified_by_uuid = await get_mapped_uuid("users", revision["modified_by"].id)
+    variant_revision_uuid = await get_mapped_uuid(
+        "app_variant_revisions", revision["deployed_app_variant_revision"]
+    )
+    deployment_uuid = await get_mapped_uuid("deployments", revision["deployment"])
+    revision_uuid = generate_uuid()
+    await store_mapping("environments_revisions", revision["_id"], revision_uuid)
+    return {
+        "id": revision_uuid,
+        "environment_id": environment_uuid,
+        "revision": revision["revision"],
+        "modified_by_id": modified_by_uuid,
+        "deployed_app_variant_revision_id": variant_revision_uuid,
+        "deployment_id": deployment_uuid,
+        "created_at": get_datetime(revision["created_at"]),
+    }
+
+
+async def transform_template(template):
+    template_uuid = generate_uuid()
+    await store_mapping("templates", template["_id"], template_uuid)
+
+    template_type = (
+        TemplateType(template["type"]) if "type" in template else TemplateType.IMAGE
+    )
+
+    return {
+        "id": template_uuid,
+        "type": template_type,
+        "template_uri": template.get("template_uri"),
+        "tag_id": template.get("tag_id"),
+        "name": template["name"],
+        "repo_name": template.get("repo_name"),
+        "title": template["title"],
+        "description": template["description"],
+        "size": template.get("size"),
+        "digest": template.get("digest"),
+        "last_pushed": get_datetime(template.get("last_pushed")),
+    }
+
+
+async def transform_test_set(test_set):
+    app_uuid = await get_mapped_uuid("app_db", test_set["app"].id)
+    user_uuid = await get_mapped_uuid("users", test_set["user"].id)
+    test_set_uuid = generate_uuid()
+    await store_mapping("testsets", test_set["_id"], test_set_uuid)
+    return {
+        "id": test_set_uuid,
+        "name": test_set["name"],
+        "app_id": app_uuid,
+        "csvdata": test_set["csvdata"],
+        "user_id": user_uuid,
+        "created_at": get_datetime(test_set.get("created_at")),
+        "updated_at": get_datetime(test_set.get("updated_at")),
+    }
+
+
+async def transform_evaluator_config(config):
+    app_uuid = await get_mapped_uuid("app_db", config["app"].id)
+    user_uuid = await get_mapped_uuid("users", config["user"].id)
+    config_uuid = generate_uuid()
+    await store_mapping("evaluators_configs", config["_id"], config_uuid)
+    return {
+        "id": config_uuid,
+        "app_id": app_uuid,
+        "user_id": user_uuid,
+        "name": config["name"],
+        "evaluator_key": config["evaluator_key"],
+        "settings_values": config["settings_values"],
+        "created_at": get_datetime(config.get("created_at")),
+        "updated_at": get_datetime(config.get("updated_at")),
+    }
+
+
+async def convert_human_evaluations_associated_variants(
+    variants, variants_revisions, evaluation_id
+):
+    """Convert variant and revision ObjectIds to UUIDs and structure them."""
+    associated_variants = []
+    assert len(variants) == len(
+        variants_revisions
+    ), "variants and variants_revisions must have the same length"
+
+    for variant_id, revision_id in zip(variants, variants_revisions):
+        variant_uuid = await get_mapped_uuid("app_variants", variant_id)
+        revision_uuid = await get_mapped_uuid("app_variant_revisions", revision_id)
+        associated_variants.append(
+            {
+                "human_evaluation_id": evaluation_id,
+                "variant_id": variant_uuid,
+                "variant_revision_id": revision_uuid,
+            }
+        )
+    return associated_variants
+
+
+async def transform_human_evaluation(evaluation):
+    app_uuid = await get_mapped_uuid("app_db", evaluation["app"].id)
+    user_uuid = await get_mapped_uuid("users", evaluation["user"].id)
+    test_set_uuid = await get_mapped_uuid("testsets", evaluation["testset"].id)
+    evaluation_uuid = generate_uuid()
+
+    await store_mapping("human_evaluations", evaluation["_id"], evaluation_uuid)
+
+    transformed_evaluation = {
+        "id": evaluation_uuid,
+        "app_id": app_uuid,
+        "user_id": user_uuid,
+        "status": evaluation["status"],
+        "evaluation_type": evaluation["evaluation_type"],
+        "testset_id": test_set_uuid,
+        "created_at": get_datetime(evaluation.get("created_at")),
+        "updated_at": get_datetime(evaluation.get("updated_at")),
+    }
+
+    associated_variants = await convert_human_evaluations_associated_variants(
+        evaluation["variants"], evaluation["variants_revisions"], evaluation_uuid
+    )
+
+    return transformed_evaluation, associated_variants
+
+
+async def transform_human_evaluation_scenario(scenario):
+    user_uuid = await get_mapped_uuid("users", scenario["user"].id)
+    evaluation_uuid = await get_mapped_uuid(
+        "human_evaluations", scenario["evaluation"].id
+    )
+    variant_uuid = str(await get_mapped_uuid("app_variants", scenario["vote"]))
+    scenario_uuid = generate_uuid()
+
+    await store_mapping("human_evaluations_scenarios", scenario["_id"], scenario_uuid)
+    return {
+        "id": scenario_uuid,
+        "user_id": user_uuid,
+        "evaluation_id": evaluation_uuid,
+        "inputs": scenario["inputs"],
+        "outputs": scenario["outputs"],
+        "vote": variant_uuid,
+        "score": scenario.get("score"),
+        "correct_answer": scenario.get("correct_answer"),
+        "created_at": get_datetime(scenario.get("created_at")),
+        "updated_at": get_datetime(scenario.get("updated_at")),
+        "is_pinned": scenario.get("is_pinned"),
+        "note": scenario.get("note"),
+    }
+
+
+async def convert_aggregated_results(results, evaluation_id):
+    """Convert evaluator_config ObjectIds in aggregated_results to UUIDs and structure them."""
+    aggregated_results = []
+    for result in results:
+        evaluator_config_uuid = await get_mapped_uuid(
+            "evaluators_configs", result["evaluator_config"]
+        )
+        result_uuid = generate_uuid()
+        aggregated_results.append(
+            {
+                "id": result_uuid,
+                "evaluation_id": evaluation_id,
+                "evaluator_config_id": evaluator_config_uuid,
+                "result": result["result"],
+            }
+        )
+    return aggregated_results
+
+
+async def convert_scenario_aggregated_results(results, scenario_id):
+    """Convert evaluator_config ObjectIds in scenario aggregated_results to UUIDs and structure them."""
+    scenario_aggregated_results = []
+    for result in results:
+        evaluator_config_uuid = await get_mapped_uuid(
+            "evaluators_configs", result["evaluator_config"]
+        )
+        result_uuid = generate_uuid()
+        scenario_aggregated_results.append(
+            {
+                "id": result_uuid,
+                "evaluation_scenario_id": scenario_id,
+                "evaluator_config_id": evaluator_config_uuid,
+                "result": result["result"],
+            }
+        )
+    return scenario_aggregated_results
+
+
+async def transform_evaluation(evaluation):
+    app_uuid = await get_mapped_uuid("app_db", evaluation["app"].id)
+    user_uuid = await get_mapped_uuid("users", evaluation["user"].id)
+    test_set_uuid = await get_mapped_uuid("testsets", evaluation["testset"].id)
+    variant_uuid = await get_mapped_uuid("app_variants", evaluation["variant"])
+    revision_uuid = await get_mapped_uuid(
+        "app_variant_revisions", evaluation["variant_revision"]
+    )
+    evaluation_uuid = generate_uuid()
+
+    await store_mapping("evaluations", evaluation["_id"], evaluation_uuid)
+
+    transformed_evaluation = {
+        "id": evaluation_uuid,
+        "app_id": app_uuid,
+        "user_id": user_uuid,
+        "status": evaluation["status"],
+        "testset_id": test_set_uuid,
+        "variant_id": variant_uuid,
+        "variant_revision_id": revision_uuid,
+        "average_cost": evaluation["average_cost"],
+        "total_cost": evaluation["total_cost"],
+        "average_latency": evaluation["average_latency"],
+        "created_at": get_datetime(evaluation.get("created_at")),
+        "updated_at": get_datetime(evaluation.get("updated_at")),
+    }
+
+    aggregated_results = await convert_aggregated_results(
+        evaluation["aggregated_results"], evaluation_uuid
+    )
+
+    return transformed_evaluation, aggregated_results
+
+
+async def transform_evaluation_scenario(scenario):
+    user_uuid = await get_mapped_uuid("users", scenario["user"].id)
+    evaluation_uuid = await get_mapped_uuid("evaluations", scenario["evaluation"].id)
+    variant_uuid = await get_mapped_uuid("app_variants", scenario["variant_id"])
+    scenario_uuid = generate_uuid()
+
+    await store_mapping("evaluation_scenarios", scenario["_id"], scenario_uuid)
+
+    transformed_scenario = {
+        "id": scenario_uuid,
+        "user_id": user_uuid,
+        "evaluation_id": evaluation_uuid,
+        "variant_id": variant_uuid,
+        "inputs": scenario["inputs"],
+        "outputs": scenario["outputs"],
+        "correct_answers": scenario.get("correct_answers"),
+        "is_pinned": scenario.get("is_pinned"),
+        "note": scenario.get("note"),
+        "latency": scenario.get("latency"),
+        "cost": scenario.get("cost"),
+        "created_at": get_datetime(scenario.get("created_at")),
+        "updated_at": get_datetime(scenario.get("updated_at")),
+    }
+
+    aggregated_results = []
+    if "results" in scenario:
+        aggregated_results = await convert_scenario_aggregated_results(
+            scenario["results"], scenario_uuid
+        )
+
+    return transformed_scenario, aggregated_results
+
+
+async def main():
+    try:
+        await drop_all_tables()
+        await create_all_tables(tables=tables)
+        await migrate_collection("users", UserDB, transform_user)
+        await migrate_collection("docker_images", ImageDB, transform_image)
+        await migrate_collection("app_db", AppDB, transform_app)
+        await migrate_collection("deployments", DeploymentDB, transform_deployment)
+        await migrate_collection("bases", VariantBaseDB, transform_variant_base)
+        await migrate_collection("app_variants", AppVariantDB, transform_app_variant)
+        await migrate_collection(
+            "app_variant_revisions",
+            AppVariantRevisionsDB,
+            transform_app_variant_revision,
+        )
+        await migrate_collection(
+            "environments", AppEnvironmentDB, transform_app_environment
+        )
+        await migrate_collection(
+            "environments_revisions",
+            AppEnvironmentRevisionDB,
+            transform_app_environment_revision,
+        )
+        await migrate_collection("templates", TemplateDB, transform_template)
+        await migrate_collection("testsets", TestSetDB, transform_test_set)
+        await migrate_collection(
+            "evaluators_configs", EvaluatorConfigDB, transform_evaluator_config
+        )
+        await migrate_collection(
+            "human_evaluations",
+            HumanEvaluationDB,
+            transform_human_evaluation,
+            HumanEvaluationVariantDB,
+        )
+        await migrate_collection(
+            "human_evaluations_scenarios",
+            HumanEvaluationScenarioDB,
+            transform_human_evaluation_scenario,
+        )
+        await migrate_collection(
+            "new_evaluations",
+            EvaluationDB,
+            transform_evaluation,
+            EvaluationAggregatedResultDB,
+        )
+        await migrate_collection(
+            "new_evaluation_scenarios",
+            EvaluationScenarioDB,
+            transform_evaluation_scenario,
+            EvaluationScenarioResultDB,
+        )
+
+        print("\n========================================================")
+        print("Migration completed successfully.")
+    except Exception as e:
+        import traceback
+
+        print(f"\n====================== Error ======================\n")
+        print(f"Error occurred: {e}")
+        traceback.print_exc()
+    finally:
+        print_migration_report()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/mongo_db_engine.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/mongo_db_engine.py
new file mode 100644
index 000000000..828a9753b
--- /dev/null
+++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/mongo_db_engine.py
@@ -0,0 +1,13 @@
+import os
+from pymongo import MongoClient
+
+# MongoDB connection
+MONGO_URI = os.environ.get("MONGODB_URI")
+DATABASE_MODE = os.environ.get("DATABASE_MODE")
+mongo_client = MongoClient(MONGO_URI)
+mongo_db_name = f"agenta_{DATABASE_MODE}"
+mongo_db = mongo_client[mongo_db_name]
+
+
+def get_mongo_db():
+    return mongo_db
diff --git a/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py
new file mode 100644
index 000000000..c4f5a0a0c
--- /dev/null
+++ b/agenta-backend/agenta_backend/migrations/mongo_to_postgres/utils.py
@@ -0,0 +1,185 @@
+import os
+import asyncio
+import asyncpg
+from datetime import datetime, timezone
+from tqdm import tqdm
+
+from bson import ObjectId, DBRef
+from sqlalchemy import MetaData, Column, String, DateTime, text, create_engine
+from sqlalchemy.dialects.postgresql import UUID
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker
+import uuid_utils.compat as uuid
+from sqlalchemy.future import select
+from sqlalchemy.exc import NoResultFound
+from agenta_backend.migrations.mongo_to_postgres.db_engine import db_engine
+from sqlalchemy.exc import IntegrityError
+
+from agenta_backend.models.db_models import IDsMappingDB
+from agenta_backend.models.base import Base
+from agenta_backend.migrations.mongo_to_postgres.mongo_db_engine import get_mongo_db
+
+BATCH_SIZE = 1000
+
+mongo_db = get_mongo_db()
+
+migration_report = {}
+
+
+async def drop_all_tables():
+    """Drop all tables in the database."""
+    async with db_engine.engine.begin() as conn:
+        await conn.run_sync(Base.metadata.reflect)
+        # Drop all tables with CASCADE option
+        for table in reversed(Base.metadata.sorted_tables):
+            await conn.execute(text(f"DROP TABLE IF EXISTS {table.name} CASCADE"))
+    print("\n====================== All tables are dropped.\n")
+
+
+async def create_all_tables(tables):
+    """Create all tables in the database."""
+    async with db_engine.engine.begin() as conn:
+        for table in tables:
+            print(f"Creating table for {table.__name__}")
+            await conn.run_sync(table.metadata.create_all)
+    print("\n====================== All tables are created.\n")
+
+
+async def store_mapping(table_name, mongo_id, uuid):
+    """Store the mapping of MongoDB ObjectId to UUID in the mapping table."""
+    id_ = generate_uuid()
+    async with db_engine.get_session() as session:
+        mapping = IDsMappingDB(
+            id=id_, table_name=table_name, objectid=str(mongo_id), uuid=uuid
+        )
+        session.add(mapping)
+        await session.commit()
+
+
+async def get_mapped_uuid(table_name, mongo_id):
+    """Retrieve the mapped UUID for a given MongoDB ObjectId and table name."""
+    async with db_engine.get_session() as session:
+        stmt = select(IDsMappingDB.uuid).filter(
+            IDsMappingDB.table_name == table_name,
+            IDsMappingDB.objectid == str(mongo_id),
+        )
+        result = await session.execute(stmt)
+        try:
+            row = result.one()
+        except NoResultFound:
+            return None
+        return row[0]
+
+
+def get_datetime(value):
+    """Helper function to handle datetime fields."""
+    if isinstance(value, str):
+        return datetime.fromisoformat(value.replace("Z", "+00:00"))
+    return value if value else datetime.now(timezone.utc)
+
+
+def generate_uuid():
+    """Generate a new UUID."""
return uuid.uuid7() + + +def update_migration_report(collection_name, total_docs, migrated_docs, skipped_docs): + migration_report[collection_name] = { + "total": total_docs, + "migrated": migrated_docs, + "skipped": skipped_docs, + } + + +def print_migration_report(): + print( + "\n ============================ Migration Report ============================" + ) + + # Headers + headers = ["Table", "Total in MongoDB", "Migrated to PostgreSQL", "Skipped"] + + if not migration_report: + print("No data available in the migration report.") + return + + # Determine the maximum lengths for each column including headers + max_table_length = max( + len(headers[0]), max(len(table) for table in migration_report.keys()) + ) + max_total_length = max( + len(headers[1]), + max(len(str(counts["total"])) for counts in migration_report.values()), + ) + max_migrated_length = max( + len(headers[2]), + max(len(str(counts["migrated"])) for counts in migration_report.values()), + ) + max_skipped_length = max( + len(headers[3]), + max(len(str(counts.get("skipped", 0))) for counts in migration_report.values()), + ) + + # Set the header and divider with appropriate padding + table_header = f"| {headers[0].ljust(max_table_length)} | {headers[1].ljust(max_total_length)} | {headers[2].ljust(max_migrated_length)} | {headers[3].ljust(max_skipped_length)} |" + table_divider = f"|{'-' * (max_table_length + 2)}|{'-' * (max_total_length + 2)}|{'-' * (max_migrated_length + 2)}|{'-' * (max_skipped_length + 2)}|" + + print(table_header) + print(table_divider) + + for table, counts in migration_report.items(): + skipped = counts.get("skipped", 0) + table_row = f"| {table.ljust(max_table_length)} | {str(counts['total']).ljust(max_total_length)} | {str(counts['migrated']).ljust(max_migrated_length)} | {str(skipped).ljust(max_skipped_length)} |" + print(table_row) + + print(table_divider) + + +async def migrate_collection( + collection_name, model_class, transformation_func, association_model=None +): + 
"""General function to migrate a collection to a SQL table.""" + print(f"\n") + total_docs = mongo_db[collection_name].count_documents({}) + migrated_docs = 0 + skipped_docs = 0 + + async with db_engine.get_session() as session: + for skip in tqdm( + range(0, total_docs, BATCH_SIZE), + total=(total_docs - 1) // BATCH_SIZE + 1, + desc=f"Migrating: {collection_name}", + ncols=85, + ): + batch = await asyncio.get_event_loop().run_in_executor( + None, + lambda: list( + mongo_db[collection_name].find().skip(skip).limit(BATCH_SIZE) + ), + ) + for document in batch: + try: + if association_model: + ( + transformed_document, + associated_entities, + ) = await transformation_func(document) + session.add(model_class(**transformed_document)) + for assoc_entity in associated_entities: + session.add(association_model(**assoc_entity)) + else: + transformed_document = await transformation_func(document) + session.add(model_class(**transformed_document)) + await session.commit() + migrated_docs += 1 + except (asyncpg.exceptions.UniqueViolationError, IntegrityError) as e: + await session.rollback() + print(f"\nSkipping duplicate document in {collection_name}: {e}\n") + skipped_docs += 1 + pass + except Exception as e: + print(f"Error migrating document in {collection_name}: {e}") + print(f"Failing migration for collection: {collection_name}") + raise + + update_migration_report(collection_name, total_docs, migrated_docs, skipped_docs) diff --git a/agenta-backend/agenta_backend/models/db_engine.py b/agenta-backend/agenta_backend/models/db_engine.py index 361bff503..f9379a969 100644 --- a/agenta-backend/agenta_backend/models/db_engine.py +++ b/agenta-backend/agenta_backend/models/db_engine.py @@ -95,8 +95,15 @@ class DBEngine: def __init__(self) -> None: self.mode = os.environ.get("DATABASE_MODE", "v2") - self.postgres_uri = os.environ.get("POSTGRES_URI", None) + self.postgres_uri = os.environ.get("POSTGRES_URI") self.mongo_uri = os.environ.get("MONGODB_URI") + self.engine = 
create_async_engine(url=self.postgres_uri) # type: ignore + self.async_session_maker = async_sessionmaker( + bind=self.engine, class_=AsyncSession, expire_on_commit=False + ) + self.async_session = async_scoped_session( + session_factory=self.async_session_maker, scopefunc=current_task + ) async def initialize_async_postgres(self): """ @@ -106,14 +113,6 @@ async def initialize_async_postgres(self): if not self.postgres_uri: raise ValueError("Postgres URI cannot be None.") - self.engine = create_async_engine(self.postgres_uri) - self.async_session_maker = async_sessionmaker( - bind=self.engine, class_=AsyncSession, expire_on_commit=False - ) - self.async_session = async_scoped_session( - session_factory=self.async_session_maker, scopefunc=current_task - ) - async with self.engine.begin() as conn: # Drop and create tables if needed for model in models: @@ -184,9 +183,5 @@ async def close(self): await self.engine.dispose() - self.engine = None - self.async_session_maker = None - self.async_session = None - db_engine = DBEngine() diff --git a/agenta-backend/agenta_backend/models/db_models.py b/agenta-backend/agenta_backend/models/db_models.py index b9ff6e319..1b1444b50 100644 --- a/agenta-backend/agenta_backend/models/db_models.py +++ b/agenta-backend/agenta_backend/models/db_models.py @@ -633,6 +633,13 @@ class EvaluationScenarioDB(Base): class IDsMappingDB(Base): __tablename__ = "ids_mapping" + id = Column( + UUID(as_uuid=True), + primary_key=True, + default=uuid.uuid7, + unique=True, + nullable=False, + ) table_name = Column(String, nullable=False) - objectid = Column(String, primary_key=True) + objectid = Column(String, nullable=False) uuid = Column(UUID(as_uuid=True), nullable=False) diff --git a/agenta-backend/agenta_backend/services/db_manager.py b/agenta-backend/agenta_backend/services/db_manager.py index ffd153911..1bdca299f 100644 --- a/agenta-backend/agenta_backend/services/db_manager.py +++ b/agenta-backend/agenta_backend/services/db_manager.py @@ -166,7 
+166,7 @@ async def get_image_by_id(image_id: str) -> ImageDB: result = await session.execute( select(ImageDB).filter_by(id=uuid.UUID(image_id)) ) - image = result.scalars().one_or_none() + image = result.scalars().first() return image @@ -187,7 +187,7 @@ async def fetch_app_by_id(app_id: str) -> AppDB: ) result = await session.execute(base_query) - app = result.unique().scalars().one_or_none() + app = result.unique().scalars().first() return app @@ -222,7 +222,7 @@ async def fetch_app_variant_by_id( ) result = await session.execute(query.filter_by(id=uuid.UUID(app_variant_id))) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() return app_variant @@ -242,7 +242,7 @@ async def fetch_app_variant_by_base_id(base_id: str) -> Optional[AppVariantDB]: result = await session.execute( select(AppVariantDB).filter_by(base_id=uuid.UUID(base_id)) ) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() return app_variant @@ -268,7 +268,7 @@ async def fetch_app_variant_by_base_id_and_config_name( base_id=uuid.UUID(base_id), config_name=config_name ) ) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() return app_variant @@ -294,7 +294,7 @@ async def fetch_app_variant_revision_by_variant( variant_id=uuid.UUID(app_variant_id), revision=revision ) ) - app_variant_revision = result.scalars().one_or_none() + app_variant_revision = result.scalars().first() if app_variant_revision is None: raise Exception( f"app variant revision for app_variant {app_variant_id} and revision {revision} not found" @@ -320,7 +320,7 @@ async def fetch_base_by_id(base_id: str) -> Optional[VariantBaseDB]: ) .filter_by(id=uuid.UUID(base_id)) ) - base = result.scalars().one_or_none() + base = result.scalars().first() return base @@ -343,7 +343,7 @@ async def fetch_app_variant_by_name_and_appid( variant_name=variant_name, app_id=uuid.UUID(app_id) ) ) - app_variant = result.scalars().one_or_none() + 
app_variant = result.scalars().first() return app_variant @@ -684,7 +684,7 @@ async def get_deployment_by_id( result = await session.execute( select(DeploymentDB).filter_by(id=uuid.UUID(deployment_id)) ) - deployment = result.scalars().one_or_none() + deployment = result.scalars().first() return deployment @@ -702,7 +702,7 @@ async def get_deployment_by_appid(app_id: str) -> DeploymentDB: result = await session.execute( select(DeploymentDB).filter_by(app_id=uuid.UUID(app_id)) ) - deployment = result.scalars().one_or_none() + deployment = result.scalars().first() logger.debug(f"deployment: {deployment}") return deployment @@ -781,7 +781,7 @@ async def get_user(user_uid: str) -> UserDB: async with db_engine.get_session() as session: result = await session.execute(select(UserDB).filter_by(uid=user_uid)) - user = result.scalars().one_or_none() + user = result.scalars().first() if user is None and isCloudEE(): raise Exception("Please login or signup") @@ -814,7 +814,7 @@ async def get_user_with_id(user_id: str): async with db_engine.get_session() as session: result = await session.execute(select(UserDB).filter_by(id=uuid.UUID(user_id))) - user = result.scalars().one_or_none() + user = result.scalars().first() if user is None: logger.error("Failed to get user with id") raise Exception("Error while getting user") @@ -844,7 +844,7 @@ async def get_user_with_email(email: str): async with db_engine.get_session() as session: result = await session.execute(select(UserDB).filter_by(email=email)) - user = result.scalars().one_or_none() + user = result.scalars().first() return user @@ -893,7 +893,7 @@ async def get_orga_image_instance_by_docker_id( ) result = await session.execute(query) - image = result.scalars().one_or_none() + image = result.scalars().first() return image @@ -931,7 +931,7 @@ async def get_orga_image_instance_by_uri( ) result = await session.execute(query) - image = result.scalars().one_or_none() + image = result.scalars().first() return image @@ -947,7 +947,7 
@@ async def get_app_instance_by_id(app_id: str) -> AppDB: async with db_engine.get_session() as session: result = await session.execute(select(AppDB).filter_by(id=uuid.UUID(app_id))) - app = result.scalars().one_or_none() + app = result.scalars().first() return app @@ -1158,7 +1158,7 @@ async def remove_deployment(deployment_id: str): result = await session.execute( select(DeploymentDB).filter_by(id=uuid.UUID(deployment_id)) ) - deployment = result.scalars().one_or_none() + deployment = result.scalars().first() if not deployment: raise NoResultFound(f"Deployment with {deployment_id} not found") @@ -1254,7 +1254,7 @@ async def deploy_to_environment( app_id=app_variant_db.app_id, name=environment_name ) ) - environment_db = result.scalars().one_or_none() + environment_db = result.scalars().first() if environment_db is None: raise ValueError(f"Environment {environment_name} not found") @@ -1298,7 +1298,7 @@ async def fetch_app_environment_by_name_and_appid( joinedload(AppEnvironmentDB.deployed_app_variant.of_type(AppVariantDB)), # type: ignore ) result = await session.execute(query) - app_environment = result.scalars().one_or_none() + app_environment = result.scalars().first() return app_environment @@ -1318,7 +1318,7 @@ async def fetch_app_variant_revision_by_id( result = await session.execute( select(AppVariantRevisionsDB).filter_by(id=uuid.UUID(variant_revision_id)) ) - app_revision = result.scalars().one_or_none() + app_revision = result.scalars().first() return app_revision @@ -1359,7 +1359,7 @@ async def fetch_app_environment_revision(revision_id: str) -> AppEnvironmentRevi result = await session.execute( select(AppEnvironmentRevisionDB).filter_by(id=uuid.UUID(revision_id)) ) - environment_revision = result.scalars().one_or_none() + environment_revision = result.scalars().first() return environment_revision @@ -1398,7 +1398,7 @@ async def update_app_environment_deployed_variant_revision( id=uuid.UUID(deployed_variant_revision) ) ) - app_variant_revision = 
result.scalars().one_or_none() + app_variant_revision = result.scalars().first() if app_variant_revision is None: raise Exception( f"App variant revision {deployed_variant_revision} not found" @@ -1407,7 +1407,7 @@ async def update_app_environment_deployed_variant_revision( app_environment_result = await session.execute( select(AppEnvironmentDB).filter_by(id=uuid.UUID(app_environment_id)) ) - app_environment = app_environment_result.scalars().one_or_none() + app_environment = app_environment_result.scalars().first() app_environment.deployed_app_variant_revision_id = app_variant_revision.id # type: ignore await session.commit() @@ -1600,7 +1600,7 @@ async def fetch_app_variant_revision(app_variant: str, revision_number: int): ) # type: ignore ) result = await session.execute(query) - app_variant_revisions = result.scalars().one_or_none() + app_variant_revisions = result.scalars().first() return app_variant_revisions @@ -1645,7 +1645,7 @@ async def remove_image(image: ImageDB): async with db_engine.get_session() as session: result = await session.execute(select(ImageDB).filter_by(id=image.id)) - image = result.scalars().one_or_none() + image = result.scalars().first() await session.delete(image) await session.commit() @@ -1739,7 +1739,7 @@ async def remove_base_from_db(base_id: str): result = await session.execute( select(VariantBaseDB).filter_by(id=uuid.UUID(base_id)) ) - base = result.scalars().one_or_none() + base = result.scalars().first() if not base: raise NoResultFound(f"Base with id {base_id} not found") @@ -1764,7 +1764,7 @@ async def remove_app_by_id(app_id: str): assert app_id is not None, "app_id cannot be None" async with db_engine.get_session() as session: result = await session.execute(select(AppDB).filter_by(id=uuid.UUID(app_id))) - app_db = result.scalars().one_or_none() + app_db = result.scalars().first() if not app_db: raise NoResultFound(f"App with id {app_id} not found") @@ -1792,7 +1792,7 @@ async def update_variant_parameters( result = await 
session.execute( select(AppVariantDB).filter_by(id=uuid.UUID(app_variant_id)) ) - app_variant_db = result.scalars().one_or_none() + app_variant_db = result.scalars().first() if not app_variant_db: raise NoResultFound(f"App variant with id {app_variant_id} not found") @@ -1836,7 +1836,7 @@ async def get_app_variant_instance_by_id(variant_id: str) -> AppVariantDB: .options(joinedload(AppVariantDB.base), joinedload(AppVariantDB.app)) .filter_by(id=uuid.UUID(variant_id)) ) - app_variant_db = result.scalars().one_or_none() + app_variant_db = result.scalars().first() return app_variant_db @@ -1856,7 +1856,7 @@ async def get_app_variant_revision_by_id( result = await session.execute( select(AppVariantRevisionsDB).filter_by(id=uuid.UUID(variant_revision_id)) ) - variant_revision_db = result.scalars().one_or_none() + variant_revision_db = result.scalars().first() return variant_revision_db @@ -1878,7 +1878,7 @@ async def fetch_testset_by_id(testset_id: str) -> Optional[TestSetDB]: async with db_engine.get_session() as session: result = await session.execute(select(TestSetDB).filter_by(id=testset_uuid)) - testset = result.scalars().one_or_none() + testset = result.scalars().first() return testset @@ -1921,7 +1921,7 @@ async def update_testset(testset_id: str, values_to_update: dict) -> None: result = await session.execute( select(TestSetDB).filter_by(id=uuid.UUID(testset_id)) ) - testset = result.scalars().one_or_none() + testset = result.scalars().first() # Validate keys in values_to_update and update attributes valid_keys = [key for key in values_to_update.keys() if hasattr(testset, key)] @@ -1973,7 +1973,7 @@ async def fetch_evaluation_by_id(evaluation_id: str) -> Optional[EvaluationDB]: joinedload(EvaluationDB.testset).load_only(TestSetDB.id, TestSetDB.name), # type: ignore ) result = await session.execute(query) - evaluation = result.scalars().one_or_none() + evaluation = result.scalars().first() return evaluation @@ -2145,7 +2145,7 @@ async def 
fetch_human_evaluation_by_id( joinedload(HumanEvaluationDB.testset).load_only(TestSetDB.id, TestSetDB.name), # type: ignore ) result = await session.execute(query) - evaluation = result.scalars().one_or_none() + evaluation = result.scalars().first() return evaluation @@ -2164,7 +2164,7 @@ async def update_human_evaluation(evaluation_id: str, values_to_update: dict): result = await session.execute( select(HumanEvaluationDB).filter_by(id=uuid.UUID(evaluation_id)) ) - human_evaluation = result.scalars().one_or_none() + human_evaluation = result.scalars().first() if not human_evaluation: raise NoResultFound(f"Human evaluation with id {evaluation_id} not found") @@ -2188,7 +2188,7 @@ async def delete_human_evaluation(evaluation_id: str): result = await session.execute( select(HumanEvaluationDB).filter_by(id=uuid.UUID(evaluation_id)) ) - evaluation = result.scalars().one_or_none() + evaluation = result.scalars().first() if not evaluation: raise NoResultFound(f"Human evaluation with id {evaluation_id} not found") @@ -2250,7 +2250,7 @@ async def update_human_evaluation_scenario( id=uuid.UUID(evaluation_scenario_id) ) ) - human_evaluation_scenario = result.scalars().one_or_none() + human_evaluation_scenario = result.scalars().first() if not human_evaluation_scenario: raise NoResultFound( f"Human evaluation scenario with id {evaluation_scenario_id} not found" @@ -2347,7 +2347,7 @@ async def fetch_evaluation_scenario_by_id( result = await session.execute( select(EvaluationScenarioDB).filter_by(id=uuid.UUID(evaluation_scenario_id)) ) - evaluation_scenario = result.scalars().one_or_none() + evaluation_scenario = result.scalars().first() return evaluation_scenario @@ -2368,7 +2368,7 @@ async def fetch_human_evaluation_scenario_by_id( id=uuid.UUID(evaluation_scenario_id) ) ) - evaluation_scenario = result.scalars().one_or_none() + evaluation_scenario = result.scalars().first() return evaluation_scenario @@ -2389,7 +2389,7 @@ async def 
fetch_human_evaluation_scenario_by_evaluation_id( evaluation_id=evaluation.id # type: ignore ) ) - human_eval_scenario = result.scalars().one_or_none() + human_eval_scenario = result.scalars().first() return human_eval_scenario @@ -2433,7 +2433,7 @@ async def add_template(**kwargs: dict) -> str: result = await session.execute( select(TemplateDB).filter_by(tag_id=kwargs["tag_id"]) ) - existing_template = result.scalars().one_or_none() + existing_template = result.scalars().first() if existing_template is None: db_template = TemplateDB(**kwargs) @@ -2513,7 +2513,7 @@ async def get_template(template_id: str) -> TemplateDB: result = await session.execute( select(TemplateDB).filter_by(id=uuid.UUID(template_id)) ) - template_db = result.scalars().one_or_none() + template_db = result.scalars().first() return template_db @@ -2570,7 +2570,7 @@ async def update_base( result = await session.execute( select(VariantBaseDB).filter_by(id=uuid.UUID(base_id)) ) - base = result.scalars().one_or_none() + base = result.scalars().first() for key, value in kwargs.items(): if hasattr(base, key): setattr(base, key, value) @@ -2607,7 +2607,7 @@ async def update_app_variant( result = await session.execute( select(AppVariantDB).filter_by(id=uuid.UUID(app_variant_id)) ) - app_variant = result.scalars().one_or_none() + app_variant = result.scalars().first() if not app_variant: raise NoResultFound(f"App variant with id {app_variant_id} not found") @@ -2657,7 +2657,7 @@ async def fetch_app_by_name_and_parameters( query = base_query.join(UserDB).filter(UserDB.uid == user_uid) result = await session.execute(query) - app_db = result.unique().scalars().one_or_none() + app_db = result.unique().scalars().first() return app_db @@ -2939,7 +2939,7 @@ async def fetch_evaluator_config(evaluator_config_id: str): result = await session.execute( select(EvaluatorConfigDB).filter_by(id=uuid.UUID(evaluator_config_id)) ) - evaluator_config = result.scalars().one_or_none() + evaluator_config = 
result.scalars().first() return evaluator_config @@ -2988,7 +2988,7 @@ async def fetch_evaluator_config_by_appId( app_id=uuid.UUID(app_id), evaluator_key=evaluator_name ) ) - evaluator_config = result.scalars().one_or_none() + evaluator_config = result.scalars().first() return evaluator_config @@ -3039,7 +3039,7 @@ async def update_evaluator_config( result = await session.execute( select(EvaluatorConfigDB).filter_by(id=uuid.UUID(evaluator_config_id)) ) - evaluator_config = result.scalars().one_or_none() + evaluator_config = result.scalars().first() if not evaluator_config: raise NoResultFound( f"Evaluator config with id {evaluator_config_id} not found" @@ -3064,7 +3064,7 @@ async def delete_evaluator_config(evaluator_config_id: str) -> bool: result = await session.execute( select(EvaluatorConfigDB).filter_by(id=uuid.UUID(evaluator_config_id)) ) - evaluator_config = result.scalars().one_or_none() + evaluator_config = result.scalars().first() if evaluator_config is None: raise NoResultFound( f"Evaluator config with id {evaluator_config_id} not found" @@ -3094,7 +3094,7 @@ async def update_evaluation( result = await session.execute( select(EvaluationDB).filter_by(id=uuid.UUID(evaluation_id)) ) - evaluation = result.scalars().one_or_none() + evaluation = result.scalars().first() for key, value in updates.items(): if hasattr(evaluation, key): setattr(evaluation, key, value) diff --git a/agenta-backend/agenta_backend/tests/conftest.py b/agenta-backend/agenta_backend/tests/conftest.py index e78d0f2b4..c603898e2 100644 --- a/agenta-backend/agenta_backend/tests/conftest.py +++ b/agenta-backend/agenta_backend/tests/conftest.py @@ -16,10 +16,11 @@ def event_loop(): res._close = res.close # type: ignore # Initialize database and create tables - res.run_until_complete(DBEngine().init_db()) + db_engine = DBEngine() + res.run_until_complete(db_engine.init_db()) yield res - res.run_until_complete(DBEngine().close()) # close connections to database - 
res.run_until_complete(DBEngine().remove_db()) # drop database + res.run_until_complete(db_engine.remove_db()) # drop database + res.run_until_complete(db_engine.close()) # close connections to database res._close() # close event loop # type: ignore diff --git a/agenta-backend/poetry.lock b/agenta-backend/poetry.lock index 20bb34137..16b80ad6c 100644 --- a/agenta-backend/poetry.lock +++ b/agenta-backend/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "aioboto3" @@ -2700,6 +2700,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -3899,4 +3900,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "7118cded062bfcd960f08cc272d4eab330222cc1e51e9e03e7b728931642e0b2" +content-hash = "27f39a7d172ca89900f8feb0fd495ddc55977f205571823a19842417976be8c2" diff --git 
a/agenta-backend/pyproject.toml b/agenta-backend/pyproject.toml index df7a52d63..cd9dd8bc4 100644 --- a/agenta-backend/pyproject.toml +++ b/agenta-backend/pyproject.toml @@ -39,6 +39,7 @@ asyncpg = "^0.29.0" psycopg2-binary = "^2.9.9" uuid-utils = "^0.7.0" sqlalchemy-json = "^0.7.0" +tqdm = "^4.66.4" [tool.poetry.group.dev.dependencies] pytest = "^7.3.1" diff --git a/docker-assets/postgres/init-db.sql b/docker-assets/postgres/init-db.sql new file mode 100644 index 000000000..9bd429417 --- /dev/null +++ b/docker-assets/postgres/init-db.sql @@ -0,0 +1 @@ +CREATE DATABASE agenta_oss; diff --git a/docker-compose.yml b/docker-compose.yml index 59babfa63..c65c10268 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,7 +15,7 @@ services: build: ./agenta-backend environment: - MONGODB_URI=mongodb://username:password@mongo:27017 - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/agenta_oss - REDIS_URL=redis://redis:6379/0 - ENVIRONMENT=development - DATABASE_MODE=v2 @@ -146,7 +146,7 @@ services: command: > watchmedo auto-restart --directory=./agenta_backend --pattern=*.py --recursive -- celery -A agenta_backend.main.celery_app worker --concurrency=1 --loglevel=INFO environment: - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/agenta_oss - MONGODB_URI=mongodb://username:password@mongo:27017 - REDIS_URL=redis://redis:6379/0 - CELERY_BROKER_URL=amqp://guest@rabbitmq// @@ -172,12 +172,14 @@ services: environment: POSTGRES_USER: username POSTGRES_PASSWORD: password + POSTGRES_DB: agenta_oss ports: - "5432:5432" networks: - agenta-network volumes: - - postgresdb-data:/var/lib/postgresql/data/ + - postgresdb-data:/var/lib/postgresql/data/ + - ./docker-assets/postgres/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 10s @@ 
-194,12 +196,13 @@ services: PGADMIN_SERVER_PORT: 5432 PGADMIN_SERVER_USER: "username" PGADMIN_SERVER_PASSWORD: "password" + PGADMIN_SERVER_DB: agenta_oss ports: - "5050:80" networks: - agenta-network volumes: - - pgadmin-data:/var/lib/pgadmin + - pgadmin-data:/var/lib/pgadmin depends_on: postgres: condition: service_healthy @@ -213,4 +216,4 @@ volumes: redis_data: nextjs_cache: postgresdb-data: - pgadmin-data: \ No newline at end of file + pgadmin-data: diff --git a/docs/self-host/migration.mdx b/docs/self-host/migration/migration-to-mongodb.mdx similarity index 97% rename from docs/self-host/migration.mdx rename to docs/self-host/migration/migration-to-mongodb.mdx index 75d272cfd..d382b8558 100644 --- a/docs/self-host/migration.mdx +++ b/docs/self-host/migration/migration-to-mongodb.mdx @@ -1,5 +1,5 @@ --- -title: Migration +title: Migration to MongoDB (Deprecated) description: 'This is a step-by-step guide for upgrading to the latest version of Agenta' --- diff --git a/docs/self-host/migration/migration-to-postgres.mdx b/docs/self-host/migration/migration-to-postgres.mdx new file mode 100644 index 000000000..2d47e151c --- /dev/null +++ b/docs/self-host/migration/migration-to-postgres.mdx @@ -0,0 +1,46 @@ +--- +title: Migration to PostgreSQL +description: 'Step-by-step instructions for migrating Agenta to the newly released PostgreSQL version.' +--- + +This guide provides step-by-step instructions for migrating your Agenta instance from MongoDB to the newly released PostgreSQL version. + +> ⚠️ As of version 0.11.0, Agenta is transitioning from MongoDB to PostgreSQL. Users need to migrate their MongoDB databases to this latest version, as this will be the only version receiving feature updates and patches. + +**Table of contents:** + - [Prepare for Migration](#prepare-for-migration) + - [Start the Migration](#start-the-migration) + - [Post Migration](#post-migration) +
+ +### Prepare for Migration + +Before starting the migration, ensure that you have backed up your production data. + +While the migration will not modify any data in your MongoDB instance, it is highly recommended that you create a [backup](https://www.mongodb.com/docs/manual/tutorial/backup-and-restore-tools/) of your MongoDB database before running the migration script. This gives you a recovery point in case of any issues. + +### Start the Migration + +1. Start the local instance of Agenta and ensure that both the MongoDB and Postgres instances are active. +2. Use the following commands to initiate the migration: +```bash +docker ps +``` +The above command lists your running Docker containers. Copy the backend container ID and open a shell inside it: + +```bash +docker exec -it {backend-container-id} bash +``` + +Next, navigate to the `mongo_to_postgres` folder and run the migration script: + +```bash +cd /app/agenta_backend/migrations/mongo_to_postgres +python3 migration.py +``` + +### Post Migration + +After completing the migration, verify data integrity by opening Agenta in your browser and confirming that your data is intact and everything works as expected. + +If you encounter issues and need to revert the migration, your data in the MongoDB instance remains intact: check out the last commit you were on before the PostgreSQL migration, and open a GitHub [issue](https://github.com/Agenta-AI/agenta/issues/new?assignees=&labels=postgres,bug,Backend&projects=&template=bug_report.md&title=[Bug]+) describing the problem you encountered.
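A note on the migration utilities introduced in `mongo_to_postgres/utils.py` above: the core pattern is to mint a fresh UUID for each Mongo document, record the ObjectId-to-UUID pair in the `ids_mapping` table (`store_mapping`), and later resolve references between documents through that table (`get_mapped_uuid`). The sketch below illustrates the pattern in isolation; it substitutes a plain in-memory dict for the `IDsMappingDB` table and `uuid4` for the time-ordered `uuid7` used in the diff, and the ObjectId strings are made up for the example.

```python
import uuid
from typing import Dict, Optional, Tuple

# In-memory stand-in for the ids_mapping table: (table_name, objectid) -> uuid
_id_map: Dict[Tuple[str, str], uuid.UUID] = {}


def store_mapping(table_name: str, mongo_id: str) -> uuid.UUID:
    """Mint a UUID for a Mongo ObjectId and remember the pair."""
    new_id = uuid.uuid4()  # the real migration uses time-ordered uuid7
    _id_map[(table_name, str(mongo_id))] = new_id
    return new_id


def get_mapped_uuid(table_name: str, mongo_id: str) -> Optional[uuid.UUID]:
    """Resolve a previously stored ObjectId to its UUID (None if unknown)."""
    return _id_map.get((table_name, str(mongo_id)))


# Migrating a "user" document records its mapping; a later document that
# holds a DBRef to that user resolves it to the new foreign-key UUID.
user_uuid = store_mapping("users", "64b0c4e2a9f1d2e3c4b5a697")
assert get_mapped_uuid("users", "64b0c4e2a9f1d2e3c4b5a697") == user_uuid
assert get_mapped_uuid("users", "000000000000000000000000") is None
```

This is why the migration order in `main()` matters: a collection must be migrated (and its mappings stored) before any collection that references it.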
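The `IDsMappingDB` change in `db_models.py` defaults the new primary key to `uuid.uuid7` (from the `uuid-utils` package). Unlike random UUIDv4 values, UUIDv7 places a millisecond timestamp in the high bits, so freshly generated keys sort in creation order, which is friendlier to B-tree index locality. The toy generator below only illustrates that ordering property under that layout assumption; it is not a spec-compliant UUIDv7 implementation (it omits the version/variant bits that `uuid-utils` sets).

```python
import os
import time
import uuid


def uuid7_like() -> uuid.UUID:
    # Toy UUIDv7-style value: 48-bit unix-ms timestamp in the high bits,
    # random tail in the remaining 80 bits. Illustration only.
    ts_ms = int(time.time() * 1000) & ((1 << 48) - 1)
    tail = int.from_bytes(os.urandom(10), "big")
    return uuid.UUID(int=(ts_ms << 80) | tail)


a = uuid7_like()
time.sleep(0.002)  # ensure the millisecond timestamp advances
b = uuid7_like()
assert a < b  # later-created IDs compare greater than earlier ones
```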
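Most of the `db_manager.py` hunk swaps SQLAlchemy's `one_or_none()` for `first()`. The semantic difference is worth spelling out: `one_or_none()` raises `MultipleResultsFound` when a query matches more than one row, while `first()` silently returns the first match. A dependency-free sketch of the two behaviors, using hypothetical stand-in functions over plain sequences rather than SQLAlchemy's actual `ScalarResult` API:

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


class MultipleResultsFound(Exception):
    """Mirrors sqlalchemy.exc.MultipleResultsFound."""


def one_or_none(rows: Sequence[T]) -> Optional[T]:
    # Strict: zero or one matching row is acceptable, more is an error.
    if len(rows) > 1:
        raise MultipleResultsFound(f"expected at most one row, got {len(rows)}")
    return rows[0] if rows else None


def first(rows: Sequence[T]) -> Optional[T]:
    # Lenient: return the first matching row, ignore any others.
    return rows[0] if rows else None


assert one_or_none([]) is None and first([]) is None
assert one_or_none(["a"]) == first(["a"]) == "a"
assert first(["a", "b"]) == "a"  # one_or_none would raise here
```

The trade-off: `first()` keeps lookups working even if duplicate rows slipped through, but it also hides duplicates that `one_or_none()` would have surfaced as an exception.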