package org.apache.hadoop.hive.cli;

+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.cli.control.CliAdapter;
import org.apache.hadoop.hive.cli.control.CliConfigs;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;

@RunWith(Parameterized.class)
public class TestIcebergRESTCatalogGravitinoLlapLocalCliDriver {

+  private static final Logger LOG = LoggerFactory.getLogger(TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.class);
+
  private static final String CATALOG_NAME = "ice01";
  private static final String GRAVITINO_CONF_FILE_TEMPLATE = "gravitino-h2-test-template.conf";
  private static final String GRAVITINO_ROOT_DIR = "/root/gravitino-iceberg-rest-server";
-  private static final long GRAVITINO_STARTUP_TIMEOUT_MINUTES = 5l;
+  private static final long GRAVITINO_STARTUP_TIMEOUT_MINUTES = 5L;

  private static final String GRAVITINO_STARTUP_SCRIPT = GRAVITINO_ROOT_DIR + "/bin/start-iceberg-rest-server.sh";
  private static final String GRAVITINO_H2_LIB = GRAVITINO_ROOT_DIR + "/libs/h2-driver.jar";
@@ -76,6 +82,7 @@ public class TestIcebergRESTCatalogGravitinoLlapLocalCliDriver {
  private GenericContainer<?> gravitinoContainer;
  private Path warehouseDir;
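+  // Single-threaded executor that runs the background warehouse sync from the Gravitino container (see startWarehouseDirSync()).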
+  private final ExecutorService fileSyncExecutor = Executors.newSingleThreadExecutor();

  @Parameters(name = "{0}")
  public static List<Object[]> getParameters() throws Exception {
@@ -97,7 +104,8 @@ public TestIcebergRESTCatalogGravitinoLlapLocalCliDriver(String name, File qfile
  public void setup() throws IOException {
    createWarehouseDir();
    prepareGravitinoConfig();
-    setupGravitinoContainer();
+    startGravitinoContainer();
+    startWarehouseDirSync();

    String host = gravitinoContainer.getHost();
    Integer port = gravitinoContainer.getMappedPort(9001);
@@ -117,6 +125,8 @@ public void teardown() {
      gravitinoContainer.stop();
    }

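+    // Stop the background warehouse sync before the host warehouse directory is deleted below.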
+    fileSyncExecutor.shutdownNow();
+
    if (warehouseDir != null && Files.exists(warehouseDir)) {
      try (var paths = Files.walk(warehouseDir)) {
        paths.sorted(Comparator.reverseOrder())
@@ -128,23 +138,17 @@ public void teardown() {
    }
  }

-  private void setupGravitinoContainer() {
+  private void startGravitinoContainer() {
    gravitinoContainer = new GenericContainer<>(GRAVITINO_IMAGE)
        .withExposedPorts(9001)
        // Update entrypoint to create the warehouse directory before starting the server
-        .withCreateContainerCmdModifier(cmd -> {
-          cmd.withEntrypoint("bash", "-c",
-              String.format("mkdir -p %s && exec %s", warehouseDir.toString(), GRAVITINO_STARTUP_SCRIPT));
-        })
+        .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("bash", "-c",
+            String.format("mkdir -p %s && exec %s", warehouseDir.toString(), GRAVITINO_STARTUP_SCRIPT)))
        // Mount gravitino configuration file
        .withCopyFileToContainer(
            MountableFile.forHostPath(Paths.get(warehouseDir.toString(), GRAVITINO_CONF_FILE_TEMPLATE)),
            GRAVITINO_CONF_FILE
        )
-        // Bind the warehouse directory into the container on the same path to keep gravitino and hive aligned
-        .withFileSystemBind(
-            warehouseDir.toString(), warehouseDir.toString(), BindMode.READ_WRITE
-        )
        // Mount the H2 driver JAR into the server's lib directory
        .withCopyFileToContainer(
            MountableFile.forHostPath(
@@ -165,6 +169,85 @@ private void setupGravitinoContainer() {
    gravitinoContainer.start();
  }

+  /**
+   * Starts a background daemon that continuously synchronizes the Iceberg warehouse
+   * directory from the running Gravitino container to the host file system.
+   *
+   * <p>In CI environments, Testcontainers' {@code .withFileSystemBind()} cannot reliably
+   * bind the same host path to the same path inside the container, especially when
+   * using remote Docker hosts or Docker-in-Docker setups. This causes the container's
+   * writes (e.g., Iceberg metadata files like {@code .metadata.json}) to be invisible
+   * on the host.</p>
+   *
+   * <p>This method works around that limitation by repeatedly copying new files from
+   * the container's warehouse directory to the corresponding host directory. Existing
+   * files on the host are preserved; only files that do not yet exist there are copied.
+   * The sync runs once per second while the container is running.</p>
+   *
+   * <p>Each archive copied from the container is extracted with a {@link TarArchiveInputStream},
+   * and parent directories are created as needed. Files that already exist on the host are
+   * skipped so that data already written on the host is not overwritten.</p>
+   */
+  private void startWarehouseDirSync() {
+    fileSyncExecutor.submit(() -> {
+      try {
+        while (gravitinoContainer.isRunning()) {
+          try (InputStream tarStream = gravitinoContainer.getDockerClient()
+              .copyArchiveFromContainerCmd(gravitinoContainer.getContainerId(), warehouseDir.toString())
+              .exec();
+              TarArchiveInputStream tis = new TarArchiveInputStream(tarStream)) {
+
+            TarArchiveEntry entry;
+            while ((entry = tis.getNextTarEntry()) != null) {
+              // Skip directories because we only want to copy metadata files from the container.
+              // Directory structure will be created automatically via
+              // Files.createDirectories(outputPath.getParent()) when copying each file.
+              if (entry.isDirectory()) {
+                continue;
+              }
+
+              /* The tar entry names from the container include a prefix for the container's
+                 temporary warehouse directory, for example:
+                   iceberg-test-1759245909247/iceberg_warehouse/ice_rest/ice_orc1/metadata/.00000-c37404c0-67a7-4355-8578-845952693a8c.metadata.json.crc
+
+                 Splitting on the first '/' strips that top-level directory and keeps the path
+                 relative to the warehouse, so the entry maps correctly onto the host warehouseDir:
+                   /tmp/iceberg-test-1759245909247/iceberg_warehouse/ice_rest/ice_orc1/metadata/.00000-c37404c0-67a7-4355-8578-845952693a8c.metadata.json.crc
+
+                 This way, files from the container are merged into the host warehouse without
+                 overwriting existing files. */
+
+              String[] parts = entry.getName().split("/", 2);
+
+              // Defensive check: skip entries that don't contain a relative path under the warehouse folder.
+              // Normally, all entries should have at least two parts: top-level container folder + relative path.
+              if (parts.length < 2) {
+                continue;
+              }
+
+              Path relativePath = Paths.get(parts[1]);
+              Path outputPath = warehouseDir.resolve(relativePath);
+
+              // Skip copying if the file already exists on the host.
+              // This ensures we merge new container files without overwriting existing ones.
+              if (Files.exists(outputPath)) {
+                continue;
+              }
+
+              Files.createDirectories(outputPath.getParent());
+              Files.copy(tis, outputPath);
+            }
+
+          } catch (Exception e) {
+            LOG.error("Warehouse folder sync failed: {}", e.getMessage(), e);
+          }
+
+          Thread.sleep(1000); // sync every 1 second
+        }
+      } catch (InterruptedException ignored) {
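+        // Preserve the interrupt status; teardown's shutdownNow() interrupts this task to end the sync loop.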
+        Thread.currentThread().interrupt();
+      }
+    });
+  }
+
  private void createWarehouseDir() {
    try {
      warehouseDir = Paths.get("/tmp", "iceberg-test-" + System.currentTimeMillis()).toAbsolutePath();