Skip to content

Commit

Permalink
Implement fetchAllIds.
Browse files Browse the repository at this point in the history
  • Loading branch information
CamaradeRoman committed May 23, 2024
1 parent 76f7b43 commit 91065ed
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ interface OpenSearchService {
fun findRepositoryInfoById(repositoryId: String): RepositoryInfoDocument?
fun saveRepositoriesInfo(repositoriesInfo: Sequence<RepositoryInfoDocument>)
fun deleteRepositoryInfoById(repositoryId: String)
fun findRefsByRepositoryId(repositoryId: String): Set<RefDocument>
fun findRefsIdsByRepositoryId(repositoryId: String): Set<String>
fun saveRefs(refs: Sequence<RefDocument>)
fun deleteRefsByIds(refsIds: Sequence<String>)
fun deleteRefsByRepositoryId(repositoryId: String)
fun findCommitsByRepositoryId(repositoryId: String): Set<CommitDocument>
fun findCommitsIdsByRepositoryId(repositoryId: String): Set<String>
fun saveCommits(commits: Sequence<CommitDocument>)
fun deleteCommitsByIds(commitsIds: Sequence<String>)
fun deleteCommitsByRepositoryId(repositoryId: String)
fun findPullRequestsByRepositoryId(repositoryId: String): Set<PullRequestDocument>
fun findPullRequestsIdsByRepositoryId(repositoryId: String): Set<String>
fun savePullRequests(pullRequests: Sequence<PullRequestDocument>)
fun deletePullRequestsByIds(pullRequestsIds: Sequence<String>)
fun deletePullRequestsByRepositoryId(repositoryId: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,13 @@ class GiteaIndexerServiceImpl(
val tags = giteaService.getTags(repository.group, repository.name).map {
it.toDocument(repository)
}
val orphanedRefsIds = (openSearchService.findRefsByRepositoryId(repository.id).map { it.id } - (
branches.map { it.id }.toSet() + tags.map { it.id })).asSequence()
val orphanedRefsIds = (openSearchService.findRefsIdsByRepositoryId(repository.id) -
(branches.map { it.id } + tags.map { it.id }).toSet()).asSequence()
logIndexActionMessage(
"Remove orphaned refs from index for `${repository.fullName()}` $GITEA repository",
orphanedRefsIds
)
openSearchService.deleteRefsByIds(orphanedRefsIds)
log.debug("Save repository refs in index for `{}` {} repository", repository.fullName(), GITEA)
logIndexActionMessage(
"Save branches in index for `${repository.fullName()}` $GITEA repository", branches
)
Expand All @@ -206,8 +205,8 @@ class GiteaIndexerServiceImpl(
val commits = giteaService.getBranchesCommitGraph(repository.group, repository.name).map {
it.toDocument(repository)
}
val orphanedCommitsIds = (openSearchService.findCommitsByRepositoryId(repository.id)
.map { it.id } - commits.map { it.id }.toSet()).asSequence()
val orphanedCommitsIds = (openSearchService.findCommitsIdsByRepositoryId(repository.id) -
commits.map { it.id }.toSet()).asSequence()
logIndexActionMessage(
"Remove orphaned commits from index for `${repository.fullName()}` $GITEA repository",
orphanedCommitsIds
Expand All @@ -222,8 +221,8 @@ class GiteaIndexerServiceImpl(
} catch (e: NotFoundException) {
emptySequence() //for some reason Gitea returns 404 in case of empty repository
}.map { it.toDocument(repository) }
val orphanedPullRequestsIds = (openSearchService.findPullRequestsByRepositoryId(repository.id)
.map { it.id } - pullRequests.map { it.id }.toSet()).asSequence()
val orphanedPullRequestsIds = (openSearchService.findPullRequestsIdsByRepositoryId(repository.id) -
pullRequests.map { it.id }.toSet()).asSequence()
logIndexActionMessage(
"Remove orphaned pull requests from index for `${repository.fullName()}` $GITEA repository",
orphanedPullRequestsIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ class OpenSearchServiceImpl(
log.trace("<= deleteRepositoryInfo({})", repositoryId)
}

override fun findRefsByRepositoryId(repositoryId: String): Set<RefDocument> {
override fun findRefsIdsByRepositoryId(repositoryId: String): Set<String> {
log.trace("=> findRefsByRepositoryId({})", repositoryId)
return fetchAll { refRepository.searchFirst100ByRepositoryIdAndIdAfterOrderByIdAsc(repositoryId, it) }
return fetchAllIds { refRepository.searchFirst100ByRepositoryIdAndIdAfterOrderByIdAsc(repositoryId, it) }
.also { log.trace("<= findRefsByRepositoryId({}): {}", repositoryId, it) }
}

Expand All @@ -79,9 +79,9 @@ class OpenSearchServiceImpl(
log.trace("<= deleteRefsByRepositoryId({})", repositoryId)
}

override fun findCommitsByRepositoryId(repositoryId: String): Set<CommitDocument> {
override fun findCommitsIdsByRepositoryId(repositoryId: String): Set<String> {
log.trace("=> findCommitsByRepositoryId({})", repositoryId)
return fetchAll { commitRepository.searchFirst100ByRepositoryIdAndIdAfterOrderByIdAsc(repositoryId, it) }
return fetchAllIds { commitRepository.searchFirst100ByRepositoryIdAndIdAfterOrderByIdAsc(repositoryId, it) }
.also { log.trace("<= findCommitsByRepositoryId({}): {}", repositoryId, it) }
}

Expand All @@ -103,9 +103,9 @@ class OpenSearchServiceImpl(
log.trace("<= deleteCommitsByRepositoryId({})", repositoryId)
}

override fun findPullRequestsByRepositoryId(repositoryId: String): Set<PullRequestDocument> {
override fun findPullRequestsIdsByRepositoryId(repositoryId: String): Set<String> {
log.trace("=> findPullRequestsByRepositoryId({})", repositoryId)
return fetchAll { pullRequestRepository.searchFirst100ByRepositoryIdAndIdAfterOrderByIdAsc(repositoryId, it) }
return fetchAllIds { pullRequestRepository.searchFirst100ByRepositoryIdAndIdAfterOrderByIdAsc(repositoryId, it) }
.also { log.trace("<= findCommitsByRepositoryId({}): {}", repositoryId, it) }
}

Expand Down Expand Up @@ -208,6 +208,18 @@ class OpenSearchServiceImpl(
return documents
}

/**
 * Collects the ids of all documents reachable through a paged lookup.
 *
 * [fetchBatchAfterId] is invoked first with an empty string and afterwards with the
 * last id of the previously returned batch. Paging stops as soon as a batch is empty
 * or its size differs from `BATCH_SIZE`.
 *
 * NOTE(review): presumably the callback returns documents ordered by id ascending and
 * strictly after the given id, otherwise the paging cursor would not advance — confirm
 * against the repository query methods passed by callers.
 *
 * @param fetchBatchAfterId loads the next batch of documents following the given id
 * @return the ids gathered from every fetched batch
 */
private fun <T : BaseDocument> fetchAllIds(fetchBatchAfterId: (id: String) -> List<T>): Set<String> {
    val collectedIds = mutableSetOf<String>()
    var cursor = ""
    var morePagesExpected = true
    while (morePagesExpected) {
        val pageIds = fetchBatchAfterId(cursor).map { document -> document.id }
        // An empty page means there is nothing left to read (and no last id to continue from).
        if (pageIds.isEmpty()) {
            break
        }
        collectedIds.addAll(pageIds)
        cursor = pageIds.last()
        // Only a full page suggests that further documents may follow.
        morePagesExpected = pageIds.size == BATCH_SIZE
    }
    return collectedIds
}

private fun <T> processAll(documents: Sequence<T>, batchOperation: (batch: List<T>) -> Unit) {
fun batchOperationWithFallback(batch: List<T>) = try {
batchOperation.invoke(batch)
Expand Down

0 comments on commit 91065ed

Please sign in to comment.