Вопрос или проблема
У меня есть коннектор источника debezium postgres, который захватывает данные изменений в таблице, и я хотел бы отфильтровать строки, отправляемые в Kafka, на основе поля даты в таблице.
Мне удалось отфильтровать по другим полям с типами данных целых чисел и строк, но дата postgres в сообщении kafka представлена в формате postgres — количество дней с начала эпохи. Например, в pgAdmin мой столбец createDate отображается как 2009-01-01, но в сообщении kafka стоит 13879 (количество дней с 1970 года). Я пытался использовать transforms.filter.condition: “value.after.createDate > 12418 (1 января 2004 года), но это не сработало. Я также пробовал сравнивать со строкой даты, но это тоже не сработало. Вот конфигурация моего коннектора, которая работает при фильтрации по полю целого числа.
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "100",
"database.hostname": "host.docker.internal",
"database.port": "5432",
"database.user": "testUser",
"database.dbname" : "postgres",
"topic.prefix": "postgresTest2",
"schema.include.list": "public",
"time.precision.mode": "connect",
"plugin.name": "pgoutput",
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.after.testInt > 5"
}
}
И вот образец сообщения kafka, которое я получаю от коннектора.
{
"before": null,
"after": {
"id": 8,
"name": "'name-8'",
"createDate": 14245,
"lastDate": 16436,
"timestamp": "08:20:30Z",
"testInt": 8
},
"source": {
"version": "2.7.1.Final",
"connector": "postgresql",
"name": "postgresTest2",
"ts_ms": 1730840079375,
"snapshot": "last",
"db": "postgres",
"sequence": "[null,\"24444976\"]",
"ts_us": 1730840079375132,
"ts_ns": 1730840079375132000,
"schema": "public",
"table": "test-table-1",
"txId": 736,
"lsn": 24444976,
"xmin": null
},
"transaction": null,
"op": "r",
"ts_ms": 1730840079747,
"ts_us": 1730840079747671,
"ts_ns": 1730840079747671600
}
Любые предложения о том, как это сделать, будут очень признательны.
Ответ или решение
Для того чтобы успешно фильтровать сообщения, поступающие в Kafka через коннектор Debezium Postgres, основываясь на дате, необходимо учитывать, что даты в сообщениях Kafka представлены как количество дней с эпохи (epoch). Это значение отличается от представления даты в базах данных и требует дополнительной обработки.
Шаги для фильтрации по дате
-
Понимание формата даты: В PostgreSQL тип данных
DATE
хранит даты как количество дней с 1970-01-01 (эпоха Unix). Например, дата2004-01-01
представляется как12418
. -
Настройка трансформации фильтра:
Чтобы отфильтровать данные, поступающие в Kafka, нужно использовать JavaScript-скрипт или Groovy-скрипт в качестве условия фильтрации. Вместо того чтобы использовать строковые представления дат, вы должны преобразовать желаемую дату в соответствующее количество дней с эпохи. -
Изменение конфигурации соединителя:
Вам нужно изменить конфигурацию вашего коннектора. Вот пример того, как это может выглядеть с учетом фильтрации по дате:
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "100",
"database.hostname": "host.docker.internal",
"database.port": "5432",
"database.user": "testUser",
"database.dbname": "postgres",
"topic.prefix": "postgresTest2",
"schema.include.list": "public",
"time.precision.mode": "connect",
"plugin.name": "pgoutput",
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.after.createDate > 12418"
}
}
- Проверка значений: Убедитесь, что вы используете правильные значения для фильтрации. Например, в данном случае мы фильтруем на основе даты больше
2004-01-01
, что соответствует числу12418
.
Устранение ошибок
Если фильтрация не работает, проверьте следующее:
- Убедитесь, что поле
createDate
действительно существует в ваших данных и корректно отображается в Kafka. - Проверьте синтаксис условия фильтра. Убедитесь, что вы используете правильные операторы и форматирование.
- Вы можете протестировать выражение в JavaScript или Groovy, чтобы удостовериться, что оно ведет себя ожидаемым образом. Например, простое выражение как
value.after.createDate > 12418
должно возвращатьtrue
для записей с датой после 1 января 2004 года.
Заключение
Фильтрация событий по дате в Debezium Postgres Source Connector может быть простой процедурой при условии правильного понимания форматов данных и правильной настройки фильтров. Подходя к вопросу с деталями и точностью, можно достичь желаемых результатов. Если у вас остаются вопросы или сложности, не стесняйтесь обращаться к документации Debezium или сообществу для получения дальнейшей информации и помощи.