Парсинг больших объемов JSON-данных с несколькими различными типами объектов

Вопрос или проблема

Я видел несколько различных вопросов о том, как парсить большие JSON-файлы, но каждый из этих вопросов, с которыми я сталкивался, имел однородные данные по всему JSON-файлу. Каждый пример, который я видел с неравномерными JSON-объектами, обычно включает цикл for, который в моем случае работает очень медленно.

Моя проблема в том, что у меня есть около 5-10 ГБ данных, разделенных на множество JSON-файлов размером от 200 до 2000 кБ, которые содержат множество различных JSON-объектов. Похоже, что большинство решений, которые я видел, предполагают, что все объекты будут одного типа. У меня есть определение для всех типов объектов, но я хочу быстрый и эффективный способ их прочитать и преобразовать в пригодный для использования формат.

В данный момент у меня есть скрипт, который читает JSON-файл объект за объектом и парсит его таким образом, но на это уходит несколько часов для обработки всех данных, и это то, что мне нужно делать довольно часто, поэтому я надеюсь, что смогу как-то сократить это время. Конечная цель заключается в том, чтобы организовать все эти данные в массивы, которые затем можно добавить в существующую SQL-базу данных на основе временных меток.

У меня есть около 15 типов объектов. Каждый тип объекта будет содержать информацию о том, откуда пришли данные, а затем самих данных, хотя некоторые объекты включают данные из нескольких источников в вложенном формате. Ниже я привел пример для самых простых и самых сложных объектов, чтобы дать представление.

Самый сложный: Обычно такой объект содержит информацию примерно о 50 устройствах, но для краткости я уменьшил это количество до двух. Таким образом, списки deviceStatus и devices обычно содержат около 50 элементов для большинства этих сообщений.

{
    "type": "location",
    "source": "gps",
    "broadcastStatus": 1,
    "timestamp": xxxxxxxxxx,
    "deviceStatus": {
        "id1": {
            "msgCount": 1,
            "latestCheckin": "ACK",
            "timestamp": xxxxxxxxxx
        },
        "id2": {
            "msgCount": 3,
            "latestCheckin": "ACK",
            "timestamp": xxxxxxxxxx
        }
    },
    "devices": [
        {
            "deviceId": "id1",
            "position": [
                xx.xxxxx,
                -xx.xxxxx
            ],
            "attitude": [
                x.xxx,
                x.xxx,
                x.xxx
            ],
            "velocity": [
                x.xxx,
                x.xxx,
                x.xxx
            ],
            "spd": xxx.xxxx,
            "percentComplete": x.xxx
        },
        {
            "deviceId": "id2",
            "position": [
                xx.xxxxx,
                -xx.xxxxx
            ],
            "attitude": [
                x.xxx,
                x.xxx,
                x.xxx
            ],
            "velocity": [
                x.xxx,
                x.xxx,
                x.xxx
            ],
            "spd": xxx.xxxx,
            "percentComplete": x.xxx
        }
    ]
}

Самый простой: Все эти объекты одного типа, но существует около 40 каналов, которые транслируют такие данные.

{
    "type": "timebased"
    "source": "daq",
    "deviceId": "id1"
    "timestamp": xxxxxxxxxx,
    "channelName": "Channel1",
    "value": xxx,
    "addData": {
        "junk1": xxxx,
        "junk2": xxxx,
        "junk3": xxxx
    }
}

А ниже приведен некоторый псевдокод, который показывает, как я в данный момент обрабатываю и парсирую все.

basePath = r'путь\к\папке'
fileList = glob(basePath+'*.json')

timebased = {}  # это вложенный словарь, полный списков для разных устройств и каналов данных
location = {}  # то же самое, только для данных о местоположении

for fileName in fileList:
    with open(fileName) as f:
        try:
            objJson = json.load(f)
        except:
            objJson = []
            print('*******  Не удалось загрузить JSON-данные из '+fileName+'  **********')
    for msg in objJson:
        if msg["type"] == "timebased" and msg["source"] == "daq":
            ## Выполняем некоторые действия для обработки этого типа сообщения
            timebased[msg["deviceId"]][msg["channelName"]]["Time"].append(msg["timestamp"])
            timebased[msg["deviceId"]][msg["channelName"]]["Data"].append(msg["value"])
        elif msg["type"] == "location" and msg["source"] == "gps":
            ## Выполняем некоторые действия для обработки этого типа сообщения аналогично предыдущему случаю
        ..... это продолжается для примерно 15 различных случаев
    ## Затем все это записывается в csv, но в идеале должно быть записано в базу данных в будущем

