diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 9babc8b88..addf3479d 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -17,36 +17,6 @@ */ package net.spy.memcached; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.URL; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; -import java.util.jar.JarFile; -import java.util.jar.Manifest; - import net.spy.memcached.collection.Attributes; import net.spy.memcached.collection.BKeyObject; import net.spy.memcached.collection.BTreeCount; @@ -119,15 +89,15 @@ import net.spy.memcached.compat.log.Logger; import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.internal.BTreeStoreAndGetFuture; +import net.spy.memcached.internal.BroadcastFuture; import net.spy.memcached.internal.BulkOperationFuture; import net.spy.memcached.internal.CheckedOperationTimeoutException; import net.spy.memcached.internal.CollectionFuture; import net.spy.memcached.internal.CollectionGetBulkFuture; +import net.spy.memcached.internal.CollectionGetFuture; import net.spy.memcached.internal.OperationFuture; -import net.spy.memcached.internal.SMGetFuture; import net.spy.memcached.internal.PipedCollectionFuture; -import net.spy.memcached.internal.CollectionGetFuture; -import net.spy.memcached.internal.BroadcastFuture; +import net.spy.memcached.internal.SMGetFuture; import net.spy.memcached.ops.BTreeFindPositionOperation; import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; import net.spy.memcached.ops.BTreeGetBulkOperation; @@ -153,6 +123,36 @@ import net.spy.memcached.transcoders.Transcoder; import net.spy.memcached.util.BTreeUtil; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.jar.JarFile; +import java.util.jar.Manifest; + /** * Client to a Arcus. * @@ -3498,99 +3498,98 @@ public CollectionFuture asyncBopGetItemCount(String key, } @Override - public CollectionFuture> asyncSopPipedExistBulk(String key, - List values) { - SetPipedExist exist = new SetPipedExist(key, values, collectionTranscoder); - return asyncSetPipedExist(key, exist); + public CollectionFuture> asyncSopPipedExistBulk(String key, List values) { + return asyncSopPipedExistBulk(key, values, collectionTranscoder); } @Override public CollectionFuture> asyncSopPipedExistBulk(String key, List values, Transcoder tc) { - SetPipedExist exist = new SetPipedExist(key, values, tc); - return asyncSetPipedExist(key, exist); + if (values.size() == 0) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> existList = new ArrayList>(); + if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) { + existList.add(new SetPipedExist(key, values, tc)); + } else { + PartitionedList partitionedList = new PartitionedList(values, SetPipedExist.MAX_PIPED_ITEM_COUNT); + for (List partition : partitionedList) { + existList.add(new SetPipedExist(key, partition, tc)); + } + } + return asyncSetPipedExist(key, existList); } /** * Generic pipelined existence operation for set items. Public methods call this method. * * @param key collection item's key - * @param exist operation parameters (element values) + * @param existList list of operation parameters (element values) * @return future holding the map of elements and their existence results */ CollectionFuture> asyncSetPipedExist( - final String key, final SetPipedExist exist) { - - if (exist.getItemCount() == 0) { - throw new IllegalArgumentException( - "The number of piped operations must be larger than 0."); - } - if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - throw new IllegalArgumentException( - "The number of piped operations must not exceed a maximum of " - + CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + "."); - } - - final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = new CollectionFuture>( - latch, operationTimeout); - - Operation op = opFact.collectionPipedExist(key, exist, - new CollectionPipedExistOperation.Callback() { - - private final Map result = new HashMap(); - private boolean hasAnError = false; - - public void receivedStatus(OperationStatus status) { - if (hasAnError) { - return; - } - - CollectionOperationStatus cstatus; - if (status instanceof CollectionOperationStatus) { - cstatus = (CollectionOperationStatus) status; - } else { + final String key, final List> existList) { + final CountDownLatch latch = new CountDownLatch(existList.size()); + + final PipedCollectionFuture rv + = new PipedCollectionFuture(latch, operationTimeout); + + for (final SetPipedExist exist : existList) { + Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() { + private CollectionOperationStatus failedStatus = null; + private boolean isSameStatus = true; + public void gotStatus(Integer index, OperationStatus status) { + CollectionOperationStatus cstatus; + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + switch (cstatus.getResponse()) { + case EXIST: + case NOT_EXIST: + rv.addEachResult(exist.getValues().get(index), (CollectionResponse.EXIST.equals(cstatus.getResponse()))); + break; + case UNREADABLE: + case TYPE_MISMATCH: + case NOT_FOUND: + if (failedStatus == null) { + failedStatus = cstatus; + } else if (!failedStatus.equals(cstatus)) { + isSameStatus = false; + } + break; + default: getLogger().warn("Unhandled state: " + status); - cstatus = new CollectionOperationStatus(status); - } - rv.set(result, cstatus); } + } - public void complete() { - latch.countDown(); + public void receivedStatus(OperationStatus status) { + CollectionOperationStatus cstatus; + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); } - - public void gotStatus(Integer index, OperationStatus status) { - CollectionOperationStatus cstatus; - if (status instanceof CollectionOperationStatus) { - cstatus = (CollectionOperationStatus) status; - } else { - cstatus = new CollectionOperationStatus(status); - } - - switch (cstatus.getResponse()) { - case EXIST: - case NOT_EXIST: - result.put(exist.getValues().get(index), - (CollectionResponse.EXIST.equals(cstatus - .getResponse()))); - break; - case UNREADABLE: - case TYPE_MISMATCH: - case NOT_FOUND: - hasAnError = true; - rv.set(new HashMap(0), - (CollectionOperationStatus) status); - break; - default: - getLogger().warn("Unhandled state: " + status); - } + if (failedStatus != null && isSameStatus) { + rv.addOperationStatus(failedStatus); + } else { + rv.addOperationStatus(cstatus); } - }); + } - rv.setOperation(op); - addOp(key, op); + public void complete() { + latch.countDown(); + } + }); + rv.addOperation(op); + addOp(key, op); + } return rv; } diff --git a/src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java b/src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java index 201847137..3e0942432 100644 --- a/src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java +++ b/src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java @@ -16,18 +16,17 @@ */ package net.spy.memcached.collection.set; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.collection.ElementValueType; import net.spy.memcached.internal.CollectionFuture; - import org.junit.Assert; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + public class SopPipedExistTest extends BaseIntegrationTest { private final String KEY = this.getClass().getSimpleName(); @@ -166,6 +165,36 @@ public void testMaxPipedExist() { Assert.fail(e.getMessage()); } } + public void testMaxOverPipedExist() { + int OVER_COUNT = 1000; + + try { + List findValues = new ArrayList(); + + // insert items + for (int i = 0; i < OVER_COUNT; i++) { + findValues.add("VALUE" + i); + + Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get()); + } + + // exist bulk + CollectionFuture> future = mc + .asyncSopPipedExistBulk(KEY, findValues); + + Map map = future.get(); + + Assert.assertTrue(future.getOperationStatus().isSuccess()); + + for (int i = 0; i < OVER_COUNT; i++) { + Assert.assertTrue(map.get("VALUE" + i)); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } public void testPipedExistNotExistsKey() { try {