diff --git a/README.md b/README.md index 11f7310..e42b0ae 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ types. An example collection map might be: :columns: - id: :source: _id - :type: TEXT + :type: TEXT (Must be VARCHAR(255) for MySQL) - author_name: :source: author.name :type: TEXT @@ -100,6 +100,14 @@ the `title` and `created` attributes, above. Every defined collection must include a mapping for the `_id` attribute. +You can also specify [column options](http://sequel.jeremyevans.net/rdoc/files/doc/schema_modification_rdoc.html) +if you use the long-form YAML notation. However, in this case you must +specify both the type and source for each column even if name and source +are identical. Supported options are: + * :default + * :index + * :null + `:meta` contains metadata about this collection/table. It is required to include at least `:table`, naming the SQL table this collection will be mapped to. `extra_props` determines the handling of @@ -123,6 +131,10 @@ running on default ports on localhost without authentication. You can point it at different targets using the `--sql` and `--mongo` command-line parameters. +You can also specify a different adapter than 'postgres' with the `--sql` +parameter, such as 'mysql2'. Currently testing has only been done for mysql2 +and postgres, though any adapter supported by the Sequel gem should work. + `mosql` will: 1. 
Create the appropriate SQL tables diff --git a/lib/mosql/cli.rb b/lib/mosql/cli.rb index 7df1cb4..d22ddfd 100644 --- a/lib/mosql/cli.rb +++ b/lib/mosql/cli.rb @@ -147,7 +147,7 @@ def run metadata_table = MoSQL::Tailer.create_table(@sql.db, 'mosql_tailers') @tailer = MoSQL::Tailer.new([@mongo], :existing, metadata_table, - :service => options[:service]) + {:service => options[:service]}, @sql.db.adapter_scheme) @streamer = Streamer.new(:options => @options, :tailer => @tailer, diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index f57930a..7f299a6 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -13,12 +13,14 @@ def to_array(lst) :source => ent.fetch(:source), :type => ent.fetch(:type), :name => (ent.keys - [:source, :type]).first, + :opts => ent.select {|k,v| [:default, :index, :null].include? k } } elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String) array << { :source => ent.first.first, :name => ent.first.first, - :type => ent.first.last + :type => ent.first.last, + :opts => {} } else raise SchemaError.new("Invalid ordered hash entry #{ent.inspect}") @@ -80,7 +82,8 @@ def create_schema(db, clobber=false) if col[:source] == '$timestamp' opts[:default] = Sequel.function(:now) end - column col[:name], col[:type], opts + opts.merge!(col[:opts]) + column col[:name].to_sym, col[:type], opts if col[:source].to_sym == :_id primary_key [col[:name].to_sym] @@ -224,19 +227,33 @@ def all_columns_for_copy(schema) def copy_data(db, ns, objs) schema = find_ns!(ns) - db.synchronize do |pg| - sql = "COPY \"#{schema[:meta][:table]}\" " + - "(#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN" - pg.execute(sql) - objs.each do |o| - pg.put_copy_data(transform_to_copy(ns, o, schema) + "\n") - end - pg.put_copy_end - begin - pg.get_result.check - rescue PGError => e - db.send(:raise_error, e) + db.synchronize do |adaptor| + # For Postgres we can use the COPY table (*columns) FROM STDIN syntax + if db.adapter_scheme 
== :postgres + sql = "COPY \"#{schema[:meta][:table]}\" " + + "(#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN" + + adaptor.execute(sql) + + objs.each do |o| + adaptor.put_copy_data(transform_to_copy(ns, o, schema) + "\n") + end + adaptor.put_copy_end + begin + adaptor.get_result.check + rescue PGError => e + db.send(:raise_error, e) + end + + # For all other SQL servers we'll use the standard INSERT INTO table (*columns) VALUES values syntax + else + sql = "INSERT INTO `#{schema[:meta][:table]}` + (#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")}) + VALUES (#{objs.map { |o| o.join(',') }.join('),(')})" + + db.execute_insert(sql) end + end end diff --git a/lib/mosql/sql.rb b/lib/mosql/sql.rb index 030e299..ec83c91 100644 --- a/lib/mosql/sql.rb +++ b/lib/mosql/sql.rb @@ -52,7 +52,7 @@ def upsert!(table, table_primary_key, item) begin table.insert(item) rescue Sequel::DatabaseError => e - raise e unless self.class.duplicate_key_error?(e) + raise e unless self.class.duplicate_key_error?(e, @db.adapter_scheme) log.info("RACE during upsert: Upserting #{item} into #{table}: #{e}") end elsif rows > 1 @@ -60,13 +60,23 @@ def upsert!(table, table_primary_key, item) end end - def self.duplicate_key_error?(e) - # c.f. http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html - # for the list of error codes. - # - # No thanks to Sequel and pg for making it easy to figure out - # how to get at this error code.... - e.wrapped_exception.result.error_field(PG::Result::PG_DIAG_SQLSTATE) == "23505" + def self.duplicate_key_error?(e, adapter_scheme) + if adapter_scheme == :postgres + # c.f. http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html + # for the list of error codes. + # + # No thanks to Sequel and pg for making it easy to figure out + # how to get at this error code.... + e.wrapped_exception.result.error_field(PG::Result::PG_DIAG_SQLSTATE) == "23505" + elsif [:mysql, :mysql2].include? 
adapter_scheme + # Using a string comparison of the error message in the same way as Sequel determines MySQL errors + # https://github.com/jeremyevans/sequel/blob/master/lib/sequel/adapters/mysql.rb#L191 + /duplicate entry .* for key/.match(e.message.downcase) + else + # TODO this needs to be tracked down for the particular adaptor's duplicate key error, + # but the mysql solution might be a good approximation + /duplicate entry .* for key/.match(e.message.downcase) + end end end end diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index 7400f68..6e8a3ec 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -39,7 +39,7 @@ def unsafe_handle_exceptions(ns, obj) yield rescue Sequel::DatabaseError => e wrapped = e.wrapped_exception - if wrapped.result && options[:unsafe] + if @sql.db.adapter_scheme == :postgres && wrapped.result && options[:unsafe] log.warn("Ignoring row (#{obj.inspect}): #{e}") else log.error("Error processing #{obj.inspect} for #{ns}.") diff --git a/lib/mosql/tailer.rb b/lib/mosql/tailer.rb index 9abca4f..5cf8617 100644 --- a/lib/mosql/tailer.rb +++ b/lib/mosql/tailer.rb @@ -1,18 +1,23 @@ module MoSQL class Tailer < Mongoriver::AbstractPersistentTailer def self.create_table(db, tablename) + # MySQL needs primary keys to have defined lengths, so we can't use a TEXT column type + pk_column_type = ([:mysql, :mysql2].include? db.adapter_scheme) ? 
'VARCHAR(255)' : 'TEXT' + + # Create the table db.create_table?(tablename) do - column :service, 'TEXT' + column :service, pk_column_type column :timestamp, 'INTEGER' primary_key [:service] end db[tablename.to_sym] end - def initialize(backends, type, table, opts) + def initialize(backends, type, table, opts, adapter_scheme) super(backends, type, opts) @table = table @service = opts[:service] || "mosql" + @adapter_scheme = adapter_scheme end def read_timestamp @@ -29,7 +34,7 @@ def write_timestamp(ts) begin @table.insert({:service => @service, :timestamp => ts.seconds}) rescue Sequel::DatabaseError => e - raise unless MoSQL::SQLAdapter.duplicate_key_error?(e) + raise unless MoSQL::SQLAdapter.duplicate_key_error?(e, @adapter_scheme) end @did_insert = true end