diff --git a/.github/workflows/main_ci.yml b/.github/workflows/main_ci.yml
index f7f09bb5..61249a26 100644
--- a/.github/workflows/main_ci.yml
+++ b/.github/workflows/main_ci.yml
@@ -69,6 +69,14 @@ jobs:
           SERVICE_ACCOUNT_GCP: op://Engineering - dbt-coves/dbt-coves-tests/SERVICE_ACCOUNT_GCP
           PROJECT_BIGQUERY: op://Engineering - dbt-coves/dbt-coves-tests/PROJECT_BIGQUERY
           DATASET_BIGQUERY: op://Engineering - dbt-coves/dbt-coves-tests/DATASET_BIGQUERY
+          # Blue green
+          DATACOVES__DBT_COVES_TEST__ACCOUNT: op://Engineering - dbt-coves/dbt-coves-tests/ACCOUNT_SNOWFLAKE
+          DATACOVES__DBT_COVES_TEST__USER: op://Engineering - dbt-coves/dbt-coves-tests/USER_SNOWFLAKE
+          DATACOVES__DBT_COVES_TEST__PASSWORD: op://Engineering - dbt-coves/dbt-coves-tests/PASSWORD_SNOWFLAKE
+          DATACOVES__DBT_COVES_TEST__WAREHOUSE: op://Engineering - dbt-coves/dbt-coves-tests/WAREHOUSE_SNOWFLAKE
+          DATACOVES__DBT_COVES_TEST__DATABASE: op://Engineering - dbt-coves/dbt-coves-tests/DATABASE_SNOWFLAKE
+          DATACOVES__DBT_COVES_TEST__SCHEMA: op://Engineering - dbt-coves/dbt-coves-tests/SCHEMA_SNOWFLAKE
+          DATACOVES__DBT_COVES_TEST__ROLE: op://Engineering - dbt-coves/dbt-coves-tests/ROLE_SNOWFLAKE

       - name: Create profiles
         run: |
diff --git a/.gitignore b/.gitignore
index 4e8654b3..1be6a23f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -151,4 +151,3 @@ cython_debug/
 **.vim**
 .gitsecret/keys/random_seed
 !*.secret
-profiles.yml
diff --git a/dbt_coves/config/config.py b/dbt_coves/config/config.py
index 63beb9ea..05c4687e 100644
--- a/dbt_coves/config/config.py
+++ b/dbt_coves/config/config.py
@@ -154,6 +154,18 @@ class DataSyncModel(BaseModel):
     snowflake: Optional[SnowflakeDataSyncModel] = SnowflakeDataSyncModel()


+class BlueGreenModel(BaseModel):
+    service_connection_name: Optional[str] = ""
+    staging_database: Optional[str] = ""
+    staging_suffix: Optional[str] = ""
+    drop_staging_db_at_start: Optional[bool] = False
+    keep_staging_db_on_success: Optional[bool] = False
+    drop_staging_db_on_failure: Optional[bool] = False
+    dbt_selector: Optional[str] = ""
+    defer: Optional[bool] = False
+    full_refresh: Optional[bool] = False
+
+
 class ConfigModel(BaseModel):
     generate: Optional[GenerateModel] = GenerateModel()
     extract: Optional[ExtractModel] = ExtractModel()
@@ -162,6 +174,7 @@
     dbt: Optional[RunDbtModel] = RunDbtModel()
     data_sync: Optional[DataSyncModel] = DataSyncModel()
     disable_tracking: Optional[bool] = False
+    blue_green: Optional[BlueGreenModel] = BlueGreenModel()


 class DbtCovesConfig:
@@ -249,6 +262,15 @@
         "load.fivetran.secrets_key",
         "data_sync.redshift.tables",
         "data_sync.snowflake.tables",
+        "blue_green.service_connection_name",
+        "blue_green.staging_database",
+        "blue_green.staging_suffix",
+        "blue_green.drop_staging_db_at_start",
+        "blue_green.drop_staging_db_on_failure",
+        "blue_green.dbt_selector",
+        "blue_green.defer",
+        "blue_green.full_refresh",
+        "blue_green.keep_staging_db_on_success",
     ]

     def __init__(self, flags: DbtCovesFlags) -> None:
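For context (not part of the diff): the `_fields` entries above map one-to-one to a `blue_green` block in the dbt-coves config file. A minimal sketch, assuming the conventional `.dbt_coves/config.yml` location — values here are illustrative, keys come straight from `BlueGreenModel`:

```yaml
# Illustrative only: file location and values are assumptions; keys from BlueGreenModel
blue_green:
  service_connection_name: main   # resolves the DATACOVES__MAIN__* environment variables
  staging_suffix: STAGING         # staging database becomes <PRODUCTION_DB>_STAGING
  drop_staging_db_at_start: true
  drop_staging_db_on_failure: true
  dbt_selector: "-s tag:daily"    # tokens are split and appended to the dbt build command
  defer: false
  full_refresh: false
```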
diff --git a/dbt_coves/core/main.py b/dbt_coves/core/main.py
index d0d24d45..b03dca82 100644
--- a/dbt_coves/core/main.py
+++ b/dbt_coves/core/main.py
@@ -14,6 +14,7 @@
 from dbt_coves.config.config import DbtCovesConfig
 from dbt_coves.core.exceptions import MissingCommand, MissingDbtProject
 from dbt_coves.tasks.base import BaseTask
+from dbt_coves.tasks.blue_green.main import BlueGreenTask
 from dbt_coves.tasks.data_sync.main import DataSyncTask
 from dbt_coves.tasks.dbt.main import RunDbtTask
 from dbt_coves.tasks.extract.main import ExtractTask
@@ -218,7 +219,15 @@
     # Register subcommands
     [
         task.register_parser(sub_parsers, base_subparser)
-        for task in [GenerateTask, SetupTask, ExtractTask, LoadTask, RunDbtTask, DataSyncTask]
+        for task in [
+            GenerateTask,
+            SetupTask,
+            ExtractTask,
+            LoadTask,
+            RunDbtTask,
+            DataSyncTask,
+            BlueGreenTask,
+        ]
     ]
diff --git a/dbt_coves/tasks/blue_green/__init__.py b/dbt_coves/tasks/blue_green/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/dbt_coves/tasks/blue_green/clone_db.py b/dbt_coves/tasks/blue_green/clone_db.py
new file mode 100644
index 00000000..2f84b51c
--- /dev/null
+++ b/dbt_coves/tasks/blue_green/clone_db.py
@@ -0,0 +1,214 @@
+import threading
+import time
+
+from rich.console import Console
+from snowflake.connector import DictCursor
+
+console = Console()
+
+
+class CloneDB:
+    """
+    Clones a Snowflake database from one to another. This is intended to be used in a blue/green
+    deployment and will clone the schemas and grants from the blue database to the green database.
+    """
+
+    def __init__(
+        self, blue_database: str, green_database: str, snowflake_conn, thread_count: int = 20
+    ):
+        """
+        Blue/green deployment for Snowflake databases.
+        Args:
+            blue_database: The current production database.
+            green_database: The temporary database where the build will occur.
+        """
+        self.start_time = time.time()
+        self.time_check = self.start_time
+        self._list_of_schemas_to_exclude = [
+            "INFORMATION_SCHEMA",
+            "ACCOUNT_USAGE",
+            "SECURITY",
+            "SNOWFLAKE",
+            "UTILS",
+            "PUBLIC",
+        ]
+        self.blue_database = blue_database
+        self.green_database = green_database
+        self.con = snowflake_conn
+        self._thread_count = thread_count
+
+    def drop_database(self):
+        """
+        Utility function to drop the green database.
+
+        Returns:
+            None
+        """
+        console.print(f"Dropping database [green]{self.green_database}[/green]")
+        self.con.cursor().execute(f"drop database if exists {self.green_database};")
+
+    def create_database(self, database: str):
+        """
+        Creates the specified database.
+        """
+        console.print(f"Creating database [green]{database}[/green]")
+        self.con.cursor().execute(f"create database {database};")
+
+    def clone_database_grants(self, blue_database: str, green_database: str):
+        """
+        Clones the grants from the blue database to the green database.
+
+        Args:
+            blue_database: The name of the blue database (prod).
+            green_database: The name of the green database (staging).
+
+        Returns:
+            None
+        """
+        console.print(
+            f"Cloning grants from [blue]{self.blue_database}[/blue] to [green]{self.green_database}[/green]"
+        )
+        dict_cursor = self.con.cursor(DictCursor)
+        grants_sql_stg_1 = f"""show grants on database {blue_database}"""
+        dict_cursor.execute(grants_sql_stg_1)
+        grants = dict_cursor.fetchall()
+        threaded_run_commands = ThreadedRunCommands(self.con, self._thread_count)
+        for grant in grants:
+            grant_sql = (
+                f"GRANT {grant['privilege']} ON {grant['granted_on']} {green_database} "
+                f"TO ROLE {grant['grantee_name']};"
+            )
+
+            threaded_run_commands.register_command(grant_sql)
+        threaded_run_commands.run()
+
+    def clone_database_schemas(self, blue_database: str, green_database: str):
+        """
+        Clones the schemas from the blue database to the green database, along with the existing
+        blue database schema grants.
+
+        Args:
+            blue_database: The name of the blue database (prod).
+            green_database: The name of the green database (staging).
+
+        Returns:
+            None
+        """
+        console.print(
+            f"Cloning [u]schemas[/u] from [blue]{self.blue_database}[/blue] to [green]{self.green_database}[/green]"
+        )
+        dict_cursor = self.con.cursor(DictCursor)
+        dict_cursor.execute(f"show schemas in database {blue_database};")
+        schemas = dict_cursor.fetchall()
+        threaded_run_commands = ThreadedRunCommands(self.con, self._thread_count)
+
+        # Clone schemas
+        for schema in schemas:
+            if schema["name"] not in self._list_of_schemas_to_exclude:
+                # Clone each schema
+                sql = f"create schema {green_database}.{schema['name']} clone {blue_database}.{schema['name']};"
+                threaded_run_commands.register_command(sql)
+        threaded_run_commands.run()
+        console.print(f"Cloned schemas in {time.time() - self.time_check} seconds.")
+        self.time_check = time.time()
+        # Copy grants from Blue DB schemas
+        console.print(
+            f"Cloning [u]schema grants[/u] from [blue]{self.blue_database}[/blue] to "
+            f"[green]{self.green_database}[/green]"
+        )
+        # Fresh batch so the schema-clone statements registered above are not re-submitted
+        threaded_run_commands = ThreadedRunCommands(self.con, self._thread_count)
+        for schema in schemas:
+            if schema["name"] not in self._list_of_schemas_to_exclude:
+                grants_sql_stg_1 = f"""show grants on schema {blue_database}.{schema['name']}"""
+                dict_cursor.execute(grants_sql_stg_1)
+                grants = dict_cursor.fetchall()
+                for grant in grants:
+                    sql = (
+                        f"GRANT {grant['privilege']} ON {grant['granted_on']} {green_database}.{schema['name']} "
+                        f"TO ROLE {grant['grantee_name']};"
+                    )
+                    # Load SQL into the threaded commands to run.
+                    threaded_run_commands.register_command(sql)
+        threaded_run_commands.run()
+        console.print(f"Cloned grants to schemas in {time.time() - self.time_check} seconds.")
+        self.time_check = time.time()
+
+
+class ThreadedRunCommands:
+    """Helper class for running queries across a configurable number of threads."""
+
+    def __init__(self, con, threads):
+        self.threads = threads
+        self.register_command_thread = 0
+        self.thread_commands = [[] for _ in range(self.threads)]
+        self.con = con
+
+    def register_command(self, command: str):
+        """
+        Register a SQL command to be run in a thread.
+
+        Args:
+            command: A SQL string to be run.
+
+        Returns:
+            None
+        """
+        self.thread_commands[self.register_command_thread].append(command)
+        if self.register_command_thread + 1 == self.threads:
+            self.register_command_thread = 0
+        else:
+            self.register_command_thread += 1
+
+    def run_commands(self, commands):
+        """
+        Loops over the commands, submitting each one asynchronously.
+
+        Args:
+            commands: A list of SQL commands to be run async.
+
+        Returns:
+            None
+        """
+        for command in commands:
+            self.con.cursor().execute_async(command)
+
+    def run(self):
+        """
+        Run the registered commands in the threads.
+
+        Returns:
+            None
+        """
+        procs = []
+        for v in self.thread_commands:
+            proc = threading.Thread(target=self.run_commands, args=(v,))
+            procs.append(proc)
+            proc.start()
+        # complete the processes
+        for proc in procs:
+            proc.join()
+
+
+# if __name__ == "__main__":
+#     '''
+#     This section is really only designed for testing purposes. When used in production, it is intended that you will
+#     call the clone_blue_db_to_green method from an external script or directly from the DAG as needed.
+#     '''
+#     parser = argparse.ArgumentParser(
+#         description="Script to run a blue/green swap")

+#     # Add the arguments
+#     parser.add_argument('--blue-db', type=str, default=os.environ.get('DATACOVES__MAIN__DATABASE'),
+#                         help='The source database.')
+#     parser.add_argument('--green-db', type=str, help='The name of the green (temporary build) database.')

+#     # Parse the arguments
+#     args = parser.parse_args()

+#     # Handle the case when --green-db is not provided
+#     if args.green_db is None:
+#         args.green_db = f'{args.blue_db}_STAGING'

+#     blue_db = args.blue_db
+#     green_db = args.green_db

+#     c = CloneDB(blue_db, green_db)
+#     c.clone_blue_db_to_green()
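As a usage sketch (not part of the diff): `CloneDB` only needs an open Snowflake connection plus the blue and green database names. The connection parameters and database names below are placeholders:

```python
# Hypothetical usage of CloneDB; env var names and database names are placeholders.
import os

import snowflake.connector

from dbt_coves.tasks.blue_green.clone_db import CloneDB

con = snowflake.connector.connect(
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    role=os.environ["SNOWFLAKE_ROLE"],
)

cdb = CloneDB("ANALYTICS", "ANALYTICS_STAGING", con, thread_count=20)
cdb.create_database("ANALYTICS_STAGING")                      # create the green db
cdb.clone_database_schemas("ANALYTICS", "ANALYTICS_STAGING")  # schemas + schema grants
cdb.clone_database_grants("ANALYTICS", "ANALYTICS_STAGING")   # database-level grants
cdb.drop_database()                                           # drops the green db
```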
diff --git a/dbt_coves/tasks/blue_green/main.py b/dbt_coves/tasks/blue_green/main.py
new file mode 100644
index 00000000..ab25f808
--- /dev/null
+++ b/dbt_coves/tasks/blue_green/main.py
@@ -0,0 +1,234 @@
+import os
+import subprocess
+
+import snowflake.connector
+from rich.console import Console
+from rich.text import Text
+
+from dbt_coves.core.exceptions import DbtCovesException
+from dbt_coves.tasks.base import NonDbtBaseConfiguredTask
+from dbt_coves.utils.tracking import trackable
+
+from .clone_db import CloneDB
+
+console = Console()
+
+
+class BlueGreenTask(NonDbtBaseConfiguredTask):
+    """
+    Task that performs a blue-green deployment
+    """
+
+    arg_parser = None
+
+    @classmethod
+    def register_parser(cls, sub_parsers, base_subparser):
+        ext_subparser = sub_parsers.add_parser(
+            "blue-green",
+            parents=[base_subparser],
+            help="""Command to perform blue-green dbt runs""",
+        )
+        ext_subparser.set_defaults(cls=cls, which="blue-green")
+        cls.arg_parser = ext_subparser
+        ext_subparser.add_argument(
+            "--service-connection-name",
+            type=str,
+            help="Snowflake service connection name",
+        )
+        ext_subparser.add_argument(
+            "--staging-database",
+            type=str,
+            help="Green database",
+        )
+        ext_subparser.add_argument(
+            "--staging-suffix",
+            type=str,
+            help="Green database suffix",
+        )
+        ext_subparser.add_argument(
+            "--drop-staging-db-at-start",
+            action="store_true",
+            help="Drop staging db at start if it already exists",
+        )
+        ext_subparser.add_argument(
+            "--drop-staging-db-on-failure",
+            action="store_true",
+            help="Drop staging db if blue-green fails",
+        )
+        ext_subparser.add_argument(
+            "--keep-staging-db-on-success",
+            action="store_true",
+            help="Keep the staging db after a successful run instead of dropping it",
+        )
+        ext_subparser.add_argument(
+            "--dbt-selector",
+            type=str,
+            help="dbt selector(s) to be passed to build operation",
+        )
+        ext_subparser.add_argument(
+            "--full-refresh",
+            action="store_true",
+            help="Perform a full dbt build",
+        )
+        ext_subparser.add_argument("--defer", action="store_true", help="Run in deferral")
+        return ext_subparser
+
+    def get_config_value(self, key):
+        return self.coves_config.integrated["blue_green"][key]
+
+    @trackable
+    def run(self) -> int:
+        self.service_connection_name = self.get_config_value("service_connection_name").upper()
+        try:
+            self.production_database = os.environ[
+                f"DATACOVES__{self.service_connection_name}__DATABASE"
+            ]
+        except KeyError:
+            raise DbtCovesException(
+                f"There is no Database defined for Service Connection {self.service_connection_name}"
+            )
+        staging_database = self.get_config_value("staging_database")
+        staging_suffix = self.get_config_value("staging_suffix")
+        if staging_database and staging_suffix:
+            raise DbtCovesException("Cannot specify both staging_database and staging_suffix")
+        elif not staging_database and not staging_suffix:
+            staging_suffix = "STAGING"
+        self.staging_database = staging_database or f"{self.production_database}_{staging_suffix}"
+        if self.production_database == self.staging_database:
+            raise DbtCovesException(
+                f"Production database {self.production_database} cannot be the same as staging database "
+                f"{self.staging_database}"
+            )
+        self.drop_staging_db_at_start = self.get_config_value("drop_staging_db_at_start")
+        self.con = self.snowflake_connection()
+
+        self.cdb = CloneDB(
+            self.production_database,
+            self.staging_database,
+            self.con,
+        )
+
+        self._check_and_drop_staging_db()
+        env = os.environ.copy()
+        try:
+            # Create the staging (green) database
+            self.cdb.create_database(self.staging_database)
+            # Clone schemas and schema grants from production to staging
+            self.cdb.clone_database_schemas(self.production_database, self.staging_database)
+            # Run dbt build against the staging database
+            self._run_dbt_build(env)
+            # Copy database-level grants from the production db
+            self.cdb.clone_database_grants(self.production_database, self.staging_database)
+            # Swap databases: Snowflake sql `alter database {blue} swap with {green}`
+            self._swap_databases()
+            # Drop staging, which now holds the former production objects
+            if not self.get_config_value("keep_staging_db_on_success"):
+                self.cdb.drop_database()
+        except Exception as e:
+            if self.get_config_value("drop_staging_db_on_failure"):
+                self.cdb.drop_database()
+            raise e
+
+        return 0
+
+    def _run_dbt_build(self, env):
+        dbt_build_command: list = self._get_dbt_build_command()
+        env[f"DATACOVES__{self.service_connection_name}__DATABASE"] = self.staging_database
+        self._run_command(dbt_build_command, env=env)
+
+    def _run_command(self, command: list, env=None):
+        # Avoid a mutable default evaluated at definition time
+        if env is None:
+            env = os.environ.copy()
+        command_string = " ".join(command)
+        console.print(f"Running [b][i]{command_string}[/i][/b]")
+        try:
+            output = subprocess.check_output(command, env=env, stderr=subprocess.PIPE)
+            console.print(
+                f"{Text.from_ansi(output.decode())}\n"
+                f"[green]{command_string} :heavy_check_mark:[/green]"
+            )
+        except subprocess.CalledProcessError as e:
+            formatted = f"{Text.from_ansi(e.stderr.decode()) if e.stderr else Text.from_ansi(e.stdout.decode())}"
+            e.stderr = f"An error has occurred running [red]{command_string}[/red]:\n{formatted}"
+            raise
+
+    def _get_dbt_command(self, command):
+        """
+        Returns the dbt command to be run.
+        """
+        dbt_selector: str = self.get_config_value("dbt_selector")
+        is_deferral = self.get_config_value("defer")
+        dbt_command = ["dbt", command, "--fail-fast"]
+        if is_deferral or os.environ.get("MANIFEST_FOUND"):
+            dbt_command.extend(["--defer", "--state", "logs", "-s", "state:modified+"])
+        else:
+            dbt_command.extend(dbt_selector.split())
+        if self.get_config_value("full_refresh"):
+            dbt_command.append("--full-refresh")
+        if self.args.target:
+            dbt_command.extend(["-t", self.args.target])
+        return dbt_command
+
+    def _get_dbt_build_command(self):
+        return self._get_dbt_command("build")
+
+    def _swap_databases(self):
+        """
+        Swaps databases: Snowflake sql `alter database {blue} swap with {green}`
+        """
+        console.print("Swapping databases")
+        try:
+            sql = f"alter database {self.production_database} swap with {self.staging_database};"
+            self.con.cursor().execute(sql)
+        except Exception as e:
+            console.print(f"Error swapping databases: {e}")
+            raise e
+
+    def _check_and_drop_staging_db(self):
+        """
+        Checks if the staging database exists and drops it if configured to do so.
+
+        Returns:
+            None
+        """
+        green_exists = self._check_if_database_exists()
+        if green_exists and self.drop_staging_db_at_start:
+            self.cdb.drop_database()
+        elif green_exists:
+            raise DbtCovesException(
+                f"Green database {self.staging_database} already exists. Please either drop it or use a different name."
+            )
+
+    def snowflake_connection(self):
+        try:
+            return snowflake.connector.connect(
+                account=os.environ.get(f"DATACOVES__{self.service_connection_name}__ACCOUNT"),
+                warehouse=os.environ.get(f"DATACOVES__{self.service_connection_name}__WAREHOUSE"),
+                database=os.environ.get(f"DATACOVES__{self.service_connection_name}__DATABASE"),
+                role=os.environ.get(f"DATACOVES__{self.service_connection_name}__ROLE"),
+                schema=os.environ.get(f"DATACOVES__{self.service_connection_name}__SCHEMA"),
+                user=os.environ.get(f"DATACOVES__{self.service_connection_name}__USER"),
+                password=os.environ.get(f"DATACOVES__{self.service_connection_name}__PASSWORD"),
+                session_parameters={
+                    "QUERY_TAG": "blue_green_swap",
+                },
+            )
+        except Exception as e:
+            raise DbtCovesException(
+                f"Couldn't establish Snowflake connection with {self.production_database}: {e}"
+            )
+
+    def _check_if_database_exists(self):
+        """
+        Check whether the staging (green) database already exists.
+
+        Returns:
+            bool: True if it exists, False otherwise.
+        """
+        query = f"SHOW DATABASES LIKE '{self.staging_database}'"
+        cursor = self.con.cursor()
+        cursor.execute(query)
+        result = cursor.fetchone()
+        return result is not None
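In Snowflake terms, a successful `run()` boils down to the following statement sequence (database, schema, and role names are illustrative; each statement appears in the code above):

```sql
CREATE DATABASE ANALYTICS_STAGING;
CREATE SCHEMA ANALYTICS_STAGING.CORE CLONE ANALYTICS.CORE;  -- repeated per schema, plus schema grants
-- dbt build runs here, pointed at ANALYTICS_STAGING via the overridden DATABASE env var
GRANT USAGE ON DATABASE ANALYTICS_STAGING TO ROLE REPORTER; -- repeated per database-level grant
ALTER DATABASE ANALYTICS SWAP WITH ANALYTICS_STAGING;       -- atomic name swap
DROP DATABASE IF EXISTS ANALYTICS_STAGING;                  -- now holds the former production objects
```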
diff --git a/dbt_coves/utils/flags.py b/dbt_coves/utils/flags.py
index 02aba32b..b6535870 100644
--- a/dbt_coves/utils/flags.py
+++ b/dbt_coves/utils/flags.py
@@ -138,6 +138,17 @@ def __init__(self, cli_parser: ArgumentParser) -> None:
         }
         self.dbt = {"command": None, "project_dir": None, "virtualenv": None, "cleanup": False}
         self.data_sync = {"redshift": {"tables": []}, "snowflake": {"tables": []}}
+        self.blue_green = {
+            "service_connection_name": None,
+            "staging_database": None,
+            "staging_suffix": None,
+            "drop_staging_db_at_start": False,
+            "drop_staging_db_on_failure": False,
+            "dbt_selector": None,
+            "defer": False,
+            "full_refresh": False,
+            "keep_staging_db_on_success": False,
+        }

     def parse_args(self, cli_args: List[str] = list()) -> None:
         args = sys.argv[1:]
@@ -407,3 +418,28 @@
             self.data_sync["snowflake"]["tables"] = [
                 table.strip() for table in self.args.tables.split(",")
             ]
+
+        # blue green
+        if self.args.cls.__name__ == "BlueGreenTask":
+            if self.args.service_connection_name:
+                self.blue_green["service_connection_name"] = self.args.service_connection_name
+            if self.args.staging_database:
+                self.blue_green["staging_database"] = self.args.staging_database
+            if self.args.staging_suffix:
+                self.blue_green["staging_suffix"] = self.args.staging_suffix
+            if self.args.drop_staging_db_at_start:
+                self.blue_green["drop_staging_db_at_start"] = self.args.drop_staging_db_at_start
+            if self.args.drop_staging_db_on_failure:
+                self.blue_green[
+                    "drop_staging_db_on_failure"
+                ] = self.args.drop_staging_db_on_failure
+            if self.args.dbt_selector:
+                self.blue_green["dbt_selector"] = self.args.dbt_selector
+            if self.args.defer:
+                self.blue_green["defer"] = self.args.defer
+            if self.args.full_refresh:
+                self.blue_green["full_refresh"] = self.args.full_refresh
+            if self.args.keep_staging_db_on_success:
+                self.blue_green[
+                    "keep_staging_db_on_success"
+                ] = self.args.keep_staging_db_on_success
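Wired together, an illustrative invocation looks like the following (the service connection name `main` is an assumption; the matching `DATACOVES__MAIN__*` environment variables must be set):

```bash
dbt-coves blue-green \
  --service-connection-name main \
  --staging-suffix STAGING \
  --drop-staging-db-at-start \
  --drop-staging-db-on-failure \
  --dbt-selector "-s tag:daily"
```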
diff --git a/tests/.gitignore b/tests/.gitignore
index 8ec4f111..2e3a5547 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -1,6 +1,4 @@
 .env
 !*.secret
-dbt_project.yml
-profiles.yml
 generate_sources_cases/*/output
 service_account.json
diff --git a/tests/blue_green/.user.yml b/tests/blue_green/.user.yml
new file mode 100644
index 00000000..0da24ae8
--- /dev/null
+++ b/tests/blue_green/.user.yml
@@ -0,0 +1 @@
+id: f29ef39e-061e-49b7-ab06-93112d5137f0
diff --git a/tests/blue_green/dbt_project.yml b/tests/blue_green/dbt_project.yml
new file mode 100644
index 00000000..e51f59bb
--- /dev/null
+++ b/tests/blue_green/dbt_project.yml
@@ -0,0 +1,15 @@
+name: test_dbt_coves_snowflake
+version: "1.0.0"
+config-version: 2
+profile: default
+model-paths: ["models"]
+analysis-paths: ["analyses"]
+test-paths: ["tests"]
+seed-paths: ["seeds"]
+macro-paths: ["macros"]
+snapshot-paths: ["snapshots"]
+
+target-path: "target"
+clean-targets:
+  - "target"
+  - "dbt_packages"
diff --git a/tests/blue_green/input/dummy_data.sql b/tests/blue_green/input/dummy_data.sql
new file mode 100644
index 00000000..774aa97f
--- /dev/null
+++ b/tests/blue_green/input/dummy_data.sql
@@ -0,0 +1,35 @@
+CREATE DATABASE IF NOT EXISTS DBT_COVES_TEST;
+USE DATABASE DBT_COVES_TEST;
+
+GRANT USAGE ON DATABASE DBT_COVES_TEST TO ROLE bot_integration;
+
+CREATE SCHEMA IF NOT EXISTS DBT_COVES_TEST.TESTS_BLUE_GREEN;
+
+CREATE TABLE IF NOT EXISTS DBT_COVES_TEST.TESTS_BLUE_GREEN.TEST_MODEL (
+    test_smallint SMALLINT,
+    test_integer INTEGER,
+    test_bigint BIGINT,
+    test_decimal DECIMAL,
+    test_numeric NUMERIC,
+    test_char CHAR,
+    test_varchar VARCHAR,
+    test_date DATE,
+    test_timestamp TIMESTAMP,
+    test_boolean BOOLEAN,
+    test_json VARIANT
+);
+
+INSERT INTO DBT_COVES_TEST.TESTS_BLUE_GREEN.TEST_MODEL
+
+SELECT
+    1,
+    1,
+    1,
+    1.1,
+    1.1,
+    'a',
+    'a',
+    '2020-01-01',
+    '2020-01-01 00:00:00',
+    true,
+    parse_json($${"json_value_1":"abc","json_value_2": 2}$$);
diff --git a/tests/blue_green/models/blue_green/schema.yml b/tests/blue_green/models/blue_green/schema.yml
new file mode 100644
index 00000000..763be47b
--- /dev/null
+++ b/tests/blue_green/models/blue_green/schema.yml
@@ -0,0 +1,8 @@
+version: 2
+
+models:
+  - name: test_model_2
+    description: "A starter dbt model"
+    columns:
+      - name: id
+        description: "The primary key for this table"
diff --git a/tests/blue_green/models/blue_green/test_model_2.sql b/tests/blue_green/models/blue_green/test_model_2.sql
new file mode 100644
index 00000000..381d90f2
--- /dev/null
+++ b/tests/blue_green/models/blue_green/test_model_2.sql
@@ -0,0 +1,18 @@
+{{ config(materialized='table') }}
+
+with source_data as (
+
+    select 1 as id
+    union all
+    select null as id
+
+)
+
+select *
+from source_data
+
+/*
+    Uncomment the line below to remove records with null `id` values
+*/
+
+-- where id is not null
diff --git a/tests/blue_green/profiles.yml b/tests/blue_green/profiles.yml
new file mode 100644
index 00000000..3266d220
--- /dev/null
+++ b/tests/blue_green/profiles.yml
@@ -0,0 +1,13 @@
+default:
+  outputs:
+    dev:
+      account: "{{ env_var('DATACOVES__DBT_COVES_TEST__ACCOUNT') }}"
+      database: DBT_COVES_TEST_STG
+      password: "{{ env_var('DATACOVES__DBT_COVES_TEST__PASSWORD') }}"
+      role: "{{ env_var('DATACOVES__DBT_COVES_TEST__ROLE') }}"
+      schema: TESTS_BLUE_GREEN
+      threads: 2
+      type: snowflake
+      user: "{{ env_var('DATACOVES__DBT_COVES_TEST__USER') }}"
+      warehouse: "{{ env_var('DATACOVES__DBT_COVES_TEST__WAREHOUSE') }}"
+  target: dev
diff --git a/tests/blue_green/settings.yml b/tests/blue_green/settings.yml
new file mode 100644
index 00000000..99bf0627
--- /dev/null
+++ b/tests/blue_green/settings.yml
@@ -0,0 +1,8 @@
+# Test and dbt-coves command settings
+table: "TEST_MODEL"
+schema: "TESTS_BLUE_GREEN"
+database: "DBT_COVES_TEST"
+
+blue_green:
+  production_database: DBT_COVES_TEST
+  drop_staging_db_at_start: True
diff --git a/tests/blue_green_test.py b/tests/blue_green_test.py
new file mode 100644
index 00000000..ce36aac9
--- /dev/null
+++ b/tests/blue_green_test.py
@@ -0,0 +1,204 @@
+"""
+The test should:
+
+Create the prod DB and populate it with dummy data:
+    The name comes from the DATACOVES__DBT_COVES_TEST__DATABASE env var
+    Run the SQL table creation and inserts
+    Grant some usage
+
+Capture info from the prod DB:
+    Creation timestamp and grants (via SHOW GRANTS)
+
+Run dbt-coves blue-green:
+    The fixture project contains a test_model_2 model, which should end up being the
+    difference between the two databases
+
+Asserts:
+    Prod has 2 models, staging has 1, and grants/creation timestamps match after the swap
+"""
+
+import os
+import subprocess
+from pathlib import Path
+
+import pytest
+import snowflake.connector
+from ruamel.yaml import YAML
+from snowflake.connector import DictCursor
+
+yaml = YAML()
+try:
+    FIXTURE_DIR = Path(Path(__file__).parent.absolute(), "blue_green")
+    SETTINGS = yaml.load(open(f"{FIXTURE_DIR}/settings.yml"))
+    DBT_COVES_SETTINGS = SETTINGS["blue_green"]
+except KeyError:
+    raise KeyError("blue_green not found in settings")
+
+
+@pytest.fixture(scope="class")
+def snowflake_connection(request):
+    # Check env vars
+    assert "DATACOVES__DBT_COVES_TEST__USER" in os.environ
+    assert "DATACOVES__DBT_COVES_TEST__PASSWORD" in os.environ
+    assert "DATACOVES__DBT_COVES_TEST__ACCOUNT" in os.environ
+    assert "DATACOVES__DBT_COVES_TEST__WAREHOUSE" in os.environ
+    assert "DATACOVES__DBT_COVES_TEST__ROLE" in os.environ
+
+    user = os.environ["DATACOVES__DBT_COVES_TEST__USER"]
+    password = os.environ["DATACOVES__DBT_COVES_TEST__PASSWORD"]
+    account = os.environ["DATACOVES__DBT_COVES_TEST__ACCOUNT"]
+    role = os.environ["DATACOVES__DBT_COVES_TEST__ROLE"]
+    warehouse = os.environ["DATACOVES__DBT_COVES_TEST__WAREHOUSE"]
+    database = os.environ["DATACOVES__DBT_COVES_TEST__DATABASE"]
+    schema = "TESTS_BLUE_GREEN"
+
+    conn = snowflake.connector.connect(
+        user=user,
+        password=password,
+        account=account,
+        warehouse=warehouse,
+        role=role,
+    )
+
+    request.cls.conn = conn
+    request.cls.warehouse = warehouse
+    request.cls.schema = schema
+    request.cls.production_database = database
+    request.cls.staging_database = (
+        f"{request.cls.production_database}_{DBT_COVES_SETTINGS.get('staging_suffix', 'staging')}"
+    )
+
+    yield conn
+
+    conn.close()
+
+
+@pytest.mark.usefixtures("snowflake_connection")
+class TestBlueGreen:
+    """
+    Test class for testing the blue-green functionality.
+    """
+
+    def test_setup_database(self):
+        """
+        Set up the database for testing.
+ """ + # input dummy data + self._generate_dummy_data() + self.conn.prod_creation_timestamp = self._get_db_creation_timestamp( + self.production_database + ) + self.conn.prod_grants = self._get_db_grants(self.production_database) + + def test_dbt_coves_bluegreen(self): + command = [ + "python", + "../../dbt_coves/core/main.py", + "blue-green", + "--project-dir", + str(FIXTURE_DIR), + "--profiles-dir", + str(FIXTURE_DIR), + "--service-connection-name", + self.production_database, + "--keep-staging-db-on-success", + ] + if DBT_COVES_SETTINGS.get("drop_staging_db_at_start"): + command.append("--drop-staging-db-at-start") + if DBT_COVES_SETTINGS.get("dbt_selector"): + command.extend(["--dbt-selector", DBT_COVES_SETTINGS["dbt_selector"]]) + if DBT_COVES_SETTINGS.get("staging_suffix"): + command.extend(["--staging-suffix", DBT_COVES_SETTINGS["staging_suffix"]]) + self.assert_suffix = True + + # Execute CLI command and interact with it + process = subprocess.run( + args=command, + encoding="utf-8", + cwd=FIXTURE_DIR, + ) + + assert process.returncode == 0 + + def test_blue_green_integrity(self): + """ + Here prod and staging were already swapped + self.prod == ex staging + self.staging == ex prod + Here we'll assert: + - self.staging_database exists with it's name + - Grants match + - self.prod_timestamp is staging timestamp + - prod has 2 models + - staging has 1 model + """ + assert self._check_staging_existence() + new_staging_timestamp = self._get_db_creation_timestamp(self.staging_database) + new_prod_grants = self._get_db_grants(self.production_database) + assert new_staging_timestamp == self.conn.prod_creation_timestamp + assert new_prod_grants == self.conn.prod_grants + cursor = self.conn.cursor(DictCursor) + cursor.execute(f"SHOW TABLES IN {self.production_database}.{self.schema}") + prod_tables = cursor.fetchall() + assert len(prod_tables) == 2 + cursor.execute(f"SHOW TABLES IN {self.staging_database}.{self.schema}") + staging_tables = cursor.fetchall() + assert len(staging_tables) == 1 + cursor.close() + + def test_cleanup(self): + with self.conn.cursor() as cursor: + cursor.execute(f"DROP DATABASE IF EXISTS {self.production_database}") + self.conn.commit() + self.conn.close() + + def _generate_dummy_data(self): + with open(f"{FIXTURE_DIR}/input/dummy_data.sql", "r") as file: + sql_commands = file.read().split(";") + + cursor = self.conn.cursor() + cursor.execute(f"USE WAREHOUSE {self.warehouse};") + for command in sql_commands: + if command.strip(): + cursor.execute(command) + cursor.close() + + def _get_db_creation_timestamp(self, database: str): + # Get a database creation timestamp + timestamp = None + cursor = self.conn.cursor(DictCursor) + cursor.execute("SHOW TERSE DATABASES;") + databases = cursor.fetchall() + for db in databases: + if db["name"] == database.upper(): + timestamp = db["created_on"] + cursor.close() + return timestamp + + def _get_db_grants(self, database: str): + # Get a database creation timestamp and grants + sql_grants = None + cursor = self.conn.cursor(DictCursor) + cursor.execute(f"SHOW GRANTS ON DATABASE {database};") + sql_grants = cursor.fetchall() + grants = [] + # remove created_on from grants + for grant in sql_grants: + grant.pop("created_on", None) + grants.append(grant) + cursor.close() + return sql_grants + + def _check_staging_existence(self): + cursor = self.conn.cursor(DictCursor) + cursor.execute("SHOW TERSE DATABASES;") + databases = cursor.fetchall() + for db in databases: + if db["name"] == self.staging_database.upper(): + return True + 
+        return False
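To exercise this suite locally, something like the following should work, assuming the `DATACOVES__DBT_COVES_TEST__*` variables from the CI workflow above are exported:

```bash
pytest tests/blue_green_test.py -v
```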