Skip to content

Commit 115832a

Browse files
committed
PG-1447 Assert there are no encrypted relations when using FILE_COPY
The FILE_COPY strategy of CREATE DATABASE does not use the SMGR so it cannot properly copy and re-encrypt encrypted relations. So we check for such relations and error out if any are found. We look for encrypted relations simply by counting the number of keys in the key map file of the database. This simple approach is fine even with the risk of a left-over key from a crash or a bug due to the FILE_COPY method being rare and that we therefore want to keep this code minimal.
1 parent 2ba050e commit 115832a

File tree

7 files changed

+236
-8
lines changed

7 files changed

+236
-8
lines changed

contrib/pg_tde/expected/create_database.out

+8
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ SELECT pg_tde_is_encrypted('test_plain_id_seq');
9292
(1 row)
9393

9494
\c :regress_database
95+
CREATE DATABASE new_db_file_copy TEMPLATE template_db STRATEGY FILE_COPY;
96+
ERROR: The FILE_COPY strategy cannot be used when there are encrypted objects in the template database: 3 objects found
97+
HINT: Use the WAL_LOG strategy instead.
98+
\c template_db
99+
DROP TABLE test_enc;
100+
\c :regress_database
101+
CREATE DATABASE new_db_file_copy TEMPLATE template_db STRATEGY FILE_COPY;
102+
DROP DATABASE new_db_file_copy;
95103
DROP DATABASE new_db;
96104
DROP DATABASE template_db;
97105
DROP EXTENSION pg_tde;

contrib/pg_tde/sql/create_database.sql

+11
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ SELECT pg_tde_is_encrypted('test_plain_id_seq');
4444

4545
\c :regress_database
4646

47+
CREATE DATABASE new_db_file_copy TEMPLATE template_db STRATEGY FILE_COPY;
48+
49+
\c template_db
50+
51+
DROP TABLE test_enc;
52+
53+
\c :regress_database
54+
55+
CREATE DATABASE new_db_file_copy TEMPLATE template_db STRATEGY FILE_COPY;
56+
57+
DROP DATABASE new_db_file_copy;
4758
DROP DATABASE new_db;
4859
DROP DATABASE template_db;
4960

contrib/pg_tde/src/access/pg_tde_tdemap.c

