Skip to content

Reduce address comparisons for network topology replica calculation #532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
41 changes: 32 additions & 9 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#!groovy
import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException

def get_os_distro() {
return sh(label: 'Assign env.OS_DISTRO based on OS env', script: '''#!/bin/bash -le
echo ${OS_DISTRO}''', returnStdout: true).trim()
}

def initializeEnvironment() {
env.DRIVER_DISPLAY_NAME = 'Cassandra C/C++ Driver'
env.DRIVER_TYPE = 'CASS'
@@ -545,7 +550,10 @@ pipeline {
post {
success {
// Allow empty results for 'osx/high-sierra' which doesn't produce packages
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/libuv*", allowEmptyArchive: true
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/libuv*", allowEmptyArchive: true
}
}
}
}
@@ -555,12 +563,18 @@ pipeline {
}
post {
success {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/cassandra-*-tests"
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/dse-*-tests", allowEmptyArchive: true
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/cassandra-*-tests"
archiveArtifacts artifacts: "${distro}/**/dse-*-tests", allowEmptyArchive: true
}
}
failure {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeOutput.log"
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeError.log"
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/CMakeOutput.log"
archiveArtifacts artifacts: "${distro}/**/CMakeError.log"
}
}
}
}
@@ -590,7 +604,10 @@ pipeline {
}
post {
success {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/*-cpp-driver*"
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/*-cpp-driver*"
}
}
}
}
@@ -748,8 +765,11 @@ pipeline {
}
post {
failure {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeOutput.log"
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeError.log"
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/CMakeOutput.log"
archiveArtifacts artifacts: "${distro}/**/CMakeError.log"
}
}
}
}
@@ -764,7 +784,10 @@ pipeline {
junit testResults: '*integration-tests-*-results.xml', allowEmptyResults: true
}
failure {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/*-integration-tests-driver-logs.tgz"
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/*-integration-tests-driver-logs.tgz"
}
}
cleanup {
cleanWs()
1 change: 1 addition & 0 deletions src/host.hpp
Original file line number Diff line number Diff line change
@@ -304,4 +304,5 @@ bool remove_host(CopyOnWriteHostVec& hosts, const Address& address);

}}} // namespace datastax::internal::core


#endif
14 changes: 10 additions & 4 deletions src/token_map_impl.hpp
Original file line number Diff line number Diff line change
@@ -435,6 +435,9 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
CopyOnWriteHostVec replicas(new HostVec());
replicas->reserve(num_replicas);

AddressSet replicas_set;
replicas_set.resize(num_replicas);

// Clear datacenter and rack information for the next token
for (typename DatacenterRackInfoMap::iterator j = dc_racks.begin(), end = dc_racks.end();
j != end; ++j) {
@@ -444,7 +447,7 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
}

for (typename TokenHostVec::const_iterator j = tokens.begin(), end = tokens.end();
j != end && replicas->size() < num_replicas; ++j) {
j != end && replicas_set.size() < num_replicas; ++j) {
typename TokenHostVec::const_iterator curr_token_it = token_it;
Host* host = curr_token_it->second;
uint32_t dc = host->dc_id();
@@ -476,15 +479,17 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
// datacenter only then consider hosts in the same rack

if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
if (add_replica(replicas, Host::Ptr(host))) {
if (replicas_set.insert(host->address()).second) {
replicas->push_back(Host::Ptr(host));
++replica_count_this_dc;
}
} else {
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
if (racks_observed_this_dc.count(rack) > 0) {
skipped_endpoints_this_dc.push_back(curr_token_it);
} else {
if (add_replica(replicas, Host::Ptr(host))) {
if (replicas_set.insert(host->address()).second) {
replicas->push_back(Host::Ptr(host));
++replica_count_this_dc;
racks_observed_this_dc.insert(rack);
}
@@ -494,7 +499,8 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
if (racks_observed_this_dc.size() == rack_count_this_dc) {
while (!skipped_endpoints_this_dc.empty() &&
replica_count_this_dc < replication_factor) {
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
if (replicas_set.insert(skipped_endpoints_this_dc.front()->second->address()).second) {
replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second));
++replica_count_this_dc;
}
skipped_endpoints_this_dc.pop_front();
12 changes: 10 additions & 2 deletions tests/src/unit/test_token_map_utils.hpp
Original file line number Diff line number Diff line change
@@ -79,6 +79,8 @@ class BufferBuilder {

static size_t size_of(const String& value) { return value.size(); }

static size_t size_of(const Address& value) { char buf[16]; return value.to_inet(buf); }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes fix test warnings.


static void encode(char* buf, uint16_t value) { datastax::internal::encode_uint16(buf, value); }

static void encode(char* buf, int32_t value) { datastax::internal::encode_int32(buf, value); }
@@ -87,6 +89,8 @@ class BufferBuilder {

static void encode(char* buf, const String& value) { memcpy(buf, value.data(), value.size()); }

static void encode(char* buf, const Address& value) { value.to_inet(buf); }

private:
String buffer_;
};
@@ -154,9 +158,11 @@ class RowResultResponseBuilder : protected BufferBuilder {
++row_count_;
}

void append_local_peers_row_v3(const TokenVec& tokens, const String& partitioner,
void append_local_peers_row_v3(const Address& rpc_address,
const TokenVec& tokens, const String& partitioner,
const String& dc, const String& rack,
const String& release_version) {
append_value<Address>(rpc_address);
append_value<String>(rack);
append_value<String>(dc);
append_value<String>(release_version);
@@ -306,8 +312,10 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens,
Host::Ptr host(new Host(address));

DataType::ConstPtr varchar_data_type(new DataType(CASS_VALUE_TYPE_VARCHAR));
DataType::ConstPtr inet_data_type(new DataType(CASS_VALUE_TYPE_INET));

ColumnMetadataVec column_metadata;
column_metadata.push_back(ColumnMetadata("rpc_address", inet_data_type));
column_metadata.push_back(ColumnMetadata("data_center", varchar_data_type));
column_metadata.push_back(ColumnMetadata("rack", varchar_data_type));
column_metadata.push_back(ColumnMetadata("release_version", varchar_data_type));
@@ -318,7 +326,7 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens,
ColumnMetadata("tokens", CollectionType::list(varchar_data_type, true)));

RowResultResponseBuilder builder(column_metadata);
builder.append_local_peers_row_v3(tokens, partitioner, dc, rack, release_version);
builder.append_local_peers_row_v3(address, tokens, partitioner, dc, rack, release_version);

host->set(&builder.finish()->first_row(), true);

10 changes: 5 additions & 5 deletions tests/src/unit/tests/test_token_map.cpp
Original file line number Diff line number Diff line change
@@ -129,12 +129,12 @@ TEST(TokenMapUnitTest, Murmur3MultipleTokensPerHost) {
TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
TestTokenMap<Murmur3Partitioner> test_murmur3;

size_t num_dcs = 3;
size_t num_racks = 3;
size_t num_hosts = 4;
size_t num_dcs = 2;
size_t num_racks = 1;
size_t num_hosts = 27;
size_t num_vnodes = 256;
size_t replication_factor = 3;
size_t total_replicas = std::min(num_hosts, replication_factor) * num_dcs;
size_t replication_factor = 54;
size_t total_replicas = replication_factor;
Copy link
Contributor Author

@mpenick mpenick Aug 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should likely be reverted. This is a pathological use case though.


ReplicationMap replication;
MT19937_64 rng;