Skip to content

Commit

Permalink
Merge pull request #1502 from embrace-io/reduce-periodic-cache-attempts
Browse files Browse the repository at this point in the history
Address backpressure in periodic caching
  • Loading branch information
fractalwrench authored Oct 11, 2024
2 parents 8fe90df + b48bb45 commit a47a22e
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.embrace.android.embracesdk.internal.injection

import io.embrace.android.embracesdk.internal.comms.delivery.DeliveryService
import io.embrace.android.embracesdk.internal.comms.delivery.EmbraceDeliveryService
import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
import io.embrace.android.embracesdk.internal.delivery.caching.PayloadCachingService
import io.embrace.android.embracesdk.internal.delivery.caching.PayloadCachingServiceImpl
import io.embrace.android.embracesdk.internal.delivery.execution.RequestExecutionService
Expand All @@ -16,6 +17,7 @@ import io.embrace.android.embracesdk.internal.session.orchestrator.PayloadStore
import io.embrace.android.embracesdk.internal.session.orchestrator.V1PayloadStore
import io.embrace.android.embracesdk.internal.session.orchestrator.V2PayloadStore
import io.embrace.android.embracesdk.internal.utils.Provider
import io.embrace.android.embracesdk.internal.worker.PriorityWorker
import io.embrace.android.embracesdk.internal.worker.Worker

