@@ -2,13 +2,15 @@ package cassandra
22
33import (
44 "fmt"
5+ "github.com/gocql/gocql"
6+ "github.com/mattes/migrate/database"
57 "io"
68 "io/ioutil"
79 nurl "net/url"
8- "github.com/gocql/gocql"
9- "time"
10- "github.com/mattes/migrate/database"
10+ "regexp"
1111 "strconv"
12+ "strings"
13+ "time"
1214)
1315
1416func init () {
@@ -20,8 +22,8 @@ var DefaultMigrationsTable = "schema_migrations"
2022var dbLocked = false
2123
2224var (
23- ErrNilConfig = fmt .Errorf ("no config" )
24- ErrNoKeyspace = fmt .Errorf ("no keyspace provided" )
25+ ErrNilConfig = fmt .Errorf ("no config" )
26+ ErrNoKeyspace = fmt .Errorf ("no keyspace provided" )
2527 ErrDatabaseDirty = fmt .Errorf ("database is dirty" )
2628)
2729
@@ -35,7 +37,7 @@ type Cassandra struct {
3537 isLocked bool
3638
3739 // Open and WithInstance need to guarantee that config is never nil
38- config * Config
40+ config * Config
3941}
4042
4143func (p * Cassandra ) Open (url string ) (database.Driver , error ) {
@@ -59,11 +61,24 @@ func (p *Cassandra) Open(url string) (database.Driver, error) {
5961 MigrationsTable : migrationsTable ,
6062 }
6163
64+ var username , password string
65+
66+ if u .User != nil {
67+ username = u .User .Username ()
68+ password , _ = u .User .Password ()
69+ } else {
70+ username = "cassandra"
71+ password = "cassandra"
72+ }
73+
6274 cluster := gocql .NewCluster (u .Host )
6375 cluster .Keyspace = u .Path [1 :len (u .Path )]
6476 cluster .Consistency = gocql .All
6577 cluster .Timeout = 1 * time .Minute
66-
78+ cluster .Authenticator = gocql.PasswordAuthenticator {
79+ Username : username ,
80+ Password : password ,
81+ }
6782
6883 // Retrieve query string configuration
6984 if len (u .Query ().Get ("consistency" )) > 0 {
@@ -111,7 +126,7 @@ func (p *Cassandra) Close() error {
111126}
112127
113128func (p * Cassandra ) Lock () error {
114- if ( dbLocked ) {
129+ if dbLocked {
115130 return database .ErrLocked
116131 }
117132 dbLocked = true
@@ -130,9 +145,17 @@ func (p *Cassandra) Run(migration io.Reader) error {
130145 }
131146 // run migration
132147 query := string (migr [:])
133- if err := p .session .Query (query ).Exec (); err != nil {
134- // TODO: cast to Cassandra error and get line number
135- return database.Error {OrigErr : err , Err : "migration failed" , Query : migr }
148+ matches := regexp .MustCompile (`(?m:;$)` ).Split (query , - 1 )
149+ for _ , match := range matches {
150+ trimmedMatch := strings .Trim (match , " \t \r \n " )
151+ if len (trimmedMatch ) == 0 {
152+ continue
153+ }
154+
155+ if err := p .session .Query (trimmedMatch ).Exec (); err != nil {
156+ // TODO: cast to Cassandra error and get line number
157+ return database.Error {OrigErr : err , Err : "migration failed" , Query : migr }
158+ }
136159 }
137160
138161 return nil
@@ -153,7 +176,6 @@ func (p *Cassandra) SetVersion(version int, dirty bool) error {
153176 return nil
154177}
155178
156-
157179// Return current keyspace version
158180func (p * Cassandra ) Version () (version int , dirty bool , err error ) {
159181 query := `SELECT version, dirty FROM "` + p .config .MigrationsTable + `" LIMIT 1`
@@ -191,7 +213,6 @@ func (p *Cassandra) Drop() error {
191213 return nil
192214}
193215
194-
195216// Ensure version table exists
196217func (p * Cassandra ) ensureVersionTable () error {
197218 err := p .session .Query (fmt .Sprintf ("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))" , p .config .MigrationsTable )).Exec ()
@@ -204,7 +225,6 @@ func (p *Cassandra) ensureVersionTable() error {
204225 return nil
205226}
206227
207-
208228// ParseConsistency wraps gocql.ParseConsistency
209229// to return an error instead of a panicking.
210230func parseConsistency (consistencyStr string ) (consistency gocql.Consistency , err error ) {
0 commit comments