diff --git a/spring-batch-couchbase/.gitignore b/spring-batch-couchbase/.gitignore new file mode 100644 index 00000000..96839142 --- /dev/null +++ b/spring-batch-couchbase/.gitignore @@ -0,0 +1,4 @@ +/target +.classpath +.project +.settings \ No newline at end of file diff --git a/spring-batch-couchbase/README.md b/spring-batch-couchbase/README.md new file mode 100644 index 00000000..2978f996 --- /dev/null +++ b/spring-batch-couchbase/README.md @@ -0,0 +1,65 @@ +# spring-batch-couchbase +========= +ItemReader and ItemWriter implementations for Couchbase using Spring Data Couchbase. Please read the [reference manual] for implementation details of +different methods to read/write to/from Couchbase as provided by [CouchbaseOperations] and other configuration details. + +Please read the api documentation for [Query] for configuring query parameters. + +## Configuration for reader/writer +========= +``` +@Configuration +public class ReaderWriterConfig { + + @Bean + public ItemReader couchbaseItemReader() { + + CouchbaseItemReader reader = new CouchbaseItemReader(); + reader.setCouchbaseOperations(couchbaseOperations()); + reader.setQuery(query()); + reader.setDesignDocument("designDocumentName"); + reader.setView("viewName"); + reader.setTargetType(ClassToRead.class); + reader.setPageSize(15); + return reader; + } + + @Bean + public ItemWriter couchbaseItemWriter() { + + CouchbaseItemWriter writer = new CouchbaseItemWriter(couchbaseOperations()); + + // Optional + // writer.setDelete(true|false); (defaults to false) + // writer.setOverrideDocuments(true|false); (defaults to false) + // writer.setPersistTo(ZERO|MASTER|ONE|TWO|THREE|FOUR); (defaults to PersistTo.ZERO) + // writer.setReplicateTo(ZERO|ONE|TWO|THREE); (defaults to ReplicateTo.ZERO) + + return writer; + } + + /*Optional + @Bean + */ + public Query query() { + + Query query = new Query(); + // configure the query as required (there are 17 query parameters) + + return query; + } + + @Bean + public CouchbaseOperations couchbaseOperations() { + // configure and return Couchbase template + } +} +``` + +##### NOTE +Use the pageSize attribute (inherited from AbstractPaginatedDataItemReader) to set the limit for the data returned by the query. +If the Query object's limit attribute has been set, it will be overridden by the pageSize attribute to make it consistent with the other reader behaviors. + +[reference manual]:http://docs.spring.io/spring-data/couchbase/docs/1.1.1.RELEASE/reference/html/ +[CouchbaseOperations]:http://docs.spring.io/spring-data/couchbase/docs/1.1.1.RELEASE/api/org/springframework/data/couchbase/core/CouchbaseOperations.html +[Query]:http://www.couchbase.com/autodocs/couchbase-java-client-1.4.3/index.html?com/couchbase/client/protocol/views/Query.html \ No newline at end of file diff --git a/spring-batch-couchbase/pom.xml b/spring-batch-couchbase/pom.xml new file mode 100644 index 00000000..91e8e091 --- /dev/null +++ b/spring-batch-couchbase/pom.xml @@ -0,0 +1,62 @@ + + + 4.0.0 + org.springframework.batch + spring-batch-couchbase + 1.0.0-SNAPSHOT + + + UTF-8 + 1.7 + 3.0.1.RELEASE + 1.1.1.RELEASE + 1.9.5 + 4.11 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.5.1 + + ${java.version} + ${java.version} + + + + + + + + + org.springframework.batch + spring-batch-core + ${spring-batch.version} + + + + org.springframework.data + spring-data-couchbase + ${spring-data-couchbase.version} + + + + org.mockito + mockito-all + ${mockito.verion} + test + + + + junit + junit + ${junit.verion} + test + + + + + \ No newline at end of file diff --git a/spring-batch-couchbase/src/main/java/org/springframework/batch/item/data/CouchbaseItemReader.java b/spring-batch-couchbase/src/main/java/org/springframework/batch/item/data/CouchbaseItemReader.java new file mode 100644 index 00000000..c3717d40 --- /dev/null +++ b/spring-batch-couchbase/src/main/java/org/springframework/batch/item/data/CouchbaseItemReader.java @@ -0,0 +1,168 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.item.data; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.springframework.util.Assert.hasLength; +import static org.springframework.util.Assert.notNull; +import static org.springframework.util.ClassUtils.getShortName; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.slf4j.Logger; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemReader; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.data.couchbase.core.CouchbaseOperations; + +import com.couchbase.client.protocol.views.Query; +import com.couchbase.client.protocol.views.ViewResponse; +import com.couchbase.client.protocol.views.ViewRow; + +/** + *

