Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
Merge branch 'master' into 92_pump_dev_guide
Browse files Browse the repository at this point in the history
  • Loading branch information
hleb-albau authored Jun 1, 2018
2 parents 32f81ce + da0633e commit 3a5f578
Show file tree
Hide file tree
Showing 75 changed files with 2,138 additions and 189 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Search engine backend and API.
## Getting Start Useful Links

* [Development Quick Start Guide](./docs/contributing/dev-environment.md)
* [Architecture Overview](http://docs.cybernode.io/cybernode/components/search/)
* [Architecture Overview](https://cybersearch.io/cyberSearch/components/overview/)
* [Api OpenAPI 3.0 Documentation](http://docs.cybersearch.io/)
* [Api Test Server](http://api.cybersearch.io/search?query=42)
* [Docker Images](https://hub.docker.com/r/cybernode/)
Expand Down
14 changes: 12 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ buildscript {
}

repositories {
maven {
url "$rootDir/libs"
}
mavenCentral()
jcenter()
maven { url 'https://dl.bintray.com/kotlin/kotlin-dev/' }
Expand Down Expand Up @@ -71,6 +74,9 @@ allprojects {

buildscript {
repositories {
maven {
url "$rootDir/libs"
}
maven { url 'https://plugins.gradle.org/m2/' }
}
}
Expand Down Expand Up @@ -114,6 +120,9 @@ subprojects {
}

repositories {
maven {
url "$rootDir/libs"
}
jcenter()
mavenCentral()
maven { url 'https://repo.spring.io/libs-milestone' }
Expand Down Expand Up @@ -158,7 +167,6 @@ subprojects {
}

dependency("org.ehcache:ehcache:$ehcacheVersion")
dependency("org.elasticsearch.client:transport:$elasticVersion")

dependency "org.springframework.data:spring-data-cassandra:$springCassandraVersion"
dependency "org.springframework.data:spring-data-commons:$springCassandraVersion"
Expand Down Expand Up @@ -222,6 +230,7 @@ project(":common-kafka") {

compile "org.springframework.kafka:spring-kafka"
compile "org.apache.kafka:kafka-clients"
compile "javax.annotation:javax.annotation-api:1.3.2"

testCompile 'org.apache.kafka:kafka_2.11'
testCompile 'org.springframework:spring-test'
Expand Down Expand Up @@ -260,9 +269,10 @@ project(":search-api") {
compile 'org.springframework.boot:spring-boot-starter-webflux'
compile 'org.springframework.boot:spring-boot-starter-reactor-netty'
compile 'org.springframework.boot:spring-boot-starter-actuator'
compile 'org.elasticsearch.client:transport'
compile 'io.micrometer:micrometer-registry-prometheus'
compile 'io.micrometer:micrometer-core'
compile 'com.strapdata.elasticsearch:elasticsearch:5.5.0'
compile 'com.strapdata.elasticsearch.client:transport:5.5.0'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import fund.cyber.cassandra.migration.BlockchainMigrationSettings
import fund.cyber.cassandra.migration.MigrationSettings
import fund.cyber.search.configuration.CASSANDRA_HOSTS
import fund.cyber.search.configuration.CASSANDRA_HOSTS_DEFAULT
import fund.cyber.search.configuration.CASSANDRA_MAX_CONNECTIONS_LOCAL
import fund.cyber.search.configuration.CASSANDRA_MAX_CONNECTIONS_LOCAL_DEFAULT
import fund.cyber.search.configuration.CASSANDRA_MAX_CONNECTIONS_REMOTE
import fund.cyber.search.configuration.CASSANDRA_MAX_CONNECTIONS_REMOTE_DEFAULT
import fund.cyber.search.configuration.CASSANDRA_PORT
import fund.cyber.search.configuration.CASSANDRA_PORT_DEFAULT
import fund.cyber.search.configuration.CHAIN
Expand Down Expand Up @@ -62,8 +66,12 @@ class BitcoinRepositoryConfiguration(
@Value("\${$CASSANDRA_HOSTS:$CASSANDRA_HOSTS_DEFAULT}")
private val cassandraHosts: String,
@Value("\${$CASSANDRA_PORT:$CASSANDRA_PORT_DEFAULT}")
private val cassandraPort: Int
) : CassandraRepositoriesConfiguration(cassandraHosts, cassandraPort) {
private val cassandraPort: Int,
@Value("\${$CASSANDRA_MAX_CONNECTIONS_LOCAL:$CASSANDRA_MAX_CONNECTIONS_LOCAL_DEFAULT}")
private val maxConnectionsLocal: Int,
@Value("\${$CASSANDRA_MAX_CONNECTIONS_REMOTE:$CASSANDRA_MAX_CONNECTIONS_REMOTE_DEFAULT}")
private val maxConnectionsRemote: Int
) : CassandraRepositoriesConfiguration(cassandraHosts, cassandraPort, maxConnectionsLocal, maxConnectionsRemote) {

private val chain = BitcoinFamilyChain.valueOf(env(CHAIN, ""))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ data class CqlBitcoinContractTxPreview(
val outs: List<CqlBitcoinTxPreviewIO>
) : CqlBitcoinItem {

constructor(contract: String, tx: BitcoinTx) : this(
constructor(
contract: String, tx: BitcoinTx, ins: List<CqlBitcoinTxPreviewIO>, outs: List<CqlBitcoinTxPreviewIO>
) : this(
contractHash = contract, blockTime = tx.blockTime?.toEpochMilli() ?: -1, hash = tx.hash, fee = tx.fee,
blockNumber = tx.blockNumber, ins = tx.ins.map { txIn -> CqlBitcoinTxPreviewIO(txIn) },
outs = tx.outs.map { txOut -> CqlBitcoinTxPreviewIO(txOut) }
blockNumber = tx.blockNumber, ins = ins, outs = outs
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package fund.cyber.cassandra.configuration
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.HostDistance
import com.datastax.driver.core.PoolingOptions
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy
import com.datastax.driver.core.policies.LoadBalancingPolicy
import fund.cyber.cassandra.common.defaultKeyspaceSpecification
import fund.cyber.cassandra.migration.DefaultMigrationsLoader
import fund.cyber.search.model.chains.Chain
Expand Down Expand Up @@ -34,8 +36,8 @@ val Chain.keyspace: String get() = lowerCaseName
const val REPOSITORY_NAME_DELIMETER = "__"

fun mappingContext(
cluster: Cluster, keyspace: String, basePackage: String,
customConversions: CassandraCustomConversions = CassandraCustomConversions(emptyList<Any>())
cluster: Cluster, keyspace: String, basePackage: String,
customConversions: CassandraCustomConversions = CassandraCustomConversions(emptyList<Any>())
): CassandraMappingContext {

val mappingContext = CassandraMappingContext()
Expand All @@ -50,21 +52,23 @@ fun mappingContext(
fun getKeyspaceSession(cluster: Cluster,
keyspace: String,
converter: MappingCassandraConverter) = CassandraSessionFactoryBean()
.apply {
setCluster(cluster)
setConverter(converter)
setKeyspaceName(keyspace)
schemaAction = SchemaAction.NONE
}
.apply {
setCluster(cluster)
setConverter(converter)
setKeyspaceName(keyspace)
schemaAction = SchemaAction.NONE
}

abstract class CassandraRepositoriesConfiguration(
private val cassandraHosts: String,
private val cassandraPort: Int
private val cassandraHosts: String,
private val cassandraPort: Int,
private val maxRequestLocal: Int = MAX_CONCURRENT_REQUESTS,
private val maxRequestRemote: Int = MAX_CONCURRENT_REQUESTS
) : AbstractReactiveCassandraConfiguration() {

override fun getPoolingOptions() = PoolingOptions()
.setMaxRequestsPerConnection(HostDistance.LOCAL, MAX_CONCURRENT_REQUESTS)
.setMaxRequestsPerConnection(HostDistance.REMOTE, MAX_CONCURRENT_REQUESTS)!!
.setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestLocal)
.setMaxRequestsPerConnection(HostDistance.REMOTE, maxRequestRemote)!!

override fun getPort() = cassandraPort
override fun getContactPoints() = cassandraHosts
Expand All @@ -78,6 +82,13 @@ abstract class CassandraRepositoriesConfiguration(
override fun getKeyspaceCreations(): List<CreateKeyspaceSpecification> {
return listOf(defaultKeyspaceSpecification("cyber_system"))
}

override fun getLoadBalancingPolicy(): LoadBalancingPolicy? {
return DCAwareRoundRobinPolicy.builder().withLocalDc("WITHOUT_REPLICATION")
.withUsedHostsPerRemoteDc(0)
.build()
}

}


Expand All @@ -91,13 +102,13 @@ class CassandraConfiguration {

@Bean
fun httpClient() = HttpClients.custom()
.setConnectionManager(connectionManager)
.setConnectionManagerShared(true)
.setDefaultHeaders(defaultHttpHeaders)
.build()!!
.setConnectionManager(connectionManager)
.setConnectionManagerShared(true)
.setDefaultHeaders(defaultHttpHeaders)
.build()!!

@Bean
fun migrationsLoader(resourceLoader: GenericApplicationContext) = DefaultMigrationsLoader(
resourceLoader = resourceLoader
resourceLoader = resourceLoader
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import fund.cyber.cassandra.migration.BlockchainMigrationSettings
import fund.cyber.cassandra.migration.MigrationSettings
import fund.cyber.search.configuration.CASSANDRA_HOSTS
import fund.cyber.search.configuration.CASSANDRA_HOSTS_DEFAULT
import fund.cyber.search.configuration.CASSANDRA_MAX_CONNECTIONS_LOCAL
import fund.cyber.search.configuration.CASSANDRA_MAX_CONNECTIONS_LOCAL_DEFAULT
import fund.cyber.search.configuration.CASSANDRA_MAX_CONNECTIONS_REMOTE
import fund.cyber.search.configuration.CASSANDRA_MAX_CONNECTIONS_REMOTE_DEFAULT
import fund.cyber.search.configuration.CASSANDRA_PORT
import fund.cyber.search.configuration.CASSANDRA_PORT_DEFAULT
import fund.cyber.search.configuration.CHAIN
Expand Down Expand Up @@ -68,8 +72,12 @@ class EthereumRepositoryConfiguration(
@Value("\${$CASSANDRA_HOSTS:$CASSANDRA_HOSTS_DEFAULT}")
private val cassandraHosts: String,
@Value("\${$CASSANDRA_PORT:$CASSANDRA_PORT_DEFAULT}")
private val cassandraPort: Int
) : CassandraRepositoriesConfiguration(cassandraHosts, cassandraPort) {
private val cassandraPort: Int,
@Value("\${$CASSANDRA_MAX_CONNECTIONS_LOCAL:$CASSANDRA_MAX_CONNECTIONS_LOCAL_DEFAULT}")
private val maxConnectionsLocal: Int,
@Value("\${$CASSANDRA_MAX_CONNECTIONS_REMOTE:$CASSANDRA_MAX_CONNECTIONS_REMOTE_DEFAULT}")
private val maxConnectionsRemote: Int
) : CassandraRepositoriesConfiguration(cassandraHosts, cassandraPort, maxConnectionsLocal, maxConnectionsRemote) {

private val chain = EthereumFamilyChain.valueOf(env(CHAIN, ""))

Expand Down
15 changes: 15 additions & 0 deletions common/src/main/kotlin/fund/cyber/search/configuration/Env.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ const val GENESIS_SUPPLY = "GENESIS_SUPPLY"
const val BITCOIN_TX_OUTS_CACHE_HEAP_SIZE = "BITCOIN_TX_OUTS_CACHE_HEAP_SIZE"
const val BITCOIN_TX_OUTS_CACHE_HEAP_SIZE_DEFAULT = 4

const val PUMP_MAX_CONCURRENCY = "PUMP_MAX_CONCURRENCY"
const val PUMP_MAX_CONCURRENCY_DEFAULT = 6

const val BTC_TX_DOWNLOAD_MAX_CONCURRENCY = "BTC_TX_DOWNLOAD_MAX_CONCURRENCY"
const val BTC_TX_DOWNLOAD_MAX_CONCURRENCY_DEFAULT = 4

const val KAFKA_LISTENER_MAX_POLL_RECORDS = "KAFKA_LISTENER_MAX_POLL_RECORDS"
const val KAFKA_LISTENER_MAX_POLL_RECORDS_DEFAULT = 500

const val CASSANDRA_MAX_CONNECTIONS_LOCAL = "CASSANDRA_MAX_CONNECTIONS_LOCAL"
const val CASSANDRA_MAX_CONNECTIONS_LOCAL_DEFAULT = 32768

const val CASSANDRA_MAX_CONNECTIONS_REMOTE = "CASSANDRA_MAX_CONNECTIONS_REMOTE"
const val CASSANDRA_MAX_CONNECTIONS_REMOTE_DEFAULT = 2048


inline fun <reified T : Any> env(name: String, default: T): T =
when (T::class) {
Expand Down
1 change: 1 addition & 0 deletions common/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<logger name="org.springframework.http.server" level="WARN"/>
<logger name="org.springframework.transaction.support" level="WARN"/>
<logger name="org.web3j.protocol.http" level="WARN"/>
<logger name="org.springframework.data.cassandra.core.convert" level="WARN"/>

<root level="${CS_LOG_LEVEL:-INFO}">
<appender-ref ref="STDOUT"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,36 +56,43 @@ data class BitcoinContractSummaryDelta(
}
}

//todo: txNumberDelta should be 1 if contract both in ins and outs
@Component
class BitcoinTxDeltaProcessor : DeltaProcessor<BitcoinTx, CqlBitcoinContractSummary, BitcoinContractSummaryDelta> {

override fun recordToDeltas(record: ConsumerRecord<PumpEvent, BitcoinTx>): List<BitcoinContractSummaryDelta> {
val tx = record.value()
val event = record.key()

val contractsAffected = mutableSetOf<String>()

val contractsDeltasByIns = tx.ins.flatMap { input ->
input.contracts.map { contract ->
val txNumberDelta = if (contractsAffected.contains(contract)) 0 else 1
contractsAffected.add(contract)

BitcoinContractSummaryDelta(
contract = contract, balanceDelta = -input.amount, txNumberDelta = 1,
totalReceivedDelta = ZERO, topic = record.topic(), partition = record.partition(),
offset = record.offset(), time = tx.blockTime!!
contract = contract, balanceDelta = -input.amount, txNumberDelta = txNumberDelta,
totalReceivedDelta = ZERO, topic = record.topic(), partition = record.partition(),
offset = record.offset(), time = tx.blockTime!!
)
}
}

val contractsDeltasByOuts = tx.outs.flatMap { output ->
output.contracts.map { contract ->
val txNumberDelta = if (contractsAffected.contains(contract)) 0 else 1
contractsAffected.add(contract)

BitcoinContractSummaryDelta(
contract = contract, balanceDelta = output.amount, txNumberDelta = 1,
totalReceivedDelta = output.amount, topic = record.topic(), partition = record.partition(),
offset = record.offset(), time = tx.blockTime!!
contract = contract, balanceDelta = output.amount, txNumberDelta = txNumberDelta,
totalReceivedDelta = output.amount, topic = record.topic(), partition = record.partition(),
offset = record.offset(), time = tx.blockTime!!
)
}
}

return (contractsDeltasByIns + contractsDeltasByOuts)
.map { delta -> if (event == PumpEvent.DROPPED_BLOCK) delta.revertedDelta() else delta }
.map { delta -> if (event == PumpEvent.DROPPED_BLOCK) delta.revertedDelta() else delta }
}

override fun affectedContracts(records: List<ConsumerRecord<PumpEvent, BitcoinTx>>): Set<String> {
Expand Down
31 changes: 30 additions & 1 deletion docs/api/search.v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ paths:
'400':
description: 'Invalid query, page or pageSize parameters supplied'


/search/stats:
get:
tags:
- search
summary: Search API indices statistic
description: 'Endpoint to obtain search indices statistics'
operationId: search-stats
responses:
'200':
description: 'Successful statistics'
content:
application/json:
schema:
$ref: '#/components/schemas/SearchStatsResponse'

components:
schemas:

Expand Down Expand Up @@ -113,4 +129,17 @@ components:
type: object
anyOf:
- $ref: "bitcoin.v1.yaml#/components/schemas/BitcoinBlockPreview"
- $ref: "bitcoin.v1.yaml#/components/schemas/BitcoinTxPreview"
- $ref: "bitcoin.v1.yaml#/components/schemas/BitcoinTxPreview"

SearchStatsResponse:
type: object
properties:
blockchains:
type: integer
format: int32
transactionsCount:
type: integer
format: int64
indexSizeBytes:
type: integer
format: int64
Loading

0 comments on commit 3a5f578

Please sign in to comment.