Skip to content

Commit b39f47d

Browse files
committed
Added deletion of the JSON job file to bulk import, and a new integration test
1 parent 5bdda4f commit b39f47d

File tree

2 files changed

+54
-0
lines changed

2 files changed

+54
-0
lines changed

java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ private static BulkImportJob loadJob(
235235
} catch (JsonSyntaxException e) {
236236
LOGGER.error("Json job was malformed");
237237
throw e;
238+
} finally {
239+
s3Client.deleteObject(bulkImportBucket, jsonJobKey);
238240
}
239241
}
240242

java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080

8181
import static org.assertj.core.api.Assertions.assertThat;
8282
import static org.assertj.core.api.Assertions.tuple;
83+
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_BUCKET;
8384
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
8485
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;
8586
import static sleeper.core.properties.instance.CommonProperty.FILE_SYSTEM;
@@ -372,6 +373,55 @@ void shouldNotThrowExceptionIfProvidedWithDirectoryWhichContainsParquetAndNonPar
372373
ingestFinishedStatus(summary(startTime, endTime, 200, 200), 1))));
373374
}
374375

376+
/**
 * Runs a bulk import job end to end and verifies that the bulk import bucket holds no objects afterwards.
 *
 * Steps, as visible in this method:
 * - writes the records from getRecords() to dataDir + "/import/a.parquet" and uses that file as the job input;
 * - creates the table's state store, builds a job with id "my-job" and runs it through runJob;
 * - reads back every file referenced by the state store, asserting each file is sorted according to
 *   RecordComparator and that the combined, sorted records equal the sorted input;
 * - asserts the tracker reports exactly one job run (accepted, started, finished);
 * - asserts listObjectKeys on the BULK_IMPORT_BUCKET returns nothing.
 *
 * NOTE(review): nothing in this method puts the job JSON into the bulk import bucket before the run, and
 * runJob is not shown here. Confirm that runJob (or the runner) actually uploads the JSON and goes through
 * the loading path that deletes it; otherwise the final emptiness assertion would pass even without the
 * delete. Asserting the object exists before the run would make the test conclusive.
 *
 * @param runner the bulk import runner implementation under test, supplied by the "getParameters" method source
 * @throws IOException declared by the test; presumably raised by writing or reading Parquet files - TODO confirm
 */
@ParameterizedTest
377+
@MethodSource("getParameters")
378+
void shouldDeleteJsonFileAfterImport(BulkImportJobRunner runner) throws IOException {
379+
// Given
380+
// - Write some data to be imported
381+
List<Record> records = getRecords();
382+
writeRecordsToFile(records, dataDir + "/import/a.parquet");
383+
List<String> inputFiles = new ArrayList<>();
384+
inputFiles.add(dataDir + "/import/a.parquet");
385+
// - State store
386+
StateStore stateStore = createTable(instanceProperties, tableProperties);
387+
388+
// When
389+
BulkImportJob job = jobForTable(tableProperties).id("my-job").files(inputFiles).build();
390+
runJob(runner, instanceProperties, job);
391+
392+
// Then
393+
List<FileReference> fileReferences = stateStore.getFileReferences();
394+
List<Record> readRecords = new ArrayList<>();
395+
for (FileReference fileReference : fileReferences) {
396+
try (ParquetRecordReader reader = new ParquetRecordReader(new Path(fileReference.getFilename()), schema)) {
397+
List<Record> recordsInThisFile = new ArrayList<>();
398+
Record record = reader.read();
399+
while (null != record) {
400+
Record clonedRecord = new Record(record);
401+
readRecords.add(clonedRecord);
402+
recordsInThisFile.add(clonedRecord);
403+
record = reader.read();
404+
}
405+
assertThat(recordsInThisFile).isSortedAccordingTo(new RecordComparator(getSchema()));
406+
}
407+
}
408+
assertThat(readRecords).hasSameSizeAs(records);
409+
410+
List<Record> expectedRecords = new ArrayList<>(records);
411+
sortRecords(expectedRecords);
412+
sortRecords(readRecords);
413+
assertThat(readRecords).isEqualTo(expectedRecords);
414+
IngestJob ingestJob = job.toIngestJob();
415+
assertThat(tracker.getAllJobs(tableProperties.get(TABLE_ID)))
416+
.containsExactly(ingestJobStatus(ingestJob, jobRunOnTask(taskId,
417+
ingestAcceptedStatus(ingestJob, validationTime),
418+
validatedIngestStartedStatus(ingestJob, startTime),
419+
ingestFinishedStatus(summary(startTime, endTime, 200, 200), 1))));
420+
421+
// Check the job JSON file has been deleted: the bulk import bucket must contain no objects
422+
assertThat(listObjectKeys(instanceProperties.get(BULK_IMPORT_BUCKET))).isEmpty();
423+
}
424+
375425
private static List<Record> readRecords(String filename, Schema schema) {
376426
try (ParquetRecordReader reader = new ParquetRecordReader(new Path(filename), schema)) {
377427
List<Record> readRecords = new ArrayList<>();
@@ -395,8 +445,10 @@ public InstanceProperties createInstanceProperties(String dir) {
395445
InstanceProperties instanceProperties = createTestInstanceProperties();
396446
instanceProperties.set(DATA_BUCKET, dir);
397447
instanceProperties.set(FILE_SYSTEM, "file://");
448+
instanceProperties.set(BULK_IMPORT_BUCKET, "bulkimport");
398449

399450
createBucket(instanceProperties.get(CONFIG_BUCKET));
451+
createBucket(instanceProperties.get(BULK_IMPORT_BUCKET));
400452
DynamoDBTableIndexCreator.create(dynamoClient, instanceProperties);
401453
new TransactionLogStateStoreCreator(instanceProperties, dynamoClient).create();
402454
return instanceProperties;

0 commit comments

Comments
 (0)