Skip to content

Commit

Permalink
gpsync v0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
wonb168 authored Feb 21, 2023
1 parent 804876c commit caf5c19
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions gpsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def gen_copyfile(cfg_file='config.toml'):
# %%
def run_sql(sql,dest,dbname):
conn = psycopg2.connect(database=dbname, user=dest['usr'], password=dest['pwd'], host=dest['host'], port=dest['port'])
print(conn)
cur = conn.cursor()
cur.execute(sql)
conn.commit()
Expand Down Expand Up @@ -125,33 +126,33 @@ def split_dump(file_path):
REVOKE FROM
GRANT FROM
"""
# 反转sql
def reverse_sql(sql):
obj=None
rsql=''
if sql.startswith('CREATE TABLE'):
obj=re.findall(r'^CREATE TABLE (\S+)', sql)[0]
rsql=f'drop table {obj};'
if sql.startswith('CREATE FUNCTION'):
obj=re.findall(r'^CREATE FUNCTION (.*?\))', sql)[0]
rsql=f'drop function {obj};'
if sql.startswith('CREATE INDEX'):
obj=re.findall(r'^CREATE INDEX (\S+)', sql)[0]
rsql=f'drop index if exists {obj};'#若该表先删,则执行报错
if sql.startswith('GRANT'):
obj=re.findall(r'^GRANT (.*+) TO (.*;)', sql)[0]
rsql=f'revoke {obj[0]} from {obj[1]};'
return rsql
#%%
def dest_has(dump_dest,dump_src):
# 若表空间不一致,直接退出
# compare_tablespace(dump_src,dump_dest)
has=[elem for elem in dump_dest if elem not in dump_src]
print('need del in dest db:',len(has))
dels=[]
for o in has:
print(o)
if (obj:=re.findall(r'^CREATE TABLE (\S+)', o)):
print(obj,len(obj))
sql=f'drop table {obj[0]};'
print(sql)
dels.append(sql)
if (obj:=re.findall(r'^CREATE FUNCTION (.*?\))', o)):
sql=f'drop function {obj[0]};'
print(sql)
dels.append(sql)
if (obj:=re.findall(r'^CREATE INDEX (\S+)', o)):
sql=f'drop index {obj[0]};'
print(sql)
dels.append(sql)
if (obj:=re.findall(r'^GRANT (.*+) TO (.*;)', o)):
sql=f'revoke {obj[0]} from {obj[1]};'
print(sql)
dels.append(sql)
# 目标库中执行dels
return dest_has
for h in has:
dels.append(reverse_sql(h))
return has,dels

# %%
# 源库多的(源库新增或修改)
Expand All @@ -168,24 +169,28 @@ def src_adds(dump_src,dump_dest,dels):
def sync_schema(dbinfo,dbinfo2,dbname):
filename=dbname+'_src.sql'
filename2=dbname+'_dest.sql'
dump(dbinfo,dbname,filename)
dump(dbinfo2,dbname,filename2)
# dump(dbinfo,dbname,filename)
# dump(dbinfo2,dbname,filename2)

dump_src=split_dump(filename)
print(len(dump_src))
dump_dest=split_dump(filename2)
print(len(dump_dest))

dels=dest_has(dump_dest,dump_src)
src_adds(dump_src,dump_dest,dels)
has,dels=dest_has(dump_dest,dump_src)
# 目标库中先执行反向sql
print('反向sql:\n','\n'.join(dels))
run_sql('\n'.join(dels),dbinfo2,dbname)
# 再增加源的sql
src_adds(dump_src,dump_dest,has)

# %%
if __name__=='__main__':
dbname='mdmaster_platform'#'mdmaster_bsgj_dev551_product_dev'
dbinfo={"host":"192.168.200.101", "port":2345, "usr":"gpadmin", "pwd":"密码"}
dbinfo={"host":"192.168.200.207", "port":2345, "usr":"gpadmin", "pwd":"密码"}
dbinfo2={"host":"192.168.200.73", "port":2345, "usr":"gpadmin", "pwd":"密码"}
print('begint to sync schema...')
# sync_schema(dbinfo,dbinfo2,dbname)
sync_schema(dbinfo,dbinfo2,dbname)
print('begint to sync data...')
copy_data(dbinfo,dbinfo2,dbname)

Expand All @@ -194,8 +199,3 @@ def sync_schema(dbinfo,dbinfo2,dbname):








0 comments on commit caf5c19

Please sign in to comment.