Python-CAN: Буферизация данных и построение графиков в реальном времени

Вопрос или проблема

Я хотел бы считывать данные CAN-Bus с датчика и отображать их в реальном времени с помощью Matplotlib.

Моя идея заключается в том, чтобы постоянно считывать данные с шины с помощью python-can, преобразовывать и сохранять эти значения в циклическом буфере, а затем обновлять график примерно раз в 10 мс с учетом вновь записанных данных, очищая буфер при этом.

Я посмотрел на BufferedReader в python-can, но похоже, что он всегда дает только последнее прочитанное сообщение (если я его неправильно использую, что вполне возможно).

Какой был бы лучший способ сделать это? Я довольно новичок в python-can и анимации matplotlib, поэтому не знаю, с чего начать.

Спасибо заранее!

Ответ или решение

Для реализации задачи по чтению данных с шины CAN и их последующей визуализации в реальном времени с использованием библиотек python-can и matplotlib, вам потребуется комплексный подход, который включает в себя сбор данных, их буферизацию и обновление графика. Рассмотрим последовательные шаги для выполнения этой задачи.

Шаг 1: Установка необходимых библиотек

Прежде всего, убедитесь, что у вас установлены необходимые библиотеки. Вы можете использовать pip для их установки:

pip install python-can matplotlib

Шаг 2: Чтение данных с шины CAN

Для работы с шиной CAN будет использована библиотека python-can. Эта библиотека позволяет работать с различными интерфейсами CAN. Пример кода для чтения данных выглядит следующим образом:

import can

# Настройка интерфейса CAN, поменяйте параметры в зависимости от вашего устройства
bus = can.interface.Bus(channel='can0', bustype='socketcan')

Шаг 3: Организация буфера

Сохраняйте данные в массиве фиксированной длины (например, длиной 100) для поддержания "скользящего" буфера. Это позволит вам хранить последние 100 считанных сообщений.

import numpy as np

buffer_size = 100
data_buffer = np.zeros(buffer_size)
buffer_index = 0

Шаг 4: Чтение данных в отдельном потоке

Для того чтобы не блокировать основной поток выполнения программы, рекомендуется считать данные в отдельном потоке.

import threading
import time

def read_can_data():
    global buffer_index
    while True:
        message = bus.recv()  # Получение сообщения
        if message: 
            # Преобразование данных, например, в float, и добавление в буфер
            data_buffer[buffer_index] = float(message.data[0])  # Пример обработки данных
            buffer_index = (buffer_index + 1) % buffer_size  # Увеличение индекса буфера

# Запуск потока
reader_thread = threading.Thread(target=read_can_data, daemon=True)
reader_thread.start()

Шаг 5: Анимация графика с использованием matplotlib

Теперь, когда данные считываются в фоновом режиме, мы можем визуализировать их с помощью matplotlib. Мы будем обновлять график каждые 10 мс.

import matplotlib.pyplot as plt

def update_plot(frame):
    plt.clf()  # Очистка текущего графика
    plt.plot(data_buffer)
    plt.xlim(0, buffer_size)
    plt.ylim(0, 255)  # Задайте лимиты в зависимости от масштаба ваших данных
    plt.title('Real-time CAN Data')
    plt.xlabel('Sample Number')
    plt.ylabel('CAN Value')

# Инициализация параметров графика
plt.ion()  # Включение интерактивного режима
fig = plt.figure()
ani = plt.FuncAnimation(fig, update_plot, interval=10)  # Обновление графика каждые 10 мс

plt.show()

Заключение

Приведенный выше код демонстрирует, как можно считывать данные с шины CAN в реальном времени и визуализировать их с помощью matplotlib. Подход с использованием потоков позволяет избежать блокировки основного потока программы, что особенно важно для обеспечения плавного обновления графика. Не забывайте об оптимизации и тестировании кода в зависимости от ваших требований и условий работы.

Если у вас возникнут дополнительные вопросы или потребуется помощь в доработке данного решения, не стесняйтесь обращаться за помощью.

Оцените материал
Добавить комментарий

Капча загружается...