diff --git a/examples/map/referencecount/build.gradle.kts b/examples/map/referencecount/build.gradle.kts new file mode 100644 index 0000000..b7863eb --- /dev/null +++ b/examples/map/referencecount/build.gradle.kts @@ -0,0 +1,29 @@ +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar + +plugins { + kotlin("jvm") + id("com.github.johnrengelman.shadow") version "7.1.2" +} + +repositories { + mavenCentral() +} + +dependencies { + implementation(project(":examples:map:utils")) +} + +tasks { + named("shadowJar") { + archiveBaseName.set("referencecount-shadowjar") + manifest { + attributes(mapOf("Main-Class" to "com.target.nativememoryallocator.examples.map.referencecount.ReferenceCountKt")) + } + } +} + +tasks { + build { + dependsOn(shadowJar) + } +} \ No newline at end of file diff --git a/examples/map/referencecount/src/main/kotlin/com/target/nativememoryallocator/examples/map/referencecount/ReferenceCount.kt b/examples/map/referencecount/src/main/kotlin/com/target/nativememoryallocator/examples/map/referencecount/ReferenceCount.kt new file mode 100644 index 0000000..7c1aece --- /dev/null +++ b/examples/map/referencecount/src/main/kotlin/com/target/nativememoryallocator/examples/map/referencecount/ReferenceCount.kt @@ -0,0 +1,81 @@ +package com.target.nativememoryallocator.examples.map.referencecount + +import com.target.nativememoryallocator.allocator.NativeMemoryAllocatorBuilder +import com.target.nativememoryallocator.examples.map.utils.CacheObject +import com.target.nativememoryallocator.examples.map.utils.CacheObjectSerializer +import com.target.nativememoryallocator.examples.map.utils.buildRandomString +import com.target.nativememoryallocator.map.NativeMemoryMapBackend +import com.target.nativememoryallocator.map.NativeMemoryMapBuilder +import com.target.nativememoryallocator.map.ReferenceCountMap +import kotlinx.coroutines.* +import mu.KotlinLogging +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +private class OffHeap { + + private val numEntries = 20_000 + + private val randomIndex = Random.nextInt(0, numEntries) + + private val nativeMemoryAllocator = NativeMemoryAllocatorBuilder( + pageSizeBytes = 4_096, // 4 KB + nativeMemorySizeBytes = (20L * 1_024L * 1_024L * 1_024L), // 20 GB + ).build() + + private val referenceCountMap = ReferenceCountMap( + valueSerializer = CacheObjectSerializer(), + nativeMemoryAllocator = nativeMemoryAllocator, + ) + + private fun putValueIntoMap(i: Int) { + if ((i % 100) == 0) { + logger.info { "put i = $i" } + } + val value = buildRandomString(length = 500 * 1_024) + if (i == randomIndex) { + logger.info { "put randomIndex = $randomIndex value.length = ${value.length}" } + logger.info { "value.substring(0,20) = ${value.substring(0, 20)}" } + } + referenceCountMap.put( + key = i, + value = CacheObject( + s = value, + ), + ) + } + + suspend fun run() { + logger.info { "begin run randomIndex = $randomIndex" } + + coroutineScope { + (0 until numEntries).forEach { i -> + launch { + putValueIntoMap(i = i) + } + } + } + + logger.info { "nativeMemoryMap.size = ${referenceCountMap.size}" } + logger.info { "nativeMemoryAllocator.nativeMemoryAllocatorMetadata = ${nativeMemoryAllocator.nativeMemoryAllocatorMetadata}" } + + val randomIndexValue = referenceCountMap.get(key = randomIndex) + randomIndexValue?.let { + logger.info { "get randomIndex = $randomIndex" } + logger.info { "randomIndexValue.s.length = ${it.s.length}" } + logger.info { "randomIndexValue.s.substring(0,20) = ${it.s.substring(0, 20)}" } + } + + while (true) { + delay(1_000) + } + } + +} + +suspend fun main() { + withContext(Dispatchers.Default) { + OffHeap().run() + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index e1d0be0..375a6d9 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -10,4 +10,5 @@ include( "examples:map:offheap-eviction", "examples:map:offheap-eviction-operationcounters", "examples:map:offheap-flatbuffers", + "examples:map:referencecount", ) diff --git a/src/main/kotlin/com/target/nativememoryallocator/map/ReferenceCountMap.kt b/src/main/kotlin/com/target/nativememoryallocator/map/ReferenceCountMap.kt new file mode 100644 index 0000000..f62a414 --- /dev/null +++ b/src/main/kotlin/com/target/nativememoryallocator/map/ReferenceCountMap.kt @@ -0,0 +1,123 @@ +package com.target.nativememoryallocator.map + +import com.target.nativememoryallocator.allocator.NativeMemoryAllocator +import com.target.nativememoryallocator.buffer.NativeMemoryBuffer +import com.target.nativememoryallocator.buffer.OnHeapMemoryBuffer +import com.target.nativememoryallocator.buffer.OnHeapMemoryBufferFactory +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +private class ReferenceCountValue( + val nativeMemoryBuffer: NativeMemoryBuffer, +) { + private val referenceCount = AtomicInteger(0) + + fun incrementReferenceCount(): Int { + return referenceCount.incrementAndGet() + } + + fun decrementReferenceCount(): Int { + return referenceCount.decrementAndGet() + } + +} + +class ReferenceCountMap( + private val valueSerializer: NativeMemoryMapSerializer, + private val nativeMemoryAllocator: NativeMemoryAllocator, +) { + + private val innerMap = ConcurrentHashMap() + + private fun freeNativeMemoryBuffer( + nativeMemoryBuffer: NativeMemoryBuffer, + ) { + nativeMemoryAllocator.freeNativeMemoryBuffer(nativeMemoryBuffer) + } + + private fun decrementReferenceCount( + referenceCountValue: ReferenceCountValue, + ) { + if (referenceCountValue.decrementReferenceCount() == 0) { + freeNativeMemoryBuffer(referenceCountValue.nativeMemoryBuffer) + } + } + + fun put(key: KEY_TYPE, value: VALUE_TYPE) { + + val newValueByteArray = valueSerializer.serializeToByteArray(value = value) + val newCapacityBytes = newValueByteArray.size + + val nativeMemoryBuffer = + nativeMemoryAllocator.allocateNativeMemoryBuffer(capacityBytes = newCapacityBytes) + + nativeMemoryBuffer.copyFromArray(byteArray = newValueByteArray) + + val newRefCountedValue = ReferenceCountValue( + nativeMemoryBuffer = nativeMemoryBuffer, + ) + newRefCountedValue.incrementReferenceCount() + + val previousValue = innerMap.put(key = key, value = newRefCountedValue) + + if (previousValue != null) { + decrementReferenceCount(previousValue) + } + } + + fun get(key: KEY_TYPE): VALUE_TYPE? { + val mapValue = innerMap.computeIfPresent(key) { _, currentValue -> + currentValue.incrementReferenceCount() + + currentValue + } ?: return null + + try { + // copy NMA to onheap buffer + val onHeapMemoryBuffer = + OnHeapMemoryBufferFactory.newOnHeapMemoryBuffer(initialCapacityBytes = mapValue.nativeMemoryBuffer.capacityBytes) + + mapValue.nativeMemoryBuffer.copyToOnHeapMemoryBuffer(onHeapMemoryBuffer) + + val deserializedValue = + valueSerializer.deserializeFromOnHeapMemoryBuffer(onHeapMemoryBuffer = onHeapMemoryBuffer) + + return deserializedValue + } finally { + decrementReferenceCount(mapValue) + } + } + + fun getWithBuffer( + key: KEY_TYPE, + onHeapMemoryBuffer: OnHeapMemoryBuffer, + ): VALUE_TYPE? { + val mapValue = innerMap.computeIfPresent(key) { _, currentValue -> + currentValue.incrementReferenceCount() + + currentValue + } ?: return null + + try { + mapValue.nativeMemoryBuffer.copyToOnHeapMemoryBuffer(onHeapMemoryBuffer) + + val deserializedValue = + valueSerializer.deserializeFromOnHeapMemoryBuffer(onHeapMemoryBuffer = onHeapMemoryBuffer) + + return deserializedValue + } finally { + decrementReferenceCount(mapValue) + } + } + + fun delete(key: KEY_TYPE) { + val previousValue = innerMap.remove(key) + + if (previousValue != null) { + decrementReferenceCount(previousValue) + } + } + + val size: Int + get() = innerMap.size +} \ No newline at end of file diff --git a/src/main/kotlin/com/target/nativememoryallocator/metrics/micrometer/MicrometerMetrics.kt b/src/main/kotlin/com/target/nativememoryallocator/metrics/micrometer/MicrometerMetrics.kt index 4bcc043..e0798d0 100644 --- a/src/main/kotlin/com/target/nativememoryallocator/metrics/micrometer/MicrometerMetrics.kt +++ b/src/main/kotlin/com/target/nativememoryallocator/metrics/micrometer/MicrometerMetrics.kt @@ -2,6 +2,7 @@ package com.target.nativememoryallocator.metrics.micrometer import com.target.nativememoryallocator.allocator.NativeMemoryAllocator import com.target.nativememoryallocator.map.BaseNativeMemoryMap +import com.target.nativememoryallocator.map.ReferenceCountMap import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Tags @@ -151,4 +152,18 @@ class MicrometerNativeMemoryMapMetrics( ) { operationCounters.numGetsNonNullValue.toDouble() } } } +} + +class MicrometerReferenceCountMapMetrics( + referenceCountMap: ReferenceCountMap<*, *>, + meterRegistry: MeterRegistry, + tags: Tags = Tags.empty(), +) { + init { + meterRegistry.registerGauge( + name = "nativeMemoryMap.size", + gaugeObject = referenceCountMap, + tags = tags, + ) { referenceCountMap.size.toDouble() } + } } \ No newline at end of file diff --git a/src/test/kotlin/com/target/nativememoryallocator/map/impl/ReferenceCountMapSpec.kt b/src/test/kotlin/com/target/nativememoryallocator/map/impl/ReferenceCountMapSpec.kt new file mode 100644 index 0000000..9b4be30 --- /dev/null +++ b/src/test/kotlin/com/target/nativememoryallocator/map/impl/ReferenceCountMapSpec.kt @@ -0,0 +1,89 @@ +package com.target.nativememoryallocator.map.impl + +import com.target.nativememoryallocator.allocator.NativeMemoryAllocatorBuilder +import com.target.nativememoryallocator.buffer.OnHeapMemoryBuffer +import com.target.nativememoryallocator.buffer.OnHeapMemoryBufferFactory +import com.target.nativememoryallocator.map.NativeMemoryMapSerializer +import com.target.nativememoryallocator.map.ReferenceCountMap +import mu.KotlinLogging + +private val logger = KotlinLogging.logger {} + +// TODO make unit test +fun main() { + class TestSerializer : NativeMemoryMapSerializer { + + override fun deserializeFromOnHeapMemoryBuffer(onHeapMemoryBuffer: OnHeapMemoryBuffer): String { + return String(onHeapMemoryBuffer.toTrimmedArray()) + } + + override fun serializeToByteArray(value: String): ByteArray { + return value.toByteArray() + } + + } + + val nativeMemoryAllocator = NativeMemoryAllocatorBuilder( + zeroNativeMemoryOnStartup = false, + nativeMemorySizeBytes = 100 * 1024 * 1024, + pageSizeBytes = 1024, + ).build() + + val map = ReferenceCountMap( + nativeMemoryAllocator = nativeMemoryAllocator, + valueSerializer = TestSerializer(), + ) + + logger.info { "map.size = ${map.size}" } + + map.put( + key = "123", value = "234", + ) + + logger.info { "map.size = ${map.size}" } + + var value = map.get(key = "123") + logger.info { "get value = $value" } + + val onHeapMemoryBuffer = OnHeapMemoryBufferFactory.newOnHeapMemoryBuffer(initialCapacityBytes = 2) + value = map.getWithBuffer(key = "123", onHeapMemoryBuffer = onHeapMemoryBuffer) + + logger.info { "getWithBuffer value = $value" } + + map.put( + key = "123", value = "345", + ) + + for (i in 0 until 100) { + map.put( + key = "234", value = "234", + ) + } + + logger.info { "map.size = ${map.size}" } + + logger.info { "nma metadata = ${nativeMemoryAllocator.nativeMemoryAllocatorMetadata}" } + + value = map.get(key = "123") + + logger.info { "got value = $value" } + + for (i in 0 until 100) { + value = map.get(key = "234") + } + + logger.info { "got value = $value" } + + map.delete(key = "234") + logger.info { "after delete 234 nma metadata = ${nativeMemoryAllocator.nativeMemoryAllocatorMetadata}" } + + map.delete(key = "123") + + value = map.get(key = "123") + + logger.info { "after delete got value = $value" } + + logger.info { "map.size = ${map.size}" } + + logger.info { "nma metadata = ${nativeMemoryAllocator.nativeMemoryAllocatorMetadata}" } +} \ No newline at end of file