Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@
import net.spy.memcached.collection.CollectionPipedUpdate;
import net.spy.memcached.collection.CollectionPipedUpdate.BTreePipedUpdate;
import net.spy.memcached.collection.CollectionPipedUpdate.MapPipedUpdate;
import net.spy.memcached.collection.CollectionPipedUpsert;
import net.spy.memcached.collection.CollectionPipedUpsert.BTreePipedUpsert;
import net.spy.memcached.collection.CollectionPipedUpsert.ByteArrayBTreePipedUpsert;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.collection.CollectionUpdate;
import net.spy.memcached.collection.Element;
Expand Down Expand Up @@ -2707,6 +2710,64 @@ public void complete() {
return rv;
}

@Override
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, Map<Long, Object> elements,
CollectionAttributes attributesForCreate) {
return asyncBopPipedUpsertBulk(key, elements, attributesForCreate, collectionTranscoder);
}

@Override
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, List<Element<Object>> elements, CollectionAttributes collectionAttributes) {
return asyncBopPipedUpsertBulk(key, elements, collectionAttributes, collectionTranscoder);
}

@Override
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, Map<Long, T> elements, CollectionAttributes attributesForCreate, Transcoder<T> tc) {

if (elements.isEmpty()) {
throw new IllegalArgumentException("The number of piped operations must be larger than 0.");
}

List<CollectionPipedInsert<T>> upsertList = new ArrayList<CollectionPipedInsert<T>>();

if (elements.size() <= CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT) {
upsertList.add(new BTreePipedUpsert<T>(key, elements, attributesForCreate, tc));
} else {
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(
elements, CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT);
for (Map<Long, T> elementMap : list) {
upsertList.add(new BTreePipedUpsert<T>(key, elementMap, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, upsertList);
}

@Override
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, List<Element<T>> elements, CollectionAttributes attributesForCreate, Transcoder<T> tc) {

if (elements.isEmpty()) {
throw new IllegalArgumentException("The number of piped operations must be larger than 0.");
}

List<CollectionPipedInsert<T>> upsertList = new ArrayList<CollectionPipedInsert<T>>();


if (elements.size() <= CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT) {
upsertList.add(new ByteArrayBTreePipedUpsert<T>(key, elements, attributesForCreate, tc));
} else {
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
for (List<Element<T>> elementList : list) {
upsertList.add(new ByteArrayBTreePipedUpsert<T>(key, elementList, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, upsertList);
}

@Override
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpdateBulk(
String key, List<Element<Object>> elements) {
Expand Down
55 changes: 55 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClientIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,61 @@ CollectionFuture<Boolean> asyncMopUpdate(String key, String mkey,
<T> CollectionFuture<Boolean> asyncMopUpdate(String key, String mkey,
T value, Transcoder<T> tc);

/**
* Upsert elements into the b+tree
*
* @param key key of a b+tree
* @param elements list of b+tree elements
* @param attributesForCreate create a b+tree with this attributes,
* if given key of b+tree is not exists.
* @return a future indicating success
*/
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, List<Element<Object>> elements, CollectionAttributes attributesForCreate);


/**
* Upsert elements into the b+tree
*
* @param key key of a b+tree
* @param elements map of b+tree elements
* @param attributesForCreate create a b+tree with this attributes,
* if given key of b+tree is not exists.
* @return a future indicating success
*/
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, Map<Long, Object> elements,
CollectionAttributes attributesForCreate);

/**
* Upsert elements into the b+tree
*
* @param key key of a b+tree
* @param elements list of b+tree elements
* @param attributesForCreate create a b+tree with this attributes,
* if given key of b+tree is not exists.
* @param tc a transcoder to encode the value of element
* @return a future indicating success
*/
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, List<Element<T>> elements,
CollectionAttributes attributesForCreate, Transcoder<T> tc);


/**
* Upsert elements into the b+tree
*
* @param key key of a b+tree
* @param elements map of b+tree elements
* @param attributesForCreate create a b+tree with this attributes,
* if given key of b+tree is not exists.
* @param tc a transcoder to encode the value of element
* @return a future indicating success
*/
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, Map<Long, T> elements,
CollectionAttributes attributesForCreate, Transcoder<T> tc);

/**
* Update elements from the b+tree
*
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -1586,4 +1586,29 @@ public <E> BTreeStoreAndGetFuture<Boolean, E> asyncBopUpsertAndGetTrimmed(
value, attributesForCreate, transcoder);
}

@Override
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, List<Element<Object>> elements, CollectionAttributes attributesForCreate) {
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate);
}

@Override
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, Map<Long, Object> elements, CollectionAttributes attributesForCreate) {
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate);
}

@Override
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, List<Element<T>> elements,
CollectionAttributes attributesForCreate, Transcoder<T> tc) {
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate, tc);
}

@Override
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
String key, Map<Long, T> elements,
CollectionAttributes attributesForCreate, Transcoder<T> tc) {
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate, tc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public ByteBuffer getAsciiCommand() {
for (i = this.nextOpIndex; i < keySize; i++) {
Long bkey = keyList.get(i);
byte[] value = encodedList.get(i);
setArguments(bb, COMMAND, key, bkey, value.length,
setArguments(bb, getCommand(), key, bkey, value.length,
createOption, (i < keySize - 1) ? PIPE : "");
bb.put(value);
bb.put(CRLF);
Expand All @@ -223,6 +223,10 @@ public ByteBuffer getAsciiCommand() {

return bb;
}

public String getCommand() {
return COMMAND;
}
}

/**
Expand Down Expand Up @@ -274,7 +278,7 @@ public ByteBuffer getAsciiCommand() {
for (i = this.nextOpIndex; i < eSize; i++) {
Element<T> element = elements.get(i);
byte[] value = encodedList.get(i);
setArguments(bb, COMMAND, key,
setArguments(bb, getCommand(), key,
element.getStringBkey(), element.getStringEFlag(), value.length,
createOption, (i < eSize - 1) ? PIPE : "");
bb.put(value);
Expand All @@ -286,6 +290,10 @@ public ByteBuffer getAsciiCommand() {

return bb;
}

public String getCommand() {
return COMMAND;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package net.spy.memcached.collection;

import java.util.List;
import java.util.Map;

import net.spy.memcached.transcoders.Transcoder;

public abstract class CollectionPipedUpsert<T> extends CollectionPipedInsert<T> {

public CollectionPipedUpsert(String key, CollectionAttributes attribute,
Transcoder<T> tc, int itemCount) {
super(key, attribute, tc, itemCount);
}

public static class BTreePipedUpsert<T> extends BTreePipedInsert<T> {
private static final String COMMAND = "bop upsert";

public BTreePipedUpsert(String key, Map<Long, T> map,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, map, attr, tc);
}

@Override
public String getCommand() {
return COMMAND;
}
}

public static class ByteArrayBTreePipedUpsert<T> extends ByteArraysBTreePipedInsert<T> {
private static final String COMMAND = "bop upsert";

public ByteArrayBTreePipedUpsert(String key, List<Element<T>> elements,
CollectionAttributes attr, Transcoder<T> tc) {
super(key, elements, attr, tc);
}

@Override
public String getCommand() {
return COMMAND;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class CollectionPipedInsertOperationImpl extends OperationImpl
true, "CREATED_STORED", CollectionResponse.CREATED_STORED);
private static final OperationStatus STORED = new CollectionOperationStatus(
true, "STORED", CollectionResponse.STORED);
private static final OperationStatus REPLACED = new CollectionOperationStatus(
true, "REPLACED", CollectionResponse.REPLACED);
private static final OperationStatus NOT_FOUND = new CollectionOperationStatus(
false, "NOT_FOUND", CollectionResponse.NOT_FOUND);
private static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus(
Expand Down Expand Up @@ -117,8 +119,8 @@ assert getState() == OperationState.READING

if (insert.isNotPiped()) {
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);
REPLACED, NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED,
OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH);
if (status.isSuccess()) {
cb.receivedStatus((successAll) ? END : FAILED_END);
} else {
Expand Down Expand Up @@ -157,8 +159,8 @@ assert getState() == OperationState.READING
count = Integer.parseInt(stuff[1]);
} else {
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);
REPLACED, NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED,
OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH);

if (!status.isSuccess()) {
cb.gotStatus(index, status);
Expand Down
Loading