HIVE_PARTITION_SCHEMA_MISMATCH: Существует несоответствие между схемами таблицы и раздела.

Вопрос или проблема

Я реализовал конвейер в AWS, где мои данные хранятся в бакете с именем “input-bucket”. В этом бакете есть файл, содержащий различные ZIP-архивы. Я написал задачу Glue для распаковки этих данных, преобразования их в формат CSV и сохранения в целевом бакете.

Ниже приведен код моей задачи Glue:

   import boto3
   import tempfile
   from pyspark.sql import SparkSession
   import zipfile
   import os
   import pyspark.sql.functions as f
   import datetime
   from pyspark.sql.functions import col

   spark = (
       SparkSession.builder.appName("Unzip_Zipped_Files")
       .config("spark.driver.extraJavaOptions", "-Duser.timezone=GMT+5:30")
       .config("spark.executor.extraJavaOptions", "-Duser.timezone=GMT+5:30")
       .getOrCreate()
   )

def list_folders_and_files(read_bucket_name, prefix):
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=read_bucket_name, Prefix=prefix, Delimiter="https://stackoverflow.com/")
    target_bucket_name = "target-bucket"
    for page in page_iterator:
        for prefix in page.get('CommonPrefixes', []):
            folder_name = prefix.get('Prefix').rstrip("https://stackoverflow.com/")
            print(f"Найдена папка: {folder_name}")
            list_folders_and_files(read_bucket_name, folder_name+ "https://stackoverflow.com/")
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.zip'):
                file_name = key.split("https://stackoverflow.com/")[-1]
                current_folder_name = key.split("/")[0]
                print(f"неизвлеченное имя файла {key}")
                unzip_and_upload_file(s3_client, read_bucket_name, target_bucket_name, key,             current_folder_name)
                    
def unzip_and_upload_file(s3_client, read_bucket_name, target_bucket_name, key, new_target_folder_key):
         with tempfile.TemporaryDirectory() as tmpdir:
            download_path = os.path.join(tmpdir, os.path.basename(key))
            s3_client.download_file(read_bucket_name, key, download_path)
            with zipfile.ZipFile(download_path, 'r') as zip_ref:
                zip_ref.extractall(tmpdir)
            for file in os.listdir(tmpdir):
                if file != os.path.basename(key):
                    file_name = file.split(".csv")[0]
                    if file_name.lower() != ("static_pool_data_ftd") and file_name !=  ("mShakti_Usage_Report_FTD.xls"):
                        new_key = new_target_folder_key + "/"+ file
                        print(f"файл, который будет загружен по этому пути {new_key}")
                        s3_client.upload_file(os.path.join(tmpdir, file), target_bucket_name, new_key) # загрузка файла в целевой бакет
                        df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"s3://{target_bucket_name}/{new_target_folder_key}/{file}", sep='|')
                        #result_df = df
                        result_df= df.withColumn("partition_date",f.lit(new_target_folder_key))
                        if file_name =="Loan_Closure_and_Foreclosure_Report_MTD":
                            clean_Loan_Closure_and_Foreclosure_Report_MTD_data(result_df, target_bucket_name, file_name, new_target_folder_key)
                        elif file_name=="NPA_Recovery_Report_MTD":
                            clean_NPA_recovery_report_rate(result_df, target_bucket_name, file_name, new_target_folder_key)
                        elif file_name == "customer_wise_disbursement_report_mtd":
                            result_df = result_df.withColumn("sales officer code", col("sales officer code").cast("double"))
                            result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
                        elif file_name == "collection_recon_report_mtd":
                            result_df = result_df.withColumn("total collection", col("total collection").cast("string"))
                            result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
                        else:
                            result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/")
                        print(f"s3://{target_bucket_name}/{new_target_folder_key}/{file} - файл прочитан...")
                        s3_client.delete_object(Bucket=target_bucket_name,Key=f"{new_target_folder_key}/{file}")
                        print(f"Удаленные объекты {new_target_folder_key}/{file}")


def clean_NPA_recovery_report_rate(df, target_bucket_name, file_name, new_target_folder_key):
    try:
        df = df.withColumn("Outstanding amount as on NPA date", f.regexp_replace(f.col("Outstanding amount as on NPA date"), ",", ""))
        # df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/")
        df.coalesce(1).write.mode("overwrite").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/{new_target_folder_key}/")
    except Exception as e:
        raise e    

def clean_Loan_Closure_and_Foreclosure_Report_MTD_data(df, target_bucket_name, file_name, new_target_folder_key):
    try:
        for column in ["BRANCH CODE","CUSTOMER NUMBER","ACCOUNT NUMBER"]:
            df = df.withColumn(column, f.regexp_replace(f.col(column), "'", ""))
        # df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/")
        df.coalesce(1).write.mode("overwrite").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/{new_target_folder_key}/")
    except Exception as e:
        raise e

