Вопрос или проблема
Я столкнулся с проблемой при настройке UDAF с использованием Flink (версия 1.20). Я хотел реализовать UDAF, который рассчитывает медиану, и использовал следующие два метода:
public class MedianUDAF2 extends AggregateFunction<Double, MedianUDAF2.State> {
public static class State {
public int scale = 2;
public ListView<Double> numbers;
public State() {}
}
@Override
public State createAccumulator() {
State state = new State();
state.numbers = new ListView<>();
return state;
}
public void accumulate(State acc, Double val, Integer scale) throws Exception {
acc.numbers.add(val);
if (scale != null && scale > 0) acc.scale = scale;
}
public void merge(State acc, Iterable<State> it) throws Exception {
for (State a : it) {
acc.numbers.addAll(a.numbers.getList());
}
}
@Override
public Double getValue(State acc) {
try {
List<Double> numbers = acc.numbers.getList();
numbers.sort(Double::compareTo);
double n = numbers.size() - 1;
double index = n * 0.5;
int low = (int) Math.floor(index);
int high = (int) Math.ceil(index);
double value = low == high ? (numbers.get(low) + numbers.get(high)) * 0.5 : numbers.get(high);
BigDecimal decimal = new BigDecimal(value);
return decimal.setScale(acc.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
} catch (Exception ignored) {
}
return 0.0;
}
}
public class MedianUDAF extends AggregateFunction<Double, MedianUDAF.State> {
public static class State {
public int scale = 2;
@DataTypeHint(value = "ARRAY<DOUBLE>")
public ArrayList<Double> numbers;
public State() {}
}
@Override
public State createAccumulator() {
State state = new State();
state.numbers = new ArrayList<>();
return state;
}
public void accumulate(State acc, Double val, Integer scale) throws Exception {
acc.numbers.add(val);
if (scale != null && scale > 0) acc.scale = scale;
}
public void merge(State acc, Iterable<State> it) throws Exception {
for (State a : it) {
acc.numbers.addAll(a.numbers);
}
}
@Override
public Double getValue(State acc) {
try {
List<Double> numbers = acc.numbers;
numbers.sort(Double::compareTo);
double n = numbers.size() - 1;
double index = n * 0.5;
int low = (int) Math.floor(index);
int high = (int) Math.ceil(index);
double value = low == high ? (numbers.get(low) + numbers.get(high)) * 0.5 : numbers.get(high);
BigDecimal decimal = new BigDecimal(value);
return decimal.setScale(acc.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
} catch (Exception ignored) {
}
return 0.0;
}
}
tableEnvironment.createTemporarySystemFunction("median", new MedianUDAF()); // Или new MedianUDAF2()
Table table = tableEnvironment.sqlQuery("select median(l_linenumber, 2) from lineitem");
Их отличие заключается в том, что один использует ArrayList, а другой – ListView в State, и разница в производительности очень большая. Почему?
В комментариях к ListView говорится, что он будет использовать бэкенд состояния при больших объемах данных. До Flink-table-planner 1.14 в addAccumulatorDataViews в AggregationCodeGenerator можно было видеть этот процесс конверсии, но в версии 1.20 он больше не виден. Я пытался отладить этот процесс конверсии в классе AggsHandlerCodeGenerator, но не смог добиться успеха. Могу ли я спросить, где произошел этот процесс конверсии и как я должен наблюдать за этим явлением? Большое спасибо!!!
Ответ или решение
Конечно, давайте разберем вопрос о производительности ListView
по сравнению с ArrayList
в контексте вашего случая с расчетом медианы в Flink.
Разница в производительности ListView
и ArrayList
На первый взгляд, разница между ListView
и ArrayList
может показаться незначительной, но при работе с большими объемами данных она становится критически важной. Давайте рассмотрим несколько направлений, которые могут объяснить преимущества ListView
.
-
Управление состоянием:
ListView
предназначен для хранения состояния в рамках задачи Flink и позволяет эффективно обрабатывать большие объемы данных. Он оптимизирован для работы с распределенным состоянием, что позволяет ему интегрироваться с бекендом состояний Flink, такими как RocksDB или filesystem-based state backends. Это значит, чтоListView
может хранить данные на диске, а не в оперативной памяти, что значительно снижает потребление памяти для больших объемов данных.
-
Сериализация:
ListView
использует специальную стратегию сериализации, которая позволяет ему сохранять добавленные элементы в безопасном виде. В отличие от этого,ArrayList
требует больше ресурсов для сериализации и десериализации данных, что может приводить к замедлению работы при выполнении операций accumulate и merge.
-
Производительность и память:
- В случае больших наборов данных
ArrayList
может столкнуться с проблемами динамического расширения. Как только размер массива, на который ссылаетсяArrayList
, превышает его текущую емкость, он должен выделить новый массив и скопировать элементы из старого в новый. В отличие от этого,ListView
спроектирован таким образом, чтобы минимизировать затраты на память и избегать дополнительных копирований.
- В случае больших наборов данных
- Поддержка оператора:
ListView
также может предоставлять дополнительные возможности, такие как эффективная работа с различными типами агрегатов и упрощенное управление порядком обработки элементов. Возможность добавления элементов и последующего получения их в порядке добавления без необходимости ручного контроля порядка может значительно упростить реализацию ваших UDAF.
Наблюдение за процессами конверсии
Как вы упомянули, с Flink 1.14 и выше процесс генерации кода изменился, и видимость этапов преобразования была уменьшена. Чтобы лучше понять, что происходит под капотом, вы можете сделать следующее:
-
Логирование и отладка:
- Используйте логирование на этапе обработки, чтобы отследить, как данные проходят через различные этапы вашего UDAF. Вставьте сообщения о логах в методы accumulate, merge и getValue.
-
Использование профилирования:
- Профилируйте вашу задачу, используя инструменты, такие как JVM профайлеры, которые могут помочь вам зафиксировать время выполнения каждого метода вашего UDAF и выявить узкие места.
-
Обратитесь к документации Flink:
- Просмотрите последние изменения и улучшения в документации Flink, так как это может дать вам представление о том, как происходят изменения в обработке состояний и генерации кода.
- Исследование исходного кода:
- Если вам удобно, вы можете рассмотреть исходный код Flink для классов, связанных с
Aggregations
иState Backends
, чтобы получить более глубокое понимание процесса.
- Если вам удобно, вы можете рассмотреть исходный код Flink для классов, связанных с
Заключение
Использование ListView
вместо ArrayList
в вашем UDAF для расчетов медианы оказывается значительно более эффективным подходом при работе с большими наборами данных в системе Flink. Это связано с его оптимизированным управлением состоянием, сериализацией и общей производительностью. Надеюсь, эти аспекты помогут вам лучше понять разницу и прояснить, где и как вы можете отслеживать происходящие процессы. Если у вас остались вопросы, не стесняйтесь задавать их!