Skip to content

Commit f3abd30

Browse files
author
Dmitriy Fingerman
committed
manual sync
1 parent 00e4da4 commit f3abd30

File tree

2 files changed

+76
-12
lines changed

2 files changed

+76
-12
lines changed

iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ show create table ice_orc2;
6666
insert into ice_orc2 partition (company_id=100)
6767
VALUES ('fn1','ln1', 1, 10), ('fn2','ln2', 2, 20), ('fn3','ln3', 3, 30);
6868

69+
--! In CI, Testcontainers' .withFileSystemBind() is not able to bind the same host path to the same container path,
70+
--! so as a workaround, the .metadata.json files from container are manually synced in a daemon process,
71+
--! since the sync can take some time, need to wait for it to happen after the insert operation.
72+
! sleep 20;
73+
6974
describe formatted ice_orc2;
7075
select * from ice_orc2;
7176

itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.hadoop.hive.cli;
2020

21+
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
22+
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
2123
import org.apache.hadoop.conf.Configuration;
2224
import org.apache.hadoop.hive.cli.control.CliAdapter;
2325
import org.apache.hadoop.hive.cli.control.CliConfigs;
@@ -35,8 +37,8 @@
3537
import org.junit.runner.RunWith;
3638
import org.junit.runners.Parameterized;
3739
import org.junit.runners.Parameterized.Parameters;
40+
import org.slf4j.Logger;
3841
import org.slf4j.LoggerFactory;
39-
import org.testcontainers.containers.BindMode;
4042
import org.testcontainers.containers.output.Slf4jLogConsumer;
4143
import org.testcontainers.containers.wait.strategy.Wait;
4244
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
@@ -54,14 +56,18 @@
5456
import java.time.Duration;
5557
import java.util.Comparator;
5658
import java.util.List;
59+
import java.util.concurrent.ExecutorService;
60+
import java.util.concurrent.Executors;
5761

