AWS Glue не извлекает полный набор данных во время выполнения задания

Вопрос или проблема

Я создал задачу, используя SQL Server в качестве источника данных, и всё работает нормально в предварительном просмотре данных.

Однако, когда я запускаю задачу, она работает только с набором данных, которые были загружены в предварительном просмотре данных, и не выбирает ничего другого из базы данных SQL Server.

Есть ли идеи, как отладить и решить эту проблему?

Скрипт Visual ETL:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame

def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

# Скрипт, сгенерированный для узла Custom Transform
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    dynamic_frame = dfc.select(list(dfc.keys())[0])
    spark_df = dynamic_frame.toDF()
    spark_df.write.format("iceberg").mode("append").save("MyCatalog.MyDatabase.MyTable")

    return DynamicFrameCollection({"default": DynamicFrame.fromDF(spark_df, glueContext, "default")}, glueContext)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Скрипт, сгенерированный для узла Microsoft SQL Server
MicrosoftSQLServer_node = glueContext.create_dynamic_frame.from_options(
    connection_type = "sqlserver",
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": "dbo.MyTableName",
        "connectionName": "MyConnectionName",
    },
    transformation_ctx = "MicrosoftSQLServer_node"
)

# Скрипт, сгенерированный для узла SQL Query
SqlQuery0 = '''Мой SQL запрос'''
SQLQuery_node = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"myDataSource":MicrosoftSQLServer_node}, transformation_ctx = "SQLQuery_node")

# Скрипт, сгенерированный для узла Custom Transform
CustomTransform_node = MyTransform(glueContext, DynamicFrameCollection({"SQLQuery_node": SQLQuery_node}, glueContext))

job.commit()

Вот мои параметры задачи:

--conf        spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions 
 --conf        spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog 
 --conf        spark.sql.catalog.glue_catalog.warehouse=MyS3WareHouse
 --conf        spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog 
 --conf        spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO 
 --conf        spark.sql.iceberg.handle-timestamp-without-timezone=true 

Ожидается, что записи будут извлекаться из моего локального сервера базы данных и записываться в таблицу Iceberg, которая у меня есть на моем AWS S3.

Ответ или решение

Проблема, с которой вы столкнулись, может быть обусловлена несколькими факторами. Давайте рассмотрим возможные шаги для диагностики и решения проблемы с неполучением полного набора данных при выполнении задания AWS Glue:

1. Проблема с выборкой данных:

Поскольку в предварительном просмотре данных всё работает нормально, однако при фактическом выполнении задания данные не полностью извлекаются, это может быть связано с тем, что SQL-запрос, используемый в вашем коде, может не возвращать все строки, которые вы ожидаете.

  • Проверьте SQL-запрос. Убедитесь, что SQL-запрос, который вы используете в переменной SqlQuery0, не содержит условий, ограничивающих выбор данных (например, фильтры WHERE, ограничения по дате и т.д.). Попробуйте временно заменить его на запрос, который выбирает все данные, например:
    SELECT * FROM dbo.MyTableName

2. Проверка параметров соединения:

Проблемы могут возникать из-за неверных настроек соединения. Убедитесь, что ваша конфигурация connection_options корректна, включая параметры соединения и имя таблицы.

  • Используйте все параметры соединения. Попробуйте установить параметр "dbtable" на полное имя вашей таблицы (например, с указанием схемы). Попробуйте указать явным образом schema.table:
    "dbtable": "dbo.MyTableName"

3. Логи и отладка:

Обратите внимание на логи AWS Glue после выполнения задания. Логи содержат важную информацию, включая ошибки и предупреждения, которые могут помочь вам диагностировать проблему.

  • Используйте CloudWatch Logs. Проверьте, есть ли в логах ошибки, связанные с ресурсами, например, OutOfMemoryError, что может привести к ограничению объёма данных, который вы можете извлечь.

4. Фильтрация данных в разделе Размещения:

Если вы извлекаете данные в Iceberg, проверьте, нет ли в настройках Iceberg или конфигурации Glue каких-либо дополнительных фильтров, которые могут препятствовать записи всех данных.

5. Тестирование и алгоритм извлечения:

Поскольку ваш текущий код использует DynamicFrame, попробуйте протестировать, работает ли он корректно с простыми преобразованиями и небольшими выборками данных, чтобы установить, не вызывает ли ошибка сложное преобразование.

  • Тест с уменьшенной выборкой. Например, можете попробовать извлечь и записать данные из одной строки или небольшой выборки, чтобы видеть, что именно происходит.

6. Размер извлечения данных:

В случае если данные велики, возможно, потребуется увеличить количество частей, с которыми Glue работает. Убедитесь, что задания настроены для обработки больших наборов данных.

Заключение:

Убедившись в правильности SQL-запроса и конфигурации соединения, изучив логи выполнения, вы сможете выявить и исправить возникшие ошибки. Если после этих изменений проблема не разрешится, возможно, стоит обратиться в службу поддержки AWS, предоставив информацию о вашем задании и логах для более глубокого анализа.

Оцените материал
Добавить комментарий

Капча загружается...