Skip to content

Latest commit

 

History

History
140 lines (94 loc) · 9.4 KB

005-consumers.md

File metadata and controls

140 lines (94 loc) · 9.4 KB

Работа с консумерами

Запуск консумеров

Мы также подготовили консумер на Go: программа читает топик и распечатывает сообщение на экран. В тестовом кластере сейчас работает сразу три консумера — все они представляют собой один и тот же экземпляр запущенной программы.

Посмотрите пример прочитанных событий:

docker-compose --profile app logs -f consumer-1 consumer-2 consumer-3
consumer-3  | 2023/02/26 09:27:17 message at topic/partition/offset example/2/2597: 5 = {"id":5,"lat":70.305836,"lon":-82.353797}
consumer-2  | 2023/02/26 09:27:17 message at topic/partition/offset example/5/1446: 6 = {"id":6,"lat":-68.561725,"lon":41.288281}
consumer-2  | 2023/02/26 09:27:17 message at topic/partition/offset example/5/1447: 6 = {"id":6,"lat":-18.154029,"lon":-62.287865}
consumer-1  | 2023/02/26 09:27:17 message at topic/partition/offset example/4/1421: 1 = {"id":1,"lat":-50.646267,"lon":10.077705}
consumer-1  | 2023/02/26 09:27:17 message at topic/partition/offset example/4/1422: 1 = {"id":1,"lat":-47.339896,"lon":-33.38711}

Обратите внимание, что каждый консумер читает только события, связанные соответствующим ключом. Это возможно благодаря партицированию по хешу со стороны продюсера.

Откройте исходный код консумера и посмотрите описание kafka.Reader. Программа подключается к брокерам, поллит данные в соответствии с выбранной минимальной величиной байт для одной пачки данных (MinBytes) и максимальным размером сообщения (MaxBytes).

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  addrs,
	GroupID:  group,
	Topic:    topic,
	MinBytes: 10e2, // 1KB
	MaxBytes: 10e6, // 10MB
})

Само чтение же представляет собой бесконечный цикл:

  • Чтение пачки данных
  • Обработка
  • Коммит оффсета

В представленном примере чтение данных и коммит оффсета совмещены в функции ReadMessage и выполняются неявно (implicit).

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
	    break
    }
    log.Print(fmt.Sprintf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)))
}

Однако обе эти операции могут быть выполнены явно (explicit). Например, сначала вы считываете порцию данных, обрабатываете её, а уже в самом конце — коммитите оффсет, перемещаясь к следующей порции записей. Например:

ctx := context.Background()
for {
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    log.Print(fmt.Sprintf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)))
    if err := r.CommitMessages(ctx, m); err != nil {
        log.Fatal("failed to commit messages:", err)
    }
}

Для простоты оба примера иллюстрируют синхронную работу консумера, но для улучшения производительности операции могут быть выполнены и асинхронно.

Управление консумер-группой

Посмотрим на консумер-группу детальнее. Откройте веб-интерфейс Redpanda Console, перейдите на вкладку "Consumer groups" и откройте профиль консумер-группы example-consumer-group.

Redpanda Console: Open consumer group profile page

На странице видно, что в группу example-consumer-group входят три консумера, каждый из которых определяется уникальным именем (Assigned member) и адресом узла (Host). Важно, что каждый консумер в группе читает одну или несколько партиций, однако одна партиция может быть прочитана только одним консумером в текущей группе.

Каждый консумер в группе характеризуется текущим оффсетом (Group Offset) относительно конечного оффсета партиции (Log End Offset). Расстояние между конечным оффсетом партиции и текущим оффсетом группы является лагом (Lag).

Вы можете управлять положением оффсета консумер-группы на уровне брокера. Например, если вам требуется прочитать сообщения за пределами текущего оффсета, вы можете переместить его на позицию в любое другое место.

Однако для этого важно, чтобы консумер-группа была пуста: то есть, в ней не было ни одного участника.

Остановите имеющиеся консумеры.

docker-compose --profile app stop consumer-1 consumer-2 consumer-3

Обновите страницу Redpanda Console.

Redpanda Console: Empty consumer group

Консумер-группа пуста (Empty). Теперь вы можете отредактировать её (Edit group). Переключите оффсет для всей группы, «отмотав» его на 5 минут назад.

Redpanda Console: Edit consumer group

Подтвердите изменение.

Redpanda Console: Edit consumer group

Готово. Обратите внимание на значения лага партиций.

Redpanda Console: Consumer group with lag

Теперь, когда мы запустим консумеры, они вновь вернутся к чтению с обозначенных позиций. Запустите остановленные консумеры:

docker-compose --profile app start consumer-1 consumer-2 consumer-3

Посмотрите на лог консумер-групп и убедитесь, что сообщения были прочитаны повторно.

docker-compose --profile app logs -f consumer-1 consumer-2 consumer-3

Ребалансировка консумер-группы

Любая смена композиции в консумер-группе (например, добавление или выход участника из группы, а также изменение числа партиций) приводит к её ребалансировке — процессу перераспределения партиций между «живыми» участниками. Сейчас в консумер-группе три консумера, каждый из которых читает две партиции.

Остановите один из работающих консумеров, перейдите на страницу консумер-группы в Redpanda Console.

docker-compose --profile app stop consumer-1

Сначала никаких изменений не будет, а после, на короткое время, группа перейдёт в состояние PreparingRebalance.

Redpanda Console: Consumer group is in preparing rebalance state

В этот момент все консумеры прекращают чтение и дожидаются синхронизации всех имеющихся участников в группе. После окончания ребалансировки, партиции перераспределяются между участниками и консумеры продолжают чтение с сохранённых позиций.

Redpanda Console: Consumer group after rebalance

Как можно заметить, теперь партиции распределились между двумя консумерами: по три на каждого участника группы.


✅ Готово. Теперь переходите к изучению метрик Kafka.