Не удается десериализовать данные protobuf с использованием Kafka-Connect и Schema-registry.

Вопрос или проблема

Я пытаюсь записать тему Kafka (с данными proto!) в таблицу Postgres, используя Kafka-Connect и Schema-registry!

Мой файл конфигурации соединителя Kafka-Connect:

{
   "name":"proto_sink",
         "config":{
            "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.password":"some_password",
            "topics":"some_topic",
            "value.converter.schema.registry.url":"http://localhost:8081",
            "key.converter.schemas.enable":"false",
            "auto.evolve":"true",
            "connection.user":"some_user",
            "value.converter.schemas.enable":"true",
            "name":"sink_proto",
            "auto.create":"true",
            "connection.url":"jdbc:postgresql://localhost:5432/some_db",
            "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
            "insert.mode":"insert",
            "key.converter":"org.apache.kafka.connect.storage.StringConverter"
  }
}

Сначала я использую эту команду для отправки схемы proto в Schema-registry.

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"   --data '{"schema": "syntax = \"proto3\";\npackage nilian;\n\nmessage Person {\n  string name = 1;\n  int32 age = 2;\n}"}'   http://localhost:8081/subjects/some_topic-value/versions

А затем я пытаюсь отправить конфигурации соединителя в Kafka-Connect с помощью этой команды:

curl -X POST -H "Content-Type: application/json" -d @connector_configs/json_conn.json http://localhost:8083/connectors

Так я получаю эту ошибку:

Caused by: org.apache.kafka.connect.errors.DataException: Не удалось десериализовать данные для темы some_topic в Protobuf: 
        at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:154)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:528)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
        ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Protobuf для id -1
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:228)
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:292)
        at io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:200)
        at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:132)
        ... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Неизвестный магический байт!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:600)
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:133)
        ... 20 more

Что не так с этим?

Ответ или решение

Ошибка десериализации данных Protobuf в Kafka-Connect с использованием Schema Registry

Введение

При работе с Kafka и Protobuf, особенно в контексте использования Kafka-Connect и Schema Registry, могут возникать сложности с десериализацией сообщений. В вашем случае ошибка возникает при попытке извлечь данные из темы Kafka и записать их в таблицу Postgres. Рассмотрим возможные причины и решения данной проблемы.

Описание проблемы

Сообщение об ошибке указывает, что не удается десериализовать данные Protobuf для темы some_topic. Ключевые сообщения в трассировке ошибок:

  • Failed to deserialize data for topic some_topic to Protobuf
  • Unknown magic byte!

Эти сообщения являются индикаторами того, что данные, отправленные в Kafka, не соответствуют ожидаемому формату Protobuf, или есть проблемы с обработкой схемы.

Возможные причины ошибки

  1. Неверный формат сообщения: Убедитесь, что данные, отправляемые в тему Kafka (some_topic), представляют собой сериализованные объекты Protobuf. Они должны быть закодированы с использованием Protobuf и содержать правильный "magic byte".

  2. Ошибки в конфигурации serializers: Вы используете ProtobufConverter для значения, поэтому убедитесь, что данные сериализуются с использованием соответствующего ProtobufSerializer. Если используется неправильный сериализатор, данные могут восприниматься неверно.

  3. Отсутствующая или некорректная схема в Schema Registry: Убедитесь, что вы правильно зарегистрировали схему, и что версия, отправленная в Kafka, соответствует зарегистрированной в Schema Registry. Проверьте, что фактические данные, отправляемые в Kafka, соответствуют прово́дной схемы.

  4. Magic byte: Этот байт указывает на тип данных, и если он неверен (например, если данные не были сериализованы с использованием Kafka’s Protobuf Serializer), произойдет ошибка.

Решения

  1. Проверьте данные, отправляемые в Kafka: Убедитесь, что вы действительно отправляете сериализованные Protobuf сообщения. Вы можете использовать инструменты, такие как kafkacat, для проверки типа содержимого в вашей теме Kafka.

    kafkacat -b localhost:9092 -t some_topic -C
  2. Правильные конвертеры: Убедитесь, что в конфигурации вашего коннектора вы используете правильные конвертеры:

    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  3. Проверьте схему: Чтобы проверить, зарегистрирована ли схема, отправленная в Schema Registry, можете использовать следующий cURL запрос:

    curl -X GET http://localhost:8081/subjects/some_topic-value/versions

    Убедитесь, что схема и версии совпадают с теми, которые вы ожидаете.

  4. Отладка ошибок Magic Byte: Если у вас есть возможность протестировать код, который отправляет сообщения в Kafka, убедитесь, что используется правильный ProtobufSerializer с настройками, которые работают с вашим Schema Registry.

  5. Убедитесь в соответствии версии: Убедитесь, что ваша версия Kafka, Confluent платформы и всех используемых библиотек совместимы между собой и поддерживают Protobuf правильным образом.

Заключение

Ошибки при десериализации данных Protobuf в Kafka-Connect могут быть вызваны множеством факторов, от неверно сериализованных данных до проблем с настройками. Определив узкие места в процессе, например, проверив отправляемые данные, схемы и настройки коннектора, вы сможете устранить проблему. Надеюсь, что предложенные средства диагностики и решения помогут вам в исправлении ситуации. Не забывайте про обновления и консультации с документацией Confluent и Apache Kafka для лучшего понимания работы системы.

Оцените материал
Добавить комментарий

Капча загружается...