Skip to content

Commit

Permalink
added database backup functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
esalonico committed Aug 4, 2023
1 parent 2c89192 commit c9ab470
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 23 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ __pycache__/
backups
private
scrapes_csv
test.py

# logging
logs/
logs/
27 changes: 27 additions & 0 deletions flight_analysis.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,33 @@
"source": [
"flights._url"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"import private.private as private\n",
"from src.flight_analysis.database import Database\n",
"\n",
"db = Database(\n",
" db_host=private.DB_HOST,\n",
" db_name=private.DB_NAME,\n",
" db_user=private.DB_USER,\n",
" db_pw=private.DB_PW,\n",
" db_table=private.DB_TABLE,\n",
")\n",
"\n",
"db\n",
"\n",
"cursor = db.conn.cursor()\n",
"cursor.execute(f\"SELECT * FROM {db.db_table}\")\n",
"\n",
"for row in cursor:\n",
" print(row)\n",
" break"
]
}
],
"metadata": {
Expand Down
3 changes: 3 additions & 0 deletions flight_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,6 @@ def get_routes_df(routes: list):

# add results to database
db.add_pandas_df_to_db(scraped_flights)

# handle backup here
db.dump_database_to_sql_file()
78 changes: 56 additions & 22 deletions src/flight_analysis/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import psycopg2.extras as extras
import os
import logging
from datetime import datetime
import subprocess

# logging
logger_name = os.path.basename(__file__)
Expand All @@ -32,18 +34,19 @@ def connect_to_postgresql(self):
Connect to Postgresql and return a connection object.
"""
try:
conn = psycopg2.connect(host=self.db_host,
database=self.db_name,
user=self.db_user,
password=self.__db_pw)
conn = psycopg2.connect(
host=self.db_host,
database=self.db_name,
user=self.db_user,
password=self.__db_pw,
)
return conn
except Exception as e:
raise ConnectionError(e)

def list_all_databases(self):
cursor = self.conn.cursor()
cursor.execute(
"SELECT datname FROM pg_database WHERE datistemplate = false;")
cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false;")
result = cursor.fetchall()
cursor.close()

Expand All @@ -52,7 +55,8 @@ def list_all_databases(self):
def list_all_tables(self):
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM information_schema.tables WHERE table_schema = 'public';")
"SELECT * FROM information_schema.tables WHERE table_schema = 'public';"
)
result = cursor.fetchall()
cursor.close()

Expand Down Expand Up @@ -100,7 +104,7 @@ def create_scraped_table(self, overwrite):
ALTER TABLE IF EXISTS public.scraped OWNER to postgres;
"""

cursor = self.conn.cursor()
cursor.execute(query)
cursor.close()
Expand All @@ -115,44 +119,50 @@ def prepare_db_and_tables(self, overwrite_table=False):

# create table
self.create_scraped_table(overwrite_table)

def transform_and_clean_df(self, df):
    """
    Clean and transform a scraped-flights DataFrame before it is written
    to the database.

    Transformations:
    - "airlines": list-literal strings are rewritten into Postgres array
      syntax (e.g. "['AA', 'BA']" -> "{'AA', 'BA'}").
    - "layover_time": missing values become -1 (sentinel for no layover —
      TODO confirm against table schema).
    - "layover_location" / "price_value": missing values become None so
      psycopg2 inserts SQL NULL instead of NaN.

    Args:
        df: the scraped-flights DataFrame; mutated in place.

    Returns:
        The transformed DataFrame (the same object that was passed in).
    """
    # Rewrite "[...]"-style list strings as "{...}" strings for the
    # Postgres array column; literal_eval unwraps the added outer quotes
    # back into a plain string. Applied exactly once — applying it twice
    # would feed an already-converted value back into literal_eval.
    df["airlines"] = df.airlines.apply(
        lambda x: np.array(
            ast.literal_eval(str(x).replace("[", '"{').replace("]", '}"'))
        )
    )
    df["layover_time"] = df["layover_time"].fillna(-1)
    df["layover_location"] = (
        df["layover_location"].fillna(np.nan).replace([np.nan], [None])
    )
    df["price_value"] = df["price_value"].fillna(np.nan).replace([np.nan], [None])

    return df

def add_pandas_df_to_db(self, df):
# clean df
df = self.transform_and_clean_df(df)

# Create a list of tuples from the dataframe values
tuples = [tuple(x) for x in df.to_numpy()]

# Comma-separated dataframe columns
cols = ','.join(list(df.columns))
cols = ",".join(list(df.columns))

cursor = self.conn.cursor()

# SQL query to execute
query = "INSERT INTO %s(%s) VALUES %%s" % (self.db_table, cols)
query = "INSERT INTO %s(%s) VALUES %%s" % (self.db_table, cols)
try:
extras.execute_values(cursor, query, tuples)
except (Exception, psycopg2.DatabaseError) as error:
logger.error("Error: %s" % error)
self.conn.rollback()
cursor.close()

logger.info("{} rows added to table [{}]".format(len(df), self.db_table))
cursor.close()

# fix layover time
# TODO: improve this
cursor = self.conn.cursor()
Expand All @@ -165,4 +175,28 @@ def add_pandas_df_to_db(self, df):
ALTER COLUMN layover_time TYPE smallint;
"""
cursor.execute(query)
cursor.close()
cursor.close()

def dump_database_to_sql_file(self):
    """
    Dump the database to a timestamped .tar archive using pg_dump.

    The archive is written to a "db_backups" folder located next to this
    module (created on demand).

    Returns:
        The path to the dumped .tar file, or None if the dump failed.
    """
    BACKUPS_FOLDER_NAME = "db_backups"

    # build a unique, timestamped backup path
    date_str = datetime.now().strftime("%Y%m%d%H%M%S")
    backup_folder = os.path.join(os.path.dirname(__file__), BACKUPS_FOLDER_NAME)
    # pg_dump will not create the target directory itself
    os.makedirs(backup_folder, exist_ok=True)
    backup_file = os.path.join(backup_folder, f"{date_str}_{self.db_name}.tar")

    # Argument list with shell=False: passing the whole command as a single
    # string (as subprocess.call(str) does) tries to execute a program
    # literally named "pg_dump --format=tar ..." and fails.
    dump_command = [
        "pg_dump",
        "--format=tar",
        "-h", self.db_host,
        "-U", self.db_user,
        "-f", backup_file,
        self.db_name,
    ]

    # supply the password via the environment so pg_dump does not prompt
    env = os.environ.copy()
    env["PGPASSWORD"] = self.__db_pw

    try:
        # check=True raises CalledProcessError on a non-zero exit code, so a
        # failed dump is actually caught (subprocess.call only returned the
        # exit status, leaving the except branch unreachable for dump errors)
        subprocess.run(dump_command, env=env, check=True)
        return backup_file
    except (OSError, subprocess.CalledProcessError) as e:
        logger.error(f"Error while dumping database to file: {e}")




0 comments on commit c9ab470

Please sign in to comment.