forked from bucardo/bucardo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bucardo.schema
2812 lines (2383 loc) · 98.2 KB
/
bucardo.schema
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
-- Schema for the main Bucardo database
-- Version 5.6.0
-- Should be run as a superuser
-- This should not need to be run directly: use either
-- bucardo install
-- or
-- bucardo upgrade
\set ON_ERROR_STOP off
\echo NOTE: Creating bucardo prerequisites: user, database, plperl; can ignore any errors
-- Create the bucardo user and database if they don't already exist
SET client_min_messages = 'ERROR';
CREATE USER bucardo SUPERUSER;
CREATE DATABASE bucardo OWNER bucardo;
\c bucardo bucardo
-- Create the base bucardo schema and languages
SET client_min_messages = 'ERROR';
CREATE LANGUAGE plpgsql;
CREATE LANGUAGE plperlu;
CREATE SCHEMA bucardo;
ALTER DATABASE bucardo SET search_path = bucardo, public;
SET standard_conforming_strings = 'ON';
-- The above were allowed to fail, because there is no harm if the objects
-- already existed. From this point forward however, we suffer no errors
\echo NOTE: Done with prerequisite setup; errors no longer ignored
\set ON_ERROR_STOP on
BEGIN;
SET client_min_messages = 'WARNING';
SET search_path TO bucardo;
SET escape_string_warning = 'OFF';
-- Try and create a plperlu function, then call it at the very end
-- Do not change this string, as the bucardo program parses it
CREATE OR REPLACE FUNCTION bucardo.plperlu_test()
RETURNS TEXT
LANGUAGE plperlu
AS $bc$
return 'Pl/PerlU was successfully installed';
$bc$;
--
-- Main bucardo configuration information
--
CREATE TABLE bucardo.bucardo_config (
name TEXT NOT NULL, -- short unique name, maps to %config inside Bucardo
setting TEXT NOT NULL,
defval TEXT NULL, -- the default value for this setting, per initial config
about TEXT NULL, -- long description
type TEXT NULL, -- sync or goat
item TEXT NULL, -- which specific sync or goat
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.bucardo_config IS $$Contains configuration variables for a specific Bucardo instance$$;
CREATE UNIQUE INDEX bucardo_config_unique ON bucardo.bucardo_config(LOWER(name)) WHERE item IS NULL;
CREATE UNIQUE INDEX bucardo_config_unique_name ON bucardo.bucardo_config(name,item,type) WHERE item IS NOT NULL;
ALTER TABLE bucardo.bucardo_config ADD CONSTRAINT valid_config_type CHECK (type IN ('sync','goat'));
ALTER TABLE bucardo.bucardo_config ADD CONSTRAINT valid_config_isolation_level
CHECK (name <> 'isolation_level' OR (setting IN ('serializable','repeatable read')));
CREATE FUNCTION bucardo.check_bucardo_config()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $bc$
BEGIN
IF NEW.name <> ALL('{log_conflict_file,warning_file,email_debug_file,flatfile_dir,reason_file,stats_script_url,stopfile,log_timer_format}') THEN
NEW.setting = LOWER(NEW.setting);
END IF;
IF (NEW.type IS NOT NULL and NEW.item IS NULL) THEN
RAISE EXCEPTION 'Must provide a specific %', NEW.type;
END IF;
IF (NEW.item IS NOT NULL and NEW.type IS NULL) THEN
RAISE EXCEPTION 'Must provide a type if giving a name';
END IF;
IF (NEW.name = 'sync' OR NEW.name = 'goat') THEN
RAISE EXCEPTION 'Invalid configuration name';
END IF;
RETURN NEW;
END;
$bc$;
COMMENT ON FUNCTION bucardo.check_bucardo_config() IS $$Basic sanity checks for configuration items$$;
CREATE TRIGGER check_bucardo_config
BEFORE INSERT OR UPDATE ON bucardo.bucardo_config
FOR EACH ROW EXECUTE PROCEDURE bucardo.check_bucardo_config();
-- Sleep times (all in seconds)
COPY bucardo.bucardo_config("name",setting,about)
FROM STDIN
WITH DELIMITER '|';
mcp_loop_sleep|0.2|How long does the main MCP daemon sleep between loops?
mcp_dbproblem_sleep|15|How many seconds to sleep before trying to respawn
mcp_vactime|60|How often in seconds do we check that a VAC is still running?
ctl_sleep|0.2|How long does the controller loop sleep?
kid_sleep|0.5|How long does a kid loop sleep?
kid_nodeltarows_sleep|0.5|How long do kids sleep if no delta rows are found?
kid_serial_sleep|0.5|How long to sleep in seconds if we hit a serialization error
kid_deadlock_sleep|0.5|How long to sleep in seconds if we hit a deadlock error
kid_restart_sleep|1|How long to sleep in seconds when restarting a kid?
endsync_sleep|1.0|How long do we sleep when custom code requests an endsync?
vac_sleep|120|How long does VAC process sleep between runs?
vac_run|30|How often does the VAC process run?
\.
-- Various timeouts (times are in seconds)
COPY bucardo.bucardo_config("name",setting,about)
FROM STDIN
WITH DELIMITER '|';
mcp_pingtime|60|How often do we ping check the MCP?
kid_pingtime|60|How often do we ping check the KID?
ctl_checkonkids_time|10|How often does the controller check on the kids health?
ctl_createkid_time|0.5|How long do we sleep to allow kids-on-demand to get on their feet?
tcp_keepalives_idle|0|How long to wait between each keepalive probe.
tcp_keepalives_interval|0|How long to wait for a response to a keepalive probe.
tcp_keepalives_count|0|How many probes to send. 0 indicates sticking with system defaults.
reload_config_timeout|30|How long to wait for reload_config to finish.
\.
-- Logging
COPY bucardo.bucardo_config(name,setting,about)
FROM STDIN
WITH DELIMITER '|';
log_microsecond|0|Show microsecond output in the timestamps?
log_showpid|1|Show PID in the log output?
log_showlevel|0|Show log level in the log output?
log_showline|0|Show line number in the log output?
log_showtime|3|Show timestamp in the log output? 0=off 1=seconds since epoch 2=scalar gmtime 3=scalar localtime
log_timer_format||Show timestamps in specific format; default/empty to show time from scalar
log_conflict_file|bucardo_conflict.log|Name of the conflict detail log file
log_showsyncname|1|Show the name of the sync next to the 'KID' prefix
log_level|NORMAL|How verbose to make the logging. Higher is more verbose.
warning_file|bucardo.warning.log|File containing all log lines starting with "Warning"
\.
-- Versioning
COPY bucardo.bucardo_config(name,setting,about)
FROM STDIN
WITH DELIMITER '|';
bucardo_initial_version|5.6.0|Bucardo version this schema was created with
bucardo_version|5.6.0|Current version of Bucardo
\.
-- Other settings:
COPY bucardo.bucardo_config(name,setting,about)
FROM STDIN
WITH DELIMITER '|';
bucardo_vac|1|Do we want the automatic VAC daemon to run?
default_email_from|[email protected]|Who the alert emails are sent as
default_email_to|[email protected]|Who to send alert emails to
default_email_host|localhost|Which host to send email through
default_email_port|25|Which port to send email through
default_conflict_strategy|bucardo_latest|Default conflict strategy for all syncs
email_debug_file||File to save a copy of all outgoing emails to
email_auth_user||User to use for email authentication via Net::SMTP
email_auth_pass||Password to use for email authentication via Net::SMTP
flatfile_dir|.|Directory to store the flatfile output inside of
host_safety_check||Regex to make sure we don't accidentally run where we should not
isolation_level|repeatable read|Default isolation level: can be serializable or repeatable read
piddir|/var/run/bucardo|Directory holding Bucardo PID files
quick_delta_check|1|Whether to do a quick scan of delta activity
reason_file|bucardo.restart.reason.txt|File to hold reasons for stopping and starting
semaphore_table|bucardo_status|Table to let apps know a sync is ongoing
statement_chunk_size|6000|How many primary keys to shove into a single statement
stats_script_url|http://www.bucardo.org/|Location of the stats script
stopfile|fullstopbucardo|Name of the semaphore file used to stop Bucardo processes
syslog_facility|LOG_LOCAL1|Which syslog facility level to use
\.
-- Unused at the moment:
COPY bucardo.bucardo_config(name,setting,about)
FROM STDIN
WITH DELIMITER '|';
autosync_ddl|newcol|Which DDL changing conditions do we try to remedy automatically?
\.
-- This needs to run after all population of bucardo.config
UPDATE bucardo.bucardo_config SET defval = setting;
--
-- Keep track of every database we need to connect to
--
CREATE TABLE bucardo.db (
name TEXT NOT NULL, -- local name for convenience, not necessarily database name
CONSTRAINT db_name_pk PRIMARY KEY (name),
dbdsn TEXT NOT NULL DEFAULT '',
dbtype TEXT NOT NULL DEFAULT 'postgres',
dbhost TEXT NULL DEFAULT '',
dbport TEXT NULL DEFAULT '',
dbname TEXT NULL, -- the actual name of the database, not the primary key 'local' name
dbuser TEXT NULL,
dbpass TEXT NULL,
dbconn TEXT NOT NULL DEFAULT '', -- string to add to the generated dsn
dbservice TEXT NULL DEFAULT '',
pgpass TEXT NULL, -- local file with connection info same as pgpass
status TEXT NOT NULL DEFAULT 'active',
server_side_prepares BOOLEAN NOT NULL DEFAULT true,
makedelta BOOLEAN NOT NULL DEFAULT false,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.db IS $$Holds information about each database used in replication$$;
ALTER TABLE bucardo.db ADD CONSTRAINT db_status CHECK (status IN ('active','inactive','stalled'));
ALTER TABLE bucardo.db ADD CONSTRAINT db_service_valid CHECK (dbservice IS NOT NULL OR dbname IS NOT NULL AND dbuser IS NOT NULL AND dbhost IS NOT NULL AND dbport IS NOT NULL);
--
-- Databases can belong to zero or more named groups
--
CREATE TABLE bucardo.dbgroup (
name TEXT NOT NULL,
CONSTRAINT dbgroup_name_pk PRIMARY KEY (name),
about TEXT NULL,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.dbgroup IS $$Named groups of databases: used as 'targetgroup' for syncs$$;
CREATE TABLE bucardo.dbmap (
db TEXT NOT NULL,
CONSTRAINT dbmap_db_fk FOREIGN KEY (db) REFERENCES bucardo.db(name) ON UPDATE CASCADE ON DELETE CASCADE,
dbgroup TEXT NOT NULL,
CONSTRAINT dbmap_dbgroup_fk FOREIGN KEY (dbgroup) REFERENCES bucardo.dbgroup(name) ON UPDATE CASCADE ON DELETE CASCADE,
priority SMALLINT NOT NULL DEFAULT 0,
role TEXT NOT NULL DEFAULT 'target',
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.dbmap IS $$Associates a database with one or more groups$$;
CREATE UNIQUE INDEX dbmap_unique ON bucardo.dbmap(db,dbgroup);
--
-- Track status information about each database
--
CREATE TABLE bucardo.db_connlog (
db TEXT NOT NULL,
CONSTRAINT db_connlog_dbid_fk FOREIGN KEY (db) REFERENCES bucardo.db(name) ON UPDATE CASCADE ON DELETE CASCADE,
conndate TIMESTAMPTZ NOT NULL DEFAULT now(), -- when we first connected to it
connstring TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'unknown',
CONSTRAINT db_connlog_status CHECK (status IN ('unknown', 'good', 'down', 'unreachable')),
version TEXT NULL
);
COMMENT ON TABLE bucardo.db_connlog IS $$Tracks connection attempts to each database when its information changes$$;
--
-- We need to track each item we want to replicate from or replicate to
--
CREATE SEQUENCE bucardo.goat_id_seq;
CREATE TABLE bucardo.goat (
id INTEGER NOT NULL DEFAULT nextval('goat_id_seq'),
CONSTRAINT goat_id_pk PRIMARY KEY (id),
db TEXT NOT NULL,
CONSTRAINT goat_db_fk FOREIGN KEY (db) REFERENCES bucardo.db(name) ON UPDATE CASCADE ON DELETE RESTRICT,
schemaname TEXT NOT NULL,
tablename TEXT NOT NULL,
reltype TEXT NOT NULL DEFAULT 'table',
pkey TEXT NULL,
qpkey TEXT NULL,
pkeytype TEXT NULL,
has_delta BOOLEAN NOT NULL DEFAULT false,
autokick BOOLEAN NULL, -- overrides sync-level autokick
conflict_strategy TEXT NULL,
makedelta TEXT NULL,
rebuild_index SMALLINT NULL, -- overrides sync-level rebuild_index
ghost BOOLEAN NOT NULL DEFAULT false, -- only drop triggers, do not replicate
analyze_after_copy BOOLEAN NOT NULL DEFAULT true,
vacuum_after_copy BOOLEAN NOT NULL DEFAULT true,
strict_checking BOOLEAN NOT NULL DEFAULT true,
delta_bypass BOOLEAN NOT NULL DEFAULT false,
delta_bypass_min BIGINT NULL,
delta_bypass_count BIGINT NULL,
delta_bypass_percent SMALLINT NULL,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.goat IS $$Holds information on each table or sequence that may be replicated$$;
ALTER TABLE bucardo.goat ADD CONSTRAINT has_schemaname CHECK (length(schemaname) >= 1);
ALTER TABLE bucardo.goat ADD CONSTRAINT valid_reltype CHECK (reltype IN ('table','sequence'));
ALTER TABLE bucardo.goat ADD CONSTRAINT pkey_needs_type CHECK (pkey = '' OR pkeytype IS NOT NULL);
--
-- Set of filters for each goat.
--
CREATE SEQUENCE bucardo.bucardo_custom_trigger_id_seq;
CREATE TABLE bucardo.bucardo_custom_trigger (
id INTEGER NOT NULL DEFAULT nextval('bucardo_custom_trigger_id_seq'),
CONSTRAINT bucardo_custom_trigger_id_pk PRIMARY KEY (id),
goat INTEGER NOT NULL,
CONSTRAINT bucardo_custom_trigger_goat_fk FOREIGN KEY (goat) REFERENCES bucardo.goat(id) ON DELETE CASCADE,
trigger_name TEXT NOT NULL,
trigger_type TEXT NOT NULL,
trigger_language TEXT NOT NULL DEFAULT 'plpgsql',
trigger_body TEXT NOT NULL,
trigger_level TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.bucardo_custom_trigger IS $$Used to override the default bucardo_delta trigger on a per-table basis$$;
ALTER TABLE bucardo.bucardo_custom_trigger ADD CONSTRAINT type_is_delta_or_trigger CHECK (trigger_type IN ('delta', 'triggerkick'));
ALTER TABLE bucardo.bucardo_custom_trigger ADD CONSTRAINT level_is_row_statement CHECK (trigger_level IN ('ROW', 'STATEMENT'));
CREATE UNIQUE INDEX bucardo_custom_trigger_goat_type_unique ON bucardo.bucardo_custom_trigger(goat, trigger_type);
--
-- A group of goats. Ideally arranged in some sort of tree.
--
CREATE TABLE bucardo.herd (
name TEXT NOT NULL,
CONSTRAINT herd_name_pk PRIMARY KEY (name),
about TEXT NULL,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.herd IS $$Named group of tables or sequences from the goat table: used as the 'source' for syncs$$;
--
-- Goats belong to zero or more herds. In most cases, they will
-- belong to a single herd if they are being replicated.
--
CREATE TABLE bucardo.herdmap (
herd TEXT NOT NULL,
CONSTRAINT herdmap_herd_fk FOREIGN KEY (herd) REFERENCES bucardo.herd(name) ON UPDATE CASCADE ON DELETE CASCADE,
goat INTEGER NOT NULL,
CONSTRAINT herdmap_goat_fk FOREIGN KEY (goat) REFERENCES bucardo.goat(id) ON DELETE CASCADE,
priority SMALLINT NOT NULL DEFAULT 0,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.herdmap IS $$Associates a goat with one or more herds$$;
CREATE UNIQUE INDEX bucardo_herdmap_unique ON bucardo.herdmap(herd,goat);
CREATE FUNCTION bucardo.herdcheck()
RETURNS TRIGGER
LANGUAGE plpgsql
AS
$bc$
BEGIN
-- All goats in a herd must be from the same database
PERFORM herd FROM herdmap h, goat g WHERE h.goat=g.id GROUP BY 1 HAVING COUNT(DISTINCT db) > 1;
IF FOUND THEN
RAISE EXCEPTION 'All tables within a relgroup must be from the same database';
END IF;
RETURN NEW;
END;
$bc$;
CREATE TRIGGER herdcheck
AFTER INSERT OR UPDATE ON bucardo.herdmap
FOR EACH ROW EXECUTE PROCEDURE bucardo.herdcheck();
--
-- We need to know who is replicating to who, and how
--
CREATE TABLE bucardo.sync (
name TEXT NOT NULL UNIQUE,
CONSTRAINT sync_name_pk PRIMARY KEY (name),
herd TEXT NULL,
CONSTRAINT sync_herd_fk FOREIGN KEY (herd) REFERENCES bucardo.herd(name) ON UPDATE CASCADE ON DELETE RESTRICT,
dbs TEXT NULL,
CONSTRAINT sync_dbs_fk FOREIGN KEY (dbs) REFERENCES bucardo.dbgroup(name) ON UPDATE CASCADE ON DELETE RESTRICT,
stayalive BOOLEAN NOT NULL DEFAULT true, -- Does the sync controller stay connected?
kidsalive BOOLEAN NOT NULL DEFAULT true, -- Do the children stay connected?
conflict_strategy TEXT NOT NULL DEFAULT 'bucardo_latest',
copyextra TEXT NOT NULL DEFAULT '', -- e.g. WITH OIDS
deletemethod TEXT NOT NULL DEFAULT 'delete',
autokick BOOLEAN NOT NULL DEFAULT true, -- Are we issuing NOTICES via triggers?
checktime INTERVAL NULL, -- How often to check if we've not heard anything?
status TEXT NOT NULL DEFAULT 'active', -- Possibly CHECK / FK ('stopped','paused','b0rken')
rebuild_index SMALLINT NOT NULL DEFAULT 0, -- Load without indexes and then REINDEX table
priority SMALLINT NOT NULL DEFAULT 0, -- Higher is better
analyze_after_copy BOOLEAN NOT NULL DEFAULT true,
vacuum_after_copy BOOLEAN NOT NULL DEFAULT false,
strict_checking BOOLEAN NOT NULL DEFAULT true,
overdue INTERVAL NOT NULL DEFAULT '0 seconds'::interval,
expired INTERVAL NOT NULL DEFAULT '0 seconds'::interval,
track_rates BOOLEAN NOT NULL DEFAULT false,
onetimecopy SMALLINT NOT NULL DEFAULT 0,
lifetime INTERVAL NULL, -- force controller and kids to restart
maxkicks INTEGER NOT NULL DEFAULT 0, -- force controller and kids to restart
isolation_level TEXT NULL,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.sync IS $$Defines a single replication event from a herd to one or more target databases$$;
ALTER TABLE bucardo.sync ADD CONSTRAINT sync_deletemethod CHECK (deletemethod IN ('truncate', 'delete', 'truncate_cascade'));
-- Because NOTIFY is broke, make sure our names are simple:
ALTER TABLE bucardo.db ADD CONSTRAINT db_name_sane CHECK (name ~ E'^[a-zA-Z]\\w*$');
ALTER TABLE bucardo.dbgroup ADD CONSTRAINT dbgroup_name_sane CHECK (name ~ E'^[a-zA-Z]\\w*$');
ALTER TABLE bucardo.sync ADD CONSTRAINT sync_name_sane
CHECK (name ~ E'^[a-zA-Z]\\w*$' AND (lower(name) NOT IN ('pushdelta','fullcopy','swap','sync')));
ALTER TABLE bucardo.sync ADD CONSTRAINT sync_isolation_level
CHECK (isolation_level IS NULL OR (lower(isolation_level) IN ('serializable', 'repeatable read')));
CREATE SEQUENCE bucardo.clone_id_seq;
CREATE TABLE bucardo.clone (
id INTEGER NOT NULL DEFAULT nextval('clone_id_seq'),
CONSTRAINT clone_id_pk PRIMARY KEY (id),
sync TEXT NULL,
CONSTRAINT clone_sync_fk FOREIGN KEY (sync) REFERENCES bucardo.sync(name) ON UPDATE CASCADE ON DELETE CASCADE,
dbgroup TEXT NULL,
CONSTRAINT clone_dbgroup_fk FOREIGN KEY (dbgroup) REFERENCES bucardo.dbgroup(name) ON UPDATE CASCADE ON DELETE CASCADE,
relgroup TEXT NULL,
CONSTRAINT clone_relgroup_fk FOREIGN KEY (relgroup) REFERENCES bucardo.herd(name) ON UPDATE CASCADE ON DELETE CASCADE,
options TEXT NULL,
status TEXT NULL,
started TIMESTAMPTZ NULL,
ended TIMESTAMPTZ NULL,
summary TEXT NULL,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE SEQUENCE bucardo.customcode_id_seq;
CREATE TABLE bucardo.customcode (
id INTEGER NOT NULL DEFAULT nextval('customcode_id_seq'),
CONSTRAINT customcode_id_pk PRIMARY KEY (id),
name TEXT NOT NULL UNIQUE,
about TEXT NULL,
whenrun TEXT NOT NULL,
getdbh BOOLEAN NOT NULL DEFAULT true,
src_code TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
priority SMALLINT NOT NULL DEFAULT 0,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.customcode IS $$Holds Perl subroutines that run via hooks in the replication process$$;
ALTER TABLE bucardo.customcode ADD CONSTRAINT customcode_whenrun
CHECK (whenrun IN ('before_txn',
'before_check_rows',
'before_trigger_disable',
'after_trigger_disable',
'after_table_sync',
'exception',
'conflict',
'before_trigger_enable',
'after_trigger_enable',
'after_txn',
'before_sync',
'after_sync'));
CREATE TABLE bucardo.customcode_map (
code INTEGER NOT NULL,
CONSTRAINT customcode_map_code_fk FOREIGN KEY (code) REFERENCES bucardo.customcode(id) ON DELETE CASCADE,
sync TEXT NULL,
CONSTRAINT customcode_map_sync_fk FOREIGN KEY (sync) REFERENCES bucardo.sync(name) ON UPDATE CASCADE ON DELETE SET NULL,
goat INTEGER NULL,
CONSTRAINT customcode_map_goat_fk FOREIGN KEY (goat) REFERENCES bucardo.goat(id) ON DELETE SET NULL,
active BOOLEAN NOT NULL DEFAULT true,
priority SMALLINT NOT NULL DEFAULT 0,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.customcode_map IS $$Associates a custom code with one or more syncs or goats$$;
ALTER TABLE bucardo.customcode_map ADD CONSTRAINT customcode_map_syncgoat
CHECK (sync IS NULL OR goat IS NULL);
CREATE UNIQUE INDEX customcode_map_unique_sync ON bucardo.customcode_map(code,sync) WHERE sync IS NOT NULL;
CREATE UNIQUE INDEX customcode_map_unique_goat ON bucardo.customcode_map(code,goat) WHERE goat IS NOT NULL;
--
-- Allow the target's names to differ from the source
--
CREATE SEQUENCE bucardo.customname_id_seq;
CREATE TABLE bucardo.customname (
id INTEGER NOT NULL DEFAULT nextval('customname_id_seq'),
CONSTRAINT customname_id_pk PRIMARY KEY (id),
goat INTEGER NOT NULL,
newname TEXT NULL,
db TEXT NULL,
sync TEXT NULL,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
ALTER TABLE bucardo.customname ADD CONSTRAINT customname_sane_name
CHECK (newname ~ E'^["a-zA-Z 0-9_.~]+$');
ALTER TABLE bucardo.customname
ADD CONSTRAINT customname_db_fk
FOREIGN KEY (db) REFERENCES bucardo.db (name)
ON UPDATE CASCADE ON DELETE CASCADE;
ALTER TABLE bucardo.customname
ADD CONSTRAINT customname_sync_fk
FOREIGN KEY (sync) REFERENCES bucardo.sync (name)
ON UPDATE CASCADE ON DELETE CASCADE;
ALTER TABLE bucardo.customname
ADD CONSTRAINT customname_goat_fk
FOREIGN KEY (goat) REFERENCES bucardo.goat (id)
ON UPDATE CASCADE ON DELETE CASCADE;
--
-- Allow the target's columns to differ from the source
--
CREATE SEQUENCE bucardo.customcols_id_seq;
CREATE TABLE bucardo.customcols (
id INTEGER NOT NULL DEFAULT nextval('customcols_id_seq'),
CONSTRAINT customcols_id_pk PRIMARY KEY (id),
goat INTEGER NOT NULL,
clause TEXT NULL,
db TEXT NULL,
sync TEXT NULL,
cdate TIMESTAMPTZ NOT NULL DEFAULT now()
);
ALTER TABLE bucardo.customcols
ADD CONSTRAINT customcols_db_fk
FOREIGN KEY (db) REFERENCES bucardo.db (name)
ON UPDATE CASCADE ON DELETE CASCADE;
ALTER TABLE bucardo.customcols
ADD CONSTRAINT customcols_sync_fk
FOREIGN KEY (sync) REFERENCES bucardo.sync (name)
ON UPDATE CASCADE ON DELETE CASCADE;
ALTER TABLE bucardo.customcols
ADD CONSTRAINT customcols_goat_fk
FOREIGN KEY (goat) REFERENCES bucardo.goat (id)
ON UPDATE CASCADE ON DELETE CASCADE;
--
-- Keep track of syncs as they run: provides instant and historical status information
--
CREATE TABLE bucardo.syncrun (
sync TEXT NULL,
truncates INTEGER NOT NULL DEFAULT 0,
deletes BIGINT NOT NULL DEFAULT 0,
inserts BIGINT NOT NULL DEFAULT 0,
conflicts BIGINT NOT NULL DEFAULT 0,
started TIMESTAMPTZ NOT NULL DEFAULT now(),
ended TIMESTAMPTZ NULL,
lastgood BOOLEAN NOT NULL DEFAULT false,
lastbad BOOLEAN NOT NULL DEFAULT false,
lastempty BOOLEAN NOT NULL DEFAULT false,
details TEXT NULL,
status TEXT NULL
);
COMMENT ON TABLE bucardo.syncrun IS $$Information about specific runs of syncs$$;
-- Link back to the sync table, but never lose the data even on a sync drop
ALTER TABLE bucardo.syncrun
ADD CONSTRAINT syncrun_sync_fk
FOREIGN KEY (sync) REFERENCES bucardo.sync (name)
ON UPDATE CASCADE ON DELETE SET NULL;
-- Is essentially a unique index, but we want to avoid any [b]locking
CREATE INDEX syncrun_sync_started ON syncrun(sync) WHERE ended IS NULL;
-- We often need the last good/bad/empty for a sync:
CREATE INDEX syncrun_sync_lastgood ON syncrun(sync) WHERE lastgood IS TRUE;
CREATE INDEX syncrun_sync_lastbad ON syncrun(sync) WHERE lastbad IS TRUE;
CREATE INDEX syncrun_sync_lastempty ON syncrun(sync) WHERE lastempty IS TRUE;
--
-- Keep track of which dbs are currently being used, for traffic control
--
CREATE TABLE bucardo.dbrun (
sync TEXT NOT NULL,
dbname TEXT NOT NULL,
pgpid INTEGER NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT now()
);
COMMENT ON TABLE bucardo.dbrun IS $$Information about which databases are being accessed$$;
CREATE INDEX dbrun_index ON bucardo.dbrun(sync);
CREATE FUNCTION bucardo.table_exists(text,text)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $bc$
BEGIN
PERFORM 1
FROM pg_catalog.pg_class c, pg_namespace n
WHERE c.relnamespace = n.oid
AND n.nspname = $1
AND c.relname = $2;
IF FOUND THEN RETURN true; END IF;
RETURN false;
END;
$bc$;
--
-- Return a safe/standard name for a table, for use in delta/track namings
--
CREATE OR REPLACE FUNCTION bucardo.bucardo_tablename_maker(text)
RETURNS TEXT
LANGUAGE plpgsql
IMMUTABLE
AS $bc$
DECLARE
tname TEXT;
newname TEXT;
hashed TEXT;
BEGIN
-- sanitize and dequote the table name to avoid double-quoting later
SELECT INTO tname REGEXP_REPLACE(
REPLACE($1, '.', '_'), -- replace dots
'"(")?',
'\1',
'g'
);
-- Assumes max_identifier_length is 63
-- Because even if not, we'll still abbreviate for consistency and portability
SELECT INTO newname SUBSTRING(tname FROM 1 FOR 57);
IF (newname != tname) THEN
SELECT INTO newname SUBSTRING(tname FROM 1 FOR 46)
|| '!'
|| SUBSTRING(MD5(tname) FROM 1 FOR 10);
END IF;
-- We let Postgres worry about the quoting details
SELECT INTO newname quote_ident(newname);
RETURN newname;
END;
$bc$;
--
-- Return a created connection string from the db table
--
CREATE OR REPLACE FUNCTION bucardo.db_getconn(text)
RETURNS TEXT
LANGUAGE plperlu
SECURITY DEFINER
AS $bc$
## Given the name of a db, return the type, plus type-specific connection information
## ALL: the string 'DSN', a colon, and the value of the dbdsn field, if set
## Postgres: a connection string, username, password, and attribs
## Drizzle: a connection string, username, and password
## Firebird: a connection string, username, and password
## Mongo: "foo: bar" style connection information, one per line
## MariaDB: a connection string, username, and password
## MySQL: a connection string, username, and password
## Oracle: a connection string, username, and password
## Redis: "foo: bar" style connection information, one per line
## SQLite: a database file name
use strict;
use warnings;
use DBI;
my ($name, $SQL, $rv, $row, %db);
$name = shift;
$name =~ s/'/''/go;
$SQL = "SELECT * FROM bucardo.db WHERE name = '$name'";
$rv = spi_exec_query($SQL);
if (!$rv->{processed}) {
elog(ERROR, qq{Error: Could not find a database with a name of $name\n});
}
$row = $rv->{rows}[0];
my $dbtype = $row->{dbtype} || 'postgres';
## If we have a DSN, it trumps everything else
if (exists $row->{dbdsn} and length $row->{dbdsn}) {
return "$dbtype\nDSN:$row->{dbdsn}\n\n\n";
}
for (qw(host port name user pass conn service)) {
$db{$_} = exists $row->{"db$_"} ? $row->{"db$_"} : '';
}
## Check that the port is numeric
if (defined $db{port} and length $db{port} and $db{port} !~ /^\d+$/) {
elog(ERROR, qq{Database port must be numeric, but got "$db{port}"\n});
}
if ($dbtype eq 'postgres') {
## If there is a dbfile and it exists, it overrides the rest
## Format = hostname:port:database:username:password
## http://www.postgresql.org/docs/current/static/libpq-pgpass.html
## We also check for one if no password is given
if (!defined $row->{dbpass}) {
my $passfile = $row->{pgpass} || '';
if (open my $pass, "<", $passfile) {
## We only do complete matches
my $match = "$row->{dbhost}:$row->{dbport}:$row->{dbname}:$row->{dbuser}";
while (<$pass>) {
if (/^$match:(.+)/) {
$row->{dbpass} = $1;
elog(DEBUG, "Found password in pgpass file $passfile for $match");
last;
}
}
}
}
## These may be specified in the service name
$db{service} = '' if ! defined $db{service};
if (! length($db{service})) {
length $db{name} or elog(ERROR, qq{Database name is mandatory\n});
length $db{user} or elog(ERROR, qq{Database username is mandatory\n});
}
my $connstring = "dbi:Pg:";
$db{host} ||= ''; $db{port} ||= ''; $db{pass} ||= ''; $db{user} ||= '';
$connstring .= join ';', map {
( $_ eq 'name' ? 'dbname' : $_ ) . "=$db{$_}";
} grep { length $db{$_} } qw/name host port service/;
$connstring .= ';' . $db{conn} if length $db{conn};
my $ssp = $row->{server_side_prepares};
$ssp = 1 if ! defined $ssp;
return "$dbtype\n$connstring\n$db{user}\n$db{pass}\n$ssp";
} ## end postgres
if ($dbtype eq 'drizzle') {
length $db{name} or elog(ERROR, qq{Database name is mandatory\n});
length $db{user} or elog(ERROR, qq{Database username is mandatory\n});
my $connstring = "dbi:drizzle:database=$db{name}";
$db{host} ||= ''; $db{port} ||= ''; $db{pass} ||= '';
length $db{host} and $connstring .= ";host=$db{host}";
length $db{port} and $connstring .= ";port=$db{port}";
length $db{conn} and $connstring .= ";$db{conn}";
return "$dbtype\n$connstring\n$db{user}\n$db{pass}";
} ## end drizzle
if ($dbtype eq 'mongo') {
my $connstring = "$dbtype\n";
for my $name (qw/ host port user pass /) {
defined $db{$name} and length $db{$name} and $connstring .= "$name: $db{$name}\n";
}
chomp $connstring;
return $connstring;
}
if ($dbtype eq 'mysql' or $dbtype eq 'mariadb') {
length $db{name} or elog(ERROR, qq{Database name is mandatory\n});
length $db{user} or elog(ERROR, qq{Database username is mandatory\n});
my $connstring = "dbi:mysql:database=$db{name}";
$db{host} ||= ''; $db{port} ||= ''; $db{pass} ||= '';
length $db{host} and $connstring .= ";host=$db{host}";
length $db{port} and $connstring .= ";port=$db{port}";
length $db{conn} and $connstring .= ";$db{conn}";
return "$dbtype\n$connstring\n$db{user}\n$db{pass}";
} ## end mysql/mariadb
if ($dbtype eq 'firebird') {
length $db{name} or elog(ERROR, qq{Database name is mandatory\n});
length $db{user} or elog(ERROR, qq{Database username is mandatory\n});
my $connstring = "dbi:Firebird:db=$db{name}";
$db{host} ||= ''; $db{port} ||= ''; $db{pass} ||= '';
length $db{host} and $connstring .= ";host=$db{host}";
length $db{port} and $connstring .= ";port=$db{port}";
length $db{conn} and $connstring .= ";$db{conn}";
return "$dbtype\n$connstring\n$db{user}\n$db{pass}";
} ## end firebird
if ($dbtype eq 'oracle') {
## We should loosen this up somewhere
length $db{name} or elog(ERROR, qq{Database name is mandatory\n});
length $db{user} or elog(ERROR, qq{Database username is mandatory\n});
## TODO: Support SID, other forms
my $connstring = "dbi:Oracle:dbname=$db{name}";
$db{host} ||= ''; $db{port} ||= ''; $db{conn} ||= ''; $db{pass} ||= '';
length $db{host} and $connstring .= ";host=$db{host}";
length $db{port} and $connstring .= ";port=$db{port}";
length $db{conn} and $connstring .= ";$db{conn}";
return "$dbtype\n$connstring\n$db{user}\n$db{pass}";
} ## end oracle
if ($dbtype eq 'redis') {
my $connstring = "$dbtype\n";
for my $name (qw/ host port user pass name /) {
defined $db{$name} and length $db{$name} and $connstring .= "$name: $db{$name}\n";
}
chomp $connstring;
return $connstring;
}
if ($dbtype eq 'sqlite') {
## We should loosen this up somewhere
length $db{name} or elog(ERROR, qq{Database name is mandatory\n});
## TODO: Support SID, other forms
my $connstring = "dbi:SQLite:dbname=$db{name}";
return "$dbtype\n$connstring";
} ## end sqlite
return "Unknown database type: $dbtype";
$bc$;
--
-- Test a database connection, and log to the db_connlog table
--
CREATE FUNCTION bucardo.db_testconn(text)
RETURNS TEXT
LANGUAGE plperlu
SECURITY DEFINER
AS
$bc$
## Given the name of a db connection, construct the connection
## string for it and then connect to it and log the attempt
use strict; use warnings; use DBI;
my ($name, $SQL, $rv, $row, $dbh, %db, $version, $found);
$name = shift;
$name =~ s/'/''/g;
$SQL = "SELECT bucardo.db_getconn('$name') AS bob";
$rv = spi_exec_query($SQL);
if (!$rv->{processed}) {
elog(ERROR, qq{Error: Could not find a database with an name of $name\n});
}
$row = $rv->{rows}[0]{bob};
($db{type},$db{dsn},$db{user},$db{pass}) = split /\n/ => $row;
$db{dsn} =~ s/^DSN://;
if ($db{type} ne 'postgres') {
return '';
}
my $safeconn = "$db{dsn} user=$db{user}"; ## No password for now
$safeconn =~ s/'/''/go;
(my $safename = $name) =~ s/'/''/go;
elog(DEBUG, "Connecting as $db{dsn} user=$db{user} $$");
eval {
$dbh = DBI->connect($db{dsn}, $db{user}, $db{pass},
{AutoCommit=>1, RaiseError=>1, PrintError=>0});
};
if ($@ or !$dbh) {
$SQL = "INSERT INTO bucardo.db_connlog (db,connstring,status) VALUES ('$safename','$safeconn','unknown')";
spi_exec_query($SQL);
return "Failed to make database connection: $@";
}
$version = $dbh->{pg_server_version};
## Just in case, switch to read/write mode
$dbh->do('SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE');
## Install plpgsql if not there already
$SQL = q{SELECT 1 FROM pg_language WHERE lanname = 'plpgsql'};
my $sth = $dbh->prepare($SQL);
my $count = $sth->execute();
$sth->finish();
if ($count < 1) {
$dbh->do("CREATE LANGUAGE plpgsql");
}
$dbh->disconnect();
$SQL = "INSERT INTO bucardo.db_connlog (db,connstring,status,version) VALUES ('$safename','$safeconn','good',$version)";
spi_exec_query($SQL);
return "Database connection successful";
$bc$;
--
-- Check the database connection if anything changes in the db table
--
CREATE FUNCTION bucardo.db_change()
RETURNS TRIGGER
LANGUAGE plperlu
SECURITY DEFINER
AS
$bc$
return if $_TD->{new}{status} eq 'inactive';
## Test connection to the database specified
my $name = $_TD->{new}{name};
$name =~ s/'/''/g;
spi_exec_query("SELECT bucardo.db_testconn('$name')");
return;
$bc$;
CREATE TRIGGER db_change
AFTER INSERT OR UPDATE ON bucardo.db
FOR EACH ROW EXECUTE PROCEDURE bucardo.db_change();
--
-- Setup the goat table after any change
--
CREATE OR REPLACE FUNCTION bucardo.validate_goat()
RETURNS TRIGGER
LANGUAGE plperlu
SECURITY DEFINER
AS
$bc$
## If a row in goat has changed, re-validate and set things up for that table
elog(DEBUG, "Running validate_goat");
use strict; use warnings; use DBI;
my ($SQL, $rv, $row, %db, $dbh, $sth, $count, $oid);
my $old = $_TD->{event} eq 'UPDATE' ? $_TD->{old} : 0;
my $new = $_TD->{new};
if (!defined $new->{db}) {
die qq{Must provide a db\n};
}
if (!defined $new->{tablename}) {
die qq{Must provide a tablename\n};
}
if (!defined $new->{schemaname}) {
die qq{Must provide a schemaname\n};
}
if ($new->{reltype} ne 'table') {
return;
}
my ($dbname,$schema,$table,$pkey) =
($new->{db}, $new->{schemaname}, $new->{tablename}, $new->{pkey});
## Do not allow pkeytype or qpkey to be set manually.
if (defined $new->{pkeytype} and (!$old or $new->{pkeytype} ne $old->{pkeytype})) {
die qq{Cannot set pkeytype manually\n};
}
if (defined $new->{qpkey} and (!$old or $new->{qpkey} ne $old->{qpkey})) {
die qq{Cannot set qpkey manually\n};
}
## If this is an update, we only continue if certain fields have changed
if ($old
and $old->{db} eq $new->{db}
and $old->{schemaname} eq $new->{schemaname}