Потребуется помощь в загрузке нормализованных данных из файла JSON в DataFrame в Python

Вопрос или проблема

С помощью MS Fabric Data Factory я извлек данные журнала активности Power BI в файл json, который я загрузил в OneLake. Я пытаюсь перенести эти данные в таблицу OneLake.

Вот пример данных в формате json:

{
    "activityEventEntities": [
        {
            "Id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
            "RecordType": 20,
            "CreationTime": "2024-08-16T00:01:27",
            "Operation": "GenerateDataflowSasToken",
            "OrganizationId": "bbbbbbbb-cccc-dddd-eeee-ffffffffffff",
            "UserKey": "22222222222222E1",
            "Workload": "PowerBI",
            "UserId": [email protected],
            "Activity": "GenerateDataflowSasToken",
            "ItemName": "AZURE-Item-Dim",
            "WorkSpaceName": "Workspace1",
            "CapacityId": "cccccccc-dddd-eeee-ffff-aaaaaaaaaaaa",
            "CapacityName": "ACME_POWERBI_PREMIUM",
            "WorkspaceId": "11111111-2222-3333-4444-555555555555",
            "DataflowId": "22222222-3333-4444-5555-666666666666",
            "DataflowName": "AZURE-Item-DimTest",
            "DataflowAccessTokenRequestParameters": {
                "tokenLifetimeInMinutes": 1440,
                "permissions": 1,
                "entityName": "Test",
                "partitionUri": https://cabowaboap1.blob.core.windows.net/aaaaa/test.csv?snapshot=2024-08-15T23%3A04%3A10.7142181Z
            },
            "IsSuccess": true,
            "DataflowType": "Internal",
            "RequestId": "33333333-4444-5555-6666-777777777777",
            "ActivityId": "ffffffff-aaaa-bbbb-cccc-dddddddddddd",
            "RefreshEnforcementPolicy": 0
        },

        {
            "Id": "11111111-aaaa-2222-bbbb-33333333333",
            "RecordType": 20,
            "CreationTime": "2024-08-16T01:26:58",
            "Operation": "ExportReport",
            "OrganizationId": "77777777-3333-4444-5555-666666666666",
            "Workload": "Rdl",
            "UserId": [email protected],
            "Activity": "ExportReport",
        }
    ],
    "continuationUri": https://api.powerbi.com/v1.0/myorg/admin/activityevents?continuationToken='q34rtioaifdegnfoignfvsdfgnsowieiwtyoisrgW4Tdsadfa%3D%3D',
    "continuationToken": "q34rtioaifdegnfoignfvsdfgnsowieiwtyoisrgW4Tdsadfa%3D%3D",
    "lastResultSet": false,
    "ADF_PipelineRunId": "0000000-3333-4444-5555-666666666666",
    "ADFPipelineTriggerTime": "2024-09-12T19:57:48.8806663Z"
}
{
    "activityEventEntities": [
        {
            "Id": "88888888-4444-5555-6666-777777777777",
            "RecordType": 20,
            "CreationTime": "2024-08-16T01:25:47",
            "Operation": "ViewReport",
            "UserKey": "777777777777777FD2",
            "Workload": "PowerBI",
            "UserId": [email protected],
            "Activity": "ViewReport",
            "ItemName": "Report2",
            "WorkSpaceName": "Workspace1",
            "DatasetName": "WLIC",
            "ReportName": "Report2",
            "CapacityId": "cccccccc-dddd-eeee-ffff-aaaaaaaaaaaa",
            "CapacityName": "ACME_POWERBI_PREMIUM",
            "WorkspaceId": "00000000-dddd-eeee-ffff-aaaaaaaaaaaa",
            "IsSuccess": true,
            "ReportType": "PaginatedReport",
            "ArtifactKind": "Report",
            "RefreshEnforcementPolicy": 0
        },
        {
            "Id": "55555555-2222-3333-4444-555555555555",
            "RecordType": 20,
            "CreationTime": "2024-08-16T23:53:00",
            "Operation": "GetSnapshots",
            "OrganizationId": "77777777-3333-4444-5555-666666666666",
            "UserType": 0,
            "UserKey": "00000000000000E9",
            "Workload": "PowerBI",
            "UserId": [email protected],
            "Activity": "GetSnapshots",
            "IsSuccess": true,
            "RefreshEnforcementPolicy": 0
        }
    ],
    "continuationUri": null,
    "continuationToken": null,
    "lastResultSet": true,
    "ADF_PipelineRunId": "0000000-3333-4444-5555-666666666666",
    "ADFPipelineTriggerTime": "2024-09-12T19:57:48.8806663Z"
}

