Skip to content

Commit 076d02b

Browse files
committed
INTERNAL: make lop piped operations process synchronously
1 parent e99869f commit 076d02b

File tree

3 files changed

+140
-67
lines changed

3 files changed

+140
-67
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Arrays;
2828
import java.util.Collection;
29+
import java.util.Collections;
2930
import java.util.Enumeration;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
@@ -1863,11 +1864,16 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
18631864
} else {
18641865
PartitionedList<T> list = new PartitionedList<>(valueList,
18651866
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1867+
18661868
for (List<T> elementList : list) {
18671869
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
1870+
if (index >= 0) {
1871+
index += elementList.size();
1872+
}
18681873
}
18691874
}
1870-
return asyncCollectionPipedInsert(key, insertList);
1875+
1876+
return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
18711877
}
18721878

18731879
@Override
@@ -3170,6 +3176,82 @@ public void gotStatus(Integer index, OperationStatus status) {
31703176
return rv;
31713177
}
31723178

3179+
/**
3180+
* Pipe insert method for collection items.
3181+
*
3182+
* @param key arcus cache key
3183+
* @param insertList must not be empty.
3184+
* @return future holding the map of element index and the reason why insert operation failed
3185+
*/
3186+
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
3187+
final String key, final List<CollectionPipedInsert<T>> insertList) {
3188+
final CountDownLatch latch = new CountDownLatch(1);
3189+
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
3190+
new PipedCollectionFuture<>(latch, operationTimeout);
3191+
3192+
final List<Operation> ops = new ArrayList<>();
3193+
for (int i = 0; i < insertList.size(); i++) {
3194+
final CollectionPipedInsert<T> insert = insertList.get(i);
3195+
final int idx = i;
3196+
Operation op = opFact.collectionPipedInsert(key, insert,
3197+
new CollectionPipedInsertOperation.Callback() {
3198+
// each result status
3199+
public void receivedStatus(OperationStatus status) {
3200+
CollectionOperationStatus cstatus;
3201+
3202+
if (status instanceof CollectionOperationStatus) {
3203+
cstatus = (CollectionOperationStatus) status;
3204+
} else {
3205+
getLogger().warn("Unhandled state: " + status);
3206+
cstatus = new CollectionOperationStatus(status);
3207+
}
3208+
rv.setOperationStatus(cstatus);
3209+
}
3210+
3211+
// complete
3212+
public void complete() {
3213+
if (idx == insertList.size() - 1
3214+
|| rv.hasErrored()
3215+
|| rv.getOperationStatus().getResponse() == CollectionResponse.CANCELED) {
3216+
// countdown if this is last op
3217+
latch.countDown();
3218+
} else if (!rv.getOperationStatus().isSuccess()) {
3219+
// if error or cancel occurred by this operation,
3220+
// do not add all remaining operations and mark as cancelled
3221+
for (int chunkIdx = idx + 1; chunkIdx < insertList.size(); chunkIdx++) {
3222+
for (int itemIdx = 0; itemIdx < insertList.get(chunkIdx).getItemCount(); itemIdx++) {
3223+
rv.addEachResult(itemIdx + (chunkIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3224+
new CollectionOperationStatus(new CollectionOperationStatus(
3225+
false, "CANCELED", CollectionResponse.CANCELED)));
3226+
}
3227+
}
3228+
latch.countDown();
3229+
} else {
3230+
// add next operation if this is not last op
3231+
Operation nextOp = ops.get(idx + 1);
3232+
rv.addOperation(nextOp);
3233+
addOp(key, nextOp);
3234+
}
3235+
}
3236+
3237+
// got status
3238+
public void gotStatus(Integer index, OperationStatus status) {
3239+
if (status instanceof CollectionOperationStatus) {
3240+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3241+
(CollectionOperationStatus) status);
3242+
} else {
3243+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3244+
new CollectionOperationStatus(status));
3245+
}
3246+
}
3247+
});
3248+
ops.add(op);
3249+
}
3250+
rv.addOperation(ops.get(0));
3251+
addOp(key, ops.get(0));
3252+
return rv;
3253+
}
3254+
31733255
@Override
31743256
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
31753257
List<String> keyList, long bkey, byte[] eFlag, Object value,

