Референсная реализация q-learning на Python

Вопрос или проблема

Я новичок в машинном обучении и пытаюсь изучить Q-обучение. Я прочитал несколько текстов и понимаю общий смысл, но что бы мне действительно хотелось увидеть, так это простой пример алгоритма Q-обучения на Python, который я мог бы запустить и поэкспериментировать с ним.

Он может решать самые простые игры, мне не нужно ничего замысловатого.

Я искал и нашел много примеров, которые используют фреймворк gym. Этот фреймворк выглядит отлично, и я, вероятно, буду использовать его позже, но я хочу самую простую версию алгоритма Q-обучения, без автоматизации каких-либо процессов. Я думаю, это поможет мне лучше понять, что происходит.

Знаете ли вы о простой реализации Q-обучения на Python?

Вот мое собственное предложение ниже:

class Qlearning:
    def __init__(self, learning_rate, gamma, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.reset_qtable()

    def update(self, state, action, reward, new_state):
        """Обновить Q(s,a):= Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]"""
        delta = (
            reward
            + self.gamma * np.max(self.qtable[new_state, :])
            - self.qtable[state, action]
        )
        q_update = self.qtable[state, action] + self.learning_rate * delta
        return q_update

    def reset_qtable(self):
        """Сбросить Q-таблицу."""
        self.qtable = np.zeros((self.state_size, self.action_size))

А вот полный пример, чтобы проиллюстрировать, как это работает в простой среде RandomWalk1D (полная записная книжка доступна здесь):

# ## Зависимости
from typing import NamedTuple
from enum import Enum

import numpy as np
from numpy.random import default_rng
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import pandas as pd
import seaborn as sns

sns.set_theme()

# ## Параметры
class Params(NamedTuple):
    total_episodes: int  # Общее количество эпизодов
    learning_rate: float  # Скорость обучения
    gamma: float  # Коэффициент дисконтирования
    seed: int  # Определить начальное значение, чтобы получить воспроизводимые результаты
    n_runs: int  # Количество запусков
    action_size: int  # Количество возможных действий
    state_size: int  # Количество возможных состояний
    epsilon: float  # Вероятность исследования

params = Params(
    total_episodes=50,
    learning_rate=0.3,
    gamma=0.95,
    seed=42,
    n_runs=100,
    action_size=None,
    state_size=None,
    epsilon=0.1,
)

# Установить начальное значение
rng = np.random.default_rng(params.seed)

# ## Среда
class Actions(Enum):
    Left = 0
    Right = 1

class RandomWalk1D:
    """`RandomWalk1D` для тестирования алгоритма Q-обучения.

    Агент (A) начинает в состоянии 3.
    Действия, которые он может предпринять, - это идти налево или направо.
    Эпизод заканчивается, когда он достигает состояния 0 или 6.
    Когда он достигает состояния 0, получает вознаграждение -1,
    когда достигает состояния 6, получает вознаграждение +1.
    В любом другом состоянии получает вознаграждение ноль.

    Вознаграждения:  -1   <-A->  +1
    Состояния:  <-0-1-2-3-4-5-6->

    Среда вдохновлена учебником `ReinforcementLearning.jl`:
    https://juliareinforcementlearning.org/docs/tutorial/
    """

    def __init__(self):
        self.observation_space = np.arange(0, 7)
        self.action_space = [item.value for item in list(Actions)]
        self.right_boundary = 6
        self.left_boundary = 0
        self.reset()

    def reset(self):
        self.current_state = 3
        return self.current_state

    def step(self, action):
        if action == Actions.Left.value:
            new_state = np.max([self.left_boundary, self.current_state - 1])
        elif action == Actions.Right.value:
            new_state = np.min([self.right_boundary, self.current_state + 1])
        else:
            raise ValueError("Невозможный тип действия")
        self.current_state = new_state
        reward = self.reward(self.current_state)
        is_terminated = self.is_terminated(self.current_state)
        return new_state, reward, is_terminated

    def reward(self, observation):
        reward = 0
        if observation == self.right_boundary:
            reward = 1
        elif observation == self.left_boundary:
            reward = -1
        return reward

    def is_terminated(self, observation):
        is_terminated = False
        if observation == self.right_boundary or observation == self.left_boundary:
            is_terminated = True
        return is_terminated

env = RandomWalk1D()

params = params._replace(action_size=len(env.action_space))
params = params._replace(state_size=len(env.observation_space))
print(f"Размер действия: {params.action_size}")
print(f"Размер состояния: {params.state_size}")

# ## Алгоритм обучения: Q-обучение
class Qlearning:
    def __init__(self, learning_rate, gamma, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.reset_qtable()

    def update(self, state, action, reward, new_state):
        """Обновить Q(s,a):= Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]"""
        delta = (
            reward
            + self.gamma * np.max(self.qtable[new_state, :])
            - self.qtable[state, action]
        )
        q_update = self.qtable[state, action] + self.learning_rate * delta
        return q_update

    def reset_qtable(self):
        """Сбросить Q-таблицу."""
        self.qtable = np.zeros((self.state_size, self.action_size))

# ## Алгоритм исследователя: epsilon-жадный
class EpsilonGreedy:
    def __init__(self, epsilon, rng=None):
        self.epsilon = epsilon
        if rng:
            self.rng = rng
        else:
            self.rng = default_rng()

    def choose_action(self, action_space, state, qtable):
        """Выберите действие `a` в текущем состоянии мира (s)."""
        # Сначала случайно генерируем число
        explor_exploit_tradeoff = self.rng.uniform(0, 1)

        def sample(action_space):
            return self.rng.choice(action_space)

        # Исследование
        if explor_exploit_tradeoff < self.epsilon:
            action = sample(action_space)

        # Эксплуатация (выбор наибольшего значения Q для этого состояния)
        else:
            # Разрываем ничьи случайным образом
            # Если все действия равны для этого состояния, мы выбираем случайное
            # (в противном случае `np.argmax()` всегда выберет первое)
            if np.all(qtable[state, :]) == qtable[state, 0]:
                action = sample(action_space)
            else:
                action = np.argmax(qtable[state, :])
        return action

# ## Запуск среды
learner = Qlearning(
    learning_rate=params.learning_rate,
    gamma=params.gamma,
    state_size=params.state_size,
    action_size=params.action_size,
)

explorer = EpsilonGreedy(epsilon=params.epsilon, rng=rng)

# Это будет наша основная функция для запуска нашей среды до максимального
# количества эпизодов `params.total_episodes`.
# Для учета стохастичности мы также запустим нашу среду несколько раз.

rewards = np.zeros((params.total_episodes, params.n_runs))
steps = np.zeros((params.total_episodes, params.n_runs))
episodes = np.arange(params.total_episodes)
qtables = np.zeros((params.n_runs, params.state_size, params.action_size))
all_states = []
all_actions = []

for run in range(params.n_runs):  # Запускаем несколько раз для учета стохастичности
    learner.reset_qtable()  # Сбрасываем Q-таблицу между запусками

    for episode in tqdm(
        episodes, desc=f"Запуск {run}/{params.n_runs} - Эпизоды", leave=False
    ):
        state = env.reset()  # Сбрасываем среду
        step = 0
        done = False
        total_rewards = 0

        while not done:
            action = explorer.choose_action(
                action_space=env.action_space, state=state, qtable=learner.qtable
            )

            # Записываем все состояния и действия
            all_states.append(state)
            all_actions.append(action)

            # Выполняем действие (a) и наблюдаем за новым состоянием(s') и вознаграждением (r)
            new_state, reward, done = env.step(action)

            learner.qtable[state, action] = learner.update(
                state, action, reward, new_state
            )

            total_rewards += reward
            step += 1

            # Наше новое состояние - это состояние
            state = new_state

        # Записываем все вознаграждения и шаги
        rewards[episode, run] = total_rewards
        steps[episode, run] = step
    qtables[run, :, :] = learner.qtable

# ## Визуализация

def postprocess(episodes, params, rewards, steps, qtables):
    """Преобразовать результаты симуляции в датафреймы."""
    res = pd.DataFrame(
        data={
            "Эпизоды": np.tile(episodes, reps=params.n_runs),
            "Вознаграждения": rewards.flatten(order="F"),
            "Шаги": steps.flatten(order="F"),
        }
    )
    # res["cum_rewards"] = rewards.cumsum(axis=0).flatten(order="F")
    qtable = qtables.mean(axis=0)  # Среднее значение Q-таблицы между запусками
    return res, qtable

res, qtable = postprocess(episodes, params, rewards, steps, qtables)

def plot_steps_and_rewards(df):
    """Построить график шагов и вознаграждений из датафреймов."""
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))
    sns.lineplot(data=df, x="Эпизоды", y="Вознаграждения", ax=ax[0])
    ax[0].set(ylabel=f"Вознаграждения\nв среднем по {params.n_runs} запускам")

    sns.lineplot(data=df, x="Эпизоды", y="Шаги", ax=ax[1])
    ax[1].set(ylabel=f"Количество шагов\nв среднем по {params.n_runs} запускам")

    fig.tight_layout()
    plt.show()

