Skip to content

Commit

Permalink
Implement dynamic batch size for commit documents (taking into account number of files).
Browse files Browse the repository at this point in the history
  • Loading branch information
CamaradeRoman committed May 24, 2024
1 parent 91065ed commit b99b31d
Showing 1 changed file with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import org.octopusden.octopus.vcsfacade.repository.RepositoryInfoRepository
import org.octopusden.octopus.vcsfacade.service.OpenSearchService
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException
import org.springframework.stereotype.Service

@Service
Expand Down Expand Up @@ -45,7 +44,7 @@ class OpenSearchServiceImpl(

/**
 * Saves the given repository info documents by passing them to `processAll`, which calls
 * `repositoryInfoRepository.saveAll` once per batch. Trace-logs on entry and exit.
 *
 * NOTE: this is a flattened diff view. The two `processAll` lines below are the pre-commit and
 * post-commit text of the same statement, not two consecutive calls.
 */
override fun saveRepositoriesInfo(repositoriesInfo: Sequence<RepositoryInfoDocument>) {
log.trace("=> saveRepositoriesInfo({})", repositoriesInfo)
processAll(repositoriesInfo) { batch -> repositoryInfoRepository.saveAll(batch) } // removed by this commit
processAll(repositoriesInfo) { repositoryInfoRepository.saveAll(it) } // added by this commit: same call, implicit `it`
log.trace("<= saveRepositoriesInfo({})", repositoriesInfo)
}

Expand All @@ -63,13 +62,13 @@ class OpenSearchServiceImpl(

/**
 * Saves the given ref documents by passing them to `processAll`, which calls
 * `refRepository.saveAll` once per batch. Trace-logs on entry and exit.
 *
 * NOTE: this is a flattened diff view. The two `processAll` lines below are the pre-commit and
 * post-commit text of the same statement, not two consecutive calls.
 */
override fun saveRefs(refs: Sequence<RefDocument>) {
// NOTE(review): the trace label reads "saveRef" although the method is named saveRefs.
log.trace("=> saveRef({})", refs)
processAll(refs) { batch -> refRepository.saveAll(batch) } // removed by this commit
processAll(refs) { refRepository.saveAll(it) } // added by this commit: same call, implicit `it`
log.trace("<= saveRef({})", refs)
}

/**
 * Deletes ref documents by id by passing the ids to `processAll`, which calls
 * `refRepository.deleteAllById` once per batch. Trace-logs on entry and exit.
 *
 * NOTE: this is a flattened diff view. The two `processAll` lines below are the pre-commit and
 * post-commit text of the same statement, not two consecutive calls.
 */
override fun deleteRefsByIds(refsIds: Sequence<String>) {
log.trace("=> deleteRefsByIds({})", refsIds)
processAll(refsIds) { batch -> refRepository.deleteAllById(batch) } // removed by this commit
processAll(refsIds) { refRepository.deleteAllById(it) } // added by this commit: same call, implicit `it`
log.trace("<= deleteRefsByIds({})", refsIds)
}

Expand All @@ -87,13 +86,13 @@ class OpenSearchServiceImpl(

/**
 * Saves the given commit documents by passing them to `processAll`, which calls
 * `commitRepository.saveAll` once per batch. Trace-logs on entry and exit.
 *
 * This is the behavioural change of the commit: commits are now batched by weight rather than
 * by count alone. Each commit weighs `1 + files.size` and the batch weight limit is 1000.
 *
 * NOTE: this is a flattened diff view. The two `processAll` lines below are the pre-commit and
 * post-commit text of the same statement, not two consecutive calls.
 */
override fun saveCommits(commits: Sequence<CommitDocument>) {
log.trace("=> saveCommits({})", commits)
processAll(commits) { batch -> commitRepository.saveAll(batch) } // removed by this commit: default batching
processAll(commits, 1000, { 1 + it.files.size }) { commitRepository.saveAll(it) } // added by this commit: weight limit 1000, weight = 1 + number of files
log.trace("<= saveCommits({})", commits)
}

/**
 * Deletes commit documents by id by passing the ids to `processAll`, which calls
 * `commitRepository.deleteAllById` once per batch. Trace-logs on entry and exit.
 *
 * NOTE: this is a flattened diff view. The two `processAll` lines below are the pre-commit and
 * post-commit text of the same statement, not two consecutive calls.
 */
override fun deleteCommitsByIds(commitsIds: Sequence<String>) {
log.trace("=> deleteCommitsByIds({})", commitsIds)
processAll(commitsIds) { batch -> commitRepository.deleteAllById(batch) } // removed by this commit
processAll(commitsIds) { commitRepository.deleteAllById(it) } // added by this commit: same call, implicit `it`
log.trace("<= deleteCommitsByIds({})", commitsIds)
}

Expand All @@ -111,13 +110,13 @@ class OpenSearchServiceImpl(

/**
 * Saves the given pull request documents by passing them to `processAll`, which calls
 * `pullRequestRepository.saveAll` once per batch. Trace-logs on entry and exit.
 *
 * NOTE: this is a flattened diff view. The two `processAll` lines below are the pre-commit and
 * post-commit text of the same statement, not two consecutive calls.
 */
override fun savePullRequests(pullRequests: Sequence<PullRequestDocument>) {
log.trace("=> savePullRequests({})", pullRequests)
processAll(pullRequests) { batch -> pullRequestRepository.saveAll(batch) } // removed by this commit
processAll(pullRequests) { pullRequestRepository.saveAll(it) } // added by this commit: same call, implicit `it`
log.trace("<= savePullRequests({})", pullRequests)
}

/**
 * Deletes pull request documents by id by passing the ids to `processAll`, which calls
 * `pullRequestRepository.deleteAllById` once per batch. Trace-logs on entry and exit.
 *
 * NOTE: this is a flattened diff view. The two `processAll` lines below are the pre-commit and
 * post-commit text of the same statement, not two consecutive calls.
 */
override fun deletePullRequestsByIds(pullRequestsIds: Sequence<String>) {
log.trace("=> deletePullRequestsByIds({})", pullRequestsIds)
processAll(pullRequestsIds) { batch -> pullRequestRepository.deleteAllById(batch) } // removed by this commit
processAll(pullRequestsIds) { pullRequestRepository.deleteAllById(it) } // added by this commit: same call, implicit `it`
log.trace("<= deletePullRequestsByIds({})", pullRequestsIds)
}

Expand Down Expand Up @@ -220,24 +219,23 @@ class OpenSearchServiceImpl(
return documentsIds
}

/*
 * Batching helper: consumes a sequence of documents and hands them to `batchOperation` in
 * batches instead of all at once.
 *
 * This span is a flattened diff. It interleaves the pre-commit and post-commit text of
 * `processAll`, so it is not compilable as written. Lines tagged "removed" existed only before
 * the commit, lines tagged "added" exist only after it, and untagged lines are shared context.
 *
 * Before the commit: a single function flushed a batch whenever it held BATCH_SIZE documents.
 * If the batch operation threw UncategorizedElasticsearchException, it retried the batch one
 * document at a time.
 *
 * After the commit: the two-argument overload delegates to a four-argument overload, using
 * BATCH_SIZE as the weight limit and a weight of 1 per document. The four-argument overload
 * sums a caller-supplied weight per document and flushes when either the batch holds BATCH_SIZE
 * documents or the summed weight reaches `batchWeightLimit`. Any remainder is flushed after the
 * loop.
 *
 * BATCH_SIZE is declared outside this view.
 *
 * NOTE(review): the post-commit code has no catch, so an exception from `batchOperation` now
 * propagates to the caller; the per-document fallback is gone. Confirm that weight-based
 * batching makes the fallback unnecessary.
 */
private fun <T> processAll(documents: Sequence<T>, batchOperation: (batch: List<T>) -> Unit) { // removed by this commit
fun batchOperationWithFallback(batch: List<T>) = try { // removed by this commit
batchOperation.invoke(batch) // removed by this commit
} catch (e: UncategorizedElasticsearchException) { // removed by this commit
// removed by this commit: fallback if the batch is still too large (some documents are much larger than average)
batch.forEach { batchOperation(listOf(it)) } // removed by this commit: retry one document per call
} // removed by this commit
private fun <T> processAll(documents: Sequence<T>, batchOperation: (batch: List<T>) -> Unit) = // added by this commit
processAll(documents, BATCH_SIZE, { 1 }, batchOperation) // added by this commit: every document weighs 1

private fun <T> processAll(documents: Sequence<T>, batchWeightLimit: Int, documentWeight: (document: T) -> Int, batchOperation: (batch: List<T>) -> Unit) { // added by this commit
// One list is reused for every batch: it is passed to batchOperation and then cleared.
val batch = ArrayList<T>(BATCH_SIZE)
var batchWeight = 0 // added by this commit: summed weight of the documents currently in `batch`
for (document in documents) {
batchWeight += documentWeight.invoke(document) // added by this commit
batch.add(document)
if (batch.size == BATCH_SIZE) { // removed by this commit
batchOperationWithFallback(batch) // removed by this commit
// The check runs after the document is added, so the flushed batch includes the document that reached the limit.
if (batch.size == BATCH_SIZE || batchWeight >= batchWeightLimit) { // added by this commit
batchOperation.invoke(batch) // added by this commit
batch.clear()
batchWeight = 0 // added by this commit
}
}
// Flush whatever is left after the sequence is exhausted.
if (batch.isNotEmpty()) {
batchOperationWithFallback(batch) // removed by this commit
batchOperation.invoke(batch) // added by this commit
}
}
}
Expand Down

0 comments on commit b99b31d

Please sign in to comment.