Skip to content

Commit 8531cfb

Browse files
committed
Fix MongoDB job restart failure after abrupt shutdown
Fixes #4943, where job restart fails after an abrupt shutdown because getLastStepExecution() only checks the (empty) embedded stepExecutions array in BATCH_JOB_EXECUTION instead of querying the BATCH_STEP_EXECUTION collection.

Changes:
- getLastStepExecution(): query the BATCH_STEP_EXECUTION collection directly
- countStepExecutions(): query the collection instead of the embedded array
- Align with the JDBC implementation's behavior

Signed-off-by: baezzys <[email protected]>
1 parent 3da06e5 commit 8531cfb

File tree

3 files changed

+117
-41
lines changed

3 files changed

+117
-41
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,16 @@
1515
*/
1616
package org.springframework.batch.core.repository.dao.mongodb;
1717

18-
import java.util.ArrayList;
1918
import java.util.Collection;
20-
import java.util.Comparator;
2119
import java.util.List;
22-
import java.util.Optional;
2320

2421
import org.springframework.batch.core.job.JobExecution;
2522
import org.springframework.batch.core.job.JobInstance;
2623
import org.springframework.batch.core.step.StepExecution;
2724
import org.springframework.batch.core.repository.dao.StepExecutionDao;
2825
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
2926
import org.springframework.batch.core.repository.persistence.converter.StepExecutionConverter;
27+
import org.springframework.data.domain.Sort;
3028
import org.springframework.data.mongodb.core.MongoOperations;
3129
import org.springframework.data.mongodb.core.query.Query;
3230
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
@@ -36,6 +34,7 @@
3634

3735
/**
3836
* @author Mahmoud Ben Hassine
37+
* @author Jinwoo Bae
3938
* @since 5.2.0
4039
*/
4140
public class MongoStepExecutionDao implements StepExecutionDao {
@@ -100,34 +99,42 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
10099
@Override
101100
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
102101
// TODO optimize the query
103-
// get all step executions
104-
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = new ArrayList<>();
105-
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
102+
Query jobExecutionQuery = query(where("jobInstanceId").is(jobInstance.getId()));
106103
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
107-
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
104+
.find(jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
108105
JOB_EXECUTIONS_COLLECTION_NAME);
109-
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
110-
stepExecutions.addAll(jobExecution.getStepExecutions());
111-
}
112-
// sort step executions by creation date then id (see contract) and return the
113-
// first one
114-
Optional<org.springframework.batch.core.repository.persistence.StepExecution> lastStepExecution = stepExecutions
115-
.stream()
116-
.filter(stepExecution -> stepExecution.getName().equals(stepName))
117-
.min(Comparator
118-
.comparing(org.springframework.batch.core.repository.persistence.StepExecution::getCreateTime)
119-
.thenComparing(org.springframework.batch.core.repository.persistence.StepExecution::getId));
120-
if (lastStepExecution.isPresent()) {
121-
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = lastStepExecution.get();
122-
JobExecution jobExecution = this.jobExecutionConverter.toJobExecution(jobExecutions.stream()
123-
.filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId()))
124-
.findFirst()
125-
.get(), jobInstance);
126-
return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution);
106+
107+
if (jobExecutions.isEmpty()) {
108+
return null;
127109
}
128-
else {
110+
111+
List<Long> jobExecutionIds = jobExecutions.stream()
112+
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
113+
.toList();
114+
115+
Query stepExecutionQuery = query(where("name").is(stepName).and("jobExecutionId").in(jobExecutionIds))
116+
.with(Sort.by(Sort.Direction.DESC, "createTime", "stepExecutionId"))
117+
.limit(1);
118+
119+
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations
120+
.findOne(stepExecutionQuery, org.springframework.batch.core.repository.persistence.StepExecution.class,
121+
STEP_EXECUTIONS_COLLECTION_NAME);
122+
123+
if (stepExecution == null) {
129124
return null;
130125
}
126+
127+
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = jobExecutions.stream()
128+
.filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId()))
129+
.findFirst()
130+
.orElse(null);
131+
132+
if (jobExecution != null) {
133+
JobExecution jobExecutionDomain = this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance);
134+
return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecutionDomain);
135+
}
136+
137+
return null;
131138
}
132139

