Как обрабатывать большой текстовый файл одновременно в .NET?

Вопрос или проблема

У меня есть текстовый файл с сотнями тысяч строк, и каждая строка имеет фиксированное количество свойств, разделенных символом |. Я обработал каждую строку следующим образом.

  1. Прочитать строку из файла с помощью StreamReader.ReadLineAsync().
  2. Разделить элементы и заполнить их в DTO.
  3. Выполнить некоторые обработки с DTO.
  4. Если прошли, выполнить пару вызовов API AWS S3 для каждой строки / DTO.
  5. Если шаги 3 и 4 выполнены, вставить строку в таблицу A PostgreSQL.
  6. Если шаги 3 и 4 не выполнены, вставить строку в таблицу B PostgreSQL.

Я успешно реализовал вышеуказанный процесс последовательно, строка за строкой, используя C#, .NET 8 и Dapper (для PostgreSQL 16) и реализовал это как BackgroundService.

Тем не менее, чтобы сократить время выполнения, рекомендуется использовать System.Threading.Channels, чтобы обрабатывать строку сразу после ее чтения? И для каждых 100 / 1000 строк выполнять массовую вставку / оператор копирования в базу данных.

Есть ли какие-либо лучшие рекомендации или подходы?

Редактировать: вызовы AWS асинхронные, а вызовы PostgreSQL синхронные. На обработку 52 строк построчно потребовалось примерно 5 минут.

Вам не нужно реализовывать шаблон производитель/потребитель, который выполняется с помощью Channel.

Вы можете просто определить всю обработку одной строки в отдельном асинхронном методе и выполнять этот метод для каждой строки. Ключ в том, чтобы не ожидать завершения задачи, просто начать работу и считать другую строку, начать задачу, считать строку и т.д.

Для этого вам нужно будет иметь что-то, чтобы отслеживать текущую работу (запущенные Task), например ConcurrentBag<Task>.

Что-то вроде этого:

private readonly ConcurrentBag<Task> _tasks = new();

public async Task HeavyProcessingOfTheFileAsync()
{
    while(!reader.EndOfStream)
    {
        var line = await reader.ReadLineAsync();
        _tasks.Add(ProcessLineAsync(line);
    }
}

// Рассмотрите возможность реализации IAsyncDisposable, чтобы легко ожидать задачи и должным образом освобождать их.
public async ValueTask DisposeAsync()
{
    await Task.WhenAll(_tasks);
}

private async Task ProcessLineAsync(string line)
{
    // ...создать DTO, сохранить его в БД и т.д.
}

Ответ или решение

Для обработки большого текстового файла с сотнями тысяч строк в .NET с использованием параллельных задач, вы можете использовать асинхронный подход с использованием Task. Это позволит вам обрабатывать строки файла одновременно, тем самым уменьшая общее время выполнения. Вот пошаговое решение вашей задачи:

Шаги для реализации параллельной обработки строк файла

  1. Использование StreamReader для чтения файла: Мы будем читать строки файла по одной с помощью асинхронного метода ReadLineAsync().

  2. Асинхронная обработка каждой строки: Каждая строка будет обрабатываться в своем отдельном асинхронном методе, не дожидаясь завершения предыдущей обработки, что позволяет выполнять несколько операций одновременно.

  3. Хранение задач: Мы будем использовать ConcurrentBag<Task> для хранения всех запущенных задач, чтобы можно было отслеживать их состояние и дожидаться завершения.

  4. Синхронные вызовы к PostgreSQL и асинхронные к AWS S3: Поскольку вызовы к AWS S3 асинхронные, их следует вызывать с помощью await. Однако, если у вас есть синхронные операции с БД, то важно организовать их соответственно, также учтя возможность использования пакетной вставки каждые 100 или 1000 строк.

Пример кода

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;
using Dapper;
using Npgsql;

public class FileProcessor : IAsyncDisposable
{
    private readonly ConcurrentBag<Task> _tasks = new();
    private readonly string _filePath;
    private readonly string _connectionString;

    public FileProcessor(string filePath, string connectionString)
    {
        _filePath = filePath;
        _connectionString = connectionString;
    }

    public async Task ProcessFileAsync()
    {
        using var reader = new StreamReader(_filePath);
        while (!reader.EndOfStream)
        {
            var line = await reader.ReadLineAsync();
            _tasks.Add(ProcessLineAsync(line));
        }

        await Task.WhenAll(_tasks); // Ждем завершения всех задач
    }

    private async Task ProcessLineAsync(string line)
    {
        // 1. Разбить строку и создать DTO
        var properties = line.Split('|');
        var dto = CreateDto(properties);

        // 2. Обработка DTO
        bool isSuccess = await ProcessDtoAsync(dto);

        using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync();

        if (isSuccess)
        {
            // 3. Вставка в таблицу A
            await InsertIntoTableAAsync(connection, dto);
        }
        else
        {
            // 4. Вставка в таблицу B
            await InsertIntoTableBAsync(connection, dto);
        }
    }

    private object CreateDto(string[] properties)
    {
        // Логика создания DTO из свойств
        // ...
    }

    private async Task<bool> ProcessDtoAsync(object dto)
    {
        // Логика обработки DTO и вызова AWS S3
        // Использование await для асинхронных вызовов
        // ...
    }

    private async Task InsertIntoTableAAsync(NpgsqlConnection connection, object dto)
    {
        // Логика вставки в таблицу A
        var sql = "INSERT INTO TableA (...) VALUES (...)";
        await connection.ExecuteAsync(sql, dto);
    }

    private async Task InsertIntoTableBAsync(NpgsqlConnection connection, object dto)
    {
        // Логика вставки в таблицу B
        var sql = "INSERT INTO TableB (...) VALUES (...)";
        await connection.ExecuteAsync(sql, dto);
    }

    public async ValueTask DisposeAsync()
    {
        await Task.WhenAll(_tasks); // Ждем завершения всех задач
    }
}

Рекомендации

  1. Пакетная вставка: Если возможно, сгруппируйте вставки в базу данных в пакетные операции, каждые 100 или 1000 строк. Это значительно улучшит производительность.

  2. Обработка ошибок: Убедитесь, что ваш код корректно обрабатывает исключения, особенно при работе с внешними сервисами, чтобы избежать частичного успеха.

  3. Измерение производительности: После реализации параллельной обработки, выполните тестирование на больших объемах данных, чтобы убедиться, что процесс значительно ускорился.

Данный подход позволит вам эффективно обрабатывать большой объем данных, максимизируя использование асинхронности и производительности вашей системы.

Оцените материал
Добавить комментарий

Капча загружается...