Всего 24 блока для activityEventEntities. Когда я пытаюсь загрузить данные в dataframe, я получаю 24 строки на основе каждого блока activityEventEntities. В загруженном dataframe все атрибуты и значения хранятся в столбце activityEventEntities. Я хотел бы получить все атрибуты (ID, RecordType и т. д.) в виде столбцов со значениями.

Я пытался загрузить файл json в dataframe. Я пытался использовать json_normalize, чтобы нормализовать данные и сделать их пригодными для использования. Вот код, который я использовал:

import pandas as pd
import json

json_file_path = "Files/2024/08/17/PBI_Track.json"

# Загрузка JSON данных в Dataframe
df = spark.read.json(json_file_path)

df.show()
display(df)

normalized_df = pd.json_normalize(df['activityEventEntities'],
                             meta=['ID', 'CreationTime', 'Operation', 'OrganizationId', 'UserType', 'UserKey', 'Workload', 'UserId'])

Однако это завершилось неудачей с сообщением:

File ~/cluster-env/trident_env/lib/python3.10/site-packages/pandas/io/json/_normalize.py:445, in json_normalize(data, record_path, meta, meta_prefix, record_prefix, errors, sep, max_level)
    441     data = [data]
    442 elif isinstance(data, abc.Iterable) and not isinstance(data, str):
    443     # GH35923 Исправить pd.json_normalize, чтобы не пропустить первый элемент
    444     # генератора входных данных
--> 445     data = list(data)
    446 else:
    447     raise NotImplementedError

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/column.py:710, in Column.__iter__(self)
    709 def __iter__(self) -> None:
    710     raise TypeError("Column is not iterable")

Что мне нужно сделать, чтобы это исправить?

Ответ или решение

Чтобы загрузить нормализованные данные из файла JSON в DataFrame в Python, нужно выполнить несколько шагов. Ваша проблема заключается в том, что вы пытаетесь использовать pandas.json_normalize сразу с объектом DataFrame от Spark, что не сработает, поскольку Pandas ожидает обычные Python структуры данных, такие как списки или словари, а не колонны из Spark DataFrame.

Вот полное решение, чтобы загрузить ваш JSON в DataFrame и нормализовать данные:

  1. Сначала импортируйте необходимые библиотеки.
  2. Загрузите данные JSON в Spark DataFrame.
  3. Преобразуйте нужные данные в Pandas DataFrame, а затем нормализуйте данные с помощью json_normalize.

Ниже приведен пример кода для выполнения всех этих шагов:

import pandas as pd
import json
from pyspark.sql import SparkSession

# Создание сессии Spark
spark = SparkSession.builder \
    .appName("Load JSON Data") \
    .getOrCreate()

# Укажите путь к вашему JSON файлу
json_file_path = "Files/2024/08/17/PBI_Track.json"

# Загрузка JSON данных в Spark DataFrame
df = spark.read.json(json_file_path)

# Вывод данных для проверки
df.show(truncate=False)

# Преобразуем данные activityEventEntities в Pandas DataFrame
activity_event_entities = df.select("activityEventEntities").rdd.flatMap(lambda x: x).collect()

# Конвертация в обычный Python список словарей
activity_event_entities_list = [json.loads(json.dumps(event)) for event in activity_event_entities]

# Нормализация данных в Pandas DataFrame
normalized_df = pd.json_normalize(activity_event_entities_list)

# Выводим нормализованный DataFrame
print(normalized_df.head())

Пояснение к коду:

  1. Импортирую библиотеки: Мы используем pandas для работы с DataFrame и pyspark для загрузки JSON данных.

  2. Создание сессии Spark: Необходимо запустить Spark-сессию для работы с данными.

  3. Загрузка JSON данных: Используем spark.read.json() для загрузки JSON файла в Spark DataFrame.

  4. Преобразование данных в Pandas: Мы выбираем колонку activityEventEntities, конвертируем ее в RDD и затем собираем данные в обычный Python список словарей.

  5. Нормализация и вывод: Нормализуем данные с помощью pd.json_normalize() и выводим результат.

С помощью этого кода вы сможете получить DataFrame, в котором каждая атрибута из activityEventEntities будет представлена отдельным столбцом. Убедитесь, что все необходимые библиотеки установлены и доступны в вашей среде.

Оцените материал
Добавить комментарий

Капча загружается...