forked from lakinwecker/ChessReanalysis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
preprocess.py
164 lines (144 loc) · 6.03 KB
/
preprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from models import *
import chess, chess.pgn, chess.engine
import json
import tqdm
from multiprocessing import Pool, Process, Manager
import sys
sys.setrecursionlimit(10000)
def progress(args):
    """Render a single progress bar fed by messages from the worker processes.

    Intended to run in its own process.  Blocks on the queue until the
    string sentinel ``"QUIT"`` is received.

    Args:
        args: dict with the keys
            ``'total_moves'``    -- total passed to the progress bar, and
            ``'progress_queue'`` -- queue yielding either ``"QUIT"`` or a
            4-item sequence ``(game_number, game_id, move_index,
            moves_in_game)``.
    """
    total_moves = args['total_moves']
    progress_queue = args['progress_queue']

    # The context manager guarantees the bar is closed (and the terminal line
    # finalised) when the sentinel arrives or if a malformed message raises.
    with tqdm.tqdm(total=total_moves, desc="Warming the engines") as pbar:
        # iter(callable, sentinel) keeps calling get() until it returns "QUIT".
        for number, game_id, move, game_moves in iter(progress_queue.get, "QUIT"):
            pbar.update(1)
            pbar.set_description(f"#{number:02d} [{game_id}] {move: 3d}/{game_moves: 3d}")
def run(working_set):
    """Analyse every not-yet-analysed game of *working_set* in a process pool.

    One pool task is created per game (handled by ``process_game``) and a
    separate process (``progress``) displays a shared progress bar.  The
    function returns once all tasks have finished and the progress process
    has been told to quit.

    Args:
        working_set: mapping of game id -> parsed game.  Each value must
            provide ``mainline_moves()`` (presumably a ``chess.pgn.Game`` --
            confirm against the caller).
    """
    # Exclude already-processed games.
    # NOTE(review): `== True` looks like an ORM query expression (peewee
    # style), so it must not be "simplified" to `is True` or a bare attribute.
    analyzed_ids = {g.id for g in Game.select(Game.id).where(Game.is_analyzed == True)}
    to_process = set(working_set.keys()) - analyzed_ids
    print(f'Skipping {len(working_set) - len(to_process)} already-processed games')

    engine_config = load_engine_config()
    parallelism = engine_config.get('parallelism', 1)

    print("Starting pool")
    manager = Manager()
    progress_queue = manager.Queue()
    lock = manager.Lock()

    # Total used to size the progress bar.
    total_moves = sum(
        len(list(working_set[gid].mainline_moves())) for gid in to_process
    )

    pool = Pool(parallelism)
    process_args = [
        {
            'progress_queue': progress_queue,
            'db_lock': lock,
            'total': len(to_process),
            'game_number': i,
            "pgn": working_set[gid],
            "gid": gid
        } for i, gid in enumerate(to_process)
    ]
    print(f"Processing {parallelism} / {len(to_process)} games at a time")

    def report_worker_error(exc):
        # The AsyncResult is never inspected, so without this callback an
        # exception raised inside a worker would disappear without a trace.
        print(f"Game analysis worker failed: {exc!r}", file=sys.stderr)

    pool.map_async(process_game, process_args, error_callback=report_worker_error)

    progress_args = ({
        'total_moves': total_moves,
        'progress_queue': progress_queue,
        'db_lock': lock,
    },)
    p = Process(target=progress, args=progress_args)
    p.start()

    # No more tasks will be submitted; wait for the workers to drain the map.
    pool.close()
    pool.join()

    # Only now is it safe to stop the progress display.
    progress_queue.put("QUIT")
    p.join()
