Skip to content

Commit

Permalink
feat: add delete rows function
Browse files Browse the repository at this point in the history
  • Loading branch information
alimghmi committed Oct 23, 2023
1 parent cacbebd commit 2d76167
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions database/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
class MSSQLDatabaseConnection(DatabaseConnection):
"""Concrete implementation for MSSQL database connection."""

def __init__(self, server, database, username, password):
def __init__(
self, server: str, database: str, username: str, password: str
) -> None:
cnx_string = (
f"mssql+pyodbc://{username}:{password}@"
f"{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server"
Expand All @@ -22,26 +24,27 @@ def __init__(self, server, database, username, password):
class PandasSQLDataInserter(DataInserter):
"""Concrete implementation of data insertion using Pandas to_sql."""

def __init__(self, db_connection: DatabaseConnection, max_retries=3):
def __init__(self, db_connection: DatabaseConnection, max_retries: int = 3) -> None:
super().__init__(db_connection)
self.max_retries = max_retries

def insert(self, df: pd.DataFrame, table_name: str):
def insert(self, df: pd.DataFrame, table_name: str) -> None:
self.delete_rows(table_name=table_name)
schema, name = table_name.split(".")
if self.db_connection.engine is None:
self.db_connection.connect()

for i in range(self.max_retries):
try:
result = df.to_sql(
df.to_sql(
schema=schema,
name=name,
con=self.db_connection.engine,
if_exists="replace",
if_exists="append",
index=False,
)
logger.info(f"Inserted {len(df)} rows into {table_name} table")
return result
return
except exc.SQLAlchemyError as e:
logger.error(
f"Failed to insert data. Attempt {i + 1} of {self.max_retries}. Error: {e}" # noqa: E501
Expand All @@ -50,3 +53,21 @@ def insert(self, df: pd.DataFrame, table_name: str):
raise

time.sleep(i + 1)

def delete_rows(self, table_name: str) -> None:
if self.db_connection.engine is None:
self.db_connection.connect()

connection = self.db_connection.engine.raw_connection()
cursor = connection.cursor()

try:
cursor.execute(f"DELETE FROM {table_name}")
connection.commit()
logger.info(f"Deleted rows from {table_name} successfully.")
except Exception as e:
logger.error(f"Failed to delete rows from {table_name}. Error: {e}")
connection.rollback()
finally:
connection.close()
cursor.close()

0 comments on commit 2d76167

Please sign in to comment.