Вопрос или проблема
У меня есть текстовый файл с сотнями тысяч строк, и каждая строка имеет фиксированное количество свойств, разделенных символом |. Я обработал каждую строку следующим образом.
- Прочитать строку из файла с помощью
StreamReader.ReadLineAsync()
. - Разделить элементы и заполнить их в DTO.
- Выполнить некоторые обработки с DTO.
- Если прошли, выполнить пару вызовов API AWS S3 для каждой строки / DTO.
- Если шаги 3 и 4 выполнены, вставить строку в таблицу A PostgreSQL.
- Если шаги 3 и 4 не выполнены, вставить строку в таблицу B PostgreSQL.
Я успешно реализовал вышеуказанный процесс последовательно, строка за строкой, используя C#
, .NET 8
и Dapper
(для PostgreSQL 16
) и реализовал это как BackgroundService
.
Тем не менее, чтобы сократить время выполнения, рекомендуется использовать System.Threading.Channels
, чтобы обрабатывать строку сразу после ее чтения? И для каждых 100 / 1000 строк выполнять массовую вставку / оператор копирования в базу данных.
Есть ли какие-либо лучшие рекомендации или подходы?
Редактировать: вызовы AWS асинхронные, а вызовы PostgreSQL синхронные. На обработку 52 строк построчно потребовалось примерно 5 минут.
Вам не нужно реализовывать шаблон производитель/потребитель, который выполняется с помощью Channel
.
Вы можете просто определить всю обработку одной строки в отдельном асинхронном методе и выполнять этот метод для каждой строки. Ключ в том, чтобы не ожидать завершения задачи, просто начать работу и считать другую строку, начать задачу, считать строку и т.д.
Для этого вам нужно будет иметь что-то, чтобы отслеживать текущую работу (запущенные Task
), например ConcurrentBag<Task>
.
Что-то вроде этого:
private readonly ConcurrentBag<Task> _tasks = new();
public async Task HeavyProcessingOfTheFileAsync()
{
while(!reader.EndOfStream)
{
var line = await reader.ReadLineAsync();
_tasks.Add(ProcessLineAsync(line);
}
}
// Рассмотрите возможность реализации IAsyncDisposable, чтобы легко ожидать задачи и должным образом освобождать их.
public async ValueTask DisposeAsync()
{
await Task.WhenAll(_tasks);
}
private async Task ProcessLineAsync(string line)
{
// ...создать DTO, сохранить его в БД и т.д.
}
Ответ или решение
Для обработки большого текстового файла с сотнями тысяч строк в .NET с использованием параллельных задач, вы можете использовать асинхронный подход с использованием Task
. Это позволит вам обрабатывать строки файла одновременно, тем самым уменьшая общее время выполнения. Вот пошаговое решение вашей задачи:
Шаги для реализации параллельной обработки строк файла
-
Использование
StreamReader
для чтения файла: Мы будем читать строки файла по одной с помощью асинхронного методаReadLineAsync()
. -
Асинхронная обработка каждой строки: Каждая строка будет обрабатываться в своем отдельном асинхронном методе, не дожидаясь завершения предыдущей обработки, что позволяет выполнять несколько операций одновременно.
-
Хранение задач: Мы будем использовать
ConcurrentBag<Task>
для хранения всех запущенных задач, чтобы можно было отслеживать их состояние и дожидаться завершения. -
Синхронные вызовы к PostgreSQL и асинхронные к AWS S3: Поскольку вызовы к AWS S3 асинхронные, их следует вызывать с помощью
await
. Однако, если у вас есть синхронные операции с БД, то важно организовать их соответственно, также учтя возможность использования пакетной вставки каждые 100 или 1000 строк.
Пример кода
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;
using Dapper;
using Npgsql;
public class FileProcessor : IAsyncDisposable
{
private readonly ConcurrentBag<Task> _tasks = new();
private readonly string _filePath;
private readonly string _connectionString;
public FileProcessor(string filePath, string connectionString)
{
_filePath = filePath;
_connectionString = connectionString;
}
public async Task ProcessFileAsync()
{
using var reader = new StreamReader(_filePath);
while (!reader.EndOfStream)
{
var line = await reader.ReadLineAsync();
_tasks.Add(ProcessLineAsync(line));
}
await Task.WhenAll(_tasks); // Ждем завершения всех задач
}
private async Task ProcessLineAsync(string line)
{
// 1. Разбить строку и создать DTO
var properties = line.Split('|');
var dto = CreateDto(properties);
// 2. Обработка DTO
bool isSuccess = await ProcessDtoAsync(dto);
using var connection = new NpgsqlConnection(_connectionString);
await connection.OpenAsync();
if (isSuccess)
{
// 3. Вставка в таблицу A
await InsertIntoTableAAsync(connection, dto);
}
else
{
// 4. Вставка в таблицу B
await InsertIntoTableBAsync(connection, dto);
}
}
private object CreateDto(string[] properties)
{
// Логика создания DTO из свойств
// ...
}
private async Task<bool> ProcessDtoAsync(object dto)
{
// Логика обработки DTO и вызова AWS S3
// Использование await для асинхронных вызовов
// ...
}
private async Task InsertIntoTableAAsync(NpgsqlConnection connection, object dto)
{
// Логика вставки в таблицу A
var sql = "INSERT INTO TableA (...) VALUES (...)";
await connection.ExecuteAsync(sql, dto);
}
private async Task InsertIntoTableBAsync(NpgsqlConnection connection, object dto)
{
// Логика вставки в таблицу B
var sql = "INSERT INTO TableB (...) VALUES (...)";
await connection.ExecuteAsync(sql, dto);
}
public async ValueTask DisposeAsync()
{
await Task.WhenAll(_tasks); // Ждем завершения всех задач
}
}
Рекомендации
-
Пакетная вставка: Если возможно, сгруппируйте вставки в базу данных в пакетные операции, каждые 100 или 1000 строк. Это значительно улучшит производительность.
-
Обработка ошибок: Убедитесь, что ваш код корректно обрабатывает исключения, особенно при работе с внешними сервисами, чтобы избежать частичного успеха.
-
Измерение производительности: После реализации параллельной обработки, выполните тестирование на больших объемах данных, чтобы убедиться, что процесс значительно ускорился.
Данный подход позволит вам эффективно обрабатывать большой объем данных, максимизируя использование асинхронности и производительности вашей системы.