Skip to content

Commit

Permalink
Convert a bunch of mutations related streams to Flow and suspend funs. (
Browse files Browse the repository at this point in the history
#2048)

* Convert a bunch of mutations related streams to Flow and suspend funs.

Updates mutations, submission and LOI, to use FLow and suspend functions. Also updates tests accordingly. I've verified the sync UI works as expected after this change.

In one or two places, I've used RX interop to avoid needing to scope-bloat the conversions at this time, but we should be able to eliminate those calls relatively soon.

* Stores+Sync Status VM: Rename a few methods to adhere to conventions; refactor based on feedback
  • Loading branch information
scolsen authored Nov 7, 2023
1 parent be39c55 commit 172107e
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import androidx.room.Dao
import androidx.room.Query
import com.google.android.ground.persistence.local.room.entity.LocationOfInterestMutationEntity
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus
import com.google.android.ground.rx.annotations.Cold
import io.reactivex.Flowable
import kotlinx.coroutines.flow.Flow

/**
* Provides low-level read/write operations of [LocationOfInterestMutationEntity] to/from the local
Expand All @@ -29,14 +28,14 @@ import io.reactivex.Flowable
@Dao
interface LocationOfInterestMutationDao : BaseDao<LocationOfInterestMutationEntity> {
@Query("SELECT * FROM location_of_interest_mutation")
fun loadAllOnceAndStream(): Flowable<List<LocationOfInterestMutationEntity>>
fun getAllMutationsFlow(): Flow<List<LocationOfInterestMutationEntity>>

@Query(
"SELECT * FROM location_of_interest_mutation " +
"WHERE location_of_interest_id = :locationOfInterestId " +
"AND state IN (:allowedStates)"
)
suspend fun findByLocationOfInterestId(
suspend fun getMutations(
locationOfInterestId: String,
vararg allowedStates: MutationEntitySyncStatus
): List<LocationOfInterestMutationEntity>?
Expand All @@ -46,8 +45,8 @@ interface LocationOfInterestMutationDao : BaseDao<LocationOfInterestMutationEnti
"WHERE location_of_interest_id = :locationOfInterestId " +
"AND state IN (:allowedStates)"
)
fun findByLocationOfInterestIdOnceAndStream(
fun getMutationsFlow(
locationOfInterestId: String,
vararg allowedStates: MutationEntitySyncStatus
): @Cold(terminates = false) Flowable<List<LocationOfInterestMutationEntity>>
vararg allowedStates: MutationEntitySyncStatus,
): Flow<List<LocationOfInterestMutationEntity>>
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ import androidx.room.Query
import com.google.android.ground.persistence.local.room.entity.SubmissionMutationEntity
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus
import com.google.android.ground.persistence.local.room.fields.MutationEntityType
import com.google.android.ground.rx.annotations.Cold
import io.reactivex.Flowable
import kotlinx.coroutines.flow.Flow

/** Data access object for database operations related to [SubmissionMutationEntity]. */
@Dao
interface SubmissionMutationDao : BaseDao<SubmissionMutationEntity> {
@Query("SELECT * FROM submission_mutation")
fun loadAllOnceAndStream(): Flowable<List<SubmissionMutationEntity>>
fun getAllMutationsFlow(): Flow<List<SubmissionMutationEntity>>

@Query(
"SELECT * FROM submission_mutation " +
Expand Down Expand Up @@ -62,8 +61,8 @@ interface SubmissionMutationDao : BaseDao<SubmissionMutationEntity> {
"SELECT * FROM submission_mutation " +
"WHERE location_of_interest_id = :locationOfInterestId AND state IN (:allowedStates)"
)
fun findByLocationOfInterestIdOnceAndStream(
fun findByLoiIdFlow(
locationOfInterestId: String,
vararg allowedStates: MutationEntitySyncStatus
): @Cold(terminates = false) Flowable<List<SubmissionMutationEntity>>
): Flow<List<SubmissionMutationEntity>>
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import io.reactivex.Flowable
import io.reactivex.Maybe
import javax.inject.Inject
import javax.inject.Singleton
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import timber.log.Timber

Expand Down Expand Up @@ -125,22 +126,25 @@ class RoomLocationOfInterestStore @Inject internal constructor() : LocalLocation
}
}

override fun getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream(
override fun getMutationsFlow(
locationOfInterestId: String,
vararg allowedStates: MutationEntitySyncStatus
): Flowable<List<LocationOfInterestMutation>> =
locationOfInterestMutationDao
.findByLocationOfInterestIdOnceAndStream(locationOfInterestId, *allowedStates)
.map { list: List<LocationOfInterestMutationEntity> -> list.map { it.toModelObject() } }
): Flow<List<LocationOfInterestMutation>> =
locationOfInterestMutationDao.getMutationsFlow(locationOfInterestId, *allowedStates).map {
mutations ->
mutations.map { it.toModelObject() }
}

override fun getAllMutationsAndStream(): Flowable<List<LocationOfInterestMutationEntity>> =
locationOfInterestMutationDao.loadAllOnceAndStream()
override fun getAllSurveyMutations(survey: Survey): Flow<List<LocationOfInterestMutation>> =
locationOfInterestMutationDao.getAllMutationsFlow().map { mutations ->
mutations.filter { it.surveyId == survey.id }.map { it.toModelObject() }
}

override suspend fun findByLocationOfInterestId(
id: String,
vararg states: MutationEntitySyncStatus
): List<LocationOfInterestMutationEntity> =
locationOfInterestMutationDao.findByLocationOfInterestId(id, *states) ?: listOf()
locationOfInterestMutationDao.getMutations(id, *states) ?: listOf()

override suspend fun insertOrUpdate(loi: LocationOfInterest) =
locationOfInterestDao.insertOrUpdate(loi.toLocalDataStoreObject())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ import com.google.android.ground.persistence.local.stores.LocalSubmissionStore
import com.google.android.ground.rx.Schedulers
import com.google.android.ground.util.Debug.logOnFailure
import com.google.firebase.crashlytics.FirebaseCrashlytics
import io.reactivex.Flowable
import io.reactivex.Single
import javax.inject.Inject
import javax.inject.Singleton
import kotlinx.collections.immutable.toPersistentList
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import timber.log.Timber

/** Manages access to [Submission] objects persisted in local storage. */
Expand Down Expand Up @@ -195,14 +196,15 @@ class RoomSubmissionStore @Inject internal constructor() : LocalSubmissionStore
submissionDao.findByIdSuspend(submissionId)?.let { submissionDao.delete(it) }
}

