Вопрос или проблема
Я использую Databricks pyspark, читаю объект из s3, но это не обычный CSV. У него есть заголовки и данные посередине файла, поэтому я читаю его таким образом.
df = (spark.read
.format("text")
.option("mode", "PERMISSIVE")
.option("header", "false")
.option("inferSchema","false")...
Таким образом, df.collect() выдает мне:
[Row(value=":::::"),
Row(value="./some_file_name.csv"),
Row(value=":::::"),
Row(value="col_one|col_two|col_three"),
Row(value="1|2|3"),
Row(value="./another_file.csv"),
Row(value="more stuff|of|varying|length")]
Мне нужно получить col_one, col_two и строку сразу после них с 1, 2, 3 и т.д.
Я могу итерироваться как список и использовать регулярное выражение, но есть ли более эффективный или предпочтительный способ? Я знаю, что итерация по датафреймам считается неэффективной.
Объем данных не так уж велик, так что я не переживаю о времени/ресурсах. Просто интересно, что будет самым “pyspark” способом сделать это.
Ответ или решение
Ваша задача заключается в извлечении данных из нестандартного текстового формата, который вы успешно читаете с помощью PySpark. Чтобы работать с этими данными, вместо того чтобы использовать df.collect()
и итерироваться по полученному списку, мы можем использовать функции обработки данных, встроенные в PySpark, которые позволят нам создать более эффективный и "пифакторный" подход.
Шаги для решения задачи:
-
Загрузите данные: Убедитесь, что данные загружены в DataFrame, как вы уже это сделали.
-
Преобразуйте данные: Используйте функции PySpark для классификации строк в вашем DataFrame, где будут строки с заголовками и следующими за ними данными. Мы можем назначить "пометки" (метки) строк, чтобы указать, какие строки содержат заголовки, а какие – данные.
-
Создайте новую структуру данных: Сформируйте новый DataFrame, который будет содержать только нужные вам данные.
Приведем код, который демонстрирует указанные шаги:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lag
from pyspark.sql.window import Window
# Создайте сессию Spark
spark = SparkSession.builder.appName("CustomDataProcessing").getOrCreate()
# Загрузите данные
df = (spark.read
.format("text")
.option("mode", "PERMISSIVE")
.option("header", "false")
.load("s3://your-bucket/path/to/your/file.txt")
)
# Создайте вспомогательный столбец, который обозначает, является ли строка заголовком
df = df.withColumn("is_header", when(col("value").contains("col_one"), True).otherwise(False))
# Теперь мы можем использовать окна для темпоральной обработки
window = Window.orderBy("value")
df = df.withColumn("prev_value", lag("value").over(window))
# Извлекаем только строки, которые содержат данные сразу после заголовков
data_df = df.filter(col("is_header") | (col("prev_value").contains("col_one")))
# Теперь извлечем колонки из данных
result_df = data_df.selectExpr("split(value, '\\|') as columns") \
.filter(data_df['is_header'] == False)
# Преобразуем данные в удобный формат
final_df = result_df.withColumn("col_one", col("columns")[0]) \
.withColumn("col_two", col("columns")[1]) \
.withColumn("col_three", col("columns")[2]) \
.select("col_one", "col_two", "col_three")
# Теперь вы можете просмотреть конечный DataFrame
final_df.show(truncate=False)
Пояснения:
- Чтение файла: Вы загружаете данные, как уже указано.
- Флаги заголовков: Мы создали дополнительный столбец с помощью
withColumn
для определения, является ли текущая строка заголовком. - Окна: Используем Window-функции для доступа к предыдущим строкам, чтобы связать заголовки с данными.
- Фильтрация данных: Отфильтровываем строки так, чтобы остались только нужные данные.
- Преобразование в удобный формат: Разделяем строки на колонки и извлекаем нужные значения.
Этот подход позволяет вам работать с вашими данными более эффективно и в соответствии с обычными практиками работы в PySpark.