Skip to content

Commit

Permalink
Remove hardcoded Postgres port (similar to seanharr11#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
quassy committed Feb 8, 2022
1 parent 9285c0f commit 3998f8d
Showing 1 changed file with 25 additions and 24 deletions.
49 changes: 25 additions & 24 deletions etlalchemy/ETLAlchemySource.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def __init__(self,
# after data has been migrated, rather than before
self.unique_columns = []
self.compress_varchar = compress_varchar

self.logger = logging.getLogger("ETLAlchemySource")
self.logger.propagate = False

for h in list(self.logger.handlers):
# Clean up any old loggers...(useful during testing w/ multiple
# log_files)
Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(self,

self.tgt_insp = None
self.src_insp = None

self.dst_engine = None
self.constraints = {}
self.indexes = {}
Expand Down Expand Up @@ -181,7 +181,7 @@ def standardize_column_type(self, column, raw_rows):
column.type.__class__.__bases__)
self.logger.info("({0}) {1}".format(column.name,
column.type.__class__.__name__))
self.logger.info("Bases: {0}".format(str(base_classes)))
self.logger.info("Bases: {0}".format(str(base_classes)))

# Assume the column is empty, unless told otherwise
null = True
Expand Down Expand Up @@ -462,7 +462,7 @@ def standardize_column_type(self, column, raw_rows):
column.name,
column.table.name
)

return column_copy

def add_or_eliminate_column(
Expand All @@ -477,7 +477,7 @@ def add_or_eliminate_column(
table_name = T.name
null = True
idx = self.current_ordered_table_columns.index(column.name)

cname = column_copy.name
columnHasGloballyIgnoredSuffix = len(
filter(
Expand Down Expand Up @@ -689,16 +689,17 @@ def send_data(self, table, columns):
password = self.dst_engine.url.password
db_name = self.dst_engine.url.database
host = self.dst_engine.url.host

port = self.dst_engine.url.port

import psycopg2
conn = psycopg2.connect(
"""
host='{0}'
port='5432'
dbname='{1}'
user='{2}'
password='{3}'
""".format(host, db_name, username, password))
f"""
host='{host}'
port='{port}'
dbname='{db_name}'
user='{username}'
password='{password}'
""")
cur = conn.cursor()
# Legacy method (doesn't work if not superuser, and if file is
# LOCAL
Expand Down Expand Up @@ -880,7 +881,7 @@ def migrate(
""""""""""""""""""""""""
""" ** REFLECTION ** """
""""""""""""""""""""""""

buffer_size = 10000

if self.database_url.split(":")[0] == "oracle+cx_oracle":
Expand Down Expand Up @@ -919,7 +920,7 @@ def migrate(
elif self.excluded_tables:
TablesIterator = list(set(TablesIterator) -
set(self.excluded_tables))

t_idx = -1
t_total = len(TablesIterator)
self.logger.info("""
Expand Down Expand Up @@ -996,7 +997,7 @@ def migrate(
self.logger.info(
"Building query to fetch all rows from {0}".format(
T_src.name))


cnt = self.engine.execute(T_src.count()).fetchone()[0]
resultProxy = self.engine.execute(T_src.select())
Expand All @@ -1023,7 +1024,7 @@ def migrate(

# TODO: Use column/table mappers, would need to update foreign
# keys...

for column in T_src.columns:
self.column_count += 1
##############################
Expand All @@ -1033,7 +1034,7 @@ def migrate(
if column.primary_key:
pks.append(column.name.lower())
pk_count += 1

if column.autoincrement:
auto_inc_count += 1
##############################
Expand Down Expand Up @@ -1088,7 +1089,7 @@ def migrate(
#self.tgt_insp.reflecttable(T, None)
t_start_dump = datetime.now()
t_start_load = datetime.now()

row_buffer_size = 100000
if self.dst_engine.dialect.name.lower() == 'mssql' and \
not self.enable_mssql_bulk_insert:
Expand Down Expand Up @@ -1227,7 +1228,7 @@ def add_indexes(self, destination_database_url):
# (i.e. can't create Idx w/ same name as column in Postgresql)
name = "IDX_" + table_name + "__" + \
"_".join(col) + "__" + str(this_idx_count)
# Max length of identifier is 63 characters in
# Max length of identifier is 63 characters in
# postgresql & mysql
if len(name) > 63:
name = name[:60] + "_" + str(this_idx_count)
Expand Down Expand Up @@ -1341,7 +1342,7 @@ def add_fks(self, destination_database_url):
# Add FKs
############################
dst_meta = MetaData()

if self.dst_engine.dialect.name.lower() == "mssql":
raise Exception(
"Adding Constraints to MSSQL is not supported" +
Expand Down Expand Up @@ -1426,7 +1427,7 @@ def add_fks(self, destination_database_url):
constrained_cols = filter(lambda c: c is not None,
map(lambda x: T.columns.get(x),
constrained_columns))


################################
# Check that the constrained columns
Expand Down Expand Up @@ -1469,7 +1470,7 @@ def add_fks(self, destination_database_url):
table_name.upper(), T_ref.name.upper())
if len(constraint_name) > 63:
constraint_name = constraint_name[:63]

try:
inspector.reflecttable(T_ref, None)
except sqlalchemy.exc.NoSuchTableError as e:
Expand Down

0 comments on commit 3998f8d

Please sign in to comment.