Использование частей для загрузки огромных файлов в Lambda

Вопрос или проблема

Я пытаюсь загрузить большой parquet файл из S3, используя функцию Lambda в AWS. В качестве оптимизации, поскольку есть проблема с памятью в Lambda, я попробовал разбить полученные данные на части. Вот код функции :

`
import awswrangler as wr
import boto3
import pandas as pd

s3 = boto3.client(‘s3’)

def process_object(bucket_name, object_prefix, flow_name, columns_to_keep, chunk_size=100_000):
rows = []

# Чтение файла Parquet частями с помощью awswrangler
dfs = wr.s3.read_parquet(
    path=f"s3://{bucket_name}/{object_prefix}",
    columns=columns_to_keep,
    chunked=True  # Чтение частями для экономии памяти
)

for df in dfs:  # Итерация по каждому сгенерированному DataFrame
    # Ограничить количество строк по частям, если необходимо
    if chunk_size:
        df = df.head(chunk_size)

    # Удалить дубликаты по определенным столбцам
    if flow_name == "R50":
        df = df.drop_duplicates(subset=["nom_archive", "nom_fichier"])
    else:
        df = df.drop_duplicates(subset=["nom_archive", "nom_fichier", "publication_date"])

    # Сортировать по столбцу "nom_archive" в порядке убывания и оставить первую строку
    df = df.sort_values("nom_archive", ascending=False).head(1)

    # Оптимизация типов данных
    df["nom_archive"] = df["nom_archive"].astype("category")
    df["nom_fichier"] = df["nom_fichier"].astype("category")

    # Добавить обработанные данные в список строк
    for index, row in df.iterrows():
        rows.append({
            "grid_operator": "grid_operator",  # Замените на вашу реальную переменную
            "file_name": row["nom_fichier"],
            "file_size": None,
            "archive_name": row["nom_archive"],
            "parent_path": "parent_path",  # Замените на вашу реальную переменную
            "event_time": "event_time",  # Замените на вашу реальную переменную
            "upload_day": "upload_day",  # Замените на вашу реальную переменную
            "upload_month": "upload_month",  # Замените на вашу реальную переменную
            "extraction_date": row.get("publication_date"),
            "flow_name": flow_name
        })

return rows`

Но возвращается ошибка:

Ошибка ArrowInvalid: Схема на индексе 0 отличалась.

Я пытался привести все поля к строковому типу. Но это не сработало.

Помогите, пожалуйста?

Ответ или решение

Загрузка больших файлов в AWS Lambda с использованием методов деления на чанки

Ваша задача заключается в оптимизации загрузки больших файлов Parquet из Amazon S3 в AWS Lambda. Это делается для решения проблемы с памятью, которая часто возникает при обработке больших данных в среде AWS Lambda. В данной статье мы разберем ваш код, рассмотрим возникающую ошибку и предложим пути решения.

Код вызова функции и ошибка

Вы представили следующий код, который использует библиотеку awswrangler для чтения данных из S3:

import awswrangler as wr
import boto3
import pandas as pd

s3 = boto3.client('s3')

def process_object(bucket_name, object_prefix, flow_name, columns_to_keep, chunk_size=100_000):
    rows = []
    dfs = wr.s3.read_parquet(
        path=f"s3://{bucket_name}/{object_prefix}",
        columns=columns_to_keep,
        chunked=True
    )

    for df in dfs:
        if chunk_size:
            df = df.head(chunk_size)

        if flow_name == "R50":
            df = df.drop_duplicates(subset=["nom_archive", "nom_fichier"])
        else:
            df = df.drop_duplicates(subset=["nom_archive", "nom_fichier", "publication_date"])

        df = df.sort_values("nom_archive", ascending=False).head(1)

        df["nom_archive"] = df["nom_archive"].astype("category")
        df["nom_fichier"] = df["nom_fichier"].astype("category")

        for index, row in df.iterrows():
            rows.append({
                "grid_operator": "grid_operator",
                "file_name": row["nom_fichier"],
                "file_size": None,
                "archive_name": row["nom_archive"],
                "parent_path": "parent_path",
                "event_time": "event_time",
                "upload_day": "upload_day",
                "upload_month": "upload_month",
                "extraction_date": row.get("publication_date"),
                "flow_name": flow_name
            })

    return rows

Ошибка, с которой вы столкнулись: ArrowInvalid: Schema at index 0 was different. Эта ошибка возникает, когда схемы (типы колонок) в чанках данных различаются, что делает невозможным обработку данных с помощью PyArrow, на котором основан awswrangler.

Потенциальные решения проблемы

  1. Убедитесь в согласованности данных: Проверьте, чтобы столбцы, которые вы загружаете, имели одинаковые типы данных во всех чанках. Вы можете сделать это, выполнив предварительный анализ данных в S3, используя pandas и другие инструменты.

  2. Преобразование типов данных: Поскольку вы уже пробовали преобразовать все поля в строку, также стоит проверить, правильно ли применяются типы данных к вашим DataFrame. Если ваши данные имеют различные типы для некоторых колонок между чанками, это может вызвать ошибку. Используйте метод pd.concat() для того, чтобы объединять DataFrames перед обработкой, что обеспечит однородность типов.

  3. Обработка разных схем: Если из-за изменений в данных возможно наличие различных схем по умолчанию (например, разные форматы дат), для каждого чанка можно выставить стандартные значения, что будет активировать преобразование типов. Это можно сделать, добавив перед обработкой:

df["nom_archive"] = df["nom_archive"].astype(str)
df["nom_fichier"] = df["nom_fichier"].astype(str)
  1. Логгирование схемы: Вы также можете добавлять логирование, чтобы фиксировать схемы данных, которые вы получаете из S3. Это поможет вам понять, в каком именно чанке возникает ошибка:
    for df in dfs:
        print(df.dtypes)  # Логируем схемы

Заключение

Загрузка крупных Parquet файлов в AWS Lambda может быть сложной задачей, требующей аккуратного обращения с памятью и типами данных. Следуя предложенным шагам, вы сможете устранить ошибку, сохранив при этом оптимизацию по памяти. Обязательно тестируйте каждую часть кода на небольших объемах данных перед обработкой крупных наборов для минимизации возможных ошибок.

Если у вас возникнут дополнительные вопросы или потребуется помощь в других аспектах работы с AWS и обработкой данных, не стесняйтесь обращаться.

Оцените материал
Добавить комментарий

Капча загружается...