Вопрос или проблема
Предыстория
Я написал класс на Python
, который предназначен для вычисления времени, необходимого для распространения светового луча между двумя точками (init_point
и term_point
) в сложной среде (модельируемой с использованием параметров medium_model = [A, B, C]
).
Для вычисления этого времени прохождения света нам нужно два условия:
- Указанные начальная и конечная точки.
- Угол (
launch_angle
), который световой луч образует с осью x в начальной точке.
У меня есть выражение для z-координаты луча в терминах x и y координат (которые мы рассматриваем как одну “x”-координату (x = sqrt(x**2 + y**2)
), угол launch_angle
и начальные и конечные точки, так что мы можем определить последние, просто оптимизируя разницу между известной конечной z-координатой и тем, что мы получаем для различных значений launch_angle
.
Код
from dataclasses import dataclass
import warnings
import numpy as np
# Подавить предупреждения (иногда происходит переполнение в _calculate_z_coord и т.д.)
warnings.filterwarnings("ignore", category=RuntimeWarning)
@dataclass
class RayTracer:
medium_model: np.ndarray
SPEED_OF_LIGHT = 299792458 # Скорость света в м/с
def _calculate_z_coord(self, x: np.ndarray, launch_angle: np.ndarray, x0: np.ndarray, z0: np.ndarray) -> np.ndarray:
"""Вычислить z-координату на основе угла запуска и других параметров."""
A, B, C = self.medium_model
exp_Cz0 = np.exp(C * z0)
cos_launch_angle = np.cos(launch_angle)
beta = (A - B * exp_Cz0) * cos_launch_angle
sqrt_A2_beta2 = np.sqrt(A**2 - beta**2)
K = C * sqrt_A2_beta2 / beta
term1 = A**2 - beta**2
term2 = A * B * exp_Cz0
sqrt_term = np.sqrt(term1 + 2 * term2 + B**2 * exp_Cz0**2)
# Предварительные вычисления для повышения эффективности
log_arg = term1 + term2 + sqrt_A2_beta2 * sqrt_term
t = (sqrt_A2_beta2 * C * x0 - beta * C * z0 + beta * np.log(log_arg)) / (sqrt_A2_beta2 * C)
exp_Kx = np.exp(K * x)
exp_Kt = np.exp(K * t)
log_term_num = 2 * term1 * np.exp(K * (t + x))
log_term_den = beta**2 * B**2 * exp_Kx**2 - 2 * A * B * exp_Kt + exp_Kt**2
log_term = log_term_num / log_term_den
return (1 / C) * np.log(log_term)
def _find_launch_angle(self, init_points: np.ndarray, term_points: np.ndarray, num_steps: int = 1000) -> np.ndarray:
"""Найти оптимальный угол запуска."""
x0 = np.hypot(init_points[:, 0], init_points[:, 1])
x1 = np.hypot(term_points[:, 0], term_points[:, 1])
# Грубый поиск с последующим точным поиском
launch_angles = np.linspace(-np.pi, np.pi, num_steps)
# Предварительные вычисления z_coords для всех углов запуска
z_coords = self._calculate_z_coord(x1[:, np.newaxis], launch_angles, x0[:, np.newaxis], init_points[:, 2, np.newaxis])
term_z = term_points[:, 2][:, np.newaxis]
objective_values = (z_coords - term_z)**2
# Найти наилучшие углы
best_indices = np.nanargmin(objective_values, axis=1)
best_angles = launch_angles[best_indices]
return best_angles
def transit_time(self, init_points: np.ndarray, term_points: np.ndarray) -> np.ndarray:
"""Вычислить время прохождения."""
A, B, C = self.medium_model
# Векторизованный поиск угла запуска
launch_angles = self._find_launch_angle(init_points, term_points)
exp_Cz_init = np.exp(C * init_points[:, 2])
beta = np.abs((A - B * exp_Cz_init) * np.cos(launch_angles))
sqrt_A2_beta2 = np.sqrt(A**2 - beta**2)
K = C * sqrt_A2_beta2 / beta
def time_expression(z: np.ndarray, beta: np.ndarray, K: np.ndarray) -> np.ndarray:
exp_Cz = np.exp(C * z)
t = np.sqrt((A + B * exp_Cz - beta) / (A + B * exp_Cz + beta))
alpha = np.sqrt((A - beta) / (A + beta))
log_expr = np.log(np.abs((t - alpha) / (t + alpha)))
return A * np.sqrt((C**2 + K**2) / (C**2 * K**2)) * log_expr
time_diff = time_expression(term_points[:, 2], beta, K) - time_expression(init_points[:, 2], beta, K)
return time_diff / self.SPEED_OF_LIGHT
# Пример использования
if __name__ == "__main__":
ray_tracer = RayTracer(np.array([1.78, 0.454, 0.0132]))
# Генерация случайных начальных и конечных точек
num_points = 100000
init_points = np.random.uniform(low=-50, high=50, size=(num_points, 3))
term_points = np.random.uniform(low=-50, high=50, size=(num_points, 3))
# Я включаю простую, но грубую меру времени выполнения с использованием `time` здесь, для вашего сведения.
import time
start_time = time.time()
transit_times = ray_tracer.transit_time(init_points, term_points)
end_time = time.time()
# Печать результатов
print("Время прохождения:", transit_times)
print(f"Затраченное время: {end_time - start_time:.6f} секунд")
Код векторизован. Инициализация RayTracer
осуществляется следующим образом:
my_ray_tracer = RayTracer(np.array([A, B, C]))
Функция transit_time()
векторизована и принимает массив начальных и конечных точек, например:
init_points = np.random.uniform(low=-50, high=50, size=(num_points, 3))
term_points = np.random.uniform(low=-50, high=50, size=(num_points, 3))
transit_times = ray_tracer.transit_time(init_points, term_points)
Моя “Проблема”
В некоторых приложениях я хотел бы иметь возможность быстро вычислить время распространения для тысяч, десятков тысяч или даже сотен тысяч лучей. Хотя этот код довольно быстрый, он все еще недостаточно быстр для некоторых из моих приложений. Я использовал все приемы, которые мне известны (например, векторизацию), и значительно сократил среднее время выполнения, но все равно не настолько значительно, чтобы это было полезно для того, что мне нужно.
Мои требования таковы:
- Меня не волнует сильно эффективность памяти, только время выполнения
transit_time()
. - Я вношу это в кодовую базу, зависимости которой только
NumPy
иSciPy
. Я не могу использовать дополнительные внешние библиотеки.
Как я могу еще улучшить время выполнения этого кода?
Я знаю, что вы не хотите устанавливать зависимости, но, возможно, вы можете установить те, которые нужны для вашего собственного развития. В таком случае line_profiler
может помочь вам. Посмотрите https://pypi.org/project/line-profiler/.
Когда я профилировал transit_time
таким образом, 99% времени тратится на _find_launch_angle
.
Та же ситуация -> 94% времени тратится на _calculate_z_coord
:
- 19.8%
t = (sqrt_A2_beta2 * C * x0 - beta * C * z0 + beta * np.log(log_arg)) / (sqrt_A2_beta2 * C)
- 11%
log_term_num = 2 * term1 * np.exp(K * (t + x))
- 12%
log_term_den = beta**2 * B**2 * exp_Kx**2 - 2 * A * B * exp_Kt + exp_Kt**2
Я всегда считаю, что глубокое мышление более эффективно, чем любые технологические изменения. Но, оптимизировать вычисления в numpy очень трудно.
Я знаю, что вы не должны добавлять дополнительные зависимости, но я уверена, что вы можете согласиться на одну установку pip и одну дополнительную строку кода… Давайте попробуем…
Я вижу, что _calculate_z_coord
не использует self
, кроме передачи medium_model
. Поэтому я немного адаптировала, чтобы сделать это верхним уровнем функции вместо метода:
def _calculate_z_coord(medium_model: np.array, x: np.ndarray, launch_angle: np.ndarray, x0: np.ndarray, z0: np.ndarray) -> np.ndarray:
"""Вычислить z-координату на основе угла запуска и других параметров."""
A, B, C = medium_model
...
class RayTracer:
def _func_launch_angle(...):
...
z_coords = _calculate_z_coord(self.medium_model, x1[:, np.newaxis], launch_angles, x0[:, np.newaxis], init_points[:, 2, np.newaxis])
...
А теперь сама оптимизация: использование numba
для компиляции Just-In-Time и других автоматических оптимизаций:
@numba.jit(nopython=True, parallel=False, fastmath=True)
def _calculate_z_coord(medium_model: np.array, x: np.ndarray, launch_angle: np.ndarray, x0: np.ndarray, z0: np.ndarray) -> np.ndarray:
"""Вычислить z-координату на основе угла запуска и других параметров."""
Воздействие? Переместить время выполнения с 16 секунд до 10 секунд… только преобразовав метод в функцию и украсив ее.
Почему это предварительное преобразование? Потому что numba
хорошо работает с массивами numpy, а @numba.jitclass
все еще экспериментально…
Возвращаясь к этому ограничению по зависимостям, естественно, что вы используете numpy
для таких интенсивных вычислений. Я думаю, что numba
может стать естественным тоже.
К сожалению, как вы можете догадаться из предложения, я пробовал с параллелизацией numba
, но это не сработало. Вы можете попробовать установить parallel=True
и отладить это, если хотите получить дополнительный прирост производительности, но я не уверена, имеет ли это смысл.
Пожалуйста, ознакомьтесь с https://numba.pydata.org.
Другой вариант, который вы можете попробовать, – перейти к библиотекам, поддерживающим массовую параллельную обработку на GPU, таким как tensorflow или pytorch. Их в основном используют для ИИ, но поскольку … ИИ выполняет множество тензорных вычислений … как RayTracing. Поэтому я думаю, что вы найдете чудеса там … после полного редизайна вашего алгоритма. Но в этом случае история с зависимостями станет более сложной…
numba
вероятно сможет оптимизировать некоторые ваши выражения, ради этого он и был создан. Его настоятельно рекомендуют, но я лично никогда не смог получить от него особой пользы на практике, так как он отстает и требует устаревшую версию numpy
каждый раз, когда я пробовала.
Многие ваши выражения включают в себя несколько векторных элементов, которые создают временные векторы для каждого бинарного оператора. Например, np.sqrt(A**2 - beta**2)
создает 3 временных вектора, последний из которых передается в np.sqrt
, что создает 4-й результирующий вектор.
Вы можете попытаться выполнять модификации векторов на месте, чтобы избежать создания временных векторов для каждого бинарного оператора, и явно указывать выходные векторы для операторов numpy
, чтобы избежать выделения новых векторов. Это значительно ускоряет код, но делает его менее удобным для поддержания. Однако бинарные векторные операторы все еще довольно неэффективны с точки зрения аллокаций памяти, количества загрузок и сохранений и циклов.
Наивысшая производительность ваших векторных вычислений может быть достигнута путем кодирования этих вычислений как элемент-wise с ручным кодом циклов на Cython или C++.
Например, np.sqrt(A**2 - beta**2)
выполняет одно выделение вектора и цикл для вычисления A**2
, вызывая функцию std::pow
для каждого элемента, то же самое для beta**2
, другое выделение и цикл для разности этих временных векторов и еще одно выделение и цикл для np.sqrt
.
Используя Cython или C++, np.sqrt(A**2 - beta**2)
может быть рассчитан с одним выделением памяти для результата и одним циклом, загружая элементы как A
, так и beta
в каждой итерации, возводя их в квадрат простым умножением вместо вызова std::pow
, вычисляя разность и квадратный корень и сохраняя результат. В Cython ему придется вызвать std::sqrt
, но в C++ он может вызвать инструкцию процессора для извлечения квадратного корня, так что все это выражение вычисляется в одном цикле без вызовов библиотечных функций. Это обеспечит на порядок более быстрые вычисления для таких выражений. Более того, в C++ этот один цикл может использовать SIMD-инструкции, чтобы по крайней мере вчетверить свою пропускную способность. numpy
также делает это, но он не может вычислить все это выражение в одном цикле с SIMD и без вызовов библиотек или дополнительных аллокаций памяти.
Мой личный подход к вычислению таких векторных выражений с несколькими терминами – создание обертки функции Cython, которая принимает входные массивы в виде непрерывных представлений памяти, выделяет массив для результата и передает простые указатели на массивы и длину в функцию C++, которая выполняет вычисление выражения. Функция C++ может использовать встроенные SIMD-векторные инструкции, ограниченные указатели, подсказки ветвления и т.д., чтобы произвести вычисление в одном цикле самым быстрым образом. Однако простой элемент-wise цикл C++ для выполнения вычисления, минимизируя загрузки и сохранения, часто бывает достаточно, так как загрузки и сохранения являются основной узкой местом.
Я нашел несколько идей, которые обеспечивают примерно 20% ускорение, не включая никаких новых библиотек.
Во-первых, эта строка:
log_term_num = 2 * term1 * np.exp(K * (t + x))
может быть переписана как
log_term_num = 2 * term1 * exp_Kx * exp_Kt
что экономит одну операцию exponent.
Вторая идея касается этой строки:
t = (sqrt_A2_beta2 * C * x0 - beta * C * z0 + beta * np.log(log_arg)) / (sqrt_A2_beta2 * C)
Это можно упростить через следующую алгебру:
t = (sqrt_A2_beta2 * C * x0 - beta * C * z0 + beta * np.log(log_arg)) / (sqrt_A2_beta2 * C)
# делим верх и низ на beta
t = (sqrt_A2_beta2 * C / beta * x0 - C * z0 + np.log(log_arg)) / (sqrt_A2_beta2 * C / beta)
# заменяем sqrt_A2_beta2 * C / beta на K
t = (K * x0 - C * z0 + np.log(log_arg)) / K
# выносим K * x0 за скобки
t = x0 + (- C * z0 + np.log(log_arg)) / K
Таким образом, эта строка эквивалентна
t = x0 + (- C * z0 + np.log(log_arg)) / K
У меня были проблемы с проверкой, эквивалентно ли это оригинальному выражению – я думаю, эта строка может быть численно нестабильной. Ваша Милость.
Вы можете добавить многопоточность, numpy сбрасывает GIL, так что вы получаете некоторое ускорение из этого, приложения на numpy обычно достигают предела пропускной способности памяти раньше, чем предела вычислений ядер, поэтому не ожидайте слишком большого ускорения. (по крайней мере не ожидайте полной параллельности)
start_time = time.time()
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
with ThreadPool(cpu_count()) as pool:
transit_times = np.hstack(pool.starmap(ray_tracer.transit_time,
zip(np.array_split(init_points, cpu_count()),
np.array_split(term_points,cpu_count()))))
end_time = time.time()
На этой четырехъядерной машине время выполнения сократилось с 15.893981 секунд до 9.272425 секунд, но фактическая выгода зависит как от количества ядер, так и от пропускной способности памяти, (в этом случае я знаю, что соотношение точно равно тому, на сколько пропускная способность моей оперативной памяти больше, чем пропускная способность одного ядра, так что я знаю, что мы достигли предела пропускной способности памяти, а не предела вычислений).
Примечание: всегда используйте фиксированное случайное начальное значение, если вы тестируете код, этот код будет давать существенно разные показатели времени каждую разу из-за случайного значения.
Я не ожидаю, что этот код будет масштабироваться хорошо, потому что он и так ограничен пропускной способностью памяти (что трудно решить, используя только numpy), но если у вас есть мощная машина с несколькими планками оперативной памяти, и на самом деле предельные вычисления происходят раньше, чем предел пропускной способности памяти, то рассмотрите возможность использования многопроцессорности вместо этого, (просто измените ThreadPool
на Pool
в приведенном выше коде), что имеет дополнительную накладную, но должно устранить эффект GIL, увеличивая предельные вычисления до тех пор, пока вы снова не достигнете предела пропускной способности.
Что касается того, как вы можете преодолеть пределы пропускной способности памяти (кроме как купить больше планок оперативной памяти) … есть numba, которую вы можете использовать, чтобы обойти то, насколько медленен интерпретатор python, поэтому вы можете убрать векторизацию (потому что векторизация – это причина, по которой вы достигаете предела пропускной способности памяти), но если вам действительно нужно выжать максимальную производительность из этого, то вам стоит рассмотреть возможность написания расширения на C/C++ для выполнения вычислений.
Игра с размером каждого блока также помогает, но ее эффект зависит от размера кеша процессора, например, на этой машине использование блоков по 100 элементов дало наилучшее ускорение до 4.067283 секунды (снижение с 15 секунд), но повторное выполнение того же кода на компьютере с другим размером кеша и другим размером блока даст лучший результат.
with ThreadPool(cpu_count()) as pool:
transit_times = np.hstack(pool.starmap(ray_tracer.transit_time,
zip(np.array_split(init_points, len(init_points)/100),
np.array_split(term_points, len(term_points)/100))))
Ответ или решение
Оптимизация времени выполнения аналитического алгоритма трассировки лучей
Введение
Разработка алгоритма трассировки лучей на Python для расчета времени, необходимого для распространения света между двумя точками в сложной среде, может столкнуться с проблемами производительности при большом количестве точек. В данном ответе мы рассмотрим подходы по оптимизации исполнения вашего кода, основываясь на профилировании, изменениях в структуре кода и подходах к многопоточности и использованию дополнительных технологий.
Профилирование и определение узких мест
Как вы уже заметили, основными местами, занимающими время исполнения, являются методы _find_launch_angle
и _calculate_z_coord
. Использование инструментов профилирования, таких как line_profiler
, может помочь в дальнейшем выявлении узких мест, но вы уже выделили основные проблемные участки, что – хорошее начало.
Оптимизация кода
-
Перевод метода в функцию:
Можно сделать изменение метода_calculate_z_coord
на обычную функцию, поскольку он не используетself
. Это позволяет более эффективно использовать JIT-компиляцию с библиотекой, такой как Numba.from numba import jit @jit(nopython=True, fastmath=True, parallel=True) def _calculate_z_coord(medium_model, x, launch_angle, x0, z0): # Реализация функции будет здесь
-
Избежание передачи больших массивов:
Использование более компактных структур данных, таких как массивы NumPy, позволяет существенно сократить время выполнения. Чтобы повысить производительность, следует избегать лишних копирований и преобразований. -
Упрощение математических выражений:
Например, замена:log_term_num = 2 * term1 * np.exp(K * (t + x))
на:
log_term_num = 2 * term1 * exp_Kx * exp_Kt
Это уменьшит общее количество операций и, следовательно, повлияет на время исполнения.
-
Параллельное выполнение:
Использование встроенного многопоточности в NumPy можно реализовать при помощиmultiprocessing
. Так, можно использовать пул потоков для разбивки вычислений на несколько ядер:from multiprocessing import Pool with Pool() as pool: transit_times = pool.starmap(ray_tracer.transit_time, zip(init_points, term_points))
Расширенные подходы
-
Использование Cython или C++:
Если ваш код по-прежнему не будет удовлетворять требованиям производительности, можно рассмотреть возможность переписывания наиболее критичных участков кода на Cython или C++. Это предоставит возможность выполнять операции с массивами на низком уровне, что улучшит производительность. -
Подсчет кэширования:
Операции с кэшем также важны в контексте производительности. Следует оптимизировать размер блоков при многопоточном вычислении, чтобы лучше использовать кэш процессора. -
Параллелизация на GPU:
Если ваши задачи могут быть выполнены на GPU, библиотеки как PyTorch или TensorFlow могут значительно ускорить вычисления, но потребуют переработки кода и трудозатрат на обучение.
Заключение
Оптимизация времени выполнения алгоритма трассировки лучей может достигаться многими упомянутыми методами: от простой реорганизации структуры кода до использования JIT-компиляции и эффективных параллельных вычислений. применяя указанные методы, вы сможете улучшить производительность вашего кода и сократить время вычислений для больших массивов данных.
Не забудьте протестировать каждое изменение с использованием профилирования, чтобы убедиться в его эффективности.