src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java

Lines changed: 35 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package net.spy.memcached.internal;
22

3-
import java.util.Collection;
4-
import java.util.HashSet;
3+
import java.util.ArrayList;
4+
import java.util.List;
55
import java.util.Map;
66
import java.util.concurrent.ConcurrentHashMap;
7-
import java.util.concurrent.ConcurrentLinkedQueue;
87
import java.util.concurrent.CountDownLatch;
98
import java.util.concurrent.ExecutionException;
109
import java.util.concurrent.TimeUnit;
@@ -18,7 +17,8 @@
1817

1918
public class PipedCollectionFuture<K, V>
2019
extends CollectionFuture<Map<K, V>> {
21-
private final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<>();
20+
// operations that are completed or in progress
21+
private final List<Operation> ops = new ArrayList<>();
2222
private final AtomicReference<CollectionOperationStatus> operationStatus
2323
= new AtomicReference<>(null);
2424

@@ -31,67 +31,56 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {
3131

3232
@Override
3333
public boolean cancel(boolean ign) {
34-
boolean rv = false;
35-
for (Operation op : ops) {
36-
rv |= op.cancel("by application.");
37-
}
38-
return rv;
34+
return ops.get(ops.size() - 1).cancel("by application.");
3935
}
4036

37+
/**
38+
* if previous op is cancelled, then next ops are not added to the opQueue.
39+
* So we only need to check current op.
40+
*
41+
* @return true if operation is cancelled.
42+
*/
4143
@Override
4244
public boolean isCancelled() {
43-
for (Operation op : ops) {
44-
if (op.isCancelled()) {
45-
return true;
46-
}
47-
}
48-
return false;
45+
return ops.get(ops.size() - 1).isCancelled();
46+
}
47+
48+
/**
49+
* if previous op threw exception, then next ops are not added to the opQueue.
50+
* So we only need to check current op.
51+
*
52+
* @return true if operation has errored by exception.
53+
*/
54+
public boolean hasErrored() {
55+
return ops.get(ops.size() - 1).hasErrored();
4956
}
5057

5158
@Override
5259
public boolean isDone() {
53-
for (Operation op : ops) {
54-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
55-
return false;
56-
}
57-
}
58-
return true;
60+
return latch.getCount() == 0;
5961
}
6062

6163
@Override
6264
public Map<K, V> get(long duration, TimeUnit unit)
6365
throws InterruptedException, TimeoutException, ExecutionException {
64-
6566
long beforeAwait = System.currentTimeMillis();
66-
if (!latch.await(duration, unit)) {
67-
Collection<Operation> timedOutOps = new HashSet<>();
68-
for (Operation op : ops) {
69-
if (op.getState() != OperationState.COMPLETE) {
70-
timedOutOps.add(op);
71-
} else {
72-
MemcachedConnection.opSucceeded(op);
73-
}
74-
}
75-
if (!timedOutOps.isEmpty()) {
76-
// set timeout only once for piped ops requested to single node.
77-
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
78-
79-
long elapsed = System.currentTimeMillis() - beforeAwait;
80-
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
81-
}
67+
Operation lastOp = ops.get(ops.size() - 1);
68+
if (!latch.await(duration, unit) && lastOp.getState() != OperationState.COMPLETE) {
69+
MemcachedConnection.opTimedOut(lastOp);
70+
71+
long elapsed = System.currentTimeMillis() - beforeAwait;
72+
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
8273
} else {
8374
// continuous timeout counter will be reset only once in pipe
84-
MemcachedConnection.opSucceeded(ops.iterator().next());
75+
MemcachedConnection.opSucceeded(lastOp);
8576
}
8677

87-
for (Operation op : ops) {
88-
if (op != null && op.hasErrored()) {
89-
throw new ExecutionException(op.getException());
90-
}
78+
if (lastOp != null && lastOp.hasErrored()) {
79+
throw new ExecutionException(lastOp.getException());
80+
}
9181

92-
if (op != null && op.isCancelled()) {
93-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
94-
}
82+
if (lastOp != null && lastOp.isCancelled()) {
83+
throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
9584
}
9685

9786
return failedResult;

src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java renamed to src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package net.spy.memcached.bulkoperation;
1818

19+
import java.util.ArrayList;
1920
import java.util.Arrays;
2021
import java.util.List;
2122
import java.util.Map;
@@ -25,13 +26,14 @@
2526

2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
29+
import net.spy.memcached.collection.CollectionResponse;
2830
import net.spy.memcached.ops.CollectionOperationStatus;
2931

3032
import org.junit.Assert;
3133

32-
public class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
34+
public class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {
3335

34-
private String key = "LopInsertBulkMultipleValueTest";
36+
private String key = "LopPipedInsertBulkMultipleValueTest";
3537

3638
@Override
3739
protected void tearDown() throws Exception {
@@ -42,10 +44,10 @@ protected void tearDown() throws Exception {
4244
public void testInsertAndGet() {
4345
String value = "MyValue";
4446

45-
int valueCount = 500;
46-
Object[] valueList = new Object[valueCount];
47-
for (int i = 0; i < valueList.length; i++) {
48-
valueList[i] = "MyValue";
47+
int valueCount = 510;
48+
List<Object> valueList = new ArrayList<>(valueCount);
49+
for (int i = 0; i < valueCount; i++) {
50+
valueList.add("MyValue" + i);
4951
}
5052

5153
try {
@@ -54,8 +56,7 @@ public void testInsertAndGet() {
5456

5557
// SET
5658
Future<Map<Integer, CollectionOperationStatus>> future = mc
57-
.asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
58-
new CollectionAttributes());
59+
.asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
5960
try {
6061
Map<Integer, CollectionOperationStatus> errorList = future.get(
6162
20000L, TimeUnit.MILLISECONDS);
@@ -68,27 +69,27 @@ public void testInsertAndGet() {
6869

6970
// GET
7071
int errorCount = 0;
71-
List<Object> list = null;
72+
List<Object> resultList = null;
7273
Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false,
7374
false);
7475
try {
75-
list = f.get();
76+
resultList = f.get();
7677
} catch (Exception e) {
7778
f.cancel(true);
7879
e.printStackTrace();
7980
Assert.fail(e.getMessage());
8081
}
8182

82-
Assert.assertNotNull("List is null.", list);
83-
Assert.assertTrue("Cached list is empty.", !list.isEmpty());
84-
Assert.assertEquals(valueCount, list.size());
83+
Assert.assertNotNull("List is null.", resultList);
84+
Assert.assertTrue("Cached resultList is empty.", !resultList.isEmpty());
85+
Assert.assertEquals(valueCount, resultList.size());
8586

86-
for (Object o : list) {
87-
if (!value.equals(o)) {
87+
for (int i = 0; i < resultList.size(); i++) {
88+
if (!resultList.get(i).equals(valueList.get(i))) {
8889
errorCount++;
8990
}
9091
}
91-
Assert.assertEquals(valueCount, list.size());
92+
Assert.assertEquals(valueCount, resultList.size());
9293
Assert.assertEquals(0, errorCount);
9394

9495
// REMOVE
@@ -102,9 +103,7 @@ public void testInsertAndGet() {
102103
public void testErrorCount() {
103104
int valueCount = 1200;
104105
Object[] valueList = new Object[valueCount];
105-
for (int i = 0; i < valueList.length; i++) {
106-
valueList[i] = "MyValue";
107-
}
106+
Arrays.fill(valueList, "MyValue");
108107

109108
try {
110109
// SET
@@ -115,7 +114,10 @@ public void testErrorCount() {
115114
Map<Integer, CollectionOperationStatus> map = future.get(1000L,
116115
TimeUnit.MILLISECONDS);
117116
assertEquals(valueCount, map.size());
118-
117+
assertEquals(map.get(mc.getMaxPipedItemCount() - 1).getResponse(),
118+
CollectionResponse.NOT_FOUND);
119+
assertEquals(map.get(mc.getMaxPipedItemCount()).getResponse(),
120+
CollectionResponse.CANCELED);
119121
} catch (Exception e) {
120122
e.printStackTrace();
121123
Assert.fail();

0 commit comments

Comments
 (0)