Вопрос или проблема
Дан ряд событий (с датой и временем), например:
failed, failed, passed, failed, passed, passed
Я хочу получить время от первого “failed” до первого “passed”, сбрасывая каждый раз, когда снова происходит “failed”, поскольку хочу измерить время восстановления.
Мне удалось сделать это только с помощью цикла for, поскольку когда я применял groupBy
к событию с min
по дате, я терял порядок событий, так как хочу группировать по парам “failed-passed”.
В конечном счете я хочу измерить среднее время восстановления этого теста.
Пример данных:
from pyspark.sql import Row
from datetime import datetime
df = spark.createDataFrame([
Row(event="failed", date=datetime(2021, 8, 11, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 12, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 13, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 14, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 15, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 16, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 17, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 18, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 19, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 20, 0, 0))
])
df.show()
+------+-------------------+
| event| date|
+------+-------------------+
|failed|2021-08-11 00:00:00|
|failed|2021-08-12 00:00:00|
|passed|2021-08-13 00:00:00|
|failed|2021-08-14 00:00:00|
|failed|2021-08-15 00:00:00|
|passed|2021-08-16 00:00:00|
|passed|2021-08-17 00:00:00|
|passed|2021-08-18 00:00:00|
|failed|2021-08-19 00:00:00|
|passed|2021-08-20 00:00:00|
+------+-------------------+
ожидаемый результат:
df = spark.createDataFrame([
Row(failed=datetime(2021, 8, 11, 0, 0), recovered=datetime(2021, 8, 13, 0, 0)),
Row(failed=datetime(2021, 8, 14, 0, 0), recovered=datetime(2021, 8, 16, 0, 0)),
Row(failed=datetime(2021, 8, 19, 0, 0), recovered=datetime(2021, 8, 20, 0, 0)),
])
df.show()
+-------------------+-------------------+
| failed| recovered|
+-------------------+-------------------+
|2021-08-11 00:00:00|2021-08-13 00:00:00|
|2021-08-14 00:00:00|2021-08-16 00:00:00|
|2021-08-19 00:00:00|2021-08-20 00:00:00|
+-------------------+-------------------+
возможно, существуют более эффективные способы сделать это, но данный метод, кажется, работает
- исключить повторяющиеся события путем сравнения с отставанием с и без разделения
- вычислить опережение без разделения
- отфильтровать события “failed”
from pyspark.sql import Row
from pyspark.sql import Window
import pyspark.sql.functions as F
from datetime import datetime
df = spark.createDataFrame([
Row(event="failed", date=datetime(2021, 8, 11, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 12, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 13, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 14, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 15, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 16, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 17, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 18, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 19, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 20, 0, 0))
])
w = Window.partitionBy("event").orderBy("date")
w_wo_key = Window.orderBy("date")
(
df
.withColumn("lag_date", F.lag("date", 1, None).over(w))
.withColumn("lag_wo_key", F.lag("date",1,None).over(w_wo_key))
.filter((F.col("lag_date").isNull()) | (F.col("lag_date") != F.col("lag_wo_key")))
.withColumn("recovered", F.lead("date",1,None).over(w_wo_key))
.filter(F.col("event") == 'failed')
.select(
F.col("date").alias("failed"),
F.col("recovered"))
).show()
""" Возвращает
+-------------------+-------------------+
| failed| recovered|
+-------------------+-------------------+
|2021-08-11 00:00:00|2021-08-13 00:00:00|
|2021-08-14 00:00:00|2021-08-16 00:00:00|
|2021-08-19 00:00:00|2021-08-20 00:00:00|
+-------------------+-------------------+
"""
```
Ответ или решение
В данном техническом задании рассматривается задача анализа временных данных с использованием Apache Spark, которая заключается в определении времени восстановления из состояния "failed" в состояние "passed". Данная задача имеет исключительную важность при мониторинге надежности системы и ее компонентов, поскольку позволяет определить среднее время восстановления, что в свою очередь может помочь в оценке стабильности системы и планировании работ по ее усовершенствованию. Чтобы решить эту задачу, используется фреймворк PySpark, инструмент, позволяющий обрабатывать большие объемы данных на основе Apache Spark.
Теория
Apache Spark — это распределенная платформа для обработки данных, которая позволяет проводить анализ данных в большой распределенной системе. Это становится возможным благодаря удобной абстракции DataFrame, представляющей собой набор данных, организованных по аналогии с таблицами в базах данных, что позволяет применять к ним SQL-подобные операции.
Основной принцип выполнения задачи заключается в использовании оконных функций, которые обладают возможностью производить вычисления для заданного окна строк. Эти функции особенно полезны, когда необходимо изучить зависимость между строками в пределах одной или нескольких групп. В контексте нашей задачи, оконные функции позволяют семантически "перемещаться" по последовательности событий таким образом, что мы можем управлять и корректировать выборку данных в зависимости от требований.
Пример
На примере имеющегося кода мы можем видеть следующее:
-
Создание DataFrame.
Сначала создается DataFrame из последовательности строк (объектов Row), где каждая строка представляет событие с временной меткой. Это исходит из того, что данные о событиях (например, "failed", "passed") часто собираются в этой форме из систем лога или мониторинга. -
Определение оконных функций.
Далее, создается две оконных функции: одна группирует данные по событию и сортирует их по дате, вторая сортирует данные без разделения по событию. Это позволяет управлять данными не только внутри одной последовательности одинаковых событий, но и обрабатывать переходы от одного типа события к другому. -
Фильтрация данных.
С помощью методовlag
иlead
определяется начальная и конечная точка последовательности (в нашем случае "failed" и "passed").lag
иlead
позволяют смещаться по рядам в пределах окна, извлекая предыдущие или последующие значения, что особенно полезно для анализа последовательности изменений состояний. -
Формирование результирующего DataFrame.
После применения всех необходимых фильтраций, данные представляются в виде новой таблицы, где для каждой начальной точки "failed" вычисляется дата первой точки восстановления "passed".
from pyspark.sql import Row, Window
import pyspark.sql.functions as F
from datetime import datetime
df = spark.createDataFrame([
Row(event="failed", date=datetime(2021, 8, 11, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 12, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 13, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 14, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 15, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 16, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 17, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 18, 0, 0)),
Row(event="failed", date=datetime(2021, 8, 19, 0, 0)),
Row(event="passed", date=datetime(2021, 8, 20, 0, 0))
])
w = Window.partitionBy("event").orderBy("date")
w_wo_key = Window.orderBy("date")
result_df = (
df.withColumn("lag_date", F.lag("date", 1, None).over(w))
.withColumn("lag_wo_key", F.lag("date", 1, None).over(w_wo_key))
.filter((F.col("lag_date").isNull()) | (F.col("lag_date") != F.col("lag_wo_key")))
.withColumn("recovered", F.lead("date", 1, None).over(w_wo_key))
.filter(F.col("event") == 'failed')
.select(
F.col("date").alias("failed"),
F.col("recovered"))
)
result_df.show()
Применение
Вышеописанный метод позволяет обрабатывать данные в контексте бизнес-анализа, помогая визуализировать важную метрику — время восстановления из системы. Полученные данные могут использоваться специалистами для построения отчетов о надежности системы, оценке текущих данных уведомлений об инцидентах, а также для определения участков цепи поставки, где необходимо улучшить устойчивость и надежность.
Среднее время восстановления дает представление о времени, необходимом для возврата компонента к штатной эксплуатации после сбоя, чем меньше это время, тем более надежна система. В дополнение к этому, возможно привлечение среднего времени восстановления в прогнозирование возможных сбоев на основе исторических данных, что позволит смоделировать и предложить решение для потенциальных проблем до их реального наступления.
Таким образом, подобный подход не только предоставляет аналитикам и инженерам инструменты для измерений и оценки текущих показателей, но и способствует совершенствованию методов мониторинга системы в целом.