Вопрос или проблема
Я пытаюсь выполнить внутреннее соединение между двумя стриминговыми датафреймами, где на одном из датафреймов была выполнена агрегация. Результирующий датафрейм всегда оказывается пустым. Тестирование с статическими версиями одних и тех же датафреймов и использование такого же соединения дает ожидаемый результат.
Для конкретики, я использую autoloader в databricks для загрузки файлов из облачного хранилища с использованием структурированного стриминга. Из этого потока я создаю другой стриминговый датафрейм, применяю некоторые преобразования и groupBy, а затем присоединяю этот стриминговый датафрейм обратно к исходному (то есть соединение поток-поток).
Я применил водяной знак к агрегированному датафрейму, как это требуется, но согласно документации использование водяного знака является обязательным для внутреннего соединения между двумя потоками.
Пример кода:
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_path)
.schema(schema)
.load(path)
)
second_df = df.filter(df["name"].startswith("abc") # создание второго стримингового датафрейма
second_df = second_df.withColumn("write_timestamp", current_timestamp()) # создание колонки времени
second_df = second_df.withWatermark("write_timestamp", "5 minutes") \
.groupBy("name", "item", "type", "write_timestamp") \
.agg(collect_list("value").alias("values")) # установка водяного знака и агрегация
merged_df = df.join(second_df, on = (df.item == second_df.item), how = "inner")
(merged_df.writeStream
.outputMode("append")
.option("checkpointLocation", checkpoint_path)
.option("mergeSchema", "true")
.trigger(availableNow=True)
.toTable(tablename))
Должен ли я также включать водяной знак в неагрегированный стриминговый датафрейм? Должно ли условие соединения включать условие диапазона водяного знака? Я не уверен, почему результат этого соединения может быть пустым.
Ответ или решение
Как выполнить внутреннее соединение потоков Spark с агрегацией
Вопрос, касающийся внутреннего соединения двух потоковых DataFrame в Spark с применением агрегации, является достаточно распространённой задачей в рамках обработки потоковых данных. Ваша проблема заключается в том, что после выполнения соединения результирующий DataFrame оказывается пустым. Мы подробно разберем возможные причины и предложим решения для успешного выполнения этой операции, опираясь на ваши исходные данные и код.
1. Понимание контекста
Вы используете Databricks Autoloader для загрузки файлов из облачного хранилища с помощью структурированной обработки потоков (structured streaming). Ваша исходная DataFrame (df) подлежит фильтрации для создания второго потока (second_df), после чего вы применяете агрегацию в сочетании с водяным знаком (watermark):
second_df = second_df.withWatermark("write_timestamp", "5 minutes") \
.groupBy("name", "item", "type", "write_timestamp") \
.agg(collect_list("value").alias("values"))
Затем вы выполняете внутреннее соединение:
merged_df = df.join(second_df, on=(df.item == second_df.item), how="inner")
2. Причины появления пустого результата
2.1. Ограничение времени на основе водяных знаков
Вы правильно применяете водяные знаки к агрегированному DataFrame (second_df). Однако важно упомянуть, что водяные знаки обязаны учитывать временные метки данных. Ваша текущая реализация предполагает, что только один из потоков имеет водяной знак, что может вызвать проблемы с отсутствием совпадений во времени при выполнении соединения. Рекомендуется также добавить водяной знак ко всему потоку df, особенно если требования к временным меткам в обоих потоках разные:
df = df.withWatermark("write_timestamp", "5 minutes") # добавьте водяной знак в исходный поток
2.2. Скорость поступления данных
Если данные в одном из потоков поступают медленно или имеют задержки, это также может привести к тому, что соединение вернет пустой результат. Убедитесь, что у вас достаточно данных для инициализации потоков и их агрегаций. Временные метки данных могут не совпадать из-за задержек.
2.3. Условия соединения
Ваше условие соединения:
on=(df.item == second_df.item)
может быть слишком строгим, если значения в столбце item имеют небольшой диапазон времени или если они сильно различаются в обоих потоках. Возможно, стоит рассмотреть возможность добавления условий, основанных на временных метках, если таковые имеются, для улучшения вероятности совпадений.
3. Рекомендации по улучшению кода
Ваша текущая структура кода выглядит вполне корректной, однако обратим внимание на несколько улучшений:
-
Добавление водяных знаков обоим потокам. Это обеспечит синхронизацию временных данных:
df = df.withWatermark("write_timestamp", "5 minutes")
-
Расширение условия соединения. Если у вас есть возможность, попробуйте добавить временные метки в условие соединения.
-
Мониторинг потока. Используйте
display
илиwriteStream
на промежуточных DataFrame, чтобы следить за потоком данных и убедиться, что в них есть данные прежде, чем продолжать соединение.
4. Заключение
Соединение потоковых DataFrame в Spark с агрегацией может быть сложным, особенно когда речь идет о временных метках и водяных знаках. Следуя приведённым рекомендациям, вы сможете устранить основные причины появления пустых результирующих DataFrame и повысить качество обработки ваших потоковых данных. Понимание принципов работы воды знаков, условий соединения и особенностей обработки потоков являются ключевыми факторами в успешной обработке данных в режиме реального времени.