Вопрос или проблема
Я включил CDC в своем экземпляре Neo4j и настроил тему Kafka для прослушивания изменений данных в базе данных Neo4j. Согласно документации Neo4j, данные событий должны быть в формате пар “ключ-значение”, но я получаю более сложную структуру с дополнительными полями для каждого свойства.
Вот пример данных, которые я получаю из темы Kafka:
{
"id": "CJUg4WrNW0Y7ttlh8lbkxfwAAAAAAAAEMAAAAAAAAAACAAABkh21o5c=",
"txId": 1072,
"seq": 2,
"event": {
"elementId": "4:9520e16a-cd5b-463b-b6d9-61f256e4c5fc:2073",
"eventType": "NODE",
"operation": "CREATE",
"labels": [
"Environment"
],
"keys": {},
"state": {
"before": null,
"after": {
"labels": [
"Environment"
],
"properties": {
"name": {
"B": null,
"I64": null,
"F64": null,
"S": "Dev",
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
},
"id": {
"B": null,
"I64": null,
"F64": null,
"S": "78b90e78-9b79-4330-9d02-7895f349964b",
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
}
}
}
}
}
}
Моя версия сервера Neo4j – 5.22, а версия Kafka connect – 5.1.1.
Конфигурация коннектора Kafka:
{
"name": "neo4j-source-connector",
"config": {
"connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"neo4j.uri": "bolt://localhost:7687",
"neo4j.streaming.from": "ALL",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.source-strategy": "CDC",
"neo4j.start-from": "NOW",
"neo4j.cdc.poll-interval": "500ms",
"neo4j.cdc.poll-duration": "5s",
"neo4j.cdc.topic.neo4j-node-topic.patterns.0.pattern": "(:Environment)",
}
}
Обратите внимание, что для свойств name и id, вместо того чтобы получать простой формат “ключ-значение”, я вижу структуру с такими полями, как B
, I64
, S
и т.д. Я ожидал нечто подобное:
"id": "78b90e78-9b79-4330-9d02-7895f349964b"
Есть ли конфигурация, которую я пропустил, или что-то еще, что может объяснить, почему я получаю данные в таком формате? Как я могу упростить это до пар “ключ-значение”?
Ответ или решение
Проблема, с которой вы столкнулись, связана с тем, как Neo4j сериализует и отправляет события изменений через Kafka. В вашей конфигурации Kafka Connector, судя по предоставленным данным, вы получаете события в формате, который включает множественные типы данных для каждой из свойств, что определённо отличается от ожидаемого простого ключ-значение формата.
Причины проблемы
-
Формат сериализации: Neo4j CDC использует определённый формат для сериализации различных типов данных свойств (например, S для строки, I64 для 64-битных целых чисел и т.д.). Это может быть неудобно, если вы ожидаете просто получать значения в простом формате.
-
Отсутствие соответствующей конфигурации: Возможно, в конфигурации некорректно настроены конвертеры или используются неподходящие параметры.
Решение
Пожалуйста, выполните следующие шаги для упрощения структуры данных, получаемых из Kafka:
-
Изменение настройки Kafka Connector: Вам необходимо убедиться, что ваш Kafka Connector правильно настроен для обработки данных в более удобном формате. Попробуйте использовать другой конвертер, который может распознавать типы данных и преобразовывать их в простой ключ-значение формат.
-
Использование Transform (Transformed) в Kafka Connect: Вы можете рассмотреть возможность добавления
SingleMessageTransform
в конфигурацию.connector, чтобы обработать и преобразовать входящие данные в нужный формат. Например,org.apache.kafka.connect.transforms.ExtractField
может быть использован для извлечения нужных полей. -
Проверка версий: Убедитесь, что вы используете совместимые версии Neo4j и Kafka Connector. Разные версии могут иметь различия в формате сообщений.
-
Измените конфигурацию полей свойства: Возможно, стоит рассмотреть возможность изменения настройков Neo4j, чтобы они использовали упрощенный формат свойств. Проверьте документацию Neo4j на наличие опций конфигурации CDC, которые могут помочь настроить вывод данных.
Пример конфигурации с использованием преобразования
Вот пример обновлённой конфигурации Kafka Connector, в которую добавлены трансформации:
{
"name": "neo4j-source-connector",
"config": {
"connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"transforms": "ExtractId",
"transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"neo4j.uri": "bolt://localhost:7687",
"neo4j.streaming.from": "ALL",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.source-strategy": "CDC",
"neo4j.start-from": "NOW",
"neo4j.cdc.poll-interval": "500ms",
"neo4j.cdc.poll-duration": "5s",
"neo4j.cdc.topic.neo4j-node-topic.patterns.0.pattern": "(:Environment)"
}
}
Заключение
Если после внесения изменений проблема не исчезнет, возможно, стоит обратиться в сообщество Neo4j или на платформы поддержки, чтобы более глубоко исследовать существующие проблемы с CDC и Kafka Connector. Вы также можете использовать механизм логирования для отслеживания того, как данные обрабатываются и отправляются через ваш поток.