debezium postgres source connector kafka фильтр по дате

Вопрос или проблема

У меня есть коннектор источника debezium postgres, который захватывает данные изменений в таблице, и я хотел бы отфильтровать строки, отправляемые в Kafka, на основе поля даты в таблице.

Мне удалось отфильтровать по другим полям с типами данных целых чисел и строк, но дата postgres в сообщении kafka представлена в формате postgres — количество дней с начала эпохи. Например, в pgAdmin мой столбец createDate отображается как 2009-01-01, но в сообщении kafka стоит 13879 (количество дней с 1970 года). Я пытался использовать transforms.filter.condition: “value.after.createDate > 12418 (1 января 2004 года), но это не сработало. Я также пробовал сравнивать со строкой даты, но это тоже не сработало. Вот конфигурация моего коннектора, которая работает при фильтрации по полю целого числа.

{
   "name": "postgres-connector",
   "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "100",
      "database.hostname": "host.docker.internal",
      "database.port": "5432",
      "database.user": "testUser",
      "database.dbname" : "postgres",
      "topic.prefix": "postgresTest2",
      "schema.include.list": "public",
      "time.precision.mode": "connect",
      "plugin.name": "pgoutput",
      "transforms": "filter",
      "transforms.filter.type": "io.debezium.transforms.Filter",
      "transforms.filter.language": "jsr223.groovy",
      "transforms.filter.condition": "value.after.testInt > 5"
      }
}

И вот образец сообщения kafka, которое я получаю от коннектора.

{
    "before": null,
    "after": {
        "id": 8,
        "name": "'name-8'",
        "createDate": 14245,
        "lastDate": 16436,
        "timestamp": "08:20:30Z",
        "testInt": 8
    },
    "source": {
        "version": "2.7.1.Final",
        "connector": "postgresql",
        "name": "postgresTest2",
        "ts_ms": 1730840079375,
        "snapshot": "last",
        "db": "postgres",
        "sequence": "[null,\"24444976\"]",
        "ts_us": 1730840079375132,
        "ts_ns": 1730840079375132000,
        "schema": "public",
        "table": "test-table-1",
        "txId": 736,
        "lsn": 24444976,
        "xmin": null
    },
    "transaction": null,
    "op": "r",
    "ts_ms": 1730840079747,
    "ts_us": 1730840079747671,
    "ts_ns": 1730840079747671600
}

Любые предложения о том, как это сделать, будут очень признательны.

Ответ или решение

Для того чтобы успешно фильтровать сообщения, поступающие в Kafka через коннектор Debezium Postgres, основываясь на дате, необходимо учитывать, что даты в сообщениях Kafka представлены как количество дней с эпохи (epoch). Это значение отличается от представления даты в базах данных и требует дополнительной обработки.

Шаги для фильтрации по дате

  1. Понимание формата даты: В PostgreSQL тип данных DATE хранит даты как количество дней с 1970-01-01 (эпоха Unix). Например, дата 2004-01-01 представляется как 12418.

  2. Настройка трансформации фильтра:
    Чтобы отфильтровать данные, поступающие в Kafka, нужно использовать JavaScript-скрипт или Groovy-скрипт в качестве условия фильтрации. Вместо того чтобы использовать строковые представления дат, вы должны преобразовать желаемую дату в соответствующее количество дней с эпохи.

  3. Изменение конфигурации соединителя:
    Вам нужно изменить конфигурацию вашего коннектора. Вот пример того, как это может выглядеть с учетом фильтрации по дате:

{
   "name": "postgres-connector",
   "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "100",
      "database.hostname": "host.docker.internal",
      "database.port": "5432",
      "database.user": "testUser",
      "database.dbname": "postgres",
      "topic.prefix": "postgresTest2",
      "schema.include.list": "public",
      "time.precision.mode": "connect",
      "plugin.name": "pgoutput",
      "transforms": "filter",
      "transforms.filter.type": "io.debezium.transforms.Filter",
      "transforms.filter.language": "jsr223.groovy",
      "transforms.filter.condition": "value.after.createDate > 12418"
   }
}
  1. Проверка значений: Убедитесь, что вы используете правильные значения для фильтрации. Например, в данном случае мы фильтруем на основе даты больше 2004-01-01, что соответствует числу 12418.

Устранение ошибок

Если фильтрация не работает, проверьте следующее:

  • Убедитесь, что поле createDate действительно существует в ваших данных и корректно отображается в Kafka.
  • Проверьте синтаксис условия фильтра. Убедитесь, что вы используете правильные операторы и форматирование.
  • Вы можете протестировать выражение в JavaScript или Groovy, чтобы удостовериться, что оно ведет себя ожидаемым образом. Например, простое выражение как value.after.createDate > 12418 должно возвращать true для записей с датой после 1 января 2004 года.

Заключение

Фильтрация событий по дате в Debezium Postgres Source Connector может быть простой процедурой при условии правильного понимания форматов данных и правильной настройки фильтров. Подходя к вопросу с деталями и точностью, можно достичь желаемых результатов. Если у вас остаются вопросы или сложности, не стесняйтесь обращаться к документации Debezium или сообществу для получения дальнейшей информации и помощи.

Оцените материал
Добавить комментарий

Капча загружается...