internal class DeliveryModuleImpl(
Expand Down Expand Up @@ -64,6 +66,10 @@ internal class DeliveryModuleImpl(
deliveryServiceProvider()
}

private val dataPersistenceWorker: PriorityWorker<StoredTelemetryMetadata> by singleton {
workerThreadModule.priorityWorker(Worker.Priority.DataPersistenceWorker)
}

override val intakeService: IntakeService? by singleton {
if (configModule.configService.isOnlyUsingOtelExporters()) {
null
Expand All @@ -77,7 +83,7 @@ internal class DeliveryModuleImpl(
cacheStorageService,
initModule.logger,
initModule.jsonSerializer,
workerThreadModule.priorityWorker(Worker.Priority.DataPersistenceWorker)
dataPersistenceWorker
)
}
}
Expand Down Expand Up @@ -109,6 +115,7 @@ internal class DeliveryModuleImpl(
} else {
PayloadStorageServiceImpl(
coreModule.context,
dataPersistenceWorker,
PayloadStorageServiceImpl.OutputType.PAYLOAD,
initModule.logger
)
Expand All @@ -121,6 +128,7 @@ internal class DeliveryModuleImpl(
} else {
PayloadStorageServiceImpl(
coreModule.context,
dataPersistenceWorker,
PayloadStorageServiceImpl.OutputType.CACHE,
initModule.logger
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package io.embrace.android.embracesdk.internal.delivery.intake

import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType
import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingService
import io.embrace.android.embracesdk.internal.delivery.storage.PayloadStorageService
import io.embrace.android.embracesdk.internal.logging.EmbLogger
import io.embrace.android.embracesdk.internal.logging.InternalErrorType
import io.embrace.android.embracesdk.internal.payload.Envelope
import io.embrace.android.embracesdk.internal.serialization.PlatformSerializer
import io.embrace.android.embracesdk.internal.worker.PriorityWorker
import java.util.concurrent.Future

class IntakeServiceImpl(
private val schedulingService: SchedulingService,
Expand All @@ -19,14 +21,30 @@ class IntakeServiceImpl(
private val shutdownTimeoutMs: Long = 3000
) : IntakeService {

private var lastCacheAttempt: Future<*>? = null
private var lastCacheType: SupportedEnvelopeType? = null

override fun shutdown() {
worker.shutdownAndWait(shutdownTimeoutMs)
}

override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata) {
worker.submit(metadata) {
val future = worker.submit(metadata) {
processIntake(intake, metadata)
}

// cancel any cache attempts that are already pending to avoid unnecessary I/O.
if (!metadata.complete) {
val prev = lastCacheAttempt
lastCacheAttempt = future

val lastType = lastCacheType
lastCacheType = metadata.envelopeType

if (lastType == metadata.envelopeType) {
prev?.cancel(false)
}
}
}

private fun processIntake(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@ package io.embrace.android.embracesdk.internal.delivery.storage

import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
import io.embrace.android.embracesdk.internal.injection.SerializationAction
import java.io.IOException
import java.io.InputStream

/**
* Stores a completed payload to disk. This service makes several assumptions around threading
* that MUST be adhered to:
*
* 1. All calls to [store] are made from thread A
* 2. All (external) calls to [delete] are executed on thread A (but can be submitted from
* a different thread)
* 3. The service may call [delete] internally on thread A to enforce storage limits
* 4. [store] will be called exactly once when each payload is complete & ready to send. I.e. it
* will never be called multiple times to persist an incomplete/transformed payload
* 5. For a given payload, [delete] will always be called after [store]
* 6. Callers to [loadPayloadAsStream] must be able to handle IOException when manipulating the
* 2. All external calls to [delete] can be made from any thread but will be enqueued on thread A
* 3. The service may delete files internally on thread A to enforce storage limits
* 4. For any given payload [store] will always be called before [delete]
* 5. Callers to [loadPayloadAsStream] must be able to handle IOException when manipulating the
* stream as the payload file backing the stream could be deleted at any time
*/
interface PayloadStorageService {
Expand All @@ -33,6 +31,7 @@ interface PayloadStorageService {
/**
* Loads a payload as an [InputStream]
*/
@Throws(IOException::class)
fun loadPayloadAsStream(metadata: StoredTelemetryMetadata): InputStream?

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.embrace.android.embracesdk.internal.delivery.storedTelemetryComparator
import io.embrace.android.embracesdk.internal.injection.SerializationAction
import io.embrace.android.embracesdk.internal.logging.EmbLogger
import io.embrace.android.embracesdk.internal.logging.InternalErrorType
import io.embrace.android.embracesdk.internal.worker.PriorityWorker
import java.io.File
import java.io.FileNotFoundException
import java.io.InputStream
Expand All @@ -19,6 +20,7 @@ import java.util.zip.GZIPOutputStream
*/
class PayloadStorageServiceImpl(
outputDir: Lazy<File>,
private val worker: PriorityWorker<StoredTelemetryMetadata>,
private val logger: EmbLogger,
private val storageLimit: Int = 500,
) : PayloadStorageService {
Expand All @@ -38,10 +40,11 @@ class PayloadStorageServiceImpl(

constructor(
ctx: Context,
worker: PriorityWorker<StoredTelemetryMetadata>,
outputType: OutputType,
logger: EmbLogger,
storageLimit: Int = 500,
) : this(createOutputDir(ctx, outputType, logger), logger, storageLimit)
) : this(createOutputDir(ctx, outputType, logger), worker, logger, storageLimit)

private val payloadDir by outputDir

Expand Down Expand Up @@ -94,6 +97,12 @@ class PayloadStorageServiceImpl(
}

override fun delete(metadata: StoredTelemetryMetadata) {
worker.submit(metadata) {
processDelete(metadata)
}
}

private fun processDelete(metadata: StoredTelemetryMetadata) {
try {
if (metadata.asFile().delete()) {
storedFiles.remove(metadata)
Expand Down Expand Up @@ -133,7 +142,7 @@ class PayloadStorageServiceImpl(
val removals = input
.sortedWith(storedTelemetryComparator)
.takeLast(removalCount)
removals.forEach(::delete)
removals.forEach(::processDelete)
logger.trackInternalError(InternalErrorType.PAYLOAD_STORAGE_FAIL, RuntimeException("Pruned payload storage"))

// notify the caller whether the new payload should be dropped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,23 @@ class IntakeServiceImplTest {
assertEquals(0, schedulingService.payloadIntakeCount)
}

@Test
fun `multiple cache attempts are ignored`() {
val cache1 = StoredTelemetryMetadata(clock.now(), UUID, "1", SESSION, complete = false)
val cache2 = StoredTelemetryMetadata(clock.now(), UUID, "2", SESSION, complete = false)
val cache3 = StoredTelemetryMetadata(clock.now(), UUID, "3", SESSION, complete = false)
intakeService.take(sessionEnvelope, cache1)
intakeService.take(sessionEnvelope, cache2)
intakeService.take(sessionEnvelope, cache3)
executorService.runCurrentlyBlocked()

// only one file was stored and it's the most recent one
val filename = cacheStorageService.storedFilenames().single()
val metadata = StoredTelemetryMetadata.fromFilename(filename).getOrThrow()
assertEquals(SESSION, metadata.envelopeType)
assertEquals("3", metadata.processId)
}

@Test
fun `exception in payload storage`() {
payloadStorageService.failStorage = true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.embrace.android.embracesdk.internal.delivery.storage

import io.embrace.android.embracesdk.concurrency.BlockableExecutorService
import io.embrace.android.embracesdk.fakes.FakeEmbLogger
import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType.CRASH
import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType.LOG
import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType.NETWORK
import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType.SESSION
import io.embrace.android.embracesdk.internal.delivery.storedTelemetryComparator
import io.embrace.android.embracesdk.internal.worker.PriorityWorker
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
Expand All @@ -28,13 +30,15 @@ class PayloadStorageServiceImplTest {
private lateinit var service: PayloadStorageService
private lateinit var outputDir: File
private lateinit var logger: FakeEmbLogger
private lateinit var worker: PriorityWorker<StoredTelemetryMetadata>

@Before
fun setUp() {
outputDir = Files.createTempDirectory("output").toFile()
outputDir.deleteRecursively()
worker = PriorityWorker(BlockableExecutorService(false))
logger = FakeEmbLogger(false)
service = PayloadStorageServiceImpl(lazy { outputDir }, logger)
service = PayloadStorageServiceImpl(lazy { outputDir }, worker, logger)
}

@Test
Expand Down Expand Up @@ -84,7 +88,7 @@ class PayloadStorageServiceImplTest {
@Test
fun `test objects pruned past limit`() {
assertNull(outputDir.listFiles())
service = PayloadStorageServiceImpl(lazy { outputDir }, logger, 4)
service = PayloadStorageServiceImpl(lazy { outputDir }, worker, logger, 4)

// exceed storage limit
listOf(
Expand Down

0 comments on commit a47a22e

Please sign in to comment.