override fun getSubmissionMutationsByLocationOfInterestIdOnceAndStream(
override fun getSubmissionMutationsByLoiIdFlow(
survey: Survey,
locationOfInterestId: String,
vararg allowedStates: MutationEntitySyncStatus
): Flowable<List<SubmissionMutation>> =
submissionMutationDao
.findByLocationOfInterestIdOnceAndStream(locationOfInterestId, *allowedStates)
.map { list: List<SubmissionMutationEntity> -> list.map { it.toModelObject(survey) } }
): Flow<List<SubmissionMutation>> =
submissionMutationDao.findByLoiIdFlow(locationOfInterestId, *allowedStates).map {
list: List<SubmissionMutationEntity> ->
list.map { it.toModelObject(survey) }
}

override suspend fun applyAndEnqueue(mutation: SubmissionMutation) {
try {
Expand All @@ -216,8 +218,10 @@ class RoomSubmissionStore @Inject internal constructor() : LocalSubmissionStore
}
}

override fun getAllMutationsAndStream(): Flowable<List<SubmissionMutationEntity>> =
submissionMutationDao.loadAllOnceAndStream()
override fun getAllSurveyMutationsFlow(survey: Survey): Flow<List<SubmissionMutation>> =
submissionMutationDao.getAllMutationsFlow().map { mutations ->
mutations.filter { it.surveyId == survey.id }.map { it.toModelObject(survey) }
}

