Skip to content

Commit

Permalink
ENHANCE: Change asyncSetPipedExist method logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Mar 19, 2024
1 parent 1fcd2d7 commit 21d9645
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 72 deletions.
145 changes: 73 additions & 72 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3119,99 +3119,100 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
}

@Override
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
List<Object> values) {
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
return asyncSetPipedExist(key, exist);
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) {
return asyncSopPipedExistBulk(key, values, collectionTranscoder);
}

@Override
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
List<T> values,
Transcoder<T> tc) {
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
return asyncSetPipedExist(key, exist);
if (values.size() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}

List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
existList.add(new SetPipedExist<T>(key, values, tc));
} else {
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
for (List<T> partition : partitionedList) {
existList.add(new SetPipedExist<T>(key, partition, tc));
}
}
return asyncSetPipedExist(key, existList);
}

/**
* Generic pipelined existence operation for set items. Public methods call this method.
*
* @param key collection item's key
* @param exist operation parameters (element values)
* @param existList list of operation parameters (element values)
* @return future holding the map of elements and their existence results
*/
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
final String key, final SetPipedExist<T> exist) {

if (exist.getItemCount() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
throw new IllegalArgumentException(
"The number of piped operations must not exceed a maximum of "
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
}

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
latch, operationTimeout);

Operation op = opFact.collectionPipedExist(key, exist,
new CollectionPipedExistOperation.Callback() {

private final Map<T, Boolean> result = new HashMap<T, Boolean>();
private boolean hasAnError = false;

public void receivedStatus(OperationStatus status) {
if (hasAnError) {
return;
}

CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
final String key, final List<SetPipedExist<T>> existList) {
final CountDownLatch latch = new CountDownLatch(existList.size());
final PipedCollectionFuture<T, Boolean> rv
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);

for (final SetPipedExist<T> exist : existList) {
Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() {
private CollectionOperationStatus failedStatus = null;
private int failStatusCount = 0;
public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
switch (cstatus.getResponse()) {
case EXIST:
rv.addEachResult(exist.getValues().get(index), true);
break;
case NOT_EXIST:
rv.addEachResult(exist.getValues().get(index), false);
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
if (failedStatus == null) {
failedStatus = cstatus;
failStatusCount++;
} else if (failedStatus.equals(cstatus)) {
failStatusCount++;
}
break;
default:
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.set(result, cstatus);
}
}

public void complete() {
latch.countDown();
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}

public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
cstatus = new CollectionOperationStatus(status);
}

switch (cstatus.getResponse()) {
case EXIST:
case NOT_EXIST:
result.put(exist.getValues().get(index),
(CollectionResponse.EXIST.equals(cstatus
.getResponse())));
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
hasAnError = true;
rv.set(new HashMap<T, Boolean>(0),
(CollectionOperationStatus) status);
break;
default:
getLogger().warn("Unhandled state: " + status);
}
if (failedStatus != null && exist.getItemCount() == failStatusCount) {
rv.addOperationStatus(failedStatus);
} else {
rv.addOperationStatus(cstatus);
}
});
}

rv.setOperation(op);
addOp(key, op);
public void complete() {
latch.countDown();
}
});
rv.addOperation(op);
addOp(key, op);
}
return rv;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,37 @@ public void testMaxPipedExist() {
}
}

public void testMaxOverPipedExist() {
int OVER_COUNT = 1000;

try {
List<Object> findValues = new ArrayList<Object>();

// insert items
for (int i = 0; i < OVER_COUNT; i++) {
findValues.add("VALUE" + i);

Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get());
}

// exist bulk
CollectionFuture<Map<Object, Boolean>> future = mc
.asyncSopPipedExistBulk(KEY, findValues);

Map<Object, Boolean> map = future.get();

Assert.assertTrue(future.getOperationStatus().isSuccess());

for (int i = 0; i < OVER_COUNT; i++) {
Assert.assertTrue(map.get("VALUE" + i));
}

} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

public void testPipedExistNotExistsKey() {
try {
List<Object> findValues = new ArrayList<Object>();
Expand Down

0 comments on commit 21d9645

Please sign in to comment.