Skip to content

Updated cassandra driver to 4.x #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.smartcat</groupId>
<artifactId>cassandra-migration-tool</artifactId>
<version>3.1.0.1-SNAPSHOT</version>
<version>4.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>cassandra-migration-tool</name>
Expand Down Expand Up @@ -56,8 +56,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<source.level>1.8</source.level>
<code.level>1.8</code.level>
<version.cassandra-driver>3.1.0</version.cassandra-driver>
<version.cassandra-unit>3.0.0.1</version.cassandra-unit>
<version.cassandra-driver>4.0.1</version.cassandra-driver>
<version.cassandra-unit>4.3.1.0</version.cassandra-unit>
<version.guava>18.0</version.guava>
<version.slf4j>1.7.7</version.slf4j>
<version.junit>4.12</version.junit>
Expand All @@ -76,8 +76,13 @@

<dependencies>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${version.cassandra-driver}</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>${version.cassandra-driver}</version>
</dependency>
<dependency>
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/io/smartcat/migration/CassandraVersioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

/**
* Class responsible for version management.
Expand All @@ -32,13 +32,13 @@ public class CassandraVersioner {
+ String.format("PRIMARY KEY (%s, %s)", TYPE, VERSION)
+ String.format(") WITH CLUSTERING ORDER BY (%s DESC)", VERSION) + " AND COMMENT='Schema version';";

private final Session session;
private final CqlSession session;

/**
* Create Cassandra versioner for active session.
* @param session Active Cassandra session
*/
public CassandraVersioner(final Session session) {
public CassandraVersioner(final CqlSession session) {
this.session = session;

createSchemaVersion();
Expand All @@ -58,8 +58,9 @@ private void createSchemaVersion() {
* @return Database version for given type
*/
public int getCurrentVersion(final MigrationType type) {
final Statement select = QueryBuilder.select().all().from(SCHEMA_VERSION_CF)
.where(QueryBuilder.eq(TYPE, type.name())).limit(1).setConsistencyLevel(ConsistencyLevel.ALL);
final SimpleStatement select = QueryBuilder.selectFrom(SCHEMA_VERSION_CF).all()
.where(column(TYPE).isEqualTo(literal(type.name()))).limit(1)
.builder().setConsistencyLevel(DefaultConsistencyLevel.ALL).build();
final ResultSet result = session.execute(select);

final Row row = result.one();
Expand All @@ -73,10 +74,13 @@ public int getCurrentVersion(final MigrationType type) {
* @return Success of version update
*/
public boolean updateVersion(final Migration migration) {
final Statement insert = QueryBuilder.insertInto(SCHEMA_VERSION_CF).value(TYPE, migration.getType().name())
.value(VERSION, migration.getVersion()).value(TIMESTAMP, System.currentTimeMillis())
.value(DESCRIPTION, migration.getDescription()).setConsistencyLevel(ConsistencyLevel.ALL);

final SimpleStatement insert = QueryBuilder.insertInto(SCHEMA_VERSION_CF)
.value(TYPE, literal(migration.getType().name()))
.value(VERSION, literal(migration.getVersion()))
.value(TIMESTAMP, literal(System.currentTimeMillis()))
.value(DESCRIPTION, literal(migration.getDescription()))
.builder().setConsistencyLevel(DefaultConsistencyLevel.ALL)
.build();
try {
session.execute(insert);
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/smartcat/migration/DataMigration.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public abstract class DataMigration extends Migration {
* Creates new data migration.
* @param version Version of this data migration
*/
public DataMigration(int version) {
protected DataMigration(int version) {
super(MigrationType.DATA, version);
}
}
4 changes: 2 additions & 2 deletions src/main/java/io/smartcat/migration/Executor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.smartcat.migration;

import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;

/**
* Executor is a class which executes all the migration for given session.
Expand All @@ -17,7 +17,7 @@ private Executor() {
* @param resources Migration resources collection
* @return Return success
*/
public static boolean migrate(final Session session, final MigrationResources resources) {
public static boolean migrate(final CqlSession session, final MigrationResources resources) {
return MigrationEngine.withSession(session).migrate(resources);
}

Expand Down
28 changes: 12 additions & 16 deletions src/main/java/io/smartcat/migration/Migration.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.smartcat.migration;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;

import io.smartcat.migration.exceptions.MigrationException;
import io.smartcat.migration.exceptions.SchemaAgreementException;
Expand All @@ -18,14 +18,14 @@ public abstract class Migration {
/**
* Active Cassandra session.
*/
protected Session session;
protected CqlSession session;

/**
* Create new migration with provided type and version.
* @param type Migration type (SCHEMA or DATA)
* @param version Migration version
*/
public Migration(final MigrationType type, final int version) {
protected Migration(final MigrationType type, final int version) {
this.type = type;
this.version = version;
}
Expand All @@ -34,7 +34,7 @@ public Migration(final MigrationType type, final int version) {
* Enables session injection into migration class.
* @param session Session object
*/
public void setSession(final Session session) {
public void setSession(final CqlSession session) {
this.session = session;
}

Expand Down Expand Up @@ -78,7 +78,7 @@ protected void executeWithSchemaAgreement(Statement statement)
if (checkSchemaAgreement(result)) {
return;
}
if (checkClusterSchemaAgreement()) {
if (checkSchemaAgreement()) {
return;
}

Expand All @@ -90,11 +90,11 @@ protected void executeWithSchemaAgreement(Statement statement)
* Whether the cluster had reached schema agreement after the execution of this query.
*
* After a successful schema-altering query (ex: creating a table), the driver will check if the cluster's nodes
* agree on the new schema version. If not, it will keep retrying for a given delay (configurable via
* {@link com.datastax.driver.core.Cluster.Builder#withMaxSchemaAgreementWaitSeconds(int)}).
* agree on the new schema version.
*
* If this method returns {@code false}, clients can call
* {@link com.datastax.driver.core.Metadata#checkSchemaAgreement()} later to perform the check manually.
* {@link com.datastax.oss.driver.internal.core.cql.DefaultExecutionInfo#isSchemaInAgreement()}
* later to perform the check manually.
*
* Note that the schema agreement check is only performed for schema-altering queries For other query types, this
* method will always return {@code true}.
Expand All @@ -109,16 +109,12 @@ protected boolean checkSchemaAgreement(ResultSet resultSet) {
/**
* Checks whether hosts that are currently up agree on the schema definition.
*
* This method performs a one-time check only, without any form of retry; therefore
* {@link com.datastax.driver.core.Cluster.Builder#withMaxSchemaAgreementWaitSeconds(int)}
* does not apply in this case.
*
* @return {@code true} if all hosts agree on the schema; {@code false} if
* they don't agree, or if the check could not be performed
* (for example, if the control connection is down).
*/
protected boolean checkClusterSchemaAgreement() {
return this.session.getCluster().getMetadata().checkSchemaAgreement();
protected boolean checkSchemaAgreement() {
return this.session.checkSchemaAgreement();
}

@Override
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/smartcat/migration/MigrationEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;

/**
* Migration engine wraps Migrator and provides DSL like API.
Expand All @@ -22,22 +22,22 @@ private MigrationEngine() {
* @param session Datastax driver session object
* @return migrator instance with versioner and session which can migrate resources
*/
public static Migrator withSession(final Session session) {
public static Migrator withSession(final CqlSession session) {
return new Migrator(session);
}

/**
* Migrator handles migrations and errors.
*/
public static class Migrator {
private final Session session;
private final CqlSession session;
private final CassandraVersioner versioner;

/**
* Create new Migrator with active Cassandra session.
* @param session Active Cassandra session
*/
public Migrator(final Session session) {
public Migrator(final CqlSession session) {
this.session = session;
this.versioner = new CassandraVersioner(session);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/smartcat/migration/SchemaMigration.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public abstract class SchemaMigration extends Migration {
* Create new schema migration with provided version.
* @param version Version of this schema migration
*/
public SchemaMigration(int version) {
protected SchemaMigration(int version) {
super(MigrationType.SCHEMA, version);
}
}
25 changes: 11 additions & 14 deletions src/test/java/io/smartcat/migration/BaseTest.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
package io.smartcat.migration;

import com.datastax.driver.core.*;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class BaseTest {

public void truncateTables(final String keyspace, final Session session) {
for (final String table : tables(keyspace, session)) {
public void truncateTables(final String keyspace, final CqlSession session) {
for (final CqlIdentifier table : tables(keyspace, session)) {
session.execute(String.format("TRUNCATE %s.%s;", keyspace, table));
}
}

private List<String> tables(final String keyspace, final Session session) {
final List<String> tables = new ArrayList<>();
final Cluster cluster = session.getCluster();
final Metadata meta = cluster.getMetadata();
final KeyspaceMetadata keyspaceMeta = meta.getKeyspace(keyspace);
for (final TableMetadata tableMeta : keyspaceMeta.getTables()) {
tables.add(tableMeta.getName());
}
private Set<CqlIdentifier> tables(final String keyspace, final CqlSession session) {
final Metadata meta = session.getMetadata();
final KeyspaceMetadata keyspaceMeta = meta.getKeyspace(keyspace).get();

return tables;
return keyspaceMeta.getTables().keySet();
}

}
28 changes: 15 additions & 13 deletions src/test/java/io/smartcat/migration/CassandraMetadataAnalyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,34 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.tryFind;

import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.google.common.base.Optional;

public class CassandraMetadataAnalyzer {

private Session session;
private CqlSession session;

public CassandraMetadataAnalyzer(Session session) {
public CassandraMetadataAnalyzer(CqlSession session) {
checkNotNull(session, "Session cannot be null");
checkNotNull(session.getLoggedKeyspace(), "Session must be logged into a keyspace");
checkNotNull(session.getKeyspace(), "Session must be logged into a keyspace");
this.session = session;
}

public boolean columnExistInTable(String columnName, String tableName) {
TableMetadata table = getTableMetadata(this.session, tableName);
Optional<ColumnMetadata> column = tryFind(table.getColumns(), new ColumnNameMatcher(columnName));
Optional<ColumnMetadata> column = tryFind(table.getColumns().values(), new ColumnNameMatcher(columnName));
return column.isPresent();
}

private static TableMetadata getTableMetadata(Session session, String tableName) {
Metadata metadata = session.getCluster().getMetadata();
KeyspaceMetadata keyspaceMetadata = metadata.getKeyspace(session.getLoggedKeyspace());
return keyspaceMetadata.getTable(tableName);
private static TableMetadata getTableMetadata(CqlSession session, String tableName) {
Metadata metadata = session.getMetadata();
CqlIdentifier keyspace = session.getKeyspace().get();
KeyspaceMetadata keyspaceMetadata = metadata.getKeyspace(keyspace).get();
return keyspaceMetadata.getTable(tableName).get();
}
}
21 changes: 11 additions & 10 deletions src/test/java/io/smartcat/migration/CassandraVersionerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,33 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;


import io.smartcat.migration.migrations.schema.AddBookGenreFieldMigration;

public class CassandraVersionerTest {
private CassandraVersioner versioner;
private Session session;
private CqlSession session;
private ResultSet versionResultSet;

@Before
public void setUp() throws Exception {
session = mock(Session.class);
public void setUp() {
session = mock(CqlSession.class);
versioner = new CassandraVersioner(session);
versionResultSet = mock(ResultSet.class);
}

@Test
public void whenSchemaVersionTableIsEmptyThenCurrentVersionShouldBe0() throws Exception {
public void whenSchemaVersionTableIsEmptyThenCurrentVersionShouldBe0() {
expectRetrieveEmptyCurrentVersion();

int currentVersion = versioner.getCurrentVersion(SCHEMA);
Expand All @@ -40,7 +41,7 @@ public void whenSchemaVersionTableIsEmptyThenCurrentVersionShouldBe0() throws Ex
}

@Test
public void whenSchemaVersionTableIsNotEmptyThenCurrentVersionShouldBeRetrievedFromTheTable() throws Exception {
public void whenSchemaVersionTableIsNotEmptyThenCurrentVersionShouldBeRetrievedFromTheTable() {
int expectedVersion = 1;

expectRetrieveCurrentVersion(expectedVersion);
Expand All @@ -51,7 +52,7 @@ public void whenSchemaVersionTableIsNotEmptyThenCurrentVersionShouldBeRetrievedF
}

@Test
public void updateVersionSucess() throws Exception {
public void updateVersionSuccess() {
versioner.updateVersion(new AddBookGenreFieldMigration(1));
}

Expand Down
Loading