Вопрос или проблема
Я пытаюсь записать тему Kafka (с данными proto!) в таблицу Postgres, используя Kafka-Connect и Schema-registry!
Мой файл конфигурации соединителя Kafka-Connect:
{ "name":"proto_sink", "config":{ "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "connection.password":"some_password", "topics":"some_topic", "value.converter.schema.registry.url":"http://localhost:8081", "key.converter.schemas.enable":"false", "auto.evolve":"true", "connection.user":"some_user", "value.converter.schemas.enable":"true", "name":"sink_proto", "auto.create":"true", "connection.url":"jdbc:postgresql://localhost:5432/some_db", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "insert.mode":"insert", "key.converter":"org.apache.kafka.connect.storage.StringConverter" } }
Сначала я использую эту команду для отправки схемы proto в Schema-registry.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "syntax = \"proto3\";\npackage nilian;\n\nmessage Person {\n string name = 1;\n int32 age = 2;\n}"}' http://localhost:8081/subjects/some_topic-value/versions
А затем я пытаюсь отправить конфигурации соединителя в Kafka-Connect с помощью этой команды:
curl -X POST -H "Content-Type: application/json" -d @connector_configs/json_conn.json http://localhost:8083/connectors
Так я получаю эту ошибку:
Caused by: org.apache.kafka.connect.errors.DataException: Не удалось десериализовать данные для темы some_topic в Protobuf:
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:154)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:528)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Protobuf для id -1
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:228)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:292)
at io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:200)
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:132)
... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Неизвестный магический байт!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:600)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:133)
... 20 more
Что не так с этим?
Ответ или решение
Ошибка десериализации данных Protobuf в Kafka-Connect с использованием Schema Registry
Введение
При работе с Kafka и Protobuf, особенно в контексте использования Kafka-Connect и Schema Registry, могут возникать сложности с десериализацией сообщений. В вашем случае ошибка возникает при попытке извлечь данные из темы Kafka и записать их в таблицу Postgres. Рассмотрим возможные причины и решения данной проблемы.
Описание проблемы
Сообщение об ошибке указывает, что не удается десериализовать данные Protobuf для темы some_topic
. Ключевые сообщения в трассировке ошибок:
Failed to deserialize data for topic some_topic to Protobuf
Unknown magic byte!
Эти сообщения являются индикаторами того, что данные, отправленные в Kafka, не соответствуют ожидаемому формату Protobuf, или есть проблемы с обработкой схемы.
Возможные причины ошибки
-
Неверный формат сообщения: Убедитесь, что данные, отправляемые в тему Kafka (
some_topic
), представляют собой сериализованные объекты Protobuf. Они должны быть закодированы с использованием Protobuf и содержать правильный "magic byte". -
Ошибки в конфигурации serializers: Вы используете ProtobufConverter для значения, поэтому убедитесь, что данные сериализуются с использованием соответствующего ProtobufSerializer. Если используется неправильный сериализатор, данные могут восприниматься неверно.
-
Отсутствующая или некорректная схема в Schema Registry: Убедитесь, что вы правильно зарегистрировали схему, и что версия, отправленная в Kafka, соответствует зарегистрированной в Schema Registry. Проверьте, что фактические данные, отправляемые в Kafka, соответствуют прово́дной схемы.
-
Magic byte: Этот байт указывает на тип данных, и если он неверен (например, если данные не были сериализованы с использованием Kafka’s Protobuf Serializer), произойдет ошибка.
Решения
-
Проверьте данные, отправляемые в Kafka: Убедитесь, что вы действительно отправляете сериализованные Protobuf сообщения. Вы можете использовать инструменты, такие как
kafkacat
, для проверки типа содержимого в вашей теме Kafka.kafkacat -b localhost:9092 -t some_topic -C
-
Правильные конвертеры: Убедитесь, что в конфигурации вашего коннектора вы используете правильные конвертеры:
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter", "key.converter": "org.apache.kafka.connect.storage.StringConverter"
-
Проверьте схему: Чтобы проверить, зарегистрирована ли схема, отправленная в Schema Registry, можете использовать следующий cURL запрос:
curl -X GET http://localhost:8081/subjects/some_topic-value/versions
Убедитесь, что схема и версии совпадают с теми, которые вы ожидаете.
-
Отладка ошибок Magic Byte: Если у вас есть возможность протестировать код, который отправляет сообщения в Kafka, убедитесь, что используется правильный ProtobufSerializer с настройками, которые работают с вашим Schema Registry.
-
Убедитесь в соответствии версии: Убедитесь, что ваша версия Kafka, Confluent платформы и всех используемых библиотек совместимы между собой и поддерживают Protobuf правильным образом.
Заключение
Ошибки при десериализации данных Protobuf в Kafka-Connect могут быть вызваны множеством факторов, от неверно сериализованных данных до проблем с настройками. Определив узкие места в процессе, например, проверив отправляемые данные, схемы и настройки коннектора, вы сможете устранить проблему. Надеюсь, что предложенные средства диагностики и решения помогут вам в исправлении ситуации. Не забывайте про обновления и консультации с документацией Confluent и Apache Kafka для лучшего понимания работы системы.