Skip to content
This repository has been archived by the owner on Dec 22, 2020. It is now read-only.

Adding MySQL and Mongo slave functionality #37

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ types. An example collection map might be:
:columns:
- id:
:source: _id
:type: TEXT
:type: TEXT (Must be VARCHAR(255) for MySQL)
- author_name:
:source: author.name
:type: TEXT
Expand Down Expand Up @@ -100,6 +100,14 @@ the `title` and `created` attributes, above.
Every defined collection must include a mapping for the `_id`
attribute.

You can also specify [column options](http://sequel.jeremyevans.net/rdoc/files/doc/schema_modification_rdoc.html)
if you use the long-form YAML notation. However, in this case you must
specify both the type and source for each column even if name and source
are identical. Supported options are:
* :default
* :index
* :null

`:meta` contains metadata about this collection/table. It is
required to include at least `:table`, naming the SQL table this
collection will be mapped to. `extra_props` determines the handling of
Expand All @@ -123,6 +131,10 @@ running on default ports on localhost without authentication. You can
point it at different targets using the `--sql` and `--mongo`
command-line parameters.

You can also specify a different adapter than 'postgres' with the `--sql`
parameter, such as 'mysql2'. Currently testing has only been done for mysql2
and postgres, though adapter supported by the Sequel gem should work.

`mosql` will:

1. Create the appropriate SQL tables
Expand Down
2 changes: 1 addition & 1 deletion lib/mosql/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def run
metadata_table = MoSQL::Tailer.create_table(@sql.db, 'mosql_tailers')

@tailer = MoSQL::Tailer.new([@mongo], :existing, metadata_table,
:service => options[:service])
{:service => options[:service]}, @sql.db.adapter_scheme)

@streamer = Streamer.new(:options => @options,
:tailer => @tailer,
Expand Down
45 changes: 31 additions & 14 deletions lib/mosql/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ def to_array(lst)
:source => ent.fetch(:source),
:type => ent.fetch(:type),
:name => (ent.keys - [:source, :type]).first,
:opts => ent.select {|k,v| [:default, :index, :null].include? k }
}
elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String)
array << {
:source => ent.first.first,
:name => ent.first.first,
:type => ent.first.last
:type => ent.first.last,
:opts => {}
}
else
raise SchemaError.new("Invalid ordered hash entry #{ent.inspect}")
Expand Down Expand Up @@ -80,7 +82,8 @@ def create_schema(db, clobber=false)
if col[:source] == '$timestamp'
opts[:default] = Sequel.function(:now)
end
column col[:name], col[:type], opts
opts.merge!(col[:opts])
column col[:name].to_sym, col[:type], opts

if col[:source].to_sym == :_id
primary_key [col[:name].to_sym]
Expand Down Expand Up @@ -224,19 +227,33 @@ def all_columns_for_copy(schema)

def copy_data(db, ns, objs)
schema = find_ns!(ns)
db.synchronize do |pg|
sql = "COPY \"#{schema[:meta][:table]}\" " +
"(#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN"
pg.execute(sql)
objs.each do |o|
pg.put_copy_data(transform_to_copy(ns, o, schema) + "\n")
end
pg.put_copy_end
begin
pg.get_result.check
rescue PGError => e
db.send(:raise_error, e)
db.synchronize do |adaptor|
# For Postgres we can use the COPY table (*columns) FROM STDIN syntax
if db.adapter_scheme == :postgres
sql = "COPY \"#{schema[:meta][:table]}\" " +
"(#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN"

adaptor.execute(sql)

objs.each do |o|
adaptor.put_copy_data(transform_to_copy(ns, o, schema) + "\n")
end
adaptor.put_copy_end
begin
adaptor.get_result.check
rescue PGError => e
db.send(:raise_error, e)
end

# For all other SQL servers we'll use the standard INSERT INTO table (*columns) VALUES values syntax
else
sql = "INSERT INTO `#{schema[:meta][:table]}`
(#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")})
VALUES (#{objs.map { |o| o.join(',') }.join('),(')})"

db.execute_insert(sql)
end

end
end

Expand Down
26 changes: 18 additions & 8 deletions lib/mosql/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,31 @@ def upsert!(table, table_primary_key, item)
begin
table.insert(item)
rescue Sequel::DatabaseError => e
raise e unless self.class.duplicate_key_error?(e)
raise e unless self.class.duplicate_key_error?(e, @db.adapter_scheme)
log.info("RACE during upsert: Upserting #{item} into #{table}: #{e}")
end
elsif rows > 1
log.warn("Huh? Updated #{rows} > 1 rows: upsert(#{table}, #{item})")
end
end

def self.duplicate_key_error?(e)
# c.f. http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
# for the list of error codes.
#
# No thanks to Sequel and pg for making it easy to figure out
# how to get at this error code....
e.wrapped_exception.result.error_field(PG::Result::PG_DIAG_SQLSTATE) == "23505"
def self.duplicate_key_error?(e, adapter_scheme)
if adapter_scheme == :postgres
# c.f. http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
# for the list of error codes.
#
# No thanks to Sequel and pg for making it easy to figure out
# how to get at this error code....
e.wrapped_exception.result.error_field(PG::Result::PG_DIAG_SQLSTATE) == "23505"
elsif [:mysql, :mysql2].include? adapter_scheme
# Using a string comparison of the error message in the same way as Sequel determines MySQL errors
# https://github.com/jeremyevans/sequel/blob/master/lib/sequel/adapters/mysql.rb#L191
/duplicate entry .* for key/.match(e.message.downcase)
else
# TODO this needs to be tracked down for the particular adaptor's duplicate key error,
# but the mysql solution might be a good approximation
/duplicate entry .* for key/.match(e.message.downcase)
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/mosql/streamer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def unsafe_handle_exceptions(ns, obj)
yield
rescue Sequel::DatabaseError => e
wrapped = e.wrapped_exception
if wrapped.result && options[:unsafe]
if @sql.db.adapter_scheme == :postgres && wrapped.result && options[:unsafe]
log.warn("Ignoring row (#{obj.inspect}): #{e}")
else
log.error("Error processing #{obj.inspect} for #{ns}.")
Expand Down
11 changes: 8 additions & 3 deletions lib/mosql/tailer.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
module MoSQL
class Tailer < Mongoriver::AbstractPersistentTailer
def self.create_table(db, tablename)
# MySQL needs primary keys to have defined lengths, so we can't use a TEXT column type
pk_column_type = ([:mysql, :mysql2].include? db.adapter_scheme) ? 'VARCHAR(255)' : 'TEXT'

# Create the table
db.create_table?(tablename) do
column :service, 'TEXT'
column :service, pk_column_type
column :timestamp, 'INTEGER'
primary_key [:service]
end
db[tablename.to_sym]
end

def initialize(backends, type, table, opts)
def initialize(backends, type, table, opts, adapter_scheme)
super(backends, type, opts)
@table = table
@service = opts[:service] || "mosql"
@adapter_scheme = adapter_scheme
end

def read_timestamp
Expand All @@ -29,7 +34,7 @@ def write_timestamp(ts)
begin
@table.insert({:service => @service, :timestamp => ts.seconds})
rescue Sequel::DatabaseError => e
raise unless MoSQL::SQLAdapter.duplicate_key_error?(e)
raise unless MoSQL::SQLAdapter.duplicate_key_error?(e, @adapter_scheme)
end
@did_insert = true
end
Expand Down