plot_steps_and_rewards(res)

qtable_flat = qtable.flatten()[np.newaxis, :]

def plot_q_values():
    fig, ax = plt.subplots(figsize=(15, 1.5))
    cmap = sns.color_palette("vlag", as_cmap=True)
    chart = sns.heatmap(
        qtable.flatten()[np.newaxis, :],
        annot=True,
        ax=ax,
        cmap=cmap,
        yticklabels=False,  # linewidth=0.5
        center=0,
    )
    states_nodes = np.arange(1, 14, 2)
    chart.set_xticks(states_nodes)
    chart.set_xticklabels([str(item) for item in np.arange(0, 7, 1)])
    chart.set_title("Q значения")
    ax.tick_params(bottom=True)

    # Добавить стрелки действия
    for node in states_nodes:
        arrows_left = {"x_tail": node, "y_tail": 1.3, "x_head": node - 1, "y_head": 1.3}
        arrow = mpatches.FancyArrowPatch(
            (arrows_left["x_tail"], arrows_left["y_tail"]),
            (arrows_left["x_head"], arrows_left["y_head"]),
            mutation_scale=10,
            clip_on=False,
            color="k",
        )
        ax.add_patch(arrow)
        arrows_right = {
            "x_tail": node,
            "y_tail": 1.3,
            "x_head": node + 1,
            "y_head": 1.3,
        }
        arrow = mpatches.FancyArrowPatch(
            (arrows_right["x_tail"], arrows_right["y_tail"]),
            (arrows_right["x_head"], arrows_right["y_head"]),
            mutation_scale=10,
            clip_on=False,
            color="k",
        )
        ax.add_patch(arrow)

        # Добавить прямоугольник, чтобы отделить каждую пару состояний
        rect = mpatches.Rectangle(
            (node - 1, 0),
            2,
            1,
            linewidth=2,
            edgecolor="k",
            facecolor="none",
            clip_on=False,
        )
        ax.add_patch(rect)

    plt.show()

