INTERNAL: make lop piped operations process synchronously
oliviarla committed Sep 6, 2024
1 parent d065b77 commit d232a01
Showing 5 changed files with 144 additions and 68 deletions.
4 changes: 4 additions & 0 deletions docs/user_guide/04-list-API.md
@@ -332,6 +332,10 @@ asyncLopPipedInsertBulk(String key, int index, List<Object> valueList, Collectio
- null: the element is not inserted.
- attributes: an empty list item is created with the given attributes, and then the element is inserted.

If some of the insert requests fail, the returned map is filled with the failed entries.
If the value of the last entry is a `CollectionOperationStatus` whose `CollectionResponse` is `CANCELED`,
it means that every request after the failed one was cancelled because of the earlier error.
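
For illustration, a minimal sketch of inspecting that map is shown below; it assumes an `ArcusClient` instance named `client`, a hypothetical key name, and a prepared `valueList` (these names are not part of the API description above).

```java
Future<Map<Integer, CollectionOperationStatus>> future =
    client.asyncLopPipedInsertBulk("Sample:List", -1, valueList, new CollectionAttributes());

try {
  Map<Integer, CollectionOperationStatus> failures = future.get(1000L, TimeUnit.MILLISECONDS);
  for (Map.Entry<Integer, CollectionOperationStatus> entry : failures.entrySet()) {
    if (entry.getValue().getResponse() == CollectionResponse.CANCELED) {
      // this element and all later ones were never attempted because an earlier insert failed
      System.out.println("cancelled from element index " + entry.getKey());
    } else {
      System.out.println("element " + entry.getKey() + " failed: " + entry.getValue().getResponse());
    }
  }
} catch (InterruptedException | ExecutionException | TimeoutException e) {
  future.cancel(true);
}
```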

Second, there is a function that inserts the same single element into each of the lists identified by multiple keys.

77 changes: 76 additions & 1 deletion src/main/java/net/spy/memcached/ArcusClient.java
@@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -38,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

@@ -1862,11 +1864,16 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
} else {
PartitionedList<T> list = new PartitionedList<>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

for (List<T> elementList : list) {
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
if (index >= 0) {
index += elementList.size();
}
}
}
return asyncCollectionPipedInsert(key, insertList);

return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
}

@Override
@@ -3137,6 +3144,74 @@ public void gotStatus(Integer index, OperationStatus status) {
return rv;
}

/**
* Piped insert method for collection items. The piped operations are processed
* synchronously: the next operation is issued only after the previous one completes.
*
* @param key arcus cache key
* @param insertList list of piped inserts; must not be empty.
* @return future holding a map from element index to the reason why that insert failed
*/
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {
final CountDownLatch latch = new CountDownLatch(1);
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<>(latch, operationTimeout);

final List<Operation> ops = new ArrayList<>(insertList.size());
IntFunction<OperationCallback> makeCallback = idx -> new CollectionPipedInsertOperation.Callback() {
// overall status of this piped operation (e.g. END on success, FAILED_END on partial failure)
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

// called when this piped operation completes; chain the next one or release the latch
public void complete() {
if (idx == insertList.size() - 1 || rv.hasErrored() || rv.isCancelled()) {
latch.countDown();
} else if (!rv.getOperationStatus().isSuccess()) {
// If this operation failed, the remaining operations must not be issued
// and are marked as cancelled starting from the next element index.
rv.addEachResult((idx + 1) * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT,
new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED));
latch.countDown();
} else {
// add next operation if this is not last op
Operation nextOp = ops.get(idx + 1);
rv.addOperation(nextOp);
addOp(key, nextOp);
}
}

// per-element status within this pipe; add the chunk offset to get the global element index
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i));
ops.add(op);
}
rv.addOperation(ops.get(0));
addOp(key, ops.get(0));
return rv;
}

@Override
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
List<String> keyList, long bkey, byte[] eFlag, Object value,
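
The new `syncCollectionPipedInsert` issues only the first piped operation up front; each operation's `complete()` callback then either issues the next operation or releases the latch once the chain has finished, failed, or been cancelled. The stand-alone sketch below uses hypothetical names (it is not part of the client) and isolates that callback-chaining pattern:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Each "operation" accepts a completion callback. Operation i + 1 is submitted
// only from the completion callback of operation i, so at most one operation
// is in flight at any time.
public final class SequentialChain {

  public static void run(List<Consumer<Runnable>> operations) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(1);
    submit(operations, 0, done);
    done.await();
  }

  private static void submit(List<Consumer<Runnable>> operations, int idx, CountDownLatch done) {
    operations.get(idx).accept(() -> {
      if (idx == operations.size() - 1) {
        done.countDown();                  // last operation finished: release the waiter
      } else {
        submit(operations, idx + 1, done); // otherwise chain the next operation
      }
    });
  }
}
```

In the real callback the chain is additionally cut short when the previous response was unsuccessful or the future was cancelled, and a single `CANCELED` status is recorded for the first element that was never sent.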
80 changes: 35 additions & 45 deletions src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
@@ -1,8 +1,7 @@
package net.spy.memcached.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -12,13 +11,15 @@
import java.util.concurrent.atomic.AtomicReference;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

public class PipedCollectionFuture<K, V>
extends CollectionFuture<Map<K, V>> {
private final Collection<Operation> ops = new ArrayList<>();
// operations that are completed or in progress
private final List<Operation> ops = new ArrayList<>();
private final AtomicReference<CollectionOperationStatus> operationStatus
= new AtomicReference<>(null);

@@ -31,67 +32,56 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
rv |= op.cancel("by application.");
}
return rv;
return ops.get(ops.size() - 1).cancel("by application.");
}

/**
* If a previous op was cancelled, subsequent ops are never added to the op queue,
* so checking only the latest operation status is sufficient.
*
* @return true if operation is cancelled.
*/
@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
return operationStatus.get().getResponse() == CollectionResponse.CANCELED;
}

/**
* If a previous op threw an exception, subsequent ops are never added to the op queue,
* so checking only the last op is sufficient.
*
* @return true if operation has errored by exception.
*/
public boolean hasErrored() {
return ops.get(ops.size() - 1).hasErrored();
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
return latch.getCount() == 0;
}

@Override
public Map<K, V> get(long duration, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {

long beforeAwait = System.currentTimeMillis();
if (!latch.await(duration, unit)) {
Collection<Operation> timedOutOps = new HashSet<>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedOutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (!timedOutOps.isEmpty()) {
// set timeout only once for piped ops requested to single node.
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());

long elapsed = System.currentTimeMillis() - beforeAwait;
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
}
Operation lastOp = ops.get(ops.size() - 1);
if (!latch.await(duration, unit) && lastOp.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(lastOp);

long elapsed = System.currentTimeMillis() - beforeAwait;
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
MemcachedConnection.opSucceeded(lastOp);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (lastOp != null && lastOp.hasErrored()) {
throw new ExecutionException(lastOp.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
if (lastOp != null && lastOp.isCancelled()) {
throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
}

return failedResult;
@@ -136,7 +136,7 @@ assert getState() == OperationState.READING
<status of the last pipelined command>\r\n
END|PIPE_ERROR <error_string>\r\n
*/
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
if (line.startsWith("END")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
@@ -145,6 +145,10 @@ assert getState() == OperationState.READING
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("PIPE_ERROR ")) {
// PIPE_ERROR case (e.g. command overflow or memory overflow)
cb.receivedStatus(FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

@@ -16,6 +16,7 @@
*/
package net.spy.memcached.bulkoperation;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -25,13 +26,15 @@

import net.spy.memcached.collection.BaseIntegrationTest;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionPipedInsert;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;

import org.junit.Assert;

public class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
public class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {

private String key = "LopInsertBulkMultipleValueTest";
private String key = "LopPipedInsertBulkMultipleValueTest";

@Override
protected void tearDown() throws Exception {
@@ -42,10 +45,10 @@ protected void tearDown() throws Exception {
public void testInsertAndGet() {
String value = "MyValue";

int valueCount = 500;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
int valueCount = 510;
List<Object> valueList = new ArrayList<>(valueCount);
for (int i = 0; i < valueCount; i++) {
valueList.add("MyValue" + i);
}

try {
@@ -54,8 +57,7 @@ public void testInsertAndGet() {

// SET
Future<Map<Integer, CollectionOperationStatus>> future = mc
.asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
new CollectionAttributes());
.asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
try {
Map<Integer, CollectionOperationStatus> errorList = future.get(
20000L, TimeUnit.MILLISECONDS);
@@ -68,27 +70,27 @@

// GET
int errorCount = 0;
List<Object> list = null;
List<Object> resultList = null;
Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false,
false);
try {
list = f.get();
resultList = f.get();
} catch (Exception e) {
f.cancel(true);
e.printStackTrace();
Assert.fail(e.getMessage());
}

Assert.assertNotNull("List is null.", list);
Assert.assertTrue("Cached list is empty.", !list.isEmpty());
Assert.assertEquals(valueCount, list.size());
Assert.assertNotNull("List is null.", resultList);
Assert.assertTrue("Cached resultList is empty.", !resultList.isEmpty());
Assert.assertEquals(valueCount, resultList.size());

for (Object o : list) {
if (!value.equals(o)) {
for (int i = 0; i < resultList.size(); i++) {
if (!resultList.get(i).equals(valueList.get(i))) {
errorCount++;
}
}
Assert.assertEquals(valueCount, list.size());
Assert.assertEquals(valueCount, resultList.size());
Assert.assertEquals(0, errorCount);

// REMOVE
@@ -102,9 +104,7 @@ public void testInsertAndGet() {
public void testErrorCount() {
int valueCount = 1200;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
}
Arrays.fill(valueList, "MyValue");

try {
// SET
@@ -114,8 +114,11 @@ public void testErrorCount() {

Map<Integer, CollectionOperationStatus> map = future.get(1000L,
TimeUnit.MILLISECONDS);
assertEquals(valueCount, map.size());

assertEquals(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + 1, map.size());
assertEquals(map.get(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT - 1).getResponse(),
CollectionResponse.NOT_FOUND);
assertEquals(map.get(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT).getResponse(),
CollectionResponse.CANCELED);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
