Skip to content

Commit db4f08a

Browse files
committed
INTERNAL: make lop piped operations process synchronously
1 parent 69cfd20 commit db4f08a

File tree

5 files changed

+145
-63
lines changed

5 files changed

+145
-63
lines changed

docs/user_guide/04-list-API.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,10 @@ asyncLopPipedInsertBulk(String key, int index, List<Object> valueList, Collectio
332332
- null: element 삽입하지 않는다.
333333
- attributes: 주어진 attributes를 가진 empty list item 생성 후에 element 삽입한다.
334334

335+
만약 삽입 요청 중 일부가 실패하면 filled map을 반환한다. 마지막 entry의 value에
336+
`CollectionResponse가 CANCELED인 CollectionOperationStatus`가 존재하는 경우, 이전 오류로 인해
337+
해당 요청 이후의 모든 요청이 취소되었다는 것을 의미한다.
338+
335339
둘째, 여러 key들이 가리키는 list들에 각각 동일한 하나의 element를 삽입하는 함수이다.
336340

337341
```java

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Arrays;
2828
import java.util.Collection;
29+
import java.util.Collections;
2930
import java.util.Enumeration;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
@@ -38,6 +39,7 @@
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.function.IntFunction;
4143
import java.util.jar.JarFile;
4244
import java.util.jar.Manifest;
4345

@@ -1864,11 +1866,16 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
18641866
} else {
18651867
PartitionedList<T> list = new PartitionedList<>(valueList,
18661868
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1869+
18671870
for (List<T> elementList : list) {
18681871
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
1872+
if (index >= 0) {
1873+
index += elementList.size();
1874+
}
18691875
}
18701876
}
1871-
return asyncCollectionPipedInsert(key, insertList);
1877+
1878+
return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
18721879
}
18731880

18741881
@Override
@@ -3139,6 +3146,74 @@ public void gotStatus(Integer index, OperationStatus status) {
31393146
return rv;
31403147
}
31413148

3149+
/**
3150+
* Pipe insert method for collection items.
3151+
*
3152+
* @param key arcus cache key
3153+
* @param insertList must not be empty.
3154+
* @return future holding the map of element index and the reason why insert operation failed
3155+
*/
3156+
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
3157+
final String key, final List<CollectionPipedInsert<T>> insertList) {
3158+
final CountDownLatch latch = new CountDownLatch(1);
3159+
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
3160+
new PipedCollectionFuture<>(latch, operationTimeout);
3161+
3162+
final List<Operation> ops = new ArrayList<>(insertList.size());
3163+
IntFunction<OperationCallback> makeCallback = opIdx -> new CollectionPipedInsertOperation.Callback() {
3164+
// each result status
3165+
public void receivedStatus(OperationStatus status) {
3166+
CollectionOperationStatus cstatus;
3167+
3168+
if (status instanceof CollectionOperationStatus) {
3169+
cstatus = (CollectionOperationStatus) status;
3170+
} else {
3171+
getLogger().warn("Unhandled state: " + status);
3172+
cstatus = new CollectionOperationStatus(status);
3173+
}
3174+
rv.setOperationStatus(cstatus);
3175+
}
3176+
3177+
// complete
3178+
public void complete() {
3179+
if (opIdx == insertList.size() - 1 || rv.hasErrored() || rv.isCancelled()) {
3180+
latch.countDown();
3181+
} else if (!rv.getOperationStatus().isSuccess()) {
3182+
// If this operation failed, remaining subsequent operation
3183+
// should not be added and should be marked as cancelled.
3184+
rv.addEachResult((opIdx + 1) * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT,
3185+
new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED));
3186+
latch.countDown();
3187+
} else {
3188+
// add next operation if this is not last op
3189+
Operation nextOp = ops.get(opIdx + 1);
3190+
rv.addOperation(nextOp);
3191+
addOp(key, nextOp);
3192+
}
3193+
}
3194+
3195+
// got status
3196+
public void gotStatus(Integer index, OperationStatus status) {
3197+
if (status instanceof CollectionOperationStatus) {
3198+
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3199+
(CollectionOperationStatus) status);
3200+
} else {
3201+
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3202+
new CollectionOperationStatus(status));
3203+
}
3204+
}
3205+
};
3206+
3207+
for (int i = 0; i < insertList.size(); i++) {
3208+
final CollectionPipedInsert<T> insert = insertList.get(i);
3209+
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i));
3210+
ops.add(op);
3211+
}
3212+
rv.addOperation(ops.get(0));
3213+
addOp(key, ops.get(0));
3214+
return rv;
3215+
}
3216+
31423217
@Override
31433218
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
31443219
List<String> keyList, long bkey, byte[] eFlag, Object value,

src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package net.spy.memcached.internal;
22

33
import java.util.ArrayList;
4-
import java.util.Collection;
4+
import java.util.List;
55
import java.util.Map;
66
import java.util.concurrent.ConcurrentHashMap;
77
import java.util.concurrent.CountDownLatch;
@@ -11,13 +11,15 @@
1111
import java.util.concurrent.atomic.AtomicReference;
1212

1313
import net.spy.memcached.MemcachedConnection;
14+
import net.spy.memcached.collection.CollectionResponse;
1415
import net.spy.memcached.ops.CollectionOperationStatus;
1516
import net.spy.memcached.ops.Operation;
1617
import net.spy.memcached.ops.OperationState;
1718

1819
public class PipedCollectionFuture<K, V>
1920
extends CollectionFuture<Map<K, V>> {
20-
private final Collection<Operation> ops = new ArrayList<>();
21+
// operations that are completed or in progress
22+
private final List<Operation> ops = new ArrayList<>();
2123
private final AtomicReference<CollectionOperationStatus> operationStatus
2224
= new AtomicReference<>(null);
2325

@@ -30,67 +32,60 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {
3032

3133
@Override
3234
public boolean cancel(boolean ign) {
33-
boolean rv = false;
34-
for (Operation op : ops) {
35-
rv |= op.cancel("by application.");
36-
}
37-
return rv;
35+
return ops.get(ops.size() - 1).cancel("by application.");
3836
}
3937

38+
/**
39+
* if previous op is cancelled, then next ops are not added to the opQueue.
40+
* So we only need to check current op.
41+
*
42+
* @return true if operation is cancelled.
43+
*/
4044
@Override
4145
public boolean isCancelled() {
42-
for (Operation op : ops) {
43-
if (op.isCancelled()) {
44-
return true;
45-
}
46-
}
47-
return false;
46+
return operationStatus.get().getResponse() == CollectionResponse.CANCELED;
47+
}
48+
49+
/**
50+
* if previous op threw exception, then next ops are not added to the opQueue.
51+
* So we only need to check current op.
52+
*
53+
* @return true if operation has errored by exception.
54+
*/
55+
public boolean hasErrored() {
56+
return ops.get(ops.size() - 1).hasErrored();
4857
}
4958

5059
@Override
5160
public boolean isDone() {
52-
for (Operation op : ops) {
53-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
54-
return false;
55-
}
56-
}
57-
return true;
61+
return latch.getCount() == 0;
5862
}
5963

6064
@Override
6165
public Map<K, V> get(long duration, TimeUnit unit)
6266
throws InterruptedException, TimeoutException, ExecutionException {
63-
6467
long beforeAwait = System.currentTimeMillis();
68+
Operation lastOp;
6569
if (!latch.await(duration, unit)) {
66-
Collection<Operation> timedOutOps = new ArrayList<>();
67-
for (Operation op : ops) {
68-
if (op.getState() != OperationState.COMPLETE) {
69-
timedOutOps.add(op);
70-
} else {
71-
MemcachedConnection.opSucceeded(op);
72-
}
73-
}
74-
if (!timedOutOps.isEmpty()) {
75-
// set timeout only once for piped ops requested to single node.
76-
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
70+
lastOp = ops.get(ops.size() - 1);
71+
if(lastOp.getState() != OperationState.COMPLETE) {
72+
MemcachedConnection.opTimedOut(lastOp);
7773

7874
long elapsed = System.currentTimeMillis() - beforeAwait;
79-
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
75+
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
8076
}
8177
} else {
8278
// continuous timeout counter will be reset only once in pipe
83-
MemcachedConnection.opSucceeded(ops.iterator().next());
79+
lastOp = ops.get(ops.size() - 1);
80+
MemcachedConnection.opSucceeded(lastOp);
8481
}
8582

86-
for (Operation op : ops) {
87-
if (op != null && op.hasErrored()) {
88-
throw new ExecutionException(op.getException());
89-
}
83+
if (lastOp != null && lastOp.hasErrored()) {
84+
throw new ExecutionException(lastOp.getException());
85+
}
9086

91-
if (op != null && op.isCancelled()) {
92-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
93-
}
87+
if (lastOp != null && lastOp.isCancelled()) {
88+
throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
9489
}
9590

9691
return failedResult;

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ assert getState() == OperationState.READING
136136
<status of the last pipelined command>\r\n
137137
END|PIPE_ERROR <error_string>\r\n
138138
*/
139-
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
139+
if (line.startsWith("END")) {
140140
/* ENABLE_MIGRATION if */
141141
if (needRedirect()) {
142142
transitionState(OperationState.REDIRECT);
@@ -145,6 +145,10 @@ assert getState() == OperationState.READING
145145
/* ENABLE_MIGRATION end */
146146
cb.receivedStatus((successAll) ? END : FAILED_END);
147147
transitionState(OperationState.COMPLETE);
148+
} else if (line.startsWith("PIPE_ERROR ")) {
149+
// command flow / memory flow
150+
cb.receivedStatus(FAILED_END);
151+
transitionState(OperationState.COMPLETE);
148152
} else if (line.startsWith("RESPONSE ")) {
149153
getLogger().debug("Got line %s", line);
150154

src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java renamed to src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package net.spy.memcached.bulkoperation;
1818

19+
import java.util.ArrayList;
1920
import java.util.Arrays;
2021
import java.util.List;
2122
import java.util.Map;
@@ -25,19 +26,22 @@
2526

2627
import net.spy.memcached.collection.BaseIntegrationTest;
2728
import net.spy.memcached.collection.CollectionAttributes;
29+
import net.spy.memcached.collection.CollectionPipedInsert;
30+
import net.spy.memcached.collection.CollectionResponse;
2831
import net.spy.memcached.ops.CollectionOperationStatus;
2932

3033
import org.junit.jupiter.api.AfterEach;
3134
import org.junit.jupiter.api.Test;
3235

3336
import static org.junit.jupiter.api.Assertions.assertEquals;
37+
import static org.junit.jupiter.api.Assertions.assertFalse;
3438
import static org.junit.jupiter.api.Assertions.assertNotNull;
3539
import static org.junit.jupiter.api.Assertions.assertTrue;
3640
import static org.junit.jupiter.api.Assertions.fail;
3741

38-
class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
42+
class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {
3943

40-
private String key = "LopInsertBulkMultipleValueTest";
44+
private String key = "LopPipedInsertBulkMultipleValueTest";
4145

4246
@AfterEach
4347
@Override
@@ -50,10 +54,10 @@ protected void tearDown() throws Exception {
5054
void testInsertAndGet() {
5155
String value = "MyValue";
5256

53-
int valueCount = 500;
54-
Object[] valueList = new Object[valueCount];
55-
for (int i = 0; i < valueList.length; i++) {
56-
valueList[i] = "MyValue";
57+
int valueCount = 510;
58+
List<Object> valueList = new ArrayList<>(valueCount);
59+
for (int i = 0; i < valueCount; i++) {
60+
valueList.add("MyValue" + i);
5761
}
5862

5963
try {
@@ -62,8 +66,7 @@ void testInsertAndGet() {
6266

6367
// SET
6468
Future<Map<Integer, CollectionOperationStatus>> future = mc
65-
.asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
66-
new CollectionAttributes());
69+
.asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
6770
try {
6871
Map<Integer, CollectionOperationStatus> errorList = future.get(
6972
20000L, TimeUnit.MILLISECONDS);
@@ -76,27 +79,27 @@ void testInsertAndGet() {
7679

7780
// GET
7881
int errorCount = 0;
79-
List<Object> list = null;
82+
List<Object> resultList = null;
8083
Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false,
8184
false);
8285
try {
83-
list = f.get();
86+
resultList = f.get();
8487
} catch (Exception e) {
8588
f.cancel(true);
8689
e.printStackTrace();
8790
fail(e.getMessage());
8891
}
8992

90-
assertNotNull(list, "List is null.");
91-
assertTrue(!list.isEmpty(), "Cached list is empty.");
92-
assertEquals(valueCount, list.size());
93+
assertNotNull(resultList);
94+
assertFalse(resultList.isEmpty(), "Cached list is empty.");
95+
assertEquals(valueCount, resultList.size());
9396

94-
for (Object o : list) {
95-
if (!value.equals(o)) {
97+
for (int i = 0; i < resultList.size(); i++) {
98+
if (!resultList.get(i).equals(valueList.get(i))) {
9699
errorCount++;
97100
}
98101
}
99-
assertEquals(valueCount, list.size());
102+
assertEquals(valueCount, resultList.size());
100103
assertEquals(0, errorCount);
101104

102105
// REMOVE
@@ -111,9 +114,7 @@ void testInsertAndGet() {
111114
void testErrorCount() {
112115
int valueCount = 1200;
113116
Object[] valueList = new Object[valueCount];
114-
for (int i = 0; i < valueList.length; i++) {
115-
valueList[i] = "MyValue";
116-
}
117+
Arrays.fill(valueList, "MyValue");
117118

118119
try {
119120
// SET
@@ -123,8 +124,11 @@ void testErrorCount() {
123124

124125
Map<Integer, CollectionOperationStatus> map = future.get(1000L,
125126
TimeUnit.MILLISECONDS);
126-
assertEquals(valueCount, map.size());
127-
127+
assertEquals(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + 1, map.size());
128+
assertEquals(map.get(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT - 1).getResponse(),
129+
CollectionResponse.NOT_FOUND);
130+
assertEquals(map.get(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT).getResponse(),
131+
CollectionResponse.CANCELED);
128132
} catch (Exception e) {
129133
e.printStackTrace();
130134
fail();

0 commit comments

Comments
 (0)