Я знаю, что это не идеально, но мне действительно нужна помощь в том, как лучше пройтись по этим сообщениям и организовать все данные более эффективно. Я знаю, что добавлять элементы в списки – это не лучший вариант, но мне также нужна помощь с этим, поскольку я не знаю, как по-другому это сделать, так как заранее не знаю размер. Как я уже говорил, конечная цель состоит в том, чтобы иметь несколько списков или массивов со значениями времени и данных, которые я затем могу вставить в SQL-подобную базу данных, предназначенную для хранения таких данных. Эта SQL-подобная база данных имеет свой собственный API, который мне нужно будет использовать, что потребует передачи одного списка/массива значений времени и другого равной длины значений данных для каждого канала.

Моя следующая попытка будет заключаться в том, чтобы переключиться на asyncio, так как я парсирую десятки тысяч файлов, и это I/O может быть узким местом, но я хотел бы также получить некоторые отзывы по поводу самого парсинга сообщений.

Ответ или решение

Для обработки больших объемов JSON данных с несколькими различными типами объектов можно применить оптимизированный подход для повышения производительности и уменьшения времени обработки. Ваша текущая реализация, основанная на цикличной обработке и добавлении данных в списки, может вызвать серьезные проблемы с производительностью и эффективностью при работе с большими объемами данных. Ниже приведен ряд методов и рекомендаций для улучшения вашего процесса парсинга.

1. Использование библиотек для повышения производительности

Рассмотрите возможность использования библиотеки ujson или orjson, которые предлагают более быстрый парсинг JSON по сравнению со стандартной библиотекой json в Python. Эти библиотеки могут значительно сократить время обработки.

import orjson

with open(fileName, "rb") as f:
    objJson = orjson.loads(f.read())

2. Чтение и парсинг данных по частям

Учитывая большие размеры файлов, можно рассмотреть возможность чтения данных по частям (чанками). Это позволит считывать и обрабатывать данные, не загружая весь файл в память одновременно. Это особенно важно, если у вас недостаточно оперативной памяти.

3. Использование multiprocessing для параллельной обработки

Так как вам нужно обрабатывать множество файлов, вы можете использовать библиотеку multiprocessing для обработки нескольких файлов параллельно. Это особенно полезно при использовании многоядерных процессоров.

import os
from multiprocessing import Pool

def process_file(fileName):
    # Ваша логика обработки файла
    pass

if __name__ == "__main__":
    basePath = r'path\to\folder'
    fileList = glob(basePath + '*.json')

    with Pool(os.cpu_count()) as pool:
        pool.map(process_file, fileList)

4. Применение asyncio для асинхронной обработки

Если вы предполагаете, что процесс ввода-вывода является узким местом, использование asyncio может помочь в асинхронной обработке файлов. Это позволит не блокировать выполнение программы во время операций с файлами.

import aiofiles
import asyncio
import json

async def process_file_async(fileName):
    async with aiofiles.open(fileName, 'r') as f:
        data = await f.read()
        objJson = json.loads(data)
        # Ваш код обработки JSON

async def main():
    basePath = r'path\to\folder'
    fileList = glob(basePath + '*.json')
    tasks = [process_file_async(fileName) for fileName in fileList]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

5. Оптимизация структуры данных

При добавлении данных в структуру нужно избегать многократных операций добавления в списки. Вместо этого можно использовать defaultdict из модуля collections для автоматического создания новых списков при необходимости.

from collections import defaultdict

timebased = defaultdict(lambda: defaultdict(lambda: {"Time": [], "Data": []}))

6. Запись данных в SQL базу

После того как данные будут организованы в удобные структуры, вы можете пакетно записывать их в базу данных. Пакетная запись может существенно ускорить процесс вставки данных.

Заключение

Соблюдая вышеуказанные рекомендации, вы сможете значительно ускорить обработку ваших JSON файлов. Используйте более быстрые библиотеки для парсинга, оптимизируйте чтение файлов, внедрите параллельную и асинхронную обработку, а также структуры данных, способствующие быстрому добавлению. Применяя эти методы, вы сможете эффективно управлять большими объемами данных и минимизировать время выполнения скрипта.

Оцените материал
Добавить комментарий

Капча загружается...