From 59d00e9fb0a9ec7536d6f48c39807eeef9577337 Mon Sep 17 00:00:00 2001 From: the123saurav Date: Sat, 18 Dec 2021 23:05:15 +0530 Subject: [PATCH] Initial commit for RW lock --- .../impl/key/manager/ExpirationManager.java | 7 +- .../manager/TransactionContext.java | 7 +- .../core/command/mapping/CommandMapper.java | 37 +++-- .../java/dev/keva/store/KevaDatabase.java | 3 +- .../keva/store/impl/OffHeapDatabaseImpl.java | 144 +++++++++--------- .../keva/store/impl/OnHeapDatabaseImpl.java | 138 ++++++++--------- .../java/dev/keva/store/lock/SpinLock.java | 24 ++- 7 files changed, 198 insertions(+), 162 deletions(-) diff --git a/core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java b/core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java index f15e9137..c21b782f 100644 --- a/core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java +++ b/core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java @@ -8,6 +8,7 @@ import dev.keva.ioc.annotation.Component; import dev.keva.protocol.resp.Command; import dev.keva.store.KevaDatabase; +import dev.keva.store.lock.SpinLock; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -71,14 +72,14 @@ public void executeExpire(byte[] key) { data[0] = "delete".getBytes(); data[1] = key; Command command = Command.newInstance(data, false); - Lock lock = database.getLock(); - lock.lock(); + SpinLock lock = database.getLock(); + lock.exclusiveLock(); try { aof.write(command); database.remove(key); clearExpiration(key); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } else { database.remove(key); diff --git a/core/src/main/java/dev/keva/core/command/impl/transaction/manager/TransactionContext.java b/core/src/main/java/dev/keva/core/command/impl/transaction/manager/TransactionContext.java index 2f5f5bdd..dbda205a 100644 --- a/core/src/main/java/dev/keva/core/command/impl/transaction/manager/TransactionContext.java +++ b/core/src/main/java/dev/keva/core/command/impl/transaction/manager/TransactionContext.java @@ -2,6 +2,7 @@ import dev.keva.core.command.mapping.CommandMapper; import dev.keva.protocol.resp.Command; +import dev.keva.store.lock.SpinLock; import dev.keva.util.hashbytes.BytesKey; import dev.keva.util.hashbytes.BytesValue; import dev.keva.protocol.resp.reply.MultiBulkReply; @@ -42,8 +43,8 @@ public void discard() { isQueuing = false; } - public Reply exec(ChannelHandlerContext ctx, Lock txLock) throws InterruptedException { - txLock.lock(); + public Reply exec(ChannelHandlerContext ctx, SpinLock txLock) throws InterruptedException { + txLock.exclusiveLock(); try { for (val watch : watchMap.entrySet()) { val key = watch.getKey(); @@ -74,7 +75,7 @@ public Reply exec(ChannelHandlerContext ctx, Lock txLock) throws InterruptedE return new MultiBulkReply(replies); } finally { - txLock.unlock(); + txLock.exclusiveUnlock(); } } } diff --git a/core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java b/core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java index c4d22910..457f6115 100644 --- a/core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java +++ b/core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java @@ -30,6 +30,10 @@ @Component @Slf4j public class CommandMapper { + + private static final Set EXCLUSIVE_COMMANDS = new HashSet<>(Arrays.asList( + "exec", "expire", "expireat", "restore", "flushdb")); + @Getter private final Map methods = new HashMap<>(); @@ -98,17 +102,26 @@ public void init() { try { val lock = database.getLock(); - lock.lock(); + boolean locked = false, exclusive = false, writeToAOF = isAoF && isMutate; try { - if (ctx != null && isAoF && isMutate) { - try { - aof.write(command); - } catch (Exception e) { - log.error("Error writing to AOF", e); - } - } Object[] objects = new Object[types.length]; command.toArguments(objects, types, ctx); + if (ctx != null) { + locked = true; + if (isMutate || EXCLUSIVE_COMMANDS.contains(name)) { + lock.exclusiveLock(); + exclusive = true; + if (writeToAOF) { + try { + aof.write(command); + } catch (Exception e) { + log.error("Error writing to AOF", e); + } + } + } else { + lock.sharedLock(); + } + } // If not in AOF mode, then recycle(), // else, the command will be recycled in the AOF dump if (!kevaConfig.getAof()) { @@ -116,7 +129,13 @@ public void init() { } return (Reply) method.invoke(instance, objects); } finally { - lock.unlock(); + if (locked) { + if (exclusive) { + lock.exclusiveUnlock(); + } else { + lock.sharedUnlock(); + } + } } } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/store/src/main/java/dev/keva/store/KevaDatabase.java b/store/src/main/java/dev/keva/store/KevaDatabase.java index d18bb0bd..5d0e01f5 100644 --- a/store/src/main/java/dev/keva/store/KevaDatabase.java +++ b/store/src/main/java/dev/keva/store/KevaDatabase.java @@ -1,12 +1,13 @@ package dev.keva.store; +import dev.keva.store.lock.SpinLock; import dev.keva.util.hashbytes.BytesKey; import java.util.AbstractMap; import java.util.concurrent.locks.Lock; public interface KevaDatabase { - Lock getLock(); + SpinLock getLock(); void flush(); diff --git a/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java b/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java index 7412ffc8..4f7bf089 100644 --- a/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java +++ b/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static dev.keva.util.Constants.FLAG_CH; import static dev.keva.util.Constants.FLAG_GT; @@ -37,7 +39,7 @@ @Slf4j public class OffHeapDatabaseImpl implements KevaDatabase { @Getter - private final Lock lock = new SpinLock(); + private final SpinLock lock = new SpinLock(); private ChronicleMap chronicleMap; @@ -65,47 +67,47 @@ public OffHeapDatabaseImpl(DatabaseConfig config) { @Override public void flush() { - lock.lock(); + lock.exclusiveLock(); try { chronicleMap.clear(); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public byte[] get(byte[] key) { - lock.lock(); + lock.sharedLock(); try { return chronicleMap.get(key); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override public void put(byte[] key, byte[] val) { - lock.lock(); + lock.exclusiveLock(); try { chronicleMap.put(key, val); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public boolean remove(byte[] key) { - lock.lock(); + lock.exclusiveLock(); try { return chronicleMap.remove(key) != null; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public byte[] incrBy(byte[] key, long amount) { - lock.lock(); + lock.exclusiveLock(); try { return chronicleMap.compute(key, (k, oldVal) -> { long curVal = 0L; @@ -116,14 +118,14 @@ public byte[] incrBy(byte[] key, long amount) { return Long.toString(curVal).getBytes(StandardCharsets.UTF_8); }); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[] hget(byte[] key, byte[] field) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -133,14 +135,14 @@ public byte[] hget(byte[] key, byte[] field) { BytesValue got = map.get(new BytesKey(field)); return got == null ? null : got.getBytes(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] hgetAll(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -155,14 +157,14 @@ public byte[][] hgetAll(byte[] key) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] hkeys(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -176,14 +178,14 @@ public byte[][] hkeys(byte[] key) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] hvals(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -197,14 +199,14 @@ public byte[][] hvals(byte[] key) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public void hset(byte[] key, byte[] field, byte[] value) { - lock.lock(); + lock.exclusiveLock(); try { chronicleMap.compute(key, (k, oldVal) -> { HashMap map; @@ -217,14 +219,14 @@ public void hset(byte[] key, byte[] field, byte[] value) { return SerializationUtils.serialize(map); }); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public boolean hdel(byte[] key, byte[] field) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -237,14 +239,14 @@ public boolean hdel(byte[] key, byte[] field) { } return result; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int lpush(byte[] key, byte[]... values) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = chronicleMap.get(key); LinkedList list; @@ -255,14 +257,14 @@ public int lpush(byte[] key, byte[]... values) { chronicleMap.put(key, SerializationUtils.serialize(list)); return list.size(); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int rpush(byte[] key, byte[]... values) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = chronicleMap.get(key); LinkedList list; @@ -273,14 +275,14 @@ public int rpush(byte[] key, byte[]... values) { chronicleMap.put(key, SerializationUtils.serialize(list)); return list.size(); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[] lpop(byte[] key) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -294,14 +296,14 @@ public byte[] lpop(byte[] key) { chronicleMap.put(key, SerializationUtils.serialize(list)); return result; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[] rpop(byte[] key) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -315,14 +317,14 @@ public byte[] rpop(byte[] key) { chronicleMap.put(key, SerializationUtils.serialize(list)); return result; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int llen(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -331,14 +333,14 @@ public int llen(byte[] key) { LinkedList list = (LinkedList) SerializationUtils.deserialize(value); return list.size(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] lrange(byte[] key, int start, int end) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -372,14 +374,14 @@ public byte[][] lrange(byte[] key, int start, int end) { } return result.toArray(new byte[0][0]); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[] lindex(byte[] key, int index) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -394,14 +396,14 @@ public byte[] lindex(byte[] key, int index) { } return list.get(index).getBytes(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public void lset(byte[] key, int index, byte[] value) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value1 = chronicleMap.get(key); if (value1 == null) { @@ -417,14 +419,14 @@ public void lset(byte[] key, int index, byte[] value) { list.set(index, new BytesValue(value)); chronicleMap.put(key, SerializationUtils.serialize(list)); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int lrem(byte[] key, int count, byte[] value) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value1 = chronicleMap.get(key); if (value1 == null) { @@ -467,14 +469,14 @@ public int lrem(byte[] key, int count, byte[] value) { chronicleMap.put(key, SerializationUtils.serialize(list)); return result; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int sadd(byte[] key, byte[]... values) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = chronicleMap.get(key); HashSet set; @@ -489,14 +491,14 @@ public int sadd(byte[] key, byte[]... values) { chronicleMap.put(key, SerializationUtils.serialize(set)); return count; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] smembers(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -510,14 +512,14 @@ public byte[][] smembers(byte[] key) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public boolean sismember(byte[] key, byte[] value) { - lock.lock(); + lock.sharedLock(); try { byte[] got = chronicleMap.get(key); if (got == null) { @@ -526,14 +528,14 @@ public boolean sismember(byte[] key, byte[] value) { HashSet set = (HashSet) SerializationUtils.deserialize(got); return set.contains(new BytesKey(value)); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public int scard(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -542,14 +544,14 @@ public int scard(byte[] key) { HashSet set = (HashSet) SerializationUtils.deserialize(value); return set.size(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] sdiff(byte[]... keys) { - lock.lock(); + lock.sharedLock(); try { HashSet set = new HashSet<>(); for (byte[] key : keys) { @@ -568,14 +570,14 @@ public byte[][] sdiff(byte[]... keys) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] sinter(byte[]... keys) { - lock.lock(); + lock.sharedLock(); try { HashSet set = new HashSet<>(); for (byte[] key : keys) { @@ -594,14 +596,14 @@ public byte[][] sinter(byte[]... keys) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] sunion(byte[]... keys) { - lock.lock(); + lock.sharedLock(); try { HashSet set = new HashSet<>(); for (byte[] key : keys) { @@ -618,14 +620,14 @@ public byte[][] sunion(byte[]... keys) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public int smove(byte[] source, byte[] destination, byte[] value) { - lock.lock(); + lock.exclusiveLock(); try { byte[] sourceValue = chronicleMap.get(source); if (sourceValue == null) { @@ -647,14 +649,14 @@ public int smove(byte[] source, byte[] destination, byte[] value) { } return 0; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int srem(byte[] key, byte[]... values) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -674,13 +676,13 @@ public int srem(byte[] key, byte[]... values) { } return result; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public int strlen(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = chronicleMap.get(key); if (value == null) { @@ -688,13 +690,13 @@ public int strlen(byte[] key) { } return new String(value, StandardCharsets.UTF_8).length(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override public int setrange(byte[] key, byte[] offset, byte[] val) { - lock.lock(); + lock.exclusiveLock(); try { var offsetPosition = Integer.parseInt(new String(offset, StandardCharsets.UTF_8)); byte[] oldVal = chronicleMap.get(key); @@ -712,13 +714,13 @@ public int setrange(byte[] key, byte[] offset, byte[] val) { chronicleMap.put(key, newVal); return newValLength; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public byte[][] mget(byte[]... keys) { - lock.lock(); + lock.sharedLock(); try { byte[][] result = new byte[keys.length][]; for (int i = 0; i < keys.length; i++) { @@ -728,7 +730,7 @@ public byte[][] mget(byte[]... keys) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @@ -743,7 +745,7 @@ public int zadd(final byte[] key, @NonNull final AbstractMap.SimpleEntry map = new HashMap<>(100); @@ -37,39 +37,39 @@ public void flush() { @Override public void put(byte[] key, byte[] val) { - lock.lock(); + lock.exclusiveLock(); try { map.put(new BytesKey(key), new BytesValue(val)); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public byte[] get(byte[] key) { - lock.lock(); + lock.sharedLock(); try { BytesValue got = map.get(new BytesKey(key)); return got != null ? got.getBytes() : null; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override public boolean remove(byte[] key) { - lock.lock(); + lock.exclusiveLock(); try { BytesValue removed = map.remove(new BytesKey(key)); return removed != null; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public byte[] incrBy(byte[] key, long amount) { - lock.lock(); + lock.exclusiveLock(); try { return map.compute(new BytesKey(key), (k, oldVal) -> { long curVal = 0L; @@ -80,14 +80,14 @@ public byte[] incrBy(byte[] key, long amount) { return new BytesValue(Long.toString(curVal).getBytes(StandardCharsets.UTF_8)); }).getBytes(); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[] hget(byte[] key, byte[] field) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -97,14 +97,14 @@ public byte[] hget(byte[] key, byte[] field) { BytesValue got = map.get(new BytesKey(field)); return got != null ? got.getBytes() : null; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] hgetAll(byte[] key) { - lock.lock(); + lock.sharedLock(); try { BytesValue value = map.get(new BytesKey(key)); if (value == null) { @@ -119,14 +119,14 @@ public byte[][] hgetAll(byte[] key) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] hkeys(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -140,14 +140,14 @@ public byte[][] hkeys(byte[] key) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] hvals(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -161,14 +161,14 @@ public byte[][] hvals(byte[] key) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public void hset(byte[] key, byte[] field, byte[] value) { - lock.lock(); + lock.exclusiveLock(); try { map.compute(new BytesKey(key), (k, oldVal) -> { HashMap map; @@ -181,14 +181,14 @@ public void hset(byte[] key, byte[] field, byte[] value) { return new BytesValue(SerializationUtils.serialize(map)); }); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public boolean hdel(byte[] key, byte[] field) { - lock.lock(); + lock.exclusiveLock(); try { BytesValue value = map.get(new BytesKey(key)); if (value == null) { @@ -201,14 +201,14 @@ public boolean hdel(byte[] key, byte[] field) { } return removed; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int lpush(byte[] key, byte[]... values) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); LinkedList list; @@ -219,14 +219,14 @@ public int lpush(byte[] key, byte[]... values) { map.put(new BytesKey(key), new BytesValue(SerializationUtils.serialize(list))); return list.size(); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int rpush(byte[] key, byte[]... values) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); LinkedList list; @@ -237,14 +237,14 @@ public int rpush(byte[] key, byte[]... values) { map.put(new BytesKey(key), new BytesValue(SerializationUtils.serialize(list))); return list.size(); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[] lpop(byte[] key) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); LinkedList list; @@ -256,14 +256,14 @@ public byte[] lpop(byte[] key) { map.put(new BytesKey(key), new BytesValue(SerializationUtils.serialize(list))); return v.getBytes(); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[] rpop(byte[] key) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); LinkedList list; @@ -275,28 +275,28 @@ public byte[] rpop(byte[] key) { map.put(new BytesKey(key), new BytesValue(SerializationUtils.serialize(list))); return v.getBytes(); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int llen(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); LinkedList list; list = value == null ? new LinkedList<>() : (LinkedList) SerializationUtils.deserialize(value); return list.size(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] lrange(byte[] key, int start, int end) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -330,14 +330,14 @@ public byte[][] lrange(byte[] key, int start, int end) { } return result.toArray(new byte[0][0]); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[] lindex(byte[] key, int index) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); LinkedList list; @@ -350,14 +350,14 @@ public byte[] lindex(byte[] key, int index) { } return list.get(index).getBytes(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public void lset(byte[] key, int index, byte[] value) { - lock.lock(); + lock.exclusiveLock(); try { byte[] v = map.get(new BytesKey(key)).getBytes(); LinkedList list; @@ -371,14 +371,14 @@ public void lset(byte[] key, int index, byte[] value) { list.set(index, new BytesValue(value)); map.put(new BytesKey(key), new BytesValue(SerializationUtils.serialize(list))); } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int lrem(byte[] key, int count, byte[] value) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value1 = map.get(new BytesKey(key)).getBytes(); if (value1 == null) { @@ -421,14 +421,14 @@ public int lrem(byte[] key, int count, byte[] value) { map.put(new BytesKey(key), new BytesValue(SerializationUtils.serialize(list))); return result; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int sadd(byte[] key, byte[]... values) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); HashSet set; @@ -443,14 +443,14 @@ public int sadd(byte[] key, byte[]... values) { map.put(new BytesKey(key), new BytesValue(SerializationUtils.serialize(set))); return count; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] smembers(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -464,14 +464,14 @@ public byte[][] smembers(byte[] key) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public boolean sismember(byte[] key, byte[] value) { - lock.lock(); + lock.sharedLock(); try { byte[] got = map.get(new BytesKey(key)).getBytes(); if (got == null) { @@ -480,14 +480,14 @@ public boolean sismember(byte[] key, byte[] value) { HashSet set = (HashSet) SerializationUtils.deserialize(got); return set.contains(new BytesKey(value)); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public int scard(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -496,14 +496,14 @@ public int scard(byte[] key) { HashSet set = (HashSet) SerializationUtils.deserialize(value); return set.size(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] sdiff(byte[]... keys) { - lock.lock(); + lock.sharedLock(); try { HashSet set = new HashSet<>(); for (byte[] key : keys) { @@ -522,14 +522,14 @@ public byte[][] sdiff(byte[]... keys) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] sinter(byte[]... keys) { - lock.lock(); + lock.sharedLock(); try { HashSet set = new HashSet<>(); for (byte[] key : keys) { @@ -548,14 +548,14 @@ public byte[][] sinter(byte[]... keys) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public byte[][] sunion(byte[]... keys) { - lock.lock(); + lock.sharedLock(); try { HashSet set = new HashSet<>(); for (byte[] key : keys) { @@ -572,14 +572,14 @@ public byte[][] sunion(byte[]... keys) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override @SuppressWarnings("unchecked") public int smove(byte[] source, byte[] destination, byte[] value) { - lock.lock(); + lock.exclusiveLock(); try { byte[] sourceValue = map.get(new BytesKey(source)).getBytes(); if (sourceValue == null) { @@ -601,14 +601,14 @@ public int smove(byte[] source, byte[] destination, byte[] value) { } return 0; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override @SuppressWarnings("unchecked") public int srem(byte[] key, byte[]... values) { - lock.lock(); + lock.exclusiveLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -628,13 +628,13 @@ public int srem(byte[] key, byte[]... values) { } return count; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public int strlen(byte[] key) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -642,13 +642,13 @@ public int strlen(byte[] key) { } return new String(value, StandardCharsets.UTF_8).length(); } finally { - lock.unlock(); + lock.sharedUnlock(); } } @Override public int setrange(byte[] key, byte[] offset, byte[] val) { - lock.lock(); + lock.exclusiveLock(); try { var offsetPosition = Integer.parseInt(new String(offset, StandardCharsets.UTF_8)); byte[] oldVal = map.get(new BytesKey(key)).getBytes(); @@ -666,13 +666,13 @@ public int setrange(byte[] key, byte[] offset, byte[] val) { map.put(new BytesKey(key), new BytesValue(newVal)); return newValLength; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public byte[][] mget(byte[]... keys) { - lock.lock(); + lock.sharedLock(); try { byte[][] result = new byte[keys.length][]; for (int i = 0; i < keys.length; i++) { @@ -682,7 +682,7 @@ public byte[][] mget(byte[]... keys) { } return result; } finally { - lock.unlock(); + lock.sharedUnlock(); } } @@ -697,7 +697,7 @@ public int zadd(byte[] key, AbstractMap.SimpleEntry[] members, // Track both to eliminate conditional branch int added = 0, changed = 0; - lock.lock(); + lock.exclusiveLock(); try { final BytesKey mapKey = new BytesKey(key); byte[] value = map.get(mapKey).getBytes(); @@ -728,13 +728,13 @@ public int zadd(byte[] key, AbstractMap.SimpleEntry[] members, map.put(mapKey, new BytesValue(SerializationUtils.serialize(zSet))); return ch ? changed : added; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public Double zincrby(byte[] key, Double incr, BytesKey e, int flags) { - lock.lock(); + lock.exclusiveLock(); try { final BytesKey mapKey = new BytesKey(key); byte[] value = map.get(mapKey).getBytes(); @@ -765,14 +765,14 @@ public Double zincrby(byte[] key, Double incr, BytesKey e, int flags) { map.put(mapKey, new BytesValue(SerializationUtils.serialize(zSet))); return currentScore; } finally { - lock.unlock(); + lock.exclusiveUnlock(); } } @Override public Double zscore(byte[] key, byte[] member) { - lock.lock(); + lock.sharedLock(); try { byte[] value = map.get(new BytesKey(key)).getBytes(); if (value == null) { @@ -781,7 +781,7 @@ public Double zscore(byte[] key, byte[] member) { ZSet zset = (ZSet) SerializationUtils.deserialize(value); return zset.getScore(new BytesKey(member)); } finally { - lock.unlock(); + lock.sharedUnlock(); } } } diff --git a/store/src/main/java/dev/keva/store/lock/SpinLock.java b/store/src/main/java/dev/keva/store/lock/SpinLock.java index a3254b28..c8079669 100644 --- a/store/src/main/java/dev/keva/store/lock/SpinLock.java +++ b/store/src/main/java/dev/keva/store/lock/SpinLock.java @@ -1,18 +1,30 @@ package dev.keva.store.lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; -public class SpinLock extends ReentrantLock { +public class SpinLock extends ReentrantReadWriteLock { + private final ReentrantReadWriteLock.ReadLock readLock; + private final ReentrantReadWriteLock.WriteLock writeLock; public SpinLock() { super(true); + readLock = readLock(); + writeLock = writeLock(); } - public void lock() { - while (!tryLock()) { - } + public void sharedLock() { + while (!readLock.tryLock()); } - public void unlock() { - super.unlock(); + public void sharedUnlock() { + readLock.unlock(); + } + + public void exclusiveLock() { + while (!writeLock.tryLock()); + } + + public void exclusiveUnlock() { + writeLock.unlock(); } }