diff --git a/server/src/main/kotlin/org/octopusden/octopus/vcsfacade/service/impl/OpenSearchServiceImpl.kt b/server/src/main/kotlin/org/octopusden/octopus/vcsfacade/service/impl/OpenSearchServiceImpl.kt index 1b967a2..7d118c2 100644 --- a/server/src/main/kotlin/org/octopusden/octopus/vcsfacade/service/impl/OpenSearchServiceImpl.kt +++ b/server/src/main/kotlin/org/octopusden/octopus/vcsfacade/service/impl/OpenSearchServiceImpl.kt @@ -17,7 +17,6 @@ import org.octopusden.octopus.vcsfacade.repository.RepositoryInfoRepository import org.octopusden.octopus.vcsfacade.service.OpenSearchService import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty -import org.springframework.data.elasticsearch.UncategorizedElasticsearchException import org.springframework.stereotype.Service @Service @@ -45,7 +44,7 @@ class OpenSearchServiceImpl( override fun saveRepositoriesInfo(repositoriesInfo: Sequence) { log.trace("=> saveRepositoriesInfo({})", repositoriesInfo) - processAll(repositoriesInfo) { batch -> repositoryInfoRepository.saveAll(batch) } + processAll(repositoriesInfo) { repositoryInfoRepository.saveAll(it) } log.trace("<= saveRepositoriesInfo({})", repositoriesInfo) } @@ -63,13 +62,13 @@ class OpenSearchServiceImpl( override fun saveRefs(refs: Sequence) { log.trace("=> saveRef({})", refs) - processAll(refs) { batch -> refRepository.saveAll(batch) } + processAll(refs) { refRepository.saveAll(it) } log.trace("<= saveRef({})", refs) } override fun deleteRefsByIds(refsIds: Sequence) { log.trace("=> deleteRefsByIds({})", refsIds) - processAll(refsIds) { batch -> refRepository.deleteAllById(batch) } + processAll(refsIds) { refRepository.deleteAllById(it) } log.trace("<= deleteRefsByIds({})", refsIds) } @@ -87,13 +86,13 @@ class OpenSearchServiceImpl( override fun saveCommits(commits: Sequence) { log.trace("=> saveCommits({})", commits) - processAll(commits) { batch -> commitRepository.saveAll(batch) } + processAll(commits, 1000, { 1 + it.files.size }) { commitRepository.saveAll(it) } log.trace("<= saveCommits({})", commits) } override fun deleteCommitsByIds(commitsIds: Sequence) { log.trace("=> deleteCommitsByIds({})", commitsIds) - processAll(commitsIds) { batch -> commitRepository.deleteAllById(batch) } + processAll(commitsIds) { commitRepository.deleteAllById(it) } log.trace("<= deleteCommitsByIds({})", commitsIds) } @@ -111,13 +110,13 @@ class OpenSearchServiceImpl( override fun savePullRequests(pullRequests: Sequence) { log.trace("=> savePullRequests({})", pullRequests) - processAll(pullRequests) { batch -> pullRequestRepository.saveAll(batch) } + processAll(pullRequests) { pullRequestRepository.saveAll(it) } log.trace("<= savePullRequests({})", pullRequests) } override fun deletePullRequestsByIds(pullRequestsIds: Sequence) { log.trace("=> deletePullRequestsByIds({})", pullRequestsIds) - processAll(pullRequestsIds) { batch -> pullRequestRepository.deleteAllById(batch) } + processAll(pullRequestsIds) { pullRequestRepository.deleteAllById(it) } log.trace("<= deletePullRequestsByIds({})", pullRequestsIds) } @@ -220,24 +219,23 @@ class OpenSearchServiceImpl( return documentsIds } - private fun processAll(documents: Sequence, batchOperation: (batch: List) -> Unit) { - fun batchOperationWithFallback(batch: List) = try { - batchOperation.invoke(batch) - } catch (e: UncategorizedElasticsearchException) { - //fallback if batch size is still too large (because of non-average size of some documents) - batch.forEach { batchOperation(listOf(it)) } - } + private fun processAll(documents: Sequence, batchOperation: (batch: List) -> Unit) = + processAll(documents, BATCH_SIZE, { 1 }, batchOperation) + private fun processAll(documents: Sequence, batchWeightLimit: Int, documentWeight: (document: T) -> Int, batchOperation: (batch: List) -> Unit) { val batch = ArrayList(BATCH_SIZE) + var batchWeight = 0 for (document in documents) { + batchWeight += documentWeight.invoke(document) batch.add(document) - if (batch.size == BATCH_SIZE) { - batchOperationWithFallback(batch) + if (batch.size == BATCH_SIZE || batchWeight >= batchWeightLimit) { + batchOperation.invoke(batch) batch.clear() + batchWeight = 0 } } if (batch.isNotEmpty()) { - batchOperationWithFallback(batch) + batchOperation.invoke(batch) } } }