plot_q_values()

Что приведет к следующему графику вознаграждений и количества шагов до завершения эпизода:
график шагов и вознаграждений

А также к следующему графику изученных Q-значений:
график q-значений

Также, если это поможет, вот три других реализации Q-обучения на Python:

Ответ или решение

Реализация Q-обучения на Python

Q-обучение — это один из фундаметальных алгоритмов в области обучения с подкреплением, который позволяет агентам изучать оптимальные стратегии поведения в средах с последовательными решениями. Давайте рассмотрим простую реализацию Q-обучения на Python, которая подойдет новичкам и поможет вам понять, как работает алгоритм, без использования автоматизированных фреймворков, таких как gym.

Описание проблемы

В данной реализации мы будем использовать простую среду RandomWalk1D. В этой среде агент находится на одном из семи состояний (от 0 до 6) и имеет две действия: двигаться влево (в сторону 0) или вправо (в сторону 6). Если агент достигнет состояния 0, он получит награду -1. Если он достигнет состояния 6, он получит награду +1. На остальных состояниях награды равны 0.

Полный код реализации

# ## Импортируем необходимые библиотеки
import numpy as np
from enum import Enum
from numpy.random import default_rng
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

sns.set_theme()

# ## Параметры
class Params:
    total_episodes = 50  # Всего эпизодов
    learning_rate = 0.3  # Скорость обучения
    gamma = 0.95  # Дисконтирующая ставка
    seed = 42  # Случайное число для воспроизводимости
    n_runs = 100  # Количество пробегов
    epsilon = 0.1  # Вероятность исследования

# Устанавливаем случайный генератор
rng = np.random.default_rng(Params.seed)

# ## Среда
class Actions(Enum):
    Left = 0
    Right = 1

