Skip to content

Commit

Permalink
Implement decreasing read count on release
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Nov 19, 2024
1 parent 905ad5a commit df0036f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void onDeallocate(Runnable r) {

@Override
protected final void deallocate() {
beforeDeallocate();
// This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it
if (onDeallocate != null) {
try {
Expand All @@ -153,6 +154,15 @@ protected final void deallocate() {
recyclerHandle.recycle(self());
}

/**
* This method is called just before the object is deallocated.
* Subclasses can override this method to run actions before the fields
* of the object are cleared and the object gets recycled.
*/
protected void beforeDeallocate() {
// No-op
}

/**
* This method is called just before the object is recycled. Subclasses can override this methods to cleanup
* the object before it is returned to the pool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
}
};

private boolean decreaseReadCountOnRelease;

public static EntryImpl create(LedgerEntry ledgerEntry, int expectedReadCount) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
Expand Down Expand Up @@ -91,10 +93,29 @@ public static EntryImpl create(Entry other) {
entry.readCountHandler = (EntryReadCountHandlerImpl) other.getReadCountHandler();
entry.setDataBuffer(other.getDataBuffer().retainedDuplicate());
entry.setRefCnt(1);
entry.decreaseReadCountOnRelease = true;
return entry;
}

private EntryImpl(Recycler.Handle<EntryImpl> recyclerHandle) {
super(recyclerHandle);
}

@Override
protected void beforeDeallocate() {
super.beforeDeallocate();
if (decreaseReadCountOnRelease && readCountHandler != null) {
readCountHandler.markRead();
}
}

@Override
protected void beforeRecycle() {
super.beforeRecycle();
decreaseReadCountOnRelease = false;
}

public void setDecreaseReadCountOnRelease(boolean enabled) {
decreaseReadCountOnRelease = enabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public void attach(CompletableFuture<List<Entry>> handle) {
callback.callback.readEntriesComplete(copy, callback.ctx);
}
for (Entry entry : entriesToReturn) {
// don't decrease the read count when these entries are released
((EntryImpl) entry).setDecreaseReadCountOnRelease(false);
entry.release();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio
}
callback.readEntriesComplete(entriesToReturn, ctx);
} else {
// TODO: consider reusing the partially cached entries and only reading the missing ones
if (!cachedEntries.isEmpty()) {
cachedEntries.forEach(entry -> entry.release());
}
Expand Down

0 comments on commit df0036f

Please sign in to comment.