горизонтально масштабировать кластер Kafka 3 KRaft

Вопрос или проблема

Я работаю с кластером Kafka 3.6.1 (включен режим KRaft) и хотел бы получить рекомендации по масштабированию брокеров и контроллеров Kafka. Ниже приведены детали моей настройки и шаги, которые я выполнил, а также некоторые проблемы, с которыми я столкнулся. Перед переходом в продакшн я протестировал процесс масштабирования в тестовой среде с 2 узлами (брокер и контроллеры KRaft на одних и тех же узлах).

Настройка тестового кластера:

  1. Начальная конфигурация:
  2. Процесс масштабирования:
    • Добавлен новый узел (172.26.1.81).
    • Настроен controller.quorum.voters на новом узле как:
      [email protected]:9093,[email protected]:9093,[email protected]:9093
    • Запустил Kafka на новом узле, который успешно подключился как наблюдатель в кворуме KRaft.

Проблемы, с которыми столкнулся:

  1. Новый узел был указан как наблюдатель вместо голосующего после запуска.
ClusterId:              mXMb-Ah9Q8uNFoMtqGrBag
LeaderId:               0
LeaderEpoch:            7
HighWatermark:          33068
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
CurrentVoters:          [0,1]
CurrentObservers:       [2]
  1. Обновление controller.quorum.voters на старых узлах вызвало ошибку:
[2024-12-02 12:04:11,314] ERROR [SharedServer id=0] Получено исключение при запуске SharedServer (kafka.server.SharedServer)
java.lang.IllegalStateException: Настроенный набор голосующих: [0, 1, 2] отличается от набора голосующих, прочитанного из файла состояния: [0, 1]. Проверьте, актуальна ли конфигурация кворума, или удалите локальный файл состояния, если это необходимо
at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:132)
at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:375)
at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:248)
at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:174)
at kafka.server.SharedServer.start(SharedServer.scala:260)
at kafka.server.SharedServer.startForController(SharedServer.scala:132)
at kafka.server.ControllerServer.startup(ControllerServer.scala:192)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)
[2024-12-02 12:04:11,325] INFO [ControllerServer id=0] Ожидание будущего голосующих кворума контроллера (kafka.server.ControllerServer)
[2024-12-02 12:04:11,328] INFO [ControllerServer id=0] Закончил ожидание будущего голосующих кворума контроллера (kafka.server.ControllerServer)
[2024-12-02 12:04:11,331] ERROR Встретил фатальную ошибку: поймано исключение (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.NullPointerException: Невозможно вызвать "kafka.raft.KafkaRaftManager.apiVersions(" потому что возвращаемое значение "kafka.server.SharedServer.raftManager()" равно null
at kafka.server.ControllerServer.startup(ControllerServer.scala:205)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)

Итак, согласно логам, мне нужно “удалить локальный файл состояния”.
Хорошо, файл, содержащий слово “state”, находится в папке data.dir

/var/lib/kafka/data/__cluster-metadata-0

Итак, я удалил этот файл из старого брокера 103 и перезапустил Kafka, что прошло успешно.
Я спросил узел 103 о состоянии кворума KRaft и получил:

ClusterId:              mXMb-Ah9Q8uNFoMtqGrBag
LeaderId:               2
LeaderEpoch:            125
HighWatermark:          84616
MaxFollowerLag:         84617
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [0,1,2]
CurrentObservers:       []
LeaderID = 2? Что :)
Хорошо, давайте спросим то же самое на старом узле 189 и получили:
ClusterId:              mXMb-Ah9Q8uNFoMtqGrBag
LeaderId:               1
LeaderEpoch:            8
HighWatermark:          -1
MaxFollowerLag:         74376
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [0,1]
CurrentObservers:       []
Давайте спросим то же самое на новом узле 81 и получили:
ClusterId:              mXMb-Ah9Q8uNFoMtqGrBag
LeaderId:               2
LeaderEpoch:            125
HighWatermark:          84813
MaxFollowerLag:         84814
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [0,1,2]
CurrentObservers:       []

Так что похоже, что старый узел не совпадает с другими узлами.
Хорошо, давайте удалим файл состояния кворума на узле 189.
После удаления этого файла состояния я столкнулся со следующей ошибкой:

[2024-12-02 12:16:33,310] ERROR Встретил фатальную ошибку: Неожиданная ошибка в потоке raft I0 (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.IllegalStateException: Невозможно перейти к Следующему с leaderId=2 и epoch=125, поскольку он не является одним из голосующих [0, 1]
at org.apache.kafka.raft.QuorumState.transitionToFollower(QuorumState.java:382)
at org.apache.kafka.raft.KafkaRaftClient.transitionToFollower(KafkaRaftClient.java:522)

Что? 🙂 Хорошо, я решил удалить файл состояния на новом узле (81). Удаление файла состояния кворума на затронутых узлах решило проблему, но процесс казался рискованным и неструктурированным.

Часть повторного балансирования разделов:
После добавления нового узла я перересбалансировал разделы, используя следующие команды:

/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 172.26.1.103:9092 --command-config /etc/kafka/admin.properties --topics-to-move-json-file topics.json --broker-list "0,1,2" > reassignment_plan.json
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 172.26.1.103:9092 --command-config /etc/kafka/admin.properties --execute --reassignment-json-file reassignment_plan.json

Разделы были хорошо сбалансированы между брокерами после ожидания 3–4 минут.

Вопросы:

  1. Каковы рекомендуемые шаги для безопасного добавления новых брокеров и контроллеров KRaft к существующему кластеру Kafka?
  2. Нормально ли требовать удаления файла состояния кворума в процессе масштабирования?
  3. Существуют ли инструменты или документация, специально предназначенные для масштабирования кластеров Kafka на базе KRaft, которые я мог пропустить?
    Буду признателен за любые советы или отзывы по моему подходу!

Ответ или решение

Горизонтальное масштабирование кластера Kafka 3.6.1 в режиме KRaft

Вопросы масштабирования KRaft-кластера Kafka актуальны для многих организаций, использующих данную технологию для управления потоками данных. Ваша ситуация связана с начальной попыткой добавить новый узел к кластеру, в котором уже функционируют два узла, что создает определенные сложности и требует корректного подхода.

Рекомендуемые шаги по безопасному добавлению новых брокеров и контроллеров KRaft

1. Подготовка к масштабированию

Прежде всего, обеспечьте резервное копирование текущих данных и конфигураций ваших узлов. Это поможет в случае возникновения непредвиденных проблем.

2. Конфигурация нового узла

  • Добавление нового узла: Установите Kafka на новый сервер и убедитесь, что все зависимости установлены.
  • Настройка controller.quorum.voters: Убедитесь, что вы правильно прописали идентификаторы новых контейнеров в настройке controller.quorum.voters на всех узлах. Например, если у вас изначально существовали два узла с идентификаторами 0 и 1, после добавления нового узла с идентификатором 2 изменения должны выглядеть следующим образом:
    • На узле 0: controller.quorum.voters=0,1,2
    • На узле 1: controller.quorum.voters=0,1,2
    • На узле 2: controller.quorum.voters=0,1,2

3. Запуск нового узла

Запустите Kafka на новом узле и проверьте его статус с помощью команд для оценки состояния кластера.

4. Обновление состояния кластера

Если обнаружите, что новый узел стал наблюдателем, убедитесь, что все узлы имеют актуальную версию настройки controller.quorum.voters. В некоторых случаях может потребоваться удалить локальные файлы состояния, если возникнут несоответствия.

5. Проверка согласованности кластера

После изменения конфигурации вы можете столкнуться с ошибками, которые указывают на несоответствие между настройками и фактическим состоянием узлов. Убедитесь, что все узлы успешно видят друг друга и корректно запускаются.

6. Перераспределение разделов (Partition Rebalancing)

После успешного добавления нового узла и актуализации настроек следует перераспределить разделы между узлами, чтобы обеспечить равномерную нагрузку. Подходящие команды уже указаны в вашем тестовом окружении.

Является ли удаление файла состояния нормальным в процессе масштабирования?

Удаление файла состояния (quorum-state) не является стандартной практикой для KRaft-кластеров, однако в некоторых ситуациях оно может потребоваться, если узел не может корректно синхронизироваться с другими узлами кластера. Это должно использоваться как последний шаг после подтверждения, что все узлы имеют синхронизированные конфигурации.

Инструменты и документация для масштабирования KRaft-кластеров

Рекомендуется ознакомиться с официальной документацией Apache Kafka, а также изучить следующие материалы:

  • Apache Kafka Documentation: Kafka Documentation
  • Книги и курсы по Kafka: Есть много онлайн-ресурсов, которые обучают масштабированию и операционной деятельности с Kafka в режиме KRaft.

Заключение

Ваш подход к добавлению нового узла и перераспределению разделов подтвердили свою жизнеспособность. Однако для избежания ошибок в будущем стоит следовать более строгому и структурированному процессу. Основными рекомендациями остаются: тщательное резервирование, синхронизация конфигураций и обязательные проверки состояния после внесения изменений. Если у вас возникнут дополнительные вопросы или проблемы, не стесняйтесь обращаться к сообществу Kafka или использовать платные консультации, если это возможно.

Оцените материал
Добавить комментарий

Капча загружается...