class RandomWalk1D:
    """Среда RandomWalk1D для тестирования алгоритма Q-learning."""

    def __init__(self):
        self.observation_space = np.arange(0, 7)
        self.action_space = [item.value for item in Actions]
        self.right_boundary = 6
        self.left_boundary = 0
        self.reset()

    def reset(self):
        self.current_state = 3
        return self.current_state

    def step(self, action):
        if action == Actions.Left.value:
            new_state = max(self.left_boundary, self.current_state - 1)
        elif action == Actions.Right.value:
            new_state = min(self.right_boundary, self.current_state + 1)
        else:
            raise ValueError("Неверный тип действия")

        self.current_state = new_state
        reward = self.reward(self.current_state)
        is_terminated = self.is_terminated(self.current_state)
        return new_state, reward, is_terminated

    def reward(self, observation):
        if observation == self.right_boundary:
            return 1
        elif observation == self.left_boundary:
            return -1
        return 0

    def is_terminated(self, observation):
        return observation in {self.right_boundary, self.left_boundary}

env = RandomWalk1D()
params = Params()

class Qlearning:
    def __init__(self, learning_rate, gamma, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.reset_qtable()

    def update(self, state, action, reward, new_state):
        """Обновление Q(s,a)."""
        delta = (reward + self.gamma * np.max(self.qtable[new_state, :]) - self.qtable[state, action])
        q_update = self.qtable[state, action] + self.learning_rate * delta
        return q_update

    def reset_qtable(self):
        self.qtable = np.zeros((self.state_size, self.action_size))

# ## Алгоритм исследования: epsilon-greedy
class EpsilonGreedy:
    def __init__(self, epsilon, rng=None):
        self.epsilon = epsilon
        self.rng = rng or default_rng()

    def choose_action(self, action_space, state, qtable):
        explor_exploit_tradeoff = self.rng.uniform(0, 1)

        if explor_exploit_tradeoff < self.epsilon:
            return self.rng.choice(action_space)
        else:
            return np.argmax(qtable[state, :])

# ## Запуск среда
learner = Qlearning(
    learning_rate=Params.learning_rate,
    gamma=Params.gamma,
    state_size=len(env.observation_space),
    action_size=len(env.action_space),
)

explorer = EpsilonGreedy(epsilon=Params.epsilon, rng=rng)

# Основной цикл
rewards = np.zeros((Params.total_episodes, Params.n_runs))
steps = np.zeros((Params.total_episodes, Params.n_runs))
qtables = np.zeros((Params.n_runs, len(env.observation_space), len(env.action_space)))

for run in range(Params.n_runs):
    learner.reset_qtable()

    for episode in tqdm(range(Params.total_episodes), desc=f"Запуск {run+1}/{Params.n_runs}", leave=False):
        state = env.reset()
        done = False
        total_rewards = 0

        while not done:
            action = explorer.choose_action(env.action_space, state, learner.qtable)
            new_state, reward, done = env.step(action)

            learner.qtable[state, action] = learner.update(state, action, reward, new_state)
            total_rewards += reward
            state = new_state

        rewards[episode, run] = total_rewards
        steps[episode, run] = len(reward)

# ## Визуализация результатов
def plot_results(rewards):
    plt.figure(figsize=(12, 6))
    sns.lineplot(data=rewards)
    plt.title("Награды за эпизоды")
    plt.xlabel("Эпизоды")
    plt.ylabel("Награды")
    plt.show()

plot_results(rewards.mean(axis=1))

Объяснение кода

  1. Импорт необходимых библиотек:
    Для работы с данными и визуализацией мы используем numpy, matplotlib, и seaborn.

  2. Определение параметров:
    Мы задаем основные параметры, такие как скорость обучения и количество эпизодов.

  3. Создание среды:
    В классе RandomWalk1D описаны состояния, возможные действия и функция получения награды.

  4. Основные классы обучения:

    • Класс Qlearning управляет Q-таблицей и ее обновлениями.
    • Класс EpsilonGreedy управляет выбором действий на основе стратегии "epsilon-greedy".
  5. Основной цикл обучения:
    В этом цикле агент проходит через множество эпизодов, обучаясь на основе получаемых наград.

  6. Визуализация результатов:
    Для понимания эффективности обучения выводим график наград.

Заключение

Приведенная выше реализация Q-обучения на Python демонстрирует базовые принципы, лежащие в основе алгоритма. Данная структура дает возможность пользователю самостоятельно поэкспериментировать и изменять параметры для наблюдения за изменениями в стратегии агента. Настоятельно рекомендую вам поэкспериментировать с различными параметрами и расширять среду для достижения лучших результатов.

Оцените материал
Добавить комментарий

Капча загружается...