From 172107eef31ccfceee51038fcd435080ceae85ad Mon Sep 17 00:00:00 2001 From: Scott Olsen Date: Tue, 7 Nov 2023 14:39:45 -0500 Subject: [PATCH] Convert a bunch of mutations related streams to Flow and suspend funs. (#2048) * Convert a bunch of mutations related streams to Flow and suspend funs. Updates mutations, submission and LOI, to use FLow and suspend functions. Also updates tests accordingly. I've verified the sync UI works as expected after this change. In one or two places, I've used RX interop to avoid needing to scope-bloat the conversions at this time, but we should be able to eliminate those calls relatively soon. * Stores+Sync Status VM: Rename a few methods to adhere to conventions; refactor based on feedback --- .../room/dao/LocationOfInterestMutationDao.kt | 13 ++--- .../local/room/dao/SubmissionMutationDao.kt | 9 ++- .../stores/RoomLocationOfInterestStore.kt | 20 ++++--- .../local/room/stores/RoomSubmissionStore.kt | 20 ++++--- .../stores/LocalLocationOfInterestStore.kt | 12 ++-- .../local/stores/LocalSubmissionStore.kt | 16 ++++-- .../LocationOfInterestRepository.kt | 28 +++++++--- .../ground/repository/MutationRepository.kt | 31 +++------- .../ground/repository/SubmissionRepository.kt | 29 +++++----- .../ui/syncstatus/SyncStatusViewModel.kt | 56 ++++++++++--------- .../persistence/local/LocalDataStoreTests.kt | 25 ++++----- .../LocationOfInterestRepositoryTest.kt | 2 +- 12 files changed, 134 insertions(+), 127 deletions(-) diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/LocationOfInterestMutationDao.kt b/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/LocationOfInterestMutationDao.kt index 49923ba409..b2e44a1e71 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/LocationOfInterestMutationDao.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/LocationOfInterestMutationDao.kt @@ -19,8 +19,7 @@ import androidx.room.Dao import androidx.room.Query import com.google.android.ground.persistence.local.room.entity.LocationOfInterestMutationEntity import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus -import com.google.android.ground.rx.annotations.Cold -import io.reactivex.Flowable +import kotlinx.coroutines.flow.Flow /** * Provides low-level read/write operations of [LocationOfInterestMutationEntity] to/from the local @@ -29,14 +28,14 @@ import io.reactivex.Flowable @Dao interface LocationOfInterestMutationDao : BaseDao { @Query("SELECT * FROM location_of_interest_mutation") - fun loadAllOnceAndStream(): Flowable> + fun getAllMutationsFlow(): Flow> @Query( "SELECT * FROM location_of_interest_mutation " + "WHERE location_of_interest_id = :locationOfInterestId " + "AND state IN (:allowedStates)" ) - suspend fun findByLocationOfInterestId( + suspend fun getMutations( locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus ): List? @@ -46,8 +45,8 @@ interface LocationOfInterestMutationDao : BaseDao> + vararg allowedStates: MutationEntitySyncStatus, + ): Flow> } diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/SubmissionMutationDao.kt b/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/SubmissionMutationDao.kt index 46e53553c7..d42c3b9797 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/SubmissionMutationDao.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/SubmissionMutationDao.kt @@ -20,14 +20,13 @@ import androidx.room.Query import com.google.android.ground.persistence.local.room.entity.SubmissionMutationEntity import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus import com.google.android.ground.persistence.local.room.fields.MutationEntityType -import com.google.android.ground.rx.annotations.Cold -import io.reactivex.Flowable +import kotlinx.coroutines.flow.Flow /** Data access object for database operations related to [SubmissionMutationEntity]. */ @Dao interface SubmissionMutationDao : BaseDao { @Query("SELECT * FROM submission_mutation") - fun loadAllOnceAndStream(): Flowable> + fun getAllMutationsFlow(): Flow> @Query( "SELECT * FROM submission_mutation " + @@ -62,8 +61,8 @@ interface SubmissionMutationDao : BaseDao { "SELECT * FROM submission_mutation " + "WHERE location_of_interest_id = :locationOfInterestId AND state IN (:allowedStates)" ) - fun findByLocationOfInterestIdOnceAndStream( + fun findByLoiIdFlow( locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus - ): @Cold(terminates = false) Flowable> + ): Flow> } diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomLocationOfInterestStore.kt b/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomLocationOfInterestStore.kt index ca157c423f..d7eab5f1e9 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomLocationOfInterestStore.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomLocationOfInterestStore.kt @@ -36,6 +36,7 @@ import io.reactivex.Flowable import io.reactivex.Maybe import javax.inject.Inject import javax.inject.Singleton +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import timber.log.Timber @@ -125,22 +126,25 @@ class RoomLocationOfInterestStore @Inject internal constructor() : LocalLocation } } - override fun getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( + override fun getMutationsFlow( locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus - ): Flowable> = - locationOfInterestMutationDao - .findByLocationOfInterestIdOnceAndStream(locationOfInterestId, *allowedStates) - .map { list: List -> list.map { it.toModelObject() } } + ): Flow> = + locationOfInterestMutationDao.getMutationsFlow(locationOfInterestId, *allowedStates).map { + mutations -> + mutations.map { it.toModelObject() } + } - override fun getAllMutationsAndStream(): Flowable> = - locationOfInterestMutationDao.loadAllOnceAndStream() + override fun getAllSurveyMutations(survey: Survey): Flow> = + locationOfInterestMutationDao.getAllMutationsFlow().map { mutations -> + mutations.filter { it.surveyId == survey.id }.map { it.toModelObject() } + } override suspend fun findByLocationOfInterestId( id: String, vararg states: MutationEntitySyncStatus ): List = - locationOfInterestMutationDao.findByLocationOfInterestId(id, *states) ?: listOf() + locationOfInterestMutationDao.getMutations(id, *states) ?: listOf() override suspend fun insertOrUpdate(loi: LocationOfInterest) = locationOfInterestDao.insertOrUpdate(loi.toLocalDataStoreObject()) diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt b/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt index aa7a3f9172..54ca4c8ff2 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt @@ -44,11 +44,12 @@ import com.google.android.ground.persistence.local.stores.LocalSubmissionStore import com.google.android.ground.rx.Schedulers import com.google.android.ground.util.Debug.logOnFailure import com.google.firebase.crashlytics.FirebaseCrashlytics -import io.reactivex.Flowable import io.reactivex.Single import javax.inject.Inject import javax.inject.Singleton import kotlinx.collections.immutable.toPersistentList +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import timber.log.Timber /** Manages access to [Submission] objects persisted in local storage. */ @@ -195,14 +196,15 @@ class RoomSubmissionStore @Inject internal constructor() : LocalSubmissionStore submissionDao.findByIdSuspend(submissionId)?.let { submissionDao.delete(it) } } - override fun getSubmissionMutationsByLocationOfInterestIdOnceAndStream( + override fun getSubmissionMutationsByLoiIdFlow( survey: Survey, locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus - ): Flowable> = - submissionMutationDao - .findByLocationOfInterestIdOnceAndStream(locationOfInterestId, *allowedStates) - .map { list: List -> list.map { it.toModelObject(survey) } } + ): Flow> = + submissionMutationDao.findByLoiIdFlow(locationOfInterestId, *allowedStates).map { + list: List -> + list.map { it.toModelObject(survey) } + } override suspend fun applyAndEnqueue(mutation: SubmissionMutation) { try { @@ -216,8 +218,10 @@ class RoomSubmissionStore @Inject internal constructor() : LocalSubmissionStore } } - override fun getAllMutationsAndStream(): Flowable> = - submissionMutationDao.loadAllOnceAndStream() + override fun getAllSurveyMutationsFlow(survey: Survey): Flow> = + submissionMutationDao.getAllMutationsFlow().map { mutations -> + mutations.filter { it.surveyId == survey.id }.map { it.toModelObject(survey) } + } override suspend fun findByLocationOfInterestId( loidId: String, diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalLocationOfInterestStore.kt b/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalLocationOfInterestStore.kt index dc15dde52d..347fef5242 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalLocationOfInterestStore.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalLocationOfInterestStore.kt @@ -21,7 +21,6 @@ import com.google.android.ground.model.mutation.LocationOfInterestMutation import com.google.android.ground.persistence.local.room.entity.LocationOfInterestMutationEntity import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus import com.google.android.ground.rx.annotations.Cold -import io.reactivex.Flowable import io.reactivex.Maybe import kotlinx.coroutines.flow.Flow @@ -46,12 +45,17 @@ interface LocalLocationOfInterestStore : * Emits the list of [LocationOfInterestMutation] instances for a given LOI which match the * provided `allowedStates`. A new list is emitted on each subsequent change. */ - fun getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( + fun getMutationsFlow( locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus - ): Flowable> + ): Flow> - fun getAllMutationsAndStream(): Flowable> + /** + * Returns a [Flow] that emits a [List] of all [LocationOfInterestMutation]s stored in the local + * db related to a given [Survey]. A new [List] is emitted on each change to the underlying saved + * data. + */ + fun getAllSurveyMutations(survey: Survey): Flow> suspend fun findByLocationOfInterestId( id: String, diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalSubmissionStore.kt b/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalSubmissionStore.kt index b64e4aed18..60168131d1 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalSubmissionStore.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalSubmissionStore.kt @@ -21,7 +21,7 @@ import com.google.android.ground.model.mutation.SubmissionMutation import com.google.android.ground.model.submission.Submission import com.google.android.ground.persistence.local.room.entity.SubmissionMutationEntity import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus -import io.reactivex.Flowable +import kotlinx.coroutines.flow.Flow interface LocalSubmissionStore : LocalMutationStore { /** @@ -43,16 +43,20 @@ interface LocalSubmissionStore : LocalMutationStore> + ): Flow> - fun getAllMutationsAndStream(): Flowable> + /** + * Returns a [Flow] that emits a list of all [SubmissionMutation]s associated with a given + * [Survey]. The list is newly emitted each time the underlying local data changes. + */ + fun getAllSurveyMutationsFlow(survey: Survey): Flow> suspend fun findByLocationOfInterestId( loidId: String, diff --git a/ground/src/main/java/com/google/android/ground/repository/LocationOfInterestRepository.kt b/ground/src/main/java/com/google/android/ground/repository/LocationOfInterestRepository.kt index d8c506b341..dfb9c26db3 100644 --- a/ground/src/main/java/com/google/android/ground/repository/LocationOfInterestRepository.kt +++ b/ground/src/main/java/com/google/android/ground/repository/LocationOfInterestRepository.kt @@ -41,6 +41,8 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.map +import kotlinx.coroutines.rx2.asFlowable +import kotlinx.coroutines.rx2.awaitSingleOrNull /** * Coordinates persistence and retrieval of [LocationOfInterest] instances from remote, local, and @@ -73,6 +75,7 @@ constructor( } /** This only works if the survey and location of interests are already cached to local db. */ + @Deprecated("Prefer getOfflineLocationOfInterestSuspend") fun getOfflineLocationOfInterest( surveyId: String, locationOfInterest: String @@ -84,6 +87,15 @@ constructor( Single.error { NotFoundException("Location of interest not found $locationOfInterest") } ) + suspend fun getOfflineLoiSuspend( + surveyId: String, + locationOfInterest: String + ): LocationOfInterest = + localSurveyStore.getSurveyByIdSuspend(surveyId)?.let { + localLoiStore.getLocationOfInterest(it, locationOfInterest).awaitSingleOrNull() + } + ?: throw NotFoundException("Location of interest not found $locationOfInterest") + fun createLocationOfInterest(geometry: Geometry, job: Job, surveyId: String): LocationOfInterest { val auditInfo = AuditInfo(authManager.currentUser) return LocationOfInterest( @@ -115,15 +127,17 @@ constructor( * have not yet been marked as [SyncStatus.COMPLETED], including pending, in progress, and failed * mutations. A new list is emitted on each subsequent change. */ - fun getIncompleteLocationOfInterestMutationsOnceAndStream( + fun getIncompleteLoiMutationsOnceAndStream( locationOfInterestId: String ): Flowable> = - localLoiStore.getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( - locationOfInterestId, - MutationEntitySyncStatus.PENDING, - MutationEntitySyncStatus.IN_PROGRESS, - MutationEntitySyncStatus.FAILED - ) + localLoiStore + .getMutationsFlow( + locationOfInterestId, + MutationEntitySyncStatus.PENDING, + MutationEntitySyncStatus.IN_PROGRESS, + MutationEntitySyncStatus.FAILED + ) + .asFlowable() /** Returns a flow of all [LocationOfInterest] associated with the given [Survey]. */ fun getLocationsOfInterests(survey: Survey): Flow> = diff --git a/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt b/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt index 6b2bd4500a..f6736e7ed5 100644 --- a/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt +++ b/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt @@ -27,11 +27,10 @@ import com.google.android.ground.persistence.local.room.fields.MutationEntitySyn import com.google.android.ground.persistence.local.stores.LocalLocationOfInterestStore import com.google.android.ground.persistence.local.stores.LocalSubmissionStore import com.google.android.ground.persistence.local.stores.LocalSurveyStore -import com.google.android.ground.rx.Schedulers -import com.google.android.ground.rx.annotations.Cold -import io.reactivex.Flowable import javax.inject.Inject import javax.inject.Singleton +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.combine /** * Coordinates persistence of mutations across [LocationOfInterestMutation] and [SubmissionMutation] @@ -44,33 +43,17 @@ constructor( private val localSurveyStore: LocalSurveyStore, private val localLocationOfInterestStore: LocalLocationOfInterestStore, private val localSubmissionStore: LocalSubmissionStore, - private val schedulers: Schedulers ) { /** * Returns a long-lived stream that emits the full list of mutations for specified survey on * subscribe and a new list on each subsequent change. */ - fun getMutationsOnceAndStream( - survey: Survey - ): @Cold(terminates = false) Flowable> { + fun getSurveyMutationsFlow(survey: Survey): Flow> { // TODO: Show mutations for all surveys, not just current one. - val locationOfInterestMutations = - localLocationOfInterestStore - .getAllMutationsAndStream() - .map { it.filter { it.surveyId == survey.id }.map { it.toModelObject() } } - .subscribeOn(schedulers.io()) - val submissionMutations = - localSubmissionStore - .getAllMutationsAndStream() - .map { list: List -> - list.filter { it.surveyId == survey.id }.map { it.toModelObject(survey) } - } - .subscribeOn(schedulers.io()) - return Flowable.combineLatest( - locationOfInterestMutations, - submissionMutations, - this::combineAndSortMutations - ) + val locationOfInterestMutations = localLocationOfInterestStore.getAllSurveyMutations(survey) + val submissionMutations = localSubmissionStore.getAllSurveyMutationsFlow(survey) + + return locationOfInterestMutations.combine(submissionMutations, this::combineAndSortMutations) } /** diff --git a/ground/src/main/java/com/google/android/ground/repository/SubmissionRepository.kt b/ground/src/main/java/com/google/android/ground/repository/SubmissionRepository.kt index 9eea47008a..f4b12669d2 100644 --- a/ground/src/main/java/com/google/android/ground/repository/SubmissionRepository.kt +++ b/ground/src/main/java/com/google/android/ground/repository/SubmissionRepository.kt @@ -31,13 +31,13 @@ import com.google.android.ground.persistence.uuid.OfflineUuidGenerator import com.google.android.ground.rx.annotations.Cold import com.google.android.ground.system.auth.AuthenticationManager import io.reactivex.Completable -import io.reactivex.Flowable import io.reactivex.Single import javax.inject.Inject import javax.inject.Singleton import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.rx2.rxCompletable -import kotlinx.coroutines.rx2.rxMaybe import kotlinx.coroutines.rx2.rxSingle /** @@ -137,21 +137,20 @@ constructor( * been marked as [SyncStatus.COMPLETED], including pending, in progress, and failed mutations. A * new list is emitted on each subsequent change. */ - fun getIncompleteSubmissionMutationsOnceAndStream( + suspend fun getIncompleteSubmissionMutationsOnceAndStream( surveyId: String, locationOfInterestId: String - ): Flowable> = - rxMaybe { localSurveyStore.getSurveyByIdSuspend(surveyId) } - .toFlowable() - .flatMap { - localSubmissionStore.getSubmissionMutationsByLocationOfInterestIdOnceAndStream( - it, - locationOfInterestId, - MutationEntitySyncStatus.PENDING, - MutationEntitySyncStatus.IN_PROGRESS, - MutationEntitySyncStatus.FAILED - ) - } + ): Flow> { + val survey = localSurveyStore.getSurveyByIdSuspend(surveyId) ?: return flowOf() + + return localSubmissionStore.getSubmissionMutationsByLoiIdFlow( + survey, + locationOfInterestId, + MutationEntitySyncStatus.PENDING, + MutationEntitySyncStatus.IN_PROGRESS, + MutationEntitySyncStatus.FAILED + ) + } suspend fun getTotalSubmissionCount(loi: LocationOfInterest) = loi.submissionCount + getPendingCreateCount(loi.id) - getPendingDeleteCount(loi.id) diff --git a/ground/src/main/java/com/google/android/ground/ui/syncstatus/SyncStatusViewModel.kt b/ground/src/main/java/com/google/android/ground/ui/syncstatus/SyncStatusViewModel.kt index 5c1d9ce158..af23d41422 100644 --- a/ground/src/main/java/com/google/android/ground/ui/syncstatus/SyncStatusViewModel.kt +++ b/ground/src/main/java/com/google/android/ground/ui/syncstatus/SyncStatusViewModel.kt @@ -17,18 +17,19 @@ package com.google.android.ground.ui.syncstatus import android.util.Pair import androidx.lifecycle.LiveData -import androidx.lifecycle.toLiveData -import com.google.android.ground.model.Survey +import androidx.lifecycle.asLiveData import com.google.android.ground.model.locationofinterest.LocationOfInterest import com.google.android.ground.model.mutation.Mutation import com.google.android.ground.repository.LocationOfInterestRepository import com.google.android.ground.repository.MutationRepository import com.google.android.ground.repository.SurveyRepository import com.google.android.ground.ui.common.AbstractViewModel -import io.reactivex.Flowable -import io.reactivex.Single -import java8.util.Optional import javax.inject.Inject +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.map /** * View model for the offline area manager fragment. Handles the current list of downloaded areas. @@ -41,31 +42,32 @@ internal constructor( private val locationOfInterestRepository: LocationOfInterestRepository ) : AbstractViewModel() { - val mutations: LiveData>> + /** [Flow] of latest mutations for the active [Survey]. */ + @OptIn(ExperimentalCoroutinesApi::class) + private val mutationsFlow: Flow> = + surveyRepository.activeSurveyFlow.filterNotNull().flatMapLatest { + mutationRepository.getSurveyMutationsFlow(it) + } - init { - mutations = mutationsOnceAndStream.switchMap { loadLocationsOfInterestAndPair(it) }.toLiveData() - } - - private val mutationsOnceAndStream: Flowable> - get() = - surveyRepository.activeSurveyFlowable.switchMap { survey: Optional -> - survey - .map { mutationRepository.getMutationsOnceAndStream(it) } - .orElse(Flowable.just(listOf())) - } + /** + * List of current local [Mutation]s executed by the user, with their corresponding + * [LocationOfInterest]. + */ + val mutations: LiveData>> = + mutationsFlow.map { loadLocationsOfInterestAndPair(it) }.asLiveData() - // TODO: Replace with kotlin coroutine - private fun loadLocationsOfInterestAndPair( + private suspend fun loadLocationsOfInterestAndPair( mutations: List - ): Flowable>> = - Single.merge(mutations.map { loadLocationOfInterestAndPair(it) }).toList().toFlowable() + ): List> = mutations.map { loadLocationOfInterestAndPair(it) } - // TODO: Replace with kotlin coroutine - private fun loadLocationOfInterestAndPair( + private suspend fun loadLocationOfInterestAndPair( mutation: Mutation - ): Single> = - locationOfInterestRepository - .getOfflineLocationOfInterest(mutation.surveyId, mutation.locationOfInterestId) - .map { locationOfInterest: LocationOfInterest -> Pair.create(locationOfInterest, mutation) } + ): Pair { + val loi = + locationOfInterestRepository.getOfflineLoiSuspend( + mutation.surveyId, + mutation.locationOfInterestId + ) + return Pair(loi, mutation) + } } diff --git a/ground/src/test/java/com/google/android/ground/persistence/local/LocalDataStoreTests.kt b/ground/src/test/java/com/google/android/ground/persistence/local/LocalDataStoreTests.kt index 864ad67aaa..c0c285f7fd 100644 --- a/ground/src/test/java/com/google/android/ground/persistence/local/LocalDataStoreTests.kt +++ b/ground/src/test/java/com/google/android/ground/persistence/local/LocalDataStoreTests.kt @@ -137,12 +137,8 @@ class LocalDataStoreTests : BaseHiltTest() { advanceUntilIdle() localLoiStore - .getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( - TEST_LOI_MUTATION.locationOfInterestId, - MutationEntitySyncStatus.PENDING - ) - .test() - .assertValue(listOf(TEST_LOI_MUTATION)) + .getMutationsFlow(TEST_LOI_MUTATION.locationOfInterestId, MutationEntitySyncStatus.PENDING) + .test { assertThat(expectMostRecentItem()).isEqualTo(listOf(TEST_LOI_MUTATION)) } } @Test @@ -165,12 +161,11 @@ class LocalDataStoreTests : BaseHiltTest() { localLoiStore.applyAndEnqueue(TEST_POLYGON_LOI_MUTATION) localLoiStore - .getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( + .getMutationsFlow( TEST_POLYGON_LOI_MUTATION.locationOfInterestId, MutationEntitySyncStatus.PENDING ) - .test() - .assertValue(listOf(TEST_POLYGON_LOI_MUTATION)) + .test { assertThat(expectMostRecentItem()).isEqualTo(listOf(TEST_POLYGON_LOI_MUTATION)) } } @Test @@ -221,13 +216,12 @@ class LocalDataStoreTests : BaseHiltTest() { localSubmissionStore.applyAndEnqueue(TEST_SUBMISSION_MUTATION) localSubmissionStore - .getSubmissionMutationsByLocationOfInterestIdOnceAndStream( + .getSubmissionMutationsByLoiIdFlow( TEST_SURVEY, TEST_LOI_MUTATION.locationOfInterestId, MutationEntitySyncStatus.PENDING ) - .test() - .assertValue(listOf(TEST_SUBMISSION_MUTATION)) + .test { assertThat(expectMostRecentItem()).isEqualTo(listOf(TEST_SUBMISSION_MUTATION)) } val loi = localLoiStore.getLocationOfInterest(TEST_SURVEY, "loi id").blockingGet() var submission = localSubmissionStore.getSubmission(loi, "submission id") assertEquivalent(TEST_SUBMISSION_MUTATION, submission) @@ -247,13 +241,14 @@ class LocalDataStoreTests : BaseHiltTest() { localSubmissionStore.applyAndEnqueue(mutation) localSubmissionStore - .getSubmissionMutationsByLocationOfInterestIdOnceAndStream( + .getSubmissionMutationsByLoiIdFlow( TEST_SURVEY, TEST_LOI_MUTATION.locationOfInterestId, MutationEntitySyncStatus.PENDING ) - .test() - .assertValue(listOf(TEST_SUBMISSION_MUTATION, mutation)) + .test { + assertThat(expectMostRecentItem()).isEqualTo(listOf(TEST_SUBMISSION_MUTATION, mutation)) + } // check if the submission was updated in the local database submission = localSubmissionStore.getSubmission(loi, "submission id") diff --git a/ground/src/test/java/com/google/android/ground/repository/LocationOfInterestRepositoryTest.kt b/ground/src/test/java/com/google/android/ground/repository/LocationOfInterestRepositoryTest.kt index c9822babeb..06b3080b8c 100644 --- a/ground/src/test/java/com/google/android/ground/repository/LocationOfInterestRepositoryTest.kt +++ b/ground/src/test/java/com/google/android/ground/repository/LocationOfInterestRepositoryTest.kt @@ -99,7 +99,7 @@ class LocationOfInterestRepositoryTest : BaseHiltTest() { locationOfInterestRepository.applyAndEnqueue(mutation) locationOfInterestRepository - .getIncompleteLocationOfInterestMutationsOnceAndStream(LOCATION_OF_INTEREST.id) + .getIncompleteLoiMutationsOnceAndStream(LOCATION_OF_INTEREST.id) .test() .assertNoErrors() .assertValue(listOf(mutation.copy(id = 1)))