Skip to content

Commit

Permalink
ENHANCE: Change asyncSetPipedExist method logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Aug 21, 2023
1 parent 5dde87b commit 9209c10
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 80 deletions.
149 changes: 74 additions & 75 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@
import net.spy.memcached.compat.log.Logger;
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.internal.BTreeStoreAndGetFuture;
import net.spy.memcached.internal.BroadcastFuture;
import net.spy.memcached.internal.BulkOperationFuture;
import net.spy.memcached.internal.CheckedOperationTimeoutException;
import net.spy.memcached.internal.CollectionFuture;
import net.spy.memcached.internal.CollectionGetBulkFuture;
import net.spy.memcached.internal.CollectionGetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.internal.CollectionGetFuture;
import net.spy.memcached.internal.BroadcastFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.ops.BTreeFindPositionOperation;
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
import net.spy.memcached.ops.BTreeGetBulkOperation;
Expand Down Expand Up @@ -3498,99 +3498,98 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
}

@Override
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
List<Object> values) {
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
return asyncSetPipedExist(key, exist);
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) {
return asyncSopPipedExistBulk(key, values, collectionTranscoder);
}

@Override
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
List<T> values,
Transcoder<T> tc) {
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
return asyncSetPipedExist(key, exist);
if (values.size() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}

List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
existList.add(new SetPipedExist<T>(key, values, tc));
} else {
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
for (List<T> partition : partitionedList) {
existList.add(new SetPipedExist<T>(key, partition, tc));
}
}
return asyncSetPipedExist(key, existList);
}

/**
* Generic pipelined existence operation for set items. Public methods call this method.
*
* @param key collection item's key
* @param exist operation parameters (element values)
* @param existList list of operation parameters (element values)
* @return future holding the map of elements and their existence results
*/
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
final String key, final SetPipedExist<T> exist) {

if (exist.getItemCount() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
throw new IllegalArgumentException(
"The number of piped operations must not exceed a maximum of "
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
}

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
latch, operationTimeout);

Operation op = opFact.collectionPipedExist(key, exist,
new CollectionPipedExistOperation.Callback() {

private final Map<T, Boolean> result = new HashMap<T, Boolean>();
private boolean hasAnError = false;

public void receivedStatus(OperationStatus status) {
if (hasAnError) {
return;
}

CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
final String key, final List<SetPipedExist<T>> existList) {
final CountDownLatch latch = new CountDownLatch(existList.size());

final PipedCollectionFuture<T, Boolean> rv
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);

for (final SetPipedExist<T> exist : existList) {
Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() {
private CollectionOperationStatus failedStatus = null;
private boolean isSameStatus = true;
public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
switch (cstatus.getResponse()) {
case EXIST:
case NOT_EXIST:
rv.addEachResult(exist.getValues().get(index), (CollectionResponse.EXIST.equals(cstatus.getResponse())));
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
if (failedStatus == null) {
failedStatus = cstatus;
} else if (!failedStatus.equals(cstatus)) {
isSameStatus = false;
}
break;
default:
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.set(result, cstatus);
}
}

public void complete() {
latch.countDown();
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}

public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
cstatus = new CollectionOperationStatus(status);
}

switch (cstatus.getResponse()) {
case EXIST:
case NOT_EXIST:
result.put(exist.getValues().get(index),
(CollectionResponse.EXIST.equals(cstatus
.getResponse())));
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
hasAnError = true;
rv.set(new HashMap<T, Boolean>(0),
(CollectionOperationStatus) status);
break;
default:
getLogger().warn("Unhandled state: " + status);
}
if (failedStatus != null && isSameStatus) {
rv.addOperationStatus(failedStatus);
} else {
rv.addOperationStatus(cstatus);
}
});
}

rv.setOperation(op);
addOp(key, op);
public void complete() {
latch.countDown();
}
});
rv.addOperation(op);
addOp(key, op);
}
return rv;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
*/
package net.spy.memcached.collection.set;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import net.spy.memcached.collection.BaseIntegrationTest;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.collection.ElementValueType;
import net.spy.memcached.internal.CollectionFuture;

import org.junit.Assert;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SopPipedExistTest extends BaseIntegrationTest {

private final String KEY = this.getClass().getSimpleName();
Expand Down Expand Up @@ -166,6 +165,36 @@ public void testMaxPipedExist() {
Assert.fail(e.getMessage());
}
}
public void testMaxOverPipedExist() {
int OVER_COUNT = 1000;

try {
List<Object> findValues = new ArrayList<Object>();

// insert items
for (int i = 0; i < OVER_COUNT; i++) {
findValues.add("VALUE" + i);

Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get());
}

// exist bulk
CollectionFuture<Map<Object, Boolean>> future = mc
.asyncSopPipedExistBulk(KEY, findValues);

Map<Object, Boolean> map = future.get();

Assert.assertTrue(future.getOperationStatus().isSuccess());

for (int i = 0; i < OVER_COUNT; i++) {
Assert.assertTrue(map.get("VALUE" + i));
}

} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

public void testPipedExistNotExistsKey() {
try {
Expand Down

0 comments on commit 9209c10

Please sign in to comment.