lh fix individual read deduplication and limiting #195

Open
wants to merge 79 commits into
base: master
Commits
2d2bab4  Add validation to getManagedLedgerMaxReadsInFlightSizeInMB (lhotari, Oct 21, 2024)
e9c1b2d  Don't divide by 0 if it can be avoided (lhotari, Oct 21, 2024)
2302e22  Estimate entry size for InflightReadsLimiter by keeping stats (lhotari, Oct 21, 2024)
41b48c1  Test limiting by bytes size (lhotari, Oct 21, 2024)
351e173  Avoid exception which commonly happens (lhotari, Oct 22, 2024)
4d5221f  Add debug logging to InflightReadsLimiter (lhotari, Oct 22, 2024)
ff06af5  Clear the cache in the test (lhotari, Oct 22, 2024)
1faba30  Add logging (lhotari, Jan 23, 2025)
9f238dc  Fix read limits for individual reads (lhotari, Oct 22, 2024)
7f02c13  Reduce timeout (lhotari, Jan 23, 2025)
d0f9028  Improve asyncReplayEntries (lhotari, Oct 23, 2024)
3fdfbfe  Refactor to have a single replay method (lhotari, Oct 23, 2024)
d4b9811  Optimize filtering deleted messages (lhotari, Jan 23, 2025)
d35c3f6  deprecate synchronous replayEntries method (lhotari, Oct 23, 2024)
877975f  move asyncReplayEntries without sortEntries as a default method (lhotari, Oct 23, 2024)
56ebde8  Perform replay reads in ranges (lhotari, Oct 23, 2024)
dd12e2c  Make test to fail (lhotari, Jan 23, 2025)
d9dd491  Fix disabled timeout in InflightReadsLimiter (lhotari, Oct 23, 2024)
4641158  Add queuing to InflightReadsLimiter when the limit is reached (lhotari, Nov 1, 2024)
5a171ff  revisit (lhotari, Nov 1, 2024)
1f3522e  Ensure that cached entry readerIndex is not tied to the original one (lhotari, Nov 1, 2024)
0963d97  Adjust InflightReadsLimiterTest (lhotari, Nov 4, 2024)
6a1f135  Add timeout executor to InflightReadsLimiter (lhotari, Nov 1, 2024)
56283c0  Polish (lhotari, Nov 4, 2024)
688c8dc  Fix test (lhotari, Nov 4, 2024)
e64d31a  Instantiate ArrayList directly (lhotari, Nov 4, 2024)
20a78c1  Support caching replayed entries (lhotari, Nov 4, 2024)
421273c  Start addressing the removal issue with a removal queue (lhotari, Nov 4, 2024)
eefcf5b  Move EntryWrapper to upper level (lhotari, Nov 4, 2024)
d71a559  Move RangeCache to cache package (lhotari, Nov 4, 2024)
5900a63  Start adding removal queue (lhotari, Nov 4, 2024)
b1c3b61  Move towards adding removal queue changes (lhotari, Nov 4, 2024)
def470f  More moves towards removal queue solution (lhotari, Nov 4, 2024)
b1513c3  Handle removing by size in scheduled task so that blocking would be rare (lhotari, Nov 4, 2024)
f7bdab5  Remove unused code (lhotari, Nov 5, 2024)
18a3701  Update RangeCacheTest with removal queue (lhotari, Nov 5, 2024)
3d08eb7  Fix imports (lhotari, Nov 5, 2024)
da81c2a  Add license headers (lhotari, Nov 5, 2024)
914c6a6  Handle all cache evictions by the same thread (lhotari, Nov 5, 2024)
c6637e0  Move RangeCacheRemovalCounters to top level (lhotari, Nov 8, 2024)
0ecf766  Use removal queue in RangeCacheTest (lhotari, Nov 8, 2024)
857bf93  Handle eviction (lhotari, Nov 10, 2024)
c85347b  Handle adding atomically to removal queue and cache (lhotari, Nov 12, 2024)
69185d5  Improve javadoc (lhotari, Nov 12, 2024)
04d016d  Use unbounded queue for removal queue since cache size is bounded by … (lhotari, Nov 12, 2024)
c784fbb  Refactor eviction when cache size exceeds evictionTriggerThreshold (lhotari, Nov 12, 2024)
aa2c363  Start adding cacheEvictionByExpectedReadCount (lhotari, Nov 12, 2024)
157208f  Add CachedEntry (lhotari, Nov 12, 2024)
c1d9083  Activate cursor when consumers disconnect and connect (lhotari, Nov 12, 2024)
16e2c4d  disable checkCursorsToCacheEntries when eviction by read count is ena… (lhotari, Nov 12, 2024)
b11e71a  Always keep cursors with connected consumers in "active" state when e… (lhotari, Nov 12, 2024)
852c5d7  Add a way to find cursors before current cursor (lhotari, Nov 12, 2024)
d475488  Pass predicate for deciding whether to cache or not (lhotari, Nov 12, 2024)
d508b85  Reduce coupling to EntryImpl (lhotari, Nov 12, 2024)
87c8b16  Extract abstract base class to be used for Entry implementations (lhotari, Nov 12, 2024)
8bc8979  Introduce CachedEntry for eviction by read count implementation (lhotari, Nov 12, 2024)
bd3c616  Remove generics type parameters from RangeCache since there's no need… (lhotari, Nov 13, 2024)
3abfc2b  Add TODO about the solution for handling skipping of the entries (lhotari, Nov 13, 2024)
5b2a783  Add note of replacing the use of peek (lhotari, Nov 13, 2024)
11ef94d  update design (lhotari, Nov 13, 2024)
656ca9d  Evaluate entry size when it gets added (lhotari, Nov 14, 2024)
5179f24  Fix issue in getting the entry (lhotari, Nov 14, 2024)
fe23fe4  Fix RangeCacheTest (lhotari, Nov 14, 2024)
16f0050  Remove generics (lhotari, Nov 14, 2024)
c5f219c  Add simple stash (lhotari, Nov 14, 2024)
1a3950f  Add note (lhotari, Nov 14, 2024)
fdcd8b3  Fix test that expects old defaults (lhotari, Nov 14, 2024)
9827e88  Adapt test to new behavior (lhotari, Nov 14, 2024)
091b511  Improve removal queue (lhotari, Nov 14, 2024)
89f0401  Fix EntryCacheManagerTest (lhotari, Nov 14, 2024)
eef32ec  Support configuring ManagedLedgerConfig defaults for tests (lhotari, Nov 14, 2024)
09672dd  Fix ManagedLedgerTests (lhotari, Nov 15, 2024)
7114acd  Use waitForPendingCacheEvictions in EntryCacheManagerTest (lhotari, Nov 15, 2024)
aecdacf  Use Double.NaN instead of Double.POSITIVE_INFINITY (lhotari, Nov 15, 2024)
67c8aef  Start adding expected read count solution (lhotari, Nov 18, 2024)
da64b02  Improve ManagedCursorContainerTest (lhotari, Nov 18, 2024)
2da704e  Implement getNumberOfCursorsAtSamePositionOrBefore (lhotari, Nov 18, 2024)
49b27cf  Implement EntryReadCountHandler (lhotari, Nov 19, 2024)
5525402  Implement decreasing read count on release (lhotari, Nov 19, 2024)
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import io.netty.util.ReferenceCounted;

public interface CachedEntry extends Entry, ReferenceCounted {
    boolean matchesKey(Position key);
    boolean canEvict();
}
@@ -32,8 +32,10 @@ public interface Entry {
/**
* @return the data
*/
@Deprecated
byte[] getData();

@Deprecated
byte[] getDataAndRelease();

/**
@@ -66,4 +68,8 @@ public interface Entry {
* of data reached to 0).
*/
boolean release();

default EntryReadCountHandler getReadCountHandler() {
return null;
}
}
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

public interface EntryReadCountHandler {
    int getExpectedReadCount();
    boolean incrementExpectedReadCount();
    void markRead();
}
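The diff only shows the `EntryReadCountHandler` interface, not its implementation. As a rough illustration of how such a counter could behave, here is a minimal sketch. The class name `SimpleEntryReadCountHandler` is hypothetical, and the semantics are assumptions rather than the PR's behavior: `incrementExpectedReadCount()` is assumed to return `false` once the count has reached zero, and `markRead()` is assumed never to go below zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Local copy of the interface from the diff so that the sketch compiles on its own.
interface EntryReadCountHandler {
    int getExpectedReadCount();
    boolean incrementExpectedReadCount();
    void markRead();
}

// Hypothetical implementation for illustration; not the class added by this PR.
final class SimpleEntryReadCountHandler implements EntryReadCountHandler {
    private final AtomicInteger expectedReadCount;

    SimpleEntryReadCountHandler(int initialExpectedReadCount) {
        this.expectedReadCount = new AtomicInteger(initialExpectedReadCount);
    }

    @Override
    public int getExpectedReadCount() {
        return expectedReadCount.get();
    }

    @Override
    public boolean incrementExpectedReadCount() {
        // Assumption: once the count has dropped to zero the entry may already be
        // eligible for eviction, so further increments are refused.
        while (true) {
            int current = expectedReadCount.get();
            if (current <= 0) {
                return false;
            }
            if (expectedReadCount.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    @Override
    public void markRead() {
        // Assumption: the count is clamped at zero instead of going negative.
        expectedReadCount.updateAndGet(c -> c > 0 ? c - 1 : 0);
    }
}
```

Under these assumptions, a cache could treat an entry as evictable once `getExpectedReadCount()` returns 0, which matches the intent suggested by the `cacheEvictionByExpectedReadCount` setting.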
@@ -19,10 +19,13 @@
package org.apache.bookkeeper.mledger;

import com.google.common.collect.Range;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
@@ -718,6 +721,7 @@ default void asyncFindNewestMatching(FindPositionConstraint constraint, Predicat
* @throws InterruptedException
* @throws ManagedLedgerException
*/
@Deprecated
List<Entry> replayEntries(Set<? extends Position> positions)
throws InterruptedException, ManagedLedgerException;

@@ -733,8 +737,10 @@ List<Entry> replayEntries(Set<? extends Position> positions)
* @return skipped positions
* set of positions which are already deleted/acknowledged and skipped while replaying them
*/
-    Set<? extends Position> asyncReplayEntries(
-            Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);
+    default Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions,
+            ReadEntriesCallback callback, Object ctx) {
+        return asyncReplayEntries(positions, callback, ctx, false);
+    }

/**
* Read the specified set of positions from ManagedLedger.
@@ -753,6 +759,28 @@ Set<? extends Position> asyncReplayEntries(
Set<? extends Position> asyncReplayEntries(
Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx, boolean sortEntries);

    /**
     * Read the specified set of positions from ManagedLedger in ranges.
     * This method is used to read entries in ranges to avoid reading all entries at once.
     *
     * @param positions
     *            set of positions to read
     * @param callback
     *            callback object returning the result of each range
     * @param ctx
     *            opaque context
     * @param invokeCallbacksInOrder
     *            when true, the callback will be invoked in order of the positions, otherwise the callback will be
     *            invoked in the order of the completion of the range read.
     * @return skipped positions
     *              set of positions which are already deleted/acknowledged and skipped while replaying them
     */
    default Set<? extends Position> asyncReplayEntriesInRanges(SortedSet<? extends Position> positions,
            ManagedCursorReplayReadEntriesCallback callback,
            Object ctx, boolean invokeCallbacksInOrder) {
        throw new UnsupportedOperationException("Not implemented");
    }

/**
* Close the cursor and releases the associated resources.
*
@@ -918,6 +946,24 @@ default ManagedCursorAttributes getManagedCursorAttributes() {

boolean isMessageDeleted(Position position);

    /**
     * Returns the deleted messages from the given positions.
     * Implementation classes can override this method to provide a more efficient way to filter deleted messages.
     *
     * @param positions the positions to filter
     * @return the set of deleted positions
     */
    default Set<Position> filterDeletedMessages(Collection<? extends Position> positions) {
        Set<Position> deletedPositions = new HashSet<>();
        // prefer for loop to avoid creating stream related instances
        for (Position position : positions) {
            if (isMessageDeleted(position)) {
                deletedPositions.add(position);
            }
        }
        return deletedPositions;
    }

ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException;

long[] getBatchPositionAckSet(Position position);
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import java.util.List;

public interface ManagedCursorReplayReadEntriesCallback {
    void readEntriesComplete(ManagedCursorReplayReadRange range, boolean isLast, List<Entry> entries, Object ctx);

    void readEntriesFailed(ManagedCursorReplayReadRange range, boolean isLast, ManagedLedgerException exception,
                           Object ctx);
}
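The `invokeCallbacksInOrder` flag on `asyncReplayEntriesInRanges` implies that range reads may finish out of order while callbacks still have to fire by ascending range index. The PR's actual mechanism is not visible in this part of the diff. One way this could be done is sketched below, with a hypothetical `InOrderRangeDispatcher` helper that buffers completed ranges and flushes them once the next expected index arrives. Range indexes are assumed to start at 0 and to be contiguous.

```java
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical helper for illustration; not part of the PR.
// Buffers results of ranges that complete early and hands them to the
// downstream consumer strictly in ascending range index order.
final class InOrderRangeDispatcher<T> {
    private final TreeMap<Integer, T> pending = new TreeMap<>();
    private final BiConsumer<Integer, T> downstream;
    private int nextIndex = 0;

    InOrderRangeDispatcher(BiConsumer<Integer, T> downstream) {
        this.downstream = downstream;
    }

    // Called when the read for the given range index has completed.
    synchronized void onRangeComplete(int rangeIndex, T result) {
        pending.put(rangeIndex, result);
        // Flush every buffered result that is next in line.
        while (!pending.isEmpty() && pending.firstKey() == nextIndex) {
            downstream.accept(nextIndex, pending.pollFirstEntry().getValue());
            nextIndex++;
        }
    }
}
```

The sketch invokes the downstream consumer while holding the lock, which keeps ordering simple but would block other completions if the consumer is slow; a real implementation would likely hand off to an executor.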
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

public interface ManagedCursorReplayReadRange extends Comparable<ManagedCursorReplayReadRange> {
    int rangeIndex();

    int totalRanges();

    Position startPosition();

    Position lastPosition();

    default int size() {
        if (startPosition().getLedgerId() != lastPosition().getLedgerId()) {
            throw new IllegalStateException("Cannot calculate size for range spanning multiple ledgers");
        }
        return (int) (lastPosition().getEntryId() - startPosition().getEntryId() + 1);
    }

    @Override
    default int compareTo(ManagedCursorReplayReadRange o) {
        return Integer.compare(rangeIndex(), o.rangeIndex());
    }
}
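Since `size()` rejects ranges that span multiple ledgers, each replay range presumably covers consecutive entry ids within a single ledger. The sketch below shows one way a sorted set of positions could be grouped into such ranges. `Pos`, `ReadRange` and `ReplayRangeSplitter` are hypothetical stand-ins for illustration only; the PR's own splitting logic is not part of this diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

// Hypothetical stand-in for Position, ordered by ledger id and then entry id.
final class Pos implements Comparable<Pos> {
    final long ledgerId;
    final long entryId;

    Pos(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Pos o) {
        int c = Long.compare(ledgerId, o.ledgerId);
        return c != 0 ? c : Long.compare(entryId, o.entryId);
    }
}

// Hypothetical stand-in for ManagedCursorReplayReadRange.
final class ReadRange {
    final int rangeIndex;
    final Pos start;
    final Pos last;

    ReadRange(int rangeIndex, Pos start, Pos last) {
        this.rangeIndex = rangeIndex;
        this.start = start;
        this.last = last;
    }

    // Same arithmetic as size() in the diff; valid because a range never spans ledgers here.
    int size() {
        return (int) (last.entryId - start.entryId + 1);
    }
}

final class ReplayRangeSplitter {
    // Groups sorted positions into runs of consecutive entry ids within one ledger.
    static List<ReadRange> split(SortedSet<Pos> positions) {
        List<ReadRange> ranges = new ArrayList<>();
        Pos start = null;
        Pos last = null;
        for (Pos p : positions) {
            if (start == null) {
                start = p;
            } else if (p.ledgerId != last.ledgerId || p.entryId != last.entryId + 1) {
                // Gap or ledger change: close the current range and start a new one.
                ranges.add(new ReadRange(ranges.size(), start, last));
                start = p;
            }
            last = p;
        }
        if (start != null) {
            ranges.add(new ReadRange(ranges.size(), start, last));
        }
        return ranges;
    }
}
```

For positions (1:0), (1:1), (1:2), (1:5), (2:0), (2:1) this yields three ranges of sizes 3, 1 and 2, each of which could be fetched with a single range read instead of one read per entry.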
@@ -83,6 +83,9 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;
@Getter
@Setter
private boolean cacheEvictionByExpectedReadCount = true;
private int minimumBacklogCursorsForCaching = 0;
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
@@ -61,6 +61,18 @@ public class ManagedLedgerFactoryConfig {
*/
private long managedLedgerMaxReadsInFlightSize = 0;

/**
* Maximum time to wait for acquiring permits for max reads in flight when managedLedgerMaxReadsInFlightSizeInMB is
* set (>0) and the limit is reached.
*/
private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis = 60000;

/**
* Maximum number of reads that can be queued for acquiring permits for max reads in flight when
* managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit is reached.
*/
private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 10000;

/**
* Whether trace managed ledger task execution time.
*/
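The two new settings describe a limiter that queues reads once the byte limit is reached, fails them after an acquire timeout, and rejects them when the queue is full. The `InflightReadsLimiter` changes themselves are not shown in this part of the diff, so the following is only a sketch of that idea under stated assumptions. `QueuedReadLimiter`, its `Result` enum and the injected clock are all hypothetical names, and `drain()` stands in for whatever the timeout executor would call periodically.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical sketch; not the InflightReadsLimiter from the PR.
final class QueuedReadLimiter {
    enum Result { ACQUIRED, TIMED_OUT, QUEUE_FULL }

    private static final class Pending {
        final long bytes;
        final long deadlineMillis;
        final Consumer<Result> callback;

        Pending(long bytes, long deadlineMillis, Consumer<Result> callback) {
            this.bytes = bytes;
            this.deadlineMillis = deadlineMillis;
            this.callback = callback;
        }
    }

    private final ArrayDeque<Pending> queue = new ArrayDeque<>();
    private final long acquireTimeoutMillis; // cf. ...PermitsAcquireTimeoutMillis
    private final int maxQueueSize;          // cf. ...PermitsAcquireQueueSize
    private final LongSupplier clockMillis;
    private long remainingBytes;

    QueuedReadLimiter(long maxBytes, long acquireTimeoutMillis, int maxQueueSize, LongSupplier clockMillis) {
        this.remainingBytes = maxBytes;
        this.acquireTimeoutMillis = acquireTimeoutMillis;
        this.maxQueueSize = maxQueueSize;
        this.clockMillis = clockMillis;
    }

    // Grants immediately if possible, otherwise queues the request or rejects it.
    synchronized void acquire(long bytes, Consumer<Result> callback) {
        if (queue.isEmpty() && bytes <= remainingBytes) {
            remainingBytes -= bytes;
            callback.accept(Result.ACQUIRED);
        } else if (queue.size() >= maxQueueSize) {
            callback.accept(Result.QUEUE_FULL);
        } else {
            queue.add(new Pending(bytes, clockMillis.getAsLong() + acquireTimeoutMillis, callback));
        }
    }

    // Returns permits and serves queued requests in FIFO order.
    synchronized void release(long bytes) {
        remainingBytes += bytes;
        drain();
    }

    // Expires or grants requests at the head of the queue; stops at the first that must keep waiting.
    synchronized void drain() {
        Pending head;
        while ((head = queue.peek()) != null) {
            if (head.deadlineMillis <= clockMillis.getAsLong()) {
                queue.poll();
                head.callback.accept(Result.TIMED_OUT);
            } else if (head.bytes <= remainingBytes) {
                queue.poll();
                remainingBytes -= head.bytes;
                head.callback.accept(Result.ACQUIRED);
            } else {
                break;
            }
        }
    }
}
```

In this sketch a request larger than the total limit is never granted and simply times out, and strict FIFO order means a large request at the head can delay smaller ones behind it. Both are simplifications of the sketch, not statements about the PR.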