From aa6460859b4e07033877b07f0953e452efd2a46b Mon Sep 17 00:00:00 2001 From: Artur Albov Date: Sat, 26 May 2018 16:38:27 +0300 Subject: [PATCH 1/7] #172 Setup bitcoin mempool transaction dump --- build.gradle | 6 +- .../cassandra/bitcoin/model/Transaction.kt | 20 +- .../migrations/bitcoin/0_initial.cql | 9 +- .../migrations/bitcoin_cash/0_initial.cql | 8 +- .../cyber/search/model/bitcoin/Transaction.kt | 5 +- .../cyber/dump/bitcoin/BlockDumpProcess.kt | 52 ++--- .../fund/cyber/dump/bitcoin/TxDumpProcess.kt | 110 ++++++---- .../fund/cyber/dump/common/Functions.kt | 53 +++-- .../fund/cyber/dump/common/FuntionsTest.kt | 200 ++++++++++++++--- .../cyber/dump/ethereum/BlockDumpProcess.kt | 38 ++-- .../fund/cyber/dump/ethereum/TxDumpProcess.kt | 203 +++++------------- .../cyber/dump/ethereum/UncleDumpProcess.kt | 35 +-- .../dump/ethereum/BlockDumpProcessTest.kt | 36 ++-- .../cyber/dump/ethereum/TxDumpProcessTest.kt | 98 +++++---- .../dump/ethereum/UncleDumpProcessTest.kt | 31 +-- .../JsonRpcToDaoBitcoinTxConverter.kt | 9 +- .../BtcdToDaoTransactionConverterTest.kt | 7 +- .../client/ParityToEthereumBundleConverter.kt | 2 +- 18 files changed, 552 insertions(+), 370 deletions(-) diff --git a/build.gradle b/build.gradle index c7ea99ca..39e605c5 100644 --- a/build.gradle +++ b/build.gradle @@ -23,8 +23,8 @@ buildscript { // tests junitVersion = "5.2.0" junitPlatformVersion = "1.2.0" - mockitoVersion = "2.1.0" - mockitoKotlinVersion = "0.7.0" + mockitoVersion = "2.8.9" + mockitoKotlinVersion = "1.5.0" assertjVersion = "3.9.0" // metrics @@ -172,6 +172,7 @@ subprojects { exclude 'org.jetbrains.kotlin:kotlin-stdlib' } dependency("io.projectreactor:reactor-core:$reactorVersion") + dependency("io.projectreactor:reactor-test:$reactorVersion") dependency("io.micrometer:micrometer-core:$micrometerVersion") dependency("io.micrometer:micrometer-registry-prometheus:$micrometerVersion") @@ -184,6 +185,7 @@ subprojects { testCompile("org.mockito:mockito-core") testCompile("com.nhaarman:mockito-kotlin") testCompile("org.assertj:assertj-core") + testCompile("io.projectreactor:reactor-test") testRuntime("org.junit.jupiter:junit-jupiter-engine") } } diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Transaction.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Transaction.kt index 8dfc138b..df770a6d 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Transaction.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Transaction.kt @@ -5,6 +5,7 @@ package fund.cyber.cassandra.bitcoin.model import fund.cyber.search.model.bitcoin.BitcoinTx import fund.cyber.search.model.bitcoin.BitcoinTxIn import fund.cyber.search.model.bitcoin.BitcoinTxOut +import fund.cyber.search.model.bitcoin.SignatureScript import org.springframework.data.cassandra.core.mapping.Column import org.springframework.data.cassandra.core.mapping.PrimaryKey import org.springframework.data.cassandra.core.mapping.Table @@ -19,6 +20,7 @@ data class CqlBitcoinTx( @Column("block_number") val blockNumber: Long, @Column("block_hash") val blockHash: String?, val coinbase: String? = null, + @Column("first_seen_time") val firstSeenTime: Instant, @Column("block_time") val blockTime: Instant?, val size: Int, val fee: String, @@ -30,7 +32,7 @@ data class CqlBitcoinTx( constructor(tx: BitcoinTx) : this( hash = tx.hash, blockNumber = tx.blockNumber, blockHash = tx.blockHash, coinbase = tx.coinbase, - blockTime = tx.blockTime, size = tx.size, fee = tx.fee.toString(), + blockTime = tx.blockTime, size = tx.size, fee = tx.fee.toString(), firstSeenTime = tx.firstSeenTime, totalInput = tx.totalInputsAmount.toString(), totalOutput = tx.totalOutputsAmount.toString(), ins = tx.ins.map { txIn -> CqlBitcoinTxIn(txIn) }, outs = tx.outs.map { txOut -> CqlBitcoinTxOut(txOut) } ) @@ -45,14 +47,15 @@ data class CqlBitcoinTx( data class CqlBitcoinTxIn( val contracts: List, val amount: BigDecimal, - val asm: String, + val scriptSig: CqlBitcoinSignatureScript, + val txinwitness: List, @Column("tx_hash") val txHash: String, @Column("tx_out") val txOut: Int ) { constructor(txIn: BitcoinTxIn) : this( - contracts = txIn.contracts, amount = txIn.amount, asm = txIn.asm, - txHash = txIn.txHash, txOut = txIn.txOut + contracts = txIn.contracts, amount = txIn.amount, scriptSig = CqlBitcoinSignatureScript(txIn.scriptSig), + txHash = txIn.txHash, txOut = txIn.txOut, txinwitness = txIn.txinwitness ) } @@ -70,3 +73,12 @@ data class CqlBitcoinTxOut( out = txOut.out, requiredSignatures = txOut.requiredSignatures ) } + +@UserDefinedType("script_sig") +data class CqlBitcoinSignatureScript( + val asm: String, + val hex: String +) { + + constructor(scriptSig: SignatureScript) : this (asm = scriptSig.asm, hex = scriptSig.hex) +} diff --git a/cassandra-service/src/main/resources/migrations/bitcoin/0_initial.cql b/cassandra-service/src/main/resources/migrations/bitcoin/0_initial.cql index 22f15ba9..e8da024d 100644 --- a/cassandra-service/src/main/resources/migrations/bitcoin/0_initial.cql +++ b/cassandra-service/src/main/resources/migrations/bitcoin/0_initial.cql @@ -25,7 +25,10 @@ CREATE TABLE IF NOT EXISTS bitcoin.tx_preview_by_block ( PRIMARY KEY ( block_number, "index" ) ) WITH CLUSTERING ORDER BY ( "index" ASC ); - +CREATE TYPE IF NOT EXISTS bitcoin.script_sig ( + asm text, + hex text +); CREATE TYPE IF NOT EXISTS bitcoin.tx_out ( contracts FROZEN < list < text > >, @@ -38,7 +41,8 @@ CREATE TYPE IF NOT EXISTS bitcoin.tx_out ( CREATE TYPE IF NOT EXISTS bitcoin.tx_in ( contracts FROZEN < list < text > >, amount decimal, - asm text, + scriptSig FROZEN < bitcoin.script_sig >, + txinwitness FROZEN < list < text > >, tx_hash text, tx_out int ); @@ -48,6 +52,7 @@ CREATE TABLE IF NOT EXISTS bitcoin.tx ( block_number bigint, block_hash text, block_time timestamp, + first_seen_time timestamp, size int, coinbase text, fee text, diff --git a/cassandra-service/src/main/resources/migrations/bitcoin_cash/0_initial.cql b/cassandra-service/src/main/resources/migrations/bitcoin_cash/0_initial.cql index 5af9fc94..ab4f3d2f 100644 --- a/cassandra-service/src/main/resources/migrations/bitcoin_cash/0_initial.cql +++ b/cassandra-service/src/main/resources/migrations/bitcoin_cash/0_initial.cql @@ -26,6 +26,10 @@ CREATE TABLE IF NOT EXISTS bitcoin_cash.tx_preview_by_block ( ) WITH CLUSTERING ORDER BY ( "index" ASC ); +CREATE TYPE IF NOT EXISTS bitcoin.script_sig ( + asm text, + hex text +); CREATE TYPE IF NOT EXISTS bitcoin_cash.tx_out ( contracts FROZEN < list < text > >, @@ -38,7 +42,8 @@ CREATE TYPE IF NOT EXISTS bitcoin_cash.tx_out ( CREATE TYPE IF NOT EXISTS bitcoin_cash.tx_in ( contracts FROZEN < list < text > >, amount decimal, - asm text, + scriptSig FROZEN < bitcoin_cash.script_sig >, + txinwitness FROZEN < list < text > >, tx_hash text, tx_out int ); @@ -48,6 +53,7 @@ CREATE TABLE IF NOT EXISTS bitcoin_cash.tx ( block_number bigint, block_hash text, block_time timestamp, + first_seen_time timestamp, size int, coinbase text, fee text, diff --git a/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Transaction.kt b/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Transaction.kt index 6cc5ac2e..c45e4d3c 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Transaction.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Transaction.kt @@ -26,12 +26,15 @@ data class BitcoinTx( fun allContractsUsedInTransaction() = ins.flatMap { input -> input.contracts } + outs.flatMap { output -> output.contracts } + + fun mempoolState() = this.copy(blockNumber = -1, blockHash = null, index = -1, blockTime = null) } data class BitcoinTxIn( val contracts: List, val amount: BigDecimal, - val asm: String, + val scriptSig: SignatureScript, + val txinwitness: List = emptyList(), val txHash: String, val txOut: Int ) diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt index dc74d4b3..a1c0a12a 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt @@ -4,14 +4,16 @@ import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractMinedBlock import fund.cyber.cassandra.bitcoin.model.CqlBitcoinBlock import fund.cyber.cassandra.bitcoin.repository.BitcoinContractMinedBlockRepository import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockRepository -import fund.cyber.dump.common.filterNotContainsAllEventsOf -import fund.cyber.dump.common.toRecordEventsMap +import fund.cyber.dump.common.executeOperations import fund.cyber.search.model.bitcoin.BitcoinBlock import fund.cyber.search.model.chains.BitcoinFamilyChain import fund.cyber.search.model.events.PumpEvent import org.apache.kafka.clients.consumer.ConsumerRecord +import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.kafka.listener.BatchMessageListener +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono class BlockDumpProcess( @@ -24,28 +26,28 @@ class BlockDumpProcess( override fun onMessage(records: List>) { - val first = records.first() - val last = records.last() - log.info("Dumping batch of ${first.value().height}-${last.value().height} $chain blocks") - - val recordsToProcess = records.toRecordEventsMap() - .filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK)) - - val blocksToCommit = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.NEW_BLOCK) }.keys - val blocksToRevert = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.DROPPED_BLOCK) }.keys - - blockRepository - .deleteAll(blocksToRevert.map { block -> CqlBitcoinBlock(block) }) - .block() - blockRepository - .saveAll(blocksToCommit.map { block -> CqlBitcoinBlock(block) }) - .collectList().block() - - contractMinedBlockRepository - .deleteAll(blocksToRevert.map { block -> CqlBitcoinContractMinedBlock(block) }) - .block() - contractMinedBlockRepository - .saveAll(blocksToCommit.map { block -> CqlBitcoinContractMinedBlock(block) }) - .collectList().block() + log.info("Dumping batch of ${records.size} $chain blocks from offset ${records.first().offset()}") + + records.executeOperations { event, block -> + return@executeOperations when (event) { + PumpEvent.NEW_BLOCK -> block.toNewBlockPublisher() + PumpEvent.NEW_POOL_TX -> Mono.empty() + PumpEvent.DROPPED_BLOCK -> block.toDropBlockPublisher() + } + } + } + + private fun BitcoinBlock.toNewBlockPublisher(): Publisher { + val saveBlockMono = blockRepository.save(CqlBitcoinBlock(this)) + val saveContractBlockMono = contractMinedBlockRepository.save(CqlBitcoinContractMinedBlock(this)) + + return Flux.concat(saveBlockMono, saveContractBlockMono) + } + + private fun BitcoinBlock.toDropBlockPublisher(): Publisher { + val deleteBlockMono = blockRepository.delete(CqlBitcoinBlock(this)) + val deleteContractBlockMono = contractMinedBlockRepository.delete(CqlBitcoinContractMinedBlock(this)) + + return Flux.concat(deleteBlockMono, deleteContractBlockMono) } } diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt index 3fbb1c45..f921942c 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt @@ -1,61 +1,97 @@ package fund.cyber.dump.bitcoin -import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractTxPreview import fund.cyber.cassandra.bitcoin.model.CqlBitcoinBlockTxPreview import fund.cyber.cassandra.bitcoin.model.CqlBitcoinTx +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractTxPreview import fund.cyber.cassandra.bitcoin.repository.BitcoinContractTxRepository import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockTxRepository import fund.cyber.cassandra.bitcoin.repository.BitcoinTxRepository -import fund.cyber.dump.common.filterNotContainsAllEventsOf -import fund.cyber.dump.common.toRecordEventsMap +import fund.cyber.dump.common.executeOperations import fund.cyber.search.model.bitcoin.BitcoinTx import fund.cyber.search.model.chains.BitcoinFamilyChain import fund.cyber.search.model.events.PumpEvent import org.apache.kafka.clients.consumer.ConsumerRecord +import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.kafka.listener.BatchMessageListener +import reactor.core.publisher.Flux +import reactor.core.publisher.toFlux class TxDumpProcess( - private val txRepository: BitcoinTxRepository, - private val contractTxRepository: BitcoinContractTxRepository, - private val blockTxRepository: BitcoinBlockTxRepository, - private val chain: BitcoinFamilyChain + private val txRepository: BitcoinTxRepository, + private val contractTxRepository: BitcoinContractTxRepository, + private val blockTxRepository: BitcoinBlockTxRepository, + private val chain: BitcoinFamilyChain ) : BatchMessageListener { private val log = LoggerFactory.getLogger(BatchMessageListener::class.java) override fun onMessage(records: List>) { - log.info("Dumping batch of ${records.size} $chain txs from offset ${records.first().offset()}") - - val recordsToProcess = records.toRecordEventsMap() - .filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK)) - - val txsToCommit = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.NEW_BLOCK) }.keys - val txsToRevert = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.DROPPED_BLOCK) }.keys - - txRepository.deleteAll(txsToRevert.map { tx -> CqlBitcoinTx(tx) }) - .block() - txRepository - .saveAll(txsToCommit.map { tx -> CqlBitcoinTx(tx) }) - .collectList().block() - - contractTxRepository - .deleteAll(txsToRevert.flatMap { tx -> - tx.allContractsUsedInTransaction().map { contract -> CqlBitcoinContractTxPreview(contract, tx) } - }) - .block() - contractTxRepository - .saveAll(txsToCommit.flatMap { tx -> - tx.allContractsUsedInTransaction().map { contract -> CqlBitcoinContractTxPreview(contract, tx) } - }) - .collectList().block() - - blockTxRepository.deleteAll(txsToRevert.map { tx -> CqlBitcoinBlockTxPreview(tx) }) - .block() - blockTxRepository - .saveAll(txsToCommit.map { tx -> CqlBitcoinBlockTxPreview(tx) }) - .collectList().block() + log.info("Dumping batch of ${records.size} $chain transactions from offset ${records.first().offset()}") + + records.executeOperations { event, tx -> + return@executeOperations when (event) { + PumpEvent.NEW_BLOCK -> tx.toNewBlockPublisher() + PumpEvent.NEW_POOL_TX -> tx.toNewPoolItemPublisher() + PumpEvent.DROPPED_BLOCK -> tx.toDropBlockPublisher() + } + } + + } + + private fun BitcoinTx.toNewBlockPublisher(): Publisher { + + val saveTxMono = txRepository.findById(this.hash) + .flatMap { cqlTx -> txRepository.save(CqlBitcoinTx(this.copy(firstSeenTime = cqlTx.firstSeenTime))) } + .switchIfEmpty(txRepository.save(CqlBitcoinTx(this))) + + val saveBlockTxMono = blockTxRepository.save(CqlBitcoinBlockTxPreview(this)) + + + val entitiesToDelete = this.allContractsUsedInTransaction().toSet() + .map { it -> CqlBitcoinContractTxPreview(it, this.mempoolState()) } + + val entitiesToSave = this.allContractsUsedInTransaction().toSet() + .map { it -> CqlBitcoinContractTxPreview(it, this) } + + val saveContractTxesFlux = Flux.concat( + contractTxRepository.saveAll(entitiesToSave), + contractTxRepository.deleteAll(entitiesToDelete) + ) + + return Flux.concat(saveTxMono, saveBlockTxMono, saveContractTxesFlux) + } + + private fun BitcoinTx.toDropBlockPublisher(): Publisher { + + val saveTxMono = txRepository.findById(this.hash) + .flatMap { cqlTx -> + txRepository.save(CqlBitcoinTx(this.mempoolState().copy(firstSeenTime = cqlTx.firstSeenTime))) + } + .switchIfEmpty(txRepository.save(CqlBitcoinTx(this.mempoolState()))) + + val deleteBlockTxMono = blockTxRepository.delete(CqlBitcoinBlockTxPreview(this)) + + val deleteContractTxesFlux = contractTxRepository.deleteAll( + this.allContractsUsedInTransaction().toSet().map { it -> CqlBitcoinContractTxPreview(it, this) } + ) + + return Flux.concat(saveTxMono, deleteBlockTxMono, deleteContractTxesFlux) + } + + private fun BitcoinTx.toNewPoolItemPublisher(): Publisher { + + val contractTxesToSave = this.allContractsUsedInTransaction().toSet() + .map { it -> CqlBitcoinContractTxPreview(it, this) } + + return txRepository.findById(this.hash) + .map { it -> it as Any } // hack to convert Mono to Any type + .toFlux() + .switchIfEmpty( + Flux.concat(txRepository.save(CqlBitcoinTx(this)), contractTxRepository.saveAll(contractTxesToSave)) + ) } + } diff --git a/dumps/common/src/main/kotlin/fund/cyber/dump/common/Functions.kt b/dumps/common/src/main/kotlin/fund/cyber/dump/common/Functions.kt index 5fc9a7ff..523ccb73 100644 --- a/dumps/common/src/main/kotlin/fund/cyber/dump/common/Functions.kt +++ b/dumps/common/src/main/kotlin/fund/cyber/dump/common/Functions.kt @@ -2,25 +2,48 @@ package fund.cyber.dump.common import fund.cyber.search.model.events.PumpEvent import org.apache.kafka.clients.consumer.ConsumerRecord +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux -fun List>.toRecordEventsMap(): Map> { - val blockEvents = mutableMapOf>() - this.forEach { record -> - val events = blockEvents[record.value()] - if (events != null) { - events += record.key() +fun List>.executeOperations( + convertToOperations: (PumpEvent, RECORD) -> Publisher<*> +) { + val fluxesToExecute = this.compileOperations(convertToOperations) + + fluxesToExecute.forEach { flux -> + flux.collectList().block() + } +} + +fun List>.compileOperations( + convertToOperations: (PumpEvent, RECORD) -> Publisher<*> +): List> { + + if (this.isEmpty()) return emptyList() + + var previousEvent = PumpEvent.NEW_BLOCK + val fluxesToExecute = mutableListOf>() + + var compiledFlux = Flux.empty() + this.forEach { recordEvent -> + val event = recordEvent.key() + val record = recordEvent.value() + val recordOperations = convertToOperations(event, record) + + val endBatch = (event != PumpEvent.NEW_BLOCK && previousEvent == PumpEvent.NEW_BLOCK) + || (event == PumpEvent.NEW_BLOCK && previousEvent != PumpEvent.NEW_BLOCK) + + compiledFlux = if (endBatch) { + fluxesToExecute.add(compiledFlux) + Flux.empty().concatWith(recordOperations) } else { - blockEvents[record.value()] = mutableListOf(record.key()) + compiledFlux.concatWith(recordOperations) } + + previousEvent = event } - return blockEvents -} + fluxesToExecute.add(compiledFlux) -fun Map>.filterNotContainsAllEventsOf( - events: Collection -): Map> { - return this.filterNot { entry -> - entry.value.containsAll(events) - } + return fluxesToExecute } diff --git a/dumps/common/src/test/kotlin/fund/cyber/dump/common/FuntionsTest.kt b/dumps/common/src/test/kotlin/fund/cyber/dump/common/FuntionsTest.kt index 2937f282..24c2779a 100644 --- a/dumps/common/src/test/kotlin/fund/cyber/dump/common/FuntionsTest.kt +++ b/dumps/common/src/test/kotlin/fund/cyber/dump/common/FuntionsTest.kt @@ -4,6 +4,10 @@ import fund.cyber.search.model.events.PumpEvent import org.apache.kafka.clients.consumer.ConsumerRecord import org.assertj.core.api.Assertions import org.junit.jupiter.api.Test +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.toFlux +import reactor.test.StepVerifier data class TestData( private val hash: String, @@ -13,42 +17,186 @@ data class TestData( class FunctionsTest { - private val valueA = TestData("a", 1, "a") - private val valueB = TestData("a", 1, "b") - - private val records = listOf( - ConsumerRecord("topic", 0, 0, PumpEvent.NEW_BLOCK, valueA), - ConsumerRecord("topic", 0, 2, PumpEvent.DROPPED_BLOCK, valueA), - ConsumerRecord("topic", 0, 4, PumpEvent.NEW_BLOCK, valueB) - ) - @Test - fun toRecordsEventsMapTest() { - - val recordsEventsMap = records.toRecordEventsMap() + fun compileOperationsTest() { + val records = listOf( + testRecord(PumpEvent.NEW_BLOCK), + testRecord(PumpEvent.DROPPED_BLOCK), + testRecord(PumpEvent.NEW_POOL_TX), + testRecord(PumpEvent.NEW_BLOCK), + testRecord(PumpEvent.NEW_BLOCK) + ) + + val fluxesToExecute = records.compileOperations { event, _ -> + return@compileOperations when (event) { + PumpEvent.NEW_BLOCK -> Flux.fromIterable(listOf("a1", "a2", "a3")) + PumpEvent.NEW_POOL_TX -> Flux.fromIterable(listOf("b1", "b2")) + PumpEvent.DROPPED_BLOCK -> Flux.fromIterable(listOf("c1", "c2", "c3")) + } + } + + Assertions.assertThat(fluxesToExecute).hasSize(3) + + StepVerifier.create(fluxesToExecute[0]) + .expectNext("a1") + .expectNext("a2") + .expectNext("a3") + .verifyComplete() + + StepVerifier.create(fluxesToExecute[1]) + .expectNext("c1") + .expectNext("c2") + .expectNext("c3") + .expectNext("b1") + .expectNext("b2") + .verifyComplete() + + StepVerifier.create(fluxesToExecute[2]) + .expectNext("a1") + .expectNext("a2") + .expectNext("a3") + .expectNext("a1") + .expectNext("a2") + .expectNext("a3") + .verifyComplete() - Assertions.assertThat(recordsEventsMap).hasSize(2) - Assertions.assertThat(recordsEventsMap.keys).containsExactly(valueA, valueB) - Assertions.assertThat(recordsEventsMap[valueA]).containsExactly(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK) - Assertions.assertThat(recordsEventsMap[valueB]).containsExactly(PumpEvent.NEW_BLOCK) } @Test - fun filterNotContainsAllEventsOfTest() { - - val recordsEventsMap = records.toRecordEventsMap() + fun compileOperationsWithNoRecordsTest() { + val records = emptyList>() + + val fluxesToExecute = records.compileOperations { event, _ -> + return@compileOperations when (event) { + PumpEvent.NEW_BLOCK -> Flux.fromIterable(listOf("a1", "a2", "a3")) + PumpEvent.NEW_POOL_TX -> Flux.fromIterable(listOf("b1", "b2")) + PumpEvent.DROPPED_BLOCK -> Flux.fromIterable(listOf("c1", "c2", "c3")) + } + } + + Assertions.assertThat(fluxesToExecute).isEmpty() + } - val filtedByNewBlock = recordsEventsMap.filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK)) + @Test + fun compileOperationsWithEmptyRecordsTest() { + val records = listOf( + testRecord(PumpEvent.NEW_BLOCK), + testRecord(PumpEvent.DROPPED_BLOCK), + testRecord(PumpEvent.NEW_POOL_TX), + testRecord(PumpEvent.NEW_BLOCK), + testRecord(PumpEvent.NEW_BLOCK) + ) + + val fluxesToExecute = records.compileOperations { event, _ -> + return@compileOperations when (event) { + PumpEvent.NEW_BLOCK -> Flux.empty() + PumpEvent.NEW_POOL_TX -> Flux.empty() + PumpEvent.DROPPED_BLOCK -> Flux.empty() + } + } + + Assertions.assertThat(fluxesToExecute).hasSize(3) + + StepVerifier.create(fluxesToExecute[0]) + .verifyComplete() + + StepVerifier.create(fluxesToExecute[1]) + .verifyComplete() + + StepVerifier.create(fluxesToExecute[2]) + .verifyComplete() + } - Assertions.assertThat(filtedByNewBlock).isEmpty() - val filtedByNewBlockAndDroppedBlock = recordsEventsMap - .filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK)) + @Test + fun compileOperationsWithDifferentPublishersTest() { + val records = listOf( + testRecord(PumpEvent.NEW_BLOCK), + testRecord(PumpEvent.DROPPED_BLOCK), + testRecord(PumpEvent.NEW_POOL_TX), + testRecord(PumpEvent.NEW_BLOCK), + testRecord(PumpEvent.NEW_BLOCK) + ) + + val fluxesToExecute = records.compileOperations { event, _ -> + return@compileOperations when (event) { + PumpEvent.NEW_BLOCK -> Flux.fromIterable(listOf("a1", "a2", "a3")) + PumpEvent.NEW_POOL_TX -> Mono.just("b1") + PumpEvent.DROPPED_BLOCK -> Flux.fromIterable(listOf("c1", "c2", "c3")) + } + } + + Assertions.assertThat(fluxesToExecute).hasSize(3) + + + StepVerifier.create(fluxesToExecute[0]) + .expectNext("a1") + .expectNext("a2") + .expectNext("a3") + .verifyComplete() + + StepVerifier.create(fluxesToExecute[1]) + .expectNext("c1") + .expectNext("c2") + .expectNext("c3") + .expectNext("b1") + .verifyComplete() + + StepVerifier.create(fluxesToExecute[2]) + .expectNext("a1") + .expectNext("a2") + .expectNext("a3") + .expectNext("a1") + .expectNext("a2") + .expectNext("a3") + .verifyComplete() + } - Assertions.assertThat(filtedByNewBlockAndDroppedBlock).hasSize(1) - Assertions.assertThat(filtedByNewBlockAndDroppedBlock.keys).containsExactly(valueB) - Assertions.assertThat(filtedByNewBlockAndDroppedBlock[valueB]).containsExactly(PumpEvent.NEW_BLOCK) + @Test + fun hacksWithPublishersTest() { + val testMono = Mono.just("a") + .flatMap { _ -> Mono.just(1) } + .switchIfEmpty(Mono.just(2)) + + StepVerifier.create(testMono) + .expectNext(1) + .verifyComplete() + + val testMono2 = Mono.just(3) + .map { 1 } //stub to convert in right type + .toFlux() + .switchIfEmpty(Flux.just(2)) + + StepVerifier.create(testMono2) + .expectNext(1) + .verifyComplete() + + val testFlux3 = Mono.just(4L) + .map { it -> it as Any } // stub to convert whole Flux to right type + .toFlux() + .switchIfEmpty( + Flux.concat(Flux.just(1), Flux.just("a")) + ) + + StepVerifier.create(testFlux3) + .expectNext(4L) + .verifyComplete() + + val testFlux4 = Mono.empty() + .map { it -> it as Any } // stub to convert whole Flux to right type + .toFlux() + .switchIfEmpty( + Flux.concat(Flux.just(1), Flux.just("a")) + ) + + StepVerifier.create(testFlux4) + .expectNext(1) + .expectNext("a") + .verifyComplete() } + private fun testRecord(event: PumpEvent) = + ConsumerRecord("topic", 0, 0, event, TestData("a", 1, "a")) + } diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt index b27a16ed..2f5cceee 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt @@ -4,14 +4,15 @@ import fund.cyber.cassandra.ethereum.model.CqlEthereumContractMinedBlock import fund.cyber.cassandra.ethereum.model.CqlEthereumBlock import fund.cyber.cassandra.ethereum.repository.EthereumContractMinedBlockRepository import fund.cyber.cassandra.ethereum.repository.EthereumBlockRepository -import fund.cyber.dump.common.filterNotContainsAllEventsOf -import fund.cyber.dump.common.toRecordEventsMap +import fund.cyber.dump.common.executeOperations import fund.cyber.search.model.chains.EthereumFamilyChain import fund.cyber.search.model.ethereum.EthereumBlock import fund.cyber.search.model.events.PumpEvent import org.apache.kafka.clients.consumer.ConsumerRecord +import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.kafka.listener.BatchMessageListener +import reactor.core.publisher.Mono class BlockDumpProcess( @@ -26,25 +27,26 @@ class BlockDumpProcess( log.info("Dumping batch of ${records.size} $chain blocks from offset ${records.first().offset()}") - val recordsToProcess = records.toRecordEventsMap() - .filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK)) + records.executeOperations { event, block -> + return@executeOperations when (event) { + PumpEvent.NEW_BLOCK -> block.toNewBlockPublisher() + PumpEvent.NEW_POOL_TX -> Mono.empty() + PumpEvent.DROPPED_BLOCK -> block.toDropBlockPublisher() + } + } + } - val blocksToCommit = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.NEW_BLOCK) }.keys - val blocksToRevert = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.DROPPED_BLOCK) }.keys + private fun EthereumBlock.toNewBlockPublisher(): Publisher { + val saveBlockMono = blockRepository.save(CqlEthereumBlock(this)) + val saveContractBlockMono = contractMinedBlockRepository.save(CqlEthereumContractMinedBlock(this)) + return reactor.core.publisher.Flux.concat(saveBlockMono, saveContractBlockMono) + } - blockRepository - .deleteAll(blocksToRevert.map { block -> CqlEthereumBlock(block) }) - .block() - blockRepository - .saveAll(blocksToCommit.map { block -> CqlEthereumBlock(block) }) - .collectList().block() + private fun EthereumBlock.toDropBlockPublisher(): Publisher { + val deleteBlockMono = blockRepository.delete(CqlEthereumBlock(this)) + val deleteContractBlockMono = contractMinedBlockRepository.delete(CqlEthereumContractMinedBlock(this)) - contractMinedBlockRepository - .deleteAll(blocksToRevert.map { block -> CqlEthereumContractMinedBlock(block) }) - .block() - contractMinedBlockRepository - .saveAll(blocksToCommit.map { block -> CqlEthereumContractMinedBlock(block) }) - .collectList().block() + return reactor.core.publisher.Flux.concat(deleteBlockMono, deleteContractBlockMono) } } diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt index a50175b3..f583bad1 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt @@ -6,26 +6,19 @@ import fund.cyber.cassandra.ethereum.model.CqlEthereumTx import fund.cyber.cassandra.ethereum.repository.EthereumBlockTxRepository import fund.cyber.cassandra.ethereum.repository.EthereumContractTxRepository import fund.cyber.cassandra.ethereum.repository.EthereumTxRepository -import fund.cyber.dump.common.filterNotContainsAllEventsOf -import fund.cyber.dump.common.toRecordEventsMap +import fund.cyber.dump.common.executeOperations import fund.cyber.search.model.chains.EthereumFamilyChain import fund.cyber.search.model.ethereum.EthereumTx import fund.cyber.search.model.events.PumpEvent import org.apache.kafka.clients.consumer.ConsumerRecord +import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.kafka.listener.BatchMessageListener +import reactor.core.publisher.Flux +import reactor.core.publisher.toFlux //todo in dump add label for smart contract from calls -/** - * Transform transaction based on event. For [PumpEvent.DROPPED_BLOCK] we transform transaction to mempool state. - * For others it remains as it is. - * - * @param event used to transform - * @return transformed transaction - */ -fun EthereumTx.transformBy(event: PumpEvent) = if (event == PumpEvent.DROPPED_BLOCK) this.mempoolState() else this - class TxDumpProcess( private val txRepository: EthereumTxRepository, private val blockTxRepository: EthereumBlockTxRepository, @@ -39,155 +32,67 @@ class TxDumpProcess( log.info("Dumping batch of ${records.size} $chain transactions from offset ${records.first().offset()}") - val currentDbTxs = txRepository - .findAllById(records.map { r -> r.value().hash }).collectList().block()!! - .associateBy { tx -> tx.hash } + records.executeOperations { event, tx -> + return@executeOperations when (event) { + PumpEvent.NEW_BLOCK -> tx.toNewBlockPublisher() + PumpEvent.NEW_POOL_TX -> tx.toNewPoolItemPublisher() + PumpEvent.DROPPED_BLOCK -> tx.toDropBlockPublisher() + } + } - saveTransactions(records, currentDbTxs) - saveBlockTransactions(records) - saveContractTransactions(records, currentDbTxs) } - /** - * Save transactions to cassandra. We assume that all events are sequential. - * If in batch we have a lot of events for same transaction hash then we take the last one event - * and saving its state in database. - * - * [PumpEvent.DROPPED_BLOCK] returning transaction in mempool state. - * - * For [PumpEvent.NEW_POOL_TX] event if we already have this transaction in database we skipping this event - * cause it means that chain reorganization occurred and it's already (or will be) returned in mempool state by - * [PumpEvent.DROPPED_BLOCK] event. - * - * [EthereumTx.firstSeenTime] for transactions is updated from database every time if we already - * have this transaction in database. - * - * @param records kafka records with pairs of events and transactions to save. - * @param currentDbTxs map of transactions by hash that are already in database. - */ - private fun saveTransactions(records: List>, - currentDbTxs: Map): Map { - val txLastEventMap = recordIdToLastEvent(records) - - val txsToSave = txLastEventMap - // we don't need to save pool tx if we already have some state for this tx in DB - .filter { entry -> !(entry.value.first == PumpEvent.NEW_POOL_TX && currentDbTxs.contains(entry.key)) } - .map { entry -> - val (event, tx) = entry.value - val currentDbTx = currentDbTxs[entry.value.second.hash] - val newTx = tx.transformBy(event) - // Set first seen time from previous state in DB - if (currentDbTx != null) { - return@map newTx.copy(firstSeenTime = currentDbTx.firstSeenTime) - } - return@map newTx - } + private fun EthereumTx.toNewBlockPublisher(): Publisher { - log.info("Total transactions to save ${txsToSave.size}") + val saveTxMono = txRepository.findById(this.hash) + .flatMap { cqlTx -> txRepository.save(CqlEthereumTx(this.copy(firstSeenTime = cqlTx.firstSeenTime))) } + .switchIfEmpty(txRepository.save(CqlEthereumTx(this))) - txRepository.saveAll(txsToSave.map { tx -> CqlEthereumTx(tx) }).collectList().block() - return currentDbTxs - } + val saveBlockTxMono = blockTxRepository.save(CqlEthereumBlockTxPreview(this)) - /** - * Save transactions previews by block to cassandra. - * Here we don't need to handle transaction with [PumpEvent.NEW_POOL_TX] event cause they're not in block. - * - * Preview is unique by [CqlEthereumBlockTxPreview.blockNumber] and [CqlEthereumBlockTxPreview.positionInBlock] - * so if we have [PumpEvent.NEW_BLOCK] and [PumpEvent.DROPPED_BLOCK] events at the same time for transactions - * with the same [EthereumTx.blockNumber] and [EthereumTx.positionInBlock] then we don't need to do anything on it. - * - * @param records kafka records with pairs of events and transactions to save. - */ - private fun saveBlockTransactions(records: List>) { - // we don't need to handle NEW_POOL_TXes for transactions by block preview. - val blockTxsToProcess = records - .filter { record -> record.key() != PumpEvent.NEW_POOL_TX } - .toRecordEventsMap() - .filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK)) - .map { entry -> CqlEthereumBlockTxPreview(entry.key) to entry.value } - .toMap() - - blockTxRepository - .deleteAll(blockTxsToProcess.filter { entry -> entry.value.contains(PumpEvent.DROPPED_BLOCK) }.keys) - .block() - blockTxRepository - .saveAll(blockTxsToProcess.filter { entry -> entry.value.contains(PumpEvent.NEW_BLOCK) }.keys) - .collectList().block() - } - /** - * Save transactions previews by contract to cassandra. - * Firstly we saving transactions with [PumpEvent.NEW_POOL_TX] event. It's neccesary cause we could have - * [PumpEvent.NEW_POOL_TX] event and [PumpEvent.NEW_BLOCK] event at the same batch. By saving - * [PumpEvent.NEW_POOL_TX] first we could then delete it with other mempool transactions that should be deleted - * by [PumpEvent.NEW_BLOCK] event that means transaction is not in mempool anymore. - * - * @param records kafka records with pairs of events and transactions to save. - * @param currentDbTxs map of transactions by hash that are already in database. - */ - private fun saveContractTransactions(records: List>, - currentDbTxs: Map) { - val contractTxsToProcess = records - .toRecordEventsMap() - .filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK)) - .map { entry -> - val currentDbTx = currentDbTxs[entry.key.hash] - val newTx = entry.key - // Set first seen time from previous state in DB - if (currentDbTx != null) { - return@map newTx.copy(firstSeenTime = currentDbTx.firstSeenTime) to entry.value - } - return@map newTx to entry.value - }.toMap() - - - fun Collection.toContractTxes() = this.flatMap { tx -> - tx.contractsUsedInTransaction().map { it -> CqlEthereumContractTxPreview(tx, it) } - } + val entitiesToDelete = this.contractsUsedInTransaction().toSet() + .map { it -> CqlEthereumContractTxPreview(this.mempoolState(), it) } + + val entitiesToSave = this.contractsUsedInTransaction().toSet() + .map { it -> CqlEthereumContractTxPreview(this, it) } + + val saveContractTxesFlux = Flux.concat( + contractTxRepository.saveAll(entitiesToSave), + contractTxRepository.deleteAll(entitiesToDelete) + ) - val mempoolContractTxesToSave = contractTxsToProcess - .filter { entry -> entry.value.contains(PumpEvent.NEW_POOL_TX) } - .keys.toContractTxes() - // We should drop all txes with DROPPED_BLOCK event and mempool state transactions for NEW_BLOCK txes - val contractTxesToDrop = contractTxsToProcess.filter { entry -> !entry.value.contains(PumpEvent.NEW_POOL_TX) } - .map { entry -> if (entry.value.contains(PumpEvent.NEW_BLOCK)) entry.key.mempoolState() else entry.key } - .toContractTxes() - val contractTxesToSave = contractTxsToProcess - .filter { entry -> entry.value.contains(PumpEvent.NEW_BLOCK) } - .keys.toContractTxes() - - - contractTxRepository.saveAll(mempoolContractTxesToSave).collectList().block() - contractTxRepository.deleteAll(contractTxesToDrop).block() - contractTxRepository.saveAll(contractTxesToSave).collectList().block() + return Flux.concat(saveTxMono, saveBlockTxMono, saveContractTxesFlux) } - /** - * Create map of record id to last applied event and state to handle situation when we have multiply records - * in batch with the same id. It could happen when they have different events. - * - * @param records list of records - * @return map of record id to last applied event and record state - * - */ - private fun recordIdToLastEvent( - records: List> - ): MutableMap> { - val txLastEventMap = mutableMapOf>() - records.forEach { record -> - val event = record.key() - val tx = record.value() - val currentTxState = txLastEventMap[tx.hash] - if (currentTxState == null) { - txLastEventMap[tx.hash] = event to tx - } else { - if (record.key() != PumpEvent.NEW_POOL_TX) { - txLastEventMap[tx.hash] = event to tx - } + private fun EthereumTx.toDropBlockPublisher(): Publisher { + + val saveTxMono = txRepository.findById(this.hash) + .flatMap { cqlTx -> + txRepository.save(CqlEthereumTx(this.mempoolState().copy(firstSeenTime = cqlTx.firstSeenTime))) } - } - return txLastEventMap + .switchIfEmpty(txRepository.save(CqlEthereumTx(this.mempoolState()))) + + val deleteBlockTxMono = blockTxRepository.delete(CqlEthereumBlockTxPreview(this)) + + val deleteContractTxesFlux = contractTxRepository.deleteAll( + this.contractsUsedInTransaction().toSet().map { it -> CqlEthereumContractTxPreview(this, it) } + ) + + return Flux.concat(saveTxMono, deleteBlockTxMono, deleteContractTxesFlux) + } + + private fun EthereumTx.toNewPoolItemPublisher(): Publisher { + + val contractTxesToSave = this.contractsUsedInTransaction().toSet() + .map { it -> CqlEthereumContractTxPreview(this, it) } + + return txRepository.findById(this.hash) + .map { it -> it as Any } // hack to convert Mono to Any type + .toFlux() + .switchIfEmpty( + Flux.concat(txRepository.save(CqlEthereumTx(this)), contractTxRepository.saveAll(contractTxesToSave)) + ) } } diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt index ad3e3b1e..9329e606 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt @@ -4,14 +4,15 @@ import fund.cyber.cassandra.ethereum.model.CqlEthereumContractMinedUncle import fund.cyber.cassandra.ethereum.model.CqlEthereumUncle import fund.cyber.cassandra.ethereum.repository.EthereumUncleRepository import fund.cyber.cassandra.ethereum.repository.EthereumContractUncleRepository -import fund.cyber.dump.common.filterNotContainsAllEventsOf -import fund.cyber.dump.common.toRecordEventsMap +import fund.cyber.dump.common.executeOperations import fund.cyber.search.model.chains.EthereumFamilyChain import fund.cyber.search.model.ethereum.EthereumUncle import fund.cyber.search.model.events.PumpEvent import org.apache.kafka.clients.consumer.ConsumerRecord +import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.kafka.listener.BatchMessageListener +import reactor.core.publisher.Mono class UncleDumpProcess( @@ -26,20 +27,26 @@ class UncleDumpProcess( log.info("Dumping batch of ${records.size} $chain uncles from offset ${records.first().offset()}") - val recordsToProcess = records.toRecordEventsMap() - .filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK)) + records.executeOperations { event, uncle -> + return@executeOperations when (event) { + PumpEvent.NEW_BLOCK -> uncle.toNewBlockPublisher() + PumpEvent.NEW_POOL_TX -> Mono.empty() + PumpEvent.DROPPED_BLOCK -> uncle.toDropBlockPublisher() + } + } + } + + private fun EthereumUncle.toNewBlockPublisher(): Publisher { + val saveBlockMono = uncleRepository.save(CqlEthereumUncle(this)) + val saveContractBlockMono = contractUncleRepository.save(CqlEthereumContractMinedUncle(this)) - val unclesToCommit = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.NEW_BLOCK) }.keys - val unclesToRevert = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.DROPPED_BLOCK) }.keys + return reactor.core.publisher.Flux.concat(saveBlockMono, saveContractBlockMono) + } - uncleRepository.deleteAll(unclesToRevert.map { uncle -> CqlEthereumUncle(uncle) }).block() - uncleRepository.saveAll(unclesToCommit.map { uncle -> CqlEthereumUncle(uncle) }).collectList().block() + private fun EthereumUncle.toDropBlockPublisher(): Publisher { + val deleteBlockMono = uncleRepository.delete(CqlEthereumUncle(this)) + val deleteContractBlockMono = contractUncleRepository.delete(CqlEthereumContractMinedUncle(this)) - contractUncleRepository - .deleteAll(unclesToRevert.map { uncle -> CqlEthereumContractMinedUncle(uncle) }) - .block() - contractUncleRepository - .saveAll(unclesToCommit.map { uncle -> CqlEthereumContractMinedUncle(uncle) }) - .collectList().block() + return reactor.core.publisher.Flux.concat(deleteBlockMono, deleteContractBlockMono) } } diff --git a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/BlockDumpProcessTest.kt b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/BlockDumpProcessTest.kt index e7af3663..298b350b 100644 --- a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/BlockDumpProcessTest.kt +++ b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/BlockDumpProcessTest.kt @@ -1,6 +1,7 @@ package fund.cyber.dump.ethereum import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.times import com.nhaarman.mockito_kotlin.verify @@ -14,7 +15,6 @@ import fund.cyber.search.model.events.PumpEvent import fund.cyber.search.model.events.blockPumpTopic import org.apache.kafka.clients.consumer.ConsumerRecord import org.junit.jupiter.api.Test -import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal import java.math.BigInteger @@ -138,13 +138,14 @@ class BlockDumpProcessTest { val record8 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, 0, PumpEvent.NEW_BLOCK, blockI) + val blockRepository = mock { - on { saveAll(any>()) }.thenReturn(Flux.empty()) - on { deleteAll(any>()) }.thenReturn(Mono.empty()) + on { save(any()) }.doReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) } val contractMinedBlockRepository = mock { - on { saveAll(any>()) }.thenReturn(Flux.empty()) - on { deleteAll(any>()) }.thenReturn(Mono.empty()) + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) } val blockDumpProcess = BlockDumpProcess(blockRepository, contractMinedBlockRepository, @@ -153,17 +154,22 @@ class BlockDumpProcessTest { blockDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8)) - verify(blockRepository, times(1)) - .saveAll(listOf(CqlEthereumBlock(blockD), CqlEthereumBlock(blockE), - CqlEthereumBlock(blockG), CqlEthereumBlock(blockI))) - verify(blockRepository, times(1)) - .deleteAll(listOf(CqlEthereumBlock(blockF), CqlEthereumBlock(blockC))) + verify(blockRepository, times(1)).save(CqlEthereumBlock(blockD)) + verify(blockRepository, times(1)).save(CqlEthereumBlock(blockE)) + verify(blockRepository, times(1)).save(CqlEthereumBlock(blockG)) + verify(blockRepository, times(1)).save(CqlEthereumBlock(blockI)) + + verify(blockRepository, times(1)).delete(CqlEthereumBlock(blockF)) + verify(blockRepository, times(1)).delete(CqlEthereumBlock(blockC)) + + verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockD)) + verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockE)) + verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockG)) + verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockI)) + - verify(contractMinedBlockRepository, times(1)) - .saveAll(listOf(CqlEthereumContractMinedBlock(blockD), CqlEthereumContractMinedBlock(blockE), - CqlEthereumContractMinedBlock(blockG), CqlEthereumContractMinedBlock(blockI))) - verify(contractMinedBlockRepository, times(1)) - .deleteAll(listOf(CqlEthereumContractMinedBlock(blockF), CqlEthereumContractMinedBlock(blockC))) + verify(contractMinedBlockRepository, times(1)).delete(CqlEthereumContractMinedBlock(blockF)) + verify(contractMinedBlockRepository, times(1)).delete(CqlEthereumContractMinedBlock(blockC)) } diff --git a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/TxDumpProcessTest.kt b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/TxDumpProcessTest.kt index bee742fb..d3ba57b2 100644 --- a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/TxDumpProcessTest.kt +++ b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/TxDumpProcessTest.kt @@ -93,6 +93,15 @@ class TxDumpProcessTest { gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null ) + val txK = EthereumTx( + hash = "K", error = null, + nonce = 0, blockHash = null, + blockNumber = -1, blockTime = null, positionInBlock = -1, + from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), + value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, + gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null + ) + val record1 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, 0, PumpEvent.NEW_BLOCK, txH) @@ -110,15 +119,17 @@ class TxDumpProcessTest { 0, PumpEvent.NEW_BLOCK, txG) val record8 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, 0, PumpEvent.NEW_BLOCK, txI) + val record9 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, + 0, PumpEvent.NEW_POOL_TX, txK) val txRepository = mock { - on { saveAll(any>()) }.thenReturn(Flux.empty()) - on { deleteAll(any>()) }.thenReturn(Mono.empty()) - on { findAllById(any>()) }.thenReturn(Flux.empty()) + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.empty()) } val blockTxRepository = mock { - on { saveAll(any>()) }.thenReturn(Flux.empty()) - on { deleteAll(any>()) }.thenReturn(Mono.empty()) + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) } val contractTxRepository = mock { on { saveAll(any>()) }.thenReturn(Flux.empty()) @@ -128,40 +139,49 @@ class TxDumpProcessTest { val txDumpProcess = TxDumpProcess(txRepository, blockTxRepository, contractTxRepository, EthereumFamilyChain.ETHEREUM) - txDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8)) - - - verify(txRepository, times(1)) - .saveAll( - listOf( - CqlEthereumTx(txH.mempoolState()), CqlEthereumTx(txF.mempoolState()), - CqlEthereumTx(txC.mempoolState()), CqlEthereumTx(txD), CqlEthereumTx(txE), - CqlEthereumTx(txG), CqlEthereumTx(txI) - ) - ) - - verify(blockTxRepository, times(1)) - .deleteAll(linkedSetOf(CqlEthereumBlockTxPreview(txF), CqlEthereumBlockTxPreview(txC))) - verify(blockTxRepository, times(1)) - .saveAll(linkedSetOf(CqlEthereumBlockTxPreview(txD), CqlEthereumBlockTxPreview(txE), - CqlEthereumBlockTxPreview(txG), CqlEthereumBlockTxPreview(txI))) - - verify(contractTxRepository, times(1)) - .deleteAll( - listOf(txF, txC, txD.mempoolState(), txE.mempoolState(), txG.mempoolState(), txI.mempoolState()) - .flatMap { tx -> - tx.contractsUsedInTransaction().map { it -> CqlEthereumContractTxPreview(tx, it) } - } - ) - verify(contractTxRepository, times(1)) - .saveAll( - listOf(txD, txE, txG, txI) - .flatMap { tx -> - tx.contractsUsedInTransaction().map { it -> CqlEthereumContractTxPreview(tx, it) } - } - ) - - + txDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8, record9)) + + + verify(txRepository, times(2)).findById(txH.hash) + verify(txRepository, times(1)).findById(txF.hash) + verify(txRepository, times(1)).findById(txC.hash) + verify(txRepository, times(1)).findById(txD.hash) + verify(txRepository, times(1)).findById(txE.hash) + verify(txRepository, times(1)).findById(txG.hash) + verify(txRepository, times(1)).findById(txI.hash) + verify(txRepository, times(1)).findById(txK.hash) + + verify(txRepository, times(1)).save(CqlEthereumTx(txH)) + verify(txRepository, times(1)).save(CqlEthereumTx(txH.mempoolState())) + verify(txRepository, times(1)).save(CqlEthereumTx(txF.mempoolState())) + verify(txRepository, times(1)).save(CqlEthereumTx(txC.mempoolState())) + verify(txRepository, times(1)).save(CqlEthereumTx(txD)) + verify(txRepository, times(1)).save(CqlEthereumTx(txE)) + verify(txRepository, times(1)).save(CqlEthereumTx(txG)) + verify(txRepository, times(1)).save(CqlEthereumTx(txI)) + verify(txRepository, times(1)).save(CqlEthereumTx(txK)) + + verify(blockTxRepository, times(1)).save(CqlEthereumBlockTxPreview(txH)) + verify(blockTxRepository, times(1)).delete(CqlEthereumBlockTxPreview(txH)) + verify(blockTxRepository, times(1)).delete(CqlEthereumBlockTxPreview(txF)) + verify(blockTxRepository, times(1)).delete(CqlEthereumBlockTxPreview(txC)) + verify(blockTxRepository, times(1)).save(CqlEthereumBlockTxPreview(txD)) + verify(blockTxRepository, times(1)).save(CqlEthereumBlockTxPreview(txE)) + verify(blockTxRepository, times(1)).save(CqlEthereumBlockTxPreview(txG)) + verify(blockTxRepository, times(1)).save(CqlEthereumBlockTxPreview(txI)) + + + listOf(txH, txF, txC, txH.mempoolState(), txD.mempoolState(), txE.mempoolState(), txG.mempoolState(), txI.mempoolState()) + .forEach { tx -> + verify(contractTxRepository, times(1)) + .deleteAll(tx.contractsUsedInTransaction().map { it -> CqlEthereumContractTxPreview(tx, it) }) + } + + listOf(txH, txD, txE, txG, txI, txK) + .forEach { tx -> + verify(contractTxRepository, times(1)) + .saveAll(tx.contractsUsedInTransaction().map { it -> CqlEthereumContractTxPreview(tx, it) }) + } } diff --git a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/UncleDumpProcessTest.kt b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/UncleDumpProcessTest.kt index 63302ab7..1858f2c1 100644 --- a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/UncleDumpProcessTest.kt +++ b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/UncleDumpProcessTest.kt @@ -14,7 +14,6 @@ import fund.cyber.search.model.events.PumpEvent import fund.cyber.search.model.events.unclePumpTopic import org.apache.kafka.clients.consumer.ConsumerRecord import org.junit.jupiter.api.Test -import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal import java.time.Instant @@ -81,12 +80,12 @@ class UncleDumpProcessTest { 0, PumpEvent.NEW_BLOCK, uncleI) val uncleRepository = mock { - on { saveAll(any>()) }.thenReturn(Flux.empty()) - on { deleteAll(any>()) }.thenReturn(Mono.empty()) + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) } val contractUncleRepository = mock { - on { saveAll(any>()) }.thenReturn(Flux.empty()) - on { deleteAll(any>()) }.thenReturn(Mono.empty()) + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) } val blockDumpProcess = UncleDumpProcess(uncleRepository, contractUncleRepository, EthereumFamilyChain.ETHEREUM) @@ -94,17 +93,19 @@ class UncleDumpProcessTest { blockDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8)) - verify(uncleRepository, times(1)) - .saveAll(listOf(CqlEthereumUncle(uncleD), CqlEthereumUncle(uncleE), - CqlEthereumUncle(uncleG), CqlEthereumUncle(uncleI))) - verify(uncleRepository, times(1)) - .deleteAll(listOf(CqlEthereumUncle(uncleF), CqlEthereumUncle(uncleC))) + verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleD)) + verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleE)) + verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleG)) + verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleI)) + verify(uncleRepository, times(1)).delete(CqlEthereumUncle(uncleF)) + verify(uncleRepository, times(1)).delete(CqlEthereumUncle(uncleC)) - verify(contractUncleRepository, times(1)) - .saveAll(listOf(CqlEthereumContractMinedUncle(uncleD), CqlEthereumContractMinedUncle(uncleE), - CqlEthereumContractMinedUncle(uncleG), CqlEthereumContractMinedUncle(uncleI))) - verify(contractUncleRepository, times(1)) - .deleteAll(listOf(CqlEthereumContractMinedUncle(uncleF), CqlEthereumContractMinedUncle(uncleC))) + verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleD)) + verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleE)) + verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleG)) + verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleI)) + verify(contractUncleRepository, times(1)).delete(CqlEthereumContractMinedUncle(uncleF)) + verify(contractUncleRepository, times(1)).delete(CqlEthereumContractMinedUncle(uncleC)) } diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/converter/JsonRpcToDaoBitcoinTxConverter.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/converter/JsonRpcToDaoBitcoinTxConverter.kt index d4bca49b..d168303c 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/converter/JsonRpcToDaoBitcoinTxConverter.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/converter/JsonRpcToDaoBitcoinTxConverter.kt @@ -133,12 +133,13 @@ class JsonRpcToDaoBitcoinTxConverter { fun convertToDaoTransactionInput(txIns: List, outputsByIds: Map, BitcoinCacheTxOutput>): List { - return txIns.map { (txid, vout, scriptSig) -> - log.trace("looking for $txid transaction and output $vout") - val daoTxOut = outputsByIds[txid to vout]!! + return txIns.map { txIn -> + log.trace("looking for ${txIn.txid} transaction and output ${txIn.vout}") + val daoTxOut = outputsByIds[txIn.txid to txIn.vout]!! BitcoinTxIn( contracts = daoTxOut.addresses.map { a -> a.toSearchHashFormat() }, amount = daoTxOut.value, - asm = scriptSig.asm, txHash = txid.toSearchHashFormat(), txOut = vout + scriptSig = txIn.scriptSig, txHash = txIn.txid.toSearchHashFormat(), txOut = txIn.vout, + txinwitness = txIn.txinwitness ) } } diff --git a/pumps/bitcoin/src/test/kotlin/fund/cyber/pump/bitcoin/client/BtcdToDaoTransactionConverterTest.kt b/pumps/bitcoin/src/test/kotlin/fund/cyber/pump/bitcoin/client/BtcdToDaoTransactionConverterTest.kt index d56c28f0..af42c823 100644 --- a/pumps/bitcoin/src/test/kotlin/fund/cyber/pump/bitcoin/client/BtcdToDaoTransactionConverterTest.kt +++ b/pumps/bitcoin/src/test/kotlin/fund/cyber/pump/bitcoin/client/BtcdToDaoTransactionConverterTest.kt @@ -11,6 +11,7 @@ import fund.cyber.search.model.bitcoin.BitcoinTx import fund.cyber.search.model.bitcoin.BitcoinTxIn import fund.cyber.search.model.bitcoin.BitcoinTxOut import fund.cyber.search.model.bitcoin.JsonRpcBitcoinBlock +import fund.cyber.search.model.bitcoin.SignatureScript import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -40,12 +41,14 @@ val expectedDaoCoinbaseTx = BitcoinTx( /*---------------Regular transaction-----------------------------------------------------*/ val expectedFirstTxInput = BitcoinTxIn( - contracts = listOf("1HWqMzw1jfpXb3xyuUZ4uWXY4tqL2cW47J"), amount = BigDecimal("0.1"), asm = "3046", + contracts = listOf("1HWqMzw1jfpXb3xyuUZ4uWXY4tqL2cW47J"), amount = BigDecimal("0.1"), + scriptSig = SignatureScript("3046", "493046022100c352d3dd993a981beba4a63ad15c209275ca9470abfcd57da93b58e4eb5dce82022100840792bc1f456062819f15d33ee7055cf7b5ee1af1ebcc6028d9cdb1c3af7748014104f46db5e9d61a9dc27b8d64ad23e7383a4e6ca164593c2527c038c0857eb67ee8e825dca65046b82c9331586c82e0fd1f633f25f87c161bc6f8a630121df2b3d3"), txHash = "83a157f3fd88ac7907c05fc55e271dc4acdc5605d187d646604ca8c0e9382e03", txOut = 1 ) val expectedSecondTxInput = BitcoinTxIn( - contracts = listOf("1HWqMzw1jfpXb3xyuUZ4uWXY4tqL2cW47J"), amount = BigDecimal("50"), asm = "3046", + contracts = listOf("1HWqMzw1jfpXb3xyuUZ4uWXY4tqL2cW47J"), amount = BigDecimal("50"), + scriptSig = SignatureScript("3046", "493046022100c352d3dd993a981beba4a63ad15c209275ca9470abfcd57da93b58e4eb5dce82022100840792bc1f456062819f15d33ee7055cf7b5ee1af1ebcc6028d9cdb1c3af7748014104f46db5e9d61a9dc27b8d64ad23e7383a4e6ca164593c2527c038c0857eb67ee8e825dca65046b82c9331586c82e0fd1f633f25f87c161bc6f8a630121df2b3d3"), txHash = "8c14f0db3df150123e6f3dbbf30f8b955a8249b62ac1d1ff16284aefa3d06d87", txOut = 0 ) diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/ParityToEthereumBundleConverter.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/ParityToEthereumBundleConverter.kt index a9bb7dc2..1517b956 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/ParityToEthereumBundleConverter.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/ParityToEthereumBundleConverter.kt @@ -43,7 +43,7 @@ class ParityToEthereumBundleConverter( fun parityMempoolTxToDao(parityTx: Transaction): EthereumTx { return EthereumTx( - from = parityTx.from.toSearchHashFormat(), to = parityTx.to.toSearchHashFormat(), + from = parityTx.from.toSearchHashFormat(), to = parityTx.to?.toSearchHashFormat(), nonce = parityTx.nonce.toLong(), error = null, value = BigDecimal(parityTx.value) * weiToEthRate, hash = parityTx.hash.toSearchHashFormat(), blockHash = null, From d7fa9680ba9f07abc79ffdcbdf0522dbfb32272a Mon Sep 17 00:00:00 2001 From: Artur Albov Date: Sat, 26 May 2018 16:53:48 +0300 Subject: [PATCH 2/7] #173 API to return BTC mempool transactions with BTC contract summary --- .../src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 search-api/src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt diff --git a/search-api/src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt b/search-api/src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt new file mode 100644 index 00000000..1f32bc33 --- /dev/null +++ b/search-api/src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt @@ -0,0 +1,2 @@ +package fund.cyber.api.bitcoin.dto + From 8b7ecc2c812798607b8c2db4633e604d7b5e371f Mon Sep 17 00:00:00 2001 From: Artur Albov Date: Sat, 26 May 2018 16:53:56 +0300 Subject: [PATCH 3/7] #173 API to return BTC mempool transactions with BTC contract summary --- .../repository/ContractSummaryRepositories.kt | 5 ++- .../cyber/api/bitcoin/ContractHandlers.kt | 17 ++++++- .../fund/cyber/api/bitcoin/dto/Contract.kt | 45 +++++++++++++++++++ .../cyber/api/ethereum/ContractHandlers.kt | 2 - 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/ContractSummaryRepositories.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/ContractSummaryRepositories.kt index 6b549820..20dd010e 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/ContractSummaryRepositories.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/repository/ContractSummaryRepositories.kt @@ -8,6 +8,7 @@ import org.springframework.data.cassandra.repository.CassandraRepository import org.springframework.data.domain.Pageable import org.springframework.data.domain.Slice import org.springframework.data.repository.reactive.ReactiveCrudRepository +import reactor.core.publisher.Flux interface BitcoinContractSummaryRepository : ReactiveCrudRepository @@ -18,7 +19,9 @@ interface PageableBitcoinContractMinedBlockRepository: CassandraRepository } -interface BitcoinContractTxRepository : ReactiveCrudRepository +interface BitcoinContractTxRepository : ReactiveCrudRepository { + fun findAllByContractHashAndBlockTime(contractHash: String, blockTime: Long): Flux +} interface PageableBitcoinContractTxRepository : CassandraRepository { fun findAllByContractHash(contractHash: String, page: Pageable): Slice diff --git a/search-api/src/main/kotlin/fund/cyber/api/bitcoin/ContractHandlers.kt b/search-api/src/main/kotlin/fund/cyber/api/bitcoin/ContractHandlers.kt index 18166112..9df80aba 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/bitcoin/ContractHandlers.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/bitcoin/ContractHandlers.kt @@ -1,10 +1,11 @@ package fund.cyber.api.bitcoin +import fund.cyber.api.bitcoin.dto.ContractSummaryDto import fund.cyber.api.common.asSingleRouterFunction import fund.cyber.api.bitcoin.functions.ContractBlocksByHash import fund.cyber.api.bitcoin.functions.ContractTxesByHash -import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractSummary import fund.cyber.cassandra.bitcoin.repository.BitcoinContractSummaryRepository +import fund.cyber.cassandra.bitcoin.repository.BitcoinContractTxRepository import fund.cyber.cassandra.bitcoin.repository.PageableBitcoinContractMinedBlockRepository import fund.cyber.cassandra.bitcoin.repository.PageableBitcoinContractTxRepository import fund.cyber.cassandra.configuration.REPOSITORY_NAME_DELIMETER @@ -34,10 +35,22 @@ class BitcoinContractHandlersConfiguration { val repository = applicationContext.getBean(beanName, BitcoinContractSummaryRepository::class.java) + //todo: make variables with repositories names and common method to get bean name + val contractTxRepositoryBeanName = "$chainName${REPOSITORY_NAME_DELIMETER}contractTxRepository" + val contractTxRepository = applicationContext + .getBean(contractTxRepositoryBeanName, BitcoinContractTxRepository::class.java) + val blockByNumber = HandlerFunction { request -> val contractHash = request.pathVariable("hash") + val contract = repository.findById(contractHash) - ServerResponse.ok().body(contract, CqlBitcoinContractSummary::class.java) + val contractUnconfirmedTxes = contractTxRepository + .findAllByContractHashAndBlockTime(contractHash, -1) + + val result = contract.zipWith(contractUnconfirmedTxes.collectList()) { contr, txes -> + ContractSummaryDto(contr, txes) + } + ServerResponse.ok().body(result, ContractSummaryDto::class.java) } RouterFunctions.route(path("/${chainName.toLowerCase()}/contract/{hash}"), blockByNumber) }.asSingleRouterFunction() diff --git a/search-api/src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt b/search-api/src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt index 1f32bc33..e217f119 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/bitcoin/dto/Contract.kt @@ -1,2 +1,47 @@ package fund.cyber.api.bitcoin.dto +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractSummary +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractTxPreview +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinTxPreviewIO +import java.math.BigDecimal +import java.time.Instant + +data class ContractTxIODto( + val contracts: List, + val amount: BigDecimal +) { + + constructor(io: CqlBitcoinTxPreviewIO) : this( + contracts = io.contracts, amount = io.amount + ) +} + +data class ContractTxSummaryDto( + val fee: BigDecimal, + val ins: List, + val outs: List +) { + + constructor(contractTx: CqlBitcoinContractTxPreview) : this( + fee = contractTx.fee, ins = contractTx.ins.map { i -> ContractTxIODto(i) }, + outs = contractTx.outs.map { o -> ContractTxIODto(o) } + ) +} + +data class ContractSummaryDto( + val hash: String, + val confirmedBalance: String, + val confirmedTotalReceived: String, + val confirmedTxNumber: Int, + val firstActivityDate: Instant, + val lastActivityDate: Instant, + val unconfirmedTxValues: Map = emptyMap() +) { + + constructor(contract: CqlBitcoinContractSummary, txes: List) : this( + hash = contract.hash, confirmedBalance = contract.confirmedBalance, + confirmedTotalReceived = contract.confirmedTotalReceived, confirmedTxNumber = contract.confirmedTxNumber, + firstActivityDate = contract.firstActivityDate, lastActivityDate = contract.lastActivityDate, + unconfirmedTxValues = txes.map { contractTx -> contractTx.hash to ContractTxSummaryDto(contractTx) }.toMap() + ) +} diff --git a/search-api/src/main/kotlin/fund/cyber/api/ethereum/ContractHandlers.kt b/search-api/src/main/kotlin/fund/cyber/api/ethereum/ContractHandlers.kt index 78fb4700..4ce6963b 100644 --- a/search-api/src/main/kotlin/fund/cyber/api/ethereum/ContractHandlers.kt +++ b/search-api/src/main/kotlin/fund/cyber/api/ethereum/ContractHandlers.kt @@ -28,8 +28,6 @@ class EthereumContractHandlersConfiguration { @Autowired private lateinit var applicationContext: GenericApplicationContext -//0.722901489983285700 -//0.722901489982963896 @Bean fun ethereumContractById(): RouterFunction { From b3d792220324743e6aa0bbb084212640e4d0aea6 Mon Sep 17 00:00:00 2001 From: Artur Albov Date: Sun, 27 May 2018 22:48:33 +0300 Subject: [PATCH 4/7] #172 Setup bitcoin mempool transaction dump. Some refactoring --- .../cyber/dump/bitcoin/BlockDumpProcess.kt | 9 +++++---- .../fund/cyber/dump/bitcoin/TxDumpProcess.kt | 9 +++++---- .../kotlin/fund/cyber/dump/common/Functions.kt | 10 ++++------ .../fund/cyber/dump/common/FuntionsTest.kt | 16 ++++++++-------- .../cyber/dump/ethereum/BlockDumpProcess.kt | 9 +++++---- .../fund/cyber/dump/ethereum/TxDumpProcess.kt | 9 +++++---- .../cyber/dump/ethereum/UncleDumpProcess.kt | 9 +++++---- .../client/EthereumBlockchainInterface.kt | 1 + .../client/EthereumClientConfiguration.kt | 18 ------------------ 9 files changed, 38 insertions(+), 52 deletions(-) diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt index a1c0a12a..12375b85 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcess.kt @@ -4,7 +4,8 @@ import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractMinedBlock import fund.cyber.cassandra.bitcoin.model.CqlBitcoinBlock import fund.cyber.cassandra.bitcoin.repository.BitcoinContractMinedBlockRepository import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockRepository -import fund.cyber.dump.common.executeOperations +import fund.cyber.dump.common.execute +import fund.cyber.dump.common.toFluxBatch import fund.cyber.search.model.bitcoin.BitcoinBlock import fund.cyber.search.model.chains.BitcoinFamilyChain import fund.cyber.search.model.events.PumpEvent @@ -28,13 +29,13 @@ class BlockDumpProcess( log.info("Dumping batch of ${records.size} $chain blocks from offset ${records.first().offset()}") - records.executeOperations { event, block -> - return@executeOperations when (event) { + records.toFluxBatch { event, block -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> block.toNewBlockPublisher() PumpEvent.NEW_POOL_TX -> Mono.empty() PumpEvent.DROPPED_BLOCK -> block.toDropBlockPublisher() } - } + }.execute() } private fun BitcoinBlock.toNewBlockPublisher(): Publisher { diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt index f921942c..c9bbcccc 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt @@ -6,7 +6,8 @@ import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractTxPreview import fund.cyber.cassandra.bitcoin.repository.BitcoinContractTxRepository import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockTxRepository import fund.cyber.cassandra.bitcoin.repository.BitcoinTxRepository -import fund.cyber.dump.common.executeOperations +import fund.cyber.dump.common.execute +import fund.cyber.dump.common.toFluxBatch import fund.cyber.search.model.bitcoin.BitcoinTx import fund.cyber.search.model.chains.BitcoinFamilyChain import fund.cyber.search.model.events.PumpEvent @@ -31,13 +32,13 @@ class TxDumpProcess( log.info("Dumping batch of ${records.size} $chain transactions from offset ${records.first().offset()}") - records.executeOperations { event, tx -> - return@executeOperations when (event) { + records.toFluxBatch { event, tx -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> tx.toNewBlockPublisher() PumpEvent.NEW_POOL_TX -> tx.toNewPoolItemPublisher() PumpEvent.DROPPED_BLOCK -> tx.toDropBlockPublisher() } - } + }.execute() } diff --git a/dumps/common/src/main/kotlin/fund/cyber/dump/common/Functions.kt b/dumps/common/src/main/kotlin/fund/cyber/dump/common/Functions.kt index 523ccb73..f760657b 100644 --- a/dumps/common/src/main/kotlin/fund/cyber/dump/common/Functions.kt +++ b/dumps/common/src/main/kotlin/fund/cyber/dump/common/Functions.kt @@ -5,17 +5,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.reactivestreams.Publisher import reactor.core.publisher.Flux -fun List>.executeOperations( - convertToOperations: (PumpEvent, RECORD) -> Publisher<*> -) { - val fluxesToExecute = this.compileOperations(convertToOperations) - fluxesToExecute.forEach { flux -> +fun List>.execute() { + + this.forEach { flux -> flux.collectList().block() } } -fun List>.compileOperations( +fun List>.toFluxBatch( convertToOperations: (PumpEvent, RECORD) -> Publisher<*> ): List> { diff --git a/dumps/common/src/test/kotlin/fund/cyber/dump/common/FuntionsTest.kt b/dumps/common/src/test/kotlin/fund/cyber/dump/common/FuntionsTest.kt index 24c2779a..2661404d 100644 --- a/dumps/common/src/test/kotlin/fund/cyber/dump/common/FuntionsTest.kt +++ b/dumps/common/src/test/kotlin/fund/cyber/dump/common/FuntionsTest.kt @@ -27,8 +27,8 @@ class FunctionsTest { testRecord(PumpEvent.NEW_BLOCK) ) - val fluxesToExecute = records.compileOperations { event, _ -> - return@compileOperations when (event) { + val fluxesToExecute = records.toFluxBatch { event, _ -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> Flux.fromIterable(listOf("a1", "a2", "a3")) PumpEvent.NEW_POOL_TX -> Flux.fromIterable(listOf("b1", "b2")) PumpEvent.DROPPED_BLOCK -> Flux.fromIterable(listOf("c1", "c2", "c3")) @@ -66,8 +66,8 @@ class FunctionsTest { fun compileOperationsWithNoRecordsTest() { val records = emptyList>() - val fluxesToExecute = records.compileOperations { event, _ -> - return@compileOperations when (event) { + val fluxesToExecute = records.toFluxBatch { event, _ -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> Flux.fromIterable(listOf("a1", "a2", "a3")) PumpEvent.NEW_POOL_TX -> Flux.fromIterable(listOf("b1", "b2")) PumpEvent.DROPPED_BLOCK -> Flux.fromIterable(listOf("c1", "c2", "c3")) @@ -87,8 +87,8 @@ class FunctionsTest { testRecord(PumpEvent.NEW_BLOCK) ) - val fluxesToExecute = records.compileOperations { event, _ -> - return@compileOperations when (event) { + val fluxesToExecute = records.toFluxBatch { event, _ -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> Flux.empty() PumpEvent.NEW_POOL_TX -> Flux.empty() PumpEvent.DROPPED_BLOCK -> Flux.empty() @@ -118,8 +118,8 @@ class FunctionsTest { testRecord(PumpEvent.NEW_BLOCK) ) - val fluxesToExecute = records.compileOperations { event, _ -> - return@compileOperations when (event) { + val fluxesToExecute = records.toFluxBatch { event, _ -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> Flux.fromIterable(listOf("a1", "a2", "a3")) PumpEvent.NEW_POOL_TX -> Mono.just("b1") PumpEvent.DROPPED_BLOCK -> Flux.fromIterable(listOf("c1", "c2", "c3")) diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt index 2f5cceee..1abf4d28 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/BlockDumpProcess.kt @@ -4,7 +4,8 @@ import fund.cyber.cassandra.ethereum.model.CqlEthereumContractMinedBlock import fund.cyber.cassandra.ethereum.model.CqlEthereumBlock import fund.cyber.cassandra.ethereum.repository.EthereumContractMinedBlockRepository import fund.cyber.cassandra.ethereum.repository.EthereumBlockRepository -import fund.cyber.dump.common.executeOperations +import fund.cyber.dump.common.execute +import fund.cyber.dump.common.toFluxBatch import fund.cyber.search.model.chains.EthereumFamilyChain import fund.cyber.search.model.ethereum.EthereumBlock import fund.cyber.search.model.events.PumpEvent @@ -27,13 +28,13 @@ class BlockDumpProcess( log.info("Dumping batch of ${records.size} $chain blocks from offset ${records.first().offset()}") - records.executeOperations { event, block -> - return@executeOperations when (event) { + records.toFluxBatch { event, block -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> block.toNewBlockPublisher() PumpEvent.NEW_POOL_TX -> Mono.empty() PumpEvent.DROPPED_BLOCK -> block.toDropBlockPublisher() } - } + }.execute() } private fun EthereumBlock.toNewBlockPublisher(): Publisher { diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt index f583bad1..5fca132a 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt @@ -6,7 +6,8 @@ import fund.cyber.cassandra.ethereum.model.CqlEthereumTx import fund.cyber.cassandra.ethereum.repository.EthereumBlockTxRepository import fund.cyber.cassandra.ethereum.repository.EthereumContractTxRepository import fund.cyber.cassandra.ethereum.repository.EthereumTxRepository -import fund.cyber.dump.common.executeOperations +import fund.cyber.dump.common.execute +import fund.cyber.dump.common.toFluxBatch import fund.cyber.search.model.chains.EthereumFamilyChain import fund.cyber.search.model.ethereum.EthereumTx import fund.cyber.search.model.events.PumpEvent @@ -32,13 +33,13 @@ class TxDumpProcess( log.info("Dumping batch of ${records.size} $chain transactions from offset ${records.first().offset()}") - records.executeOperations { event, tx -> - return@executeOperations when (event) { + records.toFluxBatch { event, tx -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> tx.toNewBlockPublisher() PumpEvent.NEW_POOL_TX -> tx.toNewPoolItemPublisher() PumpEvent.DROPPED_BLOCK -> tx.toDropBlockPublisher() } - } + }.execute() } diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt index 9329e606..06c27644 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt @@ -4,7 +4,8 @@ import fund.cyber.cassandra.ethereum.model.CqlEthereumContractMinedUncle import fund.cyber.cassandra.ethereum.model.CqlEthereumUncle import fund.cyber.cassandra.ethereum.repository.EthereumUncleRepository import fund.cyber.cassandra.ethereum.repository.EthereumContractUncleRepository -import fund.cyber.dump.common.executeOperations +import fund.cyber.dump.common.execute +import fund.cyber.dump.common.toFluxBatch import fund.cyber.search.model.chains.EthereumFamilyChain import fund.cyber.search.model.ethereum.EthereumUncle import fund.cyber.search.model.events.PumpEvent @@ -27,13 +28,13 @@ class UncleDumpProcess( log.info("Dumping batch of ${records.size} $chain uncles from offset ${records.first().offset()}") - records.executeOperations { event, uncle -> - return@executeOperations when (event) { + records.toFluxBatch { event, uncle -> + return@toFluxBatch when (event) { PumpEvent.NEW_BLOCK -> uncle.toNewBlockPublisher() PumpEvent.NEW_POOL_TX -> Mono.empty() PumpEvent.DROPPED_BLOCK -> uncle.toDropBlockPublisher() } - } + }.execute() } private fun EthereumUncle.toNewBlockPublisher(): Publisher { diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumBlockchainInterface.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumBlockchainInterface.kt index cfddbdc8..669d1986 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumBlockchainInterface.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumBlockchainInterface.kt @@ -69,6 +69,7 @@ class EthereumBlockchainInterface( parityClient .pendingTransactionObservable() .map { e -> parityToBundleConverter.parityMempoolTxToDao(e) } + .toBlocking() .subscribe( { v -> emitter.onNext(v) }, { e -> emitter.onError(e)}, { emitter.onComplete() }) }, BackpressureStrategy.BUFFER) diff --git a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumClientConfiguration.kt b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumClientConfiguration.kt index 08f16122..e978aa22 100644 --- a/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumClientConfiguration.kt +++ b/pumps/ethereum/src/main/kotlin/fund/cyber/pump/ethereum/client/EthereumClientConfiguration.kt @@ -1,17 +1,12 @@ package fund.cyber.pump.ethereum.client import fund.cyber.search.model.chains.ChainInfo -import org.apache.http.impl.client.HttpClients -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager -import org.apache.http.message.BasicHeader import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.web3j.protocol.http.HttpService import org.web3j.protocol.parity.Parity -const val MAX_PER_ROUTE = 16 -const val MAX_TOTAL = 32 @Configuration class EthereumClientConfiguration { @@ -19,19 +14,6 @@ class EthereumClientConfiguration { @Autowired private lateinit var chainInfo: ChainInfo - private val defaultHttpHeaders = listOf(BasicHeader("Keep-Alive", "timeout=10, max=1024")) - private val connectionManager = PoolingHttpClientConnectionManager().apply { - defaultMaxPerRoute = MAX_PER_ROUTE - maxTotal = MAX_TOTAL - } - - @Bean - fun httpClient() = HttpClients.custom() - .setConnectionManager(connectionManager) - .setConnectionManagerShared(true) - .setDefaultHeaders(defaultHttpHeaders) - .build()!! - @Bean fun parityClient() = Parity.build(HttpService(chainInfo.nodeUrl))!! } From 1da48a5e489d61cbd4da03e6b47d7562969f0816 Mon Sep 17 00:00:00 2001 From: Artur Albov Date: Sun, 27 May 2018 23:45:19 +0300 Subject: [PATCH 5/7] #172 Setup bitcoin mempool transaction dump. Some refactoring --- .../main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt | 8 ++++---- .../main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt index c9bbcccc..8cbf81bc 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt @@ -51,15 +51,15 @@ class TxDumpProcess( val saveBlockTxMono = blockTxRepository.save(CqlBitcoinBlockTxPreview(this)) - val entitiesToDelete = this.allContractsUsedInTransaction().toSet() + val contractTxesToDelete = this.allContractsUsedInTransaction().toSet() .map { it -> CqlBitcoinContractTxPreview(it, this.mempoolState()) } - val entitiesToSave = this.allContractsUsedInTransaction().toSet() + val contractTxesToSave = this.allContractsUsedInTransaction().toSet() .map { it -> CqlBitcoinContractTxPreview(it, this) } val saveContractTxesFlux = Flux.concat( - contractTxRepository.saveAll(entitiesToSave), - contractTxRepository.deleteAll(entitiesToDelete) + contractTxRepository.saveAll(contractTxesToSave), + contractTxRepository.deleteAll(contractTxesToDelete) ) return Flux.concat(saveTxMono, saveBlockTxMono, saveContractTxesFlux) diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt index 5fca132a..6ca26e71 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt @@ -52,15 +52,15 @@ class TxDumpProcess( val saveBlockTxMono = blockTxRepository.save(CqlEthereumBlockTxPreview(this)) - val entitiesToDelete = this.contractsUsedInTransaction().toSet() + val contractTxesToDelete = this.contractsUsedInTransaction().toSet() .map { it -> CqlEthereumContractTxPreview(this.mempoolState(), it) } - val entitiesToSave = this.contractsUsedInTransaction().toSet() + val contractTxesToSave = this.contractsUsedInTransaction().toSet() .map { it -> CqlEthereumContractTxPreview(this, it) } val saveContractTxesFlux = Flux.concat( - contractTxRepository.saveAll(entitiesToSave), - contractTxRepository.deleteAll(entitiesToDelete) + contractTxRepository.saveAll(contractTxesToSave), + contractTxRepository.deleteAll(contractTxesToDelete) ) return Flux.concat(saveTxMono, saveBlockTxMono, saveContractTxesFlux) From 80b249f283d284f1e108637b8963757d2970fca1 Mon Sep 17 00:00:00 2001 From: Artur Albov Date: Mon, 28 May 2018 13:32:50 +0300 Subject: [PATCH 6/7] #172 Setup bitcoin mempool transaction dump. Tests --- .../BitcoinRepositoryConfiguration.kt | 9 +- .../fund/cyber/search/model/bitcoin/Block.kt | 1 + .../fund/cyber/dump/bitcoin/TxDumpProcess.kt | 10 +- .../dump/bitcoin/BlockDumpProcessTest.kt | 99 +++++ .../cyber/dump/bitcoin/TxDumpProcessTest.kt | 360 ++++++++++++++++++ .../fund/cyber/dump/ethereum/TxDumpProcess.kt | 12 +- .../cyber/dump/ethereum/UncleDumpProcess.kt | 5 +- .../dump/ethereum/BlockDumpProcessTest.kt | 144 ++----- .../cyber/dump/ethereum/TxDumpProcessTest.kt | 355 ++++++++++++----- .../dump/ethereum/UncleDumpProcessTest.kt | 80 ++-- .../JsonRpcToDaoBitcoinBlockConverter.kt | 19 +- 11 files changed, 820 insertions(+), 274 deletions(-) create mode 100644 dumps/bitcoin/src/test/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcessTest.kt create mode 100644 dumps/bitcoin/src/test/kotlin/fund/cyber/dump/bitcoin/TxDumpProcessTest.kt diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/configuration/BitcoinRepositoryConfiguration.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/configuration/BitcoinRepositoryConfiguration.kt index e3e7f9a0..93915e77 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/configuration/BitcoinRepositoryConfiguration.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/configuration/BitcoinRepositoryConfiguration.kt @@ -4,6 +4,7 @@ import com.datastax.driver.core.Cluster import com.datastax.driver.extras.codecs.jdk8.InstantCodec import fund.cyber.cassandra.bitcoin.repository.BitcoinContractSummaryRepository import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockRepository +import fund.cyber.cassandra.bitcoin.repository.BitcoinContractTxRepository import fund.cyber.cassandra.bitcoin.repository.BitcoinTxRepository import fund.cyber.cassandra.bitcoin.repository.PageableBitcoinContractMinedBlockRepository import fund.cyber.cassandra.bitcoin.repository.PageableBitcoinContractTxRepository @@ -162,7 +163,9 @@ class BitcoinRepositoriesConfiguration : InitializingBean { val contractRepository = reactiveRepositoryFactory .getRepository(BitcoinContractSummaryRepository::class.java) - val contractTxRepository = repositoryFactory + val contractTxRepository = reactiveRepositoryFactory + .getRepository(BitcoinContractTxRepository::class.java) + val pageableContractTxRepository = repositoryFactory .getRepository(PageableBitcoinContractTxRepository::class.java) val contractBlockRepository = repositoryFactory .getRepository(PageableBitcoinContractMinedBlockRepository::class.java) @@ -176,8 +179,10 @@ class BitcoinRepositoriesConfiguration : InitializingBean { beanFactory.registerSingleton("${repositoryPrefix}txRepository", txRepository) beanFactory.registerSingleton("${repositoryPrefix}contractRepository", contractRepository) + beanFactory.registerSingleton("${repositoryPrefix}contractTxRepository", + contractTxRepository) beanFactory.registerSingleton("${repositoryPrefix}pageableContractTxRepository", - contractTxRepository) + pageableContractTxRepository) beanFactory.registerSingleton("${repositoryPrefix}pageableContractBlockRepository", contractBlockRepository) } diff --git a/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Block.kt b/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Block.kt index 2d7bd0b2..80a95a78 100644 --- a/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Block.kt +++ b/common/src/main/kotlin/fund/cyber/search/model/bitcoin/Block.kt @@ -9,6 +9,7 @@ import java.time.Instant data class BitcoinBlock( val height: Long, val hash: String, + val parentHash: String, val minerContractHash: String, val blockReward: BigDecimal, val txFees: BigDecimal, diff --git a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt index 8cbf81bc..9e085d6b 100644 --- a/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt +++ b/dumps/bitcoin/src/main/kotlin/fund/cyber/dump/bitcoin/TxDumpProcess.kt @@ -16,6 +16,7 @@ import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.kafka.listener.BatchMessageListener import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.publisher.toFlux @@ -46,7 +47,7 @@ class TxDumpProcess( val saveTxMono = txRepository.findById(this.hash) .flatMap { cqlTx -> txRepository.save(CqlBitcoinTx(this.copy(firstSeenTime = cqlTx.firstSeenTime))) } - .switchIfEmpty(txRepository.save(CqlBitcoinTx(this))) + .switchIfEmpty(Mono.defer { txRepository.save(CqlBitcoinTx(this)) }) val saveBlockTxMono = blockTxRepository.save(CqlBitcoinBlockTxPreview(this)) @@ -71,7 +72,7 @@ class TxDumpProcess( .flatMap { cqlTx -> txRepository.save(CqlBitcoinTx(this.mempoolState().copy(firstSeenTime = cqlTx.firstSeenTime))) } - .switchIfEmpty(txRepository.save(CqlBitcoinTx(this.mempoolState()))) + .switchIfEmpty(Mono.defer { txRepository.save(CqlBitcoinTx(this.mempoolState())) }) val deleteBlockTxMono = blockTxRepository.delete(CqlBitcoinBlockTxPreview(this)) @@ -91,7 +92,10 @@ class TxDumpProcess( .map { it -> it as Any } // hack to convert Mono to Any type .toFlux() .switchIfEmpty( - Flux.concat(txRepository.save(CqlBitcoinTx(this)), contractTxRepository.saveAll(contractTxesToSave)) + Flux.concat( + Mono.defer { txRepository.save(CqlBitcoinTx(this)) }, + Flux.defer { contractTxRepository.saveAll(contractTxesToSave) } + ) ) } diff --git a/dumps/bitcoin/src/test/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcessTest.kt b/dumps/bitcoin/src/test/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcessTest.kt new file mode 100644 index 00000000..8b501dd2 --- /dev/null +++ b/dumps/bitcoin/src/test/kotlin/fund/cyber/dump/bitcoin/BlockDumpProcessTest.kt @@ -0,0 +1,99 @@ +package fund.cyber.dump.bitcoin + +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.times +import com.nhaarman.mockito_kotlin.verify +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinBlock +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractMinedBlock +import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockRepository +import fund.cyber.cassandra.bitcoin.repository.BitcoinContractMinedBlockRepository +import fund.cyber.search.model.bitcoin.BitcoinBlock +import fund.cyber.search.model.chains.BitcoinFamilyChain +import fund.cyber.search.model.events.PumpEvent +import fund.cyber.search.model.events.blockPumpTopic +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.jupiter.api.Test +import reactor.core.publisher.Mono +import java.math.BigDecimal +import java.math.BigInteger +import java.time.Instant + +class BlockDumpProcessTest { + + + // --- D --- E --- G --- I + //A --- B --- C --- F --- H + @Test + @Suppress("LongMethod") + fun testWithDroppedBlocks() { + + val blockC = block("C","B", 2, "miner3") + val blockD = block("D", "B", 2, "miner4") + val blockE = block("E", "D", 3, "miner5") + val blockF = block("F", "C", 3, "miner6") + val blockG = block("G", "E", 4, "miner7") + val blockH = block("H", "F", 4, "miner8") + val blockI = block("I", "G", 5, "miner9") + + + val record1 = record(PumpEvent.NEW_BLOCK, blockH) + val record2 = record(PumpEvent.DROPPED_BLOCK, blockH) + val record3 = record(PumpEvent.DROPPED_BLOCK, blockF) + val record4 = record(PumpEvent.DROPPED_BLOCK, blockC) + val record5 = record(PumpEvent.NEW_BLOCK, blockD) + val record6 = record(PumpEvent.NEW_BLOCK, blockE) + val record7 = record(PumpEvent.NEW_BLOCK, blockG) + val record8 = record(PumpEvent.NEW_BLOCK, blockI) + + + val blockRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractMinedBlockRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + + val blockDumpProcess = BlockDumpProcess(blockRepository, contractMinedBlockRepository, + BitcoinFamilyChain.BITCOIN) + + blockDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8)) + + + verify(blockRepository, times(1)).save(CqlBitcoinBlock(blockH)) + verify(blockRepository, times(1)).save(CqlBitcoinBlock(blockD)) + verify(blockRepository, times(1)).save(CqlBitcoinBlock(blockE)) + verify(blockRepository, times(1)).save(CqlBitcoinBlock(blockG)) + verify(blockRepository, times(1)).save(CqlBitcoinBlock(blockI)) + + verify(blockRepository, times(1)).delete(CqlBitcoinBlock(blockH)) + verify(blockRepository, times(1)).delete(CqlBitcoinBlock(blockF)) + verify(blockRepository, times(1)).delete(CqlBitcoinBlock(blockC)) + + verify(contractMinedBlockRepository, times(1)).save(CqlBitcoinContractMinedBlock(blockH)) + verify(contractMinedBlockRepository, times(1)).save(CqlBitcoinContractMinedBlock(blockD)) + verify(contractMinedBlockRepository, times(1)).save(CqlBitcoinContractMinedBlock(blockE)) + verify(contractMinedBlockRepository, times(1)).save(CqlBitcoinContractMinedBlock(blockG)) + verify(contractMinedBlockRepository, times(1)).save(CqlBitcoinContractMinedBlock(blockI)) + + + verify(contractMinedBlockRepository, times(1)).delete(CqlBitcoinContractMinedBlock(blockH)) + verify(contractMinedBlockRepository, times(1)).delete(CqlBitcoinContractMinedBlock(blockF)) + verify(contractMinedBlockRepository, times(1)).delete(CqlBitcoinContractMinedBlock(blockC)) + + } + + fun block(hash: String, parentHash: String, number: Long, miner: String) = BitcoinBlock( + hash = hash, height = number, txNumber = 158, minerContractHash = miner, difficulty = BigInteger("0"), + size = 0, blockReward = BigDecimal("0"), txFees = BigDecimal("0"), time = Instant.ofEpochMilli(1000000), + coinbaseData = "0x", nonce = 0, merkleroot = "0x", version = 0, weight = 0, + totalOutputsAmount = BigDecimal.ZERO, bits = "0x", parentHash = parentHash + ) + + fun record(event: PumpEvent, block: BitcoinBlock) = + ConsumerRecord(BitcoinFamilyChain.BITCOIN.blockPumpTopic, 0, 0, event, block) + + +} diff --git a/dumps/bitcoin/src/test/kotlin/fund/cyber/dump/bitcoin/TxDumpProcessTest.kt b/dumps/bitcoin/src/test/kotlin/fund/cyber/dump/bitcoin/TxDumpProcessTest.kt new file mode 100644 index 00000000..b96d783c --- /dev/null +++ b/dumps/bitcoin/src/test/kotlin/fund/cyber/dump/bitcoin/TxDumpProcessTest.kt @@ -0,0 +1,360 @@ +package fund.cyber.dump.bitcoin + +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.times +import com.nhaarman.mockito_kotlin.verify +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinBlockTxPreview +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractTxPreview +import fund.cyber.cassandra.bitcoin.model.CqlBitcoinTx +import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockTxRepository +import fund.cyber.cassandra.bitcoin.repository.BitcoinContractTxRepository +import fund.cyber.cassandra.bitcoin.repository.BitcoinTxRepository +import fund.cyber.search.model.bitcoin.BitcoinTx +import fund.cyber.search.model.bitcoin.BitcoinTxIn +import fund.cyber.search.model.bitcoin.BitcoinTxOut +import fund.cyber.search.model.bitcoin.SignatureScript +import fund.cyber.search.model.chains.BitcoinFamilyChain +import fund.cyber.search.model.events.PumpEvent +import fund.cyber.search.model.events.txPumpTopic +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.jupiter.api.Test +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.math.BigDecimal +import java.time.Instant + +@Suppress("LargeClass") +class TxDumpProcessTest { + + + @Test + @Suppress("LongMethod") + fun testWithAllEventsTxs() { + + val txC = tx("C", "C") + val txD = tx("D", "D") + val txE = tx("E", "E") + val txF = tx("F", "F") + val txG = tx("G", "G") + val txH = tx("H", "H") + val txI = tx("I", "I") + val txK = tx("K", "K") + + + val record1 = record(PumpEvent.NEW_BLOCK, txH) + val record2 = record(PumpEvent.DROPPED_BLOCK, txH) + val record3 = record(PumpEvent.DROPPED_BLOCK, txF) + val record4 = record(PumpEvent.DROPPED_BLOCK, txC) + val record5 = record(PumpEvent.NEW_BLOCK, txD) + val record6 = record(PumpEvent.NEW_BLOCK, txE) + val record7 = record(PumpEvent.NEW_BLOCK, txG) + val record8 = record(PumpEvent.NEW_BLOCK, txI) + val record9 = record(PumpEvent.NEW_POOL_TX, txK) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.empty()) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val txDumpProcess = TxDumpProcess(txRepository, contractTxRepository, blockTxRepository, + BitcoinFamilyChain.BITCOIN) + + txDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8, record9)) + + + verify(txRepository, times(2)).findById(txH.hash) + verify(txRepository, times(1)).findById(txF.hash) + verify(txRepository, times(1)).findById(txC.hash) + verify(txRepository, times(1)).findById(txD.hash) + verify(txRepository, times(1)).findById(txE.hash) + verify(txRepository, times(1)).findById(txG.hash) + verify(txRepository, times(1)).findById(txI.hash) + verify(txRepository, times(1)).findById(txK.hash) + + verify(txRepository, times(1)).save(CqlBitcoinTx(txH)) + verify(txRepository, times(1)).save(CqlBitcoinTx(txH.mempoolState())) + verify(txRepository, times(1)).save(CqlBitcoinTx(txF.mempoolState())) + verify(txRepository, times(1)).save(CqlBitcoinTx(txC.mempoolState())) + verify(txRepository, times(1)).save(CqlBitcoinTx(txD)) + verify(txRepository, times(1)).save(CqlBitcoinTx(txE)) + verify(txRepository, times(1)).save(CqlBitcoinTx(txG)) + verify(txRepository, times(1)).save(CqlBitcoinTx(txI)) + verify(txRepository, times(1)).save(CqlBitcoinTx(txK)) + + verify(blockTxRepository, times(1)).save(CqlBitcoinBlockTxPreview(txH)) + verify(blockTxRepository, times(1)).delete(CqlBitcoinBlockTxPreview(txH)) + verify(blockTxRepository, times(1)).delete(CqlBitcoinBlockTxPreview(txF)) + verify(blockTxRepository, times(1)).delete(CqlBitcoinBlockTxPreview(txC)) + verify(blockTxRepository, times(1)).save(CqlBitcoinBlockTxPreview(txD)) + verify(blockTxRepository, times(1)).save(CqlBitcoinBlockTxPreview(txE)) + verify(blockTxRepository, times(1)).save(CqlBitcoinBlockTxPreview(txG)) + verify(blockTxRepository, times(1)).save(CqlBitcoinBlockTxPreview(txI)) + + + listOf(txH, txF, txC, txH.mempoolState(), txD.mempoolState(), txE.mempoolState(), txG.mempoolState(), txI.mempoolState()) + .forEach { tx -> + verify(contractTxRepository, times(1)) + .deleteAll(tx.allContractsUsedInTransaction().map { it -> CqlBitcoinContractTxPreview(it, tx) }) + } + + listOf(txH, txD, txE, txG, txI, txK) + .forEach { tx -> + verify(contractTxRepository, times(1)) + .saveAll(tx.allContractsUsedInTransaction().map { it -> CqlBitcoinContractTxPreview(it, tx) }) + } + } + + @Test + fun testNewBlockWithoutTxInDb() { + val txABlock = tx("A", "A") + + val txToSave = CqlBitcoinTx(txABlock) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.empty()) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.NEW_BLOCK, txABlock) + + val txDumpProcess = TxDumpProcess(txRepository, contractTxRepository, blockTxRepository, + BitcoinFamilyChain.BITCOIN) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, times(1)).save(txToSave) + + verify(blockTxRepository, times(1)).save(CqlBitcoinBlockTxPreview(txABlock)) + + verify(contractTxRepository, times(1)).saveAll( + txABlock.allContractsUsedInTransaction().map { it -> + CqlBitcoinContractTxPreview(it, txABlock) + } + ) + verify(contractTxRepository, times(1)).deleteAll( + txABlock.allContractsUsedInTransaction().map { it -> + CqlBitcoinContractTxPreview(it, txABlock.mempoolState()) + } + ) + } + + @Test + fun testNewBlockWithTxInDb() { + val txADb = tx("A", "A").mempoolState().copy(firstSeenTime = Instant.ofEpochMilli(999999)) + val txABlock = tx("A", "A") + + val txToSave = CqlBitcoinTx(txABlock.copy(firstSeenTime = txADb.firstSeenTime)) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.just(CqlBitcoinTx(txADb))) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.NEW_BLOCK, txABlock) + + val txDumpProcess = TxDumpProcess(txRepository, contractTxRepository, blockTxRepository, + BitcoinFamilyChain.BITCOIN) + + txDumpProcess.onMessage(listOf(record)) + + + + verify(txRepository, times(1)).save(txToSave) + verify(txRepository, never()).save(CqlBitcoinTx(txABlock)) + + verify(blockTxRepository, times(1)).save(CqlBitcoinBlockTxPreview(txABlock)) + + verify(contractTxRepository, times(1)).saveAll( + txABlock.allContractsUsedInTransaction().map { it -> + CqlBitcoinContractTxPreview(it, txABlock) + } + ) + verify(contractTxRepository, times(1)).deleteAll( + txABlock.allContractsUsedInTransaction().map { it -> + CqlBitcoinContractTxPreview(it, txABlock.mempoolState()) + } + ) + } + + @Test + fun testDropBlockWithoutTxInDb() { + val txADrop = tx("A", "A") + + val txToSave = CqlBitcoinTx(txADrop.mempoolState()) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.empty()) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.DROPPED_BLOCK, txADrop) + + val txDumpProcess = TxDumpProcess(txRepository, contractTxRepository, blockTxRepository, + BitcoinFamilyChain.BITCOIN) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, times(1)).save(txToSave) + + verify(blockTxRepository, times(1)).delete(CqlBitcoinBlockTxPreview(txADrop)) + + verify(contractTxRepository, times(1)).deleteAll( + txADrop.allContractsUsedInTransaction().map { it -> + CqlBitcoinContractTxPreview(it, txADrop) + } + ) + } + + @Test + fun testDropBlockWithTxInDb() { + val txADb = tx("A", "A").copy(firstSeenTime = Instant.ofEpochMilli(999999)) + val txADrop = tx("A", "B") + + val txToSave = CqlBitcoinTx(txADrop.mempoolState().copy(firstSeenTime = txADb.firstSeenTime)) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.just(CqlBitcoinTx(txADb))) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.DROPPED_BLOCK, txADrop) + + val txDumpProcess = TxDumpProcess(txRepository, contractTxRepository, blockTxRepository, + BitcoinFamilyChain.BITCOIN) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, times(1)).save(txToSave) + verify(txRepository, never()).save(CqlBitcoinTx(txADrop.mempoolState())) + + verify(blockTxRepository, times(1)).delete(CqlBitcoinBlockTxPreview(txADrop)) + + verify(contractTxRepository, times(1)).deleteAll( + txADrop.allContractsUsedInTransaction().map { it -> + CqlBitcoinContractTxPreview(it, txADrop) + } + ) + } + + @Test + fun testNewPoolWithoutTxInDb() { + val txAPool = tx("A", "A").mempoolState() + + val txToSave = CqlBitcoinTx(txAPool) + val contractTxesToSave = txAPool.allContractsUsedInTransaction().map { it -> + CqlBitcoinContractTxPreview(it, txAPool) + } + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.empty()) + } + val blockTxRepository = mock() + + val contractTxRepository = mock { + on { saveAll(any>()) } + .thenReturn(Flux.fromIterable(contractTxesToSave)) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.NEW_POOL_TX, txAPool) + + val txDumpProcess = TxDumpProcess(txRepository, contractTxRepository, blockTxRepository, + BitcoinFamilyChain.BITCOIN) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, times(1)).save(txToSave) + + verify(contractTxRepository, times(1)).saveAll(contractTxesToSave) + } + + @Test + fun testNewPoolWithTxInDb() { + val txADb = tx("A", "A") + val txAPool = tx("A", "A").mempoolState() + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.just(CqlBitcoinTx(txADb))) + } + val blockTxRepository = mock() + + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.NEW_POOL_TX, txAPool) + + val txDumpProcess = TxDumpProcess(txRepository, contractTxRepository, blockTxRepository, + BitcoinFamilyChain.BITCOIN) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, never()).save(any()) + + verify(contractTxRepository, never()).saveAll(any>()) + } + + fun tx(hash: String, blockHash: String?) = BitcoinTx( + hash = hash, blockHash = blockHash, blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), + index = 1, firstSeenTime = Instant.ofEpochSecond(100000), + fee = BigDecimal.ZERO, size = 1, totalOutputsAmount = BigDecimal.ONE, totalInputsAmount = BigDecimal.ONE, + ins = listOf(BitcoinTxIn(listOf("a"), BigDecimal.ONE, SignatureScript("a", "a"), emptyList(), "a0", 0)), + outs = listOf(BitcoinTxOut(listOf("b"), BigDecimal.ONE, "a", 0, 1)) + ) + + fun record(event: PumpEvent, tx: BitcoinTx) = + ConsumerRecord(BitcoinFamilyChain.BITCOIN.txPumpTopic, 0, 0, event, tx) + +} diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt index 6ca26e71..9f10b250 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/TxDumpProcess.kt @@ -16,6 +16,7 @@ import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.kafka.listener.BatchMessageListener import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.publisher.toFlux @@ -47,7 +48,7 @@ class TxDumpProcess( val saveTxMono = txRepository.findById(this.hash) .flatMap { cqlTx -> txRepository.save(CqlEthereumTx(this.copy(firstSeenTime = cqlTx.firstSeenTime))) } - .switchIfEmpty(txRepository.save(CqlEthereumTx(this))) + .switchIfEmpty(Mono.defer { txRepository.save(CqlEthereumTx(this)) }) val saveBlockTxMono = blockTxRepository.save(CqlEthereumBlockTxPreview(this)) @@ -72,7 +73,7 @@ class TxDumpProcess( .flatMap { cqlTx -> txRepository.save(CqlEthereumTx(this.mempoolState().copy(firstSeenTime = cqlTx.firstSeenTime))) } - .switchIfEmpty(txRepository.save(CqlEthereumTx(this.mempoolState()))) + .switchIfEmpty(Mono.defer { txRepository.save(CqlEthereumTx(this.mempoolState())) }) val deleteBlockTxMono = blockTxRepository.delete(CqlEthereumBlockTxPreview(this)) @@ -89,10 +90,13 @@ class TxDumpProcess( .map { it -> CqlEthereumContractTxPreview(this, it) } return txRepository.findById(this.hash) - .map { it -> it as Any } // hack to convert Mono to Any type + .map { it -> it as Any } // hack to convert Mono to Mono type .toFlux() .switchIfEmpty( - Flux.concat(txRepository.save(CqlEthereumTx(this)), contractTxRepository.saveAll(contractTxesToSave)) + Flux.concat( + Mono.defer { txRepository.save(CqlEthereumTx(this)) }, + Flux.defer { contractTxRepository.saveAll(contractTxesToSave) } + ) ) } diff --git a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt index 06c27644..fab19ac3 100644 --- a/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt +++ b/dumps/ethereum/src/main/kotlin/fund/cyber/dump/ethereum/UncleDumpProcess.kt @@ -14,6 +14,7 @@ import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.kafka.listener.BatchMessageListener import reactor.core.publisher.Mono +import reactor.core.publisher.Flux class UncleDumpProcess( @@ -41,13 +42,13 @@ class UncleDumpProcess( val saveBlockMono = uncleRepository.save(CqlEthereumUncle(this)) val saveContractBlockMono = contractUncleRepository.save(CqlEthereumContractMinedUncle(this)) - return reactor.core.publisher.Flux.concat(saveBlockMono, saveContractBlockMono) + return Flux.concat(saveBlockMono, saveContractBlockMono) } private fun EthereumUncle.toDropBlockPublisher(): Publisher { val deleteBlockMono = uncleRepository.delete(CqlEthereumUncle(this)) val deleteContractBlockMono = contractUncleRepository.delete(CqlEthereumContractMinedUncle(this)) - return reactor.core.publisher.Flux.concat(deleteBlockMono, deleteContractBlockMono) + return Flux.concat(deleteBlockMono, deleteContractBlockMono) } } diff --git a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/BlockDumpProcessTest.kt b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/BlockDumpProcessTest.kt index 298b350b..41edcb80 100644 --- a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/BlockDumpProcessTest.kt +++ b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/BlockDumpProcessTest.kt @@ -1,7 +1,6 @@ package fund.cyber.dump.ethereum import com.nhaarman.mockito_kotlin.any -import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.times import com.nhaarman.mockito_kotlin.verify @@ -29,118 +28,27 @@ class BlockDumpProcessTest { @Suppress("LongMethod") fun testWithDroppedBlocks() { - val blockC = EthereumBlock( - hash = "C", number = 2, - parentHash = "B", - txNumber = 158, minerContractHash = "miner3", - difficulty = BigInteger("0"), - totalDifficulty = BigInteger("0"), size = 0, - unclesReward = BigDecimal("0"), blockReward = BigDecimal("0"), - txFees = BigDecimal("0"), gasUsed = 0, gasLimit = 0, - timestamp = Instant.now(), logsBloom = "", transactionsRoot = "", stateRoot = "", - sha3Uncles = "", - nonce = 1, receiptsRoot = "", extraData = "", uncles = emptyList() - ) - - val blockD = EthereumBlock( - hash = "D", number = 2, - parentHash = "B", - txNumber = 158, minerContractHash = "miner4", - difficulty = BigInteger("0"), - totalDifficulty = BigInteger("0"), size = 0, - unclesReward = BigDecimal("0"), blockReward = BigDecimal("0"), - txFees = BigDecimal("0"), gasUsed = 0, gasLimit = 0, - timestamp = Instant.now(), logsBloom = "", transactionsRoot = "", stateRoot = "", - sha3Uncles = "", - nonce = 1, receiptsRoot = "", extraData = "", uncles = emptyList() - ) - - val blockE = EthereumBlock( - hash = "E", number = 3, - parentHash = "D", - txNumber = 158, minerContractHash = "miner5", - difficulty = BigInteger("0"), - totalDifficulty = BigInteger("0"), size = 0, - unclesReward = BigDecimal("0"), blockReward = BigDecimal("0"), - txFees = BigDecimal("0"), gasUsed = 0, gasLimit = 0, - timestamp = Instant.now(), logsBloom = "", transactionsRoot = "", stateRoot = "", - sha3Uncles = "", - nonce = 1, receiptsRoot = "", extraData = "", uncles = emptyList() - ) - - val blockF = EthereumBlock( - hash = "F", number = 3, - parentHash = "C", - txNumber = 158, minerContractHash = "miner6", - difficulty = BigInteger("0"), - totalDifficulty = BigInteger("0"), size = 0, - unclesReward = BigDecimal("0"), blockReward = BigDecimal("0"), - txFees = BigDecimal("0"), gasUsed = 0, gasLimit = 0, - timestamp = Instant.now(), logsBloom = "", transactionsRoot = "", stateRoot = "", - sha3Uncles = "", - nonce = 1, receiptsRoot = "", extraData = "", uncles = emptyList() - ) - - val blockG = EthereumBlock( - hash = "G", number = 4, - parentHash = "E", - txNumber = 158, minerContractHash = "miner7", - difficulty = BigInteger("0"), - totalDifficulty = BigInteger("0"), size = 0, - unclesReward = BigDecimal("0"), blockReward = BigDecimal("0"), - txFees = BigDecimal("0"), gasUsed = 0, gasLimit = 0, - timestamp = Instant.now(), logsBloom = "", transactionsRoot = "", stateRoot = "", - sha3Uncles = "", - nonce = 1, receiptsRoot = "", extraData = "", uncles = emptyList() - ) - - val blockH = EthereumBlock( - hash = "H", number = 4, - parentHash = "F", - txNumber = 158, minerContractHash = "miner8", - difficulty = BigInteger("0"), - totalDifficulty = BigInteger("0"), size = 0, - unclesReward = BigDecimal("0"), blockReward = BigDecimal("0"), - txFees = BigDecimal("0"), gasUsed = 0, gasLimit = 0, - timestamp = Instant.now(), logsBloom = "", transactionsRoot = "", stateRoot = "", - sha3Uncles = "", - nonce = 1, receiptsRoot = "", extraData = "", uncles = emptyList() - ) - - val blockI = EthereumBlock( - hash = "I", number = 5, - parentHash = "G", - txNumber = 158, minerContractHash = "miner9", - difficulty = BigInteger("0"), - totalDifficulty = BigInteger("0"), size = 0, - unclesReward = BigDecimal("0"), blockReward = BigDecimal("0"), - txFees = BigDecimal("0"), gasUsed = 0, gasLimit = 0, - timestamp = Instant.now(), logsBloom = "", transactionsRoot = "", stateRoot = "", - sha3Uncles = "", - nonce = 1, receiptsRoot = "", extraData = "", uncles = emptyList() - ) - - - val record1 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, blockH) - val record2 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, blockH) - val record3 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, blockF) - val record4 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, blockC) - val record5 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, blockD) - val record6 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, blockE) - val record7 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, blockG) - val record8 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, blockI) + val blockC = block("C","B", 2, "miner3") + val blockD = block("D", "B", 2, "miner4") + val blockE = block("E", "D", 3, "miner5") + val blockF = block("F", "C", 3, "miner6") + val blockG = block("G", "E", 4, "miner7") + val blockH = block("H", "F", 4, "miner8") + val blockI = block("I", "G", 5, "miner9") + + + val record1 = record(PumpEvent.NEW_BLOCK, blockH) + val record2 = record(PumpEvent.DROPPED_BLOCK, blockH) + val record3 = record(PumpEvent.DROPPED_BLOCK, blockF) + val record4 = record(PumpEvent.DROPPED_BLOCK, blockC) + val record5 = record(PumpEvent.NEW_BLOCK, blockD) + val record6 = record(PumpEvent.NEW_BLOCK, blockE) + val record7 = record(PumpEvent.NEW_BLOCK, blockG) + val record8 = record(PumpEvent.NEW_BLOCK, blockI) val blockRepository = mock { - on { save(any()) }.doReturn(Mono.empty()) + on { save(any()) }.thenReturn(Mono.empty()) on { delete(any()) }.thenReturn(Mono.empty()) } val contractMinedBlockRepository = mock { @@ -154,25 +62,41 @@ class BlockDumpProcessTest { blockDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8)) + verify(blockRepository, times(1)).save(CqlEthereumBlock(blockH)) verify(blockRepository, times(1)).save(CqlEthereumBlock(blockD)) verify(blockRepository, times(1)).save(CqlEthereumBlock(blockE)) verify(blockRepository, times(1)).save(CqlEthereumBlock(blockG)) verify(blockRepository, times(1)).save(CqlEthereumBlock(blockI)) + verify(blockRepository, times(1)).delete(CqlEthereumBlock(blockH)) verify(blockRepository, times(1)).delete(CqlEthereumBlock(blockF)) verify(blockRepository, times(1)).delete(CqlEthereumBlock(blockC)) + verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockH)) verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockD)) verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockE)) verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockG)) verify(contractMinedBlockRepository, times(1)).save(CqlEthereumContractMinedBlock(blockI)) + verify(contractMinedBlockRepository, times(1)).delete(CqlEthereumContractMinedBlock(blockH)) verify(contractMinedBlockRepository, times(1)).delete(CqlEthereumContractMinedBlock(blockF)) verify(contractMinedBlockRepository, times(1)).delete(CqlEthereumContractMinedBlock(blockC)) } + + fun block(hash: String, parentHash: String, number: Long, miner: String) = EthereumBlock( + hash = hash, number = number, parentHash = parentHash, txNumber = 158, minerContractHash = miner, + difficulty = BigInteger("0"), totalDifficulty = BigInteger("0"), size = 0, + unclesReward = BigDecimal("0"), blockReward = BigDecimal("0"), + txFees = BigDecimal("0"), gasUsed = 0, gasLimit = 0, + timestamp = Instant.now(), logsBloom = "", transactionsRoot = "", stateRoot = "", + sha3Uncles = "", nonce = 1, receiptsRoot = "", extraData = "", uncles = emptyList() + ) + + fun record(event: PumpEvent, block: EthereumBlock) = + ConsumerRecord(EthereumFamilyChain.ETHEREUM.blockPumpTopic, 0, 0, event, block) } diff --git a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/TxDumpProcessTest.kt b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/TxDumpProcessTest.kt index d3ba57b2..0c3050e8 100644 --- a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/TxDumpProcessTest.kt +++ b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/TxDumpProcessTest.kt @@ -2,6 +2,7 @@ package fund.cyber.dump.ethereum import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never import com.nhaarman.mockito_kotlin.times import com.nhaarman.mockito_kotlin.verify import fund.cyber.cassandra.ethereum.model.CqlEthereumContractTxPreview @@ -21,106 +22,33 @@ import reactor.core.publisher.Mono import java.math.BigDecimal import java.time.Instant +@Suppress("LargeClass") class TxDumpProcessTest { - // --- D --- E --- G --- I - //A --- B --- C --- F --- H @Test @Suppress("LongMethod") - fun testWithDroppedTxs() { - - val txC = EthereumTx( - hash = "C", error = null, - nonce = 0, blockHash = "C", - blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), positionInBlock = 1, - from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), - value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, - gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null - ) - - val txD = EthereumTx( - hash = "D", error = null, - nonce = 0, blockHash = "D", - blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), positionInBlock = 1, - from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), - value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, - gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null - ) + fun testWithAllEventsTxs() { - val txE = EthereumTx( - hash = "E", error = null, - nonce = 0, blockHash = "E", - blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), positionInBlock = 1, - from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), - value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, - gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null - ) + val txC = tx("C", "C") + val txD = tx("D", "D") + val txE = tx("E", "E") + val txF = tx("F", "F") + val txG = tx("G", "G") + val txH = tx("H", "H") + val txI = tx("I", "I") + val txK = tx("K", "K") - val txF = EthereumTx( - hash = "F", error = null, - nonce = 0, blockHash = "F", - blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), positionInBlock = 1, - from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), - value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, - gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null - ) - val txG = EthereumTx( - hash = "G", error = null, - nonce = 0, blockHash = "G", - blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), positionInBlock = 1, - from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), - value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, - gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null - ) - - val txH = EthereumTx( - hash = "H", error = null, - nonce = 0, blockHash = "H", - blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), positionInBlock = 1, - from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), - value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, - gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null - ) - - val txI = EthereumTx( - hash = "I", error = null, - nonce = 0, blockHash = "I", - blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), positionInBlock = 1, - from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), - value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, - gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null - ) - - val txK = EthereumTx( - hash = "K", error = null, - nonce = 0, blockHash = null, - blockNumber = -1, blockTime = null, positionInBlock = -1, - from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), - value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, - gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null - ) - - - val record1 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, txH) - val record2 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, txH) - val record3 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, txF) - val record4 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, txC) - val record5 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, txD) - val record6 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, txE) - val record7 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, txG) - val record8 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, txI) - val record9 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, - 0, PumpEvent.NEW_POOL_TX, txK) + val record1 = record(PumpEvent.NEW_BLOCK, txH) + val record2 = record(PumpEvent.DROPPED_BLOCK, txH) + val record3 = record(PumpEvent.DROPPED_BLOCK, txF) + val record4 = record(PumpEvent.DROPPED_BLOCK, txC) + val record5 = record(PumpEvent.NEW_BLOCK, txD) + val record6 = record(PumpEvent.NEW_BLOCK, txE) + val record7 = record(PumpEvent.NEW_BLOCK, txG) + val record8 = record(PumpEvent.NEW_BLOCK, txI) + val record9 = record(PumpEvent.NEW_POOL_TX, txK) val txRepository = mock { on { save(any()) }.thenReturn(Mono.empty()) @@ -137,7 +65,7 @@ class TxDumpProcessTest { } val txDumpProcess = TxDumpProcess(txRepository, blockTxRepository, contractTxRepository, - EthereumFamilyChain.ETHEREUM) + EthereumFamilyChain.ETHEREUM) txDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8, record9)) @@ -184,5 +112,246 @@ class TxDumpProcessTest { } } + @Test + fun testNewBlockWithoutTxInDb() { + val txABlock = tx("A", "A") + + val txToSave = CqlEthereumTx(txABlock) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.empty()) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.NEW_BLOCK, txABlock) + + val txDumpProcess = TxDumpProcess(txRepository, blockTxRepository, contractTxRepository, + EthereumFamilyChain.ETHEREUM) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, times(1)).save(txToSave) + + verify(blockTxRepository, times(1)).save(CqlEthereumBlockTxPreview(txABlock)) + + verify(contractTxRepository, times(1)).saveAll( + txABlock.contractsUsedInTransaction().map { it -> + CqlEthereumContractTxPreview(txABlock, it) + } + ) + verify(contractTxRepository, times(1)).deleteAll( + txABlock.contractsUsedInTransaction().map { it -> + CqlEthereumContractTxPreview(txABlock.mempoolState(), it) + } + ) + } + + @Test + fun testNewBlockWithTxInDb() { + val txADb = tx("A", "A").mempoolState().copy(firstSeenTime = Instant.ofEpochMilli(999999)) + val txABlock = tx("A", "A") + + val txToSave = CqlEthereumTx(txABlock.copy(firstSeenTime = txADb.firstSeenTime)) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.just(CqlEthereumTx(txADb))) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.NEW_BLOCK, txABlock) + + val txDumpProcess = TxDumpProcess(txRepository, blockTxRepository, contractTxRepository, + EthereumFamilyChain.ETHEREUM) + + txDumpProcess.onMessage(listOf(record)) + + + + verify(txRepository, times(1)).save(txToSave) + verify(txRepository, never()).save(CqlEthereumTx(txABlock)) + + verify(blockTxRepository, times(1)).save(CqlEthereumBlockTxPreview(txABlock)) + + verify(contractTxRepository, times(1)).saveAll( + txABlock.contractsUsedInTransaction().map { it -> + CqlEthereumContractTxPreview(txABlock, it) + } + ) + verify(contractTxRepository, times(1)).deleteAll( + txABlock.contractsUsedInTransaction().map { it -> + CqlEthereumContractTxPreview(txABlock.mempoolState(), it) + } + ) + } + + @Test + fun testDropBlockWithoutTxInDb() { + val txADrop = tx("A", "A") + + val txToSave = CqlEthereumTx(txADrop.mempoolState()) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.empty()) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.DROPPED_BLOCK, txADrop) + + val txDumpProcess = TxDumpProcess(txRepository, blockTxRepository, contractTxRepository, + EthereumFamilyChain.ETHEREUM) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, times(1)).save(txToSave) + + verify(blockTxRepository, times(1)).delete(CqlEthereumBlockTxPreview(txADrop)) + + verify(contractTxRepository, times(1)).deleteAll( + txADrop.contractsUsedInTransaction().map { it -> + CqlEthereumContractTxPreview(txADrop, it) + } + ) + } + + @Test + fun testDropBlockWithTxInDb() { + val txADb = tx("A", "A").copy(firstSeenTime = Instant.ofEpochMilli(999999)) + val txADrop = tx("A", "B") + + val txToSave = CqlEthereumTx(txADrop.mempoolState().copy(firstSeenTime = txADb.firstSeenTime)) + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.just(CqlEthereumTx(txADb))) + } + val blockTxRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + } + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.DROPPED_BLOCK, txADrop) + + val txDumpProcess = TxDumpProcess(txRepository, blockTxRepository, contractTxRepository, + EthereumFamilyChain.ETHEREUM) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, times(1)).save(txToSave) + verify(txRepository, never()).save(CqlEthereumTx(txADrop.mempoolState())) + + verify(blockTxRepository, times(1)).delete(CqlEthereumBlockTxPreview(txADrop)) + + verify(contractTxRepository, times(1)).deleteAll( + txADrop.contractsUsedInTransaction().map { it -> + CqlEthereumContractTxPreview(txADrop, it) + } + ) + } + + @Test + fun testNewPoolWithoutTxInDb() { + val txAPool = tx("A", "A").mempoolState() + + val txToSave = CqlEthereumTx(txAPool) + val contractTxesToSave = txAPool.contractsUsedInTransaction().map { it -> + CqlEthereumContractTxPreview(txAPool, it) + } + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.just(txToSave)) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.empty()) + } + val blockTxRepository = mock() + + val contractTxRepository = mock { + on { saveAll(any>()) } + .thenReturn(Flux.fromIterable(contractTxesToSave)) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.NEW_POOL_TX, txAPool) + + val txDumpProcess = TxDumpProcess(txRepository, blockTxRepository, contractTxRepository, + EthereumFamilyChain.ETHEREUM) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, times(1)).save(txToSave) + + verify(contractTxRepository, times(1)).saveAll(contractTxesToSave) + } + + @Test + fun testNewPoolWithTxInDb() { + val txADb = tx("A", "A") + val txAPool = tx("A", "A").mempoolState() + + val txRepository = mock { + on { save(any()) }.thenReturn(Mono.empty()) + on { delete(any()) }.thenReturn(Mono.empty()) + on { findById(any()) }.thenReturn(Mono.just(CqlEthereumTx(txADb))) + } + val blockTxRepository = mock() + + val contractTxRepository = mock { + on { saveAll(any>()) }.thenReturn(Flux.empty()) + on { deleteAll(any>()) }.thenReturn(Mono.empty()) + } + + val record = record(PumpEvent.NEW_POOL_TX, txAPool) + + val txDumpProcess = TxDumpProcess(txRepository, blockTxRepository, contractTxRepository, + EthereumFamilyChain.ETHEREUM) + + txDumpProcess.onMessage(listOf(record)) + + verify(txRepository, never()).save(any()) + + verify(contractTxRepository, never()).saveAll(any>()) + } + + fun tx(hash: String, blockHash: String?) = EthereumTx( + hash = hash, error = null, nonce = 0, blockHash = blockHash, + blockNumber = 4959189, blockTime = Instant.ofEpochSecond(100000), positionInBlock = 1, + from = "a", to = "b", firstSeenTime = Instant.ofEpochSecond(100000), + value = BigDecimal.ZERO, gasPrice = BigDecimal.ZERO, gasLimit = 0, + gasUsed = 21000L, fee = BigDecimal.ZERO, input = "", createdSmartContract = null, trace = null + ) + + fun record(event: PumpEvent, tx: EthereumTx) = + ConsumerRecord(EthereumFamilyChain.ETHEREUM.txPumpTopic, 0, 0, event, tx) } diff --git a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/UncleDumpProcessTest.kt b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/UncleDumpProcessTest.kt index 1858f2c1..dcfa4c43 100644 --- a/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/UncleDumpProcessTest.kt +++ b/dumps/ethereum/src/test/kotlin/fund/cyber/dump/ethereum/UncleDumpProcessTest.kt @@ -27,57 +27,23 @@ class UncleDumpProcessTest { @Suppress("LongMethod") fun testWithDroppedUncles() { - val uncleC = EthereumUncle( - hash = "C", position = 0, number = 0, timestamp = Instant.now(), blockNumber = 0, - blockTime = Instant.now(), blockHash = "C", miner = "minerContractHash", uncleReward = BigDecimal("0") - ) - - val uncleD = EthereumUncle( - hash = "D", position = 0, number = 0, timestamp = Instant.now(), blockNumber = 0, - blockTime = Instant.now(), blockHash = "D", miner = "minerContractHash", uncleReward = BigDecimal("0") - ) - - val uncleE = EthereumUncle( - hash = "E", position = 0, number = 0, timestamp = Instant.now(), blockNumber = 0, - blockTime = Instant.now(), blockHash = "E", miner = "minerContractHash", uncleReward = BigDecimal("0") - ) - - val uncleF = EthereumUncle( - hash = "F", position = 0, number = 0, timestamp = Instant.now(), blockNumber = 0, - blockTime = Instant.now(), blockHash = "F", miner = "minerContractHash", uncleReward = BigDecimal("0") - ) - val uncleG = EthereumUncle( - hash = "G", position = 0, number = 0, timestamp = Instant.now(), blockNumber = 0, - blockTime = Instant.now(), blockHash = "G", miner = "minerContractHash", uncleReward = BigDecimal("0") - ) - - val uncleH = EthereumUncle( - hash = "H", position = 0, number = 0, timestamp = Instant.now(), blockNumber = 0, - blockTime = Instant.now(), blockHash = "H", miner = "minerContractHash", uncleReward = BigDecimal("0") - ) - - val uncleI = EthereumUncle( - hash = "I", position = 0, number = 0, timestamp = Instant.now(), blockNumber = 0, - blockTime = Instant.now(), blockHash = "I", miner = "minerContractHash", uncleReward = BigDecimal("0") - ) - - - val record1 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, uncleH) - val record2 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, uncleH) - val record3 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, uncleF) - val record4 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, - 0, PumpEvent.DROPPED_BLOCK, uncleC) - val record5 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, uncleD) - val record6 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, uncleE) - val record7 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, uncleG) - val record8 = ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, - 0, PumpEvent.NEW_BLOCK, uncleI) + val uncleC = uncle("C") + val uncleD = uncle("D") + val uncleE = uncle("E") + val uncleF = uncle("F") + val uncleG = uncle("G") + val uncleH = uncle("H") + val uncleI = uncle("I") + + + val record1 = record(PumpEvent.NEW_BLOCK, uncleH) + val record2 = record(PumpEvent.DROPPED_BLOCK, uncleH) + val record3 = record(PumpEvent.DROPPED_BLOCK, uncleF) + val record4 = record(PumpEvent.DROPPED_BLOCK, uncleC) + val record5 = record(PumpEvent.NEW_BLOCK, uncleD) + val record6 = record(PumpEvent.NEW_BLOCK, uncleE) + val record7 = record(PumpEvent.NEW_BLOCK, uncleG) + val record8 = record(PumpEvent.NEW_BLOCK, uncleI) val uncleRepository = mock { on { save(any()) }.thenReturn(Mono.empty()) @@ -93,22 +59,34 @@ class UncleDumpProcessTest { blockDumpProcess.onMessage(listOf(record1, record2, record3, record4, record5, record6, record7, record8)) + verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleH)) verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleD)) verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleE)) verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleG)) verify(uncleRepository, times(1)).save(CqlEthereumUncle(uncleI)) + verify(uncleRepository, times(1)).delete(CqlEthereumUncle(uncleH)) verify(uncleRepository, times(1)).delete(CqlEthereumUncle(uncleF)) verify(uncleRepository, times(1)).delete(CqlEthereumUncle(uncleC)) + verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleH)) verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleD)) verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleE)) verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleG)) verify(contractUncleRepository, times(1)).save(CqlEthereumContractMinedUncle(uncleI)) + verify(contractUncleRepository, times(1)).delete(CqlEthereumContractMinedUncle(uncleH)) verify(contractUncleRepository, times(1)).delete(CqlEthereumContractMinedUncle(uncleF)) verify(contractUncleRepository, times(1)).delete(CqlEthereumContractMinedUncle(uncleC)) } + fun uncle(hash: String) = EthereumUncle( + hash = hash, position = 0, number = 0, timestamp = Instant.now(), blockNumber = 0, + blockTime = Instant.now(), blockHash = hash, miner = "minerContractHash", uncleReward = BigDecimal("0") + ) + + fun record(event: PumpEvent, uncle: EthereumUncle) = + ConsumerRecord(EthereumFamilyChain.ETHEREUM.unclePumpTopic, 0, 0, event, uncle) + } diff --git a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/converter/JsonRpcToDaoBitcoinBlockConverter.kt b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/converter/JsonRpcToDaoBitcoinBlockConverter.kt index 2527f8c2..143d10ac 100644 --- a/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/converter/JsonRpcToDaoBitcoinBlockConverter.kt +++ b/pumps/bitcoin/src/main/kotlin/fund/cyber/pump/bitcoin/client/converter/JsonRpcToDaoBitcoinBlockConverter.kt @@ -24,15 +24,16 @@ class JsonRpcToDaoBitcoinBlockConverter { val coinbaseTxMinerOutput = coinbaseTx?.outs?.firstOrNull() return BitcoinBlock( - hash = jsonRpcBlock.hash.toSearchHashFormat(), size = jsonRpcBlock.size, - minerContractHash = coinbaseTxMinerOutput?.contracts?.first()?.toSearchHashFormat() ?: "", - version = jsonRpcBlock.version, blockReward = getBlockReward(jsonRpcBlock.height), - txFees = transactions.map { tx -> tx.fee }.sum(), coinbaseData = coinbaseTx?.coinbase ?: "", - bits = jsonRpcBlock.bits, difficulty = jsonRpcBlock.difficulty.toBigInteger(), - nonce = jsonRpcBlock.nonce, time = Instant.ofEpochSecond(jsonRpcBlock.time), - weight = jsonRpcBlock.weight, merkleroot = jsonRpcBlock.merkleroot.toSearchHashFormat(), - height = jsonRpcBlock.height, - txNumber = jsonRpcBlock.tx.size, totalOutputsAmount = totalOutputsValue + hash = jsonRpcBlock.hash.toSearchHashFormat(), parentHash = jsonRpcBlock.previousblockhash ?: "", + size = jsonRpcBlock.size, + minerContractHash = coinbaseTxMinerOutput?.contracts?.first()?.toSearchHashFormat() ?: "", + version = jsonRpcBlock.version, blockReward = getBlockReward(jsonRpcBlock.height), + txFees = transactions.map { tx -> tx.fee }.sum(), coinbaseData = coinbaseTx?.coinbase ?: "", + bits = jsonRpcBlock.bits, difficulty = jsonRpcBlock.difficulty.toBigInteger(), + nonce = jsonRpcBlock.nonce, time = Instant.ofEpochSecond(jsonRpcBlock.time), + weight = jsonRpcBlock.weight, merkleroot = jsonRpcBlock.merkleroot.toSearchHashFormat(), + height = jsonRpcBlock.height, + txNumber = jsonRpcBlock.tx.size, totalOutputsAmount = totalOutputsValue ) } } From 77d03503edc19635f3e79476be44c7d02ff22d03 Mon Sep 17 00:00:00 2001 From: Artur Albov Date: Mon, 28 May 2018 13:35:54 +0300 Subject: [PATCH 7/7] #172 Setup bitcoin mempool transaction dump. Add parent hash to BTC block --- .../cyber/cassandra/bitcoin/model/Block.kt | 36 ++++++++++--------- .../migrations/bitcoin/0_initial.cql | 1 + .../migrations/bitcoin_cash/0_initial.cql | 1 + 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Block.kt b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Block.kt index 919c9d89..8d94600e 100644 --- a/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Block.kt +++ b/cassandra-service/src/main/kotlin/fund/cyber/cassandra/bitcoin/model/Block.kt @@ -54,26 +54,28 @@ data class CqlBitcoinBlockTxPreview( @Table("block") data class CqlBitcoinBlock( - @PrimaryKey val number: Long, - val hash: String, - @Column("miner_contract_hash") val minerContractHash: String, - @Column("block_reward") val blockReward: BigDecimal, - @Column("tx_fees") val txFees: BigDecimal, - @Column("coinbase_data") val coinbaseData: String, - val timestamp: Instant, - val nonce: Long, - val merkleroot: String, - val size: Int, - val version: Int, - val weight: Int, - val bits: String, - val difficulty: BigInteger, - @Column("tx_number") val txNumber: Int, - @Column("total_outputs_value") val totalOutputsValue: String + @PrimaryKey val number: Long, + val hash: String, + @Column("parent_hash") val parentHash: String, + @Column("miner_contract_hash") val minerContractHash: String, + @Column("block_reward") val blockReward: BigDecimal, + @Column("tx_fees") val txFees: BigDecimal, + @Column("coinbase_data") val coinbaseData: String, + val timestamp: Instant, + val nonce: Long, + val merkleroot: String, + val size: Int, + val version: Int, + val weight: Int, + val bits: String, + val difficulty: BigInteger, + @Column("tx_number") val txNumber: Int, + @Column("total_outputs_value") val totalOutputsValue: String ) : CqlBitcoinItem { constructor(block: BitcoinBlock) : this( - number = block.height, hash = block.hash, minerContractHash = block.minerContractHash, + number = block.height, hash = block.hash, parentHash = block.parentHash, + minerContractHash = block.minerContractHash, blockReward = block.blockReward, txFees = block.txFees, coinbaseData = block.coinbaseData, timestamp = block.time, nonce = block.nonce, bits = block.bits, merkleroot = block.merkleroot, size = block.size, version = block.version, weight = block.weight, difficulty = block.difficulty, diff --git a/cassandra-service/src/main/resources/migrations/bitcoin/0_initial.cql b/cassandra-service/src/main/resources/migrations/bitcoin/0_initial.cql index e8da024d..8d0ec96f 100644 --- a/cassandra-service/src/main/resources/migrations/bitcoin/0_initial.cql +++ b/cassandra-service/src/main/resources/migrations/bitcoin/0_initial.cql @@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS bitcoin.tx ( CREATE TABLE IF NOT EXISTS bitcoin.block ( hash text, + parent_hash text, number bigint PRIMARY KEY, miner_contract_hash text, block_reward decimal, diff --git a/cassandra-service/src/main/resources/migrations/bitcoin_cash/0_initial.cql b/cassandra-service/src/main/resources/migrations/bitcoin_cash/0_initial.cql index ab4f3d2f..4a9f26e2 100644 --- a/cassandra-service/src/main/resources/migrations/bitcoin_cash/0_initial.cql +++ b/cassandra-service/src/main/resources/migrations/bitcoin_cash/0_initial.cql @@ -67,6 +67,7 @@ CREATE TABLE IF NOT EXISTS bitcoin_cash.tx ( CREATE TABLE IF NOT EXISTS bitcoin_cash.block ( hash text, + parent_hash text, number bigint PRIMARY KEY, miner_contract_hash text, block_reward decimal,