5862
@RunWith(Parameterized.class)
5963
public class TestIcebergRESTCatalogGravitinoLlapLocalCliDriver {
6064

65+
private static final Logger LOG = LoggerFactory.getLogger(TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.class);
66+
6167
private static final String CATALOG_NAME = "ice01";
6268
private static final String GRAVITINO_CONF_FILE_TEMPLATE = "gravitino-h2-test-template.conf";
6369
private static final String GRAVITINO_ROOT_DIR = "/root/gravitino-iceberg-rest-server";
64-
private static final long GRAVITINO_STARTUP_TIMEOUT_MINUTES = 5l;
70+
private static final long GRAVITINO_STARTUP_TIMEOUT_MINUTES = 5L;
6571

6672
private static final String GRAVITINO_STARTUP_SCRIPT = GRAVITINO_ROOT_DIR + "/bin/start-iceberg-rest-server.sh";
6773
private static final String GRAVITINO_H2_LIB = GRAVITINO_ROOT_DIR + "/libs/h2-driver.jar";
@@ -76,6 +82,7 @@ public class TestIcebergRESTCatalogGravitinoLlapLocalCliDriver {
7682

7783
private GenericContainer<?> gravitinoContainer;
7884
private Path warehouseDir;
85+
private final ExecutorService fileSyncExecutor = Executors.newSingleThreadExecutor();
7986

8087
@Parameters(name = "{0}")
8188
public static List<Object[]> getParameters() throws Exception {
@@ -97,7 +104,8 @@ public TestIcebergRESTCatalogGravitinoLlapLocalCliDriver(String name, File qfile
97104
public void setup() throws IOException {
98105
createWarehouseDir();
99106
prepareGravitinoConfig();
100-
setupGravitinoContainer();
107+
startGravitinoContainer();
108+
startWarehouseDirSync();
101109

102110
String host = gravitinoContainer.getHost();
103111
Integer port = gravitinoContainer.getMappedPort(9001);
@@ -117,6 +125,8 @@ public void teardown() {
117125
gravitinoContainer.stop();
118126
}
119127

128+
fileSyncExecutor.shutdownNow();
129+
120130
if (warehouseDir != null && Files.exists(warehouseDir)) {
121131
try (var paths = Files.walk(warehouseDir)) {
122132
paths.sorted(Comparator.reverseOrder())
@@ -128,23 +138,17 @@ public void teardown() {
128138
}
129139
}
130140

131-
private void setupGravitinoContainer() {
141+
private void startGravitinoContainer() {
132142
gravitinoContainer = new GenericContainer<>(GRAVITINO_IMAGE)
133143
.withExposedPorts(9001)
134144
// Update entrypoint to create the warehouse directory before starting the server
135-
.withCreateContainerCmdModifier(cmd -> {
136-
cmd.withEntrypoint("bash", "-c",
137-
String.format("mkdir -p %s && exec %s", warehouseDir.toString(), GRAVITINO_STARTUP_SCRIPT));
138-
})
145+
.withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("bash", "-c",
146+
String.format("mkdir -p %s && exec %s", warehouseDir.toString(), GRAVITINO_STARTUP_SCRIPT)))
139147
// Mount gravitino configuration file
140148
.withCopyFileToContainer(
141149
MountableFile.forHostPath(Paths.get(warehouseDir.toString(), GRAVITINO_CONF_FILE_TEMPLATE)),
142150
GRAVITINO_CONF_FILE
143151
)
144-
// Bind the warehouse directory into the container on the same path to keep gravitino and hive aligned
145-
.withFileSystemBind(
146-
warehouseDir.toString(), warehouseDir.toString(), BindMode.READ_WRITE
147-
)
148152
// Mount the H2 driver JAR into the server's lib directory
149153
.withCopyFileToContainer(
150154
MountableFile.forHostPath(
@@ -165,6 +169,61 @@ private void setupGravitinoContainer() {
165169
gravitinoContainer.start();
166170
}
167171

172+
/**
173+
* Starts a background daemon that continuously synchronizes the Iceberg warehouse
174+
* directory from the running Gravitino container to the host file system.
175+
*
176+
* <p>In CI environments, Testcontainers' {@code .withFileSystemBind()} cannot reliably
177+
* bind the same host path to the same path inside the container, especially when
178+
* using remote Docker hosts or Docker-in-Docker setups. This causes the container's
179+
* writes (e.g., Iceberg metadata files like {@code .metadata.json}) to be invisible
180+
* on the host.</p>
181+
*
182+
* <p>This method works around that limitation by repeatedly copying new files from
183+
* the container's warehouse directory to the corresponding host directory. Existing
184+
* files on the host are preserved, and only files that do not yet exist are copied.
185+
* The sync runs every 1 second while the container is running.</p>
186+
*
187+
* <p>Each archive copy from the container is extracted using a {@link TarArchiveInputStream},
188+
* and directories are created as needed. Files that already exist on the host are skipped
189+
* to avoid overwriting container data.</p>
190+
*/
191+
private void startWarehouseDirSync() {
192+
fileSyncExecutor.submit(() -> {
193+
try {
194+
while (gravitinoContainer.isRunning()) {
195+
try (InputStream tarStream = gravitinoContainer.getDockerClient()
196+
.copyArchiveFromContainerCmd(gravitinoContainer.getContainerId(), warehouseDir.toString())
197+
.exec();
198+
TarArchiveInputStream tis = new TarArchiveInputStream(tarStream)) {
199+
200+
TarArchiveEntry entry;
201+
while ((entry = tis.getNextTarEntry()) != null) {
202+
if (entry.isDirectory()) continue;
203+
204+
String[] parts = entry.getName().split("/", 2);
205+
if (parts.length < 2) continue;
206+
Path relativePath = Paths.get(parts[1]);
207+
208+
Path outputPath = warehouseDir.resolve(relativePath);
209+
if (Files.exists(outputPath)) continue;
210+
211+
Files.createDirectories(outputPath.getParent());
212+
Files.copy(tis, outputPath);
213+
}
214+
215+
} catch (Exception e) {
216+
LOG.error("Warehouse folder sync failed: {}", e.getMessage(), e);
217+
}
218+
219+
Thread.sleep(1000); // sync every 1 second
220+
}
221+
} catch (InterruptedException ignored) {
222+
Thread.currentThread().interrupt();
223+
}
224+
});
225+
}
226+
168227
private void createWarehouseDir() {
169228
try {
170229
warehouseDir = Paths.get("/tmp", "iceberg-test-" + System.currentTimeMillis()).toAbsolutePath();

0 commit comments

Comments
 (0)