Флинк KryoSerializer вызывает java.lang.IllegalAccessError: не удалось получить доступ к классу java.util.concurrent.locks.ReentrantLock$NonfairSync

Вопросы и ответы

Я пытаюсь реализовать семантику exactly-once в функции приемника, которая должна записывать потоковые данные в базу данных postgres. У меня есть следующая реализация TwoPhaseCommitSinkFunction:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ExactlyOncePostgresSink extends TwoPhaseCommitSinkFunction<Transaction, Connection, Void> {

    private static final Logger logger = LoggerFactory.getLogger(ExactlyOncePostgresSink.class);

    private Connection connection;
    private PreparedStatement updateBalanceStatement;
    private PreparedStatement logTransactionStatement;
    private String updateBalancesSql;
    private String logTransactionSql;

    private String jdbcUrl;
    private String username;
    private String password;

    public ExactlyOncePostgresSink(String jdbcUrl, String username, String password) {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;

        this.updateBalancesSql = "UPDATE postgres.public.account " +
                "SET balance = balance - ? " +
                "WHERE account_id = ?; " +
                "UPDATE postgres.public.account " +
                "SET balance = balance + ? " +
                "WHERE account_id = ?; ";
        this.logTransactionSql = "INSERT INTO postgres.public.transactions (transaction_id, from_account, to_account, amount) " +
                "VALUES(?, ?, ?, ?); ";
    }
    @Override
    protected void invoke(Connection connection, Transaction transaction, Context context) throws Exception {
        try {
            // Обновление балансов
            updateBalanceStatement.setDouble(1, transaction.getAmount());
            updateBalanceStatement.setString(2, transaction.getSendingClientAccountNumber());
            updateBalanceStatement.setDouble(3, transaction.getAmount());
            updateBalanceStatement.setString(4, transaction.getReceivingClientAccountNumber());

            // Логирование транзакции
            logTransactionStatement.setString(1, transaction.getTransactionId());
            logTransactionStatement.setString(2, transaction.getSendingClientAccountNumber());
            logTransactionStatement.setString(3, transaction.getReceivingClientAccountNumber());
            logTransactionStatement.setDouble(4, transaction.getAmount());

            updateBalanceStatement.executeUpdate();
            logTransactionStatement.executeUpdate();
            logger.info("Транзакция обработана: {}", transaction);
        }
        catch (Exception e) {
            logger.error("Ошибка при обработке транзакции: {}", transaction, e);
            throw new RuntimeException(e);
        }

    }

    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        connection.setAutoCommit(false);
        updateBalanceStatement = connection.prepareStatement(this.updateBalancesSql);
        logTransactionStatement = connection.prepareStatement(this.logTransactionSql);
    }

    @Override
    protected Connection beginTransaction() throws Exception {
        Connection conn = DriverManager.getConnection(this.jdbcUrl, this.username, this.password);
        conn.setAutoCommit(false);
        logger.info("Транзакция начата с новым соединением базы данных");
        return conn;
    }

    @Override
    protected void preCommit(Connection connection) throws Exception {

    }

    @Override
    protected void commit(Connection connection) {
        try{
            connection.commit();
            logger.info("Транзакция зафиксирована");
        }
        catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(Connection connection) {
        try {
            connection.rollback();
            logger.warn("Откат.");
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void close() throws Exception {
        if (updateBalanceStatement != null) {
            updateBalanceStatement.close();
        }
        if (logTransactionStatement != null) {
            logTransactionStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
        logger.info("ExactlyOncePostgresSink закрыт");
    }
}

Поведение, которое наблюдается, заключается в том, что вызывается функция beginTransaction несколько раз, затем по мере поступления записей вызывается функция invoke, после чего происходит откат транзакции, что, как я считаю, вызвано ошибкой, что транзакцию нельзя зафиксировать:

java.lang.IllegalAccessError: failed to access class java.util.concurrent.locks.ReentrantLock$NonfairSync from class reflectasm.java.util.concurrent.locks.ReentrantLock$NonfairSyncConstructorAccess (java.util.concurrent.locks.ReentrantLock$NonfairSync is in module java.base of loader 'bootstrap'; reflectasm.java.util.concurrent.locks.ReentrantLock$NonfairSyncConstructorAccess is in unnamed module of loader 'app')
    at reflectasm.java.util.concurrent.locks.ReentrantLock$NonfairSyncConstructorAccess.newInstance(Unknown Source)
    at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy$1.newInstance(Kryo.java:1193)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:298)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.copy(TwoPhaseCommitSinkFunction.java:720)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.copy(TwoPhaseCommitSinkFunction.java:669)
    at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:70)
    at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:67)
    at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:81)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:85)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:40)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1242)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$15(StreamTask.java:1201)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)

