Проблема реализации многопоточности для LLM

Вопрос или проблема

Я преобразую программу, чтобы она использовала ИИ с многопоточными методами вместо последовательного выполнения. Я попытался смоделировать многопоточность, обрабатывая данные пакетами в моем тестовом коде сначала, чтобы убедиться, что логика назначения различных данных соответствующему потоку правильна. Я это сделал, и это работает хорошо.

Следующий код выполняет умножение матрицы на вектор и многоголовое внимание (только эти две части необходимы в моей задаче):

void mat_vec_mul(float* out, QuantizedTensor *vec, QuantizedTensor *mat, int col, int row) {
    int rows_per_batch = row / num_thread;
    int start_position[num_thread];
    int rows_in_batches[num_thread];
    for(int i = 0; i < num_thread; i++){
        int rows = (i == num_thread-1) ? (row - i * rows_per_batch) : rows_per_batch;
        rows_in_batches[i] = rows;
        int start_row = i * rows_per_batch;
        start_position[i] = start_row;
    }

    for(int i = 0; i < num_thread; i++){
        int rows = rows_in_batches[i];
        int start = start_position[i];
        for(int j = 0; j < rows; j++){
            int in = (start + j) * col;
            float val = 0.0f;
            int32_t ival = 0;
            for (int k = 0; k <= col - GS; k += GS){
                for(int l = 0; l < GS; l++){
                    ival += ((int32_t) vec->q[k + l]) * ((int32_t) mat->q[in + k + l]);
                }
                val += ((float) ival) * mat->s[(in + k)/GS] * vec->s[k/GS];
                ival = 0;
            }
            out[start + j] = val;
        }
    }
}

// ----------------------------------------------------------------------------
// входная функция для многопоточного многоголового внимания
// @note: ВЫ НЕ МОЖЕТЕ ИЗМЕНЯТЬ СИГНАТУРУ ФУНКЦИИ!!!
void multi_head_attn(
    float* out,         // выходной тензор [head, head_size]
    float* q,           // тензор запроса [head, head_size]
    float* key_cache,   // кеш исторического тензора ключей [kv_head, seq_len, head_size]
    float* value_cache, // кеш исторического тензора значений [kv_head, seq_len, head_size]
    float* att,         // буфер для оценки внимания [head, seq_len]
    int seq_len,
    int n_heads,
    int head_size,
    int kv_dim,
    int kv_mul) 
{

    int heads_per_batch = (n_heads + num_thread - 1)/num_thread;
    int start_head = 0;

    int heads_in_batches[num_thread];
    float* att_arr[num_thread];

    for (int i = 0; i < num_thread; i++) {
        int end_head = start_head + heads_per_batch;
        if (end_head > n_heads) end_head = n_heads;
        int heads = end_head - start_head;
        if (heads <= 0) {
            heads_in_batches[i] = 0;
            att_arr[i] = NULL;
        } else {
            heads_in_batches[i] = heads;
            att_arr[i] = att + start_head * seq_len;
        }
        start_head = end_head;
    }

    start_head = 0;
    for(int i = 0; i < num_thread; i++){
        int heads = heads_in_batches[i];
        int end = start_head + heads;
        for(int h = start_head; h < end; h++){
            int head_index = h - start_head;
            float* head_q = q + h * head_size;
            float* head_att = att_arr[i] + head_index * seq_len;
            int t_max = pos;
            if (t_max >= seq_len) t_max = seq_len - 1;

            for(int t = 0; t <= t_max; t++){
                float* head_k = key_cache + t * kv_dim + (h / kv_mul) * head_size;
                float score = 0.0f;
                for(int j = 0; j < head_size; j++){
                    score += head_q[j] * head_k[j];
                }
                score /= sqrtf((float)head_size);
                head_att[t] = score;
            }
            // printf("\nвход в softmax\n");

            softmax(head_att, t_max+1);
            float* head_out = out + h * head_size;
            // printf("\nвход в memset\n");
            memset(head_out, 0, head_size * sizeof(float));

            for(int t = 0; t <= t_max; t++){
                // printf("\npin2\n");
                float* head_v = value_cache + t * kv_dim + (h / kv_mul) * head_size;
                float a = head_att[t];

                for(int j = 0; j < head_size; j++){
                    head_out[j] += a * head_v[j];
                }
            }
        }

        start_head = end;
    }
}    

