Feat: ECS compatibility (RFC email fields) #56

Draft: wants to merge 18 commits into base: main
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,10 @@
## 3.2.0
- Feat: ECS compatibility [#55](https://github.com/logstash-plugins/logstash-input-imap/pull/55)
* added (optional) `headers_target` configuration option
* added (optional) `attachments_target` configuration option
- Fix: plugin should not close `$stdin` while stopping
- Fix: make sure the 'Date' header is skipped regardless of the `lowercase_headers` setting

## 3.1.0
- Adds an option to recursively search the message parts for attachment and inline attachment filenames. If the save_attachments option is set to true, the content of attachments is included in the `attachments.data` field. The attachment data can then be used by the Elasticsearch Ingest Attachment Processor Plugin.
[#48](https://github.com/logstash-plugins/logstash-input-imap/pull/48)
50 changes: 48 additions & 2 deletions docs/index.asciidoc
@@ -26,6 +26,15 @@ Read mails from IMAP server
Periodically scan an IMAP folder (`INBOX` by default) and move any read messages
to the trash.

[id="plugins-{type}s-{plugin}-ecs"]
==== Compatibility with the Elastic Common Schema (ECS)

The plugin includes sensible defaults that change based on <<plugins-{type}s-{plugin}-ecs_compatibility,ECS compatibility mode>>.
When ECS compatibility is disabled, mail headers and attachments are added at the root level of the event.
When targeting an ECS version, headers and attachments are placed under `@metadata` sub-fields, unless configured
otherwise, to avoid conflicts with ECS fields.
See <<plugins-{type}s-{plugin}-headers_target>> and <<plugins-{type}s-{plugin}-attachments_target>>.
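The defaults described above can be sketched in plain Ruby. This is only an illustration, assuming the values assigned in the plugin's `initialize` method in `lib/logstash/inputs/imap.rb` in this change; `default_targets` is a hypothetical helper and is not part of the plugin.

```ruby
# Hypothetical helper (not in the plugin): returns the default target fields
# for a given ECS compatibility mode, mirroring the values used in this change.
def default_targets(ecs_compatibility)
  ecs = ecs_compatibility != :disabled
  {
    # Legacy mode uses an empty prefix, so headers land at the event root.
    headers_target: ecs ? '[@metadata][input][imap][headers]' : '',
    attachments_target: ecs ? '[@metadata][input][imap][attachments]' : '[attachments]',
    # The ECS-style email hash is only populated by default in ECS mode.
    target: ecs ? '[email]' : nil
  }
end

puts default_targets(:disabled).inspect
puts default_targets(:v1).inspect
```

An explicitly configured `headers_target` or `attachments_target` takes precedence over these defaults.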

[id="plugins-{type}s-{plugin}-options"]
==== Imap Input Configuration Options

@@ -34,12 +43,15 @@ This plugin supports the following configuration options plus the <<plugins-{typ
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-attachments_target>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-check_interval>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-content_type>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-delete>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-ecs_compatibility>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-expunge>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-fetch_count>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-folder>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-headers_target>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-host>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-lowercase_headers>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|Yes
@@ -58,6 +70,16 @@ input plugins.

&nbsp;

[id="plugins-{type}s-{plugin}-attachments_target"]
===== `attachments_target`

* Value type is <<string,string>>
* Default value depends on whether <<plugins-{type}s-{plugin}-ecs_compatibility>> is enabled:
** ECS Compatibility disabled: `"[attachments]"`
** ECS Compatibility enabled: `"[@metadata][input][imap][attachments]"`

The name of the field under which mail attachment information is added, if <<plugins-{type}s-{plugin}-save_attachments>> is set.

[id="plugins-{type}s-{plugin}-check_interval"]
===== `check_interval`

@@ -72,8 +94,7 @@ input plugins.
* Value type is <<string,string>>
* Default value is `"text/plain"`

For multipart messages, use the first part that has this
content-type as the event message.
For multipart messages, use the first part that has this content-type as the event message.

[id="plugins-{type}s-{plugin}-delete"]
===== `delete`
@@ -83,6 +104,21 @@ content-type as the event message.



[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

* Value type is <<string,string>>
* Supported values are:
** `disabled`: does not use ECS-compatible field names (for example, `From` header field is added to the event)
** `v1`, `v8`: avoids field names that might conflict with Elastic Common Schema (for example, the `From` header is added as metadata)
* Default value depends on which version of Logstash is running:
** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default
** Otherwise, the default value is `disabled`.

Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)].
The value of this setting affects the _default_ value of <<plugins-{type}s-{plugin}-headers_target>> and
<<plugins-{type}s-{plugin}-attachments_target>>.

[id="plugins-{type}s-{plugin}-expunge"]
===== `expunge`

@@ -107,6 +143,16 @@ content-type as the event message.



[id="plugins-{type}s-{plugin}-headers_target"]
===== `headers_target`

* Value type is <<string,string>>
* Default value depends on whether <<plugins-{type}s-{plugin}-ecs_compatibility>> is enabled:
** ECS Compatibility disabled: no default value for this setting
** ECS Compatibility enabled: `"[@metadata][input][imap][headers]"`

The name of the field under which mail headers will be added.

[id="plugins-{type}s-{plugin}-host"]
===== `host`

179 changes: 130 additions & 49 deletions lib/logstash/inputs/imap.rb
@@ -3,13 +3,22 @@
require "logstash/namespace"
require "logstash/timestamp"
require "stud/interval"
require "socket" # for Socket.gethostname
require 'fileutils'

require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'

# Read mails from IMAP server
#
# Periodically scan an IMAP folder (`INBOX` by default) and move any read messages
# to the trash.
class LogStash::Inputs::IMAP < LogStash::Inputs::Base

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)

extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

config_name "imap"

default :codec, "plain"
@@ -24,15 +33,23 @@ class LogStash::Inputs::IMAP < LogStash::Inputs::Base

config :folder, :validate => :string, :default => 'INBOX'
config :fetch_count, :validate => :number, :default => 50
config :lowercase_headers, :validate => :boolean, :default => true
config :check_interval, :validate => :number, :default => 300

config :lowercase_headers, :validate => :boolean, :default => true

config :headers_target, :validate => :field_reference # ECS default: [@metadata][input][imap][headers]

config :delete, :validate => :boolean, :default => false
config :expunge, :validate => :boolean, :default => false

config :strip_attachments, :validate => :boolean, :default => false
config :save_attachments, :validate => :boolean, :default => false

# For multipart messages, use the first part that has this
# content-type as the event message.
# Legacy default: [attachments]
# ECS default: [@metadata][input][imap][attachments]
config :attachments_target, :validate => :field_reference

# For multipart messages, use the first part that has this content-type as the event message.
config :content_type, :validate => :string, :default => "text/plain"

# Whether to use IMAP uid to track last processed message
@@ -41,6 +58,40 @@ class LogStash::Inputs::IMAP < LogStash::Inputs::Base
# Path to file with last run time metadata
config :sincedb_path, :validate => :string, :required => false

# NOTE: when set, an extra hash of email information is provided under the target field.
# The hash is based on ECS's email.* fields.
# For backward compatibility, these fields are only set when a target is configured
# (in ECS mode the target defaults to [email]).
config :target, :validate => :field_reference # ECS default: [email], legacy default: nil

def initialize(*params)
super

if original_params.include?('headers_target')
@headers_target = normalize_field_ref(headers_target)
else
@headers_target = ecs_compatibility != :disabled ? '[@metadata][input][imap][headers]' : ''
end

if original_params.include?('attachments_target')
@attachments_target = normalize_field_ref(attachments_target)
else
@attachments_target = ecs_compatibility != :disabled ? '[@metadata][input][imap][attachments]' : '[attachments]'
end

if original_params.include?('target')
@target = normalize_field_ref(target)
else
@target = '[email]' if ecs_compatibility != :disabled
end
end

def normalize_field_ref(target)
return nil if target.nil? || target.empty?
# so we can later event.set("#{target}[#{name}]", ...)
target.match?(/\A[^\[\]]+\z/) ? "[#{target}]" : target
end
private :normalize_field_ref
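
To illustrate the normalization above, here is a standalone copy of `normalize_field_ref` with a few sample inputs. It is a sketch for reading along, not additional plugin code; the sample field names are made up.

```ruby
# Standalone copy of the helper from this change, so it can be run on its own.
def normalize_field_ref(target)
  return nil if target.nil? || target.empty?
  # A bare name (no brackets) is wrapped so that "#{target}[#{name}]"
  # later forms a valid nested field reference.
  target.match?(/\A[^\[\]]+\z/) ? "[#{target}]" : target
end

p normalize_field_ref('headers')          # bare name gets wrapped in brackets
p normalize_field_ref('[email][headers]') # already a field reference, returned as-is
p normalize_field_ref('')                 # empty string is treated as "not set"
```

Because an empty string normalizes to `nil`, explicitly configuring `headers_target => ""` appears to disable header processing, whereas the legacy default (an unnormalized empty string) keeps headers at the event root.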

def register
require "net/imap" # in stdlib
require "mail" # gem 'mail'
@@ -63,14 +114,16 @@ def register
# Ensure that the filepath exists before writing, since it's deeply nested.
FileUtils::mkdir_p datapath
@sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest("#{@user}_#{@host}_#{@port}_#{@folder}"))
@logger.debug? && @logger.debug("Generated sincedb path", sincedb_path: @sincedb_path)
end
if File.directory?(@sincedb_path)
raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"")
end
@logger.info("Using \"sincedb_path\": \"#{@sincedb_path}\"")

if File.exist?(@sincedb_path)
if File.directory?(@sincedb_path)
raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"")
end
@logger.debug? && @logger.debug("Found existing sincedb path", sincedb_path: @sincedb_path)
@uid_last_value = File.read(@sincedb_path).to_i
@logger.info("Loading \"uid_last_value\": \"#{@uid_last_value}\"")
@logger.debug? && @logger.debug("Loaded from sincedb", uid_last_value: @uid_last_value)
end

@content_type_re = Regexp.new("^" + @content_type)
@@ -136,7 +189,6 @@ def check_mail(queue)
rescue => e
@logger.error("Encountered error #{e.class}", :message => e.message, :backtrace => e.backtrace)
# Do not raise error, check_mail will be invoked in the next run time

ensure
# Close the connection (and ignore errors)
imap.close rescue nil
@@ -145,12 +197,12 @@ def check_mail(queue)
# Always save @uid_last_value so when tracking is switched from
# "NOT SEEN" to "UID" we will continue from first unprocessed message
if @uid_last_value
@logger.info("Saving \"uid_last_value\": \"#{@uid_last_value}\"")
@logger.debug? && @logger.debug("Saving to sincedb", uid_last_value: @uid_last_value)
File.write(@sincedb_path, @uid_last_value)
end
end

def parse_attachments(mail)
def legacy_parse_attachments(mail)
attachments = []
mail.attachments.each do |attachment|
if @save_attachments
@@ -164,7 +216,8 @@ def parse_mail(mail)

def parse_mail(mail)
# Add a debug message so we can track what message might cause an error later
@logger.debug? && @logger.debug("Working with message_id", :message_id => mail.message_id)
@logger.debug? && @logger.debug("Processing mail", message_id: mail.message_id)

# TODO(sissel): What should a multipart message look like as an event?
# For now, just take the plain-text part and set it as the message.
if mail.parts.count == 0
@@ -174,54 +227,83 @@ def parse_mail(mail)
# Multipart message; use the first text/plain part we find
part = mail.parts.find { |p| p.content_type.match @content_type_re } || mail.parts.first
message = part.decoded

# Parse attachments
attachments = parse_attachments(mail)
end

@codec.decode(message) do |event|
# Use the 'Date' field as the timestamp
event.timestamp = LogStash::Timestamp.new(mail.date.to_time)

# Add fields: Add message.header_fields { |h| h.name=> h.value }
mail.header_fields.each do |header|
# 'header.name' can sometimes be a Mail::Multibyte::Chars, get it in String form
name = @lowercase_headers ? header.name.to_s.downcase : header.name.to_s
# Call .decoded on the header in case it's in encoded-word form.
# Details at:
# https://github.com/mikel/mail/blob/master/README.md#encodings
# http://tools.ietf.org/html/rfc2047#section-2
value = transcode_to_utf8(header.decoded.to_s)

# Assume we already processed the 'date' above.
next if name == "Date"

case (field = event.get(name))
when String
# promote string to array if a header appears multiple times
# (like 'received')
event.set(name, [field, value])
when Array
field << value
event.set(name, field)
when nil
event.set(name, value)
end
end
event.timestamp = LogStash::Timestamp.new(mail.date.to_time) if mail.date

set_target_fields(event, mail) if @target

process_headers(mail, event) if @headers_target

# Add attachments
if attachments && attachments.length > 0
event.set('attachments', attachments)
if @attachments_target && mail.has_attachments?
event.set(@attachments_target, legacy_parse_attachments(mail))
end

decorate(event)
event
end
end

def set_target_fields(event, mail)
event.set("#{@target}[direction]", 'inbound') # we're reading mails from IMAP
event.set("#{@target}[subject]", mail.subject)
event.set("#{@target}[from]", mail.from) # Array<String>
event.set("#{@target}[to]", mail.to) if mail.to
event.set("#{@target}[cc]", mail.cc) if mail.cc
event.set("#{@target}[bcc]", mail.bcc) if mail.bcc
event.set("#{@target}[content_type]", mail.mime_type) if mail.mime_type
event.set("#{@target}[message_id]", mail.message_id) if mail.has_message_id?
event.set("#{@target}[reply_to]", mail.reply_to) if mail.reply_to
if mail.has_attachments?
attachments = mail.attachments.map do |attachment|
{
"file" => {
'name' => attachment.filename,
'mime_type' => attachment.mime_type,
'size' => attachment.body.to_s.size
}
}
end
event.set("#{@target}[attachments]", attachments)
end
end

def process_headers(mail, event)
# Add fields: Add message.header_fields { |h| h.name=> h.value }
mail.header_fields.each do |header|
# 'header.name' can sometimes be a Mail::Multibyte::Chars, get it in String form
name = header.name.to_s

# assume we already processed the 'date' into event.timestamp
next if name == "Date"

name = name.downcase if @lowercase_headers

# Call .decoded on the header in case it's in encoded-word form.
# Details at:
# https://github.com/mikel/mail/blob/master/README.md#encodings
# http://tools.ietf.org/html/rfc2047#section-2
value = transcode_to_utf8(header.decoded)

targeted_name = "#{@headers_target}[#{name}]"
case (field = event.get(targeted_name))
when String
# promote string to array if a header appears multiple times (like 'received')
event.set(targeted_name, [field, value])
when Array
field << value
event.set(targeted_name, field)
when nil
event.set(targeted_name, value)
end
end
end
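
The string-to-array promotion in `process_headers` can be tried in isolation. The sketch below uses a plain `Hash` as a stand-in for the Logstash event (an assumption made to keep it self-contained); `add_header` is a hypothetical name and the header values are invented.

```ruby
# Hypothetical stand-in for the event: a plain Hash keyed by header name.
def add_header(fields, name, value)
  case (existing = fields[name])
  when String
    # Second occurrence: promote the single string to an array.
    fields[name] = [existing, value]
  when Array
    # Third and later occurrences: append to the existing array.
    existing << value
  when nil
    # First occurrence: store the value as a plain string.
    fields[name] = value
  end
  fields
end

fields = {}
add_header(fields, 'received', 'from a.example')
add_header(fields, 'received', 'from b.example')
add_header(fields, 'received', 'from c.example')
add_header(fields, 'subject', 'hello')
p fields
```

Headers that appear once stay as strings, so consumers of these fields should be prepared for either a string or an array.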

def stop
Stud.stop!(@run_thread)
$stdin.close
end

private
@@ -230,8 +312,7 @@ def stop
# the mail gem will set the correct encoding on header strings decoding
# and we want to transcode it to utf8
def transcode_to_utf8(s)
unless s.nil?
s.encode(Encoding::UTF_8, :invalid => :replace, :undef => :replace)
end
return nil if s.nil?
s.encode(Encoding::UTF_8, :invalid => :replace, :undef => :replace)
end
end
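
As a quick check of the refactored `transcode_to_utf8`, the following standalone sketch runs the same method body on a Latin-1 string and on `nil`. The sample string is an assumption chosen for illustration.

```ruby
# Standalone copy of the method from this change.
def transcode_to_utf8(s)
  return nil if s.nil?
  s.encode(Encoding::UTF_8, :invalid => :replace, :undef => :replace)
end

# "caf\xE9" is "café" encoded as ISO-8859-1 (0xE9 is e-acute in Latin-1).
latin1 = "caf\xE9".dup.force_encoding(Encoding::ISO_8859_1)
utf8 = transcode_to_utf8(latin1)

p utf8.encoding          # the result is tagged as UTF-8
p utf8 == "caf\u00E9"    # and compares equal to the UTF-8 literal
p transcode_to_utf8(nil) # nil passes through unchanged
```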