Skip to content

Commit ce04587

Browse files
authored
GH-5433 Fix read lock contention in LmdbStore (#5434)
2 parents 30dc9de + 36edc7d commit ce04587

File tree

10 files changed

+495
-97
lines changed

10 files changed

+495
-97
lines changed
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.common.concurrent.locks;
12+
13+
import java.lang.invoke.VarHandle;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.LongAdder;
16+
import java.util.concurrent.locks.StampedLock;
17+
18+
/**
19+
* A lightweight read/write lock manager that avoids allocating per-lock objects. Readers are tracked using two
20+
* {@link LongAdder LongAdders} while writers rely on a {@link StampedLock}. The read-lock method returns a constant
21+
* stamp ({@link #READ_LOCK_STAMP}) and writers receive the stamp produced by the underlying {@link StampedLock}.
22+
*/
23+
public class StampedLongAdderLockManager {
24+
25+
/**
26+
* Stamp returned to callers holding a read lock. Passing any other value to {@link #unlockRead(long)} is considered
27+
* an illegal monitor state.
28+
*/
29+
public static final long READ_LOCK_STAMP = Long.MIN_VALUE;
30+
31+
private final StampedLock stampedLock = new StampedLock();
32+
private final LongAdder readersLocked = new LongAdder();
33+
private final LongAdder readersUnlocked = new LongAdder();
34+
35+
// milliseconds to wait when trying to acquire the write lock interruptibly
36+
private final int tryWriteLockMillis;
37+
38+
// Number of spin attempts before temporarily releasing the write lock when readers are active.
39+
private final int writePreference;
40+
41+
public StampedLongAdderLockManager() {
42+
this(1, 100);
43+
}
44+
45+
public StampedLongAdderLockManager(int writePreference, int tryWriteLockMillis) {
46+
this.writePreference = Math.max(1, writePreference);
47+
this.tryWriteLockMillis = Math.max(1, tryWriteLockMillis);
48+
}
49+
50+
public boolean isWriterActive() {
51+
return stampedLock.isWriteLocked();
52+
}
53+
54+
public boolean isReaderActive() {
55+
return readersUnlocked.sum() != readersLocked.sum();
56+
}
57+
58+
public void waitForActiveWriter() throws InterruptedException {
59+
while (stampedLock.isWriteLocked() && !isReaderActive()) {
60+
spinWait();
61+
}
62+
}
63+
64+
public void waitForActiveReaders() throws InterruptedException {
65+
while (isReaderActive()) {
66+
spinWait();
67+
}
68+
}
69+
70+
public long readLock() throws InterruptedException {
71+
readersLocked.increment();
72+
while (stampedLock.isWriteLocked()) {
73+
try {
74+
spinWaitAtReadLock();
75+
} catch (InterruptedException e) {
76+
readersUnlocked.increment();
77+
throw e;
78+
}
79+
}
80+
return READ_LOCK_STAMP;
81+
}
82+
83+
public long tryReadLock() {
84+
readersLocked.increment();
85+
if (!stampedLock.isWriteLocked()) {
86+
return READ_LOCK_STAMP;
87+
}
88+
readersUnlocked.increment();
89+
return 0L;
90+
}
91+
92+
public void unlockRead(long stamp) {
93+
if (stamp != READ_LOCK_STAMP) {
94+
throw new IllegalMonitorStateException("Trying to release a stamp that is not a read lock");
95+
}
96+
97+
VarHandle.acquireFence();
98+
readersUnlocked.increment();
99+
}
100+
101+
public long writeLock() throws InterruptedException {
102+
long writeStamp = writeLockInterruptibly();
103+
boolean lockAcquired = false;
104+
105+
try {
106+
int attempts = 0;
107+
do {
108+
if (Thread.interrupted()) {
109+
throw new InterruptedException();
110+
}
111+
112+
if (!hasActiveReaders()) {
113+
lockAcquired = true;
114+
break;
115+
}
116+
117+
if (attempts++ > writePreference) {
118+
attempts = 0;
119+
120+
stampedLock.unlockWrite(writeStamp);
121+
writeStamp = 0;
122+
123+
yieldWait();
124+
125+
writeStamp = writeLockInterruptibly();
126+
} else {
127+
spinWait();
128+
}
129+
130+
} while (!lockAcquired);
131+
} finally {
132+
if (!lockAcquired && writeStamp != 0) {
133+
stampedLock.unlockWrite(writeStamp);
134+
}
135+
}
136+
137+
VarHandle.releaseFence();
138+
return writeStamp;
139+
}
140+
141+
public long tryWriteLock() {
142+
long writeStamp = stampedLock.tryWriteLock();
143+
if (writeStamp == 0) {
144+
return 0L;
145+
}
146+
147+
if (!hasActiveReaders()) {
148+
VarHandle.releaseFence();
149+
return writeStamp;
150+
}
151+
152+
stampedLock.unlockWrite(writeStamp);
153+
return 0L;
154+
}
155+
156+
public void unlockWrite(long stamp) {
157+
if (stamp == 0) {
158+
throw new IllegalMonitorStateException("Trying to release a write lock that is not locked");
159+
}
160+
stampedLock.unlockWrite(stamp);
161+
}
162+
163+
private boolean hasActiveReaders() {
164+
return readersUnlocked.sum() != readersLocked.sum();
165+
}
166+
167+
private long writeLockInterruptibly() throws InterruptedException {
168+
long writeStamp;
169+
do {
170+
if (Thread.interrupted()) {
171+
throw new InterruptedException();
172+
}
173+
writeStamp = stampedLock.tryWriteLock(tryWriteLockMillis, TimeUnit.MILLISECONDS);
174+
} while (writeStamp == 0);
175+
return writeStamp;
176+
}
177+
178+
private void spinWait() throws InterruptedException {
179+
Thread.onSpinWait();
180+
if (Thread.interrupted()) {
181+
throw new InterruptedException();
182+
}
183+
}
184+
185+
private void spinWaitAtReadLock() throws InterruptedException {
186+
Thread.onSpinWait();
187+
if (Thread.interrupted()) {
188+
throw new InterruptedException();
189+
}
190+
}
191+
192+
private void yieldWait() throws InterruptedException {
193+
Thread.yield();
194+
if (Thread.interrupted()) {
195+
throw new InterruptedException();
196+
}
197+
}
198+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.common.concurrent.locks;
12+
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertFalse;
15+
import static org.junit.jupiter.api.Assertions.assertTrue;
16+
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
class StampedLongAdderLockManagerTest {
27+
28+
@Test
29+
void writeLockWaitsForReaders() throws Exception {
30+
StampedLongAdderLockManager manager = new StampedLongAdderLockManager();
31+
long readStamp = manager.readLock();
32+
assertTrue(manager.isReaderActive());
33+
34+
ExecutorService executor = Executors.newSingleThreadExecutor();
35+
try {
36+
CountDownLatch attemptingWrite = new CountDownLatch(1);
37+
AtomicBoolean acquiredWrite = new AtomicBoolean(false);
38+
39+
Future<Long> writeFuture = executor.submit(() -> {
40+
attemptingWrite.countDown();
41+
long stamp = manager.writeLock();
42+
acquiredWrite.set(true);
43+
return stamp;
44+
});
45+
46+
assertTrue(attemptingWrite.await(500, TimeUnit.MILLISECONDS), "write attempt did not start in time");
47+
TimeUnit.MILLISECONDS.sleep(100);
48+
assertFalse(acquiredWrite.get(), "write lock acquired while read lock active");
49+
50+
manager.unlockRead(readStamp);
51+
long writeStamp = writeFuture.get(2, TimeUnit.SECONDS);
52+
assertTrue(acquiredWrite.get());
53+
assertTrue(manager.isWriterActive());
54+
manager.unlockWrite(writeStamp);
55+
assertFalse(manager.isWriterActive());
56+
} finally {
57+
executor.shutdownNow();
58+
executor.awaitTermination(1, TimeUnit.SECONDS);
59+
}
60+
}
61+
62+
@Test
63+
void readLockWaitsForWriters() throws Exception {
64+
StampedLongAdderLockManager manager = new StampedLongAdderLockManager();
65+
long writeStamp = manager.writeLock();
66+
assertTrue(manager.isWriterActive());
67+
68+
ExecutorService executor = Executors.newSingleThreadExecutor();
69+
try {
70+
CountDownLatch attemptingRead = new CountDownLatch(1);
71+
AtomicBoolean acquiredRead = new AtomicBoolean(false);
72+
73+
Future<Long> readFuture = executor.submit(() -> {
74+
attemptingRead.countDown();
75+
long stamp = manager.readLock();
76+
acquiredRead.set(true);
77+
return stamp;
78+
});
79+
80+
assertTrue(attemptingRead.await(500, TimeUnit.MILLISECONDS), "read attempt did not start in time");
81+
TimeUnit.MILLISECONDS.sleep(100);
82+
assertFalse(acquiredRead.get(), "read lock acquired while write lock active");
83+
84+
manager.unlockWrite(writeStamp);
85+
long readStamp = readFuture.get(2, TimeUnit.SECONDS);
86+
assertTrue(acquiredRead.get());
87+
assertEquals(StampedLongAdderLockManager.READ_LOCK_STAMP, readStamp);
88+
assertTrue(manager.isReaderActive());
89+
manager.unlockRead(readStamp);
90+
assertFalse(manager.isReaderActive());
91+
} finally {
92+
executor.shutdownNow();
93+
executor.awaitTermination(1, TimeUnit.SECONDS);
94+
}
95+
}
96+
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbContextIdIterator.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import java.io.Closeable;
2424
import java.io.IOException;
2525
import java.nio.ByteBuffer;
26-
import java.util.concurrent.locks.StampedLock;
2726

27+
import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager;
2828
import org.eclipse.rdf4j.sail.SailException;
2929
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
3030
import org.lwjgl.PointerBuffer;
@@ -61,7 +61,7 @@ class LmdbContextIdIterator implements Closeable {
6161

6262
private boolean fetchNext = false;
6363

64-
private final StampedLock txnLock;
64+
private final StampedLongAdderLockManager txnLockManager;
6565

6666
private final Thread ownerThread = Thread.currentThread();
6767

@@ -72,9 +72,14 @@ class LmdbContextIdIterator implements Closeable {
7272

7373
this.dbi = dbi;
7474
this.txnRef = txnRef;
75-
this.txnLock = txnRef.lock();
75+
this.txnLockManager = txnRef.lockManager();
7676

77-
long stamp = txnLock.readLock();
77+
long readStamp;
78+
try {
79+
readStamp = txnLockManager.readLock();
80+
} catch (InterruptedException e) {
81+
throw new SailException(e);
82+
}
7883
try {
7984
this.txnRefVersion = txnRef.version();
8085
this.txn = txnRef.get();
@@ -85,12 +90,17 @@ class LmdbContextIdIterator implements Closeable {
8590
cursor = pp.get(0);
8691
}
8792
} finally {
88-
txnLock.unlockRead(stamp);
93+
txnLockManager.unlockRead(readStamp);
8994
}
9095
}
9196

9297
public long[] next() {
93-
long stamp = txnLock.readLock();
98+
long readStamp;
99+
try {
100+
readStamp = txnLockManager.readLock();
101+
} catch (InterruptedException e) {
102+
throw new SailException(e);
103+
}
94104
try {
95105
if (txnRefVersion != txnRef.version()) {
96106
// cursor must be renewed
@@ -143,17 +153,21 @@ public long[] next() {
143153
} catch (IOException e) {
144154
throw new SailException(e);
145155
} finally {
146-
txnLock.unlockRead(stamp);
156+
txnLockManager.unlockRead(readStamp);
147157
}
148158
}
149159

150160
private void closeInternal(boolean maybeCalledAsync) {
151161
if (!closed) {
152-
long stamp;
162+
long writeStamp = 0L;
163+
boolean writeLocked = false;
153164
if (maybeCalledAsync && ownerThread != Thread.currentThread()) {
154-
stamp = txnLock.writeLock();
155-
} else {
156-
stamp = 0;
165+
try {
166+
writeStamp = txnLockManager.writeLock();
167+
writeLocked = true;
168+
} catch (InterruptedException e) {
169+
throw new SailException(e);
170+
}
157171
}
158172
try {
159173
if (!closed) {
@@ -166,8 +180,8 @@ private void closeInternal(boolean maybeCalledAsync) {
166180
}
167181
} finally {
168182
closed = true;
169-
if (stamp != 0) {
170-
txnLock.unlockWrite(stamp);
183+
if (writeLocked) {
184+
txnLockManager.unlockWrite(writeStamp);
171185
}
172186
}
173187
}

0 commit comments

Comments
 (0)