Сгруппировать spark dataframe от начального события до конечного события.

Вопрос или проблема

Дан ряд событий (с датой и временем), например:

failed, failed, passed, failed, passed, passed

Я хочу получить время от первого “failed” до первого “passed”, сбрасывая каждый раз, когда снова происходит “failed”, поскольку хочу измерить время восстановления.

Мне удалось сделать это только с помощью цикла for, поскольку когда я применял groupBy к событию с min по дате, я терял порядок событий, так как хочу группировать по парам “failed-passed”.

В конечном счете я хочу измерить среднее время восстановления этого теста.

Пример данных:

from pyspark.sql import Row
from datetime import datetime

df = spark.createDataFrame([
  Row(event="failed", date=datetime(2021, 8, 11, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 12, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 13, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 14, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 15, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 16, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 17, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 18, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 19, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 20, 0, 0))
])

df.show()

+------+-------------------+
| event|               date|
+------+-------------------+
|failed|2021-08-11 00:00:00|
|failed|2021-08-12 00:00:00|
|passed|2021-08-13 00:00:00|
|failed|2021-08-14 00:00:00|
|failed|2021-08-15 00:00:00|
|passed|2021-08-16 00:00:00|
|passed|2021-08-17 00:00:00|
|passed|2021-08-18 00:00:00|
|failed|2021-08-19 00:00:00|
|passed|2021-08-20 00:00:00|
+------+-------------------+

ожидаемый результат:

df = spark.createDataFrame([
  Row(failed=datetime(2021, 8, 11, 0, 0), recovered=datetime(2021, 8, 13, 0, 0)),
  Row(failed=datetime(2021, 8, 14, 0, 0), recovered=datetime(2021, 8, 16, 0, 0)),
  Row(failed=datetime(2021, 8, 19, 0, 0), recovered=datetime(2021, 8, 20, 0, 0)),
])

df.show()

+-------------------+-------------------+
|             failed|          recovered|
+-------------------+-------------------+
|2021-08-11 00:00:00|2021-08-13 00:00:00|
|2021-08-14 00:00:00|2021-08-16 00:00:00|
|2021-08-19 00:00:00|2021-08-20 00:00:00|
+-------------------+-------------------+

возможно, существуют более эффективные способы сделать это, но данный метод, кажется, работает

  1. исключить повторяющиеся события путем сравнения с отставанием с и без разделения
  2. вычислить опережение без разделения
  3. отфильтровать события “failed”
from pyspark.sql import Row
from pyspark.sql import Window
import pyspark.sql.functions as F
from datetime import datetime

df = spark.createDataFrame([
  Row(event="failed", date=datetime(2021, 8, 11, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 12, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 13, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 14, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 15, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 16, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 17, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 18, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 19, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 20, 0, 0))
])

w = Window.partitionBy("event").orderBy("date")
w_wo_key = Window.orderBy("date")

(
df
 .withColumn("lag_date", F.lag("date", 1, None).over(w))
 .withColumn("lag_wo_key", F.lag("date",1,None).over(w_wo_key))
 .filter((F.col("lag_date").isNull()) | (F.col("lag_date") != F.col("lag_wo_key")))
 .withColumn("recovered", F.lead("date",1,None).over(w_wo_key))
 .filter(F.col("event") == 'failed')
 .select(
   F.col("date").alias("failed"),
   F.col("recovered"))
).show()

""" Возвращает
+-------------------+-------------------+
|             failed|          recovered|
+-------------------+-------------------+
|2021-08-11 00:00:00|2021-08-13 00:00:00|
|2021-08-14 00:00:00|2021-08-16 00:00:00|
|2021-08-19 00:00:00|2021-08-20 00:00:00|
+-------------------+-------------------+
"""

