diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java index 5bddf04ce..9f321039c 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java @@ -55,6 +55,7 @@ import net.spy.memcached.ops.CollectionCreateOperation; import net.spy.memcached.ops.CollectionGetOperation; import net.spy.memcached.ops.CollectionInsertOperation; +import net.spy.memcached.ops.ConcatenationType; import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; @@ -129,7 +130,55 @@ public void complete() { } }; Operation op = client.getOpFact() - .store(type, key, co.getFlags(), exp, co.getData(), cb); + .store(type, key, co.getFlags(), exp, co.getData(), cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + public ArcusFuture append(String key, T val) { + return concat(ConcatenationType.append, key, val); + } + + public ArcusFuture prepend(String key, T val) { + return concat(ConcatenationType.prepend, key, val); + } + + private ArcusFuture concat(ConcatenationType catType, String key, T val) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + CachedData co = tc.encode(val); + ArcusClient client = arcusClientSupplier.get(); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + StatusCode code = status.getStatusCode(); + + switch (code) { + case SUCCESS: + result.set(true); + break; + case ERR_NOT_STORED: + result.set(false); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + // TYPE_MISMATCH or unknown statement + result.addError(key, status); + break; + } + } + + @Override + public void complete() { + future.complete(); + } + }; + Operation op = client.getOpFact().cat(catType, 0L, key, co.getData(), cb); future.setOp(op); client.addOp(key, op); diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java index 44893a102..341535eb8 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java @@ -60,6 +60,24 @@ public interface AsyncArcusCommandsIF { */ ArcusFuture replace(String key, int exp, T value); + /** + * Append String or byte[] to an existing same type of value. + * + * @param key the key + * @param val the value to append + * @return {@code Boolean.True} if appended, otherwise {@code Boolean.False} + */ + ArcusFuture append(String key, T val); + + /** + * Prepend String or byte[] to an existing same type of value. + * + * @param key the key + * @param val the value to prepend + * @return {@code Boolean.True} if prepended, otherwise {@code Boolean.False} + */ + ArcusFuture prepend(String key, T val); + /** * Set values for multiple keys. * diff --git a/src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java b/src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java index 17bb70997..fefa0e6ed 100644 --- a/src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java +++ b/src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java @@ -3,6 +3,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -174,4 +175,74 @@ void cancelMultiGet() throws ExecutionException, InterruptedException, TimeoutEx } } } + + @Test + void appendString() throws ExecutionException, InterruptedException, TimeoutException { + // given + String expected = "Hello, Arcus!"; + String key = keys.get(0); + + async.set(key, 60, "Hello, ") + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.append(key, "Arcus!") + .thenCompose(result -> { + assertTrue(result); + return async.get(key); + }) + // then + .thenAccept(result -> assertEquals(expected, result)) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void prependString() throws ExecutionException, InterruptedException, TimeoutException { + // given + String expected = "Hello, World!"; + String key = keys.get(1); + + async.set(key, 60, "World!") + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.prepend(key, "Hello, ") + .thenCompose(result -> { + assertTrue(result); + return async.get(key); + }) + // then + .thenAccept(result -> assertEquals(expected, result)) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void appendNonStringException() throws ExecutionException, InterruptedException, + TimeoutException { + // given + String key = keys.get(0); + + async.set(key, 60, 123) + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + CompletableFuture future = async.append(key, "Arcus!") + .thenCompose(result -> { + assertTrue(result); + return async.get(key); + }) + .toCompletableFuture(); + // then + // AssertionError in the Transcoder causes the I/O thread to terminate abruptly. + // The future is never completed, leading to a TimeoutException in the main thread. + assertThrows(TimeoutException.class, () -> future.get(300L, TimeUnit.MILLISECONDS)); + } }