Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions qbit-core/src/commonMain/kotlin/qbit/Conn.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package qbit

import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
Expand All @@ -23,6 +26,7 @@ import qbit.factoring.serializatoin.KSFactorizer
import qbit.index.Indexer
import qbit.index.InternalDb
import qbit.index.RawEntity
import qbit.index.entities
import qbit.ns.Namespace
import qbit.resolving.lastWriterWinsResolve
import qbit.resolving.logsDiff
Expand All @@ -32,7 +36,7 @@ import qbit.storage.SerializedStorage
import qbit.trx.*
import kotlin.reflect.KClass

suspend fun qbit(storage: Storage, appSerialModule: SerializersModule): Conn {
suspend fun qbit(storage: Storage, appSerialModule: SerializersModule, registerFolders: Map<String, (Any, Any) -> Any>): Conn {
val iid = Iid(1, 4)
// TODO: fix dbUuid retrieving
val dbUuid = DbUuid(iid)
Expand All @@ -42,7 +46,14 @@ suspend fun qbit(storage: Storage, appSerialModule: SerializersModule): Conn {
val systemSerialModule = createSystemSerialModule(appSerialModule)
val factor = KSFactorizer(systemSerialModule)::factor
val head = loadOrInitHead(storage, nodesStorage, serializedStorage, dbUuid, factor)
val db = Indexer(systemSerialModule, null, null, nodesResolver(nodesStorage)).index(head)
val db = Indexer(
systemSerialModule,
null,
null,
nodesResolver(nodesStorage),
causalHashesResolver(nodesStorage),
registerFolders
).index(head)

return QConn(dbUuid, serializedStorage, head, factor, nodesStorage, db)
}
Expand Down Expand Up @@ -82,6 +93,8 @@ class QConn(

private val resolveNode = nodesResolver(nodesStorage)

private val resolveCausality = causalHashesResolver(nodesStorage)

private val gidSequence: GidSequence = with(db.pull<Instance>(Gid(dbUuid.iid, theInstanceEid))) {
if (this == null) {
throw QBitException("Corrupted DB - the instance entity not found")
Expand All @@ -99,7 +112,7 @@ class QConn(
}

override fun trx(): Trx {
return QTrx(db.pull(Gid(dbUuid.iid, theInstanceEid))!!, trxLog, db, this, factor, gidSequence)
return QTrx(db.pull(Gid(dbUuid.iid, theInstanceEid))!!, trxLog, db, this, factor, gidSequence, resolveCausality)
}

override suspend fun <T> trx(body: Trx.() -> T): T {
Expand Down Expand Up @@ -144,18 +157,17 @@ class QConn(
newDb: InternalDb
): Pair<TrxLog, InternalDb> {
val logsDifference = logsDiff(baseLog, committedLog, committingLog, resolveNode)

val committedEavs = logsDifference
.logAEntities()
.toEavsList()
val reconciliationEavs = logsDifference
.reconciliationEntities(lastWriterWinsResolve { db.attr(it) })
.toEavsList()

val mergedDb = newDb
.with(committedEavs)
.with(reconciliationEavs)
val mergedLog = committingLog.mergeWith(committedLog, baseLog.hash, reconciliationEavs)
val allNodes = nodesBetween(null, mergedLog.head, resolveNode).toList()
val indexedNodes = nodesBetween(null, committingLog.head, resolveNode).toSet()
val notIndexedNodes = allNodes.filter { node -> indexedNodes.none { it.hash == node.hash } }
val mergedDb = notIndexedNodes.fold(newDb) {db, n ->
db.with(n.entities().flatMap { it.second }, n.hash, resolveCausality(n.hash))
}

return mergedLog to mergedDb
}
Expand All @@ -172,6 +184,13 @@ private fun nodesResolver(nodeStorage: NodesStorage): (Node<Hash>) -> NodeVal<Ha
}
}

fun causalHashesResolver(nodeStorage: NodesStorage): suspend (Hash) -> List<Hash> = { hash ->
val node = nodeStorage.load(NodeRef(hash)) ?: throw QBitException("Error: could not resolve node for hash $hash")
val resolveNode = nodesResolver(nodeStorage)
val causalNodes = nodesBetween(null, node, resolveNode)
causalNodes.map { it.hash }.toList()
}

@Suppress("EXPERIMENTAL_API_USAGE")
class SchemaValidator : SerializersModuleCollector {

Expand Down
2 changes: 1 addition & 1 deletion qbit-core/src/commonMain/kotlin/qbit/api/QbitSelfSchema.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object Instances {
val nextEid = Attr<Int>(
Gid(1, 5),
"Instance/nextEid",
QInt.code,
QInt.counter().code,
unique = false,
list = false
)
Expand Down
57 changes: 46 additions & 11 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import kotlin.reflect.KClass
* - List<Ref>
*/

val scalarRange = 0..31
val listRange = 32..63
val counterRange = 64..95
val registerRange = 96..127

@Suppress("UNCHECKED_CAST")
sealed class DataType<out T : Any> {

Expand All @@ -31,12 +36,13 @@ sealed class DataType<out T : Any> {
private val values: Array<DataType<*>>
get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef)

fun ofCode(code: Byte): DataType<*>? =
if (code <= 19) {
values.firstOrNull { it.code == code }
} else {
values.map { it.list() }.firstOrNull { it.code == code }
}
fun ofCode(code: Byte): DataType<*>? = when(code) {
in scalarRange -> values.firstOrNull { it.code == code }
in listRange -> ofCode((code - listRange.first).toByte())?.list()
in counterRange -> ofCode((code - counterRange.first).toByte())?.counter()
in registerRange -> ofCode((code - registerRange.first).toByte())?.register()
else -> null
}

fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) {
is Boolean -> QBoolean as DataType<T>
Expand All @@ -46,7 +52,7 @@ sealed class DataType<out T : Any> {
is String -> QString as DataType<T>
is ByteArray -> QBytes as DataType<T>
is Gid -> QGid as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>?
else -> QRef as DataType<T>
}
}
Expand All @@ -57,9 +63,25 @@ sealed class DataType<out T : Any> {
return QList(this)
}

fun isList(): Boolean = (code.toInt().and(32)) > 0
fun isList(): Boolean = code in listRange

fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef
fun counter(): QCounter<T> {
require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" }
return QCounter(this)
}

fun isCounter(): Boolean = code in counterRange

fun register(): QRegister<T> {
require(!(this is QList<*> || this is QCounter || this is QRegister)) { "Nested wrappers is not allowed" }
return QRegister(this)
}

fun isRegister(): Boolean = code in registerRange

fun ref(): Boolean = this == QRef ||
this is QList<*> && this.itemsType == QRef ||
this is QRegister<*> && this.itemsType == QRef

fun value(): Boolean = !ref()

Expand All @@ -73,15 +95,28 @@ sealed class DataType<out T : Any> {
is QBytes -> ByteArray::class
is QGid -> Gid::class
is QList<*> -> this.itemsType.typeClass()
is QCounter<*> -> this.primitiveType.typeClass()
is QRegister<*> -> this.itemsType.typeClass()
QRef -> Any::class
}
}

}

data class QList<out I : Any>(val itemsType: DataType<I>) : DataType<List<I>>() {

override val code = (32 + itemsType.code).toByte()
override val code = (listRange.first + itemsType.code).toByte()

}

data class QCounter<out I : Any>(val primitiveType: DataType<I>) : DataType<I>() {

override val code = (counterRange.first + primitiveType.code).toByte()

}

data class QRegister<out I : Any>(val itemsType: DataType<I>) : DataType<I>() {

override val code = (registerRange.first + itemsType.code).toByte()

}

Expand Down
56 changes: 42 additions & 14 deletions qbit-core/src/commonMain/kotlin/qbit/index/Index.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package qbit.index

import qbit.api.db.QueryPred
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.api.model.Hash
import qbit.api.tombstone
import qbit.platform.assert
import qbit.platform.collections.firstMatchIdx
Expand All @@ -13,7 +16,7 @@ import qbit.platform.collections.subList
typealias RawEntity = Pair<Gid, List<Eav>>

fun Index(entities: List<RawEntity>): Index =
Index().add(entities)
Index().add(entities, null, emptyList())

fun eidPattern(eid: Gid) = { other: Eav -> other.gid.compareTo(eid) }

Expand Down Expand Up @@ -61,17 +64,17 @@ class Index(
}
}

fun addFacts(facts: List<Eav>): Index =
addFacts(facts as Iterable<Eav>)
fun addFacts(facts: List<Eav>, hash: Hash? = null, causalHashes: List<Hash> = emptyList(), resolveAttr: (String) -> Attr<*>? = { null }): Index =
addFacts(facts as Iterable<Eav>, hash, causalHashes, resolveAttr)

fun addFacts(facts: Iterable<Eav>): Index {
fun addFacts(facts: Iterable<Eav>, hash: Hash?, causalHashes: List<Hash>, resolveAttr: (String) -> Attr<*>? = { null }): Index {
val entities = facts
.groupBy { it.gid }
.map { it.key to it.value }
return add(entities)
return add(entities, hash, causalHashes, resolveAttr)
}

fun add(entities: List<RawEntity>): Index {
fun add(entities: List<RawEntity>, hash: Hash?, causalHashes: List<Hash>, resolveAttr: (String) -> Attr<*>? = { null }): Index {
val newEntities = HashMap(this.entities)

// eavs of removed or updated entities
Expand All @@ -82,17 +85,42 @@ class Index(
val (gid, eavs) = e

val isUpdate = eavs[0].attr != tombstone.name
val obsoleteEntity =
if (isUpdate) {
newEntities.put(gid, e)
} else {
newEntities.remove(gid)
val obsoleteEntity = newEntities.get(gid)

if (isUpdate) {
val effectiveEavs = ArrayList<Eav>()
eavs.mapTo(effectiveEavs) { eav ->
val attr = resolveAttr(eav.attr)
if (attr != null && DataType.ofCode(attr.type)!!.isRegister()) {
val persistedEav = obsoleteEntity?.second?.firstOrNull { it.attr == eav.attr }
if (persistedEav != null) {
persistedEav.copy(value = (persistedEav.value as IndexedRegister).indexValue(hash, eav.value, causalHashes))
} else {
eav.copy(value = IndexedRegister(listOf(Pair(hash, eav.value))))
}
} else {
eav
}
}

if(obsoleteEntity != null) {
effectiveEavs.addAll(obsoleteEntity.second.filter { eav ->
val attr = resolveAttr(eav.attr)
attr != null &&
DataType.ofCode(attr.type)!!.let { it.isCounter() || it.isRegister() } &&
eavs.none { it.attr == eav.attr }
})
}

newEntities.put(gid, RawEntity(gid, effectiveEavs))
newEavs.addAll(effectiveEavs.filter { it.value is Comparable<*> && it.attr != tombstone.name })
} else {
newEntities.remove(gid)
}

if (obsoleteEntity != null) {
obsoleteEavs.addAll(obsoleteEntity.second)
}
newEavs.addAll(eavs.filter { it.value is Comparable<*> && it.attr != tombstone.name })
}

obsoleteEavs.sortWith(aveCmp)
Expand All @@ -103,8 +131,8 @@ class Index(
return Index(newEntities, newAveIndex)
}

fun add(e: RawEntity): Index {
return add(listOf(e))
fun add(e: RawEntity, hash: Hash?, causalHashes: List<Hash>, resolveAttr: (String) -> Attr<*>? = { null }): Index {
return add(listOf(e), hash, causalHashes, resolveAttr)
}

fun entityById(eid: Gid): Map<String, List<Any>>? =
Expand Down
14 changes: 8 additions & 6 deletions qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import qbit.api.gid.Gid
import qbit.api.model.*
import qbit.api.model.impl.AttachedEntity
import qbit.collections.LimitedPersistentMap
import qbit.trx.deoperationalize
import qbit.typing.typify
import kotlin.reflect.KClass

class IndexDb(
internal val index: Index,
private val serialModule: SerializersModule
private val serialModule: SerializersModule,
private val registerFolders: Map<String, (Any, Any) -> Any>
) : InternalDb() {

private val schema = loadAttrs(index)
Expand All @@ -30,8 +32,8 @@ class IndexDb(

private val dataClassesCache = atomic<LimitedPersistentMap<Entity, Any>>(LimitedPersistentMap(1024))

override fun with(facts: Iterable<Eav>): InternalDb {
return IndexDb(index.addFacts(facts), serialModule)
override fun with(facts: Iterable<Eav>, commitHash: Hash?, causalHashes: List<Hash>): IndexDb {
return IndexDb(index.addFacts(deoperationalize(this, facts.toList()), commitHash, causalHashes, this::attr), serialModule, registerFolders)
}

override fun pullEntity(gid: Gid): StoredEntity? {
Expand Down Expand Up @@ -64,8 +66,8 @@ class IndexDb(
// see https://github.com/d-r-q/qbit/issues/114, https://github.com/d-r-q/qbit/issues/132
private fun fixNumberType(attr: Attr<Any>, value: Any) =
when (attr.type) {
QByte.code -> (value as Number).toByte()
QInt.code -> (value as Number).toInt()
QByte.code, QByte.counter().code -> (value as Number).toByte()
QInt.code, QInt.counter().code -> (value as Number).toInt()
else -> value
}

Expand All @@ -79,7 +81,7 @@ class IndexDb(
return cached as R
}

val dc = typify(schema::get, entity, type, serialModule)
val dc = typify(schema::get, entity, type, serialModule, registerFolders)
dataClassesCache.update { it.put(entity, dc) }
return dc
}
Expand Down
14 changes: 14 additions & 0 deletions qbit-core/src/commonMain/kotlin/qbit/index/IndexedRegister.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package qbit.index

import qbit.api.model.Hash

class IndexedRegister(
val cells: List<Pair<Hash?, Any>>
) {
fun indexValue(hash: Hash?, value: Any, causalHashes: List<Hash?>): IndexedRegister {
val concurrentCells = cells.filter { !causalHashes.contains(it.first) }
return IndexedRegister(concurrentCells + Pair(hash, value))
}

fun values() = cells.map { it.second }
}
9 changes: 5 additions & 4 deletions qbit-core/src/commonMain/kotlin/qbit/index/Indexer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ class Indexer(
private val base: IndexDb?,
private val baseNode: Node<Hash>?,
private val resolveNode: (Node<Hash>) -> NodeVal<Hash>?,
private val causalHashesResolver: suspend (Hash) -> List<Hash>,
private val registerFolders: Map<String, (Any, Any) -> Any>
) {

suspend fun index(from: Node<Hash>): IndexDb {
return nodesBetween(baseNode, from, resolveNode)
.toList()
.map { it.entities() }
.fold(base ?: IndexDb(Index(), serialModule)) { db, n ->
IndexDb(db.index.add(n), serialModule)
.fold(base ?: IndexDb(Index(), serialModule, registerFolders)) { db, n ->
db.with(n.entities().flatMap { it.second }, n.hash, causalHashesResolver(n.hash))
}
}

}

private fun NodeVal<Hash>.entities(): List<RawEntity> =
fun NodeVal<Hash>.entities(): List<RawEntity> =
data.trxes.toList()
.groupBy { it.gid }
.map { it.key to it.value }
Expand Down
Loading