diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 12e0e57e616d..b9ed4bf9c0f5 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -68,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class TestFileSystemRepository { @@ -154,28 +155,58 @@ public void testIsArchived() { @Test @Timeout(30) public void testClaimsArchivedWhenMarkedDestructable() throws IOException, InterruptedException { - final ContentClaim contentClaim = repository.create(false); - final Map containerPaths = nifiProperties.getContentRepositoryPaths(); - assertEquals(1, containerPaths.size()); - final String containerName = containerPaths.keySet().iterator().next(); - - try (final OutputStream out = repository.write(contentClaim)) { - long bytesWritten = 0L; - final byte[] bytes = "Hello World".getBytes(StandardCharsets.UTF_8); + // Release the repository created during setup and use one that reports no disk pressure. Otherwise, on a + // host whose content repository partition is nearly full, the background archive cleanup task can reclaim + // the freshly archived claim before this test observes it, leaving the archive count oscillating at 0. + shutdown(); - while (bytesWritten <= maxClaimLength) { - out.write(bytes); - bytesWritten += bytes.length; + final FileSystemRepository localRepository = new FileSystemRepository(nifiProperties) { + @Override + public long getContainerUsableSpace(final String containerName) { + return Long.MAX_VALUE; } - } + }; + + try { + final StandardResourceClaimManager localClaimManager = new StandardResourceClaimManager(); + localRepository.initialize(new StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); + localRepository.purge(); - assertEquals(0, repository.getArchiveCount(containerName)); - assertEquals(0, claimManager.decrementClaimantCount(contentClaim.getResourceClaim())); - claimManager.markDestructable(contentClaim.getResourceClaim()); + final ContentClaim contentClaim = localRepository.create(false); + final Map containerPaths = nifiProperties.getContentRepositoryPaths(); + assertEquals(1, containerPaths.size()); + final String containerName = containerPaths.keySet().iterator().next(); + + try (final OutputStream out = localRepository.write(contentClaim)) { + long bytesWritten = 0L; + final byte[] bytes = "Hello World".getBytes(StandardCharsets.UTF_8); + + while (bytesWritten <= maxClaimLength) { + out.write(bytes); + bytesWritten += bytes.length; + } + } - // The claim should become archived but it may take a few seconds, as it's handled by background threads - while (repository.getArchiveCount(containerName) != 1) { - Thread.sleep(50L); + assertEquals(0, localRepository.getArchiveCount(containerName)); + assertEquals(0, localClaimManager.decrementClaimantCount(contentClaim.getResourceClaim())); + localClaimManager.markDestructable(contentClaim.getResourceClaim()); + + // The claim is archived by background threads, so poll until the archive count reflects it. With disk + // pressure disabled the count remains at 1 once archived, so this terminates reliably within a few seconds. + final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + while (localRepository.getArchiveCount(containerName) != 1) { + if (System.nanoTime() > deadline) { + final Path livePath = getPath(localRepository, contentClaim); + final Path archivePath = livePath == null ? null : FileSystemRepository.getArchivePath(livePath); + fail("Claim was not archived for container %s; archive count is %d. Live path %s exists=%s, archive path %s exists=%s".formatted( + containerName, localRepository.getArchiveCount(containerName), + livePath, livePath != null && Files.exists(livePath), + archivePath, archivePath != null && Files.exists(archivePath))); + } + Thread.sleep(50L); + } + } finally { + localRepository.shutdown(); } }