Skip to content

Commit

Permalink
Rewrite sync worker WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
gino-m committed Nov 25, 2024
1 parent 8c0193a commit 7815e5b
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.util.Date
* by completing the data collection flow and clicking "Submit".
*/
data class UploadQueueEntry(
val userId: String,
val clientTimestamp: Date,
val uploadStatus: Mutation.SyncStatus,
val loiMutation: LocationOfInterestMutation?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ class RoomLocationOfInterestStore @Inject internal constructor() : LocalLocation
@Inject lateinit var userStore: RoomUserStore

/**
* Retrieves the complete set of [LocationOfInterest] associated with the given [Survey] from the
* local database and returns a [Flow] that continually emits the complete set anew any time the
* underlying table changes (insertions, deletions, updates).
* Retrieves the [LocationOfInterest]s associated with the given [Survey] from the local database
* and returns a [Flow] that continually emits the complete set anew any time the underlying table
* changes (insertions, deletions, updates). LOIs marked as "deleted" are not included.
*/
override fun getValidLois(survey: Survey): Flow<Set<LocationOfInterest>> =
override fun getLoisInSurvey(survey: Survey): Flow<Set<LocationOfInterest>> =
locationOfInterestDao.getByDeletionState(survey.id, EntityDeletionState.DEFAULT).map {
toLocationsOfInterest(survey, it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ interface LocalLocationOfInterestStore :
* Returns a main-safe flow that emits the full set of LOIs for a survey on subscribe, and
* continues to return the full set each time a LOI is added/changed/removed.
*/
fun getValidLois(survey: Survey): Flow<Set<LocationOfInterest>>
fun getLoisInSurvey(survey: Survey): Flow<Set<LocationOfInterest>>

/** Returns the LOI with the specified UUID from the local data store, if found. */
suspend fun getLocationOfInterest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import androidx.work.ListenableWorker.Result.success
import androidx.work.WorkerParameters
import com.google.android.ground.model.User
import com.google.android.ground.model.mutation.Mutation
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.FAILED
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.IN_PROGRESS
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.PENDING
import com.google.android.ground.model.submission.UploadQueueEntry
import com.google.android.ground.persistence.remote.RemoteDataStore
import com.google.android.ground.persistence.sync.LocalMutationSyncWorker.Companion.createInputData
import com.google.android.ground.repository.MutationRepository
Expand Down Expand Up @@ -54,35 +52,32 @@ constructor(
private val userRepository: UserRepository,
) : CoroutineWorker(context, params) {

private val locationOfInterestId: String =
params.inputData.getString(LOCATION_OF_INTEREST_ID_PARAM_KEY)!!

override suspend fun doWork(): Result = withContext(Dispatchers.IO) { doWorkInternal() }

private suspend fun doWorkInternal(): Result =
try {
val mutations = getIncompleteMutations()
Timber.d("Syncing ${mutations.size} changes for LOI $locationOfInterestId")
val result = processMutations(mutations)
mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId)
if (result) success() else retry()
} catch (t: Throwable) {
Timber.e(t, "Failed to sync changes for LOI $locationOfInterestId")
retry()
override suspend fun doWork(): Result =
withContext(Dispatchers.IO) {
val queue = mutationRepository.getPendingUploads()
Timber.d("Uploading ${queue.size} additions / changes")
if (queue.map { processQueueEntry(it) }.all { it }) success() else retry()
// TODO: Update MediaUploader to work on entire queue, trigger when complete.
// mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId)
}

/**
* Attempts to fetch all mutations from the [MutationRepository] that are `PENDING`, `FAILED`, or
* `IN_PROGRESS` state. The latter should never occur since only on worker should be scheduled per
* LOI at a given time.
* Uploads a chunk of data to the remote data store, updating the upload status in the queue
* accordingly.
*
* @return `true` if all data was uploaded, `false` if at least one failed.
*/
private suspend fun getIncompleteMutations(): List<Mutation> =
mutationRepository.getMutations(locationOfInterestId, PENDING, FAILED, IN_PROGRESS)
private suspend fun processQueueEntry(entry: UploadQueueEntry): Boolean {
val mutations = listOfNotNull(entry.loiMutation, entry.submissionMutation)
return processMutations(mutations)
}

/**
* Applies mutations to remote data store. Once successful, removes them from the local db.
* Applies mutations to remote data store, updating their status in the queue accordingly. Catches
* and handles all exceptions.
*
* @return `true` if the mutations were successfully synced with [RemoteDataStore].
* @return `true` if all mutations were successfully synced with [RemoteDataStore], `false` if at
* least one failed.
*/
private suspend fun processMutations(mutations: List<Mutation>): Boolean {
if (mutations.isEmpty()) return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ constructor(

/** Returns a flow of all valid (not deleted) [LocationOfInterest] in the given [Survey]. */
fun getValidLois(survey: Survey): Flow<Set<LocationOfInterest>> =
localLoiStore.getValidLois(survey)
localLoiStore.getLoisInSurvey(survey)

/** Returns a flow of all [LocationOfInterest] within the map bounds (viewport). */
fun getWithinBounds(survey: Survey, bounds: Bounds): Flow<List<LocationOfInterest>> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ constructor(
* Returns a [List] of incomplete operations still in the upload queue, sorted in chronological
* order (FIFO).
*/
suspend fun getUploadQueue(): List<UploadQueueEntry> =
suspend fun getPendingUploads(): List<UploadQueueEntry> =
getUploadQueueFlow(includeCompleted = false).first()

/**
Expand Down Expand Up @@ -119,9 +119,10 @@ constructor(
.map {
val loiMutation = loiMutationMap[it]
val submissionMutation = submissionMutationMap[it]
val userId = submissionMutation?.userId ?: loiMutation!!.userId
val clientTimestamp = submissionMutation?.clientTimestamp ?: loiMutation!!.clientTimestamp
val syncStatus = submissionMutation?.syncStatus ?: loiMutation!!.syncStatus
UploadQueueEntry(clientTimestamp, syncStatus, loiMutation, submissionMutation)
UploadQueueEntry(userId, clientTimestamp, syncStatus, loiMutation, submissionMutation)
}
.sortedBy { it.clientTimestamp }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class LocalDataStoreTests : BaseHiltTest() {

val loi = localLoiStore.getLocationOfInterest(TEST_SURVEY, FakeData.LOI_ID)

localLoiStore.getValidLois(TEST_SURVEY).test {
localLoiStore.getLoisInSurvey(TEST_SURVEY).test {
assertThat(expectMostRecentItem()).isEqualTo(setOf(loi))
}
}
Expand Down Expand Up @@ -312,7 +312,7 @@ class LocalDataStoreTests : BaseHiltTest() {

// Assert that one LOI is streamed.
val loi = localLoiStore.getLocationOfInterest(TEST_SURVEY, FakeData.LOI_ID)!!
localLoiStore.getValidLois(TEST_SURVEY).test {
localLoiStore.getLoisInSurvey(TEST_SURVEY).test {
assertThat(expectMostRecentItem()).isEqualTo(setOf(loi))
}
val mutation = TEST_LOI_MUTATION.copy(id = null, type = Mutation.Type.DELETE)
Expand All @@ -325,7 +325,7 @@ class LocalDataStoreTests : BaseHiltTest() {
.isEqualTo(EntityDeletionState.DELETED)

// Verify that the local LOI is now removed from the latest LOI stream.
localLoiStore.getValidLois(TEST_SURVEY).test { assertThat(expectMostRecentItem()).isEmpty() }
localLoiStore.getLoisInSurvey(TEST_SURVEY).test { assertThat(expectMostRecentItem()).isEmpty() }

// After successful remote sync, delete LOI is called by LocalMutationSyncWorker.
localLoiStore.deleteLocationOfInterest(FakeData.LOI_ID)
Expand Down

0 comments on commit 7815e5b

Please sign in to comment.