diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java index 81451e134f..e0f42d66ab 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java @@ -42,6 +42,7 @@ * Batch in a declarative way through {@link EnableBatchProcessing}. * * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.0 * @see EnableBatchProcessing */ @@ -185,6 +186,11 @@ private void registerMongoJobRepository(BeanDefinitionRegistry registry, beanDefinitionBuilder.addPropertyValue("isolationLevelForCreate", isolationLevelForCreate); } + String collectionPrefix = mongoJobRepositoryAnnotation.collectionPrefix(); + if (collectionPrefix != null) { + beanDefinitionBuilder.addPropertyValue("collectionPrefix", collectionPrefix); + } + String jobKeyGeneratorRef = mongoJobRepositoryAnnotation.jobKeyGeneratorRef(); if (registry.containsBeanDefinition(jobKeyGeneratorRef)) { beanDefinitionBuilder.addPropertyReference("jobKeyGenerator", jobKeyGeneratorRef); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java index 922bca0192..e5f48163d8 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java @@ -98,4 +98,10 @@ */ String stepExecutionIncrementerRef() default "stepExecutionIncrementer"; + /** + * Set the prefix for MongoDB collection names. Defaults to {@literal BATCH_}. + * @return the collection prefix to use + */ + String collectionPrefix() default "BATCH_"; + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java index 1f28cd73eb..282619724d 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java @@ -159,7 +159,7 @@ protected JobKeyGenerator getJobKeyGenerator() { * @since 6.0 */ protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() { - return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_INSTANCE_SEQ"); + return new MongoSequenceIncrementer(getMongoOperations(), "JOB_INSTANCE_SEQ", "BATCH_"); } /** @@ -168,7 +168,7 @@ protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() { * @since 6.0 */ protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() { - return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_EXECUTION_SEQ"); + return new MongoSequenceIncrementer(getMongoOperations(), "JOB_EXECUTION_SEQ", "BATCH_"); } /** @@ -177,7 +177,7 @@ protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() { * @since 6.0 */ protected DataFieldMaxValueIncrementer getStepExecutionIncrementer() { - return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_STEP_EXECUTION_SEQ"); + return new MongoSequenceIncrementer(getMongoOperations(), "STEP_EXECUTION_SEQ", "BATCH_"); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java index 70b2c95b78..9c7ea13a91 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java @@ -25,24 +25,34 @@ import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; +import org.springframework.util.Assert; import static org.springframework.data.mongodb.core.query.Criteria.where; import static org.springframework.data.mongodb.core.query.Query.query; /** * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 */ public class MongoExecutionContextDao implements ExecutionContextDao { - private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION"; + private static final String STEP_EXECUTIONS_COLLECTION_NAME = "STEP_EXECUTION"; - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; + private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION"; private final MongoOperations mongoOperations; - public MongoExecutionContextDao(MongoOperations mongoOperations) { + private final String stepExecutionCollectionName; + + private final String jobExecutionCollectionName; + + public MongoExecutionContextDao(MongoOperations mongoOperations, String collectionPrefix) { + Assert.notNull(mongoOperations, "mongoOperations must not be null."); + Assert.notNull(collectionPrefix, "collectionPrefix must not be null."); this.mongoOperations = mongoOperations; + this.stepExecutionCollectionName = collectionPrefix + STEP_EXECUTIONS_COLLECTION_NAME; + this.jobExecutionCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME; } @Override @@ -50,7 +60,7 @@ public ExecutionContext getExecutionContext(JobExecution jobExecution) { Query query = query(where("jobExecutionId").is(jobExecution.getId())); org.springframework.batch.core.repository.persistence.JobExecution execution = this.mongoOperations.findOne( query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionCollectionName); if (execution == null) { return new ExecutionContext(); } @@ -62,7 +72,7 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) { Query query = query(where("stepExecutionId").is(stepExecution.getId())); org.springframework.batch.core.repository.persistence.StepExecution execution = this.mongoOperations.findOne( query, org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionCollectionName); if (execution == null) { return new ExecutionContext(); } @@ -78,8 +88,7 @@ public void saveExecutionContext(JobExecution jobExecution) { new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(), executionContext.isDirty())); this.mongoOperations.updateFirst(query, update, - org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionCollectionName); } @Override @@ -91,8 +100,7 @@ public void saveExecutionContext(StepExecution stepExecution) { new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(), executionContext.isDirty())); this.mongoOperations.updateFirst(query, update, - org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.StepExecution.class, stepExecutionCollectionName); } @@ -119,7 +127,7 @@ public void deleteExecutionContext(JobExecution jobExecution) { org.springframework.batch.core.repository.persistence.ExecutionContext executionContext = new org.springframework.batch.core.repository.persistence.ExecutionContext( Collections.emptyMap(), false); Update executionContextRemovalUpdate = new Update().set("executionContext", executionContext); - this.mongoOperations.updateFirst(query, executionContextRemovalUpdate, JOB_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.updateFirst(query, executionContextRemovalUpdate, this.jobExecutionCollectionName); } @Override @@ -128,7 +136,7 @@ public void deleteExecutionContext(StepExecution stepExecution) { org.springframework.batch.core.repository.persistence.ExecutionContext executionContext = new org.springframework.batch.core.repository.persistence.ExecutionContext( Collections.emptyMap(), false); Update executionContextRemovalUpdate = new Update().set("executionContext", executionContext); - this.mongoOperations.updateFirst(query, executionContextRemovalUpdate, STEP_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.updateFirst(query, executionContextRemovalUpdate, this.stepExecutionCollectionName); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java index e0630659ab..d8d749d3a6 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java @@ -44,25 +44,30 @@ /** * @author Mahmoud Ben Hassine * @author Yanming Zhou + * @author Myeongha Shin * @since 5.2.0 */ public class MongoJobExecutionDao implements JobExecutionDao { - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; + private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION"; - private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ"; + private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "JOB_EXECUTION_SEQ"; private final MongoOperations mongoOperations; + private final String jobExecutionsCollectionName; + private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter(); private DataFieldMaxValueIncrementer jobExecutionIncrementer; private MongoJobInstanceDao jobInstanceDao; - public MongoJobExecutionDao(MongoOperations mongoOperations) { + public MongoJobExecutionDao(MongoOperations mongoOperations, String collectionPrefix) { this.mongoOperations = mongoOperations; - this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME); + this.jobExecutionsCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME; + this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME, + collectionPrefix); } public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) { @@ -79,7 +84,7 @@ public JobExecution createJobExecution(JobInstance jobInstance, JobParameters jo org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToSave = this.jobExecutionConverter .fromJobExecution(jobExecution); - this.mongoOperations.insert(jobExecutionToSave, JOB_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.insert(jobExecutionToSave, jobExecutionsCollectionName); return jobExecution; } @@ -89,7 +94,7 @@ public void updateJobExecution(JobExecution jobExecution) { Query query = query(where("jobExecutionId").is(jobExecution.getId())); org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter .fromJobExecution(jobExecution); - this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, jobExecutionsCollectionName); } @Override @@ -98,7 +103,7 @@ public List findJobExecutions(JobInstance jobInstance) { .with(Sort.by(Sort.Direction.DESC, "jobExecutionId")); List jobExecutions = this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionsCollectionName); return jobExecutions.stream().map(jobExecution -> convert(jobExecution, jobInstance)).toList(); } @@ -108,8 +113,7 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) { Sort.Order sortOrder = Sort.Order.desc("jobExecutionId"); org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne( query.with(Sort.by(sortOrder)), - org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionsCollectionName); return jobExecution != null ? convert(jobExecution, jobInstance) : null; } @@ -122,7 +126,7 @@ public Set findRunningJobExecutions(String jobName) { where("jobInstanceId").is(jobInstance.getId()).and("status").in("STARTING", "STARTED", "STOPPING")); this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME) + jobExecutionsCollectionName) .stream() .map(jobExecution -> convert(jobExecution, jobInstance)) .forEach(runningJobExecutions::add); @@ -135,7 +139,7 @@ public JobExecution getJobExecution(long executionId) { Query jobExecutionQuery = query(where("jobExecutionId").is(executionId)); org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne( jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionsCollectionName); if (jobExecution == null) { return null; } @@ -158,7 +162,7 @@ public void synchronizeStatus(JobExecution jobExecution) { @Override public void deleteJobExecution(JobExecution jobExecution) { this.mongoOperations.remove(query(where("jobExecutionId").is(jobExecution.getId())), - JOB_EXECUTIONS_COLLECTION_NAME); + this.jobExecutionsCollectionName); } @@ -166,7 +170,7 @@ public void deleteJobExecution(JobExecution jobExecution) { public void deleteJobExecutionParameters(JobExecution jobExecution) { Query query = new Query(where("jobExecutionId").is(jobExecution.getId())); Update jobParametersRemovalUpdate = new Update().set("jobParameters", Collections.emptyList()); - this.mongoOperations.updateFirst(query, jobParametersRemovalUpdate, JOB_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.updateFirst(query, jobParametersRemovalUpdate, this.jobExecutionsCollectionName); } private JobExecution convert(org.springframework.batch.core.repository.persistence.JobExecution jobExecution, diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java index 760bdbba55..6de0bc2044 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java @@ -38,26 +38,31 @@ /** * @author Mahmoud Ben Hassine * @author Yanming Zhou + * @author Myeongha Shin * @since 5.2.0 */ public class MongoJobInstanceDao implements JobInstanceDao { - private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE"; + private static final String COLLECTION_NAME = "JOB_INSTANCE"; - private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ"; + private static final String SEQUENCE_NAME = "JOB_INSTANCE_SEQ"; private final MongoOperations mongoOperations; + private final String collectionName; + private DataFieldMaxValueIncrementer jobInstanceIncrementer; private JobKeyGenerator jobKeyGenerator = new DefaultJobKeyGenerator(); private final JobInstanceConverter jobInstanceConverter = new JobInstanceConverter(); - public MongoJobInstanceDao(MongoOperations mongoOperations) { + public MongoJobInstanceDao(MongoOperations mongoOperations, String collectionPrefix) { Assert.notNull(mongoOperations, "mongoOperations must not be null."); + Assert.notNull(collectionPrefix, "collectionPrefix must not be null."); this.mongoOperations = mongoOperations; - this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME); + this.collectionName = collectionPrefix + COLLECTION_NAME; + this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME, collectionPrefix); } public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) { @@ -81,7 +86,7 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters jobInstanceToSave.setJobKey(key); long instanceId = jobInstanceIncrementer.nextLongValue(); jobInstanceToSave.setJobInstanceId(instanceId); - this.mongoOperations.insert(jobInstanceToSave, COLLECTION_NAME); + this.mongoOperations.insert(jobInstanceToSave, this.collectionName); JobInstance jobInstance = new JobInstance(instanceId, jobName); jobInstance.incrementVersion(); // TODO is this needed? @@ -92,16 +97,16 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters public JobInstance getJobInstance(String jobName, JobParameters jobParameters) { String key = this.jobKeyGenerator.generateKey(jobParameters); Query query = query(where("jobName").is(jobName).and("jobKey").is(key)); - org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations - .findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne( + query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName); return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; } @Override public JobInstance getJobInstance(long instanceId) { Query query = query(where("jobInstanceId").is(instanceId)); - org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations - .findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME); + org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne( + query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName); return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; } @@ -116,7 +121,7 @@ public List getJobInstances(String jobName, int start, int count) { Sort.Order sortOrder = Sort.Order.desc("jobInstanceId"); List jobInstances = this.mongoOperations .find(query.with(Sort.by(sortOrder)), - org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .toList(); if (jobInstances.size() <= start) { @@ -139,7 +144,7 @@ public List getJobInstances(String jobName, int start, int count) { public List getJobInstances(String jobName) { Query query = query(where("jobName").is(jobName)); return this.mongoOperations - .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .map(this.jobInstanceConverter::toJobInstance) .toList(); @@ -149,7 +154,7 @@ public List getJobInstances(String jobName) { public List getJobInstanceIds(String jobName) { Query query = query(where("jobName").is(jobName)); return this.mongoOperations - .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .map(org.springframework.batch.core.repository.persistence.JobInstance::getJobInstanceId) .toList(); @@ -158,7 +163,7 @@ public List getJobInstanceIds(String jobName) { public List findJobInstancesByName(String jobName) { Query query = query(where("jobName").is(jobName)); return this.mongoOperations - .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + .find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .map(this.jobInstanceConverter::toJobInstance) .toList(); @@ -170,14 +175,14 @@ public JobInstance getLastJobInstance(String jobName) { Sort.Order sortOrder = Sort.Order.desc("jobInstanceId"); org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne( query.with(Sort.by(sortOrder)), org.springframework.batch.core.repository.persistence.JobInstance.class, - COLLECTION_NAME); + this.collectionName); return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null; } @Override public List getJobNames() { return this.mongoOperations - .findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME) + .findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName) .stream() .map(org.springframework.batch.core.repository.persistence.JobInstance::getJobName) .toList(); @@ -200,12 +205,12 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException { throw new NoSuchJobException("Job not found " + jobName); } Query query = query(where("jobName").is(jobName)); - return this.mongoOperations.count(query, COLLECTION_NAME); + return this.mongoOperations.count(query, this.collectionName); } @Override public void deleteJobInstance(JobInstance jobInstance) { - this.mongoOperations.remove(query(where("jobInstanceId").is(jobInstance.getId())), COLLECTION_NAME); + this.mongoOperations.remove(query(where("jobInstanceId").is(jobInstance.getId())), this.collectionName); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java index 9722db637f..ca27fc22d6 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoSequenceIncrementer.java @@ -33,18 +33,27 @@ */ public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer { + private static final String SEQUENCES_COLLECTION_NAME = "SEQUENCES"; + private final MongoOperations mongoTemplate; private final String sequenceName; + private final String sequencesCollectionName; + public MongoSequenceIncrementer(MongoOperations mongoTemplate, String sequenceName) { + this(mongoTemplate, sequenceName, "BATCH_"); + } + + public MongoSequenceIncrementer(MongoOperations mongoTemplate, String sequenceName, String collectionPrefix) { this.mongoTemplate = mongoTemplate; - this.sequenceName = sequenceName; + this.sequencesCollectionName = collectionPrefix + SEQUENCES_COLLECTION_NAME; + this.sequenceName = collectionPrefix + sequenceName; } @Override public long nextLongValue() throws DataAccessException { - return mongoTemplate.execute("BATCH_SEQUENCES", + return mongoTemplate.execute(sequencesCollectionName, collection -> collection .findOneAndUpdate(new Document("_id", sequenceName), new Document("$inc", new Document("count", 1)), new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java index f203f3184a..99a18e62c3 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java @@ -37,15 +37,20 @@ /** * @author Mahmoud Ben Hassine * @author Yanming Zhou + * @author Myeongha Shin * @since 5.2.0 */ public class MongoStepExecutionDao implements StepExecutionDao { - private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION"; + private static final String STEP_EXECUTIONS_COLLECTION_NAME = "STEP_EXECUTION"; - private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "BATCH_STEP_EXECUTION_SEQ"; + private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "STEP_EXECUTION_SEQ"; - private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION"; + private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION"; + + private final String stepExecutionCollectionName; + + private final String jobExecutionCollectionName; private final StepExecutionConverter stepExecutionConverter = new StepExecutionConverter(); @@ -57,9 +62,12 @@ public class MongoStepExecutionDao implements StepExecutionDao { MongoJobExecutionDao jobExecutionDao; - public MongoStepExecutionDao(MongoOperations mongoOperations) { + public MongoStepExecutionDao(MongoOperations mongoOperations, String collectionPrefix) { this.mongoOperations = mongoOperations; - this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, STEP_EXECUTIONS_SEQUENCE_NAME); + this.stepExecutionCollectionName = collectionPrefix + STEP_EXECUTIONS_COLLECTION_NAME; + this.jobExecutionCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME; + this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, STEP_EXECUTIONS_SEQUENCE_NAME, + collectionPrefix); } public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) { @@ -76,7 +84,7 @@ public StepExecution createStepExecution(String stepName, JobExecution jobExecut StepExecution stepExecution = new StepExecution(id, stepName, jobExecution); org.springframework.batch.core.repository.persistence.StepExecution stepExecutionToSave = this.stepExecutionConverter .fromStepExecution(stepExecution); - this.mongoOperations.insert(stepExecutionToSave, STEP_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.insert(stepExecutionToSave, stepExecutionCollectionName); return stepExecution; } @@ -86,7 +94,7 @@ public void updateStepExecution(StepExecution stepExecution) { Query query = query(where("stepExecutionId").is(stepExecution.getId())); org.springframework.batch.core.repository.persistence.StepExecution stepExecutionToUpdate = this.stepExecutionConverter .fromStepExecution(stepExecution); - this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, STEP_EXECUTIONS_COLLECTION_NAME); + this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, this.stepExecutionCollectionName); } @Nullable @@ -95,7 +103,7 @@ public StepExecution getStepExecution(long stepExecutionId) { Query query = query(where("stepExecutionId").is(stepExecutionId)); org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations .findOne(query, org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + this.stepExecutionCollectionName); return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecutionDao.getJobExecution(stepExecution.getJobExecutionId())) : null; } @@ -106,7 +114,7 @@ public StepExecution getStepExecution(JobExecution jobExecution, long stepExecut Query query = query(where("stepExecutionId").is(stepExecutionId)); org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations .findOne(query, org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionCollectionName); return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null; } @@ -118,12 +126,12 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa Query query = query(where("jobInstanceId").is(jobInstance.getId())); List jobExecutions = this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionCollectionName); List stepExecutions = this.mongoOperations .find(query(where("jobExecutionId").in(jobExecutions.stream() .map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId) .toList())), org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionCollectionName); // sort step executions by creation date then id (see contract) and return the // last one Optional lastStepExecution = stepExecutions @@ -157,7 +165,7 @@ public List getStepExecutions(JobExecution jobExecution) { Query query = query(where("jobExecutionId").is(jobExecution.getId())); return this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME) + stepExecutionCollectionName) .stream() .map(stepExecution -> this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution)) .toList(); @@ -168,19 +176,19 @@ public long countStepExecutions(JobInstance jobInstance, String stepName) { Query query = query(where("jobInstanceId").is(jobInstance.getId())); List jobExecutions = this.mongoOperations .find(query, org.springframework.batch.core.repository.persistence.JobExecution.class, - JOB_EXECUTIONS_COLLECTION_NAME); + jobExecutionCollectionName); return this.mongoOperations.count( query(where("jobExecutionId").in(jobExecutions.stream() .map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId) .toList())), org.springframework.batch.core.repository.persistence.StepExecution.class, - STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionCollectionName); } @Override public void deleteStepExecution(StepExecution stepExecution) { this.mongoOperations.remove(query(where("stepExecutionId").is(stepExecution.getId())), - STEP_EXECUTIONS_COLLECTION_NAME); + stepExecutionCollectionName); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/MongoJobExplorerFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/MongoJobExplorerFactoryBean.java index 13997a0fd7..415fd2474b 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/MongoJobExplorerFactoryBean.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/MongoJobExplorerFactoryBean.java @@ -38,6 +38,7 @@ * "batch.version") * * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 * @deprecated since 6.0 in favor of {@link MongoJobRepositoryFactoryBean}. Scheduled for * removal in 6.2 or later. @@ -47,28 +48,34 @@ public class MongoJobExplorerFactoryBean extends AbstractJobExplorerFactoryBean private MongoOperations mongoOperations; + private String collectionPrefix; + public void setMongoOperations(MongoOperations mongoOperations) { this.mongoOperations = mongoOperations; } + public void setCollectionPrefix(String collectionPrefix) { + this.collectionPrefix = collectionPrefix; + } + @Override protected JobInstanceDao createJobInstanceDao() { - return new MongoJobInstanceDao(this.mongoOperations); + return new MongoJobInstanceDao(this.mongoOperations, this.collectionPrefix); } @Override protected JobExecutionDao createJobExecutionDao() { - return new MongoJobExecutionDao(this.mongoOperations); + return new MongoJobExecutionDao(this.mongoOperations, this.collectionPrefix); } @Override protected StepExecutionDao createStepExecutionDao() { - return new MongoStepExecutionDao(this.mongoOperations); + return new MongoStepExecutionDao(this.mongoOperations, this.collectionPrefix); } @Override protected ExecutionContextDao createExecutionContextDao() { - return new MongoExecutionContextDao(this.mongoOperations); + return new MongoExecutionContextDao(this.mongoOperations, this.collectionPrefix); } @Override diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java index 75e309f70b..48f8a02d2a 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/MongoJobRepositoryFactoryBean.java @@ -37,6 +37,7 @@ * "batch.version") * * @author Mahmoud Ben Hassine + * @author Myeongha Shin * @since 5.2.0 */ public class MongoJobRepositoryFactoryBean extends AbstractJobRepositoryFactoryBean implements InitializingBean { @@ -49,6 +50,8 @@ public class MongoJobRepositoryFactoryBean extends AbstractJobRepositoryFactoryB private @Nullable DataFieldMaxValueIncrementer stepExecutionIncrementer; + private @Nullable String collectionPrefix = "BATCH_"; + public void setMongoOperations(MongoOperations mongoOperations) { this.mongoOperations = mongoOperations; } @@ -65,6 +68,10 @@ public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecuti this.stepExecutionIncrementer = stepExecutionIncrementer; } + public void setCollectionPrefix(String collectionPrefix) { + this.collectionPrefix = collectionPrefix; + } + @Override protected Object getTarget() throws Exception { MongoJobInstanceDao jobInstanceDao = createJobInstanceDao(); @@ -78,7 +85,7 @@ protected Object getTarget() throws Exception { @Override protected MongoJobInstanceDao createJobInstanceDao() { - MongoJobInstanceDao mongoJobInstanceDao = new MongoJobInstanceDao(this.mongoOperations); + MongoJobInstanceDao mongoJobInstanceDao = new MongoJobInstanceDao(this.mongoOperations, this.collectionPrefix); mongoJobInstanceDao.setJobKeyGenerator(this.jobKeyGenerator); mongoJobInstanceDao.setJobInstanceIncrementer(this.jobInstanceIncrementer); return mongoJobInstanceDao; @@ -86,37 +93,41 @@ protected MongoJobInstanceDao createJobInstanceDao() { @Override protected MongoJobExecutionDao createJobExecutionDao() { - MongoJobExecutionDao mongoJobExecutionDao = new MongoJobExecutionDao(this.mongoOperations); + MongoJobExecutionDao mongoJobExecutionDao = new MongoJobExecutionDao(this.mongoOperations, + this.collectionPrefix); mongoJobExecutionDao.setJobExecutionIncrementer(this.jobExecutionIncrementer); return mongoJobExecutionDao; } @Override protected MongoStepExecutionDao createStepExecutionDao() { - MongoStepExecutionDao mongoStepExecutionDao = new MongoStepExecutionDao(this.mongoOperations); + MongoStepExecutionDao mongoStepExecutionDao = new MongoStepExecutionDao(this.mongoOperations, + this.collectionPrefix); mongoStepExecutionDao.setStepExecutionIncrementer(this.stepExecutionIncrementer); return mongoStepExecutionDao; } @Override protected MongoExecutionContextDao createExecutionContextDao() { - return new MongoExecutionContextDao(this.mongoOperations); + return new MongoExecutionContextDao(this.mongoOperations, this.collectionPrefix); } @Override public void afterPropertiesSet() throws Exception { super.afterPropertiesSet(); Assert.notNull(this.mongoOperations, "MongoOperations must not be null."); + if (this.jobInstanceIncrementer == null) { - this.jobInstanceIncrementer = new MongoSequenceIncrementer(this.mongoOperations, "BATCH_JOB_INSTANCE_SEQ"); + this.jobInstanceIncrementer = new MongoSequenceIncrementer(this.mongoOperations, "JOB_INSTANCE_SEQ", + this.collectionPrefix); } if (this.jobExecutionIncrementer == null) { - this.jobExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations, - "BATCH_JOB_EXECUTION_SEQ"); + this.jobExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations, "JOB_EXECUTION_SEQ", + this.collectionPrefix); } if (this.stepExecutionIncrementer == null) { - this.stepExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations, - "BATCH_STEP_EXECUTION_SEQ"); + this.stepExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations, "STEP_EXECUTION_SEQ", + this.collectionPrefix); } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBCollectionPrefixTestConfiguration.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBCollectionPrefixTestConfiguration.java new file mode 100644 index 0000000000..d229140251 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBCollectionPrefixTestConfiguration.java @@ -0,0 +1,96 @@ + +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.support; + +import org.springframework.batch.core.job.Job; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.EnableMongoJobRepository; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.MongoDatabaseFactory; +import org.springframework.data.mongodb.MongoTransactionManager; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Test configuration for MongoDB collection prefix functionality + * + * @author Myeongha Shin + */ +@Configuration +@EnableBatchProcessing +class MongoDBCollectionPrefixTestConfiguration { + + private static final DockerImageName MONGODB_IMAGE = DockerImageName.parse("mongo:8.0.11"); + + @Bean(initMethod = "start") + public MongoDBContainer mongoDBContainer() { + return new MongoDBContainer(MONGODB_IMAGE); + } + + @Bean + public MongoDatabaseFactory mongoDatabaseFactory(MongoDBContainer mongoDBContainer) { + return new SimpleMongoClientDatabaseFactory(mongoDBContainer.getConnectionString() + "/test"); + } + + @Bean + public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransactionManager transactionManager) + throws Exception { + MongoJobRepositoryFactoryBean jobRepositoryFactoryBean = new MongoJobRepositoryFactoryBean(); + jobRepositoryFactoryBean.setMongoOperations(mongoTemplate); + jobRepositoryFactoryBean.setTransactionManager(transactionManager); + jobRepositoryFactoryBean.setCollectionPrefix("TEST_COLLECTION_PREFIX_"); + jobRepositoryFactoryBean.afterPropertiesSet(); + return jobRepositoryFactoryBean.getObject(); + } + + @Bean + public MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) { + MongoTemplate template = new MongoTemplate(mongoDatabaseFactory); + MappingMongoConverter converter = (MappingMongoConverter) template.getConverter(); + converter.setMapKeyDotReplacement("."); + return template; + } + + @Bean + public MongoTransactionManager transactionManager(MongoDatabaseFactory mongoDatabaseFactory) { + MongoTransactionManager mongoTransactionManager = new MongoTransactionManager(); + mongoTransactionManager.setDatabaseFactory(mongoDatabaseFactory); + mongoTransactionManager.afterPropertiesSet(); + return mongoTransactionManager; + } + + @Bean + public Job job(JobRepository jobRepository, MongoTransactionManager transactionManager) { + return new JobBuilder("job", jobRepository) + .start(new StepBuilder("step1", jobRepository) + .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager) + .build()) + .next(new StepBuilder("step2", jobRepository) + .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, transactionManager) + .build()) + .build(); + } + +} \ No newline at end of file diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryCollectionPrefixTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryCollectionPrefixTests.java new file mode 100644 index 0000000000..453ec151a5 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryCollectionPrefixTests.java @@ -0,0 +1,101 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.repository.support; + +import java.time.LocalDateTime; +import java.util.Map; + +import com.mongodb.client.MongoCollection; +import org.bson.Document; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.job.Job; +import org.springframework.batch.core.job.JobExecution; +import org.springframework.batch.core.job.parameters.JobParameters; +import org.springframework.batch.core.job.parameters.JobParametersBuilder; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.core.MongoTemplate; + +/** + * Test for MongoDB collection prefix functionality + * + * @author Myeongha Shin + */ +@DirtiesContext +@Testcontainers(disabledWithoutDocker = true) +@SpringJUnitConfig(MongoDBCollectionPrefixTestConfiguration.class) +public class MongoDBJobRepositoryCollectionPrefixTests { + + @Autowired + private MongoTemplate mongoTemplate; + + private static final String PREFIX = "TEST_COLLECTION_PREFIX_"; + + @BeforeEach + public void setUp() { + // collections with custom prefix + mongoTemplate.createCollection(PREFIX + "JOB_INSTANCE"); + mongoTemplate.createCollection(PREFIX + "JOB_EXECUTION"); + mongoTemplate.createCollection(PREFIX + "STEP_EXECUTION"); + // sequences collection with custom prefix + mongoTemplate.createCollection(PREFIX + "SEQUENCES"); + mongoTemplate.getCollection(PREFIX + "SEQUENCES") + .insertOne(new Document(Map.of("_id", PREFIX + "JOB_INSTANCE_SEQ", "count", 0L))); + mongoTemplate.getCollection(PREFIX + "SEQUENCES") + .insertOne(new Document(Map.of("_id", PREFIX + "JOB_EXECUTION_SEQ", "count", 0L))); + mongoTemplate.getCollection(PREFIX + "SEQUENCES") + .insertOne(new Document(Map.of("_id", PREFIX + "STEP_EXECUTION_SEQ", "count", 0L))); + } + + @Test + void testJobExecutionWithCustomPrefix(@Autowired JobOperator jobOperator, @Autowired Job job) throws Exception { + // given + JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo") + .addLocalDateTime("runtime", LocalDateTime.now()) + .toJobParameters(); + + // when + JobExecution jobExecution = jobOperator.start(job, jobParameters); + + // then + Assertions.assertNotNull(jobExecution); + Assertions.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); + + // Verify data is stored in collections with custom prefix + MongoCollection jobInstancesCollection = mongoTemplate.getCollection(PREFIX + "JOB_INSTANCE"); + MongoCollection jobExecutionsCollection = mongoTemplate.getCollection(PREFIX + "JOB_EXECUTION"); + MongoCollection stepExecutionsCollection = mongoTemplate.getCollection(PREFIX + "STEP_EXECUTION"); + + Assertions.assertEquals(1, jobInstancesCollection.countDocuments()); + Assertions.assertEquals(1, jobExecutionsCollection.countDocuments()); + Assertions.assertEquals(2, stepExecutionsCollection.countDocuments()); + + // Verify default collections are empty + Assertions.assertFalse(mongoTemplate.collectionExists("BATCH_JOB_INSTANCE")); + Assertions.assertFalse(mongoTemplate.collectionExists("BATCH_JOB_EXECUTION")); + Assertions.assertFalse(mongoTemplate.collectionExists("BATCH_STEP_EXECUTION")); + + System.out.println("✅ Collection prefix test passed! Data stored in: " + PREFIX + "* collections"); + } + +} \ No newline at end of file