# -*- coding: utf-8 -*-
"""
Train a TensorFlow neural network to play Tic-Tac-Toe using the one-step Q-learning algorithm.
Requirements:
- TensorFlow (https://www.tensorflow.org/versions/r0.10/get_started/os_setup.html)
- Colorama (pip install colorama)
References:
- Michael L. Littman. Markov games as a framework for multi-agent reinforcement learning.
  In Proceedings of the Eleventh International Conference on Machine Learning, pages 157–163, 1994.
- W. T. Uther and M. Veloso. Adversarial reinforcement learning,
  School of Computer Science, Carnegie Mellon University, Pittsburgh, PA, 1997.
- R. A. C. Bianchi, C. H. C. Ribeiro, and A. H. R. Costa. Heuristic selection of actions
in multiagent reinforcement learning. In IJCAI’07, Hyderabad, India, 2007.
"""
from __future__ import print_function
import time
import colorama
from colorama import Fore, Back, Style
import numpy as np
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.python.ops import nn
# Python 3 compatibility hack
try:
xrange
except NameError:
xrange = range
# Board size
board_size = 3
# Number of contiguous marks to win
marks_win = 3
# Win reward
REWARD_WIN = 1.
# Draw reward
REWARD_DRAW = 0.
# Ordinary action reward
REWARD_ACTION = 0.
# Hidden layer size
hidden_layer_size = 50
# Reward discount rate
gamma = 0.8
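# Note on how gamma is used: train() below computes standard one-step Q-learning
# targets (this is a summary of the code, not extra configuration):
#   non-terminal move:                         y = r + gamma * max_a' Q(s', a')   (s' = the same player's next turn)
#   terminal move (player who just moved):     y = r
#   terminal move (opponent's previous move):  y = r_prev - gamma * r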
# Initial exploration rate
epsilon_initial = 1.0
# Final exploration rate
epsilon_final = .01
# Number of training episodes to anneal epsilon
epsilon_anneal_episodes = 5000
# Learning rate
learning_rate = .001
# Number of training episodes to run
episode_max = 10000
# Number of training episodes to accumulate stats
episode_stats = 100
# Run name for tensorboard
run_name = "%s" % int(time.time())
# Directory for storing tensorboard summaries
summary_dir = '/tmp/tensorflow/tictactoe'
def dump_board(sx, so, move_index=None, win_indices=None, q=None):
"""
Dump board state to the terminal.
"""
for i in xrange(board_size):
for j in xrange(board_size):
if (i, j) == move_index:
color = Fore.GREEN
else:
color = Fore.BLACK
if win_indices is not None and (i, j) in win_indices:
color += Back.LIGHTYELLOW_EX
print(" ", end="")
if sx[i, j] and so[i, j]:
print(Fore.RED + "?" + Fore.RESET, end="")
elif sx[i, j]:
print(color + "X" + Style.RESET_ALL, end="")
elif so[i, j]:
print(color + "O" + Style.RESET_ALL, end="")
else:
print(".", end="")
if q is not None:
print(" ", end="")
for j in xrange(board_size):
if (i, j) == move_index:
color = Fore.GREEN
else:
color = Fore.BLACK
if not (sx[i, j] or so[i, j]) or (i, j) == move_index:
print(color + " %6.3f" % q[i, j] + Style.RESET_ALL, end="")
else:
print(Fore.LIGHTBLACK_EX + " * " + Style.RESET_ALL, end="")
print()
print()
def check_win(s):
"""
Count marks and check for the win state.
"""
# Check rows and columns
for i in xrange(board_size):
marks_r = 0
marks_c = 0
win_indices_c = []
win_indices_r = []
for j in xrange(board_size):
# Check row
if s[i, j]:
marks_r += 1
win_indices_r.append((i, j))
if marks_r >= marks_win:
return True, win_indices_r
else:
marks_r = 0
win_indices_r = []
# Check column
if s[j, i]:
marks_c += 1
win_indices_c.append((j, i))
if marks_c >= marks_win:
return True, win_indices_c
else:
marks_c = 0
win_indices_c = []
# Check diagonals
for i in xrange(board_size):
if i+1 < marks_win:
continue # diagonals are shorter than number of marks to win
marks_d1 = 0
marks_d2 = 0
marks_d3 = 0
marks_d4 = 0
win_indices_d1 = []
win_indices_d2 = []
win_indices_d3 = []
win_indices_d4 = []
for j in xrange(i+1):
# Check first (top) pair of diagonals
if s[j, i-j]:
marks_d1 += 1
win_indices_d1.append((j, i-j))
if marks_d1 >= marks_win:
return True, win_indices_d1
else:
marks_d1 = 0
win_indices_d1 = []
if s[j, board_size-i+j-1]:
marks_d2 += 1
win_indices_d2.append((j, board_size-i+j-1))
if marks_d2 >= marks_win:
return True, win_indices_d2
else:
marks_d2 = 0
win_indices_d2 = []
# Check second (bottom) pair of diagonals
if i == board_size-1:
continue # main diagonals already checked
if s[board_size-i+j-1, j]:
marks_d3 += 1
win_indices_d3.append((board_size-i+j-1, j))
if marks_d3 >= marks_win:
return True, win_indices_d3
else:
marks_d3 = 0
win_indices_d3 = []
if s[board_size-i+j-1, board_size-j-1]:
marks_d4 += 1
win_indices_d4.append((board_size-i+j-1, board_size-j-1))
if marks_d4 >= marks_win:
return True, win_indices_d4
else:
marks_d4 = 0
win_indices_d4 = []
return False, []
def check_draw(sx, so):
"""
Check for draw.
"""
return np.all(sx+so)
def train(session, graph_ops, summary_ops, saver):
"""
Train model.
"""
# Initialize variables
session.run(tf.global_variables_initializer())
# Initialize summaries writer for tensorflow
writer = tf.summary.FileWriter(summary_dir + "/" + run_name, session.graph)
summary_op = tf.summary.merge_all()
# Unpack graph ops
q_nn, q_nn_update, s, a, y, loss = graph_ops
# Unpack summary ops
win_rate_summary, episode_length_summary, epsilon_summary, loss_summary = summary_ops
# Setup exploration rate parameters
epsilon = epsilon_initial
epsilon_step = (epsilon_initial - epsilon_final) / epsilon_anneal_episodes
# X player state
sx_t = np.empty([board_size, board_size], dtype=np.bool)
# O player state
so_t = np.empty_like(sx_t)
# Accumulated stats
stats = []
# X move first
move_x = True
episode_num = 1
while episode_num <= episode_max:
# Start new game training episode
sx_t[:] = False
so_t[:] = False
        sar_prev = [(None, None, None), (None, None, None)]  # previous (state, action, reward) for each player
move_num = 1
while True:
# Observe the next state
s_t = create_state(move_x, sx_t, so_t)
# Get Q values for all actions
q_t = q_values(session, q_nn, s, s_t)
# Choose action based on epsilon-greedy policy
q_max_index, a_t_index = choose_action(q_t, sx_t, so_t, epsilon)
# Retrieve previous player state/action/reward (if present)
s_t_prev, a_t_prev, r_t_prev = sar_prev.pop(0)
if s_t_prev is not None:
                # One-step Q-learning target: y = r + gamma * max_a' Q(s', a')
y_t_prev = r_t_prev + gamma * q_t[q_max_index]
# Apply equivalent transforms
s_t_prev, a_t_prev = apply_transforms(s_t_prev, a_t_prev)
# Update Q network
q_update(session, q_nn_update,
s, s_t_prev,
a, a_t_prev,
y, [y_t_prev] * len(s_t_prev))
# Apply action to state
r_t, sx_t, so_t, terminal = apply_action(move_x, sx_t, so_t, a_t_index)
a_t = np.zeros_like(sx_t, dtype=np.float32)
a_t[a_t_index] = 1.
if terminal: # win or draw
y_t = r_t # reward for current player
s_t_prev, a_t_prev, r_t_prev = sar_prev[-1] # previous opponent state/action/reward
y_t_prev = r_t_prev - gamma * r_t # discounted negative reward for opponent
# Apply equivalent transforms
s_t, a_t = apply_transforms(s_t, a_t)
s_t_prev, a_t_prev = apply_transforms(s_t_prev, a_t_prev)
# Update Q network
s_up = s_t + s_t_prev
a_up = a_t + a_t_prev
y_up = [y_t] * len(s_t) + [y_t_prev] * len(s_t_prev)
q_update(session, q_nn_update, s, s_up, a, a_up, y, y_up)
# Get episode loss
loss_ep = q_loss(session, loss, s, s_up, a, a_up, y, y_up)
# Play test game before next episode
length_ep, win_x, win_o = test(session, q_nn, s)
stats.append([win_x or win_o, length_ep, loss_ep])
break
# Store state, action and its reward
sar_prev.append((s_t, a_t, r_t))
# Next move
move_x = not move_x
move_num += 1
# Scale down epsilon after episode
if epsilon > epsilon_final:
epsilon -= epsilon_step
# Process stats
if len(stats) >= episode_stats:
mean_win_rate, mean_length, mean_loss = np.mean(stats, axis=0)
print("episode: %d," % episode_num, "epsilon: %.5f," % epsilon,
"mean win rate: %.3f," % mean_win_rate, "mean length: %.3f," % mean_length,
"mean loss: %.3f" % mean_loss)
summary_str = session.run(summary_op, feed_dict={win_rate_summary: mean_win_rate,
episode_length_summary: mean_length,
epsilon_summary: epsilon,
loss_summary: mean_loss})
writer.add_summary(summary_str, episode_num)
stats = []
# Next episode
episode_num += 1
test(session, q_nn, s, dump=True)
def test(session, q_nn, s, dump=False):
"""
Play test game.
"""
# X player state
sx_t = np.zeros([board_size, board_size], dtype=np.bool)
# O player state
so_t = np.zeros_like(sx_t)
move_x = True
move_num = 1
if dump:
print()
while True:
# Choose action
s_t = create_state(move_x, sx_t, so_t)
# Get Q values for all actions
q_t = q_values(session, q_nn, s, s_t)
_q_max_index, a_t_index = choose_action(q_t, sx_t, so_t, -1.)
# Apply action to state
r_t, sx_t, so_t, terminal = apply_action(move_x, sx_t, so_t, a_t_index)
if dump:
if terminal:
if move_x:
_win, win_indices = check_win(sx_t)
else:
_win, win_indices = check_win(so_t)
else:
win_indices = None
print(Fore.CYAN + "Move:", move_num, Fore.RESET + "\n")
dump_board(sx_t, so_t, a_t_index, win_indices, q_t)
if terminal:
if not r_t:
# Draw
if dump:
print("Draw!\n")
return move_num, False, False
elif move_x:
# X wins
if dump:
print("X wins!\n")
return move_num, True, False
# O wins
if dump:
print("O wins!\n")
return move_num, False, True
move_x = not move_x
move_num += 1
def apply_transforms(s, a):
"""
Apply state/action equivalent transforms (rotations/flips).
"""
    # Get composite state and apply the action to it (with reversed sign, to distinguish it from existing marks)
sa = np.sum(s, 0) - a
# Transpose state from [channel, height, width] to [height, width, channel]
s = np.transpose(s, [1, 2, 0])
s_trans = [s]
a_trans = [a]
sa_trans = [sa]
# Apply rotations
sa_next = sa
for i in xrange(1, 4): # rotate to 90, 180, 270 degrees
sa_next = np.rot90(sa_next)
if same_states(sa_trans, sa_next):
# Skip rotated state matching state already contained in list
continue
s_trans.append(np.rot90(s, i))
a_trans.append(np.rot90(a, i))
sa_trans.append(sa_next)
# Apply flips
sa_next = np.fliplr(sa)
if not same_states(sa_trans, sa_next):
s_trans.append(np.fliplr(s))
a_trans.append(np.fliplr(a))
sa_trans.append(sa_next)
sa_next = np.flipud(sa)
if not same_states(sa_trans, sa_next):
s_trans.append(np.flipud(s))
a_trans.append(np.flipud(a))
sa_trans.append(sa_next)
return [np.transpose(s, [2, 0, 1]) for s in s_trans], a_trans
def same_states(s1, s2):
"""
    Check whether state s2 matches s1 (or any state in s1 when s1 is a list of states).
"""
return np.any(np.isclose(np.mean(np.square(s1-s2), axis=(1, 2)), 0))
def create_state(move_x, sx, so):
"""
Create full state from X and O states.
"""
return np.array([sx, so] if move_x else [so, sx], dtype=np.float)
def choose_action(q, sx, so, epsilon):
"""
Choose action index for given state.
"""
# Get valid action indices
a_vindices = np.where((sx+so)==False)
a_tvindices = np.transpose(a_vindices)
q_max_index = tuple(a_tvindices[np.argmax(q[a_vindices])])
# Choose next action based on epsilon-greedy policy
if np.random.random() <= epsilon:
# Choose random action from list of valid actions
a_index = tuple(a_tvindices[np.random.randint(len(a_tvindices))])
else:
# Choose valid action w/ max Q
a_index = q_max_index
return q_max_index, a_index
def apply_action(move_x, sx, so, a_index):
"""
Apply action to state, get reward and check for terminal state.
"""
if move_x:
s = sx
else:
s = so
s[a_index] = True
win, _win_indices = check_win(s)
if win:
return REWARD_WIN, sx, so, True
if check_draw(sx, so):
return REWARD_DRAW, sx, so, True
return REWARD_ACTION, sx, so, False
def q_values(session, q_nn, s, s_t):
"""
Get Q values for actions from network for given state.
"""
return q_nn.eval(session=session, feed_dict={s: [s_t]})[0]
def q_update(session, q_nn_update, s, s_t, a, a_t, y, y_t):
"""
Update Q network with (s, a, y) values.
"""
session.run(q_nn_update, feed_dict={s: s_t, a: a_t, y: y_t})
def q_loss(session, loss, s, s_t, a, a_t, y, y_t):
"""
Get loss for (s, a, y) values.
"""
return loss.eval(session=session, feed_dict={s: s_t, a: a_t, y: y_t})
def build_summaries():
"""
Build tensorboard summaries.
"""
win_rate_op = tf.Variable(0.)
tf.summary.scalar("Win Rate", win_rate_op)
episode_length_op = tf.Variable(0.)
tf.summary.scalar("Episode Length", episode_length_op)
epsilon_op = tf.Variable(0.)
tf.summary.scalar("Epsilon", epsilon_op)
loss_op = tf.Variable(0.)
tf.summary.scalar("Loss", loss_op)
return win_rate_op, episode_length_op, epsilon_op, loss_op
def build_graph():
"""
Build tensorflow Q network graph.
"""
s = tf.placeholder(tf.float32, [None, 2, board_size, board_size], name="s")
    # Inputs come as [batch, channel, height, width]; transpose them to
    # [batch, height, width, channel]
net = tf.transpose(s, [0, 2, 3, 1])
# Flatten inputs
net = tf.reshape(net, [-1, int(np.prod(net.get_shape().as_list()[1:]))])
# Hidden fully connected layer
net = layers.fully_connected(net, hidden_layer_size, activation_fn=nn.relu)
# Output layer
net = layers.fully_connected(net, board_size*board_size, activation_fn=None)
# Reshape output to board actions
q_nn = tf.reshape(net, [-1, board_size, board_size])
# Define loss and gradient update ops
a = tf.placeholder(tf.float32, [None, board_size, board_size], name="a")
y = tf.placeholder(tf.float32, [None], name="y")
action_q_values = tf.reduce_sum(tf.multiply(q_nn, a), axis=[1, 2])
loss = tf.reduce_mean(tf.square(y - action_q_values))
optimizer = tf.train.AdamOptimizer(learning_rate)
q_nn_update = optimizer.minimize(loss, var_list=tf.trainable_variables())
return q_nn, q_nn_update, s, a, y, loss
def main(_):
with tf.Session() as session:
graph_ops = build_graph()
summary_ops = build_summaries()
saver = tf.train.Saver(max_to_keep=5)
train(session, graph_ops, summary_ops, saver)
def parse_flags():
global run_name, board_size, marks_win, episode_max, learning_rate, gamma, epsilon_initial, \
epsilon_final, epsilon_anneal_episodes, hidden_layer_size, summary_dir
flags = tf.app.flags
flags.DEFINE_string("name", run_name, "Tensorboard run name")
flags.DEFINE_string("summary_dir", summary_dir, "Tensorboard summary directory")
flags.DEFINE_integer("board_size", board_size, "Board size")
flags.DEFINE_integer("marks_win", marks_win, "Number of contiguous marks to win")
flags.DEFINE_integer("hidden_layer_size", hidden_layer_size, "Hidden layer size")
flags.DEFINE_integer("episodes", episode_max, "Number of training episodes to run")
flags.DEFINE_float("learning_rate", learning_rate, "Learning rate")
flags.DEFINE_float("gamma", gamma, "Reward discount rate")
flags.DEFINE_float("epsilon_initial", epsilon_initial, "Initial exploration rate")
flags.DEFINE_float("epsilon_final", epsilon_final, "Final exploration rate")
flags.DEFINE_integer("epsilon_anneal", epsilon_anneal_episodes, "Number of training episodes to anneal epsilon")
FLAGS = flags.FLAGS
run_name = FLAGS.name
summary_dir = FLAGS.summary_dir
board_size = FLAGS.board_size
marks_win = FLAGS.marks_win
hidden_layer_size = FLAGS.hidden_layer_size
episode_max = FLAGS.episodes
learning_rate = FLAGS.learning_rate
gamma = FLAGS.gamma
epsilon_initial = FLAGS.epsilon_initial
epsilon_final = FLAGS.epsilon_final
epsilon_anneal_episodes = FLAGS.epsilon_anneal
if __name__ == "__main__":
colorama.init()
parse_flags()
tf.app.run()