+ * Restartable {@link ItemReader} that reads documents from Couchbase + * via a paging technique. + *

+ * + *

+ * It executes the query object {@link Query} to retrieve the requested + * documents. Additional pages are requested as needed to provide data + * when the {@link #read()} method is called. If the limit is not set on the + * {@link Query} object, the default limit will be applied as specified in + * {@link AbstractPaginatedDataItemReader#pageSize} + *

+ * + *

+ * The implementation is thread-safe between calls to + * {@link #open(ExecutionContext)}, but remember to use saveState=false + * if used in a multi-threaded client (no restart available). + *

+ * + * + * @author Hasnain Javed + * @since 3.x.x + * @param Type of item to be read + */ +public class CouchbaseItemReader extends AbstractPaginatedDataItemReader implements InitializingBean { + + private final Logger logger; + + private CouchbaseOperations couchbaseOperations; + + private Query query; + + private String designDocument; + private String view; + + private Class targetType; + + public CouchbaseItemReader() { + setName(getShortName(getClass())); + logger = getLogger(getClass()); + } + + /** + * Used to perform operations against the Couchbase instance. Also + * handles the mapping of documents to objects. + * + * @param couchbaseOperations the CouchbaseOperations instance to use + * @see CouchbaseOperations + */ + public void setCouchbaseOperations(CouchbaseOperations couchbaseOperations) { + this.couchbaseOperations = couchbaseOperations; + } + + /** + * Used to fetch documents from Couchbase. + * + * @param query the query to be executed + * @see Query + */ + public void setQuery(Query query) { + this.query = query; + } + + /** + * @param designDocument the name of the design document + */ + public void setDesignDocument(String designDocument) { + this.designDocument = designDocument; + } + + /** + * @param view the name of the view + */ + public void setView(String view) { + this.view = view; + } + + /** + * The type of object to be returned for each {@link #read()} call. + * + * @param type the type of object to return + */ + public void setTargetType(Class targetType) { + this.targetType = targetType; + } + + @Override + public void afterPropertiesSet() throws Exception { + notNull(couchbaseOperations, "A CouchbaseOperations implementation is required."); + notNull(query, "A valid query is required."); + notNull(targetType, "A target type to convert the input into is required."); + hasLength(designDocument, "A design document name is required."); + hasLength(view, "A view name is required."); + + logger.debug("setting limit on query to {}", pageSize); + query.setLimit(pageSize); + } + + @Override + @SuppressWarnings("unchecked") + protected Iterator doPageRead() { + + Iterator iterator = null; + + logger.debug("executing query {} with design document {} in view {}", query, designDocument, view); + + if(query.willReduce()) { + ViewResponse response = couchbaseOperations.queryView(designDocument, view, query); + iterator = getItems(response); + }else { + iterator = (Iterator)couchbaseOperations.findByView(designDocument, view, query, targetType).iterator(); + } + + return iterator; + } + + private Iterator getItems(ViewResponse response) { + + List items = new ArrayList(response.size()); + + for( ViewRow row : response) { + String id = row.getId(); + logger.debug("fetching document with id {}", id); + T item = couchbaseOperations.findById(id, targetType); + items.add(item); + } + + return items.iterator(); + } +} \ No newline at end of file diff --git a/spring-batch-couchbase/src/main/java/org/springframework/batch/item/data/CouchbaseItemWriter.java b/spring-batch-couchbase/src/main/java/org/springframework/batch/item/data/CouchbaseItemWriter.java new file mode 100644 index 00000000..d1e79a5d --- /dev/null +++ b/spring-batch-couchbase/src/main/java/org/springframework/batch/item/data/CouchbaseItemWriter.java @@ -0,0 +1,211 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.item.data; + +import static java.lang.String.valueOf; +import static java.util.UUID.randomUUID; +import static org.slf4j.LoggerFactory.getLogger; +import static org.springframework.transaction.support.TransactionSynchronizationManager.bindResource; +import static org.springframework.transaction.support.TransactionSynchronizationManager.getResource; +import static org.springframework.transaction.support.TransactionSynchronizationManager.hasResource; +import static org.springframework.transaction.support.TransactionSynchronizationManager.isActualTransactionActive; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; +import static org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource; +import static org.springframework.util.Assert.notNull; +import static org.springframework.util.CollectionUtils.isEmpty; + +import java.util.Arrays; +import java.util.List; + +import net.spy.memcached.PersistTo; +import net.spy.memcached.ReplicateTo; + +import org.slf4j.Logger; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.data.couchbase.core.CouchbaseOperations; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; + +/** + *

+ * A {@link ItemWriter} implementation that writes to Couchbase store using an implementation of Spring Data's + * {@link CouchbaseOperations}. Couchbase is not a transactional store and does not support transactions in the + * traditional sense of the word. Couchbase is ACID compliant on a per document basis and have concurrency/locking + * controls. The strategy for writing data is similar to {@link MongoItemWriter}. + * There is no roll back if an error occurs + *

+ * + *

+ * This writer is thread-safe once all properties are set (normal singleton behavior) so it can be used in multiple + * concurrent transactions. + *

+ * + * @author Hasnain Javed + * @since 3.x.x + * @param Type of item to be written + * + */ +public class CouchbaseItemWriter implements ItemWriter, InitializingBean { + + private final String dataKey; + + private final Logger logger; + + private final CouchbaseOperations couchbaseOperations; + + private PersistTo persistTo; + private ReplicateTo replicateTo; + + private boolean delete; + private boolean overrideDocuments; + + public CouchbaseItemWriter(CouchbaseOperations couchbaseOperations) { + super(); + dataKey = valueOf(randomUUID()); + logger = getLogger(getClass()); + persistTo = PersistTo.ZERO; + replicateTo = ReplicateTo.ZERO; + delete = false; + overrideDocuments = false; + this.couchbaseOperations = couchbaseOperations; + } + + @Override + public void afterPropertiesSet() throws Exception { + notNull(couchbaseOperations, "A CouchbaseOperations implementation is required."); + notNull(persistTo, "A valid constant value is required for persistTo property. Allowed values are ". + concat(Arrays.toString(PersistTo.values()))); + notNull(replicateTo, "A valid constant value is required for replicateTo property. Allowed values are ". + concat(Arrays.toString(ReplicateTo.values()))); + } + + /** + * A flag for removing items given to the writer. Default value is set to false indicating that the + * items will not be removed. + * + * @param delete flag + */ + public void setDelete(boolean delete) { + this.delete = delete; + } + + /** + * A flag for overriding existing items given to the writer. Default value is set to false indicating + * that the items will be inserted. otherwise, the items will be overridden. + * + * @param override flag + */ + public void setOverrideDocuments(boolean overrideDocuments) { + this.overrideDocuments = overrideDocuments; + } + + /** + * Specifies value for persisting to node(s). + * Defaults to ZERO + * @param persistTo enum value + * @see PersistTo + */ + public void setPersistTo(PersistTo persistTo) { + this.persistTo = persistTo; + } + + /** + * Specifies value for replicating to node(s). + * Defaults to ZERO + * @param replicateTo enum value + * @see ReplicateTo + */ + public void setReplicateTo(ReplicateTo replicateTo) { + this.replicateTo = replicateTo; + } + + @Override + public void write(List items) throws Exception { + + if(isActualTransactionActive()) { + bufferItems(items); + }else { + writeItems(items); + } + } + + @SuppressWarnings("unchecked") + private void bufferItems(List items) { + + if(hasResource(dataKey)) { + logger.debug("appending items to buffer under key {}", dataKey); + List buffer = (List) getResource(dataKey); + buffer.addAll(items); + }else { + logger.debug("adding items to buffer under key {}", dataKey); + bindResource(dataKey, items); + registerSynchronization(new TransactionSynchronizationCallbackImpl()); + } + } + + /** + * Writes to Couchbase via the template. + * This can be overridden by a subclass if required. + * + * @param items the list of items to be added/removed to/from bucket. + */ + protected void writeItems(List items) { + + if(isEmpty(items)) { + logger.warn("no items to write to couchbase. list is empty or null"); + }else { + for(T item : items) { + if(delete) { + logger.debug("deleting item {}", item); + couchbaseOperations.remove(item, persistTo, replicateTo); + }else if(overrideDocuments) { + logger.debug("overriding item {}", item); + couchbaseOperations.save(item, persistTo, replicateTo); + }else { + logger.debug("inserting item {}", item); + couchbaseOperations.insert(item, persistTo, replicateTo); + } + } + } + } + + private class TransactionSynchronizationCallbackImpl extends TransactionSynchronizationAdapter { + @SuppressWarnings("unchecked") + @Override + public void beforeCommit(boolean readOnly) { + + List items = (List) getResource(dataKey); + if(!isEmpty(items)) { + if(!readOnly) { + writeItems(items); + }else{ + logger.warn("can not write items to couchbase as transaction is read only"); + } + }else { + logger.warn("no items to write to couchbase. list is empty or null"); + } + } + + @Override + public void afterCompletion(int status) { + if(hasResource(dataKey)) { + logger.debug("removing items from buffer under key {}", dataKey); + unbindResource(dataKey); + } + } + } +} \ No newline at end of file diff --git a/spring-batch-couchbase/src/test/java/org/springframework/batch/item/data/CouchbaseItemReaderTest.java b/spring-batch-couchbase/src/test/java/org/springframework/batch/item/data/CouchbaseItemReaderTest.java new file mode 100644 index 00000000..0095616f --- /dev/null +++ b/spring-batch-couchbase/src/test/java/org/springframework/batch/item/data/CouchbaseItemReaderTest.java @@ -0,0 +1,205 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.item.data; + +import static java.util.Arrays.asList; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.springframework.data.couchbase.core.CouchbaseOperations; + +import com.couchbase.client.protocol.views.Query; +import com.couchbase.client.protocol.views.ViewResponse; +import com.couchbase.client.protocol.views.ViewRow; + +public class CouchbaseItemReaderTest { + + @Mock + private CouchbaseOperations couchbaseOperations; + + private CouchbaseItemReader reader; + + private Query query; + + private String view; + private String designDocument; + + private Class targetType; + + @Before + public void setUp() throws Exception { + + initMocks(this); + + view = "testView"; + designDocument = "testDocument"; + targetType = Object.class; + query = new Query(); + reader = new CouchbaseItemReader<>(); + reader.setCouchbaseOperations(couchbaseOperations); + reader.setDesignDocument(designDocument); + reader.setView(view); + reader.setQuery(query); + reader.setTargetType(targetType); + reader.afterPropertiesSet(); + } + + @After + public void tearDown() { + designDocument = null; + view = null; + targetType = null; + query = null; + couchbaseOperations = null; + reader = null; + } + + @Test(expected=IllegalArgumentException.class) + public void shouldFailCouchbaseOperationsAssertion() throws Exception { + + try { + new CouchbaseItemReader<>().afterPropertiesSet(); + fail("Assertion should have thrown exception on null CouchbaseOperations"); + }catch(IllegalArgumentException e) { + assertEquals("A CouchbaseOperations implementation is required.", e.getMessage()); + throw e; + }catch (Exception e) { + fail("unexpected error occurred"); + } + } + + @Test(expected=IllegalArgumentException.class) + public void shouldFailQueryAssertion() throws Exception { + + try { + reader.setQuery(null); + reader.afterPropertiesSet(); + fail("Assertion should have thrown exception on null query"); + }catch(IllegalArgumentException e) { + assertEquals("A valid query is required.", e.getMessage()); + throw e; + }catch (Exception e) { + fail("unexpected error occurred"); + } + } + + @Test(expected=IllegalArgumentException.class) + public void shouldFailTargetTypeAssertion() throws Exception { + + try { + reader.setTargetType(null); + reader.afterPropertiesSet(); + fail("Assertion should have thrown exception on null target type"); + }catch(IllegalArgumentException e) { + assertEquals("A target type to convert the input into is required.", e.getMessage()); + throw e; + }catch (Exception e) { + fail("unexpected error occurred"); + } + } + + @Test(expected=IllegalArgumentException.class) + public void shouldFailDesignDocumentAssertion() throws Exception { + + try { + reader.setDesignDocument(null); + reader.afterPropertiesSet(); + fail("Assertion should have thrown exception on null design document name"); + }catch(IllegalArgumentException e) { + assertEquals("A design document name is required.", e.getMessage()); + throw e; + }catch (Exception e) { + fail("unexpected error occurred"); + } + } + + @Test(expected=IllegalArgumentException.class) + public void shouldFailViewAssertion() throws Exception { + + try { + reader.setView(null); + reader.afterPropertiesSet(); + fail("Assertion should have thrown exception on null view name"); + }catch(IllegalArgumentException e) { + assertEquals("A view name is required.", e.getMessage()); + throw e; + }catch (Exception e) { + fail("unexpected error occurred"); + } + } + + @Test + public void shouldSetDefaultLimitOnQuery() throws Exception { + + Query query = new Query(); + + assertThat(query.getLimit(), equalTo(-1)); + + reader.setQuery(query); + reader.afterPropertiesSet(); + + assertThat(query.getLimit(), equalTo(reader.pageSize)); + } + + @Test + public void shouldRunReducedQuery() { + + String documentId = "111"; + + query.setReduce(true); + + ViewResponse viewResponseMock = mock(ViewResponse.class); + ViewRow viewRowMock = mock(ViewRow.class); + + List rows = asList(viewRowMock); + + when(couchbaseOperations.queryView(designDocument, view, query)).thenReturn(viewResponseMock); + when(viewResponseMock.size()).thenReturn(1); + when(viewResponseMock.iterator()).thenReturn(rows.iterator()); + when(viewRowMock.getId()).thenReturn(documentId); + when(couchbaseOperations.findById(documentId, targetType)).thenReturn(new Object()); + + reader.doPageRead(); + + verify(couchbaseOperations).queryView(designDocument, view, query); + verify(viewResponseMock).size(); + verify(viewRowMock).getId(); + verify(couchbaseOperations).findById(documentId, targetType); + } + + @Test + public void shouldRunQuery() { + + when(couchbaseOperations.findByView(designDocument, view, query, targetType)).thenReturn(asList()); + + reader.doPageRead(); + + verify(couchbaseOperations).findByView(designDocument, view, query, targetType); + } +} \ No newline at end of file diff --git a/spring-batch-couchbase/src/test/java/org/springframework/batch/item/data/CouchbaseItemWriterTest.java b/spring-batch-couchbase/src/test/java/org/springframework/batch/item/data/CouchbaseItemWriterTest.java new file mode 100644 index 00000000..e1b2aa07 --- /dev/null +++ b/spring-batch-couchbase/src/test/java/org/springframework/batch/item/data/CouchbaseItemWriterTest.java @@ -0,0 +1,318 @@ +package org.springframework.batch.item.data; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.util.Arrays; +import java.util.List; + +import net.spy.memcached.PersistTo; +import net.spy.memcached.ReplicateTo; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.data.couchbase.core.CouchbaseOperations; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionTemplate; + +public class CouchbaseItemWriterTest { + + private CouchbaseItemWriter writer; + + @Mock + private CouchbaseOperations couchbaseOperations; + + private TransactionTemplate transactionTemplate; + + @Before + public void setUp() throws Exception { + initMocks(this); + transactionTemplate = new TransactionTemplate(new ResourcelessTransactionManager()); + writer = new CouchbaseItemWriter<>(couchbaseOperations); + writer.afterPropertiesSet(); + } + + @After + public void tearDown() { + transactionTemplate = null; + writer = null; + } + + @Test(expected=IllegalArgumentException.class) + public void shouldFailCouchbaseOperationsAssertion() throws Exception { + + try { + new CouchbaseItemWriter<>(null).afterPropertiesSet(); + fail("Assertion should have thrown exception on null CouchbaseOperations"); + }catch(IllegalArgumentException e) { + assertEquals("A CouchbaseOperations implementation is required.", e.getMessage()); + throw e; + } + } + + @Test(expected=IllegalArgumentException.class) + public void shouldFailPersistToAssertion() throws Exception { + + try { + CouchbaseItemWriter writer = new CouchbaseItemWriter<>(couchbaseOperations); + writer.setPersistTo(null); + writer.afterPropertiesSet(); + fail("Assertion should have thrown exception on null PersistTo enum constant"); + }catch(IllegalArgumentException e) { + assertEquals("A valid constant value is required for persistTo property. Allowed values are ". + concat(Arrays.toString(PersistTo.values())), e.getMessage()); + throw e; + }catch (Exception e) { + fail("unexpected error occurred"); + } + } + + @Test(expected=IllegalArgumentException.class) + public void shouldFailReplicateToAssertion() throws Exception { + + try { + CouchbaseItemWriter writer = new CouchbaseItemWriter<>(couchbaseOperations); + writer.setReplicateTo(null); + writer.afterPropertiesSet(); + fail("Assertion should have thrown exception on null ReplicateTo enum constant"); + }catch(IllegalArgumentException e) { + assertEquals("A valid constant value is required for replicateTo property. Allowed values are ". + concat(Arrays.toString(ReplicateTo.values())), e.getMessage()); + throw e; + }catch (Exception e) { + fail("unexpected error occurred"); + } + } + + @Test + public void shouldNotWriteWhenNoTransactionIsActiveAndNoItem() throws Exception { + + writer.write(null); + verifyZeroInteractions(couchbaseOperations); + + writer.write(asList()); + verifyZeroInteractions(couchbaseOperations); + } + + @Test + public void shouldInsertItemWhenNoTransactionIsActive() throws Exception { + + List items = asList(new Object()); + + writer.write(items); + + verify(couchbaseOperations).insert(items.iterator().next(), PersistTo.ZERO, ReplicateTo.ZERO); + } + + @Test + public void shouldOverrideItemWhenNoTransactionIsActive() throws Exception { + + List items = asList(new Object()); + + writer.setOverrideDocuments(true); + writer.write(items); + + verify(couchbaseOperations).save(items.iterator().next(), PersistTo.ZERO, ReplicateTo.ZERO); + } + + @Test + public void shouldInsertItemWhenInTransaction() throws Exception { + + final List items = asList(new Object()); + + transactionTemplate.execute(new TransactionCallback() { + + @Override + public Void doInTransaction(TransactionStatus status) { + try { + writer.write(items); + } catch (Exception e) { + fail("An error occurred while writing: " + e.getMessage()); + } + + return null; + } + }); + + verify(couchbaseOperations).insert(items.iterator().next(), PersistTo.ZERO, ReplicateTo.ZERO); + } + + @Test + public void shouldOverrideItemWhenInTransaction() throws Exception { + + final List items = asList(new Object()); + + writer.setOverrideDocuments(true); + + transactionTemplate.execute(new TransactionCallback() { + + @Override + public Void doInTransaction(TransactionStatus status) { + try { + writer.write(items); + } catch (Exception e) { + fail("An error occurred while writing: " + e.getMessage()); + } + + return null; + } + }); + + verify(couchbaseOperations).save(items.iterator().next(), PersistTo.ZERO, ReplicateTo.ZERO); + } + + @Test + public void shouldNotInsertItemWhenTransactionFails() throws Exception { + + final List items = asList(new Object()); + + try { + transactionTemplate.execute(new TransactionCallback() { + + @Override + public Void doInTransaction(TransactionStatus status) { + try { + writer.write(items); + } catch (Exception ignore) { + fail("unexpected error occurred"); + } + throw new RuntimeException("rollback"); + } + }); + } catch (RuntimeException re) { + // ignore + } catch (Throwable t) { + fail("Unexpected error occurred"); + } + + verifyZeroInteractions(couchbaseOperations); + } + + @Test + public void shouldNotOverrideItemWhenTransactionFails() throws Exception { + + final List items = asList(new Object()); + + writer.setOverrideDocuments(true); + + try { + transactionTemplate.execute(new TransactionCallback() { + + @Override + public Void doInTransaction(TransactionStatus status) { + try { + writer.write(items); + } catch (Exception ignore) { + fail("unexpected error occurred"); + } + throw new RuntimeException("rollback"); + } + }); + } catch (RuntimeException re) { + // ignore + } catch (Throwable t) { + fail("Unexpected error occurred"); + } + + verifyZeroInteractions(couchbaseOperations); + } + + @Test + public void shouldNotInsertItemWhenTransactionIsReadOnly() throws Exception { + + final List items = asList(new Object()); + + try { + + transactionTemplate.setReadOnly(true); + transactionTemplate.execute(new TransactionCallback() { + + @Override + public Void doInTransaction(TransactionStatus status) { + try { + writer.write(items); + } catch (Exception ignore) { + fail("unexpected error occurred"); + } + return null; + } + }); + } catch (Throwable t) { + fail("unexpected error occurred"); + } + + verifyZeroInteractions(couchbaseOperations); + } + + @Test + public void shouldNotOverrideItemWhenTransactionIsReadOnly() throws Exception { + + final List items = asList(new Object()); + + writer.setOverrideDocuments(true); + + try { + + transactionTemplate.setReadOnly(true); + transactionTemplate.execute(new TransactionCallback() { + + @Override + public Void doInTransaction(TransactionStatus status) { + try { + writer.write(items); + } catch (Exception ignore) { + fail("unexpected error occurred"); + } + return null; + } + }); + } catch (Throwable t) { + fail("unexpected error occurred"); + } + + verifyZeroInteractions(couchbaseOperations); + } + + @Test + public void shouldRemoveItemWhenNoTransactionIsActive() throws Exception { + + final List items = asList(new Object()); + + writer.setDelete(true); + writer.write(items); + + verify(couchbaseOperations).remove(items.iterator().next(), PersistTo.ZERO, ReplicateTo.ZERO); + } + + @Test + public void shouldRemoveItemWhenInTransaction() throws Exception { + + final List items = asList(new Object()); + + writer.setDelete(true); + + transactionTemplate.execute(new TransactionCallback() { + + @Override + public Void doInTransaction(TransactionStatus status) { + try { + writer.write(items); + } catch (Exception e) { + fail("An error occurred while writing: " + e.getMessage()); + } + + return null; + } + }); + + verify(couchbaseOperations).remove(items.iterator().next(), PersistTo.ZERO, ReplicateTo.ZERO); + } +} \ No newline at end of file