Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions lib/fluent/plugin/in_mysql_replicator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@ def initialize
config_param :encoding, :string, :default => 'utf8'
config_param :query, :string
config_param :prepared_query, :string, :default => nil
config_param :primary_key, :string, :default => 'id'
config_param :primary_key, :default => 'id' do |val|
param = val.is_a?(String) ? JSON.load(val) : val
if param.is_a? String
[param]
elsif param.is_a? Array
param
else
raise Fluent::ConfigError, "mysql_replicator: 'primary_key' param must be either of String or Array. #{val}, #{param}"
end
end
config_param :interval, :string, :default => '1m'
config_param :enable_delete, :bool, :default => true
config_param :tag, :string, :default => nil
Expand Down Expand Up @@ -65,7 +74,11 @@ def poll
end
end
query(@query).each do |row|
current_ids << row[@primary_key]
# @primary_key is an array even though the primary key is single column
id = @primary_key.map do |col|
row[col]
end
current_ids << id
current_hash = Digest::SHA1.hexdigest(row.flatten.join)
row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)}
row.select {|k, v| v.to_s.strip.match(/^SELECT/i) }.each do |k, v|
Expand All @@ -75,18 +88,18 @@ def poll
row[k] << nest_row
end
end
if row[@primary_key].nil?
$log.error "mysql_replicator: missing primary_key. :tag=>#{tag} :primary_key=>#{primary_key}"
if id.select { |idval| idval.nil? }.length > 0
$log.error "mysql_replicator: missing primary_key. :tag=>#{tag} :primary_key=>#{primary_key}, #{id}"
break
end
if !table_hash.include?(row[@primary_key])
if !table_hash.include?(id)
tag = format_tag(@tag, {:event => :insert})
emit_record(tag, row)
elsif table_hash[row[@primary_key]] != current_hash
elsif table_hash[row[id]] != current_hash
tag = format_tag(@tag, {:event => :update})
emit_record(tag, row)
end
table_hash[row[@primary_key]] = current_hash
table_hash[id] = current_hash
rows_count += 1
end
ids = current_ids
Expand Down Expand Up @@ -115,7 +128,7 @@ def hash_delete_by_list (hash, deleted_keys)
end

def format_tag(tag, param)
pattern = {'${event}' => param[:event].to_s, '${primary_key}' => @primary_key}
pattern = {'${event}' => param[:event].to_s, '${primary_key}' => @primary_key.join(",")} #TODO
tag.gsub(/(\${[a-z_]+})/) do
$log.warn "mysql_replicator: missing placeholder. :tag=>#{tag} :placeholder=>#{$1}" unless pattern.include?($1)
pattern[$1]
Expand Down
13 changes: 9 additions & 4 deletions lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,27 @@ def shutdown
super
end

def id_val(record, id_key)
id_key.map { |col| record[col] }.join(",")
end

def write(chunk)
bulk_message = []

chunk.msgpack_each do |tag, time, record|
tag_parts = tag.match(@tag_format)
target_index = tag_parts['index_name']
target_type = tag_parts['type_name']
id_key = tag_parts['primary_key']
id_key = tag_parts['primary_key'].split(",")
id_val = id_val(record, id_key)

if tag_parts['event'] == 'delete'
meta = { "delete" => {"_index" => target_index, "_type" => target_type, "_id" => record[id_key]} }
meta = { "delete" => {"_index" => target_index, "_type" => target_type, "_id" => id_val} }
bulk_message << Yajl::Encoder.encode(meta)
else
meta = { "index" => {"_index" => target_index, "_type" => target_type} }
if id_key && record[id_key]
meta['index']['_id'] = record[id_key]
if id_key && id_val
meta['index']['_id'] = id_val
end
bulk_message << Yajl::Encoder.encode(meta)
bulk_message << Yajl::Encoder.encode(record)
Expand Down