if __name__=="__main__":
    list_folders_and_files("input-bucket","28092024")

После выполнения задачи я запускаю краулер на своем целевом бакете, который создает каталог данных, упрощающий запрос данных в Athena.

Это работает хорошо, когда я запрашиваю данные впервые. Однако, когда я загружаю данные на следующий день в режиме OVERWRITE и когда я запрашиваю их в Athena, я сталкиваюсь с ошибкой, указывающей на то, что для одной из таблиц тип данных в предыдущем разделе – BIGINT, в то время как теперь он объявлен как DOUBLE, что приводит к несоответствию.

Я попытался привести тип данных к DOUBLE (для новых поступивших данных), но все равно получил ту же ошибку, указывающую на то, что предыдущий раздел содержит DOUBLE, в то время как текущие данные – BIGINT.

Мой конечный случай использования – это обеспечить отображение только последних данных в Athena.

Не могли бы вы посоветовать, как решить эту проблему?

Ответ или решение

Ошибка HIVE_PARTITION_SCHEMA_MISMATCH: Мismatch между схемами таблицы и разделов в AWS Glue и Athena

Ваша проблема с ошибкой HIVE_PARTITION_SCHEMA_MISMATCH возникает из-за несоответствия схемы данных между различными разделами в таблице Athena, созданной на основе данных AWS Glue. Давайте рассмотрим, как можно выявить и устранить эту проблему, а затем обеспечим правильное отображение данных в Athena.

Причины ошибки

Ошибка, с которой вы столкнулись, происходит из-за изменения типа данных в столбцах между разными загрузками данных в вашу целевую таблицу. Например, если в одном разделе столбец имел тип данных BIGINT, а в последующем разделе он стал DOUBLE, это приводит к возникновению конфликта, который и вызывает ошибку.

Каждый раз, когда вы загружаете данные в таблицу, созданную с помощью Glue, Athena ожидает, что поля в новых разделах будут иметь те же типы данных, что и в предыдущих разделах. При наличии различий во типах данных Athena не может выполнить запросы к таблице, и вы получаете ошибку.

Рекомендации по решению проблемы

  1. Унификация типов данных: Убедитесь, что во всех разделах, которые вы загружаете, используется один и тот же тип данных для всех столбцов. Проанализируйте код вашего Glue job и установите явные типы данных для столбцов перед их загрузкой.

    Например, если вы предполагаете, что sales officer code должен быть DOUBLE, сделайте это в коде перед его записью:

    result_df = result_df.withColumn("sales officer code", col("sales officer code").cast("double"))
  2. Переписывание данных: Если уже существуют данные в Athena с различными схемами, подумайте о том, чтобы переписать все данные в раздел с правильной схемой. Сделайте это, перегрузив данные с использованием единого типа данных и пересоздайте таблицу в Athena, чтобы переопределить существующие разделы.

  3. Настройка Glue Crawler: Убедитесь, что ваш crawler настроен правильно для правильной обработки и обновления схемы. Вы можете настроить crawler на что бы он не менял существующие типы данных, если они уже определены в предыдущих разделах.

  4. Статическое определение схемы: Вместо использования inferSchema, вы можете определить схему вручную, чтобы избежать неявного изменения типов данных:

    from pyspark.sql.types import StructType, StructField, StringType, DoubleType
    customSchema = StructType([
       StructField("sales officer code", DoubleType(), True),
       StructField("another_column", StringType(), True)
    ])
    df = spark.read.option("header", "true").schema(customSchema).csv(f"s3://{target_bucket_name}/{new_target_folder_key}/{file}")
  5. Управление версиями данных: Если это возможно, используйте таймстемпы в названиях разделов. Например, можно добавлять дату в ключи разделов, чтобы в будущем избежать конфликтов между типами данных.

  6. Создание резервной копии данных: При изменении структуры данных создавайте резервные копии старых данных и тестируйте изменения в новой таблице, прежде чем применять их в продакшене.

Заключение

Для решения проблемы HIVE_PARTITION_SCHEMA_MISMATCH в Athena, самое важное — это обеспечить единообразие типов данных в разных разделах вашей таблицы. Рекомендуется пересмотреть вашу загрузку данных, настройку Glue job и crawler, а также стратегии управления версиями данных для избежания подобных ошибок в будущем. Следуя представленным рекомендациям, вы сможете построить надежный и устойчивый процесс обработки данных, что обеспечит корректное взаимодействие с Athena.

Оцените материал
Добавить комментарий

Капча загружается...