Вопрос или проблема
С помощью MS Fabric Data Factory я извлек данные журнала активности Power BI в файл json, который я загрузил в OneLake. Я пытаюсь перенести эти данные в таблицу OneLake.
Вот пример данных в формате json:
{
"activityEventEntities": [
{
"Id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"RecordType": 20,
"CreationTime": "2024-08-16T00:01:27",
"Operation": "GenerateDataflowSasToken",
"OrganizationId": "bbbbbbbb-cccc-dddd-eeee-ffffffffffff",
"UserKey": "22222222222222E1",
"Workload": "PowerBI",
"UserId": [email protected],
"Activity": "GenerateDataflowSasToken",
"ItemName": "AZURE-Item-Dim",
"WorkSpaceName": "Workspace1",
"CapacityId": "cccccccc-dddd-eeee-ffff-aaaaaaaaaaaa",
"CapacityName": "ACME_POWERBI_PREMIUM",
"WorkspaceId": "11111111-2222-3333-4444-555555555555",
"DataflowId": "22222222-3333-4444-5555-666666666666",
"DataflowName": "AZURE-Item-DimTest",
"DataflowAccessTokenRequestParameters": {
"tokenLifetimeInMinutes": 1440,
"permissions": 1,
"entityName": "Test",
"partitionUri": https://cabowaboap1.blob.core.windows.net/aaaaa/test.csv?snapshot=2024-08-15T23%3A04%3A10.7142181Z
},
"IsSuccess": true,
"DataflowType": "Internal",
"RequestId": "33333333-4444-5555-6666-777777777777",
"ActivityId": "ffffffff-aaaa-bbbb-cccc-dddddddddddd",
"RefreshEnforcementPolicy": 0
},
{
"Id": "11111111-aaaa-2222-bbbb-33333333333",
"RecordType": 20,
"CreationTime": "2024-08-16T01:26:58",
"Operation": "ExportReport",
"OrganizationId": "77777777-3333-4444-5555-666666666666",
"Workload": "Rdl",
"UserId": [email protected],
"Activity": "ExportReport",
}
],
"continuationUri": https://api.powerbi.com/v1.0/myorg/admin/activityevents?continuationToken='q34rtioaifdegnfoignfvsdfgnsowieiwtyoisrgW4Tdsadfa%3D%3D',
"continuationToken": "q34rtioaifdegnfoignfvsdfgnsowieiwtyoisrgW4Tdsadfa%3D%3D",
"lastResultSet": false,
"ADF_PipelineRunId": "0000000-3333-4444-5555-666666666666",
"ADFPipelineTriggerTime": "2024-09-12T19:57:48.8806663Z"
}
{
"activityEventEntities": [
{
"Id": "88888888-4444-5555-6666-777777777777",
"RecordType": 20,
"CreationTime": "2024-08-16T01:25:47",
"Operation": "ViewReport",
"UserKey": "777777777777777FD2",
"Workload": "PowerBI",
"UserId": [email protected],
"Activity": "ViewReport",
"ItemName": "Report2",
"WorkSpaceName": "Workspace1",
"DatasetName": "WLIC",
"ReportName": "Report2",
"CapacityId": "cccccccc-dddd-eeee-ffff-aaaaaaaaaaaa",
"CapacityName": "ACME_POWERBI_PREMIUM",
"WorkspaceId": "00000000-dddd-eeee-ffff-aaaaaaaaaaaa",
"IsSuccess": true,
"ReportType": "PaginatedReport",
"ArtifactKind": "Report",
"RefreshEnforcementPolicy": 0
},
{
"Id": "55555555-2222-3333-4444-555555555555",
"RecordType": 20,
"CreationTime": "2024-08-16T23:53:00",
"Operation": "GetSnapshots",
"OrganizationId": "77777777-3333-4444-5555-666666666666",
"UserType": 0,
"UserKey": "00000000000000E9",
"Workload": "PowerBI",
"UserId": [email protected],
"Activity": "GetSnapshots",
"IsSuccess": true,
"RefreshEnforcementPolicy": 0
}
],
"continuationUri": null,
"continuationToken": null,
"lastResultSet": true,
"ADF_PipelineRunId": "0000000-3333-4444-5555-666666666666",
"ADFPipelineTriggerTime": "2024-09-12T19:57:48.8806663Z"
}
Всего 24 блока для activityEventEntities. Когда я пытаюсь загрузить данные в dataframe, я получаю 24 строки на основе каждого блока activityEventEntities. В загруженном dataframe все атрибуты и значения хранятся в столбце activityEventEntities. Я хотел бы получить все атрибуты (ID, RecordType и т. д.) в виде столбцов со значениями.
Я пытался загрузить файл json в dataframe. Я пытался использовать json_normalize, чтобы нормализовать данные и сделать их пригодными для использования. Вот код, который я использовал:
import pandas as pd
import json
json_file_path = "Files/2024/08/17/PBI_Track.json"
# Загрузка JSON данных в Dataframe
df = spark.read.json(json_file_path)
df.show()
display(df)
normalized_df = pd.json_normalize(df['activityEventEntities'],
meta=['ID', 'CreationTime', 'Operation', 'OrganizationId', 'UserType', 'UserKey', 'Workload', 'UserId'])
Однако это завершилось неудачей с сообщением:
File ~/cluster-env/trident_env/lib/python3.10/site-packages/pandas/io/json/_normalize.py:445, in json_normalize(data, record_path, meta, meta_prefix, record_prefix, errors, sep, max_level)
441 data = [data]
442 elif isinstance(data, abc.Iterable) and not isinstance(data, str):
443 # GH35923 Исправить pd.json_normalize, чтобы не пропустить первый элемент
444 # генератора входных данных
--> 445 data = list(data)
446 else:
447 raise NotImplementedError
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/column.py:710, in Column.__iter__(self)
709 def __iter__(self) -> None:
710 raise TypeError("Column is not iterable")
Что мне нужно сделать, чтобы это исправить?
Ответ или решение
Чтобы загрузить нормализованные данные из файла JSON в DataFrame в Python, нужно выполнить несколько шагов. Ваша проблема заключается в том, что вы пытаетесь использовать pandas.json_normalize
сразу с объектом DataFrame от Spark, что не сработает, поскольку Pandas ожидает обычные Python структуры данных, такие как списки или словари, а не колонны из Spark DataFrame.
Вот полное решение, чтобы загрузить ваш JSON в DataFrame и нормализовать данные:
- Сначала импортируйте необходимые библиотеки.
- Загрузите данные JSON в Spark DataFrame.
- Преобразуйте нужные данные в Pandas DataFrame, а затем нормализуйте данные с помощью
json_normalize
.
Ниже приведен пример кода для выполнения всех этих шагов:
import pandas as pd
import json
from pyspark.sql import SparkSession
# Создание сессии Spark
spark = SparkSession.builder \
.appName("Load JSON Data") \
.getOrCreate()
# Укажите путь к вашему JSON файлу
json_file_path = "Files/2024/08/17/PBI_Track.json"
# Загрузка JSON данных в Spark DataFrame
df = spark.read.json(json_file_path)
# Вывод данных для проверки
df.show(truncate=False)
# Преобразуем данные activityEventEntities в Pandas DataFrame
activity_event_entities = df.select("activityEventEntities").rdd.flatMap(lambda x: x).collect()
# Конвертация в обычный Python список словарей
activity_event_entities_list = [json.loads(json.dumps(event)) for event in activity_event_entities]
# Нормализация данных в Pandas DataFrame
normalized_df = pd.json_normalize(activity_event_entities_list)
# Выводим нормализованный DataFrame
print(normalized_df.head())
Пояснение к коду:
-
Импортирую библиотеки: Мы используем
pandas
для работы с DataFrame иpyspark
для загрузки JSON данных. -
Создание сессии Spark: Необходимо запустить Spark-сессию для работы с данными.
-
Загрузка JSON данных: Используем
spark.read.json()
для загрузки JSON файла в Spark DataFrame. -
Преобразование данных в Pandas: Мы выбираем колонку
activityEventEntities
, конвертируем ее в RDD и затем собираем данные в обычный Python список словарей. - Нормализация и вывод: Нормализуем данные с помощью
pd.json_normalize()
и выводим результат.
С помощью этого кода вы сможете получить DataFrame
, в котором каждая атрибута из activityEventEntities
будет представлена отдельным столбцом. Убедитесь, что все необходимые библиотеки установлены и доступны в вашей среде.