Вопрос или проблема
Я новичок в машинном обучении и пытаюсь изучить Q-обучение. Я прочитал несколько текстов и понимаю общий смысл, но что бы мне действительно хотелось увидеть, так это простой пример алгоритма Q-обучения на Python, который я мог бы запустить и поэкспериментировать с ним.
Он может решать самые простые игры, мне не нужно ничего замысловатого.
Я искал и нашел много примеров, которые используют фреймворк gym
. Этот фреймворк выглядит отлично, и я, вероятно, буду использовать его позже, но я хочу самую простую версию алгоритма Q-обучения, без автоматизации каких-либо процессов. Я думаю, это поможет мне лучше понять, что происходит.
Знаете ли вы о простой реализации Q-обучения на Python?
Вот мое собственное предложение ниже:
class Qlearning:
def __init__(self, learning_rate, gamma, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
self.gamma = gamma
self.reset_qtable()
def update(self, state, action, reward, new_state):
"""Обновить Q(s,a):= Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]"""
delta = (
reward
+ self.gamma * np.max(self.qtable[new_state, :])
- self.qtable[state, action]
)
q_update = self.qtable[state, action] + self.learning_rate * delta
return q_update
def reset_qtable(self):
"""Сбросить Q-таблицу."""
self.qtable = np.zeros((self.state_size, self.action_size))
А вот полный пример, чтобы проиллюстрировать, как это работает в простой среде RandomWalk1D
(полная записная книжка доступна здесь):
# ## Зависимости
from typing import NamedTuple
from enum import Enum
import numpy as np
from numpy.random import default_rng
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import pandas as pd
import seaborn as sns
sns.set_theme()
# ## Параметры
class Params(NamedTuple):
total_episodes: int # Общее количество эпизодов
learning_rate: float # Скорость обучения
gamma: float # Коэффициент дисконтирования
seed: int # Определить начальное значение, чтобы получить воспроизводимые результаты
n_runs: int # Количество запусков
action_size: int # Количество возможных действий
state_size: int # Количество возможных состояний
epsilon: float # Вероятность исследования
params = Params(
total_episodes=50,
learning_rate=0.3,
gamma=0.95,
seed=42,
n_runs=100,
action_size=None,
state_size=None,
epsilon=0.1,
)
# Установить начальное значение
rng = np.random.default_rng(params.seed)
# ## Среда
class Actions(Enum):
Left = 0
Right = 1
class RandomWalk1D:
"""`RandomWalk1D` для тестирования алгоритма Q-обучения.
Агент (A) начинает в состоянии 3.
Действия, которые он может предпринять, - это идти налево или направо.
Эпизод заканчивается, когда он достигает состояния 0 или 6.
Когда он достигает состояния 0, получает вознаграждение -1,
когда достигает состояния 6, получает вознаграждение +1.
В любом другом состоянии получает вознаграждение ноль.
Вознаграждения: -1 <-A-> +1
Состояния: <-0-1-2-3-4-5-6->
Среда вдохновлена учебником `ReinforcementLearning.jl`:
https://juliareinforcementlearning.org/docs/tutorial/
"""
def __init__(self):
self.observation_space = np.arange(0, 7)
self.action_space = [item.value for item in list(Actions)]
self.right_boundary = 6
self.left_boundary = 0
self.reset()
def reset(self):
self.current_state = 3
return self.current_state
def step(self, action):
if action == Actions.Left.value:
new_state = np.max([self.left_boundary, self.current_state - 1])
elif action == Actions.Right.value:
new_state = np.min([self.right_boundary, self.current_state + 1])
else:
raise ValueError("Невозможный тип действия")
self.current_state = new_state
reward = self.reward(self.current_state)
is_terminated = self.is_terminated(self.current_state)
return new_state, reward, is_terminated
def reward(self, observation):
reward = 0
if observation == self.right_boundary:
reward = 1
elif observation == self.left_boundary:
reward = -1
return reward
def is_terminated(self, observation):
is_terminated = False
if observation == self.right_boundary or observation == self.left_boundary:
is_terminated = True
return is_terminated
env = RandomWalk1D()
params = params._replace(action_size=len(env.action_space))
params = params._replace(state_size=len(env.observation_space))
print(f"Размер действия: {params.action_size}")
print(f"Размер состояния: {params.state_size}")
# ## Алгоритм обучения: Q-обучение
class Qlearning:
def __init__(self, learning_rate, gamma, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
self.gamma = gamma
self.reset_qtable()
def update(self, state, action, reward, new_state):
"""Обновить Q(s,a):= Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]"""
delta = (
reward
+ self.gamma * np.max(self.qtable[new_state, :])
- self.qtable[state, action]
)
q_update = self.qtable[state, action] + self.learning_rate * delta
return q_update
def reset_qtable(self):
"""Сбросить Q-таблицу."""
self.qtable = np.zeros((self.state_size, self.action_size))
# ## Алгоритм исследователя: epsilon-жадный
class EpsilonGreedy:
def __init__(self, epsilon, rng=None):
self.epsilon = epsilon
if rng:
self.rng = rng
else:
self.rng = default_rng()
def choose_action(self, action_space, state, qtable):
"""Выберите действие `a` в текущем состоянии мира (s)."""
# Сначала случайно генерируем число
explor_exploit_tradeoff = self.rng.uniform(0, 1)
def sample(action_space):
return self.rng.choice(action_space)
# Исследование
if explor_exploit_tradeoff < self.epsilon:
action = sample(action_space)
# Эксплуатация (выбор наибольшего значения Q для этого состояния)
else:
# Разрываем ничьи случайным образом
# Если все действия равны для этого состояния, мы выбираем случайное
# (в противном случае `np.argmax()` всегда выберет первое)
if np.all(qtable[state, :]) == qtable[state, 0]:
action = sample(action_space)
else:
action = np.argmax(qtable[state, :])
return action
# ## Запуск среды
learner = Qlearning(
learning_rate=params.learning_rate,
gamma=params.gamma,
state_size=params.state_size,
action_size=params.action_size,
)
explorer = EpsilonGreedy(epsilon=params.epsilon, rng=rng)
# Это будет наша основная функция для запуска нашей среды до максимального
# количества эпизодов `params.total_episodes`.
# Для учета стохастичности мы также запустим нашу среду несколько раз.
rewards = np.zeros((params.total_episodes, params.n_runs))
steps = np.zeros((params.total_episodes, params.n_runs))
episodes = np.arange(params.total_episodes)
qtables = np.zeros((params.n_runs, params.state_size, params.action_size))
all_states = []
all_actions = []
for run in range(params.n_runs): # Запускаем несколько раз для учета стохастичности
learner.reset_qtable() # Сбрасываем Q-таблицу между запусками
for episode in tqdm(
episodes, desc=f"Запуск {run}/{params.n_runs} - Эпизоды", leave=False
):
state = env.reset() # Сбрасываем среду
step = 0
done = False
total_rewards = 0
while not done:
action = explorer.choose_action(
action_space=env.action_space, state=state, qtable=learner.qtable
)
# Записываем все состояния и действия
all_states.append(state)
all_actions.append(action)
# Выполняем действие (a) и наблюдаем за новым состоянием(s') и вознаграждением (r)
new_state, reward, done = env.step(action)
learner.qtable[state, action] = learner.update(
state, action, reward, new_state
)
total_rewards += reward
step += 1
# Наше новое состояние - это состояние
state = new_state
# Записываем все вознаграждения и шаги
rewards[episode, run] = total_rewards
steps[episode, run] = step
qtables[run, :, :] = learner.qtable
# ## Визуализация
def postprocess(episodes, params, rewards, steps, qtables):
"""Преобразовать результаты симуляции в датафреймы."""
res = pd.DataFrame(
data={
"Эпизоды": np.tile(episodes, reps=params.n_runs),
"Вознаграждения": rewards.flatten(order="F"),
"Шаги": steps.flatten(order="F"),
}
)
# res["cum_rewards"] = rewards.cumsum(axis=0).flatten(order="F")
qtable = qtables.mean(axis=0) # Среднее значение Q-таблицы между запусками
return res, qtable
res, qtable = postprocess(episodes, params, rewards, steps, qtables)
def plot_steps_and_rewards(df):
"""Построить график шагов и вознаграждений из датафреймов."""
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))
sns.lineplot(data=df, x="Эпизоды", y="Вознаграждения", ax=ax[0])
ax[0].set(ylabel=f"Вознаграждения\nв среднем по {params.n_runs} запускам")
sns.lineplot(data=df, x="Эпизоды", y="Шаги", ax=ax[1])
ax[1].set(ylabel=f"Количество шагов\nв среднем по {params.n_runs} запускам")
fig.tight_layout()
plt.show()
plot_steps_and_rewards(res)
qtable_flat = qtable.flatten()[np.newaxis, :]
def plot_q_values():
fig, ax = plt.subplots(figsize=(15, 1.5))
cmap = sns.color_palette("vlag", as_cmap=True)
chart = sns.heatmap(
qtable.flatten()[np.newaxis, :],
annot=True,
ax=ax,
cmap=cmap,
yticklabels=False, # linewidth=0.5
center=0,
)
states_nodes = np.arange(1, 14, 2)
chart.set_xticks(states_nodes)
chart.set_xticklabels([str(item) for item in np.arange(0, 7, 1)])
chart.set_title("Q значения")
ax.tick_params(bottom=True)
# Добавить стрелки действия
for node in states_nodes:
arrows_left = {"x_tail": node, "y_tail": 1.3, "x_head": node - 1, "y_head": 1.3}
arrow = mpatches.FancyArrowPatch(
(arrows_left["x_tail"], arrows_left["y_tail"]),
(arrows_left["x_head"], arrows_left["y_head"]),
mutation_scale=10,
clip_on=False,
color="k",
)
ax.add_patch(arrow)
arrows_right = {
"x_tail": node,
"y_tail": 1.3,
"x_head": node + 1,
"y_head": 1.3,
}
arrow = mpatches.FancyArrowPatch(
(arrows_right["x_tail"], arrows_right["y_tail"]),
(arrows_right["x_head"], arrows_right["y_head"]),
mutation_scale=10,
clip_on=False,
color="k",
)
ax.add_patch(arrow)
# Добавить прямоугольник, чтобы отделить каждую пару состояний
rect = mpatches.Rectangle(
(node - 1, 0),
2,
1,
linewidth=2,
edgecolor="k",
facecolor="none",
clip_on=False,
)
ax.add_patch(rect)
plt.show()
plot_q_values()
Что приведет к следующему графику вознаграждений и количества шагов до завершения эпизода:
А также к следующему графику изученных Q-значений:
Также, если это поможет, вот три других реализации Q-обучения на Python:
- Из реализации Шантона Чжана книги Саттона и Барто Укрепляющее обучение: введение (2-е издание): https://github.com/ShangtongZhang/reinforcement-learning-an-introduction/blob/master/chapter06/cliff_walking.py#L128
- Из курса глубокого укрепляющего обучения Томаса Симонини: https://github.com/simoninithomas/Deep_reinforcement_learning_Course/blob/master/Q%20learning/FrozenLake/Q%20Learning%20with%20FrozenLake.ipynb
- Из книги Тима Миллера (Университет Мельбурна) Введение в укрепляющее обучение: https://gibberblot.github.io/rl-notes/index.html
- Из книги Погружение в глубокое обучение: https://d2l.ai/chapter_reinforcement-learning/qlearning.html#implementation-of-q-learning
Ответ или решение
Реализация Q-обучения на Python
Q-обучение — это один из фундаметальных алгоритмов в области обучения с подкреплением, который позволяет агентам изучать оптимальные стратегии поведения в средах с последовательными решениями. Давайте рассмотрим простую реализацию Q-обучения на Python, которая подойдет новичкам и поможет вам понять, как работает алгоритм, без использования автоматизированных фреймворков, таких как gym
.
Описание проблемы
В данной реализации мы будем использовать простую среду RandomWalk1D
. В этой среде агент находится на одном из семи состояний (от 0 до 6) и имеет две действия: двигаться влево (в сторону 0) или вправо (в сторону 6). Если агент достигнет состояния 0, он получит награду -1. Если он достигнет состояния 6, он получит награду +1. На остальных состояниях награды равны 0.
Полный код реализации
# ## Импортируем необходимые библиотеки
import numpy as np
from enum import Enum
from numpy.random import default_rng
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
sns.set_theme()
# ## Параметры
class Params:
total_episodes = 50 # Всего эпизодов
learning_rate = 0.3 # Скорость обучения
gamma = 0.95 # Дисконтирующая ставка
seed = 42 # Случайное число для воспроизводимости
n_runs = 100 # Количество пробегов
epsilon = 0.1 # Вероятность исследования
# Устанавливаем случайный генератор
rng = np.random.default_rng(Params.seed)
# ## Среда
class Actions(Enum):
Left = 0
Right = 1
class RandomWalk1D:
"""Среда RandomWalk1D для тестирования алгоритма Q-learning."""
def __init__(self):
self.observation_space = np.arange(0, 7)
self.action_space = [item.value for item in Actions]
self.right_boundary = 6
self.left_boundary = 0
self.reset()
def reset(self):
self.current_state = 3
return self.current_state
def step(self, action):
if action == Actions.Left.value:
new_state = max(self.left_boundary, self.current_state - 1)
elif action == Actions.Right.value:
new_state = min(self.right_boundary, self.current_state + 1)
else:
raise ValueError("Неверный тип действия")
self.current_state = new_state
reward = self.reward(self.current_state)
is_terminated = self.is_terminated(self.current_state)
return new_state, reward, is_terminated
def reward(self, observation):
if observation == self.right_boundary:
return 1
elif observation == self.left_boundary:
return -1
return 0
def is_terminated(self, observation):
return observation in {self.right_boundary, self.left_boundary}
env = RandomWalk1D()
params = Params()
class Qlearning:
def __init__(self, learning_rate, gamma, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
self.gamma = gamma
self.reset_qtable()
def update(self, state, action, reward, new_state):
"""Обновление Q(s,a)."""
delta = (reward + self.gamma * np.max(self.qtable[new_state, :]) - self.qtable[state, action])
q_update = self.qtable[state, action] + self.learning_rate * delta
return q_update
def reset_qtable(self):
self.qtable = np.zeros((self.state_size, self.action_size))
# ## Алгоритм исследования: epsilon-greedy
class EpsilonGreedy:
def __init__(self, epsilon, rng=None):
self.epsilon = epsilon
self.rng = rng or default_rng()
def choose_action(self, action_space, state, qtable):
explor_exploit_tradeoff = self.rng.uniform(0, 1)
if explor_exploit_tradeoff < self.epsilon:
return self.rng.choice(action_space)
else:
return np.argmax(qtable[state, :])
# ## Запуск среда
learner = Qlearning(
learning_rate=Params.learning_rate,
gamma=Params.gamma,
state_size=len(env.observation_space),
action_size=len(env.action_space),
)
explorer = EpsilonGreedy(epsilon=Params.epsilon, rng=rng)
# Основной цикл
rewards = np.zeros((Params.total_episodes, Params.n_runs))
steps = np.zeros((Params.total_episodes, Params.n_runs))
qtables = np.zeros((Params.n_runs, len(env.observation_space), len(env.action_space)))
for run in range(Params.n_runs):
learner.reset_qtable()
for episode in tqdm(range(Params.total_episodes), desc=f"Запуск {run+1}/{Params.n_runs}", leave=False):
state = env.reset()
done = False
total_rewards = 0
while not done:
action = explorer.choose_action(env.action_space, state, learner.qtable)
new_state, reward, done = env.step(action)
learner.qtable[state, action] = learner.update(state, action, reward, new_state)
total_rewards += reward
state = new_state
rewards[episode, run] = total_rewards
steps[episode, run] = len(reward)
# ## Визуализация результатов
def plot_results(rewards):
plt.figure(figsize=(12, 6))
sns.lineplot(data=rewards)
plt.title("Награды за эпизоды")
plt.xlabel("Эпизоды")
plt.ylabel("Награды")
plt.show()
plot_results(rewards.mean(axis=1))
Объяснение кода
-
Импорт необходимых библиотек:
Для работы с данными и визуализацией мы используемnumpy
,matplotlib
, иseaborn
. -
Определение параметров:
Мы задаем основные параметры, такие как скорость обучения и количество эпизодов. -
Создание среды:
В классеRandomWalk1D
описаны состояния, возможные действия и функция получения награды. -
Основные классы обучения:
- Класс
Qlearning
управляет Q-таблицей и ее обновлениями. - Класс
EpsilonGreedy
управляет выбором действий на основе стратегии "epsilon-greedy".
- Класс
-
Основной цикл обучения:
В этом цикле агент проходит через множество эпизодов, обучаясь на основе получаемых наград. -
Визуализация результатов:
Для понимания эффективности обучения выводим график наград.
Заключение
Приведенная выше реализация Q-обучения на Python демонстрирует базовые принципы, лежащие в основе алгоритма. Данная структура дает возможность пользователю самостоятельно поэкспериментировать и изменять параметры для наблюдения за изменениями в стратегии агента. Настоятельно рекомендую вам поэкспериментировать с различными параметрами и расширять среду для достижения лучших результатов.