Почему ListView работает лучше, чем ArrayList

Вопрос или проблема

Я столкнулся с проблемой при настройке UDAF с использованием Flink (версия 1.20). Я хотел реализовать UDAF, который рассчитывает медиану, и использовал следующие два метода:

public class MedianUDAF2 extends AggregateFunction<Double, MedianUDAF2.State> {

    public static class State {

        public int scale = 2;

        public ListView<Double> numbers;

        public State() {}
    }

    @Override
    public State createAccumulator() {
        State state = new State();
        state.numbers = new ListView<>();
        return state;
    }

    public void accumulate(State acc, Double val, Integer scale) throws Exception {
        acc.numbers.add(val);
        if (scale != null && scale > 0) acc.scale = scale;
    }

    public void merge(State acc, Iterable<State> it) throws Exception {
        for (State a : it) {
            acc.numbers.addAll(a.numbers.getList());
        }
    }

    @Override
    public Double getValue(State acc) {
        try {
            List<Double> numbers = acc.numbers.getList();
            numbers.sort(Double::compareTo);
            double n = numbers.size() - 1;
            double index = n * 0.5;

            int low = (int) Math.floor(index);
            int high = (int) Math.ceil(index);

            double value = low == high ? (numbers.get(low) + numbers.get(high)) * 0.5 : numbers.get(high);
            BigDecimal decimal = new BigDecimal(value);
            return decimal.setScale(acc.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
        } catch (Exception ignored) {
        }
        return 0.0;
    }
}
public class MedianUDAF extends AggregateFunction<Double, MedianUDAF.State> {

    public static class State {

        public int scale = 2;

        @DataTypeHint(value = "ARRAY<DOUBLE>")
        public ArrayList<Double> numbers;

        public State() {}
    }

    @Override
    public State createAccumulator() {
        State state = new State();
        state.numbers = new ArrayList<>();
        return state;
    }

    public void accumulate(State acc, Double val, Integer scale) throws Exception {
        acc.numbers.add(val);
        if (scale != null && scale > 0) acc.scale = scale;
    }

    public void merge(State acc, Iterable<State> it) throws Exception {
        for (State a : it) {
            acc.numbers.addAll(a.numbers);
        }
    }

    @Override
    public Double getValue(State acc) {
        try {
            List<Double> numbers = acc.numbers;
            numbers.sort(Double::compareTo);
            double n = numbers.size() - 1;
            double index = n * 0.5;

            int low = (int) Math.floor(index);
            int high = (int) Math.ceil(index);

            double value = low == high ? (numbers.get(low) + numbers.get(high)) * 0.5 : numbers.get(high);
            BigDecimal decimal = new BigDecimal(value);
            return decimal.setScale(acc.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
        } catch (Exception ignored) {
        }
        return 0.0;
    }
}
tableEnvironment.createTemporarySystemFunction("median", new MedianUDAF()); // Или new MedianUDAF2()
Table table = tableEnvironment.sqlQuery("select median(l_linenumber, 2) from lineitem");

Их отличие заключается в том, что один использует ArrayList, а другой – ListView в State, и разница в производительности очень большая. Почему?

В комментариях к ListView говорится, что он будет использовать бэкенд состояния при больших объемах данных. До Flink-table-planner 1.14 в addAccumulatorDataViews в AggregationCodeGenerator можно было видеть этот процесс конверсии, но в версии 1.20 он больше не виден. Я пытался отладить этот процесс конверсии в классе AggsHandlerCodeGenerator, но не смог добиться успеха. Могу ли я спросить, где произошел этот процесс конверсии и как я должен наблюдать за этим явлением? Большое спасибо!!!

Ответ или решение

Конечно, давайте разберем вопрос о производительности ListView по сравнению с ArrayList в контексте вашего случая с расчетом медианы в Flink.

Разница в производительности ListView и ArrayList

На первый взгляд, разница между ListView и ArrayList может показаться незначительной, но при работе с большими объемами данных она становится критически важной. Давайте рассмотрим несколько направлений, которые могут объяснить преимущества ListView.

  1. Управление состоянием:

    • ListView предназначен для хранения состояния в рамках задачи Flink и позволяет эффективно обрабатывать большие объемы данных. Он оптимизирован для работы с распределенным состоянием, что позволяет ему интегрироваться с бекендом состояний Flink, такими как RocksDB или filesystem-based state backends. Это значит, что ListView может хранить данные на диске, а не в оперативной памяти, что значительно снижает потребление памяти для больших объемов данных.
  2. Сериализация:

    • ListView использует специальную стратегию сериализации, которая позволяет ему сохранять добавленные элементы в безопасном виде. В отличие от этого, ArrayList требует больше ресурсов для сериализации и десериализации данных, что может приводить к замедлению работы при выполнении операций accumulate и merge.
  3. Производительность и память:

    • В случае больших наборов данных ArrayList может столкнуться с проблемами динамического расширения. Как только размер массива, на который ссылается ArrayList, превышает его текущую емкость, он должен выделить новый массив и скопировать элементы из старого в новый. В отличие от этого, ListView спроектирован таким образом, чтобы минимизировать затраты на память и избегать дополнительных копирований.
  4. Поддержка оператора:
    • ListView также может предоставлять дополнительные возможности, такие как эффективная работа с различными типами агрегатов и упрощенное управление порядком обработки элементов. Возможность добавления элементов и последующего получения их в порядке добавления без необходимости ручного контроля порядка может значительно упростить реализацию ваших UDAF.

Наблюдение за процессами конверсии

Как вы упомянули, с Flink 1.14 и выше процесс генерации кода изменился, и видимость этапов преобразования была уменьшена. Чтобы лучше понять, что происходит под капотом, вы можете сделать следующее:

  1. Логирование и отладка:

    • Используйте логирование на этапе обработки, чтобы отследить, как данные проходят через различные этапы вашего UDAF. Вставьте сообщения о логах в методы accumulate, merge и getValue.
  2. Использование профилирования:

    • Профилируйте вашу задачу, используя инструменты, такие как JVM профайлеры, которые могут помочь вам зафиксировать время выполнения каждого метода вашего UDAF и выявить узкие места.
  3. Обратитесь к документации Flink:

    • Просмотрите последние изменения и улучшения в документации Flink, так как это может дать вам представление о том, как происходят изменения в обработке состояний и генерации кода.
  4. Исследование исходного кода:
    • Если вам удобно, вы можете рассмотреть исходный код Flink для классов, связанных с Aggregations и State Backends, чтобы получить более глубокое понимание процесса.

Заключение

Использование ListView вместо ArrayList в вашем UDAF для расчетов медианы оказывается значительно более эффективным подходом при работе с большими наборами данных в системе Flink. Это связано с его оптимизированным управлением состоянием, сериализацией и общей производительностью. Надеюсь, эти аспекты помогут вам лучше понять разницу и прояснить, где и как вы можете отслеживать происходящие процессы. Если у вас остались вопросы, не стесняйтесь задавать их!

Оцените материал
Добавить комментарий

Капча загружается...