Судя по всему, кажется, что переданный в конструктор KryoSerializer класс java.sql.Connection вызывает проблему, но я не могу понять, как именно. Мой вопрос: какова правильная реализация KryoSerializer в этом случае?

Версия Java SDK: 11

Версия Flink: 1.19.1

Ответ

Проблема, с которой вы столкнулись, связана с тем, что класс java.sql.Connection не может быть сериализован с помощью KryoSerializer. Это происходит из-за того, что Kryo пытается получить доступ к полям внутреннего класса ReentrantLock$NonfairSync, что приводит к ошибке IllegalAccessError.

Вот шаги для решения проблемы:

  1. Используйте сериализацию по умолчанию: Вместо передачи класса Connection в KryoSerializer, вы можете использовать другую стратегию сериализации, так как объект Connection является связующим с ресурсами, которые не могут быть сериализованы (например, соединение с базой данных). Серверные соединения, как правило, не сохраняются в состоянии и не должны быть сериализованы.

  2. Измените реализацию ExactlyOncePostgresSink:
    Вам не нужно использовать KryoSerializer для сериализации Connection. Вместо этого, вы можете использовать VoidSerializer (или просто использовать стандартную сериализацию для вашего типа данных), сохранив логику работы с соединениями на уровне методов open, beginTransaction, commit, и abort. Пример:

    public class ExactlyOncePostgresSink extends TwoPhaseCommitSinkFunction<Transaction, Void, Void> {
    
       // Остальной код остается без изменений
    
       public ExactlyOncePostgresSink(String jdbcUrl, String username, String password) {
           super(VoidSerializer.INSTANCE, VoidSerializer.INSTANCE);
           // Остальные параметры и инициализация
       }
    
       @Override
       protected Connection beginTransaction() throws Exception {
           Connection conn = DriverManager.getConnection(this.jdbcUrl, this.username, this.password);
           conn.setAutoCommit(false);
           logger.info("Transaction started with a new database connection");
           return conn;
       }
    
       @Override
       protected void preCommit(Connection connection) throws Exception {
           // здесь может быть запрещено делать изменения
       }
    
       @Override
       protected void commit(Connection connection) {
           try {
               connection.commit();
               logger.info("Transaction committed");
           } catch (SQLException e) {
               throw new RuntimeException(e);
           }
       }
    
       @Override
       protected void abort(Connection connection) {
           try {
               connection.rollback();
               logger.warn("Rollback.");
           } catch (SQLException e) {
               throw new RuntimeException(e);
           }
       }
    
       @Override
       public void close() throws Exception {
           // Закрытие соединения и подготовленных выражений
           if (updateBalanceStatement != null) {
               updateBalanceStatement.close();
           }
           if (logTransactionStatement != null) {
               logTransactionStatement.close();
           }
           if (connection != null) {
               connection.close();
           }
           super.close();
           logger.info("ExactlyOncePostgresSink закрыт");
       }
    }
  3. Проверьте другие зависимости: Убедитесь, что у вас нет конфликтующих зависимостей в вашем проекте. Иногда проблемы возникают из-за конфликтующих версий библиотек.

  4. Проверка конфигурации Flink: Убедитесь, что ваша версия Flink правильно настроена и соответствует используемым библиотекам и версиям базы данных.

  5. Логирование и отладка: Наконец, добавьте дополнительные журналы для лучшего понимания потока выполнения, особенно перед и после транзакций, чтобы выявить места, где проблема может проявляться.

Следуя приведенным выше рекомендациям, вы сможете избежать ошибки IllegalAccessError в вашем проекте на Flink.

Оцените материал
Добавить комментарий

Капча загружается...