-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtraining.py
More file actions
93 lines (79 loc) · 3.22 KB
/
training.py
File metadata and controls
93 lines (79 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/env python3
from tensorflow.keras import layers
import os
import numpy as np
import chess.pgn
import time
import tensorflow as tf
import chess
def get_dataset(num_samples=None, total_samples_limit=None,
                pgn_path=r'C:/Users/ataka/Desktop/games.pgn'):
    """Yield (X, Y) training batches parsed from a PGN game file.

    Every position reached after each move of a finished game becomes one
    sample: X holds (n, 8, 8, 12) uint8 board encodings (see
    board_to_feature_vector) and Y holds the game outcome replicated over
    all of that game's positions (1 white win, -1 black win, 0 draw).

    Args:
        num_samples: batch size; a batch is yielded whenever the buffer
            reaches this many samples. If None, everything accumulates until
            the end of the file.
        total_samples_limit: stop after producing this many samples in total
            (None = no limit).
        pgn_path: path to the PGN file. Default preserves the original
            hard-coded location for backward compatibility.

    Yields:
        (np.ndarray, np.ndarray) batches. A final partial batch is yielded
        when the file ends or the sample limit is reached (the original
        version silently discarded it).
    """
    X, Y = [], []
    gn = 0
    total_samples = 0
    values = {'1/2-1/2': 0, '0-1': -1, '1-0': 1}
    with open(pgn_path) as pgn:
        while True:
            game = chess.pgn.read_game(pgn)
            if game is None:
                break
            # .get() instead of []: games without a Result header (or with
            # '*' = unfinished) are skipped rather than raising KeyError.
            res = game.headers.get('Result')
            if res not in values:
                continue
            value = values[res]
            board = game.board()
            for move in game.mainline_moves():
                if total_samples_limit is not None and total_samples >= total_samples_limit:
                    # Flush the partial buffer before stopping.
                    if X:
                        yield np.array(X), np.array(Y)
                    return
                board.push(move)
                X.append(board_to_feature_vector(board))
                Y.append(value)
                total_samples += 1
                if num_samples is not None and len(X) >= num_samples:
                    yield np.array(X), np.array(Y)
                    X, Y = [], []
            # NOTE: len(X) is the size of the *current* buffer, not a running
            # total — it resets after every yield.
            print("parsing game %d, got %d examples" % (gn, len(X)))
            gn += 1
    # File exhausted: yield whatever is left instead of dropping it.
    if X:
        yield np.array(X), np.array(Y)
def board_to_feature_vector(board):
    """Encode *board* as an (8, 8, 12) one-hot uint8 tensor.

    Planes 0-5 hold the white pieces (P, N, B, R, Q, K) and planes 6-11
    the black pieces (p, n, b, r, q, k); each occupied square sets a
    single 1 in the plane matching its piece's symbol.
    """
    # Lookup table hoisted out of the loop: piece symbol -> plane index.
    plane_of = {"P": 0, "N": 1, "B": 2, "R": 3, "Q": 4, "K": 5,
                "p": 6, "n": 7, "b": 8, "r": 9, "q": 10, "k": 11}
    planes = np.zeros((8, 8, 12), dtype=np.uint8)
    for square, piece in board.piece_map().items():
        # Squares are numbered 0..63; divmod splits that into (rank, file).
        rank, file = divmod(square, 8)
        planes[rank, file, plane_of[piece.symbol()]] = 1
    return planes
def create_cnn(input_shape=(8, 8, 12)):
    """Build and compile a small CNN board evaluator.

    Two 3x3 conv blocks with batch norm, a 2x2 max-pool, and a 128-unit
    dense layer feed a single tanh output in [-1, 1]; compiled as an MSE
    regression (Adam optimizer, MAE metric).
    """
    model = tf.keras.Sequential()
    model.add(layers.Conv2D(32, (3, 3), activation="relu", padding="same",
                            input_shape=input_shape))
    model.add(layers.BatchNormalization())
    model.add(layers.Conv2D(64, (3, 3), activation="relu", padding="same"))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation="relu"))
    model.add(layers.BatchNormalization())
    model.add(layers.Dense(1, activation="tanh"))
    model.compile(optimizer="adam",
                  loss="mean_squared_error",
                  metrics=["mean_absolute_error"])
    return model
def train_model(model, X, Y):
    """Fit *model* on (X, Y) with a fixed recipe: 10 epochs, batch size 64,
    and 20% of the data held out for validation."""
    fit_params = dict(epochs=10, batch_size=64, validation_split=0.2)
    model.fit(X, Y, **fit_params)
if __name__ == "__main__":
    # Train the evaluator incrementally: stream batches from the PGN file
    # and fit on each batch as it arrives, so the full dataset never has to
    # sit in memory at once.
    cnn_model = create_cnn()
    start_time = time.time()
    batch_rounds = 0
    total_examples = 0
    try:
        # Batches of 10_000 positions; the generator stops itself after
        # 5_000_000 samples in total.
        for X, Y in get_dataset(10000, 5000000): # generate batches of 10000, stop after Y total samples
            # 10 epochs per batch — the model sees each batch repeatedly
            # before moving on (incremental fit, not one pass over all data).
            cnn_model.fit(X, Y, epochs=10)
            batch_rounds += 1
            total_examples += len(X)
            elapsed_time = time.time() - start_time
            print(f"Finished batch round {batch_rounds}. Total examples: {total_examples}. Elapsed time: {elapsed_time:.2f} seconds.")
    except KeyboardInterrupt:
        # Ctrl+C is an expected way to stop a long run; fall through to save.
        print("Interrupted, saving model...")
    finally:
        # Always persist the model — on normal completion, interrupt, or error.
        elapsed_time = time.time() - start_time
        print(f"Total examples processed: {total_examples}. Total elapsed time: {elapsed_time:.2f} seconds.")
        cnn_model.save('trained_model.h5')