Код для многопоточности выглядит следующим образом:

#define MAT_VEC_MUL 1       // это указывает, что поток должен выполнить умножение матрицы на вектор
#define MULTI_HEAD_ATTN 2   // это указывает, что поток должен выполнить многоголовое внимание
volatile int shutdown = 0;
typedef struct {
    int id;                // Уникальный ID для каждого потока
    int task_type;         // Тип задачи (например, MAT_VEC_MUL или MULTI_HEAD_ATTN)
    float* out;            // Указатель на выходной массив (для обеих задач)
    QuantizedTensor* vec;            // Указатель на вектор (для mat_vec_mul)
    QuantizedTensor* mat;            // Указатель на матрицу (для mat_vec_mul)
    int rows;              // Количество строк для обработки (для mat_vec_mul)
    int cols;              // Количество столбцов (для mat_vec_mul)
    float* q;              // Указатель на тензор запроса (для multi_head_attn)
    float* key_cache;      // Указатель на кеш ключей (для multi_head_attn)
    float* value_cache;    // Указатель на кеш значений (для multi_head_attn)
    float* att;            // Указатель на буфер оценки внимания (для multi_head_attn)
    int seq_len;           // Длина последовательности (для multi_head_attn)
    int n_heads;           // Количество голов (для multi_head_attn)
    int head_size;         // Размер головы (для multi_head_attn)
    int kv_dim;
    int kv_mul;
    int start_row;
    int start_head;
    int pos;
} ThreadData;
pthread_t thread_pool[16]; //максимальное количество потоков 16
ThreadData thread_data[16];
int num_thread = 0;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_var_start = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond_var_done = PTHREAD_COND_INITIALIZER;

int thread_working = 0;
int task_in_progress = 0;
void mat_vec_mul_task_func(int id, QuantizedTensor *vec, QuantizedTensor *mat, int col, int row, float* out, int start_row) {

    for(int i = 0; i < row; i++){
        int in = (start_row + i) * col;

        float val = 0.0f;
        int32_t ival = 0;

        for (int j = 0; j <= col - GS; j += GS)
        {
            for(int k = 0; k < GS; k++){
                ival += ((int32_t) vec->q[j + k]) * ((int32_t) mat->q[in + j + k]);
            }
            val += ((float) ival) * mat->s[(in + j) / GS] * vec->s[j / GS];
            ival = 0;
        }
        out[start_row + i] = val;
    }
}

void multi_head_attn_task_func(
    int id,
    float* out,
    float* q,
    float* key_cache,
    float* value_cache,
    float* att,
    int seq_len,
    int n_heads,
    int head_size,
    int kv_dim,
    int kv_mul,
    int start_head,
    int local_pos
) {
    int end_head = start_head + n_heads;

    for (int h = start_head; h < end_head; h++) {
        int head_index = h - start_head; // Корректированный индекс в диапазоне голов потока

        // Корректированные указатели для данных, специфичных для потока
        float* head_q = q + h * head_size;
        float* head_att = att + head_index * seq_len; // Убедитесь, что нет перекрытия
        float* head_out = out + h * head_size;

        // Вычислить оценки внимания
        for (int t = 0; t <= local_pos; t++) {
            float* head_k = key_cache + t * kv_dim + (h / kv_mul) * head_size;
            float score = 0.0f;
            for (int i = 0; i < head_size; i++) {
                score += head_q[i] * head_k[i];
            }
            score /= sqrtf((float)head_size);
            head_att[t] = score;
        }

        // Softmax (предполагая, что он безопасен для потоков)
        softmax(head_att, local_pos + 1);

        // Вычислить взвешенную сумму
        memset(head_out, 0, head_size * sizeof(float));
        for (int t = 0; t <= local_pos; t++) {
            float* head_v = value_cache + t * kv_dim + (h / kv_mul) * head_size;
            float a = head_att[t];
            for (int i = 0; i < head_size; i++) {
                head_out[i] += a * head_v[i];
            }
        }
    }
}

