Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
weaverba137 committed Jan 9, 2025
1 parent 349bfb6 commit 0db0c0d
Showing 1 changed file with 33 additions and 18 deletions.
51 changes: 33 additions & 18 deletions dlairflow/postgresql.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,45 @@
# Licensed under a BSD-style 3-clause license - see LICENSE.md.
# -*- coding: utf-8 -*-
"""
postgresql
==========
dlairflow.postgresql
====================
Standard tasks for working with PostgreSQL that can be imported into a DAG.
"""
import os
from airflow.operators.bash import BashOperator
from airflow.hooks.base import BaseHook
from .util import user_scratch


def _connection_to_environment(connection):
"""Convert a database connection to environment variables.
Parameters
----------
connection : :class:`str`
An Airflow database connection string.
Returns
-------
:class:`dict`
A dictionary suitable for passing to the ``env`` keyword on, *e.g.*
:class:`~airflow.operators.bash.BashOperator`.
"""
conn = BaseHook.get_connection(connection)
env = {'PGUSER': conn.login,
'PGPASSWORD': conn.password,
'PGHOST': conn.host,
'PGDATABASE': conn.schema}
return env


def pg_dump_schema(connection, schema, dump_dir=None):
"""Dump an entire database schema using :command:`pg_dump`.
Parameters
----------
connection : :class:`str`
An Airflow database connection string
An Airflow database connection string.
schema : :class:`str`
The name of the database schema.
dump_dir : :class:`str`, optional
Expand All @@ -32,17 +53,14 @@ def pg_dump_schema(connection, schema, dump_dir=None):
"""
if dump_dir is None:
dump_dir = user_scratch()
conn = BaseHook.get_connection(connection)
return BashOperator(task_id="pg_dump",
pg_env = _connection_to_environment(connection)
return BashOperator(task_id="pg_dump_schema",
bash_command=("[[ -f {{ params.dump_dir }}/{{ params.schema }}.dump ]] || " +
"pg_dump --schema={{ params.schema }} --format=c " +
"--file={{ params.dump_dir }}/{{ params.schema }}.dump"),
params={'schema': schema,
'dump_dir': dump_dir},
env={'PGUSER': conn.login,
'PGPASSWORD': conn.password,
'PGHOST': conn.host,
'PGDATABASE': conn.schema},
env=pg_env,
append_env=True)


Expand All @@ -52,7 +70,7 @@ def pg_restore_schema(connection, schema, dump_dir=None):
Parameters
----------
connection : :class:`str`
An Airflow database connection string
An Airflow database connection string.
schema : :class:`str`
The name of the database schema.
dump_dir : :class:`str`, optional
Expand All @@ -62,18 +80,15 @@ def pg_restore_schema(connection, schema, dump_dir=None):
Returns
-------
:class:`~airflow.operators.bash.BashOperator`
A BashOperator that will execute :command:`pg_dump`.
A BashOperator that will execute :command:`pg_restore`.
"""
if dump_dir is None:
dump_dir = user_scratch()
conn = BaseHook.get_connection(connection)
return BashOperator(task_id="pg_restore",
pg_env = _connection_to_environment(connection)
return BashOperator(task_id="pg_restore_schema",
bash_command=("[[ -f {{ params.dump_dir }}/{{ params.schema }}.dump ]] && " +
"pg_restore {{ params.dump_dir }}/{{ params.schema }}.dump"),
params={'schema': schema,
'dump_dir': dump_dir},
env={'PGUSER': conn.login,
'PGPASSWORD': conn.password,
'PGHOST': conn.host,
'PGDATABASE': conn.schema},
env=pg_env,
append_env=True)

0 comments on commit 0db0c0d

Please sign in to comment.