+50-7
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ static int pg_tde_open_file_basic(const char *tde_filename, int fileFlags, bool
115115
static void pg_tde_file_header_read(const char *tde_filename, int fd, TDEFileHeader *fheader, off_t *bytes_read);
116116
static bool pg_tde_read_one_map_entry(int fd, TDEMapEntry *map_entry, off_t *offset);
117117
static void pg_tde_read_one_map_entry2(int keydata_fd, int32 key_index, TDEMapEntry *map_entry, Oid databaseId);
118-
static int pg_tde_open_file_read(const char *tde_filename, off_t *curr_pos);
118+
static int pg_tde_open_file_read(const char *tde_filename, bool ignore_missing, off_t *curr_pos);
119119
static InternalKey *pg_tde_get_key_from_cache(const RelFileLocator *rlocator, uint32 key_type);
120120
static WALKeyCacheRec *pg_tde_add_wal_key_to_cache(InternalKey *cached_key, XLogRecPtr start_lsn);
121121
static InternalKey *pg_tde_put_key_into_cache(const RelFileLocator *locator, InternalKey *key);
@@ -599,7 +599,7 @@ pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_p
599599

600600
pg_tde_set_db_file_path(principal_key->keyInfo.databaseId, old_path);
601601

602-
old_fd = pg_tde_open_file_read(old_path, &old_curr_pos);
602+
old_fd = pg_tde_open_file_read(old_path, false, &old_curr_pos);
603603
new_fd = keyrotation_init_file(&new_signed_key_info, new_path, old_path, &new_curr_pos);
604604

605605
/* Read all entries until EOF */
@@ -874,7 +874,7 @@ pg_tde_find_map_entry(const RelFileLocator *rlocator, uint32 key_type, char *db_
874874

875875
Assert(rlocator != NULL);
876876

877-
map_fd = pg_tde_open_file_read(db_map_path, &curr_pos);
877+
map_fd = pg_tde_open_file_read(db_map_path, false, &curr_pos);
878878

879879
while (pg_tde_read_one_map_entry(map_fd, map_entry, &curr_pos))
880880
{
@@ -890,6 +890,47 @@ pg_tde_find_map_entry(const RelFileLocator *rlocator, uint32 key_type, char *db_
890890
return found;
891891
}
892892

893+
/*
894+
* Counts number of encrypted objects in a database.
895+
*
896+
* Does not check if objects actually exist but just that they have keys in
897+
* the map file. For the conly current user, checking if we can use FILE_COPY,
898+
* this is fgood enough but for other workloads where a false positive is
899+
* more harmful this might not be.
900+
*
901+
* Works even if the database has no map file.
902+
*/
903+
int
904+
pg_tde_count_relations(Oid dbOid)
905+
{
906+
char db_map_path[MAXPGPATH];
907+
LWLock *lock_pk = tde_lwlock_enc_keys();
908+
File map_fd;
909+
off_t curr_pos = 0;
910+
TDEMapEntry map_entry;
911+
int count = 0;
912+
913+
pg_tde_set_db_file_path(dbOid, db_map_path);
914+
915+
LWLockAcquire(lock_pk, LW_SHARED);
916+
917+
map_fd = pg_tde_open_file_read(db_map_path, true, &curr_pos);
918+
if (map_fd < 0)
919+
return count;
920+
921+
while (pg_tde_read_one_map_entry(map_fd, &map_entry, &curr_pos))
922+
{
923+
if (map_entry.flags & TDE_KEY_TYPE_SMGR)
924+
count++;
925+
}
926+
927+
close(map_fd);
928+
929+
LWLockRelease(lock_pk);
930+
931+
return count;
932+
}
933+
893934
bool
894935
pg_tde_verify_principal_key_info(TDESignedPrincipalKeyInfo *signed_key_info, const TDEPrincipalKey *principal_key)
895936
{
@@ -926,15 +967,17 @@ tde_decrypt_rel_key(TDEPrincipalKey *principal_key, TDEMapEntry *map_entry)
926967
* is raised.
927968
*/
928969
static int
929-
pg_tde_open_file_read(const char *tde_filename, off_t *curr_pos)
970+
pg_tde_open_file_read(const char *tde_filename, bool ignore_missing, off_t *curr_pos)
930971
{
931972
int fd;
932973
TDEFileHeader fheader;
933974
off_t bytes_read = 0;
934975

935976
Assert(LWLockHeldByMeInMode(tde_lwlock_enc_keys(), LW_SHARED) || LWLockHeldByMeInMode(tde_lwlock_enc_keys(), LW_EXCLUSIVE));
936977

937-
fd = pg_tde_open_file_basic(tde_filename, O_RDONLY | PG_BINARY, false);
978+
fd = pg_tde_open_file_basic(tde_filename, O_RDONLY | PG_BINARY, ignore_missing);
979+
if (ignore_missing && fd < 0)
980+
return fd;
938981

939982
pg_tde_file_header_read(tde_filename, fd, &fheader, &bytes_read);
940983
*curr_pos = bytes_read;
@@ -1187,7 +1230,7 @@ pg_tde_read_last_wal_key(void)
11871230
}
11881231
pg_tde_set_db_file_path(rlocator.dbOid, db_map_path);
11891232

1190-
fd = pg_tde_open_file_read(db_map_path, &read_pos);
1233+
fd = pg_tde_open_file_read(db_map_path, false, &read_pos);
11911234
fsize = lseek(fd, 0, SEEK_END);
11921235
/* No keys */
11931236
if (fsize == TDE_FILE_HEADER_SIZE)
@@ -1230,7 +1273,7 @@ pg_tde_fetch_wal_keys(XLogRecPtr start_lsn)
12301273

12311274
pg_tde_set_db_file_path(rlocator.dbOid, db_map_path);
12321275

1233-
fd = pg_tde_open_file_read(db_map_path, &read_pos);
1276+
fd = pg_tde_open_file_read(db_map_path, false, &read_pos);
12341277

12351278
keys_count = (lseek(fd, 0, SEEK_END) - TDE_FILE_HEADER_SIZE) / MAP_ENTRY_SIZE;
12361279

contrib/pg_tde/src/include/access/pg_tde_tdemap.h

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ pg_tde_set_db_file_path(Oid dbOid, char *path)
111111
}
112112

113113
extern InternalKey *GetSMGRRelationKey(RelFileLocatorBackend rel);
114+
extern int pg_tde_count_relations(Oid dbOid);
114115

115116
extern void pg_tde_delete_tde_files(Oid dbOid);
116117

contrib/pg_tde/src/include/pg_tde_event_capture.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ typedef struct TdeCreateEvent
2626
* based on earlier encryption status. */
2727
} TdeCreateEvent;
2828

29+
extern void TdeEventCaptureInit(void);
2930
extern TdeCreateEvent *GetCurrentTdeCreateEvent(void);
30-
3131
extern void validateCurrentEventTriggerState(bool mightStartTransaction);
3232

3333
#endif

contrib/pg_tde/src/pg_tde.c

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "utils/builtins.h"
3434
#include "smgr/pg_tde_smgr.h"
3535
#include "catalog/tde_global_space.h"
36+
#include "pg_tde_event_capture.h"
3637
#include "utils/percona.h"
3738
#include "pg_tde_guc.h"
3839
#include "access/tableam.h"
@@ -111,6 +112,7 @@ _PG_init(void)
111112
check_percona_api_version();
112113

113114
TdeGucInit();
115+
TdeEventCaptureInit();
114116

115117
InitializePrincipalKeyInfo();
116118
InitializeKeyProviderInfo();

contrib/pg_tde/src/pg_tde_event_capture.c

+163
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "utils/rel.h"
1616
#include "utils/builtins.h"
1717
#include "catalog/pg_class.h"
18+
#include "catalog/pg_database.h"
1819
#include "commands/defrem.h"
1920
#include "commands/sequence.h"
2021
#include "access/table.h"
@@ -23,8 +24,13 @@
2324
#include "catalog/namespace.h"
2425
#include "commands/event_trigger.h"
2526
#include "common/pg_tde_utils.h"
27+
#include "storage/lmgr.h"
28+
#include "tcop/utility.h"
29+
#include "utils/fmgroids.h"
30+
#include "utils/syscache.h"
2631
#include "pg_tde_event_capture.h"
2732
#include "pg_tde_guc.h"
33+
#include "access/pg_tde_tdemap.h"
2834
#include "catalog/tde_principal_key.h"
2935
#include "miscadmin.h"
3036
#include "access/tableam.h"
@@ -34,6 +40,7 @@
3440
static TdeCreateEvent tdeCurrentCreateEvent = {.tid = {.value = 0}};
3541
static bool alterSetAccessMethod = false;
3642

43+
static Oid get_db_oid(const char *name);
3744
static void reset_current_tde_create_event(void);
3845
static Oid get_tde_table_am_oid(void);
3946

@@ -315,3 +322,159 @@ get_tde_table_am_oid(void)
315322
{
316323
return get_table_am_oid("tde_heap", false);
317324
}
325+
326+
static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
327+
328+
/*
329+
* Handles utility commands which we cannot handle in the event trigger.
330+
*/
331+
static void
332+
pg_tde_proccess_utility(PlannedStmt *pstmt,
333+
const char *queryString,
334+
bool readOnlyTree,
335+
ProcessUtilityContext context,
336+
ParamListInfo params,
337+
QueryEnvironment *queryEnv,
338+
DestReceiver *dest,
339+
QueryCompletion *qc)
340+
{
341+
Node *parsetree = pstmt->utilityStmt;
342+
343+
switch (nodeTag(parsetree))
344+
{
345+
case T_CreatedbStmt:
346+
347+
CreatedbStmt *stmt = castNode(CreatedbStmt, parsetree);
348+
ListCell *option;
349+
char *dbtemplate = "template1";
350+
char *strategy = "wal_log";
351+
352+
foreach(option, stmt->options)
353+
{
354+
DefElem *defel = (DefElem *) lfirst(option);
355+
356+
if (strcmp(defel->defname, "template") == 0)
357+
dbtemplate = defGetString(defel);
358+
else if (strcmp(defel->defname, "strategy") == 0)
359+
strategy = defGetString(defel);
360+
}
361+
362+
if (pg_strcasecmp(strategy, "file_copy") == 0)
363+
{
364+
Oid dbOid = get_db_oid(dbtemplate);
365+
366+
if (dbOid != InvalidOid)
367+
{
368+
int count = pg_tde_count_relations(dbOid);
369+
370+
if (count > 0)
371+
ereport(ERROR,
372+
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
373+
errmsg("The FILE_COPY strategy cannot be used when there are encrypted objects in the template database: %d objects found", count),
374+
errhint("Use the WAL_LOG strategy instead."));
375+
}
376+
}
377+
break;
378+
default:
379+
break;
380+
}
381+
382+
if (next_ProcessUtility_hook)
383+
(*next_ProcessUtility_hook) (pstmt, queryString, readOnlyTree,
384+
context, params, queryEnv,
385+
dest, qc);
386+
else
387+
standard_ProcessUtility(pstmt, queryString, readOnlyTree,
388+
context, params, queryEnv,
389+
dest, qc);
390+
}
391+
392+
void
393+
TdeEventCaptureInit(void)
394+
{
395+
next_ProcessUtility_hook = ProcessUtility_hook;
396+
ProcessUtility_hook = pg_tde_proccess_utility;
397+
}
398+
399+
/*
400+
* A stripped down version of get_db_info() from src/backend/commands/dbcommands.c
401+
*/
402+
static Oid
403+
get_db_oid(const char *name)
404+
{
405+
Oid resDbOid = InvalidOid;
406+
Relation relation;
407+
408+
Assert(name);
409+
410+
relation = table_open(DatabaseRelationId, AccessShareLock);
411+
412+
/*
413+
* Loop covers the rare case where the database is renamed before we can
414+
* lock it. We try again just in case we can find a new one of the same
415+
* name.
416+
*/
417+
for (;;)
418+
{
419+
ScanKeyData scanKey;
420+
SysScanDesc scan;
421+
HeapTuple tuple;
422+
Oid dbOid;
423+
424+
/*
425+
* there's no syscache for database-indexed-by-name, so must do it the
426+
* hard way
427+
*/
428+
ScanKeyInit(&scanKey,
429+
Anum_pg_database_datname,
430+
BTEqualStrategyNumber, F_NAMEEQ,
431+
CStringGetDatum(name));
432+
433+
scan = systable_beginscan(relation, DatabaseNameIndexId, true,
434+
NULL, 1, &scanKey);
435+
436+
tuple = systable_getnext(scan);
437+
438+
if (!HeapTupleIsValid(tuple))
439+
{
440+
/* definitely no database of that name */
441+
systable_endscan(scan);
442+
break;
443+
}
444+
445+
dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
446+
447+
systable_endscan(scan);
448+
449+
/*
450+
* Now that we have a database OID, we can try to lock the DB.
451+
*/
452+
LockSharedObject(DatabaseRelationId, dbOid, 0, AccessExclusiveLock);
453+
454+
/*
455+
* And now, re-fetch the tuple by OID. If it's still there and still
456+
* the same name, we win; else, drop the lock and loop back to try
457+
* again.
458+
*/
459+
tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
460+
if (HeapTupleIsValid(tuple))
461+
{
462+
Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
463+
464+
if (strcmp(name, NameStr(dbform->datname)) == 0)
465+
{
466+
resDbOid = dbOid;
467+
ReleaseSysCache(tuple);
468+
break;
469+
}
470+
/* can only get here if it was just renamed */
471+
ReleaseSysCache(tuple);
472+
}
473+
474+
UnlockSharedObject(DatabaseRelationId, dbOid, 0, AccessExclusiveLock);
475+
}
476+
477+
table_close(relation, AccessShareLock);
478+
479+
return resDbOid;
480+
}

0 commit comments

Comments
 (0)