void *thr_func(void *arg) {
    ThreadData *data = (ThreadData*) arg;

    while (true) {
        pthread_mutex_lock(&mutex);
        while (!task_in_progress && !shutdown) {
            pthread_cond_wait(&cond_var_start, &mutex);
        }
        if (shutdown) {
            pthread_mutex_unlock(&mutex);
            break; // Выйти из цикла и потока
        }
        ThreadData local_data = *data;
        pthread_mutex_unlock(&mutex);

        if (local_data.task_type == 0) {
            continue; // Нет задания, назначенного этому потоку
        }

        if (local_data.task_type == MAT_VEC_MUL) {
            mat_vec_mul_task_func(
                local_data.id,
                /*
                Остальные параметры
                */
            );
        } else if (local_data.task_type == MULTI_HEAD_ATTN) {
            multi_head_attn_task_func(
                local_data.id,
                /*
                Остальные параметры
                */
            );
        }

        pthread_mutex_lock(&mutex);
        thread_working--;

        if (thread_working == 0) {
            task_in_progress = 0;
            pthread_cond_signal(&cond_var_done);
        }
        pthread_mutex_unlock(&mutex);
    }

    pthread_exit(NULL);
}

void init_thr_pool(int num_thr) {
    num_thread = num_thr;
    for(int i = 0; i < num_thr; i++){
        thread_data[i].id = i;
        int init_thread = pthread_create(&thread_pool[i], NULL, thr_func, &thread_data[i]);
        if (init_thread != 0){
            printf("ошибка инициализации потока с номером потока: %d\n", i);
            exit(EXIT_FAILURE);
        }
    }
}

// функция для закрытия пула потоков
// @note: ВЫ НЕ МОЖЕТЕ ИЗМЕНЯТЬ ЭТУ СИГНАТУРУ ФУНКЦИИ!!!
void close_thr_pool() {
    pthread_mutex_lock(&mutex);
    shutdown = 1;
    pthread_cond_broadcast(&cond_var_start); // Разбудить потоки для выхода
    pthread_mutex_unlock(&mutex);
    for(int i = 0; i < num_thread; i++){
        pthread_join(thread_pool[i], NULL);
    }
}

void mat_vec_mul(float* out, QuantizedTensor *vec, QuantizedTensor *mat, int col, int row) {
    int row_per_thread = row / num_thread;
    int start_row = 0;
    int active_threads = 0;

    for (int i = 0; i < num_thread; i++) {
        int rows_in_thread = (i == num_thread - 1) ? (row - start_row) : row_per_thread;

        if (rows_in_thread <= 0) {
            thread_data[i].task_type = 0; // Нет задачи
            continue;
        }

        thread_data[i].id = i;
        /*
        Остальные назначения в структуру данных
        */

        start_row += rows_in_thread;
        active_threads++;
        if (start_row >= row) {
            break;
        }
    }

    pthread_mutex_lock(&mutex);
    thread_working = active_threads;
    task_in_progress = 1;
    pthread_cond_broadcast(&cond_var_start);
    while (task_in_progress) {
        pthread_cond_wait(&cond_var_done, &mutex);
    }
    pthread_mutex_unlock(&mutex);
}

void multi_head_attn(
    float* out,
    float* q,
    float* key_cache,
    float* value_cache,
    float* att,
    int seq_len,
    int n_heads,
    int head_size,
    int kv_dim,
    int kv_mul) 
{
    int local_pos = pos;
    int heads_per_thread = (n_heads + num_thread - 1) / num_thread;
    int start_head = 0;
    int active_threads = 0; // Количество потоков, которые будут на самом деле работать

    for (int i = 0; i < num_thread; i++) {
        int end_head = start_head + heads_per_thread;
        if (end_head > n_heads) end_head = n_heads;
        int heads_in_thread = end_head - start_head;

        if (heads_in_thread <= 0) {
            thread_data[i].task_type = 0; // Указать, что нет задачи
            continue;
        }

        thread_data[i].task_type = MULTI_HEAD_ATTN;
        /*
        Остальные назначения в структуру данных
        */

        start_head = end_head;
        active_threads++;
        if (start_head >= n_heads) {
            break;
        }
    }

    pthread_mutex_lock(&mutex);
    thread_working = active_threads;
    task_in_progress = 1;
    pthread_cond_broadcast(&cond_var_start);
    while (task_in_progress) {
        pthread_cond_wait(&cond_var_done, &mutex);
    }
    pthread_mutex_unlock(&mutex);
} 

