Я пытаюсь реализовать семантику exactly-once в функции приемника, которая должна записывать потоковые данные в базу данных postgres. У меня есть следующая реализация TwoPhaseCommitSinkFunction
:
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class ExactlyOncePostgresSink extends TwoPhaseCommitSinkFunction<Transaction, Connection, Void> {
private static final Logger logger = LoggerFactory.getLogger(ExactlyOncePostgresSink.class);
private Connection connection;
private PreparedStatement updateBalanceStatement;
private PreparedStatement logTransactionStatement;
private String updateBalancesSql;
private String logTransactionSql;
private String jdbcUrl;
private String username;
private String password;
public ExactlyOncePostgresSink(String jdbcUrl, String username, String password) {
super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
this.updateBalancesSql = "UPDATE postgres.public.account " +
"SET balance = balance - ? " +
"WHERE account_id = ?; " +
"UPDATE postgres.public.account " +
"SET balance = balance + ? " +
"WHERE account_id = ?; ";
this.logTransactionSql = "INSERT INTO postgres.public.transactions (transaction_id, from_account, to_account, amount) " +
"VALUES(?, ?, ?, ?); ";
}
@Override
protected void invoke(Connection connection, Transaction transaction, Context context) throws Exception {
try {
// Обновление балансов
updateBalanceStatement.setDouble(1, transaction.getAmount());
updateBalanceStatement.setString(2, transaction.getSendingClientAccountNumber());
updateBalanceStatement.setDouble(3, transaction.getAmount());
updateBalanceStatement.setString(4, transaction.getReceivingClientAccountNumber());
// Логирование транзакции
logTransactionStatement.setString(1, transaction.getTransactionId());
logTransactionStatement.setString(2, transaction.getSendingClientAccountNumber());
logTransactionStatement.setString(3, transaction.getReceivingClientAccountNumber());
logTransactionStatement.setDouble(4, transaction.getAmount());
updateBalanceStatement.executeUpdate();
logTransactionStatement.executeUpdate();
logger.info("Транзакция обработана: {}", transaction);
}
catch (Exception e) {
logger.error("Ошибка при обработке транзакции: {}", transaction, e);
throw new RuntimeException(e);
}
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
connection = DriverManager.getConnection(jdbcUrl, username, password);
connection.setAutoCommit(false);
updateBalanceStatement = connection.prepareStatement(this.updateBalancesSql);
logTransactionStatement = connection.prepareStatement(this.logTransactionSql);
}
@Override
protected Connection beginTransaction() throws Exception {
Connection conn = DriverManager.getConnection(this.jdbcUrl, this.username, this.password);
conn.setAutoCommit(false);
logger.info("Транзакция начата с новым соединением базы данных");
return conn;
}
@Override
protected void preCommit(Connection connection) throws Exception {
}
@Override
protected void commit(Connection connection) {
try{
connection.commit();
logger.info("Транзакция зафиксирована");
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
protected void abort(Connection connection) {
try {
connection.rollback();
logger.warn("Откат.");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws Exception {
if (updateBalanceStatement != null) {
updateBalanceStatement.close();
}
if (logTransactionStatement != null) {
logTransactionStatement.close();
}
if (connection != null) {
connection.close();
}
super.close();
logger.info("ExactlyOncePostgresSink закрыт");
}
}
Поведение, которое наблюдается, заключается в том, что вызывается функция beginTransaction
несколько раз, затем по мере поступления записей вызывается функция invoke
, после чего происходит откат транзакции, что, как я считаю, вызвано ошибкой, что транзакцию нельзя зафиксировать:
java.lang.IllegalAccessError: failed to access class java.util.concurrent.locks.ReentrantLock$NonfairSync from class reflectasm.java.util.concurrent.locks.ReentrantLock$NonfairSyncConstructorAccess (java.util.concurrent.locks.ReentrantLock$NonfairSync is in module java.base of loader 'bootstrap'; reflectasm.java.util.concurrent.locks.ReentrantLock$NonfairSyncConstructorAccess is in unnamed module of loader 'app')
at reflectasm.java.util.concurrent.locks.ReentrantLock$NonfairSyncConstructorAccess.newInstance(Unknown Source)
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy$1.newInstance(Kryo.java:1193)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:298)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.copy(TwoPhaseCommitSinkFunction.java:720)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.copy(TwoPhaseCommitSinkFunction.java:669)
at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:70)
at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:67)
at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:81)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:85)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:40)
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1242)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$15(StreamTask.java:1201)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
Судя по всему, кажется, что переданный в конструктор KryoSerializer
класс java.sql.Connection
вызывает проблему, но я не могу понять, как именно. Мой вопрос: какова правильная реализация KryoSerializer
в этом случае?
Версия Java SDK: 11
Версия Flink: 1.19.1
Ответ
Проблема, с которой вы столкнулись, связана с тем, что класс java.sql.Connection
не может быть сериализован с помощью KryoSerializer
. Это происходит из-за того, что Kryo пытается получить доступ к полям внутреннего класса ReentrantLock$NonfairSync
, что приводит к ошибке IllegalAccessError
.
Вот шаги для решения проблемы:
-
Используйте сериализацию по умолчанию: Вместо передачи класса
Connection
вKryoSerializer
, вы можете использовать другую стратегию сериализации, так как объектConnection
является связующим с ресурсами, которые не могут быть сериализованы (например, соединение с базой данных). Серверные соединения, как правило, не сохраняются в состоянии и не должны быть сериализованы. -
Измените реализацию
ExactlyOncePostgresSink
:
Вам не нужно использоватьKryoSerializer
для сериализацииConnection
. Вместо этого, вы можете использоватьVoidSerializer
(или просто использовать стандартную сериализацию для вашего типа данных), сохранив логику работы с соединениями на уровне методовopen
,beginTransaction
,commit
, иabort
. Пример:public class ExactlyOncePostgresSink extends TwoPhaseCommitSinkFunction<Transaction, Void, Void> { // Остальной код остается без изменений public ExactlyOncePostgresSink(String jdbcUrl, String username, String password) { super(VoidSerializer.INSTANCE, VoidSerializer.INSTANCE); // Остальные параметры и инициализация } @Override protected Connection beginTransaction() throws Exception { Connection conn = DriverManager.getConnection(this.jdbcUrl, this.username, this.password); conn.setAutoCommit(false); logger.info("Transaction started with a new database connection"); return conn; } @Override protected void preCommit(Connection connection) throws Exception { // здесь может быть запрещено делать изменения } @Override protected void commit(Connection connection) { try { connection.commit(); logger.info("Transaction committed"); } catch (SQLException e) { throw new RuntimeException(e); } } @Override protected void abort(Connection connection) { try { connection.rollback(); logger.warn("Rollback."); } catch (SQLException e) { throw new RuntimeException(e); } } @Override public void close() throws Exception { // Закрытие соединения и подготовленных выражений if (updateBalanceStatement != null) { updateBalanceStatement.close(); } if (logTransactionStatement != null) { logTransactionStatement.close(); } if (connection != null) { connection.close(); } super.close(); logger.info("ExactlyOncePostgresSink закрыт"); } }
-
Проверьте другие зависимости: Убедитесь, что у вас нет конфликтующих зависимостей в вашем проекте. Иногда проблемы возникают из-за конфликтующих версий библиотек.
-
Проверка конфигурации Flink: Убедитесь, что ваша версия Flink правильно настроена и соответствует используемым библиотекам и версиям базы данных.
- Логирование и отладка: Наконец, добавьте дополнительные журналы для лучшего понимания потока выполнения, особенно перед и после транзакций, чтобы выявить места, где проблема может проявляться.
Следуя приведенным выше рекомендациям, вы сможете избежать ошибки IllegalAccessError
в вашем проекте на Flink.