Skip to content

Commit 9e56b11

Browse files
Feature/js/parallelize matrix build (#211)
Co-authored-by: Lutz <[email protected]>
1 parent 6c139ca commit 9e56b11

File tree

5 files changed

+290
-50
lines changed

5 files changed

+290
-50
lines changed

Changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
**V1.13.12 - Updates**
2+
- Parallelize matrix_build.py for faster CI builds
23
- Added basic test mode that can be run via terminal connection.
34

45
**V1.13.11 - Updates**

matrix_build.py

Lines changed: 80 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44
import os
55
import shutil
66
import signal
7-
import subprocess
87
import click
8+
import sys
9+
from pathlib import Path
10+
from typing import List
911

1012
import tabulate
1113
from constraint import *
1214

15+
from matrix_build_parallel import Executor, execute, get_available_executor_idx, get_finished_executor_idx, \
16+
cleanup_tempdirs, create_executors, get_source_files_to_link, wait_for_executor_to_finish, copy_caches_to_executors
17+
1318
CONTINUE_ON_ERROR = False
1419

1520
BOARDS = [
@@ -280,43 +285,28 @@ def get_value(vb, vk):
280285
print(tabulate.tabulate(rows, tablefmt="grid", showindex=map(shorten, keys), colalign=("right",)))
281286

282287

283-
def generate_config_file(flag_values):
284-
content = "#pragma once\n\n"
285-
for key, value in flag_values.items():
286-
content += "#define {} {}\n".format(key, value)
287-
288-
with open("Configuration_local_matrix.hpp", 'w') as f:
289-
f.write(content)
290-
print("Generated local config")
291-
print("Path: {}".format(os.path.abspath(f.name)))
292-
print("Content:")
293-
print(content)
294-
295-
296-
def create_run_environment(flag_values):
297-
build_env = dict(os.environ)
298-
build_flags = " ".join(["-D{}={}".format(key, value) for key, value in flag_values.items()])
299-
build_env["PLATFORMIO_BUILD_FLAGS"] = build_flags
300-
return build_env
288+
def print_failed_executor(executor: Executor):
289+
print(f'Error for the following configuration ({executor.proj_dir}):', file=sys.stderr)
290+
print_solutions_matrix([executor.solution])
291+
configuration_path = Path(executor.proj_dir, 'Configuration_local_matrix.hpp')
292+
print(f'{configuration_path}:')
293+
with open(configuration_path, 'r') as fp:
294+
print(fp.read())
295+
out_bytes, err_bytes = executor.proc.communicate()
296+
if out_bytes:
297+
print(out_bytes.decode())
298+
if err_bytes:
299+
print(err_bytes.decode(), file=sys.stderr)
301300

302301

303-
def execute(board, flag_values, use_config_file=True):
304-
if use_config_file:
305-
build_env = dict(os.environ)
306-
build_env["PLATFORMIO_BUILD_FLAGS"] = "-DMATRIX_LOCAL_CONFIG=1"
307-
generate_config_file(flag_values)
308-
else:
309-
build_env = create_run_environment(flag_values)
310-
311-
proc = subprocess.Popen(
312-
"pio run -e {}".format(board),
313-
# stdout=subprocess.PIPE,
314-
# stderr=subprocess.PIPE,
315-
shell=True,
316-
env=build_env,
317-
)
318-
(stdout, stderr) = proc.communicate()
319-
return stdout, stdout, proc.returncode
302+
def run_solution_blocking(executor: Executor, solution: dict) -> int:
303+
executor.solution = copy.deepcopy(solution)
304+
board = solution.pop("BOARD")
305+
executor.proc = execute(executor.proj_dir, board, solution, jobs=os.cpu_count(), out_pipe=False)
306+
executor.proc.wait()
307+
if executor.proc.returncode != 0:
308+
print_failed_executor(executor)
309+
return executor.proc.returncode
320310

321311

322312
class GracefulKiller:
@@ -354,17 +344,60 @@ def solve(board):
354344
solutions = problem.getSolutions()
355345
print_solutions_matrix(solutions, short_strings=False)
356346

357-
print("Testing {} combinations".format(len(solutions)))
358-
359-
for num, solution in enumerate(solutions, start=1):
360-
print("[{}/{}] Building ...".format(num, len(solutions)), flush=True)
361-
print_solutions_matrix([solution])
362-
363-
board = solution.pop("BOARD")
364-
(o, e, c) = execute(board, solution)
365-
if c and not CONTINUE_ON_ERROR:
366-
exit(c)
367-
print(flush=True)
347+
total_solutions = len(solutions)
348+
print(f'Testing {total_solutions} combinations')
349+
350+
nproc = min(os.cpu_count(), len(solutions))
351+
352+
local_paths_to_link = get_source_files_to_link()
353+
executor_list: List[Executor] = create_executors(nproc, local_paths_to_link)
354+
355+
print('First run to fill cache')
356+
solution = solutions.pop()
357+
retcode = run_solution_blocking(executor_list[0], solution)
358+
if retcode != 0 and not CONTINUE_ON_ERROR:
359+
exit(retcode)
360+
361+
copy_caches_to_executors(executor_list[0].proj_dir, executor_list[1:])
362+
363+
solutions_built = 2 # We've already built one solution, and we're 1-indexing
364+
exit_early = False # Exit trigger
365+
while solutions:
366+
# First fill any open execution slots
367+
while get_available_executor_idx(executor_list) is not None:
368+
available_executor_idx = get_available_executor_idx(executor_list)
369+
executor = executor_list[available_executor_idx]
370+
try:
371+
solution = solutions.pop()
372+
except IndexError:
373+
# No more solutions to try!
374+
break
375+
print(f'[{solutions_built}/{total_solutions}] Building ...')
376+
executor.solution = copy.deepcopy(solution)
377+
board = solution.pop("BOARD")
378+
executor.proc = execute(executor.proj_dir, board, solution)
379+
solutions_built += 1
380+
381+
# Next wait for any processes to finish
382+
wait_for_executor_to_finish(executor_list)
383+
384+
# Go through all the finished processes and check their status
385+
while get_finished_executor_idx(executor_list) is not None:
386+
finished_executor_idx = get_finished_executor_idx(executor_list)
387+
executor = executor_list[finished_executor_idx]
388+
if executor.proc.returncode != 0:
389+
print_failed_executor(executor)
390+
if not CONTINUE_ON_ERROR:
391+
exit_early = True
392+
del executor.proc
393+
executor.proc = None
394+
395+
if exit_early:
396+
break
397+
if exit_early:
398+
exit(1)
399+
print('Done!')
400+
cleanup_tempdirs(executor_list)
368401

369402

370403
if __name__ == '__main__':

matrix_build_parallel.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
"""
2+
Module where all functionality that purely relates to how we parallelize matrix_build.py
3+
should live. It's not a perfect split of course, but it helps to separate the 'matrix'
4+
logic from the 'how we build' logic.
5+
"""
6+
import os
7+
import shutil
8+
import subprocess
9+
import tempfile
10+
import time
11+
from pathlib import Path
12+
from typing import Optional, List
13+
from dataclasses import dataclass
14+
15+
16+
@dataclass
17+
class Executor:
18+
"""
19+
Core data that defines a solution that is being built
20+
"""
21+
# The directory where we are building the solution
22+
proj_dir: Path
23+
# The solution dictionary
24+
solution: Optional[dict] = None
25+
# The process building the solution
26+
proc: Optional[subprocess.Popen] = None
27+
# Object that holds tempdir data, so that it can be cleaned up later
28+
tempdir_obj: Optional[tempfile.TemporaryDirectory] = None
29+
30+
31+
def generate_config_file(project_location: Path, flag_values: dict):
32+
content = "#pragma once\n\n"
33+
for key, value in flag_values.items():
34+
content += "#define {} {}\n".format(key, value)
35+
36+
with open(Path(project_location, "Configuration_local_matrix.hpp"), 'w') as f:
37+
f.write(content)
38+
f.flush()
39+
40+
41+
def execute(project_location: Path, board: str, flag_values: dict, jobs: int = 1, out_pipe=True) -> subprocess.Popen:
42+
"""
43+
Start up an executor that is building a solution
44+
:param project_location: The directory where to build the solution
45+
:param board: The board type (aka environment)
46+
:param flag_values: Dictionary of #defines to create a config file from
47+
:param jobs: How many jobs the build process should use
48+
:param out_pipe: If the executor's stdout/stderr should be pipes
49+
:return: Process object that is executing the solution
50+
"""
51+
build_env = dict(os.environ)
52+
build_env["PLATFORMIO_BUILD_FLAGS"] = "-DMATRIX_LOCAL_CONFIG=1"
53+
generate_config_file(project_location, flag_values)
54+
55+
proc = subprocess.Popen(
56+
['pio',
57+
'run',
58+
f'--project-dir={str(project_location.resolve())}',
59+
f'--environment={board}',
60+
f'--jobs={jobs}',
61+
],
62+
stdout=subprocess.PIPE if out_pipe else None,
63+
stderr=subprocess.PIPE if out_pipe else None,
64+
env=build_env,
65+
close_fds=True,
66+
)
67+
return proc
68+
69+
70+
def get_available_executor_idx(e_list: List[Executor]) -> Optional[int]:
71+
"""
72+
Get the index of an idle executor
73+
:param e_list: List of executors
74+
:return: Idle executor index, else None if all are busy
75+
"""
76+
for i, executor in enumerate(e_list):
77+
if executor.proc is None:
78+
return i
79+
return None
80+
81+
82+
def get_finished_executor_idx(e_list: List[Executor]) -> Optional[int]:
83+
"""
84+
Get the index of a finished executor
85+
:param e_list: List of executors
86+
:return: Finished executor index, else None if all are busy
87+
"""
88+
for i, executor in enumerate(e_list):
89+
if executor.proc is not None and executor.proc.poll() is not None:
90+
return i
91+
return None
92+
93+
94+
def cleanup_tempdirs(e_list: List[Executor]):
95+
"""
96+
Delete all the temporary directories that executors were using
97+
:param e_list: List of executors
98+
"""
99+
for executor in e_list:
100+
if executor.tempdir_obj is not None:
101+
tempdir_path = executor.tempdir_obj.name
102+
print(f'Deleting {tempdir_path}')
103+
shutil.rmtree(tempdir_path, ignore_errors=True)
104+
105+
106+
def create_executors(num_executors: int, local_paths_to_link: List[Path]) -> List[Executor]:
107+
"""
108+
Create a number of executors and their associated temporary directories, then
109+
soft-link all needed project files
110+
:param num_executors: Number of executors to create
111+
:param local_paths_to_link: List of files to soft-link into the executor projects
112+
:return: List of executors
113+
"""
114+
executor_list: List[Executor] = []
115+
print(f'Creating {num_executors} executors')
116+
for executor_idx in range(num_executors):
117+
tempdir = tempfile.TemporaryDirectory()
118+
temp_proj_path = Path(tempdir.name)
119+
for local_path in local_paths_to_link:
120+
temp_dst_path = Path(temp_proj_path, local_path).resolve()
121+
os.makedirs(temp_dst_path.parent, exist_ok=True)
122+
os.symlink(local_path.resolve(), temp_dst_path)
123+
executor_list.append(Executor(temp_proj_path, tempdir_obj=tempdir))
124+
print(f'{executor_idx} ', end='')
125+
print()
126+
return executor_list
127+
128+
129+
def copy_caches_to_executors(src_proj_dir: Path, dst_executors: List[Executor]):
130+
"""
131+
Copy cache directories from a source directory to a number of executor project directories
132+
:param src_proj_dir: Directory to copy from
133+
:param dst_executors: List of executors to copy to
134+
"""
135+
print('Copying caches to other executors')
136+
dir_names_to_copy = ['.pio', 'build_cache']
137+
for dir_name_to_copy in dir_names_to_copy:
138+
src_path = Path(src_proj_dir, dir_name_to_copy)
139+
for dst_executor in dst_executors:
140+
dst_path = Path(dst_executor.proj_dir, dir_name_to_copy)
141+
shutil.copytree(src_path, dst_path)
142+
143+
144+
def get_source_files_to_link() -> List[Path]:
145+
"""
146+
Create a list of the important files from the local project. I didn't want to
147+
use git here, since that might not pick up untracked (but needed) files.
148+
:return: List of source files that a project needs in order to compile
149+
"""
150+
local_proj_path = Path('.')
151+
venv_dirs = list(local_proj_path.glob('*venv*/'))
152+
# Don't link the .pio directory because the builds need to be independent
153+
pio_dirs = list(local_proj_path.glob('*.pio*/'))
154+
cmake_dirs = list(local_proj_path.glob('*cmake-build*/'))
155+
156+
local_dirs_to_not_link = [Path('.git/'), Path('build_cache/')] + venv_dirs + pio_dirs + cmake_dirs
157+
local_filenames_to_not_link = [
158+
Path('Configuration_local.hpp'),
159+
Path('Configuration_local_matrix.hpp'),
160+
]
161+
162+
local_paths_to_link = []
163+
for local_dir_str, local_subdirs, local_files in os.walk(local_proj_path):
164+
local_dir_path = Path(local_dir_str)
165+
dir_shouldnt_be_linked = any(d == local_dir_path or d in local_dir_path.parents for d in local_dirs_to_not_link)
166+
if dir_shouldnt_be_linked:
167+
continue
168+
for local_file in local_files:
169+
local_file_full_path = Path(local_dir_path, local_file)
170+
file_shouldnt_be_linked = any(local_file_full_path == f for f in local_filenames_to_not_link)
171+
if file_shouldnt_be_linked:
172+
continue
173+
local_paths_to_link.append(local_file_full_path)
174+
return local_paths_to_link
175+
176+
177+
def wait_for_executor_to_finish(executor_list: List[Executor], timeout=0.1, poll_time=0.2):
178+
"""
179+
Block until an executor has finished building
180+
:param executor_list: List of executors
181+
:param timeout: Time to communicate() with the running process (kind of a hack)
182+
:param poll_time: Time to wait before checking all executors again
183+
"""
184+
while get_finished_executor_idx(executor_list) is None:
185+
for e in executor_list:
186+
if e.proc is not None and e.proc.poll() is None:
187+
# Communicate with the running processes to stop them from blocking
188+
# (i.e. they spew too much output)
189+
try:
190+
_ = e.proc.communicate(timeout=timeout)
191+
except subprocess.TimeoutExpired:
192+
pass # This is expected and what should happen
193+
time.sleep(poll_time)

post_script_remove_patched_files.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import os
44
import tempfile
5+
from pathlib import Path
56

67

78
def cprint(*args, **kwargs):
@@ -13,7 +14,13 @@ def clean_up_patched_files(*_, **__):
1314
Removes all temporary patched files created previously in the build process
1415
"""
1516
# patch_path_key needs to be kept in sync with pre_script_patch_debug.py
16-
patch_path_key = '_patched_'
17+
# We put the current directory name in the key so that we only remove
18+
# patched files that we know were built by the current build process.
19+
# This is only useful in safeguarding against multiple builds being done in
20+
# different directories at the same time. (i.e. we don't want to remove another
21+
# processes' files while they are still in use)
22+
project_dir_name = Path.cwd().name
23+
patch_path_key = f'_{project_dir_name}_patched_'
1724
tempdir_path = tempfile.gettempdir()
1825
cprint(f'Temp file dir is {tempdir_path}')
1926
patched_filepaths = []
@@ -23,7 +30,11 @@ def clean_up_patched_files(*_, **__):
2330
patched_filepaths.append(full_filepath)
2431
for patched_filepath in patched_filepaths:
2532
cprint(f'Removing {patched_filepath}')
26-
os.remove(patched_filepath)
33+
try:
34+
os.remove(patched_filepath)
35+
pass
36+
except FileNotFoundError:
37+
cprint('Not found (deleted already?)')
2738

2839

2940
env.AddPostAction('buildprog', clean_up_patched_files)

0 commit comments

Comments
 (0)