133140
@Override
@@ -144,22 +151,23 @@ public void addStepExecutions(JobExecution jobExecution) {
144151

145152
@Override
146153
public long countStepExecutions(JobInstance jobInstance, String stepName) {
147-
long count = 0;
148-
// TODO optimize the count query
149-
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
150-
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
151-
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
152-
JOB_EXECUTIONS_COLLECTION_NAME);
153-
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
154-
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = jobExecution
155-
.getStepExecutions();
156-
for (org.springframework.batch.core.repository.persistence.StepExecution stepExecution : stepExecutions) {
157-
if (stepExecution.getName().equals(stepName)) {
158-
count++;
159-
}
160-
}
154+
Query jobExecutionQuery = query(where("jobInstanceId").is(jobInstance.getId()));
155+
List<Long> jobExecutionIds = this.mongoOperations
156+
.find(jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
157+
JOB_EXECUTIONS_COLLECTION_NAME)
158+
.stream()
159+
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
160+
.toList();
161+
162+
if (jobExecutionIds.isEmpty()) {
163+
return 0;
161164
}
162-
return count;
165+
166+
// Count step executions directly from BATCH_STEP_EXECUTION collection
167+
Query stepQuery = query(where("name").is(stepName).and("jobExecutionId").in(jobExecutionIds));
168+
return this.mongoOperations.count(stepQuery,
169+
org.springframework.batch.core.repository.persistence.StepExecution.class,
170+
STEP_EXECUTIONS_COLLECTION_NAME);
163171
}
164172

165173
}

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBIntegrationTestConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
2020
import org.springframework.batch.core.job.builder.JobBuilder;
2121
import org.springframework.batch.core.repository.JobRepository;
22+
import org.springframework.batch.core.repository.dao.StepExecutionDao;
2223
import org.springframework.batch.core.step.builder.StepBuilder;
2324
import org.springframework.batch.repeat.RepeatStatus;
2425
import org.springframework.context.annotation.Bean;
@@ -56,6 +57,11 @@ public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransaction
5657
return jobRepositoryFactoryBean.getObject();
5758
}
5859

60+
@Bean
61+
public StepExecutionDao stepExecutionDao(MongoTemplate mongoTemplate) {
62+
return new org.springframework.batch.core.repository.dao.mongodb.MongoStepExecutionDao(mongoTemplate);
63+
}
64+
5965
@Bean
6066
public MongoDatabaseFactory mongoDatabaseFactory(MongoDBContainer mongoDBContainer) {
6167
return new SimpleMongoClientDatabaseFactory(mongoDBContainer.getConnectionString() + "/test");

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.batch.core.repository.support;
1717

1818
import java.time.LocalDateTime;
19+
import java.util.Collections;
1920
import java.util.Map;
2021

2122
import com.mongodb.client.MongoCollection;
@@ -29,17 +30,26 @@
2930
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3031
import org.testcontainers.junit.jupiter.Testcontainers;
3132

33+
import org.springframework.batch.core.BatchStatus;
3234
import org.springframework.batch.core.ExitStatus;
3335
import org.springframework.batch.core.job.Job;
3436
import org.springframework.batch.core.job.JobExecution;
37+
import org.springframework.batch.core.job.JobInstance;
3538
import org.springframework.batch.core.job.parameters.JobParameters;
3639
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
3740
import org.springframework.batch.core.launch.JobOperator;
41+
import org.springframework.batch.core.repository.JobRepository;
42+
import org.springframework.batch.core.repository.dao.StepExecutionDao;
43+
import org.springframework.batch.core.step.StepExecution;
3844
import org.springframework.beans.factory.annotation.Autowired;
3945
import org.springframework.data.mongodb.core.MongoTemplate;
46+
import org.springframework.data.mongodb.core.query.Criteria;
47+
import org.springframework.data.mongodb.core.query.Query;
48+
import org.springframework.data.mongodb.core.query.Update;
4049

4150
/**
4251
* @author Mahmoud Ben Hassine
52+
* @author Jinwoo Bae
4353
* @author Yanming Zhou
4454
*/
4555
@DirtiesContext
@@ -112,6 +122,58 @@ void testJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) th
112122
dump(stepExecutionsCollection, "step execution = ");
113123
}
114124