def process_game(args):
    """Analyse one game with the engine and store one ``Move`` row per ply.

    Runs inside a pool worker.  The game's moves are replayed onto a board
    and then analysed in reverse order; plies that already have a ``Move``
    row are skipped.  When the loop completes the game is flagged as
    analysed.

    Args:
        args: dict with the keys
            ``'game_number'``    -- index of this game, echoed to the
                                    progress queue,
            ``'progress_queue'`` -- queue receiving
                                    ``[game_number, gid, ply_index, n_plies]``,
            ``'db_lock'``        -- lock held around every database access,
            ``'gid'``            -- id of the game,
            ``'pgn'``            -- parsed game providing ``mainline_moves()``,
                                    ``headers`` and ``board()`` (presumably a
                                    ``chess.pgn.Game`` -- confirm).
    """
    game_number = args['game_number']
    progress_queue = args['progress_queue']
    db_lock = args['db_lock']
    gid = args['gid']
    pgn = args['pgn']
    moves = list(pgn.mainline_moves())

    # Get (or create) the game and player DB objects
    with db_lock:
        game_obj, _ = Game.get_or_create(id=gid)
        white, _ = Player.get_or_create(username=pgn.headers['White'].lower())
        black, _ = Player.get_or_create(username=pgn.headers['Black'].lower())
        GamePlayer.get_or_create(game=game_obj, color='w', defaults={'player': white})
        GamePlayer.get_or_create(game=game_obj, color='b', defaults={'player': black})

    # Set up the engine
    engine_config = load_engine_config()
    engine = init_engine(engine_config)

    # try/finally so the engine subprocess is shut down even when analysis
    # or a database call raises.
    try:
        # Set up the board at the final position
        board = pgn.board()
        for m in moves:
            board.push(m)

        # Process each move in the game in reverse order
        moves_processed = 0
        for played_move in reversed(moves):
            board.pop()
            moves_processed += 1
            color = 'w' if board.turn == chess.WHITE else 'b'

            # Skip already-processed moves
            try:
                with db_lock:
                    Move.get(game=game_obj, color=color, number=board.fullmove_number)
            except DoesNotExist:
                pass
            else:
                continue

            progress_queue.put([game_number, gid, moves_processed, len(moves)])

            while True:
                try:
                    # Run the engine for the top 5 moves
                    analysis = engine.analyse(
                        board,
                        chess.engine.Limit(nodes=engine_config['nodes']),
                        multipv=5,
                        options=engine_config['options'],
                    )

                    # Get the engine results, keyed by 1-based PV rank
                    pvs = {rank: line['pv'][0] for rank, line in enumerate(analysis, start=1)}
                    evals = {rank: score_to_cp(line['score']) for rank, line in enumerate(analysis, start=1)}

                    played_index = None
                    for rank, candidate in pvs.items():
                        if candidate == played_move:
                            played_index = rank

                    if played_index is None:
                        # The played move was not in the top 5, so we need to analyze it separately
                        board.push(played_move)
                        if board.is_checkmate():
                            # Every other eval is from the mover's point of
                            # view (score_to_cp gives 29999 for mate in 1), so
                            # delivering mate is +29999 whichever colour
                            # played it.
                            played_eval = 29999
                        else:
                            one_move_info = engine.analyse(
                                board,
                                chess.engine.Limit(nodes=engine_config['nodes']),
                                multipv=1,
                                options=engine_config['options'],
                            )
                            # Negate: the score is relative to the opponent,
                            # who is now to move.
                            played_eval = -score_to_cp(one_move_info[0]['score'])
                        board.pop()
                    else:
                        # The played move was in the top 5, so we can copy the corresponding eval to save time
                        played_eval = evals[played_index]

                    # Store the evaluations in the DB
                    with db_lock:
                        Move.create(
                            game=game_obj, color=color, number=board.fullmove_number,
                            pv1_eval=evals.get(1), pv2_eval=evals.get(2), pv3_eval=evals.get(3),
                            pv4_eval=evals.get(4), pv5_eval=evals.get(5),
                            played_rank=played_index, played_eval=played_eval,
                            nodes=analysis[0].get('nodes'),
                            # Pass the move actually played, not a leftover
                            # loop variable holding the last PV move.
                            masterdb_matches=masterdb_matches(board, played_move),
                        )
                    break
                except TypeError:
                    # If we get a bad engine output, score_to_cp will throw a TypeError. We can just retry
                    continue

        with db_lock:
            game_obj.is_analyzed = True
            game_obj.save()
    finally:
        engine.quit()
def masterdb_matches(board, move):
    """Placeholder for a master-database lookup; currently always returns None.

    Both arguments are accepted and ignored.
    """
    return None
def score_to_cp(score):
    """Convert an engine score into a single centipawn-style integer.

    *score* must provide ``is_mate()`` and a ``relative`` attribute exposing
    ``score()`` and ``mate()``.

    Non-mate scores are clamped to [-29000, 29000].  Mate scores use
    arbitrary extreme values: ``30000 - n`` for a positive mate distance
    ``n`` and ``-30000 - n`` otherwise.

    Raises:
        TypeError: if the score is not a mate and ``relative.score()`` is
            None (callers rely on this to detect bad engine output).
    """
    if not score.is_mate():
        centipawns = score.relative.score()
        return min(max(centipawns, -29000), 29000)

    mate_in = score.relative.mate()
    if mate_in > 0:
        return 30000 - mate_in
    return -30000 - mate_in
def load_engine_config():
    """Load and return the parsed contents of ``./config/engine.json``.

    The path is relative to the current working directory.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # JSON is UTF-8 by specification; be explicit so that non-ASCII engine
    # paths or option values do not depend on the platform default encoding.
    with open('./config/engine.json', encoding='utf-8') as config_f:
        return json.load(config_f)
def init_engine(config):
    """Start the UCI engine located at ``config['path']`` and return it.

    The caller is responsible for shutting the returned engine down.
    """
    engine_path = config['path']
    return chess.engine.SimpleEngine.popen_uci(engine_path)