Вопрос или проблема
Я преобразую программу, чтобы она использовала ИИ с многопоточными методами вместо последовательного выполнения. Я попытался смоделировать многопоточность, обрабатывая данные пакетами в моем тестовом коде сначала, чтобы убедиться, что логика назначения различных данных соответствующему потоку правильна. Я это сделал, и это работает хорошо.
Следующий код выполняет умножение матрицы на вектор и многоголовое внимание (только эти две части необходимы в моей задаче):
void mat_vec_mul(float* out, QuantizedTensor *vec, QuantizedTensor *mat, int col, int row) {
int rows_per_batch = row / num_thread;
int start_position[num_thread];
int rows_in_batches[num_thread];
for(int i = 0; i < num_thread; i++){
int rows = (i == num_thread-1) ? (row - i * rows_per_batch) : rows_per_batch;
rows_in_batches[i] = rows;
int start_row = i * rows_per_batch;
start_position[i] = start_row;
}
for(int i = 0; i < num_thread; i++){
int rows = rows_in_batches[i];
int start = start_position[i];
for(int j = 0; j < rows; j++){
int in = (start + j) * col;
float val = 0.0f;
int32_t ival = 0;
for (int k = 0; k <= col - GS; k += GS){
for(int l = 0; l < GS; l++){
ival += ((int32_t) vec->q[k + l]) * ((int32_t) mat->q[in + k + l]);
}
val += ((float) ival) * mat->s[(in + k)/GS] * vec->s[k/GS];
ival = 0;
}
out[start + j] = val;
}
}
}
// ----------------------------------------------------------------------------
// входная функция для многопоточного многоголового внимания
// @note: ВЫ НЕ МОЖЕТЕ ИЗМЕНЯТЬ СИГНАТУРУ ФУНКЦИИ!!!
void multi_head_attn(
float* out, // выходной тензор [head, head_size]
float* q, // тензор запроса [head, head_size]
float* key_cache, // кеш исторического тензора ключей [kv_head, seq_len, head_size]
float* value_cache, // кеш исторического тензора значений [kv_head, seq_len, head_size]
float* att, // буфер для оценки внимания [head, seq_len]
int seq_len,
int n_heads,
int head_size,
int kv_dim,
int kv_mul)
{
int heads_per_batch = (n_heads + num_thread - 1)/num_thread;
int start_head = 0;
int heads_in_batches[num_thread];
float* att_arr[num_thread];
for (int i = 0; i < num_thread; i++) {
int end_head = start_head + heads_per_batch;
if (end_head > n_heads) end_head = n_heads;
int heads = end_head - start_head;
if (heads <= 0) {
heads_in_batches[i] = 0;
att_arr[i] = NULL;
} else {
heads_in_batches[i] = heads;
att_arr[i] = att + start_head * seq_len;
}
start_head = end_head;
}
start_head = 0;
for(int i = 0; i < num_thread; i++){
int heads = heads_in_batches[i];
int end = start_head + heads;
for(int h = start_head; h < end; h++){
int head_index = h - start_head;
float* head_q = q + h * head_size;
float* head_att = att_arr[i] + head_index * seq_len;
int t_max = pos;
if (t_max >= seq_len) t_max = seq_len - 1;
for(int t = 0; t <= t_max; t++){
float* head_k = key_cache + t * kv_dim + (h / kv_mul) * head_size;
float score = 0.0f;
for(int j = 0; j < head_size; j++){
score += head_q[j] * head_k[j];
}
score /= sqrtf((float)head_size);
head_att[t] = score;
}
// printf("\nвход в softmax\n");
softmax(head_att, t_max+1);
float* head_out = out + h * head_size;
// printf("\nвход в memset\n");
memset(head_out, 0, head_size * sizeof(float));
for(int t = 0; t <= t_max; t++){
// printf("\npin2\n");
float* head_v = value_cache + t * kv_dim + (h / kv_mul) * head_size;
float a = head_att[t];
for(int j = 0; j < head_size; j++){
head_out[j] += a * head_v[j];
}
}
}
start_head = end;
}
}
Код для многопоточности выглядит следующим образом:
#define MAT_VEC_MUL 1 // это указывает, что поток должен выполнить умножение матрицы на вектор
#define MULTI_HEAD_ATTN 2 // это указывает, что поток должен выполнить многоголовое внимание
volatile int shutdown = 0;
typedef struct {
int id; // Уникальный ID для каждого потока
int task_type; // Тип задачи (например, MAT_VEC_MUL или MULTI_HEAD_ATTN)
float* out; // Указатель на выходной массив (для обеих задач)
QuantizedTensor* vec; // Указатель на вектор (для mat_vec_mul)
QuantizedTensor* mat; // Указатель на матрицу (для mat_vec_mul)
int rows; // Количество строк для обработки (для mat_vec_mul)
int cols; // Количество столбцов (для mat_vec_mul)
float* q; // Указатель на тензор запроса (для multi_head_attn)
float* key_cache; // Указатель на кеш ключей (для multi_head_attn)
float* value_cache; // Указатель на кеш значений (для multi_head_attn)
float* att; // Указатель на буфер оценки внимания (для multi_head_attn)
int seq_len; // Длина последовательности (для multi_head_attn)
int n_heads; // Количество голов (для multi_head_attn)
int head_size; // Размер головы (для multi_head_attn)
int kv_dim;
int kv_mul;
int start_row;
int start_head;
int pos;
} ThreadData;
pthread_t thread_pool[16]; //максимальное количество потоков 16
ThreadData thread_data[16];
int num_thread = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_var_start = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond_var_done = PTHREAD_COND_INITIALIZER;
int thread_working = 0;
int task_in_progress = 0;
void mat_vec_mul_task_func(int id, QuantizedTensor *vec, QuantizedTensor *mat, int col, int row, float* out, int start_row) {
for(int i = 0; i < row; i++){
int in = (start_row + i) * col;
float val = 0.0f;
int32_t ival = 0;
for (int j = 0; j <= col - GS; j += GS)
{
for(int k = 0; k < GS; k++){
ival += ((int32_t) vec->q[j + k]) * ((int32_t) mat->q[in + j + k]);
}
val += ((float) ival) * mat->s[(in + j) / GS] * vec->s[j / GS];
ival = 0;
}
out[start_row + i] = val;
}
}
void multi_head_attn_task_func(
int id,
float* out,
float* q,
float* key_cache,
float* value_cache,
float* att,
int seq_len,
int n_heads,
int head_size,
int kv_dim,
int kv_mul,
int start_head,
int local_pos
) {
int end_head = start_head + n_heads;
for (int h = start_head; h < end_head; h++) {
int head_index = h - start_head; // Корректированный индекс в диапазоне голов потока
// Корректированные указатели для данных, специфичных для потока
float* head_q = q + h * head_size;
float* head_att = att + head_index * seq_len; // Убедитесь, что нет перекрытия
float* head_out = out + h * head_size;
// Вычислить оценки внимания
for (int t = 0; t <= local_pos; t++) {
float* head_k = key_cache + t * kv_dim + (h / kv_mul) * head_size;
float score = 0.0f;
for (int i = 0; i < head_size; i++) {
score += head_q[i] * head_k[i];
}
score /= sqrtf((float)head_size);
head_att[t] = score;
}
// Softmax (предполагая, что он безопасен для потоков)
softmax(head_att, local_pos + 1);
// Вычислить взвешенную сумму
memset(head_out, 0, head_size * sizeof(float));
for (int t = 0; t <= local_pos; t++) {
float* head_v = value_cache + t * kv_dim + (h / kv_mul) * head_size;
float a = head_att[t];
for (int i = 0; i < head_size; i++) {
head_out[i] += a * head_v[i];
}
}
}
}
void *thr_func(void *arg) {
ThreadData *data = (ThreadData*) arg;
while (true) {
pthread_mutex_lock(&mutex);
while (!task_in_progress && !shutdown) {
pthread_cond_wait(&cond_var_start, &mutex);
}
if (shutdown) {
pthread_mutex_unlock(&mutex);
break; // Выйти из цикла и потока
}
ThreadData local_data = *data;
pthread_mutex_unlock(&mutex);
if (local_data.task_type == 0) {
continue; // Нет задания, назначенного этому потоку
}
if (local_data.task_type == MAT_VEC_MUL) {
mat_vec_mul_task_func(
local_data.id,
/*
Остальные параметры
*/
);
} else if (local_data.task_type == MULTI_HEAD_ATTN) {
multi_head_attn_task_func(
local_data.id,
/*
Остальные параметры
*/
);
}
pthread_mutex_lock(&mutex);
thread_working--;
if (thread_working == 0) {
task_in_progress = 0;
pthread_cond_signal(&cond_var_done);
}
pthread_mutex_unlock(&mutex);
}
pthread_exit(NULL);
}
void init_thr_pool(int num_thr) {
num_thread = num_thr;
for(int i = 0; i < num_thr; i++){
thread_data[i].id = i;
int init_thread = pthread_create(&thread_pool[i], NULL, thr_func, &thread_data[i]);
if (init_thread != 0){
printf("ошибка инициализации потока с номером потока: %d\n", i);
exit(EXIT_FAILURE);
}
}
}
// функция для закрытия пула потоков
// @note: ВЫ НЕ МОЖЕТЕ ИЗМЕНЯТЬ ЭТУ СИГНАТУРУ ФУНКЦИИ!!!
void close_thr_pool() {
pthread_mutex_lock(&mutex);
shutdown = 1;
pthread_cond_broadcast(&cond_var_start); // Разбудить потоки для выхода
pthread_mutex_unlock(&mutex);
for(int i = 0; i < num_thread; i++){
pthread_join(thread_pool[i], NULL);
}
}
void mat_vec_mul(float* out, QuantizedTensor *vec, QuantizedTensor *mat, int col, int row) {
int row_per_thread = row / num_thread;
int start_row = 0;
int active_threads = 0;
for (int i = 0; i < num_thread; i++) {
int rows_in_thread = (i == num_thread - 1) ? (row - start_row) : row_per_thread;
if (rows_in_thread <= 0) {
thread_data[i].task_type = 0; // Нет задачи
continue;
}
thread_data[i].id = i;
/*
Остальные назначения в структуру данных
*/
start_row += rows_in_thread;
active_threads++;
if (start_row >= row) {
break;
}
}
pthread_mutex_lock(&mutex);
thread_working = active_threads;
task_in_progress = 1;
pthread_cond_broadcast(&cond_var_start);
while (task_in_progress) {
pthread_cond_wait(&cond_var_done, &mutex);
}
pthread_mutex_unlock(&mutex);
}
void multi_head_attn(
float* out,
float* q,
float* key_cache,
float* value_cache,
float* att,
int seq_len,
int n_heads,
int head_size,
int kv_dim,
int kv_mul)
{
int local_pos = pos;
int heads_per_thread = (n_heads + num_thread - 1) / num_thread;
int start_head = 0;
int active_threads = 0; // Количество потоков, которые будут на самом деле работать
for (int i = 0; i < num_thread; i++) {
int end_head = start_head + heads_per_thread;
if (end_head > n_heads) end_head = n_heads;
int heads_in_thread = end_head - start_head;
if (heads_in_thread <= 0) {
thread_data[i].task_type = 0; // Указать, что нет задачи
continue;
}
thread_data[i].task_type = MULTI_HEAD_ATTN;
/*
Остальные назначения в структуру данных
*/
start_head = end_head;
active_threads++;
if (start_head >= n_heads) {
break;
}
}
pthread_mutex_lock(&mutex);
thread_working = active_threads;
task_in_progress = 1;
pthread_cond_broadcast(&cond_var_start);
while (task_in_progress) {
pthread_cond_wait(&cond_var_done, &mutex);
}
pthread_mutex_unlock(&mutex);
}
Проблема в том, что я думаю, что часть обработки пакетами двух различных реализаций работает нормально, но код может выполняться правильно только тогда, когда количество потоков равно 1. Я думаю, что проблема возникает из-за состояния гонки или проблемы синхронизации. Я старался отладить это, но не могу найти решение.
Я пытался локализовать данные каждого потока. Я думал, что проблема возникает, когда потоки используют данные потока, и главный поток изменяет или перезаписывает данные потока. Однако это не помогло. Я могу выполнять код правильно только тогда, когда входное количество потоков равно 1. Я также пытался добавить мьютекс для блокировки различных мест, но это, похоже, не решает проблему.
Ответ или решение
Проблемы реализации многопоточности в LLM: Анализ и Решения
При переходе от последовательной работы алгоритмов AI к параллельному выполнению задач через многопоточность, разработчики сталкиваются с рядом проблем, которые могут существенно повлиять на производительность программы. В этой статье мы подробно рассмотрим и проанализируем возможные причины неполадок, с которыми вы столкнулись при реализации многопоточности в задачах матрично-векторного умножения и многошумного внимания (multi-head attention).
Проблемы синхронизации и состояния гонки
Одной из основных причин, по которой ваша программа корректно работает только с одним потоком, является состояние гонки. Состояние гонки происходит, когда несколько потоков одновременно пытаются получить доступ к общим данным, и в результате получаются непредсказуемые результаты. В вашем коде множество переменных и ресурсов, которые могут потенциально изменяться несколькими потоками одновременно:
-
Переменные потоков: Параметры, такие как
start_row
,start_head
, и другие значения, используемые для настройки работы каждого потока, могут быть изменены материальными потоками одновременно, что приводит к коллизиям в вычислениях. -
Общий массив
ThreadData
: Все потоки обращаются к массивуthread_data
, что делает данные, которые каждый поток использует, потенциально подверженными изменениям другими потоками. Убедитесь, что доступ к ним настроен так, чтобы разные потоки не мешали друг другу.
Решения для предотвращения состояния гонки
Для устранения проблем с состоянием гонки убедитесь, что вы точно настраиваете доступ к общим ресурсам, используя правильные механизмы синхронизации:
-
Локализация данных потока: Каждый поток должен иметь отдельную копию данных, которые он будет использовать. Это минимизирует необходимость в синхронизации и уменьшает шансы на возникновение состояния гонки.
-
Блокировки и условия: Используйте mutex, чтобы заблокировать критические секции кода, где происходит обновление общих переменных. Также убедитесь, что вы правильно обрабатываете условия, в которых потоки должны ждать друг друга или сигнализировать о завершении.
-
Использование
pthread_barrier_t
: Рассмотрите возможность использования барьеров для синхронизации потоков в определенные моменты времени, когда нужно убедиться, что все потоки достигли заданной точки выполнения, прежде чем продолжить.
Отладка и тестирование многопоточности
Чтобы удостовериться в том, что ваши изменения действительно решают проблему, рекомендуется:
-
Изолированное тестирование потоков: Запустите отдельные компоненты (например, матрично-векторное умножение и многошумное внимание) в одном потоке и убедитесь, что они работают корректно, а затем постепенно добавляйте потоки.
-
Логи и отладка: Добавьте логи для отслеживания значений ключевых переменных перед, во время и после выполнения критических секций. Это поможет выяснить, где конкретно нарушается логика выполнения программы.
-
Используйте инструменты профилирования: Такие инструменты, как Valgrind и другие, могут помочь выявить потенциальные проблемы с принудительным выполнением потоков и состояниями гонки.
Заключение
Переход к многопоточному программированию может быть сложным, но с правильным подходом к синхронизации и настроением потоков вы сможете достичь желаемой производительности и надежности. Применяйте приведенные выше рекомендации, и успешная реализация многопоточности для ваших алгоритмов AI станет более достижимой целью. Будьте внимательны к проблемам синхронизации и состояниям гонки, и используйте средства отладки для выявления и устранения потенциальных проблем.