@@ -99,6 +99,13 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int):
         with connect(node.http_url, error_trace=True) as conn:
             new_shards = init_data(conn, node.version, shards, replicas)
             expected_active_shards += new_shards
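+            # Foreign data wrappers are only exercised from 5.7.0 on; start a second single-node cluster on the old version to act as the remote side.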
+            if node.version >= (5, 7, 0):
+                remote_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False)
+                remote_cluster.start()
+                remote_node = remote_cluster.node()
+                with connect(remote_node.http_url, error_trace=True) as remote_conn:
+                    new_shards = init_foreign_data_wrapper_data(conn, remote_conn, node.addresses.psql.port, remote_node.addresses.psql.port)
+                    expected_active_shards += new_shards
 
         for idx, node in enumerate(cluster):
             # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
@@ -129,6 +136,10 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int):
                 c = conn.cursor()
                 new_shards = self._test_queries_on_new_node(idx, c, node, new_node, nodes, shards, expected_active_shards)
                 expected_active_shards += new_shards
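+                # With this node upgraded, verify that queries across the foreign tables still work in both directions.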
+                if node.version >= (5, 7, 0):
+                    assert remote_node is not None
+                    with connect(remote_node.http_url, error_trace=True) as remote_conn:
+                        test_foreign_data_wrapper(self, conn, remote_conn)
 
         # Finally validate that all shards (primaries and replicas) of all partitions are started
         # and writes into the partitioned table while upgrading were successful
@@ -328,3 +339,44 @@ def init_data(conn: Connection, version: tuple[int, int, int], shards: int, repl
         c.execute("INSERT INTO doc.parted (id, value) VALUES (1, 1)")
         new_shards += shards
     return new_shards
+
+
+def init_foreign_data_wrapper_data(local_conn: Connection, remote_conn: Connection, local_psql_port: int, remote_psql_port: int) -> int:
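+    """Create doc.y on both clusters and expose each table to the other cluster as a foreign table via the JDBC foreign data wrapper."""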
+    assert 5430 <= local_psql_port <= 5440 and 5430 <= remote_psql_port <= 5440
+
+    c = local_conn.cursor()
+    rc = remote_conn.cursor()
+
+    c.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)")
+    rc.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)")
+    new_shards = 1
+
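+    # Register each cluster as a foreign JDBC server on the other one, addressed through its PostgreSQL wire protocol port.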
+    rc.execute(f"CREATE SERVER source FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{local_psql_port}/')")
+    c.execute(f"CREATE SERVER remote FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{remote_psql_port}/')")
+
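+    # Map the other cluster's doc.y as doc.remote_y on each side.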
+    rc.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER source OPTIONS (schema_name 'doc', table_name 'y')")
+    c.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER remote OPTIONS (schema_name 'doc', table_name 'y')")
+
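+    # Wait until the newly created shards are active on both clusters before returning.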
+    wait_for_active_shards(c)
+    wait_for_active_shards(rc)
+
+    return new_shards
+
+
+def test_foreign_data_wrapper(self, local_conn: Connection, remote_conn: Connection):
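+    """Insert through the local table on each cluster and check that the row becomes visible through the other cluster's foreign table."""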
+    c = local_conn.cursor()
+    rc = remote_conn.cursor()
+
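+    # A row inserted on the local cluster must show up through the remote cluster's foreign table.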
+    rc.execute("select count(a) from doc.remote_y")
+    count = rc.fetchall()[0][0]
+    c.execute("insert into doc.y values (1)")
+    c.execute("refresh table doc.y")
+    rc.execute("select count(a) from doc.remote_y")
+    self.assertEqual(rc.fetchall()[0][0], count + 1)
+
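+    # And the reverse direction: a row inserted on the remote cluster must show up through the local foreign table.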
+    c.execute("select count(a) from doc.remote_y")
+    count = c.fetchall()[0][0]
+    rc.execute("insert into doc.y values (1)")
+    rc.execute("refresh table doc.y")
+    c.execute("select count(a) from doc.remote_y")
+    self.assertEqual(c.fetchall()[0][0], count + 1)