This repository contains implementations of several reinforcement learning (RL) algorithms for solving the Taxi-v3 environment from OpenAI Gym: Value Iteration, Policy Iteration, Q-Learning, and Direct Evaluation (Monte Carlo). The project also includes functionality to visualize the performance of the trained policies through video recordings.
To run the code, you need Python installed along with the required libraries; the snippets below assume the newer Gym API, where env.reset() returns (state, info) and env.step() returns five values. Install the dependencies with the following command:
pip install -r requirements.txt
To test a policy and visualize it, use the test_policy_video function. For instance:
from IPython.display import HTML
# Assuming `optimal_policy` is defined
video_html = test_policy_video(optimal_policy, 'policy-name')
HTML(video_html)
-
Initializing Parameters
taxi = Taxi(num_iter=200, num_evaluate_steps=20, discount_rate=0.8)
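The Taxi class itself is defined elsewhere in the repository. As a rough orientation, here is a minimal sketch of what such a wrapper might look like, inferred purely from how its attributes are used in the snippets below (the constructor details are assumptions, not repository code):

import gym
import numpy as np

class Taxi:
    """Hypothetical wrapper bundling the Taxi-v3 environment with algorithm parameters."""
    def __init__(self, **params):
        self.env = gym.make('Taxi-v3', render_mode='rgb_array')
        self.state_size = self.env.observation_space.n    # 500 discrete states
        self.action_size = self.env.action_space.n        # 6 discrete actions
        self.vtable = np.zeros(self.state_size)                       # state values
        self.qtable = np.zeros((self.state_size, self.action_size))   # action values
        self.ptable = np.zeros(self.state_size, dtype=np.int64)       # one action per state
        for name, value in params.items():  # e.g. num_iter, discount_rate, epsilon, ...
            setattr(self, name, value)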
-
Finding Optimal State Values
import numpy as np

def value_iteration(vtable, num_iter, discount_rate):
    for _ in range(num_iter):
        v_old = np.copy(vtable)  # snapshot of the previous sweep (handy for convergence checks)
        for state in range(taxi.state_size):
            temp_action = np.zeros(taxi.action_size)
            for action in range(taxi.action_size):
                action_expected_value = 0
                transitions = taxi.env.P[state][action]
                if isinstance(transitions, tuple):
                    transitions = [transitions]
                # Each transition is (probability, next_state, reward, done)
                for transition in transitions:
                    action_expected_value += transition[0] * (transition[2] + discount_rate * vtable[transition[1]])
                temp_action[action] = action_expected_value
            # Bellman optimality backup: V(s) = max_a sum_s' P(s'|s,a) [r + gamma V(s')]
            vtable[state] = np.max(temp_action)
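A fixed number of sweeps works well here, but since v_old is already captured each sweep, the loop could instead stop once the value table stabilizes. A minimal sketch of that variant (the function name and tolerance are assumptions, not repository code):

def value_iteration_until_convergence(vtable, discount_rate, tol=1e-8):
    while True:
        v_old = np.copy(vtable)
        for state in range(taxi.state_size):
            # Best one-step lookahead value over all actions
            vtable[state] = max(
                sum(t[0] * (t[2] + discount_rate * vtable[t[1]]) for t in taxi.env.P[state][action])
                for action in range(taxi.action_size)
            )
        if np.max(np.abs(vtable - v_old)) < tol:  # largest update is negligible
            break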
-
Extracting The Optimal Policy
def optimal_policy_extraction(vtable, ptable, num_iter, discount_rate):
    value_iteration(vtable, num_iter, discount_rate)  # compute the optimal state values first
    for state in range(taxi.state_size):
        temp_action = np.zeros(taxi.action_size)
        for action in range(taxi.action_size):
            action_expected_value = 0
            transitions = taxi.env.P[state][action]
            if isinstance(transitions, tuple):
                transitions = [transitions]
            for transition in transitions:
                action_expected_value += transition[0] * (transition[2] + discount_rate * vtable[transition[1]])
            temp_action[action] = action_expected_value
        # One-step lookahead: the optimal policy acts greedily with respect to V*
        ptable[state] = np.argmax(temp_action)
-
Running The Algorithm
optimal_policy_extraction(taxi.vtable, taxi.ptable, taxi.num_iter, taxi.discount_rate)
optimal_policy = taxi.ptable.copy()
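To spot-check the extracted policy, a single state can be decoded into its components; decode is part of the Taxi-v3 environment itself, though the unwrapped access below is an assumption about how the wrapper exposes it:

state, info = taxi.env.reset()
taxi_row, taxi_col, passenger, destination = taxi.env.unwrapped.decode(state)
print(f"state {state}: taxi at ({taxi_row}, {taxi_col}), passenger index {passenger}, "
      f"destination index {destination}, chosen action {optimal_policy[state]}")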
-
Initializing Parameters
taxi = Taxi(num_iter=200, num_evaluate_steps=20, discount_rate=0.8)
-
Policy Evaluation
def evaluate(num_iter, discount_rate):
    # Iterative policy evaluation: estimate V^pi for the policy stored in taxi.ptable
    vtable = np.zeros(taxi.state_size)
    for _ in range(num_iter):
        for state in range(taxi.state_size):
            action = taxi.ptable[state]  # the single action the current policy prescribes
            action_expected_value = 0
            transitions = taxi.env.P[state][action]
            if isinstance(transitions, tuple):
                transitions = [transitions]
            for transition in transitions:
                action_expected_value += transition[0] * (transition[2] + discount_rate * vtable[transition[1]])
            vtable[state] = action_expected_value
    return vtable
-
Policy Improvement
def improvement(ptable, num_iter, num_evaluate_steps, discount_rate):
    for _ in range(num_iter):
        # Evaluate the current policy, then make it greedy with respect to those values
        vtable = evaluate(num_evaluate_steps, discount_rate).copy()
        for state in range(taxi.state_size):
            temp_action = np.zeros(taxi.action_size)
            for action in range(taxi.action_size):
                action_expected_value = 0
                transitions = taxi.env.P[state][action]
                if isinstance(transitions, tuple):
                    transitions = [transitions]
                for transition in transitions:
                    action_expected_value += transition[0] * (transition[2] + discount_rate * vtable[transition[1]])
                temp_action[action] = action_expected_value
            ptable[state] = np.argmax(temp_action)
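Policy iteration can also terminate as soon as the greedy step stops changing any action, since a stable policy is optimal under exact evaluation. A minimal sketch of that variant (the function name and loop cap are assumptions, not repository code):

def improvement_until_stable(ptable, num_evaluate_steps, discount_rate, max_iter=200):
    for _ in range(max_iter):
        old_policy = ptable.copy()
        vtable = evaluate(num_evaluate_steps, discount_rate)
        for state in range(taxi.state_size):
            values = [
                sum(t[0] * (t[2] + discount_rate * vtable[t[1]]) for t in taxi.env.P[state][action])
                for action in range(taxi.action_size)
            ]
            ptable[state] = np.argmax(values)
        if np.array_equal(ptable, old_policy):  # no action changed: the policy is stable
            break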
-
Running The Algorithm
improvement(taxi.ptable, taxi.num_iter, taxi.num_evaluate_steps, taxi.discount_rate)
-
Initializing Parameters
taxi = Taxi(num_episodes=10000, max_steps=99, learning_rate=0.9, discount_rate=0.8, epsilon=1.0, decay_rate=0.005)
-
Training
import random

def q_learning_train(qtable, num_episodes, max_steps, learning_rate, discount_rate, epsilon, decay_rate):
    for episode in range(num_episodes):
        state, info = taxi.env.reset()
        done = False
        for s in range(max_steps):
            # Epsilon-greedy action selection: explore with probability epsilon
            if random.uniform(0, 1) < epsilon:
                action = taxi.env.action_space.sample()
            else:
                action = np.argmax(qtable[state])
            new_state, reward, done, truncated, info = taxi.env.step(action)
            # Q-learning update: move Q(s, a) toward r + gamma * max_a' Q(s', a')
            qtable[state][action] = (1 - learning_rate) * qtable[state][action] + learning_rate * (reward + discount_rate * np.max(qtable[new_state]))
            state = new_state
            if done:
                break
        # Exponentially decay epsilon so later episodes exploit more
        epsilon = np.exp(-decay_rate * episode)
-
Running The Algorithm
q_learning_train(taxi.qtable, taxi.num_episodes, taxi.max_steps, taxi.learning_rate, taxi.discount_rate, taxi.epsilon, taxi.decay_rate)
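Once training has run, a quick empirical check is to roll out the greedy policy without exploration and average the total reward. A minimal sketch (the function name and episode count are assumptions, not repository code):

def average_return(qtable, num_episodes=100, max_steps=99):
    total = 0.0
    for _ in range(num_episodes):
        state, info = taxi.env.reset()
        for _ in range(max_steps):
            action = np.argmax(qtable[state])  # always exploit the learned values
            state, reward, done, truncated, info = taxi.env.step(action)
            total += reward
            if done or truncated:
                break
    return total / num_episodes

print(average_return(taxi.qtable))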
-
Extracting The Policy
for state in range(taxi.state_size):
    taxi.ptable[state] = np.argmax(taxi.qtable[state])
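Since the Q-table is a 2-D NumPy array, the same extraction can be written as a single vectorized call:

taxi.ptable = np.argmax(taxi.qtable, axis=1)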
-
Initializing Parameters
taxi = Taxi(num_episodes=10000, max_steps=99, discount_rate=0.8)
-
Training
def monte_carlo(ptable, num_episodes, max_steps, discount_rate):
    count = np.ones(taxi.state_size)  # visit counts, starting at one to avoid division by zero
    vtable = np.zeros(taxi.state_size)
    for _ in range(num_episodes):
        state, info = taxi.env.reset()
        done = False
        trajectory = [state]
        rewards = []
        for s in range(max_steps):
            new_state, reward, done, truncated, info = taxi.env.step(ptable[state])
            trajectory.append(new_state)
            count[new_state] += 1
            rewards.append(reward)
            state = new_state  # advance the current state before the next step
            if done:
                break
        # Walk the episode backwards, accumulating the discounted return for each visited state
        discounted_reward_sum = 0
        rewards = list(reversed(rewards))
        trajectory = list(reversed(trajectory))[1:]  # drop the final state, which has no return after it
        for i in range(len(trajectory)):
            discounted_reward_sum += rewards[i]
            vtable[trajectory[i]] += discounted_reward_sum
            discounted_reward_sum *= discount_rate
    vtable /= count  # average accumulated returns over visits
    return vtable
-
Testing
optimal_policy_state_values = monte_carlo(optimal_policy, taxi.num_episodes, taxi.max_steps, taxi.discount_rate)
print(optimal_policy_state_values)
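Direct Evaluation estimates values only for the policy it is given, so a useful sanity check is to evaluate a deliberately bad policy and compare. A minimal sketch using a uniformly random baseline (the baseline policy is an assumption, not part of the repository):

random_policy = np.random.randint(0, taxi.action_size, size=taxi.state_size)
random_policy_state_values = monte_carlo(random_policy, taxi.num_episodes, taxi.max_steps, taxi.discount_rate)
# The optimal policy should show clearly higher average state values than the random one
print(optimal_policy_state_values.mean(), random_policy_state_values.mean())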
Each algorithm's performance can be tested and visualized using the test_policy_video function, which records the environment's behavior under the specified policy and returns an HTML video element for display in Jupyter notebooks.
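test_policy_video is defined in the repository; a minimal sketch of how such a helper could be built on top of Gym's RecordVideo wrapper and a base64-embedded video tag (the video folder, file lookup, and HTML layout are assumptions, not repository code):

import base64
import glob
import gym
import numpy as np

def test_policy_video(policy, name, max_steps=99):
    env = gym.wrappers.RecordVideo(
        gym.make('Taxi-v3', render_mode='rgb_array'),
        video_folder='videos', name_prefix=name)
    state, info = env.reset()
    for _ in range(max_steps):
        state, reward, done, truncated, info = env.step(int(policy[state]))
        if done or truncated:
            break
    env.close()  # flushes the recorded frames to disk
    video_path = sorted(glob.glob(f'videos/{name}*.mp4'))[-1]
    with open(video_path, 'rb') as f:
        encoded = base64.b64encode(f.read()).decode('ascii')
    return f'<video controls src="data:video/mp4;base64,{encoded}"></video>'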
This project uses the Taxi-v3 environment from OpenAI's Gym. The code structure and implementation details are inspired by common RL practices and algorithms.