From caf5c192589d148be20adf674be505711ffe42b1 Mon Sep 17 00:00:00 2001 From: wonb168 <37640893@qq.com> Date: Tue, 21 Feb 2023 17:20:37 +0800 Subject: [PATCH] gpsync v0.2 --- gpsync.py | 64 +++++++++++++++++++++++++++---------------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/gpsync.py b/gpsync.py index 6a706d9..b71dbb5 100644 --- a/gpsync.py +++ b/gpsync.py @@ -39,6 +39,7 @@ def gen_copyfile(cfg_file='config.toml'): # %% def run_sql(sql,dest,dbname): conn = psycopg2.connect(database=dbname, user=dest['usr'], password=dest['pwd'], host=dest['host'], port=dest['port']) + print(conn) cur = conn.cursor() cur.execute(sql) conn.commit() @@ -125,33 +126,33 @@ def split_dump(file_path): REVOKE FROM GRANT FROM """ +# 反转sql +def reverse_sql(sql): + obj=None + rsql='' + if sql.startswith('CREATE TABLE'): + obj=re.findall(r'^CREATE TABLE (\S+)', sql)[0] + rsql=f'drop table {obj};' + if sql.startswith('CREATE FUNCTION'): + obj=re.findall(r'^CREATE FUNCTION (.*?\))', sql)[0] + rsql=f'drop function {obj};' + if sql.startswith('CREATE INDEX'): + obj=re.findall(r'^CREATE INDEX (\S+)', sql)[0] + rsql=f'drop index if exists {obj};'#若该表先删,则执行报错 + if sql.startswith('GRANT'): + obj=re.findall(r'^GRANT (.*+) TO (.*;)', sql)[0] + rsql=f'revoke {obj[0]} from {obj[1]};' + return rsql +#%% def dest_has(dump_dest,dump_src): # 若表空间不一致,直接退出 # compare_tablespace(dump_src,dump_dest) has=[elem for elem in dump_dest if elem not in dump_src] print('need del in dest db:',len(has)) dels=[] - for o in has: - print(o) - if (obj:=re.findall(r'^CREATE TABLE (\S+)', o)): - print(obj,len(obj)) - sql=f'drop table {obj[0]};' - print(sql) - dels.append(sql) - if (obj:=re.findall(r'^CREATE FUNCTION (.*?\))', o)): - sql=f'drop function {obj[0]};' - print(sql) - dels.append(sql) - if (obj:=re.findall(r'^CREATE INDEX (\S+)', o)): - sql=f'drop index {obj[0]};' - print(sql) - dels.append(sql) - if (obj:=re.findall(r'^GRANT (.*+) TO (.*;)', o)): - sql=f'revoke {obj[0]} from {obj[1]};' - print(sql) - dels.append(sql) - # 目标库中执行dels - return dest_has + for h in has: + dels.append(reverse_sql(h)) + return has,dels # %% # 源库多的(源库新增或修改) @@ -168,24 +169,28 @@ def src_adds(dump_src,dump_dest,dels): def sync_schema(dbinfo,dbinfo2,dbname): filename=dbname+'_src.sql' filename2=dbname+'_dest.sql' - dump(dbinfo,dbname,filename) - dump(dbinfo2,dbname,filename2) + # dump(dbinfo,dbname,filename) + # dump(dbinfo2,dbname,filename2) dump_src=split_dump(filename) print(len(dump_src)) dump_dest=split_dump(filename2) print(len(dump_dest)) - dels=dest_has(dump_dest,dump_src) - src_adds(dump_src,dump_dest,dels) + has,dels=dest_has(dump_dest,dump_src) + # 目标库中先执行反向sql + print('反向sql:\n','\n'.join(dels)) + run_sql('\n'.join(dels),dbinfo2,dbname) + # 再增加源的sql + src_adds(dump_src,dump_dest,has) # %% if __name__=='__main__': dbname='mdmaster_platform'#'mdmaster_bsgj_dev551_product_dev' - dbinfo={"host":"192.168.200.101", "port":2345, "usr":"gpadmin", "pwd":"密码"} + dbinfo={"host":"192.168.200.207", "port":2345, "usr":"gpadmin", "pwd":"密码"} dbinfo2={"host":"192.168.200.73", "port":2345, "usr":"gpadmin", "pwd":"密码"} print('begint to sync schema...') - # sync_schema(dbinfo,dbinfo2,dbname) + sync_schema(dbinfo,dbinfo2,dbname) print('begint to sync data...') copy_data(dbinfo,dbinfo2,dbname) @@ -194,8 +199,3 @@ def sync_schema(dbinfo,dbinfo2,dbname): - - - - -