- Вопрос или проблема
- Ответ или решение
- Горизонтальное масштабирование кластера Kafka 3.6.1 в режиме KRaft
- Рекомендуемые шаги по безопасному добавлению новых брокеров и контроллеров KRaft
- 1. Подготовка к масштабированию
- 2. Конфигурация нового узла
- 3. Запуск нового узла
- 4. Обновление состояния кластера
- 5. Проверка согласованности кластера
- 6. Перераспределение разделов (Partition Rebalancing)
- Является ли удаление файла состояния нормальным в процессе масштабирования?
- Инструменты и документация для масштабирования KRaft-кластеров
- Заключение
Вопрос или проблема
Я работаю с кластером Kafka 3.6.1 (включен режим KRaft) и хотел бы получить рекомендации по масштабированию брокеров и контроллеров Kafka. Ниже приведены детали моей настройки и шаги, которые я выполнил, а также некоторые проблемы, с которыми я столкнулся. Перед переходом в продакшн я протестировал процесс масштабирования в тестовой среде с 2 узлами (брокер и контроллеры KRaft на одних и тех же узлах).
Настройка тестового кластера:
- Начальная конфигурация:
- Узлы: 2
- Конфигурация кворума контроллеров на узлах:
[email protected]:9093,[email protected]:9093
- Процесс масштабирования:
- Добавлен новый узел (172.26.1.81).
- Настроен controller.quorum.voters на новом узле как:
[email protected]:9093,[email protected]:9093,[email protected]:9093 - Запустил Kafka на новом узле, который успешно подключился как наблюдатель в кворуме KRaft.
Проблемы, с которыми столкнулся:
- Новый узел был указан как наблюдатель вместо голосующего после запуска.
ClusterId: mXMb-Ah9Q8uNFoMtqGrBag
LeaderId: 0
LeaderEpoch: 7
HighWatermark: 33068
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [0,1]
CurrentObservers: [2]
- Обновление controller.quorum.voters на старых узлах вызвало ошибку:
[2024-12-02 12:04:11,314] ERROR [SharedServer id=0] Получено исключение при запуске SharedServer (kafka.server.SharedServer)
java.lang.IllegalStateException: Настроенный набор голосующих: [0, 1, 2] отличается от набора голосующих, прочитанного из файла состояния: [0, 1]. Проверьте, актуальна ли конфигурация кворума, или удалите локальный файл состояния, если это необходимо
at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:132)
at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:375)
at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:248)
at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:174)
at kafka.server.SharedServer.start(SharedServer.scala:260)
at kafka.server.SharedServer.startForController(SharedServer.scala:132)
at kafka.server.ControllerServer.startup(ControllerServer.scala:192)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)
[2024-12-02 12:04:11,325] INFO [ControllerServer id=0] Ожидание будущего голосующих кворума контроллера (kafka.server.ControllerServer)
[2024-12-02 12:04:11,328] INFO [ControllerServer id=0] Закончил ожидание будущего голосующих кворума контроллера (kafka.server.ControllerServer)
[2024-12-02 12:04:11,331] ERROR Встретил фатальную ошибку: поймано исключение (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.NullPointerException: Невозможно вызвать "kafka.raft.KafkaRaftManager.apiVersions(" потому что возвращаемое значение "kafka.server.SharedServer.raftManager()" равно null
at kafka.server.ControllerServer.startup(ControllerServer.scala:205)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)
Итак, согласно логам, мне нужно “удалить локальный файл состояния”.
Хорошо, файл, содержащий слово “state”, находится в папке data.dir
/var/lib/kafka/data/__cluster-metadata-0
Итак, я удалил этот файл из старого брокера 103 и перезапустил Kafka, что прошло успешно.
Я спросил узел 103 о состоянии кворума KRaft и получил:
ClusterId: mXMb-Ah9Q8uNFoMtqGrBag
LeaderId: 2
LeaderEpoch: 125
HighWatermark: 84616
MaxFollowerLag: 84617
MaxFollowerLagTimeMs: -1
CurrentVoters: [0,1,2]
CurrentObservers: []
LeaderID = 2? Что :)
Хорошо, давайте спросим то же самое на старом узле 189 и получили:
ClusterId: mXMb-Ah9Q8uNFoMtqGrBag
LeaderId: 1
LeaderEpoch: 8
HighWatermark: -1
MaxFollowerLag: 74376
MaxFollowerLagTimeMs: -1
CurrentVoters: [0,1]
CurrentObservers: []
Давайте спросим то же самое на новом узле 81 и получили:
ClusterId: mXMb-Ah9Q8uNFoMtqGrBag
LeaderId: 2
LeaderEpoch: 125
HighWatermark: 84813
MaxFollowerLag: 84814
MaxFollowerLagTimeMs: -1
CurrentVoters: [0,1,2]
CurrentObservers: []
Так что похоже, что старый узел не совпадает с другими узлами.
Хорошо, давайте удалим файл состояния кворума на узле 189.
После удаления этого файла состояния я столкнулся со следующей ошибкой:
[2024-12-02 12:16:33,310] ERROR Встретил фатальную ошибку: Неожиданная ошибка в потоке raft I0 (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.IllegalStateException: Невозможно перейти к Следующему с leaderId=2 и epoch=125, поскольку он не является одним из голосующих [0, 1]
at org.apache.kafka.raft.QuorumState.transitionToFollower(QuorumState.java:382)
at org.apache.kafka.raft.KafkaRaftClient.transitionToFollower(KafkaRaftClient.java:522)
Что? 🙂 Хорошо, я решил удалить файл состояния на новом узле (81). Удаление файла состояния кворума на затронутых узлах решило проблему, но процесс казался рискованным и неструктурированным.
Часть повторного балансирования разделов:
После добавления нового узла я перересбалансировал разделы, используя следующие команды:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 172.26.1.103:9092 --command-config /etc/kafka/admin.properties --topics-to-move-json-file topics.json --broker-list "0,1,2" > reassignment_plan.json
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 172.26.1.103:9092 --command-config /etc/kafka/admin.properties --execute --reassignment-json-file reassignment_plan.json
Разделы были хорошо сбалансированы между брокерами после ожидания 3–4 минут.
Вопросы:
- Каковы рекомендуемые шаги для безопасного добавления новых брокеров и контроллеров KRaft к существующему кластеру Kafka?
- Нормально ли требовать удаления файла состояния кворума в процессе масштабирования?
- Существуют ли инструменты или документация, специально предназначенные для масштабирования кластеров Kafka на базе KRaft, которые я мог пропустить?
Буду признателен за любые советы или отзывы по моему подходу!
Ответ или решение
Горизонтальное масштабирование кластера Kafka 3.6.1 в режиме KRaft
Вопросы масштабирования KRaft-кластера Kafka актуальны для многих организаций, использующих данную технологию для управления потоками данных. Ваша ситуация связана с начальной попыткой добавить новый узел к кластеру, в котором уже функционируют два узла, что создает определенные сложности и требует корректного подхода.
Рекомендуемые шаги по безопасному добавлению новых брокеров и контроллеров KRaft
1. Подготовка к масштабированию
Прежде всего, обеспечьте резервное копирование текущих данных и конфигураций ваших узлов. Это поможет в случае возникновения непредвиденных проблем.
2. Конфигурация нового узла
- Добавление нового узла: Установите Kafka на новый сервер и убедитесь, что все зависимости установлены.
- Настройка
controller.quorum.voters
: Убедитесь, что вы правильно прописали идентификаторы новых контейнеров в настройкеcontroller.quorum.voters
на всех узлах. Например, если у вас изначально существовали два узла с идентификаторами 0 и 1, после добавления нового узла с идентификатором 2 изменения должны выглядеть следующим образом:- На узле 0:
controller.quorum.voters=0,1,2
- На узле 1:
controller.quorum.voters=0,1,2
- На узле 2:
controller.quorum.voters=0,1,2
- На узле 0:
3. Запуск нового узла
Запустите Kafka на новом узле и проверьте его статус с помощью команд для оценки состояния кластера.
4. Обновление состояния кластера
Если обнаружите, что новый узел стал наблюдателем, убедитесь, что все узлы имеют актуальную версию настройки controller.quorum.voters
. В некоторых случаях может потребоваться удалить локальные файлы состояния, если возникнут несоответствия.
5. Проверка согласованности кластера
После изменения конфигурации вы можете столкнуться с ошибками, которые указывают на несоответствие между настройками и фактическим состоянием узлов. Убедитесь, что все узлы успешно видят друг друга и корректно запускаются.
6. Перераспределение разделов (Partition Rebalancing)
После успешного добавления нового узла и актуализации настроек следует перераспределить разделы между узлами, чтобы обеспечить равномерную нагрузку. Подходящие команды уже указаны в вашем тестовом окружении.
Является ли удаление файла состояния нормальным в процессе масштабирования?
Удаление файла состояния (quorum-state
) не является стандартной практикой для KRaft-кластеров, однако в некоторых ситуациях оно может потребоваться, если узел не может корректно синхронизироваться с другими узлами кластера. Это должно использоваться как последний шаг после подтверждения, что все узлы имеют синхронизированные конфигурации.
Инструменты и документация для масштабирования KRaft-кластеров
Рекомендуется ознакомиться с официальной документацией Apache Kafka, а также изучить следующие материалы:
- Apache Kafka Documentation: Kafka Documentation
- Книги и курсы по Kafka: Есть много онлайн-ресурсов, которые обучают масштабированию и операционной деятельности с Kafka в режиме KRaft.
Заключение
Ваш подход к добавлению нового узла и перераспределению разделов подтвердили свою жизнеспособность. Однако для избежания ошибок в будущем стоит следовать более строгому и структурированному процессу. Основными рекомендациями остаются: тщательное резервирование, синхронизация конфигураций и обязательные проверки состояния после внесения изменений. Если у вас возникнут дополнительные вопросы или проблемы, не стесняйтесь обращаться к сообществу Kafka или использовать платные консультации, если это возможно.