Skip to content

Commit

Permalink
Merge pull request #17 from aaronriekenberg/refcountmap
Browse files Browse the repository at this point in the history
Adding ReferenceCountMap for testing
  • Loading branch information
aaronriekenberg authored Jun 11, 2024
2 parents 9244efc + 5971cfb commit dba8f7a
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 0 deletions.
29 changes: 29 additions & 0 deletions examples/map/referencecount/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
kotlin("jvm")
id("com.github.johnrengelman.shadow") version "7.1.2"
}

repositories {
mavenCentral()
}

dependencies {
implementation(project(":examples:map:utils"))
}

tasks {
named<ShadowJar>("shadowJar") {
archiveBaseName.set("referencecount-shadowjar")
manifest {
attributes(mapOf("Main-Class" to "com.target.nativememoryallocator.examples.map.referencecount.ReferenceCountKt"))
}
}
}

tasks {
build {
dependsOn(shadowJar)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.target.nativememoryallocator.examples.map.referencecount

import com.target.nativememoryallocator.allocator.NativeMemoryAllocatorBuilder
import com.target.nativememoryallocator.examples.map.utils.CacheObject
import com.target.nativememoryallocator.examples.map.utils.CacheObjectSerializer
import com.target.nativememoryallocator.examples.map.utils.buildRandomString
import com.target.nativememoryallocator.map.NativeMemoryMapBackend
import com.target.nativememoryallocator.map.NativeMemoryMapBuilder
import com.target.nativememoryallocator.map.ReferenceCountMap
import kotlinx.coroutines.*
import mu.KotlinLogging
import kotlin.random.Random

private val logger = KotlinLogging.logger {}

private class OffHeap {

private val numEntries = 20_000

private val randomIndex = Random.nextInt(0, numEntries)

private val nativeMemoryAllocator = NativeMemoryAllocatorBuilder(
pageSizeBytes = 4_096, // 4 KB
nativeMemorySizeBytes = (20L * 1_024L * 1_024L * 1_024L), // 20 GB
).build()

private val referenceCountMap = ReferenceCountMap<Int, CacheObject>(
valueSerializer = CacheObjectSerializer(),
nativeMemoryAllocator = nativeMemoryAllocator,
)

private fun putValueIntoMap(i: Int) {
if ((i % 100) == 0) {
logger.info { "put i = $i" }
}
val value = buildRandomString(length = 500 * 1_024)
if (i == randomIndex) {
logger.info { "put randomIndex = $randomIndex value.length = ${value.length}" }
logger.info { "value.substring(0,20) = ${value.substring(0, 20)}" }
}
referenceCountMap.put(
key = i,
value = CacheObject(
s = value,
),
)
}

suspend fun run() {
logger.info { "begin run randomIndex = $randomIndex" }

coroutineScope {
(0 until numEntries).forEach { i ->
launch {
putValueIntoMap(i = i)
}
}
}

logger.info { "nativeMemoryMap.size = ${referenceCountMap.size}" }
logger.info { "nativeMemoryAllocator.nativeMemoryAllocatorMetadata = ${nativeMemoryAllocator.nativeMemoryAllocatorMetadata}" }

val randomIndexValue = referenceCountMap.get(key = randomIndex)
randomIndexValue?.let {
logger.info { "get randomIndex = $randomIndex" }
logger.info { "randomIndexValue.s.length = ${it.s.length}" }
logger.info { "randomIndexValue.s.substring(0,20) = ${it.s.substring(0, 20)}" }
}

while (true) {
delay(1_000)
}
}

}

suspend fun main() {
withContext(Dispatchers.Default) {
OffHeap().run()
}
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ include(
"examples:map:offheap-eviction",
"examples:map:offheap-eviction-operationcounters",
"examples:map:offheap-flatbuffers",
"examples:map:referencecount",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.target.nativememoryallocator.map

import com.target.nativememoryallocator.allocator.NativeMemoryAllocator
import com.target.nativememoryallocator.buffer.NativeMemoryBuffer
import com.target.nativememoryallocator.buffer.OnHeapMemoryBuffer
import com.target.nativememoryallocator.buffer.OnHeapMemoryBufferFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

private class ReferenceCountValue(
val nativeMemoryBuffer: NativeMemoryBuffer,
) {
private val referenceCount = AtomicInteger(0)

fun incrementReferenceCount(): Int {
return referenceCount.incrementAndGet()
}

fun decrementReferenceCount(): Int {
return referenceCount.decrementAndGet()
}

}

class ReferenceCountMap<KEY_TYPE : Any, VALUE_TYPE : Any>(
private val valueSerializer: NativeMemoryMapSerializer<VALUE_TYPE>,
private val nativeMemoryAllocator: NativeMemoryAllocator,
) {

private val innerMap = ConcurrentHashMap<KEY_TYPE, ReferenceCountValue>()

private fun freeNativeMemoryBuffer(
nativeMemoryBuffer: NativeMemoryBuffer,
) {
nativeMemoryAllocator.freeNativeMemoryBuffer(nativeMemoryBuffer)
}

private fun decrementReferenceCount(
referenceCountValue: ReferenceCountValue,
) {
if (referenceCountValue.decrementReferenceCount() == 0) {
freeNativeMemoryBuffer(referenceCountValue.nativeMemoryBuffer)
}
}

fun put(key: KEY_TYPE, value: VALUE_TYPE) {

val newValueByteArray = valueSerializer.serializeToByteArray(value = value)
val newCapacityBytes = newValueByteArray.size

val nativeMemoryBuffer =
nativeMemoryAllocator.allocateNativeMemoryBuffer(capacityBytes = newCapacityBytes)

nativeMemoryBuffer.copyFromArray(byteArray = newValueByteArray)

val newRefCountedValue = ReferenceCountValue(
nativeMemoryBuffer = nativeMemoryBuffer,
)
newRefCountedValue.incrementReferenceCount()

val previousValue = innerMap.put(key = key, value = newRefCountedValue)

if (previousValue != null) {
decrementReferenceCount(previousValue)
}
}

fun get(key: KEY_TYPE): VALUE_TYPE? {
val mapValue = innerMap.computeIfPresent(key) { _, currentValue ->
currentValue.incrementReferenceCount()

currentValue
} ?: return null

try {
// copy NMA to onheap buffer
val onHeapMemoryBuffer =
OnHeapMemoryBufferFactory.newOnHeapMemoryBuffer(initialCapacityBytes = mapValue.nativeMemoryBuffer.capacityBytes)

mapValue.nativeMemoryBuffer.copyToOnHeapMemoryBuffer(onHeapMemoryBuffer)

val deserializedValue =
valueSerializer.deserializeFromOnHeapMemoryBuffer(onHeapMemoryBuffer = onHeapMemoryBuffer)

return deserializedValue
} finally {
decrementReferenceCount(mapValue)
}
}

fun getWithBuffer(
key: KEY_TYPE,
onHeapMemoryBuffer: OnHeapMemoryBuffer,
): VALUE_TYPE? {
val mapValue = innerMap.computeIfPresent(key) { _, currentValue ->
currentValue.incrementReferenceCount()

currentValue
} ?: return null

try {
mapValue.nativeMemoryBuffer.copyToOnHeapMemoryBuffer(onHeapMemoryBuffer)

val deserializedValue =
valueSerializer.deserializeFromOnHeapMemoryBuffer(onHeapMemoryBuffer = onHeapMemoryBuffer)

return deserializedValue
} finally {
decrementReferenceCount(mapValue)
}
}

fun delete(key: KEY_TYPE) {
val previousValue = innerMap.remove(key)

if (previousValue != null) {
decrementReferenceCount(previousValue)
}
}

val size: Int
get() = innerMap.size
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.target.nativememoryallocator.metrics.micrometer

import com.target.nativememoryallocator.allocator.NativeMemoryAllocator
import com.target.nativememoryallocator.map.BaseNativeMemoryMap
import com.target.nativememoryallocator.map.ReferenceCountMap
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
Expand Down Expand Up @@ -151,4 +152,18 @@ class MicrometerNativeMemoryMapMetrics(
) { operationCounters.numGetsNonNullValue.toDouble() }
}
}
}

class MicrometerReferenceCountMapMetrics(
referenceCountMap: ReferenceCountMap<*, *>,
meterRegistry: MeterRegistry,
tags: Tags = Tags.empty(),
) {
init {
meterRegistry.registerGauge(
name = "nativeMemoryMap.size",
gaugeObject = referenceCountMap,
tags = tags,
) { referenceCountMap.size.toDouble() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.target.nativememoryallocator.map.impl

import com.target.nativememoryallocator.allocator.NativeMemoryAllocatorBuilder
import com.target.nativememoryallocator.buffer.OnHeapMemoryBuffer
import com.target.nativememoryallocator.buffer.OnHeapMemoryBufferFactory
import com.target.nativememoryallocator.map.NativeMemoryMapSerializer
import com.target.nativememoryallocator.map.ReferenceCountMap
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}

// TODO make unit test
fun main() {
class TestSerializer : NativeMemoryMapSerializer<String> {

override fun deserializeFromOnHeapMemoryBuffer(onHeapMemoryBuffer: OnHeapMemoryBuffer): String {
return String(onHeapMemoryBuffer.toTrimmedArray())
}

override fun serializeToByteArray(value: String): ByteArray {
return value.toByteArray()
}

}

val nativeMemoryAllocator = NativeMemoryAllocatorBuilder(
zeroNativeMemoryOnStartup = false,
nativeMemorySizeBytes = 100 * 1024 * 1024,
pageSizeBytes = 1024,
).build()

val map = ReferenceCountMap<String, String>(
nativeMemoryAllocator = nativeMemoryAllocator,
valueSerializer = TestSerializer(),
)

logger.info { "map.size = ${map.size}" }

map.put(
key = "123", value = "234",
)

logger.info { "map.size = ${map.size}" }

var value = map.get(key = "123")
logger.info { "get value = $value" }

val onHeapMemoryBuffer = OnHeapMemoryBufferFactory.newOnHeapMemoryBuffer(initialCapacityBytes = 2)
value = map.getWithBuffer(key = "123", onHeapMemoryBuffer = onHeapMemoryBuffer)

logger.info { "getWithBuffer value = $value" }

map.put(
key = "123", value = "345",
)

for (i in 0 until 100) {
map.put(
key = "234", value = "234",
)
}

logger.info { "map.size = ${map.size}" }

logger.info { "nma metadata = ${nativeMemoryAllocator.nativeMemoryAllocatorMetadata}" }

value = map.get(key = "123")

logger.info { "got value = $value" }

for (i in 0 until 100) {
value = map.get(key = "234")
}

logger.info { "got value = $value" }

map.delete(key = "234")
logger.info { "after delete 234 nma metadata = ${nativeMemoryAllocator.nativeMemoryAllocatorMetadata}" }

map.delete(key = "123")

value = map.get(key = "123")

logger.info { "after delete got value = $value" }

logger.info { "map.size = ${map.size}" }

logger.info { "nma metadata = ${nativeMemoryAllocator.nativeMemoryAllocatorMetadata}" }
}

0 comments on commit dba8f7a

Please sign in to comment.