Consumer mappers

Karafka has a default strategy for consumer ids: each id combines the group name taken from the routing with the client_id. This convention works well for new applications and systems. If you are migrating from other tools, however, you may want to preserve an existing naming convention. To do so, implement a consumer mapper that follows your conventions.

A mapper needs to implement the following method:

  • #call - accepts the raw consumer group name and should return the remapped id.

For example, if you want to skip the client_id, all you need to do is create a mapper like the one below:

class MyCustomConsumerMapper
  # @param raw_consumer_group_name [String, Symbol] raw consumer group name
  # @return [String] remapped final consumer group name
  def call(raw_consumer_group_name)
    # Return the group name as-is, without the client_id
    raw_consumer_group_name.to_s
  end
end

To use it, assign it as your default consumer_mapper:

class App < Karafka::App
  setup do |config|
    config.consumer_mapper = MyCustomConsumerMapper.new
    # Other config options
  end
end
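For a migration, a mapper can also carry a lookup table of ids used by the previous tool. The following is a minimal sketch in plain Ruby, not tied to Karafka internals: the `LegacyConsumerMapper` name, the lookup table, and the group names are all hypothetical and only illustrate the `#call` contract described above.

```ruby
# Hypothetical mapper for a migration: known groups keep the ids used by the
# previous tool, anything else passes through unchanged.
class LegacyConsumerMapper
  # @param legacy_ids [Hash] raw consumer group name => legacy consumer id
  def initialize(legacy_ids)
    # Normalize keys to strings so symbol and string group names both match
    @legacy_ids = legacy_ids.transform_keys(&:to_s).freeze
  end

  # @param raw_consumer_group_name [String, Symbol] raw consumer group name
  # @return [String] legacy id when one is registered, the raw name otherwise
  def call(raw_consumer_group_name)
    name = raw_consumer_group_name.to_s
    @legacy_ids.fetch(name, name)
  end
end

mapper = LegacyConsumerMapper.new({ 'orders' => 'orders-service-v1' })
puts mapper.call(:orders)    # registered group, mapped to its legacy id
puts mapper.call('payments') # unregistered group, returned unchanged
```

Falling back to the raw name keeps groups that were never part of the old system working without extra configuration.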

Topic mappers

Some Kafka cloud providers require topics to be namespaced with a username. This approach is understandable, but it makes your applications less provider-agnostic. To address this, you can create your own topic mapper that sanitizes incoming and outgoing topic names, so your logic is not bound to the provider-specific versions of topic names.

A mapper needs to implement the following two methods:

  • #incoming - accepts an incoming "namespace dirty" version of a topic. Needs to return the sanitized topic.
  • #outgoing - accepts the outgoing sanitized topic. Needs to return the namespaced one.

Assuming each topic needs to have a "karafka." prefix, your mapper could look like this:

class KarafkaTopicMapper
  def initialize(prefix)
    @prefix = prefix
  end

  # Strips the namespace only when it leads the topic name
  def incoming(topic)
    topic.to_s.delete_prefix("#{@prefix}.")
  end

  def outgoing(topic)
    "#{@prefix}.#{topic}"
  end
end

mapper = KarafkaTopicMapper.new('karafka')
mapper.incoming('karafka.my_super_topic') #=> 'my_super_topic'
mapper.outgoing('my_other_topic') #=> 'karafka.my_other_topic'
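Since the two methods are meant to be inverses, a quick round-trip check can catch mappers that mangle topic names, for instance when the prefix text also occurs in the middle of a name. The sketch below is plain Ruby under stated assumptions: `PrefixTopicMapper` is a hypothetical stand-in for any prefix-based mapper, and `broken_round_trips` is an illustrative helper, not part of Karafka.

```ruby
# Hypothetical stand-in for any topic mapper that adds and strips a prefix.
class PrefixTopicMapper
  def initialize(prefix)
    @prefix = "#{prefix}."
  end

  # Strip the namespace only when it appears at the start of the name
  def incoming(topic)
    topic.to_s.delete_prefix(@prefix)
  end

  # Add the namespace expected by the provider
  def outgoing(topic)
    "#{@prefix}#{topic}"
  end
end

# Returns the topics that do not survive an outgoing -> incoming round trip
def broken_round_trips(mapper, topics)
  topics.reject { |topic| mapper.incoming(mapper.outgoing(topic)) == topic.to_s }
end

mapper = PrefixTopicMapper.new('karafka')
topics = ['my_super_topic', 'audit.karafka.events', :my_other_topic]
p broken_round_trips(mapper, topics) # an empty array means every topic round-trips
```

A check like this is cheap to run in a test suite against the real list of topics before switching providers.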

To use a custom mapper, assign it during application configuration:

class App < Karafka::App
  setup do |config|
    # Other settings
    config.topic_mapper = KarafkaTopicMapper.new('username')
  end
end

The topic mapper integrates automatically with both message consumers and responders.