Неожиданный формат событий CDC Neo4j для значений свойств

Вопрос или проблема

Я включил CDC в своем экземпляре Neo4j и настроил тему Kafka для прослушивания изменений данных в базе данных Neo4j. Согласно документации Neo4j, данные событий должны быть в формате пар “ключ-значение”, но я получаю более сложную структуру с дополнительными полями для каждого свойства.

Вот пример данных, которые я получаю из темы Kafka:

{
  "id": "CJUg4WrNW0Y7ttlh8lbkxfwAAAAAAAAEMAAAAAAAAAACAAABkh21o5c=",
  "txId": 1072,
  "seq": 2,
  "event": {
    "elementId": "4:9520e16a-cd5b-463b-b6d9-61f256e4c5fc:2073",
    "eventType": "NODE",
    "operation": "CREATE",
    "labels": [
      "Environment"
    ],
    "keys": {},
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Environment"
        ],
        "properties": {
          "name": {
            "B": null,
            "I64": null,
            "F64": null,
            "S": "Dev",
            "BA": null,
            "TLD": null,
            "TLDT": null,
            "TLT": null,
            "TZDT": null,
            "TOT": null,
            "TD": null,
            "SP": null,
            "LB": null,
            "LI64": null,
            "LF64": null,
            "LS": null,
            "LTLD": null,
            "LTLDT": null,
            "LTLT": null,
            "LZDT": null,
            "LTOT": null,
            "LTD": null,
            "LSP": null
          },
          "id": {
            "B": null,
            "I64": null,
            "F64": null,
            "S": "78b90e78-9b79-4330-9d02-7895f349964b",
            "BA": null,
            "TLD": null,
            "TLDT": null,
            "TLT": null,
            "TZDT": null,
            "TOT": null,
            "TD": null,
            "SP": null,
            "LB": null,
            "LI64": null,
            "LF64": null,
            "LS": null,
            "LTLD": null,
            "LTLDT": null,
            "LTLT": null,
            "LZDT": null,
            "LTOT": null,
            "LTD": null,
            "LSP": null
          }
        }
      }
    }
  }
}

Моя версия сервера Neo4j – 5.22, а версия Kafka connect – 5.1.1.

Конфигурация коннектора Kafka:

{
  "name": "neo4j-source-connector",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "neo4j.uri": "bolt://localhost:7687",
    "neo4j.streaming.from": "ALL",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "500ms",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.neo4j-node-topic.patterns.0.pattern": "(:Environment)",
  }
}

Обратите внимание, что для свойств name и id, вместо того чтобы получать простой формат “ключ-значение”, я вижу структуру с такими полями, как B, I64, S и т.д. Я ожидал нечто подобное:

"id": "78b90e78-9b79-4330-9d02-7895f349964b"

Есть ли конфигурация, которую я пропустил, или что-то еще, что может объяснить, почему я получаю данные в таком формате? Как я могу упростить это до пар “ключ-значение”?

Ответ или решение

Проблема, с которой вы столкнулись, связана с тем, как Neo4j сериализует и отправляет события изменений через Kafka. В вашей конфигурации Kafka Connector, судя по предоставленным данным, вы получаете события в формате, который включает множественные типы данных для каждой из свойств, что определённо отличается от ожидаемого простого ключ-значение формата.

Причины проблемы

  1. Формат сериализации: Neo4j CDC использует определённый формат для сериализации различных типов данных свойств (например, S для строки, I64 для 64-битных целых чисел и т.д.). Это может быть неудобно, если вы ожидаете просто получать значения в простом формате.

  2. Отсутствие соответствующей конфигурации: Возможно, в конфигурации некорректно настроены конвертеры или используются неподходящие параметры.

Решение

Пожалуйста, выполните следующие шаги для упрощения структуры данных, получаемых из Kafka:

  1. Изменение настройки Kafka Connector: Вам необходимо убедиться, что ваш Kafka Connector правильно настроен для обработки данных в более удобном формате. Попробуйте использовать другой конвертер, который может распознавать типы данных и преобразовывать их в простой ключ-значение формат.

  2. Использование Transform (Transformed) в Kafka Connect: Вы можете рассмотреть возможность добавления SingleMessageTransform в конфигурацию.connector, чтобы обработать и преобразовать входящие данные в нужный формат. Например, org.apache.kafka.connect.transforms.ExtractField может быть использован для извлечения нужных полей.

  3. Проверка версий: Убедитесь, что вы используете совместимые версии Neo4j и Kafka Connector. Разные версии могут иметь различия в формате сообщений.

  4. Измените конфигурацию полей свойства: Возможно, стоит рассмотреть возможность изменения настройков Neo4j, чтобы они использовали упрощенный формат свойств. Проверьте документацию Neo4j на наличие опций конфигурации CDC, которые могут помочь настроить вывод данных.

Пример конфигурации с использованием преобразования

Вот пример обновлённой конфигурации Kafka Connector, в которую добавлены трансформации:

{
  "name": "neo4j-source-connector",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "transforms": "ExtractId",
    "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "neo4j.uri": "bolt://localhost:7687",
    "neo4j.streaming.from": "ALL",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "500ms",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.neo4j-node-topic.patterns.0.pattern": "(:Environment)"
  }
}

Заключение

Если после внесения изменений проблема не исчезнет, возможно, стоит обратиться в сообщество Neo4j или на платформы поддержки, чтобы более глубоко исследовать существующие проблемы с CDC и Kafka Connector. Вы также можете использовать механизм логирования для отслеживания того, как данные обрабатываются и отправляются через ваш поток.

Оцените материал
Добавить комментарий

Капча загружается...