Как лучше всего итерировать/парсить df.collect() в pyspark (необычный формат данных)?

Вопрос или проблема

Я использую Databricks pyspark, читаю объект из s3, но это не обычный CSV. У него есть заголовки и данные посередине файла, поэтому я читаю его таким образом.

df = (spark.read
    .format("text")
    .option("mode", "PERMISSIVE")
    .option("header", "false")
    .option("inferSchema","false")...

Таким образом, df.collect() выдает мне:

[Row(value=":::::"),
 Row(value="./some_file_name.csv"),
 Row(value=":::::"),
 Row(value="col_one|col_two|col_three"),
 Row(value="1|2|3"),
 Row(value="./another_file.csv"),
 Row(value="more stuff|of|varying|length")]

Мне нужно получить col_one, col_two и строку сразу после них с 1, 2, 3 и т.д.

Я могу итерироваться как список и использовать регулярное выражение, но есть ли более эффективный или предпочтительный способ? Я знаю, что итерация по датафреймам считается неэффективной.

Объем данных не так уж велик, так что я не переживаю о времени/ресурсах. Просто интересно, что будет самым “pyspark” способом сделать это.

Ответ или решение

Ваша задача заключается в извлечении данных из нестандартного текстового формата, который вы успешно читаете с помощью PySpark. Чтобы работать с этими данными, вместо того чтобы использовать df.collect() и итерироваться по полученному списку, мы можем использовать функции обработки данных, встроенные в PySpark, которые позволят нам создать более эффективный и "пифакторный" подход.

Шаги для решения задачи:

  1. Загрузите данные: Убедитесь, что данные загружены в DataFrame, как вы уже это сделали.

  2. Преобразуйте данные: Используйте функции PySpark для классификации строк в вашем DataFrame, где будут строки с заголовками и следующими за ними данными. Мы можем назначить "пометки" (метки) строк, чтобы указать, какие строки содержат заголовки, а какие – данные.

  3. Создайте новую структуру данных: Сформируйте новый DataFrame, который будет содержать только нужные вам данные.

Приведем код, который демонстрирует указанные шаги:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lag
from pyspark.sql.window import Window

# Создайте сессию Spark
spark = SparkSession.builder.appName("CustomDataProcessing").getOrCreate()

# Загрузите данные
df = (spark.read
    .format("text")
    .option("mode", "PERMISSIVE")
    .option("header", "false")
    .load("s3://your-bucket/path/to/your/file.txt")
)

# Создайте вспомогательный столбец, который обозначает, является ли строка заголовком
df = df.withColumn("is_header", when(col("value").contains("col_one"), True).otherwise(False))

# Теперь мы можем использовать окна для темпоральной обработки
window = Window.orderBy("value")
df = df.withColumn("prev_value", lag("value").over(window))

# Извлекаем только строки, которые содержат данные сразу после заголовков
data_df = df.filter(col("is_header") | (col("prev_value").contains("col_one")))

# Теперь извлечем колонки из данных
result_df = data_df.selectExpr("split(value, '\\|') as columns") \
                   .filter(data_df['is_header'] == False)

# Преобразуем данные в удобный формат
final_df = result_df.withColumn("col_one", col("columns")[0]) \
                     .withColumn("col_two", col("columns")[1]) \
                     .withColumn("col_three", col("columns")[2]) \
                     .select("col_one", "col_two", "col_three")

# Теперь вы можете просмотреть конечный DataFrame
final_df.show(truncate=False)

Пояснения:

  • Чтение файла: Вы загружаете данные, как уже указано.
  • Флаги заголовков: Мы создали дополнительный столбец с помощью withColumn для определения, является ли текущая строка заголовком.
  • Окна: Используем Window-функции для доступа к предыдущим строкам, чтобы связать заголовки с данными.
  • Фильтрация данных: Отфильтровываем строки так, чтобы остались только нужные данные.
  • Преобразование в удобный формат: Разделяем строки на колонки и извлекаем нужные значения.

Этот подход позволяет вам работать с вашими данными более эффективно и в соответствии с обычными практиками работы в PySpark.

Оцените материал
Добавить комментарий

Капча загружается...