diff --git a/lib/fluent/plugin/in_mysql_replicator.rb b/lib/fluent/plugin/in_mysql_replicator.rb
index 8f4a4c8..8670621 100644
--- a/lib/fluent/plugin/in_mysql_replicator.rb
+++ b/lib/fluent/plugin/in_mysql_replicator.rb
@@ -16,7 +16,23 @@ def initialize
     config_param :encoding, :string, :default => 'utf8'
     config_param :query, :string
     config_param :prepared_query, :string, :default => nil
-    config_param :primary_key, :string, :default => 'id'
+    # Accepts a bare column name (`id`), a JSON string (`"id"`) or a JSON
+    # array (`["id", "sub_id"]`); a configured value is normalised to an Array.
+    config_param :primary_key, :default => 'id' do |val|
+      param =
+        begin
+          val.is_a?(String) ? JSON.load(val) : val
+        rescue JSON::ParserError
+          val # a bare column name is not valid JSON; use it verbatim
+        end
+      if param.is_a? String
+        [param]
+      elsif param.is_a? Array
+        param
+      else
+        raise Fluent::ConfigError, "mysql_replicator: 'primary_key' param must be either of String or Array. #{val}, #{param}"
+      end
+    end
     config_param :interval, :string, :default => '1m'
     config_param :enable_delete, :bool, :default => true
     config_param :tag, :string, :default => nil
@@ -65,7 +81,11 @@ def poll
           end
         end
         query(@query).each do |row|
-          current_ids << row[@primary_key]
+          # @primary_key is normally an Array, even for a single column; Array() also tolerates a bare String (e.g. a default that skipped normalisation -- TODO confirm)
+          id = Array(@primary_key).map do |col|
+            row[col]
+          end
+          current_ids << id
           current_hash = Digest::SHA1.hexdigest(row.flatten.join)
           row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)}
           row.select {|k, v| v.to_s.strip.match(/^SELECT/i) }.each do |k, v|
@@ -75,18 +95,18 @@ def poll
               row[k] << nest_row
             end
           end
-          if row[@primary_key].nil?
-            $log.error "mysql_replicator: missing primary_key. :tag=>#{tag} :primary_key=>#{primary_key}"
+          if id.any?(&:nil?)
+            $log.error "mysql_replicator: missing primary_key. :tag=>#{tag} :primary_key=>#{primary_key}, #{id}"
             break
           end
-          if !table_hash.include?(row[@primary_key])
+          if !table_hash.include?(id)
             tag = format_tag(@tag, {:event => :insert})
             emit_record(tag, row)
-          elsif table_hash[row[@primary_key]] != current_hash
+          elsif table_hash[id] != current_hash
             tag = format_tag(@tag, {:event => :update})
             emit_record(tag, row)
           end
-          table_hash[row[@primary_key]] = current_hash
+          table_hash[id] = current_hash
           rows_count += 1
         end
         ids = current_ids
@@ -115,7 +135,7 @@ def hash_delete_by_list (hash, deleted_keys)
     end
 
     def format_tag(tag, param)
-      pattern = {'${event}' => param[:event].to_s, '${primary_key}' => @primary_key}
+      pattern = {'${event}' => param[:event].to_s, '${primary_key}' => Array(@primary_key).join(",")} # TODO: a column name containing "," cannot round-trip through the tag
       tag.gsub(/(\${[a-z_]+})/) do
         $log.warn "mysql_replicator: missing placeholder. :tag=>#{tag} :placeholder=>#{$1}" unless pattern.include?($1)
         pattern[$1]
diff --git a/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb b/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
index db62240..64df455 100644
--- a/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
+++ b/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
@@ -36,6 +36,15 @@ def shutdown
     super
   end
 
+  # Builds the Elasticsearch document id from the primary key column(s).
+  # Returns the column values joined with "," or nil when no key columns are
+  # given or the record lacks a value for any of them.
+  def id_val(record, id_key)
+    values = id_key.map { |col| record[col] }
+    return nil if values.empty? || values.any?(&:nil?)
+    values.join(",")
+  end
+
   def write(chunk)
     bulk_message = []
 
@@ -43,15 +52,16 @@ def write(chunk)
       tag_parts = tag.match(@tag_format)
       target_index = tag_parts['index_name']
       target_type = tag_parts['type_name']
-      id_key = tag_parts['primary_key']
+      id_key = tag_parts['primary_key'].to_s.split(",")
+      doc_id = id_val(record, id_key)
 
       if tag_parts['event'] == 'delete'
-        meta = { "delete" => {"_index" => target_index, "_type" => target_type, "_id" => record[id_key]} }
+        meta = { "delete" => {"_index" => target_index, "_type" => target_type, "_id" => doc_id} }
         bulk_message << Yajl::Encoder.encode(meta)
       else
         meta = { "index" => {"_index" => target_index, "_type" => target_type} }
-        if id_key && record[id_key]
-          meta['index']['_id'] = record[id_key]
+        if doc_id
+          meta['index']['_id'] = doc_id
         end
         bulk_message << Yajl::Encoder.encode(meta)
         bulk_message << Yajl::Encoder.encode(record)