Проблема в том, что я думаю, что часть обработки пакетами двух различных реализаций работает нормально, но код может выполняться правильно только тогда, когда количество потоков равно 1. Я думаю, что проблема возникает из-за состояния гонки или проблемы синхронизации. Я старался отладить это, но не могу найти решение.

Я пытался локализовать данные каждого потока. Я думал, что проблема возникает, когда потоки используют данные потока, и главный поток изменяет или перезаписывает данные потока. Однако это не помогло. Я могу выполнять код правильно только тогда, когда входное количество потоков равно 1. Я также пытался добавить мьютекс для блокировки различных мест, но это, похоже, не решает проблему.

Ответ или решение

Проблемы реализации многопоточности в LLM: Анализ и Решения

При переходе от последовательной работы алгоритмов AI к параллельному выполнению задач через многопоточность, разработчики сталкиваются с рядом проблем, которые могут существенно повлиять на производительность программы. В этой статье мы подробно рассмотрим и проанализируем возможные причины неполадок, с которыми вы столкнулись при реализации многопоточности в задачах матрично-векторного умножения и многошумного внимания (multi-head attention).

Проблемы синхронизации и состояния гонки

Одной из основных причин, по которой ваша программа корректно работает только с одним потоком, является состояние гонки. Состояние гонки происходит, когда несколько потоков одновременно пытаются получить доступ к общим данным, и в результате получаются непредсказуемые результаты. В вашем коде множество переменных и ресурсов, которые могут потенциально изменяться несколькими потоками одновременно:

  1. Переменные потоков: Параметры, такие как start_row, start_head, и другие значения, используемые для настройки работы каждого потока, могут быть изменены материальными потоками одновременно, что приводит к коллизиям в вычислениях.

  2. Общий массив ThreadData: Все потоки обращаются к массиву thread_data, что делает данные, которые каждый поток использует, потенциально подверженными изменениям другими потоками. Убедитесь, что доступ к ним настроен так, чтобы разные потоки не мешали друг другу.

Решения для предотвращения состояния гонки

Для устранения проблем с состоянием гонки убедитесь, что вы точно настраиваете доступ к общим ресурсам, используя правильные механизмы синхронизации:

  • Локализация данных потока: Каждый поток должен иметь отдельную копию данных, которые он будет использовать. Это минимизирует необходимость в синхронизации и уменьшает шансы на возникновение состояния гонки.

  • Блокировки и условия: Используйте mutex, чтобы заблокировать критические секции кода, где происходит обновление общих переменных. Также убедитесь, что вы правильно обрабатываете условия, в которых потоки должны ждать друг друга или сигнализировать о завершении.

  • Использование pthread_barrier_t: Рассмотрите возможность использования барьеров для синхронизации потоков в определенные моменты времени, когда нужно убедиться, что все потоки достигли заданной точки выполнения, прежде чем продолжить.

Отладка и тестирование многопоточности

Чтобы удостовериться в том, что ваши изменения действительно решают проблему, рекомендуется:

  1. Изолированное тестирование потоков: Запустите отдельные компоненты (например, матрично-векторное умножение и многошумное внимание) в одном потоке и убедитесь, что они работают корректно, а затем постепенно добавляйте потоки.

  2. Логи и отладка: Добавьте логи для отслеживания значений ключевых переменных перед, во время и после выполнения критических секций. Это поможет выяснить, где конкретно нарушается логика выполнения программы.

  3. Используйте инструменты профилирования: Такие инструменты, как Valgrind и другие, могут помочь выявить потенциальные проблемы с принудительным выполнением потоков и состояниями гонки.

Заключение

Переход к многопоточному программированию может быть сложным, но с правильным подходом к синхронизации и настроением потоков вы сможете достичь желаемой производительности и надежности. Применяйте приведенные выше рекомендации, и успешная реализация многопоточности для ваших алгоритмов AI станет более достижимой целью. Будьте внимательны к проблемам синхронизации и состояниям гонки, и используйте средства отладки для выявления и устранения потенциальных проблем.

Оцените материал
Добавить комментарий

Капча загружается...