```

Ответ или решение

В данном техническом задании рассматривается задача анализа временных данных с использованием Apache Spark, которая заключается в определении времени восстановления из состояния "failed" в состояние "passed". Данная задача имеет исключительную важность при мониторинге надежности системы и ее компонентов, поскольку позволяет определить среднее время восстановления, что в свою очередь может помочь в оценке стабильности системы и планировании работ по ее усовершенствованию. Чтобы решить эту задачу, используется фреймворк PySpark, инструмент, позволяющий обрабатывать большие объемы данных на основе Apache Spark.

Теория

Apache Spark — это распределенная платформа для обработки данных, которая позволяет проводить анализ данных в большой распределенной системе. Это становится возможным благодаря удобной абстракции DataFrame, представляющей собой набор данных, организованных по аналогии с таблицами в базах данных, что позволяет применять к ним SQL-подобные операции.

Основной принцип выполнения задачи заключается в использовании оконных функций, которые обладают возможностью производить вычисления для заданного окна строк. Эти функции особенно полезны, когда необходимо изучить зависимость между строками в пределах одной или нескольких групп. В контексте нашей задачи, оконные функции позволяют семантически "перемещаться" по последовательности событий таким образом, что мы можем управлять и корректировать выборку данных в зависимости от требований.

Пример

На примере имеющегося кода мы можем видеть следующее:

  1. Создание DataFrame.
    Сначала создается DataFrame из последовательности строк (объектов Row), где каждая строка представляет событие с временной меткой. Это исходит из того, что данные о событиях (например, "failed", "passed") часто собираются в этой форме из систем лога или мониторинга.

  2. Определение оконных функций.
    Далее, создается две оконных функции: одна группирует данные по событию и сортирует их по дате, вторая сортирует данные без разделения по событию. Это позволяет управлять данными не только внутри одной последовательности одинаковых событий, но и обрабатывать переходы от одного типа события к другому.

  3. Фильтрация данных.
    С помощью методов lag и lead определяется начальная и конечная точка последовательности (в нашем случае "failed" и "passed"). lag и lead позволяют смещаться по рядам в пределах окна, извлекая предыдущие или последующие значения, что особенно полезно для анализа последовательности изменений состояний.

  4. Формирование результирующего DataFrame.
    После применения всех необходимых фильтраций, данные представляются в виде новой таблицы, где для каждой начальной точки "failed" вычисляется дата первой точки восстановления "passed".

from pyspark.sql import Row, Window
import pyspark.sql.functions as F
from datetime import datetime

df = spark.createDataFrame([
  Row(event="failed", date=datetime(2021, 8, 11, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 12, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 13, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 14, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 15, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 16, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 17, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 18, 0, 0)),
  Row(event="failed", date=datetime(2021, 8, 19, 0, 0)),
  Row(event="passed", date=datetime(2021, 8, 20, 0, 0))
])

w = Window.partitionBy("event").orderBy("date")
w_wo_key = Window.orderBy("date")

result_df = (
    df.withColumn("lag_date", F.lag("date", 1, None).over(w))
      .withColumn("lag_wo_key", F.lag("date", 1, None).over(w_wo_key))
      .filter((F.col("lag_date").isNull()) | (F.col("lag_date") != F.col("lag_wo_key")))
      .withColumn("recovered", F.lead("date", 1, None).over(w_wo_key))
      .filter(F.col("event") == 'failed')
      .select(
          F.col("date").alias("failed"),
          F.col("recovered"))
)

result_df.show()

Применение

Вышеописанный метод позволяет обрабатывать данные в контексте бизнес-анализа, помогая визуализировать важную метрику — время восстановления из системы. Полученные данные могут использоваться специалистами для построения отчетов о надежности системы, оценке текущих данных уведомлений об инцидентах, а также для определения участков цепи поставки, где необходимо улучшить устойчивость и надежность.

Среднее время восстановления дает представление о времени, необходимом для возврата компонента к штатной эксплуатации после сбоя, чем меньше это время, тем более надежна система. В дополнение к этому, возможно привлечение среднего времени восстановления в прогнозирование возможных сбоев на основе исторических данных, что позволит смоделировать и предложить решение для потенциальных проблем до их реального наступления.

Таким образом, подобный подход не только предоставляет аналитикам и инженерам инструменты для измерений и оценки текущих показателей, но и способствует совершенствованию методов мониторинга системы в целом.

Оцените материал
Добавить комментарий

Капча загружается...