override suspend fun findByLocationOfInterestId(
loidId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import com.google.android.ground.model.mutation.LocationOfInterestMutation
import com.google.android.ground.persistence.local.room.entity.LocationOfInterestMutationEntity
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus
import com.google.android.ground.rx.annotations.Cold
import io.reactivex.Flowable
import io.reactivex.Maybe
import kotlinx.coroutines.flow.Flow

Expand All @@ -46,12 +45,17 @@ interface LocalLocationOfInterestStore :
* Emits the list of [LocationOfInterestMutation] instances for a given LOI which match the
* provided `allowedStates`. A new list is emitted on each subsequent change.
*/
fun getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream(
fun getMutationsFlow(
locationOfInterestId: String,
vararg allowedStates: MutationEntitySyncStatus
): Flowable<List<LocationOfInterestMutation>>
): Flow<List<LocationOfInterestMutation>>

fun getAllMutationsAndStream(): Flowable<List<LocationOfInterestMutationEntity>>
/**
* Returns a [Flow] that emits a [List] of all [LocationOfInterestMutation]s stored in the local
* db related to a given [Survey]. A new [List] is emitted on each change to the underlying saved
* data.
*/
fun getAllSurveyMutations(survey: Survey): Flow<List<LocationOfInterestMutation>>

suspend fun findByLocationOfInterestId(
id: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.google.android.ground.model.mutation.SubmissionMutation
import com.google.android.ground.model.submission.Submission
import com.google.android.ground.persistence.local.room.entity.SubmissionMutationEntity
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus
import io.reactivex.Flowable
import kotlinx.coroutines.flow.Flow

interface LocalSubmissionStore : LocalMutationStore<SubmissionMutation, Submission> {
/**
Expand All @@ -43,16 +43,20 @@ interface LocalSubmissionStore : LocalMutationStore<SubmissionMutation, Submissi
suspend fun deleteSubmission(submissionId: String)

/**
* Emits the list of [SubmissionMutation] instances for a given LOI which match the provided
* `allowedStates`. A new list is emitted on each subsequent change.
* Returns a [Flow] that emits the list of [SubmissionMutation] instances for a given LOI which
* match the provided `allowedStates`. A new list is emitted on each subsequent change.
*/
fun getSubmissionMutationsByLocationOfInterestIdOnceAndStream(
fun getSubmissionMutationsByLoiIdFlow(
survey: Survey,
locationOfInterestId: String,
vararg allowedStates: MutationEntitySyncStatus
): Flowable<List<SubmissionMutation>>
): Flow<List<SubmissionMutation>>

fun getAllMutationsAndStream(): Flowable<List<SubmissionMutationEntity>>
/**
* Returns a [Flow] that emits a list of all [SubmissionMutation]s associated with a given
* [Survey]. The list is newly emitted each time the underlying local data changes.
*/
fun getAllSurveyMutationsFlow(survey: Survey): Flow<List<SubmissionMutation>>

suspend fun findByLocationOfInterestId(
loidId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.rx2.asFlowable
import kotlinx.coroutines.rx2.awaitSingleOrNull

/**
* Coordinates persistence and retrieval of [LocationOfInterest] instances from remote, local, and
Expand Down Expand Up @@ -73,6 +75,7 @@ constructor(
}

/** This only works if the survey and location of interests are already cached to local db. */
@Deprecated("Prefer getOfflineLocationOfInterestSuspend")
fun getOfflineLocationOfInterest(
surveyId: String,
locationOfInterest: String
Expand All @@ -84,6 +87,15 @@ constructor(
Single.error { NotFoundException("Location of interest not found $locationOfInterest") }
)

suspend fun getOfflineLoiSuspend(
surveyId: String,
locationOfInterest: String
): LocationOfInterest =
localSurveyStore.getSurveyByIdSuspend(surveyId)?.let {
localLoiStore.getLocationOfInterest(it, locationOfInterest).awaitSingleOrNull()
}
?: throw NotFoundException("Location of interest not found $locationOfInterest")

fun createLocationOfInterest(geometry: Geometry, job: Job, surveyId: String): LocationOfInterest {
val auditInfo = AuditInfo(authManager.currentUser)
return LocationOfInterest(
Expand Down Expand Up @@ -115,15 +127,17 @@ constructor(
* have not yet been marked as [SyncStatus.COMPLETED], including pending, in progress, and failed
* mutations. A new list is emitted on each subsequent change.
*/
fun getIncompleteLocationOfInterestMutationsOnceAndStream(
fun getIncompleteLoiMutationsOnceAndStream(
locationOfInterestId: String
): Flowable<List<LocationOfInterestMutation>> =
localLoiStore.getLocationOfInterestMutationsByLocationOfInterestIdOnceAndStream(
locationOfInterestId,
MutationEntitySyncStatus.PENDING,
MutationEntitySyncStatus.IN_PROGRESS,
MutationEntitySyncStatus.FAILED
)
localLoiStore
.getMutationsFlow(
locationOfInterestId,
MutationEntitySyncStatus.PENDING,
MutationEntitySyncStatus.IN_PROGRESS,
MutationEntitySyncStatus.FAILED
)
.asFlowable()

/** Returns a flow of all [LocationOfInterest] associated with the given [Survey]. */
fun getLocationsOfInterests(survey: Survey): Flow<Set<LocationOfInterest>> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ import com.google.android.ground.persistence.local.room.fields.MutationEntitySyn
import com.google.android.ground.persistence.local.stores.LocalLocationOfInterestStore
import com.google.android.ground.persistence.local.stores.LocalSubmissionStore
import com.google.android.ground.persistence.local.stores.LocalSurveyStore
import com.google.android.ground.rx.Schedulers
import com.google.android.ground.rx.annotations.Cold
import io.reactivex.Flowable
import javax.inject.Inject
import javax.inject.Singleton
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine

/**
* Coordinates persistence of mutations across [LocationOfInterestMutation] and [SubmissionMutation]
Expand All @@ -44,33 +43,17 @@ constructor(
private val localSurveyStore: LocalSurveyStore,
private val localLocationOfInterestStore: LocalLocationOfInterestStore,
private val localSubmissionStore: LocalSubmissionStore,
private val schedulers: Schedulers
) {
/**
* Returns a long-lived stream that emits the full list of mutations for specified survey on
* subscribe and a new list on each subsequent change.
*/
fun getMutationsOnceAndStream(
survey: Survey
): @Cold(terminates = false) Flowable<List<Mutation>> {
fun getSurveyMutationsFlow(survey: Survey): Flow<List<Mutation>> {
// TODO: Show mutations for all surveys, not just current one.
val locationOfInterestMutations =
localLocationOfInterestStore
.getAllMutationsAndStream()
.map { it.filter { it.surveyId == survey.id }.map { it.toModelObject() } }
.subscribeOn(schedulers.io())
val submissionMutations =
localSubmissionStore
.getAllMutationsAndStream()
.map { list: List<SubmissionMutationEntity> ->
list.filter { it.surveyId == survey.id }.map { it.toModelObject(survey) }
}
.subscribeOn(schedulers.io())
return Flowable.combineLatest(
locationOfInterestMutations,
submissionMutations,
this::combineAndSortMutations
)
val locationOfInterestMutations = localLocationOfInterestStore.getAllSurveyMutations(survey)
val submissionMutations = localSubmissionStore.getAllSurveyMutationsFlow(survey)

return locationOfInterestMutations.combine(submissionMutations, this::combineAndSortMutations)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ import com.google.android.ground.persistence.uuid.OfflineUuidGenerator
import com.google.android.ground.rx.annotations.Cold
import com.google.android.ground.system.auth.AuthenticationManager
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Single
import javax.inject.Inject
import javax.inject.Singleton
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.rx2.rxCompletable
import kotlinx.coroutines.rx2.rxMaybe
import kotlinx.coroutines.rx2.rxSingle

/**
Expand Down Expand Up @@ -137,21 +137,20 @@ constructor(
* been marked as [SyncStatus.COMPLETED], including pending, in progress, and failed mutations. A
* new list is emitted on each subsequent change.
*/
fun getIncompleteSubmissionMutationsOnceAndStream(
suspend fun getIncompleteSubmissionMutationsOnceAndStream(
surveyId: String,
locationOfInterestId: String
): Flowable<List<SubmissionMutation>> =
rxMaybe { localSurveyStore.getSurveyByIdSuspend(surveyId) }
.toFlowable()
.flatMap {
localSubmissionStore.getSubmissionMutationsByLocationOfInterestIdOnceAndStream(
it,
locationOfInterestId,
MutationEntitySyncStatus.PENDING,
MutationEntitySyncStatus.IN_PROGRESS,
MutationEntitySyncStatus.FAILED
)
}
): Flow<List<SubmissionMutation>> {
val survey = localSurveyStore.getSurveyByIdSuspend(surveyId) ?: return flowOf()

return localSubmissionStore.getSubmissionMutationsByLoiIdFlow(
survey,
locationOfInterestId,
MutationEntitySyncStatus.PENDING,
MutationEntitySyncStatus.IN_PROGRESS,
MutationEntitySyncStatus.FAILED
)
}

suspend fun getTotalSubmissionCount(loi: LocationOfInterest) =
loi.submissionCount + getPendingCreateCount(loi.id) - getPendingDeleteCount(loi.id)
Expand Down
Loading

0 comments on commit 172107e

Please sign in to comment.