Вопрос или проблема
Условие задачи:
У нас есть задача в Databricks, состоящая из нескольких параллельно работающих задач. Каждая задача записывает Spark dataframe в таблицу Azure SQL Database. Каждая задача записывает в свою целевую таблицу. Некоторые записывают тысячи записей, но несколько из них — миллионы. Процесс является обрезкой-загрузкой из-за особенностей дизайна, также это усложняет возможность сделать его инкрементальным.
Этот процесс изначально работал нормально. Но после добавления некоторых загрузок больших таблиц мы время от времени сталкиваемся с дублирующимися строками в таблице SQL Server после записи. Мы проанализировали эти записи и поняли, что это связано с тем, что одна из партиций dataframe записывается дважды. Мы не знаем точную причину, но предполагаем, что это связано с проблемой связи между SQL Server и кластером Spark, когда SQL Server сообщает Spark о необходимости повторного выполнения задачи, тогда как записи уже были фактически записаны и зафиксированы. Потенциально это может быть связано с ограничениями со стороны Azure SQL Database, так как использование DTU достигает 100% во время загрузки.
Код для записи:
def write_to_sqldb_table(self, df: DataFrame, target_table_name: str, num_partitions: int = 1) -> None:
df.write.format("SQLSERVER").option("host", self.sql_host_name).option("database", self.sql_db_name).option(
"dbtable", target_table_name
).option("user", self.sql_user_id).option("password", self.sql_user_password).option("truncate", "true").option(
"batchsize", self.batch_size
).option("numPartitions", num_partitions).mode("overwrite").save()
- batch_size установлен на 10.000, ранее мы проводили тестирование производительности, и это кажется оптимальным значением.
- num_partitions рассчитывается относительно количества записей в dataframe: total_records/batch_size, а затем округляется до ближайшего кратного 4, так как наш кластер имеет кратное 4 количество ядер.
Что мы пробовали до сих пор
Определение уникальных ограничений на SQL-таблице и обработка ошибок в PySpark
- Уникальные ограничения справились с поставленной задачей: заставили задачу Spark завершаться с ошибкой при вставке дублирующихся записей.
- Пробовали обработать ошибку в Spark, проверяя текст “Нарушение уникального ограничения” и затем возвращаясь из функции, но не удалось это реализовать. Задача Spark продолжала завершаться с ошибкой, вместо того чтобы поймать ошибку и продолжить с оставшимися вставками до успешного завершения задачи.
def write_to_sqldb_table_with_error_handling(self, df: DataFrame, target_table_name: str, num_partitions: int = 1) -> None:
try:
df.write.format("SQLSERVER").option("host", self.sql_host_name).option("database", self.sql_db_name).option(
"dbtable", target_table_name
).option("user", self.sql_user_id).option("password", self.sql_user_password).option("truncate", "true").option(
"batchsize", self.batch_size
).option("numPartitions", num_partitions).mode("overwrite").save()
except Exception as e:
error_message = str(e)
if "Нарушение уникального ключевого ограничения" in error_message:
return
else:
raise e
Определение уникальных ограничений на SQL-таблице и включение параметра IGNORE_DUP_KEY на уровне таблицы
- Операции вставки для крупных таблиц оказались заблокированными после внедрения IGNORE_DUP_KEY.
- Пробовали уменьшить batch_size, но это не помогло.
Запрос:
Мы ищем надежное решение для предотвращения дублирующих вставок при сохранении хорошей производительности. В данный момент я думаю о следующих двух решениях, которые стоит попробовать, но буду рад любым рекомендациям или опыту других, которые могут быть полезны!
-
вариант 1: использование Apache Spark Connector для Azure SQL с reliabilityLevel = “NO_DUPLICATES”
- этот коннектор часто обсуждается в контексте повышения производительности при записи Spark dataframe в базу данных Azure SQL
- последний релиз поддерживает только Spark 3.4, в то время как мы работаем на Spark 3.5, но, думаю, стоит попробовать
-
вариант 2: обработка дедупликации на стороне SQL Server, добавив столбец IDENTITY в качестве технического ключа в таблицы SQL Server и вызвав хранимую процедуру из моей задачи Spark для удаления дубликатов после операции вставки
Ответ или решение
Обработка дубликатов при записи Spark DataFrame в базу данных Azure SQL Server с использованием Databricks
Введение
В современном мире обработки данных, эффективное управление дубликатами является критически важной задачей, особенно когда речь идет о распределенных системах, таких как Apache Spark и Azure SQL Database. Проблема, описанная выше, связана с дублированием записей в результате параллельной обработки задач в Databricks, что может быть следствием некорректного управления потоками данных или ограничения ресурсов со стороны SQL Server.
Проблема и контекст
Ваша ситуация требует тщательного исследования, так как дублирование данных происходит в процессе загрузки данных через Truncate-Load. При условии большого объема данных, при параллельной загрузке возникает риск того, что часть партиций будет записана дважды. Это может быть связано с ошибками связи между Spark и SQL Server, особенно под высоким уровнем нагрузки на базу данных.
Способы решения проблемы
1. Использование коннектора Apache Spark для Azure SQL с reliabilityLevel = "NO_DUPLICATES"
Одним из вариантов оптимизации записи является использование коннектора Spark, который поддерживает параметр reliabilityLevel
. Хотя данный коннектор на данный момент совместим только с Spark 3.4, возможность его использования стоит рассмотреть, если вы сможете произвести необходимое обновление.
- Преимущества: Параметр
NO_DUPLICATES
обеспечивает автоматическую дедупликацию на уровне соединения, снижая вероятность дублирования. - Недостатки: Ограниченная совместимость и возможные затраты на обновление окружения.
2. Управление дубликатами на стороне SQL Server
Другой подход заключается в том, чтобы внедрить идентификационный столбец (IDENTITY) на стороне SQL Server и вызвать хранимую процедуру для удаления дубликатов после выполнения операций вставки. Этот метод позволит вам обрабатывать дубликаты постфактум, гарантируя целостность данных.
- Преимущества: Позволяет избежать ошибок вставки из-за ограничения уникальности при записи.
- Недостатки: Дополнительные затраты на обработку в базе данных и возможное снижение производительности при очистке.
Обработка ошибок при записи данных
Ваш текущий код для обработки ошибок записи имеет ограничения. Вместо того чтобы просто игнорировать исключения при нарушении уникальности, стоит рассмотреть возможность ведения журнала таких случаев и автоматического повторного выполнения операции с учетом уникальных идентификаторов.
def write_to_sqldb_table_with_error_handling(self, df: DataFrame, target_table_name: str, num_partitions: int = 1) -> None:
try:
df.write.format("SQLSERVER").options(
host=self.sql_host_name,
database=self.sql_db_name,
dbtable=target_table_name,
user=self.sql_user_id,
password=self.sql_user_password,
truncate="true",
batchsize=self.batch_size,
numPartitions=num_partitions
).mode("overwrite").save()
except Exception as e:
error_message = str(e)
if "Violation of UNIQUE KEY constraint" in error_message:
# Обработка дубликатов: запись в журнал или повторное выполнение с учетом идентификатора
self.log_error(error_message)
return # Или реализуйте более сложную логику обработки
else:
raise e
Заключение
Дубль записей в базе данных Azure SQL Server — это сложная проблема, требующая внимательного подхода и различных стратегий. Рассматриваемые варианты, такие как использование коннектора с повышенной надежностью и управление дублированием на уровне базы данных, могут помочь в устранении данного недоразумения и повысить производительность.
Надеюсь, эти рекомендации окажутся полезными для вашей задачи и помогут добиться надежности и эффективности в процессе работы с данными.