Skip to content

Commit

Permalink
ENHANCE: Change asyncSetPipedExist method logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jul 24, 2023
1 parent 4bc1139 commit 0928e2f
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 78 deletions.
153 changes: 79 additions & 74 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3531,97 +3531,102 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
@Override
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
List<Object> values) {
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
return asyncSetPipedExist(key, exist);
if (values.size() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}

List<SetPipedExist<Object>> existList = new ArrayList<SetPipedExist<Object>>();
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
existList.add(new SetPipedExist<Object>(key, values, collectionTranscoder));
} else {
PartitionedList<Object> partitionedList = new PartitionedList<Object>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
for (List<Object> partition : partitionedList) {
existList.add(new SetPipedExist<Object>(key, partition, collectionTranscoder));
}
}
return asyncSetPipedExist(key, existList);
}

@Override
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
List<T> values,
Transcoder<T> tc) {
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
return asyncSetPipedExist(key, exist);
if (values.size() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}

List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
existList.add(new SetPipedExist<T>(key, values, tc));
} else {
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
for (List<T> partition : partitionedList) {
existList.add(new SetPipedExist<T>(key, partition, tc));
}
}
return asyncSetPipedExist(key, existList);
}

/**
* Generic pipelined existence operation for set items. Public methods call this method.
*
* @param key collection item's key
* @param exist operation parameters (element values)
* @param existList list of operation parameters (element values)
* @return future holding the map of elements and their existence results
*/
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
final String key, final SetPipedExist<T> exist) {

if (exist.getItemCount() == 0) {
throw new IllegalArgumentException(
"The number of piped operations must be larger than 0.");
}
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
throw new IllegalArgumentException(
"The number of piped operations must not exceed a maximum of "
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
}

final CountDownLatch latch = new CountDownLatch(1);
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
latch, operationTimeout);
final String key, final List<SetPipedExist<T>> existList) {
final CountDownLatch latch = new CountDownLatch(existList.size());

Operation op = opFact.collectionPipedExist(key, exist,
new CollectionPipedExistOperation.Callback() {
final PipedCollectionFuture<T, Boolean> rv
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);

private final Map<T, Boolean> result = new HashMap<T, Boolean>();
private boolean hasAnError = false;

public void receivedStatus(OperationStatus status) {
if (hasAnError) {
return;
}

CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.set(result, cstatus);
}

public void complete() {
latch.countDown();
}

public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
cstatus = new CollectionOperationStatus(status);
}

switch (cstatus.getResponse()) {
case EXIST:
case NOT_EXIST:
result.put(exist.getValues().get(index),
(CollectionResponse.EXIST.equals(cstatus
.getResponse())));
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
hasAnError = true;
rv.set(new HashMap<T, Boolean>(0),
(CollectionOperationStatus) status);
break;
default:
getLogger().warn("Unhandled state: " + status);
}
}
});

rv.setOperation(op);
addOp(key, op);
for (final SetPipedExist<T> exist : existList) {
Operation op = opFact.collectionPipedExist(key, exist,
new CollectionPipedExistOperation.Callback() {
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.addOperationStatus(cstatus);
}
public void complete() {
latch.countDown();
}
public void gotStatus(Integer index, OperationStatus status) {
CollectionOperationStatus cstatus;
if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
switch (cstatus.getResponse()) {
case EXIST:
case NOT_EXIST:
rv.addEachResult(exist.getValues().get(index),
(CollectionResponse.EXIST.equals(cstatus
.getResponse())));
break;
case UNREADABLE:
case TYPE_MISMATCH:
case NOT_FOUND:
rv.addOperationStatus(cstatus);
break;
default:
getLogger().warn("Unhandled state: " + status);
}
}
});
rv.addOperation(op);
addOp(key, op);
}
return rv;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class CollectionPipedExistOperationImpl extends OperationImpl implements
protected int count;
protected int index = 0;
protected boolean successAll = true;
private OperationStatus status = null;

public CollectionPipedExistOperationImpl(String key,
SetPipedExist<?> collectionExist, OperationCallback cb) {
Expand Down Expand Up @@ -93,10 +94,10 @@ assert getState() == OperationState.READING : "Read ``" + line
/* ENABLE_MIGRATION end */

if (setPipedExist.isNotPiped()) {
OperationStatus status = matchStatus(line, EXIST, NOT_EXIST,
status = matchStatus(line, EXIST, NOT_EXIST,
NOT_FOUND, TYPE_MISMATCH, UNREADABLE);
cb.gotStatus(index, status);
cb.receivedStatus(status.isSuccess() ? END : FAILED_END);
cb.receivedStatus(status.isSuccess() ? END : status);
transitionState(OperationState.COMPLETE);
return;
}
Expand All @@ -108,14 +109,16 @@ assert getState() == OperationState.READING : "Read ``" + line
<status of the last pipelined command>\r\n
END|PIPE_ERROR <error_string>\r\n
*/

if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
System.out.println("status = " + status.getMessage());
cb.receivedStatus((successAll) ? END : status);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);
Expand All @@ -128,7 +131,7 @@ assert getState() == OperationState.READING : "Read ``" + line
assert "RESPONSE".equals(stuff[0]);
count = Integer.parseInt(stuff[1]);
} else {
OperationStatus status = matchStatus(line, EXIST, NOT_EXIST,
status = matchStatus(line, EXIST, NOT_EXIST,
NOT_FOUND, TYPE_MISMATCH, UNREADABLE);

if (!status.isSuccess()) {
Expand Down

0 comments on commit 0928e2f

Please sign in to comment.