125+
/**
126+
* Test for GitHub issue #4943: getLastStepExecution should work when JobExecution's
127+
* embedded stepExecutions array is empty.
128+
*
129+
* <p>
130+
* This can happen after abrupt shutdown when the embedded stepExecutions array is not
131+
* synchronized, but BATCH_STEP_EXECUTION collection still contains the data.
132+
*
133+
*/
134+
@Test
135+
void testGetLastStepExecutionWithEmptyEmbeddedArray(@Autowired JobOperator jobOperator, @Autowired Job job,
136+
@Autowired StepExecutionDao stepExecutionDao) throws Exception {
137+
// Step 1: Run job normally
138+
JobParameters jobParameters = new JobParametersBuilder().addString("name", "emptyArrayTest")
139+
.addLocalDateTime("runtime", LocalDateTime.now())
140+
.toJobParameters();
141+
142+
JobExecution jobExecution = jobOperator.start(job, jobParameters);
143+
JobInstance jobInstance = jobExecution.getJobInstance();
144+
145+
// Verify job completed successfully
146+
Assertions.assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
147+
148+
// Step 2: Simulate the core issue - clear embedded stepExecutions array
149+
// while keeping BATCH_STEP_EXECUTION collection intact
150+
Query jobQuery = new Query(Criteria.where("jobExecutionId").is(jobExecution.getId()));
151+
Update jobUpdate = new Update().set("stepExecutions", Collections.emptyList());
152+
mongoTemplate.updateFirst(jobQuery, jobUpdate, "BATCH_JOB_EXECUTION");
153+
154+
// Step 3: Verify embedded array is empty but collection still has data
155+
MongoCollection<Document> jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION");
156+
MongoCollection<Document> stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION");
157+
158+
Document jobDoc = jobExecutionsCollection.find(new Document("jobExecutionId", jobExecution.getId())).first();
159+
Assertions.assertTrue(jobDoc.getList("stepExecutions", Document.class).isEmpty(),
160+
"Embedded stepExecutions array should be empty");
161+
Assertions.assertEquals(2, stepExecutionsCollection.countDocuments(),
162+
"BATCH_STEP_EXECUTION collection should still contain data");
163+
164+
// Step 4: Test the fix - getLastStepExecution should work despite empty embedded
165+
// array
166+
StepExecution lastStepExecution = stepExecutionDao.getLastStepExecution(jobInstance, "step1");
167+
Assertions.assertNotNull(lastStepExecution,
168+
"getLastStepExecution should find step execution even with empty embedded array");
169+
Assertions.assertEquals("step1", lastStepExecution.getStepName());
170+
Assertions.assertEquals(BatchStatus.COMPLETED, lastStepExecution.getStatus());
171+
172+
// Step 5: Test countStepExecutions also works
173+
long stepCount = stepExecutionDao.countStepExecutions(jobInstance, "step1");
174+
Assertions.assertEquals(1L, stepCount, "countStepExecutions should work despite empty embedded array");
175+
}
176+
115177
private static void dump(MongoCollection<Document> collection, String prefix) {
116178
for (Document document : collection.find()) {
117179
System.out.println(prefix + document.toJson());

0 commit comments

Comments
 (0)