Вопрос или проблема
Я реализовал конвейер в AWS, где мои данные хранятся в бакете с именем “input-bucket”. В этом бакете есть файл, содержащий различные ZIP-архивы. Я написал задачу Glue для распаковки этих данных, преобразования их в формат CSV и сохранения в целевом бакете.
Ниже приведен код моей задачи Glue:
import boto3
import tempfile
from pyspark.sql import SparkSession
import zipfile
import os
import pyspark.sql.functions as f
import datetime
from pyspark.sql.functions import col
spark = (
SparkSession.builder.appName("Unzip_Zipped_Files")
.config("spark.driver.extraJavaOptions", "-Duser.timezone=GMT+5:30")
.config("spark.executor.extraJavaOptions", "-Duser.timezone=GMT+5:30")
.getOrCreate()
)
def list_folders_and_files(read_bucket_name, prefix):
s3_client = boto3.client('s3')
paginator = s3_client.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket=read_bucket_name, Prefix=prefix, Delimiter="https://stackoverflow.com/")
target_bucket_name = "target-bucket"
for page in page_iterator:
for prefix in page.get('CommonPrefixes', []):
folder_name = prefix.get('Prefix').rstrip("https://stackoverflow.com/")
print(f"Найдена папка: {folder_name}")
list_folders_and_files(read_bucket_name, folder_name+ "https://stackoverflow.com/")
for obj in page.get('Contents', []):
key = obj['Key']
if key.endswith('.zip'):
file_name = key.split("https://stackoverflow.com/")[-1]
current_folder_name = key.split("/")[0]
print(f"неизвлеченное имя файла {key}")
unzip_and_upload_file(s3_client, read_bucket_name, target_bucket_name, key, current_folder_name)
def unzip_and_upload_file(s3_client, read_bucket_name, target_bucket_name, key, new_target_folder_key):
with tempfile.TemporaryDirectory() as tmpdir:
download_path = os.path.join(tmpdir, os.path.basename(key))
s3_client.download_file(read_bucket_name, key, download_path)
with zipfile.ZipFile(download_path, 'r') as zip_ref:
zip_ref.extractall(tmpdir)
for file in os.listdir(tmpdir):
if file != os.path.basename(key):
file_name = file.split(".csv")[0]
if file_name.lower() != ("static_pool_data_ftd") and file_name != ("mShakti_Usage_Report_FTD.xls"):
new_key = new_target_folder_key + "/"+ file
print(f"файл, который будет загружен по этому пути {new_key}")
s3_client.upload_file(os.path.join(tmpdir, file), target_bucket_name, new_key) # загрузка файла в целевой бакет
df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"s3://{target_bucket_name}/{new_target_folder_key}/{file}", sep='|')
#result_df = df
result_df= df.withColumn("partition_date",f.lit(new_target_folder_key))
if file_name =="Loan_Closure_and_Foreclosure_Report_MTD":
clean_Loan_Closure_and_Foreclosure_Report_MTD_data(result_df, target_bucket_name, file_name, new_target_folder_key)
elif file_name=="NPA_Recovery_Report_MTD":
clean_NPA_recovery_report_rate(result_df, target_bucket_name, file_name, new_target_folder_key)
elif file_name == "customer_wise_disbursement_report_mtd":
result_df = result_df.withColumn("sales officer code", col("sales officer code").cast("double"))
result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
elif file_name == "collection_recon_report_mtd":
result_df = result_df.withColumn("total collection", col("total collection").cast("string"))
result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
else:
result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/")
print(f"s3://{target_bucket_name}/{new_target_folder_key}/{file} - файл прочитан...")
s3_client.delete_object(Bucket=target_bucket_name,Key=f"{new_target_folder_key}/{file}")
print(f"Удаленные объекты {new_target_folder_key}/{file}")
def clean_NPA_recovery_report_rate(df, target_bucket_name, file_name, new_target_folder_key):
try:
df = df.withColumn("Outstanding amount as on NPA date", f.regexp_replace(f.col("Outstanding amount as on NPA date"), ",", ""))
# df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/")
df.coalesce(1).write.mode("overwrite").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/{new_target_folder_key}/")
except Exception as e:
raise e
def clean_Loan_Closure_and_Foreclosure_Report_MTD_data(df, target_bucket_name, file_name, new_target_folder_key):
try:
for column in ["BRANCH CODE","CUSTOMER NUMBER","ACCOUNT NUMBER"]:
df = df.withColumn(column, f.regexp_replace(f.col(column), "'", ""))
# df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/")
df.coalesce(1).write.mode("overwrite").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/{new_target_folder_key}/")
except Exception as e:
raise e
if __name__=="__main__":
list_folders_and_files("input-bucket","28092024")
После выполнения задачи я запускаю краулер на своем целевом бакете, который создает каталог данных, упрощающий запрос данных в Athena.
Это работает хорошо, когда я запрашиваю данные впервые. Однако, когда я загружаю данные на следующий день в режиме OVERWRITE и когда я запрашиваю их в Athena, я сталкиваюсь с ошибкой, указывающей на то, что для одной из таблиц тип данных в предыдущем разделе – BIGINT
, в то время как теперь он объявлен как DOUBLE
, что приводит к несоответствию.
Я попытался привести тип данных к DOUBLE
(для новых поступивших данных), но все равно получил ту же ошибку, указывающую на то, что предыдущий раздел содержит DOUBLE
, в то время как текущие данные – BIGINT
.
Мой конечный случай использования – это обеспечить отображение только последних данных в Athena.
Не могли бы вы посоветовать, как решить эту проблему?
Ответ или решение
Ошибка HIVE_PARTITION_SCHEMA_MISMATCH: Мismatch между схемами таблицы и разделов в AWS Glue и Athena
Ваша проблема с ошибкой HIVE_PARTITION_SCHEMA_MISMATCH возникает из-за несоответствия схемы данных между различными разделами в таблице Athena, созданной на основе данных AWS Glue. Давайте рассмотрим, как можно выявить и устранить эту проблему, а затем обеспечим правильное отображение данных в Athena.
Причины ошибки
Ошибка, с которой вы столкнулись, происходит из-за изменения типа данных в столбцах между разными загрузками данных в вашу целевую таблицу. Например, если в одном разделе столбец имел тип данных BIGINT
, а в последующем разделе он стал DOUBLE
, это приводит к возникновению конфликта, который и вызывает ошибку.
Каждый раз, когда вы загружаете данные в таблицу, созданную с помощью Glue, Athena ожидает, что поля в новых разделах будут иметь те же типы данных, что и в предыдущих разделах. При наличии различий во типах данных Athena не может выполнить запросы к таблице, и вы получаете ошибку.
Рекомендации по решению проблемы
-
Унификация типов данных: Убедитесь, что во всех разделах, которые вы загружаете, используется один и тот же тип данных для всех столбцов. Проанализируйте код вашего Glue job и установите явные типы данных для столбцов перед их загрузкой.
Например, если вы предполагаете, что
sales officer code
должен бытьDOUBLE
, сделайте это в коде перед его записью:result_df = result_df.withColumn("sales officer code", col("sales officer code").cast("double"))
-
Переписывание данных: Если уже существуют данные в Athena с различными схемами, подумайте о том, чтобы переписать все данные в раздел с правильной схемой. Сделайте это, перегрузив данные с использованием единого типа данных и пересоздайте таблицу в Athena, чтобы переопределить существующие разделы.
-
Настройка Glue Crawler: Убедитесь, что ваш crawler настроен правильно для правильной обработки и обновления схемы. Вы можете настроить crawler на что бы он не менял существующие типы данных, если они уже определены в предыдущих разделах.
-
Статическое определение схемы: Вместо использования
inferSchema
, вы можете определить схему вручную, чтобы избежать неявного изменения типов данных:from pyspark.sql.types import StructType, StructField, StringType, DoubleType customSchema = StructType([ StructField("sales officer code", DoubleType(), True), StructField("another_column", StringType(), True) ]) df = spark.read.option("header", "true").schema(customSchema).csv(f"s3://{target_bucket_name}/{new_target_folder_key}/{file}")
-
Управление версиями данных: Если это возможно, используйте таймстемпы в названиях разделов. Например, можно добавлять дату в ключи разделов, чтобы в будущем избежать конфликтов между типами данных.
-
Создание резервной копии данных: При изменении структуры данных создавайте резервные копии старых данных и тестируйте изменения в новой таблице, прежде чем применять их в продакшене.
Заключение
Для решения проблемы HIVE_PARTITION_SCHEMA_MISMATCH в Athena, самое важное — это обеспечить единообразие типов данных в разных разделах вашей таблицы. Рекомендуется пересмотреть вашу загрузку данных, настройку Glue job и crawler, а также стратегии управления версиями данных для избежания подобных ошибок в будущем. Следуя представленным рекомендациям, вы сможете построить надежный и устойчивый процесс обработки данных, что обеспечит корректное взаимодействие с Athena.