diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/LocationOfInterestMutationDao.kt b/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/LocationOfInterestMutationDao.kt index 49923ba409..b2e44a1e71 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/LocationOfInterestMutationDao.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/LocationOfInterestMutationDao.kt @@ -19,8 +19,7 @@ import androidx.room.Dao import androidx.room.Query import com.google.android.ground.persistence.local.room.entity.LocationOfInterestMutationEntity import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus -import com.google.android.ground.rx.annotations.Cold -import io.reactivex.Flowable +import kotlinx.coroutines.flow.Flow /** * Provides low-level read/write operations of [LocationOfInterestMutationEntity] to/from the local @@ -29,14 +28,14 @@ import io.reactivex.Flowable @Dao interface LocationOfInterestMutationDao : BaseDao { @Query("SELECT * FROM location_of_interest_mutation") - fun loadAllOnceAndStream(): Flowable> + fun getAllMutationsFlow(): Flow> @Query( "SELECT * FROM location_of_interest_mutation " + "WHERE location_of_interest_id = :locationOfInterestId " + "AND state IN (:allowedStates)" ) - suspend fun findByLocationOfInterestId( + suspend fun getMutations( locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus ): List? @@ -46,8 +45,8 @@ interface LocationOfInterestMutationDao : BaseDao> + vararg allowedStates: MutationEntitySyncStatus, + ): Flow> } diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/SubmissionMutationDao.kt b/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/SubmissionMutationDao.kt index 46e53553c7..d42c3b9797 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/SubmissionMutationDao.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/room/dao/SubmissionMutationDao.kt @@ -20,14 +20,13 @@ import androidx.room.Query import com.google.android.ground.persistence.local.room.entity.SubmissionMutationEntity import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus import com.google.android.ground.persistence.local.room.fields.MutationEntityType -import com.google.android.ground.rx.annotations.Cold -import io.reactivex.Flowable +import kotlinx.coroutines.flow.Flow /** Data access object for database operations related to [SubmissionMutationEntity]. */ @Dao interface SubmissionMutationDao : BaseDao { @Query("SELECT * FROM submission_mutation") - fun loadAllOnceAndStream(): Flowable> + fun getAllMutationsFlow(): Flow> @Query( "SELECT * FROM submission_mutation " + @@ -62,8 +61,8 @@ interface SubmissionMutationDao : BaseDao { "SELECT * FROM submission_mutation " + "WHERE location_of_interest_id = :locationOfInterestId AND state IN (:allowedStates)" ) - fun findByLocationOfInterestIdOnceAndStream( + fun findByLoiIdFlow( locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus - ): @Cold(terminates = false) Flowable> + ): Flow> } diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomLocationOfInterestStore.kt b/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomLocationOfInterestStore.kt index ca157c423f..d7eab5f1e9 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomLocationOfInterestStore.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomLocationOfInterestStore.kt @@ -36,6 +36,7 @@ import io.reactivex.Flowable import io.reactivex.Maybe import javax.inject.Inject import javax.inject.Singleton +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import timber.log.Timber @@ -125,22 +126,25 @@ class RoomLocationOfInterestStore @Inject internal constructor() : LocalLocation } } - override fun getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( + override fun getMutationsFlow( locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus - ): Flowable> = - locationOfInterestMutationDao - .findByLocationOfInterestIdOnceAndStream(locationOfInterestId, *allowedStates) - .map { list: List -> list.map { it.toModelObject() } } + ): Flow> = + locationOfInterestMutationDao.getMutationsFlow(locationOfInterestId, *allowedStates).map { + mutations -> + mutations.map { it.toModelObject() } + } - override fun getAllMutationsAndStream(): Flowable> = - locationOfInterestMutationDao.loadAllOnceAndStream() + override fun getAllSurveyMutations(survey: Survey): Flow> = + locationOfInterestMutationDao.getAllMutationsFlow().map { mutations -> + mutations.filter { it.surveyId == survey.id }.map { it.toModelObject() } + } override suspend fun findByLocationOfInterestId( id: String, vararg states: MutationEntitySyncStatus ): List = - locationOfInterestMutationDao.findByLocationOfInterestId(id, *states) ?: listOf() + locationOfInterestMutationDao.getMutations(id, *states) ?: listOf() override suspend fun insertOrUpdate(loi: LocationOfInterest) = locationOfInterestDao.insertOrUpdate(loi.toLocalDataStoreObject()) diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt b/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt index aa7a3f9172..54ca4c8ff2 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt @@ -44,11 +44,12 @@ import com.google.android.ground.persistence.local.stores.LocalSubmissionStore import com.google.android.ground.rx.Schedulers import com.google.android.ground.util.Debug.logOnFailure import com.google.firebase.crashlytics.FirebaseCrashlytics -import io.reactivex.Flowable import io.reactivex.Single import javax.inject.Inject import javax.inject.Singleton import kotlinx.collections.immutable.toPersistentList +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import timber.log.Timber /** Manages access to [Submission] objects persisted in local storage. */ @@ -195,14 +196,15 @@ class RoomSubmissionStore @Inject internal constructor() : LocalSubmissionStore submissionDao.findByIdSuspend(submissionId)?.let { submissionDao.delete(it) } } - override fun getSubmissionMutationsByLocationOfInterestIdOnceAndStream( + override fun getSubmissionMutationsByLoiIdFlow( survey: Survey, locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus - ): Flowable> = - submissionMutationDao - .findByLocationOfInterestIdOnceAndStream(locationOfInterestId, *allowedStates) - .map { list: List -> list.map { it.toModelObject(survey) } } + ): Flow> = + submissionMutationDao.findByLoiIdFlow(locationOfInterestId, *allowedStates).map { + list: List -> + list.map { it.toModelObject(survey) } + } override suspend fun applyAndEnqueue(mutation: SubmissionMutation) { try { @@ -216,8 +218,10 @@ class RoomSubmissionStore @Inject internal constructor() : LocalSubmissionStore } } - override fun getAllMutationsAndStream(): Flowable> = - submissionMutationDao.loadAllOnceAndStream() + override fun getAllSurveyMutationsFlow(survey: Survey): Flow> = + submissionMutationDao.getAllMutationsFlow().map { mutations -> + mutations.filter { it.surveyId == survey.id }.map { it.toModelObject(survey) } + } override suspend fun findByLocationOfInterestId( loidId: String, diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalLocationOfInterestStore.kt b/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalLocationOfInterestStore.kt index dc15dde52d..347fef5242 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalLocationOfInterestStore.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalLocationOfInterestStore.kt @@ -21,7 +21,6 @@ import com.google.android.ground.model.mutation.LocationOfInterestMutation import com.google.android.ground.persistence.local.room.entity.LocationOfInterestMutationEntity import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus import com.google.android.ground.rx.annotations.Cold -import io.reactivex.Flowable import io.reactivex.Maybe import kotlinx.coroutines.flow.Flow @@ -46,12 +45,17 @@ interface LocalLocationOfInterestStore : * Emits the list of [LocationOfInterestMutation] instances for a given LOI which match the * provided `allowedStates`. A new list is emitted on each subsequent change. */ - fun getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( + fun getMutationsFlow( locationOfInterestId: String, vararg allowedStates: MutationEntitySyncStatus - ): Flowable> + ): Flow> - fun getAllMutationsAndStream(): Flowable> + /** + * Returns a [Flow] that emits a [List] of all [LocationOfInterestMutation]s stored in the local + * db related to a given [Survey]. A new [List] is emitted on each change to the underlying saved + * data. + */ + fun getAllSurveyMutations(survey: Survey): Flow> suspend fun findByLocationOfInterestId( id: String, diff --git a/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalSubmissionStore.kt b/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalSubmissionStore.kt index b64e4aed18..60168131d1 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalSubmissionStore.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/local/stores/LocalSubmissionStore.kt @@ -21,7 +21,7 @@ import com.google.android.ground.model.mutation.SubmissionMutation import com.google.android.ground.model.submission.Submission import com.google.android.ground.persistence.local.room.entity.SubmissionMutationEntity import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus -import io.reactivex.Flowable +import kotlinx.coroutines.flow.Flow interface LocalSubmissionStore : LocalMutationStore { /** @@ -43,16 +43,20 @@ interface LocalSubmissionStore : LocalMutationStore> + ): Flow> - fun getAllMutationsAndStream(): Flowable> + /** + * Returns a [Flow] that emits a list of all [SubmissionMutation]s associated with a given + * [Survey]. The list is newly emitted each time the underlying local data changes. + */ + fun getAllSurveyMutationsFlow(survey: Survey): Flow> suspend fun findByLocationOfInterestId( loidId: String, diff --git a/ground/src/main/java/com/google/android/ground/repository/LocationOfInterestRepository.kt b/ground/src/main/java/com/google/android/ground/repository/LocationOfInterestRepository.kt index d8c506b341..dfb9c26db3 100644 --- a/ground/src/main/java/com/google/android/ground/repository/LocationOfInterestRepository.kt +++ b/ground/src/main/java/com/google/android/ground/repository/LocationOfInterestRepository.kt @@ -41,6 +41,8 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.map +import kotlinx.coroutines.rx2.asFlowable +import kotlinx.coroutines.rx2.awaitSingleOrNull /** * Coordinates persistence and retrieval of [LocationOfInterest] instances from remote, local, and @@ -73,6 +75,7 @@ constructor( } /** This only works if the survey and location of interests are already cached to local db. */ + @Deprecated("Prefer getOfflineLocationOfInterestSuspend") fun getOfflineLocationOfInterest( surveyId: String, locationOfInterest: String @@ -84,6 +87,15 @@ constructor( Single.error { NotFoundException("Location of interest not found $locationOfInterest") } ) + suspend fun getOfflineLoiSuspend( + surveyId: String, + locationOfInterest: String + ): LocationOfInterest = + localSurveyStore.getSurveyByIdSuspend(surveyId)?.let { + localLoiStore.getLocationOfInterest(it, locationOfInterest).awaitSingleOrNull() + } + ?: throw NotFoundException("Location of interest not found $locationOfInterest") + fun createLocationOfInterest(geometry: Geometry, job: Job, surveyId: String): LocationOfInterest { val auditInfo = AuditInfo(authManager.currentUser) return LocationOfInterest( @@ -115,15 +127,17 @@ constructor( * have not yet been marked as [SyncStatus.COMPLETED], including pending, in progress, and failed * mutations. A new list is emitted on each subsequent change. */ - fun getIncompleteLocationOfInterestMutationsOnceAndStream( + fun getIncompleteLoiMutationsOnceAndStream( locationOfInterestId: String ): Flowable> = - localLoiStore.getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( - locationOfInterestId, - MutationEntitySyncStatus.PENDING, - MutationEntitySyncStatus.IN_PROGRESS, - MutationEntitySyncStatus.FAILED - ) + localLoiStore + .getMutationsFlow( + locationOfInterestId, + MutationEntitySyncStatus.PENDING, + MutationEntitySyncStatus.IN_PROGRESS, + MutationEntitySyncStatus.FAILED + ) + .asFlowable() /** Returns a flow of all [LocationOfInterest] associated with the given [Survey]. */ fun getLocationsOfInterests(survey: Survey): Flow> = diff --git a/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt b/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt index 6b2bd4500a..f6736e7ed5 100644 --- a/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt +++ b/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt @@ -27,11 +27,10 @@ import com.google.android.ground.persistence.local.room.fields.MutationEntitySyn import com.google.android.ground.persistence.local.stores.LocalLocationOfInterestStore import com.google.android.ground.persistence.local.stores.LocalSubmissionStore import com.google.android.ground.persistence.local.stores.LocalSurveyStore -import com.google.android.ground.rx.Schedulers -import com.google.android.ground.rx.annotations.Cold -import io.reactivex.Flowable import javax.inject.Inject import javax.inject.Singleton +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.combine /** * Coordinates persistence of mutations across [LocationOfInterestMutation] and [SubmissionMutation] @@ -44,33 +43,17 @@ constructor( private val localSurveyStore: LocalSurveyStore, private val localLocationOfInterestStore: LocalLocationOfInterestStore, private val localSubmissionStore: LocalSubmissionStore, - private val schedulers: Schedulers ) { /** * Returns a long-lived stream that emits the full list of mutations for specified survey on * subscribe and a new list on each subsequent change. */ - fun getMutationsOnceAndStream( - survey: Survey - ): @Cold(terminates = false) Flowable> { + fun getSurveyMutationsFlow(survey: Survey): Flow> { // TODO: Show mutations for all surveys, not just current one. - val locationOfInterestMutations = - localLocationOfInterestStore - .getAllMutationsAndStream() - .map { it.filter { it.surveyId == survey.id }.map { it.toModelObject() } } - .subscribeOn(schedulers.io()) - val submissionMutations = - localSubmissionStore - .getAllMutationsAndStream() - .map { list: List -> - list.filter { it.surveyId == survey.id }.map { it.toModelObject(survey) } - } - .subscribeOn(schedulers.io()) - return Flowable.combineLatest( - locationOfInterestMutations, - submissionMutations, - this::combineAndSortMutations - ) + val locationOfInterestMutations = localLocationOfInterestStore.getAllSurveyMutations(survey) + val submissionMutations = localSubmissionStore.getAllSurveyMutationsFlow(survey) + + return locationOfInterestMutations.combine(submissionMutations, this::combineAndSortMutations) } /** diff --git a/ground/src/main/java/com/google/android/ground/repository/SubmissionRepository.kt b/ground/src/main/java/com/google/android/ground/repository/SubmissionRepository.kt index 9eea47008a..f4b12669d2 100644 --- a/ground/src/main/java/com/google/android/ground/repository/SubmissionRepository.kt +++ b/ground/src/main/java/com/google/android/ground/repository/SubmissionRepository.kt @@ -31,13 +31,13 @@ import com.google.android.ground.persistence.uuid.OfflineUuidGenerator import com.google.android.ground.rx.annotations.Cold import com.google.android.ground.system.auth.AuthenticationManager import io.reactivex.Completable -import io.reactivex.Flowable import io.reactivex.Single import javax.inject.Inject import javax.inject.Singleton import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.rx2.rxCompletable -import kotlinx.coroutines.rx2.rxMaybe import kotlinx.coroutines.rx2.rxSingle /** @@ -137,21 +137,20 @@ constructor( * been marked as [SyncStatus.COMPLETED], including pending, in progress, and failed mutations. A * new list is emitted on each subsequent change. */ - fun getIncompleteSubmissionMutationsOnceAndStream( + suspend fun getIncompleteSubmissionMutationsOnceAndStream( surveyId: String, locationOfInterestId: String - ): Flowable> = - rxMaybe { localSurveyStore.getSurveyByIdSuspend(surveyId) } - .toFlowable() - .flatMap { - localSubmissionStore.getSubmissionMutationsByLocationOfInterestIdOnceAndStream( - it, - locationOfInterestId, - MutationEntitySyncStatus.PENDING, - MutationEntitySyncStatus.IN_PROGRESS, - MutationEntitySyncStatus.FAILED - ) - } + ): Flow> { + val survey = localSurveyStore.getSurveyByIdSuspend(surveyId) ?: return flowOf() + + return localSubmissionStore.getSubmissionMutationsByLoiIdFlow( + survey, + locationOfInterestId, + MutationEntitySyncStatus.PENDING, + MutationEntitySyncStatus.IN_PROGRESS, + MutationEntitySyncStatus.FAILED + ) + } suspend fun getTotalSubmissionCount(loi: LocationOfInterest) = loi.submissionCount + getPendingCreateCount(loi.id) - getPendingDeleteCount(loi.id) diff --git a/ground/src/main/java/com/google/android/ground/ui/syncstatus/SyncStatusViewModel.kt b/ground/src/main/java/com/google/android/ground/ui/syncstatus/SyncStatusViewModel.kt index 5c1d9ce158..af23d41422 100644 --- a/ground/src/main/java/com/google/android/ground/ui/syncstatus/SyncStatusViewModel.kt +++ b/ground/src/main/java/com/google/android/ground/ui/syncstatus/SyncStatusViewModel.kt @@ -17,18 +17,19 @@ package com.google.android.ground.ui.syncstatus import android.util.Pair import androidx.lifecycle.LiveData -import androidx.lifecycle.toLiveData -import com.google.android.ground.model.Survey +import androidx.lifecycle.asLiveData import com.google.android.ground.model.locationofinterest.LocationOfInterest import com.google.android.ground.model.mutation.Mutation import com.google.android.ground.repository.LocationOfInterestRepository import com.google.android.ground.repository.MutationRepository import com.google.android.ground.repository.SurveyRepository import com.google.android.ground.ui.common.AbstractViewModel -import io.reactivex.Flowable -import io.reactivex.Single -import java8.util.Optional import javax.inject.Inject +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.map /** * View model for the offline area manager fragment. Handles the current list of downloaded areas. @@ -41,31 +42,32 @@ internal constructor( private val locationOfInterestRepository: LocationOfInterestRepository ) : AbstractViewModel() { - val mutations: LiveData>> + /** [Flow] of latest mutations for the active [Survey]. */ + @OptIn(ExperimentalCoroutinesApi::class) + private val mutationsFlow: Flow> = + surveyRepository.activeSurveyFlow.filterNotNull().flatMapLatest { + mutationRepository.getSurveyMutationsFlow(it) + } - init { - mutations = mutationsOnceAndStream.switchMap { loadLocationsOfInterestAndPair(it) }.toLiveData() - } - - private val mutationsOnceAndStream: Flowable> - get() = - surveyRepository.activeSurveyFlowable.switchMap { survey: Optional -> - survey - .map { mutationRepository.getMutationsOnceAndStream(it) } - .orElse(Flowable.just(listOf())) - } + /** + * List of current local [Mutation]s executed by the user, with their corresponding + * [LocationOfInterest]. + */ + val mutations: LiveData>> = + mutationsFlow.map { loadLocationsOfInterestAndPair(it) }.asLiveData() - // TODO: Replace with kotlin coroutine - private fun loadLocationsOfInterestAndPair( + private suspend fun loadLocationsOfInterestAndPair( mutations: List - ): Flowable>> = - Single.merge(mutations.map { loadLocationOfInterestAndPair(it) }).toList().toFlowable() + ): List> = mutations.map { loadLocationOfInterestAndPair(it) } - // TODO: Replace with kotlin coroutine - private fun loadLocationOfInterestAndPair( + private suspend fun loadLocationOfInterestAndPair( mutation: Mutation - ): Single> = - locationOfInterestRepository - .getOfflineLocationOfInterest(mutation.surveyId, mutation.locationOfInterestId) - .map { locationOfInterest: LocationOfInterest -> Pair.create(locationOfInterest, mutation) } + ): Pair { + val loi = + locationOfInterestRepository.getOfflineLoiSuspend( + mutation.surveyId, + mutation.locationOfInterestId + ) + return Pair(loi, mutation) + } } diff --git a/ground/src/test/java/com/google/android/ground/persistence/local/LocalDataStoreTests.kt b/ground/src/test/java/com/google/android/ground/persistence/local/LocalDataStoreTests.kt index 864ad67aaa..c0c285f7fd 100644 --- a/ground/src/test/java/com/google/android/ground/persistence/local/LocalDataStoreTests.kt +++ b/ground/src/test/java/com/google/android/ground/persistence/local/LocalDataStoreTests.kt @@ -137,12 +137,8 @@ class LocalDataStoreTests : BaseHiltTest() { advanceUntilIdle() localLoiStore - .getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( - TEST_LOI_MUTATION.locationOfInterestId, - MutationEntitySyncStatus.PENDING - ) - .test() - .assertValue(listOf(TEST_LOI_MUTATION)) + .getMutationsFlow(TEST_LOI_MUTATION.locationOfInterestId, MutationEntitySyncStatus.PENDING) + .test { assertThat(expectMostRecentItem()).isEqualTo(listOf(TEST_LOI_MUTATION)) } } @Test @@ -165,12 +161,11 @@ class LocalDataStoreTests : BaseHiltTest() { localLoiStore.applyAndEnqueue(TEST_POLYGON_LOI_MUTATION) localLoiStore - .getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream( + .getMutationsFlow( TEST_POLYGON_LOI_MUTATION.locationOfInterestId, MutationEntitySyncStatus.PENDING ) - .test() - .assertValue(listOf(TEST_POLYGON_LOI_MUTATION)) + .test { assertThat(expectMostRecentItem()).isEqualTo(listOf(TEST_POLYGON_LOI_MUTATION)) } } @Test @@ -221,13 +216,12 @@ class LocalDataStoreTests : BaseHiltTest() { localSubmissionStore.applyAndEnqueue(TEST_SUBMISSION_MUTATION) localSubmissionStore - .getSubmissionMutationsByLocationOfInterestIdOnceAndStream( + .getSubmissionMutationsByLoiIdFlow( TEST_SURVEY, TEST_LOI_MUTATION.locationOfInterestId, MutationEntitySyncStatus.PENDING ) - .test() - .assertValue(listOf(TEST_SUBMISSION_MUTATION)) + .test { assertThat(expectMostRecentItem()).isEqualTo(listOf(TEST_SUBMISSION_MUTATION)) } val loi = localLoiStore.getLocationOfInterest(TEST_SURVEY, "loi id").blockingGet() var submission = localSubmissionStore.getSubmission(loi, "submission id") assertEquivalent(TEST_SUBMISSION_MUTATION, submission) @@ -247,13 +241,14 @@ class LocalDataStoreTests : BaseHiltTest() { localSubmissionStore.applyAndEnqueue(mutation) localSubmissionStore - .getSubmissionMutationsByLocationOfInterestIdOnceAndStream( + .getSubmissionMutationsByLoiIdFlow( TEST_SURVEY, TEST_LOI_MUTATION.locationOfInterestId, MutationEntitySyncStatus.PENDING ) - .test() - .assertValue(listOf(TEST_SUBMISSION_MUTATION, mutation)) + .test { + assertThat(expectMostRecentItem()).isEqualTo(listOf(TEST_SUBMISSION_MUTATION, mutation)) + } // check if the submission was updated in the local database submission = localSubmissionStore.getSubmission(loi, "submission id") diff --git a/ground/src/test/java/com/google/android/ground/repository/LocationOfInterestRepositoryTest.kt b/ground/src/test/java/com/google/android/ground/repository/LocationOfInterestRepositoryTest.kt index c9822babeb..06b3080b8c 100644 --- a/ground/src/test/java/com/google/android/ground/repository/LocationOfInterestRepositoryTest.kt +++ b/ground/src/test/java/com/google/android/ground/repository/LocationOfInterestRepositoryTest.kt @@ -99,7 +99,7 @@ class LocationOfInterestRepositoryTest : BaseHiltTest() { locationOfInterestRepository.applyAndEnqueue(mutation) locationOfInterestRepository - .getIncompleteLocationOfInterestMutationsOnceAndStream(LOCATION_OF_INTEREST.id) + .getIncompleteLoiMutationsOnceAndStream(LOCATION_OF_INTEREST.id) .test() .assertNoErrors() .assertValue(listOf(mutation.copy(id = 1)))