Skip to content

Commit f58ad8c

Browse files
committed
Implement decreasing read count on release
1 parent c0750c3 commit f58ad8c

File tree

4 files changed

+34
-0
lines changed

4 files changed

+34
-0
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AbstractEntryImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void onDeallocate(Runnable r) {
133133

134134
@Override
135135
protected final void deallocate() {
136+
beforeDeallocate();
136137
// This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it
137138
if (onDeallocate != null) {
138139
try {
@@ -153,6 +154,15 @@ protected final void deallocate() {
153154
recyclerHandle.recycle(self());
154155
}
155156

157+
/**
158+
* This method is called just before the object is deallocated.
159+
* Subclasses can override this method to run actions before the fields
160+
* of the object are cleared and the object gets recycled.
161+
*/
162+
protected void beforeDeallocate() {
163+
// No-op
164+
}
165+
156166
/**
157167
* This method is called just before the object is recycled. Subclasses can override this methods to cleanup
158168
* the object before it is returned to the pool.

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
3434
}
3535
};
3636

37+
private boolean decreaseReadCountOnRelease;
38+
3739
public static EntryImpl create(LedgerEntry ledgerEntry, int expectedReadCount) {
3840
EntryImpl entry = RECYCLER.get();
3941
entry.timestamp = System.nanoTime();
@@ -91,10 +93,29 @@ public static EntryImpl create(Entry other) {
9193
entry.readCountHandler = (EntryReadCountHandlerImpl) other.getReadCountHandler();
9294
entry.setDataBuffer(other.getDataBuffer().retainedDuplicate());
9395
entry.setRefCnt(1);
96+
entry.decreaseReadCountOnRelease = true;
9497
return entry;
9598
}
9699

97100
private EntryImpl(Recycler.Handle<EntryImpl> recyclerHandle) {
98101
super(recyclerHandle);
99102
}
103+
104+
@Override
105+
protected void beforeDeallocate() {
106+
super.beforeDeallocate();
107+
if (decreaseReadCountOnRelease && readCountHandler != null) {
108+
readCountHandler.markRead();
109+
}
110+
}
111+
112+
@Override
113+
protected void beforeRecycle() {
114+
super.beforeRecycle();
115+
decreaseReadCountOnRelease = false;
116+
}
117+
118+
public void setDecreaseReadCountOnRelease(boolean enabled) {
119+
decreaseReadCountOnRelease = enabled;
120+
}
100121
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,8 @@ public void attach(CompletableFuture<List<Entry>> handle) {
287287
callback.callback.readEntriesComplete(copy, callback.ctx);
288288
}
289289
for (Entry entry : entriesToReturn) {
290+
// don't decrease the read count when these entries are released
291+
((EntryImpl) entry).setDecreaseReadCountOnRelease(false);
290292
entry.release();
291293
}
292294
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio
405405
}
406406
callback.readEntriesComplete(entriesToReturn, ctx);
407407
} else {
408+
// TODO: consider reusing the partially cached entries and only reading the missing ones
408409
if (!cachedEntries.isEmpty()) {
409410
cachedEntries.forEach(entry -> entry.release());
410411
}

0 commit comments

Comments
 (0)