diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AbstractEntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AbstractEntryImpl.java index 6c20b388337ba..8f52f26fe94c1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AbstractEntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AbstractEntryImpl.java @@ -133,6 +133,7 @@ public void onDeallocate(Runnable r) { @Override protected final void deallocate() { + beforeDeallocate(); // This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it if (onDeallocate != null) { try { @@ -153,6 +154,15 @@ protected final void deallocate() { recyclerHandle.recycle(self()); } + /** + * This method is called just before the object is deallocated. + * Subclasses can override this method to run actions before the fields + * of the object are cleared and the object gets recycled. + */ + protected void beforeDeallocate() { + // No-op + } + /** * This method is called just before the object is recycled. Subclasses can override this methods to cleanup * the object before it is returned to the pool. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index bd4b927f31365..8cb16531ed3ac 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -34,6 +34,8 @@ protected EntryImpl newObject(Handle handle) { } }; + private boolean decreaseReadCountOnRelease; + public static EntryImpl create(LedgerEntry ledgerEntry, int expectedReadCount) { EntryImpl entry = RECYCLER.get(); entry.timestamp = System.nanoTime(); @@ -91,10 +93,29 @@ public static EntryImpl create(Entry other) { entry.readCountHandler = (EntryReadCountHandlerImpl) other.getReadCountHandler(); entry.setDataBuffer(other.getDataBuffer().retainedDuplicate()); entry.setRefCnt(1); + entry.decreaseReadCountOnRelease = true; return entry; } private EntryImpl(Recycler.Handle recyclerHandle) { super(recyclerHandle); } + + @Override + protected void beforeDeallocate() { + super.beforeDeallocate(); + if (decreaseReadCountOnRelease && readCountHandler != null) { + readCountHandler.markRead(); + } + } + + @Override + protected void beforeRecycle() { + super.beforeRecycle(); + decreaseReadCountOnRelease = false; + } + + public void setDecreaseReadCountOnRelease(boolean enabled) { + decreaseReadCountOnRelease = enabled; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java index 1cbd441287704..7462a7ee6889b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -287,6 +287,8 @@ public void attach(CompletableFuture> handle) { callback.callback.readEntriesComplete(copy, callback.ctx); } for (Entry entry : entriesToReturn) { + // don't decrease the read count when these entries are released + ((EntryImpl) entry).setDecreaseReadCountOnRelease(false); entry.release(); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index ea2166c3a4cde..11f951b73c5f9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -405,6 +405,7 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio } callback.readEntriesComplete(entriesToReturn, ctx); } else { + // TODO: consider reusing the partially cached entries and only reading the missing ones if (!cachedEntries.isEmpty()) { cachedEntries.forEach(entry -> entry.release()); }