Skip to content

Commit

Permalink
- Improving AsyncSemaphore API
Browse files Browse the repository at this point in the history
- Update Kotlin to 1.9.21
- Update AWS SDK to 1.23.x
  • Loading branch information
gabfssilva committed Jan 14, 2024
1 parent 3fe11d9 commit 5e5651d
Show file tree
Hide file tree
Showing 21 changed files with 258 additions and 108 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ subprojects {
apply(plugin = "java-library")
apply(plugin = "signing")

version = "1.0.0-alpha11"
version = "1.0.0-alpha12"

group = "com.river-kt"

Expand Down Expand Up @@ -192,6 +192,7 @@ subprojects {
dependencies {
api(rootProject.libs.coroutines)
testImplementation(rootProject.libs.kotest.junit5)
testImplementation(rootProject.libs.turbine)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.river.connector.aws.s3

import com.river.core.*
import com.river.core.GroupStrategy.Count
import com.river.core.asByteArray
import com.river.core.asBytes
import com.river.core.intersperse
import io.kotest.core.spec.style.FeatureSpec
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeTypeOf
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.future.await
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
Expand Down Expand Up @@ -111,12 +112,8 @@ class S3AsyncClientExtTest : FeatureSpec({
count shouldBe 2104969
}

// Localstack seems to not like selectObjectContent very much,
// the following example should return the first item, but it doesn't.
// I tested directly against AWS S3, and, it works just fine, so I assume it's probably a Localstack bug.
// Also, Localstack prints the following error to the console:
// ProtocolSerializerError: Expected iterator for streaming event serialization.
scenario("Querying data using selectObjectContent") {
// This is a pro Localstack feature. Disabled for now
xscenario("Querying data using selectObjectContent") {
val items = 100

s3Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,12 +499,12 @@ fun SqsAsyncClient.onMessage(
receiveConfiguration: ReceiveConfiguration.() -> Unit = {},
commitConfiguration: CommitConfiguration.() -> Unit = {},
onError: OnError = OnError.Retry(250.milliseconds),
onMessage: suspend (Message) -> MessageAcknowledgment<Acknowledgment>
onMessage: suspend (Message) -> Acknowledgment
): Job = onMessages(
queueName = queueName,
concurrency = concurrency,
groupStrategy = GroupStrategy.Count(1),
receiveConfiguration = receiveConfiguration,
commitConfiguration = commitConfiguration,
onError = onError
) { messages -> messages.map { onMessage(it) } }
) { messages -> messages.map { MessageAcknowledgment(it, onMessage(it)) } }
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class SqsFlowExtKtTest : FeatureSpec({
receiveRequest { waitTimeSeconds = 0 }
}
) {
it.acknowledgeWith(Acknowledgment.Delete)
Acknowledgment.Delete
}

consumerJob.join()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import kotlin.time.Duration.Companion.milliseconds
* collection.insert(documentsFlow).collect { result -> println("Document inserted: ${result.insertedId}") }
* ```
*/
fun <T> MongoCollection<T>.insert(
fun <T : Any> MongoCollection<T>.insert(
flow: Flow<T>,
concurrency: Int = 1,
): Flow<InsertOneResult> =
Expand Down Expand Up @@ -139,7 +139,7 @@ fun MongoCollection<Document>.updateMany(
* collection.replace(replacementsFlow, filter).collect { result -> println("Documents replaced: ${result.modifiedCount}") }
* ```
*/
fun <T> MongoCollection<T>.replace(
fun <T : Any> MongoCollection<T>.replace(
flow: Flow<T>,
filter: Bson,
concurrency: Int = 1,
Expand All @@ -162,7 +162,7 @@ fun <T> MongoCollection<T>.replace(
* collection.replace(replacementsFlow).collect { result -> println("Documents replaced: ${result.modifiedCount}") }
* ```
*/
fun <T> MongoCollection<T>.replace(
fun <T : Any> MongoCollection<T>.replace(
flow: Flow<Pair<Bson, T>>,
concurrency: Int = 1,
): Flow<UpdateResult> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fun RedissonClient.semaphore(
name: String,
concurrencyLevel: Int,
leaseTime: Duration = 10.seconds
): suspend CoroutineScope.() -> AsyncSemaphore = {
): suspend CoroutineScope.() -> AsyncSemaphore<String> = {
val semaphore = getPermitExpirableSemaphore(name)
semaphore.setPermitsAsync(concurrencyLevel).coAwait()
RedisAsyncSemaphore(concurrencyLevel, this, leaseTime, semaphore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import kotlinx.coroutines.future.await as coAwait
@ExperimentalCoroutinesApi
internal class RedisAsyncSemaphore(
override val totalPermits: Int,
val scope: CoroutineScope,
val leaseTime: Duration,
private val scope: CoroutineScope,
private val leaseTime: Duration,
private val rSemaphore: RPermitExpirableSemaphore
) : AsyncSemaphore {
) : AsyncSemaphore<String> {
private val mutex = Mutex()
private val acquired: MutableMap<String, Job> = mutableMapOf()

Expand Down
11 changes: 6 additions & 5 deletions core/src/main/kotlin/com/river/core/AsyncSemaphore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlin.time.Duration
* A semaphore maintains a set of permits and each `acquire()` blocks if necessary until a permit is available,
* and then takes it. Each `release()` adds a permit, potentially releasing an acquirer.
*/
interface AsyncSemaphore {
interface AsyncSemaphore<P> {
/**
* The total number of permits this semaphore can provide.
*/
Expand All @@ -26,20 +26,20 @@ interface AsyncSemaphore {
/**
* Acquires a permit from this semaphore, suspending until one is available.
*/
suspend fun acquire(): String
suspend fun acquire(): P

/**
* Tries to acquire a permit from this semaphore. This function is marked as a suspend function because it may
* perform I/O operations, but it won't suspend in case that no permit is available at the moment.
*
* @return `permit` if a permit was acquired and `null` otherwise.
*/
suspend fun tryAcquire(): String?
suspend fun tryAcquire(): P?

/**
* Releases a permit, returning it to the semaphore.
*/
suspend fun release(permit: String)
suspend fun release(permit: P)

/**
* Releases all permits back to the semaphore.
Expand All @@ -50,14 +50,15 @@ interface AsyncSemaphore {
/**
* Returns an instance of an [AsyncSemaphore] with the specified number of permits.
*
* @param scope The number of permits this semaphore can provide.
* @param permits The number of permits this semaphore can provide.
* @return An [AsyncSemaphore] instance with the specified number of permits.
*/
operator fun invoke(
scope: CoroutineScope,
permits: Int,
leaseTime: Duration? = null
): AsyncSemaphore =
): AsyncSemaphore<Int> =
DefaultAsyncSemaphore(scope, permits, leaseTime)
}
}
28 changes: 14 additions & 14 deletions core/src/main/kotlin/com/river/core/FlowAsyncExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ fun <T, R> Flow<T>.mapAsync(
*
* @return A new flow with the transformed items.
*/
fun <T, R> Flow<T>.mapAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore,
fun <T, R, P> Flow<T>.mapAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>,
transform: suspend (T) -> R
): Flow<R> = MapAsyncFlow(
upstream = this,
Expand Down Expand Up @@ -107,8 +107,8 @@ fun <T, R> Flow<T>.unorderedMapAsync(
*
* @return A new flow with the transformed items.
*/
fun <T, R> Flow<T>.unorderedMapAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore,
fun <T, R, P> Flow<T>.unorderedMapAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>,
transform: suspend (T) -> R
): Flow<R> = UnorderedMapAsyncFlow(
upstream = this,
Expand Down Expand Up @@ -163,8 +163,8 @@ fun <T, R> Flow<T>.flatMapIterableAsync(
*
* @return A new flow with the transformed and flattened items.
*/
fun <T, R> Flow<T>.flatMapIterableAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore,
fun <T, R, P> Flow<T>.flatMapIterableAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>,
transform: suspend (T) -> Iterable<R>
): Flow<R> = mapAsync(semaphore, transform).flattenIterable()

Expand Down Expand Up @@ -215,8 +215,8 @@ fun <T, R> Flow<T>.unorderedFlatMapIterableAsync(
*
* @return A new flow with the transformed and flattened items.
*/
fun <T, R> Flow<T>.unorderedFlatMapIterableAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore,
fun <T, R, P> Flow<T>.unorderedFlatMapIterableAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>,
transform: suspend (T) -> Iterable<R>
): Flow<R> = unorderedMapAsync(semaphore, transform).flattenIterable()

Expand Down Expand Up @@ -266,8 +266,8 @@ fun <T> Flow<T>.onEachAsync(
*
* @return The same flow with the side-effecting [block] applied to each item.
*/
fun <T> Flow<T>.onEachAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore,
fun <T, P> Flow<T>.onEachAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>,
block: suspend (T) -> Unit
): Flow<T> = mapAsync(semaphore) { it.also { block(it) } }

Expand Down Expand Up @@ -317,8 +317,8 @@ fun <T> Flow<T>.unorderedOnEachAsync(
*
* @return A new flow with the same elements, side-effecting [block] applied to each item, possibly unordered.
*/
fun <T> Flow<T>.unorderedOnEachAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore,
fun <T, P> Flow<T>.unorderedOnEachAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>,
block: suspend (T) -> Unit
): Flow<T> = unorderedMapAsync(semaphore) { it.also { block(it) } }

Expand Down Expand Up @@ -357,7 +357,7 @@ suspend fun <T> Flow<T>.collectAsync(
* @param semaphore The suspending function that creates an [AsyncSemaphore].
* @param block The transformation function to apply to each item.
*/
suspend fun <T> Flow<T>.collectAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore,
suspend fun <T, P> Flow<T>.collectAsync(
semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>,
block: suspend (T) -> Unit
): Unit = onEachAsync(semaphore, block).collect()
21 changes: 0 additions & 21 deletions core/src/main/kotlin/com/river/core/FlowBigIntegerExt.kt

This file was deleted.

8 changes: 3 additions & 5 deletions core/src/main/kotlin/com/river/core/FlowByteArrayExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.river.core

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map

import java.nio.ByteBuffer
import java.nio.charset.Charset

/**
* Converts the [Flow] of [ByteArray] to a [Flow] of [ByteBuffer].
Expand All @@ -15,9 +15,7 @@ fun Flow<ByteArray>.asByteBuffer(): Flow<ByteBuffer> = map { ByteBuffer.wrap(it)
/**
* Converts the [Flow] of [ByteArray] to a [Flow] of [String].
*
* @param charset The [Charset] to use for converting the bytes to strings. Defaults to the system's default charset.
*
* @return A new [Flow] of [String] converted from the original [Flow] of [ByteArray].
*/
fun Flow<ByteArray>.asString(charset: Charset = Charset.defaultCharset()): Flow<String> =
map { String(it, charset) }
fun Flow<ByteArray>.asString(): Flow<String> =
map { it.decodeToString() }
14 changes: 8 additions & 6 deletions core/src/main/kotlin/com/river/core/FlowByteExt.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.river.core

import kotlinx.coroutines.flow.*
import java.nio.charset.Charset
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map

/**
* Converts the [Flow] of [Byte] to a [Flow] of [ByteArray].
Expand All @@ -17,12 +18,13 @@ fun Flow<Byte>.asByteArray(
/**
* Converts the [Flow] of [Byte] to a [Flow] of [String].
*
* @param charset The [Charset] to use for converting the bytes to strings. Defaults to the system's default charset.
*
* @return A new [Flow] of [String] converted from the original [Flow] of [Byte].
*/
fun Flow<Byte>.asString(charset: Charset = Charset.defaultCharset()): Flow<String> =
map { String(listOf(it).toByteArray(), charset) }
fun Flow<Byte>.asString(
groupStrategy: GroupStrategy = GroupStrategy.Count(8)
): Flow<String> =
asByteArray(groupStrategy)
.asString()

/**
* Sums the elements of this [Flow] of [Byte] and returns the result.
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/kotlin/com/river/core/ThrottleExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fun <T> Flow<T>.throttle(
* }
* ```
*/
fun <T> Flow<T>.throttle(
fun <T, P> Flow<T>.throttle(
strategy: ThrottleStrategy = ThrottleStrategy.Suspend,
semaphore: suspend CoroutineScope.() -> AsyncSemaphore
semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>
): Flow<T> = ThrottleFlow(semaphore, strategy, this)
Loading

0 comments on commit 5e5651d

Please sign in to comment.