Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class TestFileSystemRepository {

Expand Down Expand Up @@ -154,28 +155,58 @@ public void testIsArchived() {
@Test
@Timeout(30)
public void testClaimsArchivedWhenMarkedDestructable() throws IOException, InterruptedException {
final ContentClaim contentClaim = repository.create(false);
final Map<String, Path> containerPaths = nifiProperties.getContentRepositoryPaths();
assertEquals(1, containerPaths.size());
final String containerName = containerPaths.keySet().iterator().next();

try (final OutputStream out = repository.write(contentClaim)) {
long bytesWritten = 0L;
final byte[] bytes = "Hello World".getBytes(StandardCharsets.UTF_8);
// Release the repository created during setup and use one that reports no disk pressure. Otherwise, on a
// host whose content repository partition is nearly full, the background archive cleanup task can reclaim
// the freshly archived claim before this test observes it, leaving the archive count oscillating at 0.
shutdown();

while (bytesWritten <= maxClaimLength) {
out.write(bytes);
bytesWritten += bytes.length;
final FileSystemRepository localRepository = new FileSystemRepository(nifiProperties) {
@Override
public long getContainerUsableSpace(final String containerName) {
return Long.MAX_VALUE;
}
}
};

try {
final StandardResourceClaimManager localClaimManager = new StandardResourceClaimManager();
localRepository.initialize(new StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
localRepository.purge();

assertEquals(0, repository.getArchiveCount(containerName));
assertEquals(0, claimManager.decrementClaimantCount(contentClaim.getResourceClaim()));
claimManager.markDestructable(contentClaim.getResourceClaim());
final ContentClaim contentClaim = localRepository.create(false);
final Map<String, Path> containerPaths = nifiProperties.getContentRepositoryPaths();
assertEquals(1, containerPaths.size());
final String containerName = containerPaths.keySet().iterator().next();

try (final OutputStream out = localRepository.write(contentClaim)) {
long bytesWritten = 0L;
final byte[] bytes = "Hello World".getBytes(StandardCharsets.UTF_8);

while (bytesWritten <= maxClaimLength) {
out.write(bytes);
bytesWritten += bytes.length;
}
}

// The claim should become archived but it may take a few seconds, as it's handled by background threads
while (repository.getArchiveCount(containerName) != 1) {
Thread.sleep(50L);
assertEquals(0, localRepository.getArchiveCount(containerName));
assertEquals(0, localClaimManager.decrementClaimantCount(contentClaim.getResourceClaim()));
localClaimManager.markDestructable(contentClaim.getResourceClaim());

// The claim is archived by background threads, so poll until the archive count reflects it. With disk
// pressure disabled the count remains at 1 once archived, so this terminates reliably within a few seconds.
final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while (localRepository.getArchiveCount(containerName) != 1) {
if (System.nanoTime() > deadline) {
final Path livePath = getPath(localRepository, contentClaim);
final Path archivePath = livePath == null ? null : FileSystemRepository.getArchivePath(livePath);
fail("Claim was not archived for container %s; archive count is %d. Live path %s exists=%s, archive path %s exists=%s".formatted(
containerName, localRepository.getArchiveCount(containerName),
livePath, livePath != null && Files.exists(livePath),
archivePath, archivePath != null && Files.exists(archivePath)));
}
Thread.sleep(50L);
}
} finally {
localRepository.shutdown();
}
}

Expand Down
Loading