Skip to content

Commit 0c27212

Browse files
authored
Merge pull request #267 from Pyomo/PR265
Pr265
2 parents cb62806 + 92bf5f7 commit 0c27212

19 files changed

+89
-156
lines changed

.github/workflows/pull_push_regression.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
- name: Install dependencies
2626
run: |
2727
conda install mpi4py pandas setuptools
28-
pip install pyomo sphinx sphinx_rtd_theme cplex
28+
pip install pyomo sphinx sphinx_rtd_theme dill gridx-egret cplex
2929
pip install xpress
3030
3131
- name: setup the program
@@ -41,6 +41,11 @@ jobs:
4141
cd examples
4242
python afew.py xpress_persistent
4343
44+
- name: Test run_all nouc
45+
run: |
46+
cd examples
47+
python run_all.py xpress_persistent "" nouc
48+
4449
- name: Test docs
4550
run: |
4651
cd ./doc/src/

doc/src/spokes.rst

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -121,25 +121,3 @@ cross scenario
121121
^^^^^^^^^^^^^^
122122

123123
Passes cross scenario cuts.
124-
125-
126-
spoke_sleep_time
127-
----------------
128-
129-
This is an advanced topic and rarely encountered.
130-
In some settings, particularly with small sub-problems, it is possible for
131-
ranks within spokes to become out of sync. The most common manifestation of this
132-
is that some ranks do not see the kill signal and sit in a busy-wait I/O loop
133-
until something external kills them; but it can also be the case that Lagrangian
134-
bound spokes start operating on data from different hub iterations; they should notice
135-
this and emit a message if it happens.
136-
137-
This problem is normally avoided by default actions in lower level code (in `spcommunicator.py`)
138-
that insert a short sleep. To compute the sleep duration, it uses a heuristic based on the
139-
number of non-anticipative variables. It is also possible to explicitly set this sleep time.
140-
At the lowest levels, this is done by setting a value for "spoke_sleep_time" in the options
141-
dictionary passed to the ``SPCommunicator`` constructor. At a higher level, it is possible
142-
to pass a `spoke_sleep_time` keyword argument to the vanilla hub and spoke constructors. This
143-
is illustrated in the `hydro_cylinders.py` example (in the `hydro` example directory). You
144-
should probably pass the same value to all constructors. The importance of setting
145-
the spoke sleep time is going down as we improve the code for setting this value automatically.

examples/farmer/farmer_mmw.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ mpiexec -np 3 python -m mpi4py farmer_cylinders.py --num-scens 3 --lagrangian -
1010

1111
echo "starting mmw"
1212

13-
python -m mpisppy.confidence_intervals.mmw_conf farmer --xhatpath farmer_cyl_nonants.npy --solver-name ${SOLVERNAME} --MMW-num-batches 5 --MMW-batch-size 10 --confidence-level 0.9 --start-scen 10
13+
python -m mpisppy.confidence_intervals.mmw_conf farmer --xhatpath farmer_cyl_nonants.npy --EF-solver-name ${SOLVERNAME} --MMW-num-batches 5 --MMW-batch-size 10 --confidence-level 0.9 --start-scen 10

examples/hydro/hydro_cylinders.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,6 @@
1313

1414
import mpisppy.cylinders as cylinders
1515

16-
# For this problem, the subproblems are
17-
# small and take no time to solve. The
18-
# default SPOKE_SLEEP_TIME of 0.01 *causes*
19-
# synchronization issues in this case, so
20-
# we reduce it so as not to dominate the
21-
# time spent for cylinder synchronization
22-
SPOKE_SLEEP_TIME = 0.0001
23-
2416
write_solution = True
2517

2618
def _parse_args():
@@ -74,15 +66,15 @@ def main():
7466
ph_extensions=None,
7567
rho_setter = rho_setter,
7668
all_nodenames = all_nodenames,
77-
spoke_sleep_time = SPOKE_SLEEP_TIME)
69+
)
7870

7971
# Standard Lagrangian bound spoke
8072
if lagrangian:
8173
lagrangian_spoke = vanilla.lagrangian_spoke(*beans,
8274
scenario_creator_kwargs=scenario_creator_kwargs,
8375
rho_setter = rho_setter,
8476
all_nodenames = all_nodenames,
85-
spoke_sleep_time = SPOKE_SLEEP_TIME)
77+
)
8678

8779

8880
# xhat looper bound spoke
@@ -91,7 +83,7 @@ def main():
9183
xhatshuffle_spoke = vanilla.xhatshuffle_spoke(*beans,
9284
all_nodenames=all_nodenames,
9385
scenario_creator_kwargs=scenario_creator_kwargs,
94-
spoke_sleep_time = SPOKE_SLEEP_TIME)
86+
)
9587

9688
list_of_spoke_dict = list()
9789
if lagrangian:

examples/run_all.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ def do_one_mmw(dirname, runefstring, npyfile, mmwargstring):
139139
badguys[dirname].append(runefstring)
140140
# run mmw, remove .npy file
141141
else:
142-
runstring = "python -m mpisppy.confidence_intervals.mmw_conf {} --xhatpath {} --solver-name {} {}".\
143-
format(dirname, npyfile, solver_name, mmwargstring)
142+
runstring = "python -m mpisppy.confidence_intervals.mmw_conf {} --xhatpath {} {}".\
143+
format(dirname, npyfile, mmwargstring)
144144
code = os.system("echo {} && {}".format(runstring, runstring))
145145
if code != 0:
146146
if dirname not in badguys:
@@ -151,7 +151,6 @@ def do_one_mmw(dirname, runefstring, npyfile, mmwargstring):
151151
os.remove(npyfile)
152152
os.chdir("..")
153153

154-
155154
do_one("farmer", "farmer_ef.py", 1,
156155
"1 3 {}".format(solver_name))
157156
# for farmer_cylinders, the first arg is num_scens and is required
@@ -261,6 +260,7 @@ def do_one_mmw(dirname, runefstring, npyfile, mmwargstring):
261260
"--instance-name=sslp_15_45_10 --bundles-per-rank=2 "
262261
"--max-iterations=5 --default-rho=1 "
263262
"--lagrangian --xhatshuffle --fwph "
263+
"--linearize-proximal-terms "
264264
"--solver-name={} --fwph-stop-check-tol 0.01".format(solver_name))
265265

266266
do_one("hydro", "hydro_cylinders.py", 3,
@@ -306,7 +306,7 @@ def do_one_mmw(dirname, runefstring, npyfile, mmwargstring):
306306

307307
#=========MMW TESTS==========
308308
# do_one_mmw is special
309-
do_one_mmw("farmer", f"python farmer_ef.py 3 3 {solver_name}", "farmer_cyl_nonants.npy", "--MMW-num-batches=5 --confidence-level 0.95 --MMW-batch-size=10 --objective-gap --start-scen 4 --EF-solver-name={solver_name}")
309+
do_one_mmw("farmer", f"python farmer_ef.py 3 3 {solver_name}", "farmer_cyl_nonants.npy", f"--MMW-num-batches=5 --confidence-level 0.95 --MMW-batch-size=10 --objective-gap --start-scen 4 --EF-solver-name={solver_name}")
310310

311311

312312
#============================
@@ -386,5 +386,6 @@ def do_one_mmw(dirname, runefstring, npyfile, mmwargstring):
386386
print("Directory={}".format(i))
387387
for c in v:
388388
print(" {}".format(c))
389+
sys.exit(1)
389390
else:
390391
print("\nAll OK.")

examples/uc/uc_ama.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,23 @@
1111
import mpisppy.utils.amalgamator as amalgamator
1212
from uc_funcs import id_fix_list_fct
1313
from mpisppy.utils import config
14+
import pyomo.common.config as pyofig
1415

1516
def main():
1617
solution_files = {"first_stage_solution":"uc_first_stage.csv",
1718
#"tree_solution":"uc_ama_full_solution"
1819
#It takes too long to write the full solution
1920
}
20-
config.add_and_assign("id_fix_list_fct", "fct used by fixer extension",
21+
cfg = config.Config()
22+
cfg.add_and_assign("id_fix_list_fct", "fct used by fixer extension",
2123
domain=None, default=None,
2224
value = id_fix_list_fct)
23-
ama_options = {"2stage": True, # 2stage vs. mstage
24-
"cylinders": ['ph','xhatshuffle','lagranger'],
25-
"extensions": ['fixer'],
26-
"write_solution": solution_files
27-
}
28-
ama = amalgamator.from_module("uc_funcs", ama_options)
25+
cfg.add_and_assign("2stage", description="2stage vsus mstage", domain=bool, default=None, value=True)
26+
cfg.add_and_assign("cylinders", description="list of cylinders", domain=pyofig.ListOf(str), default=None, value=['ph','xhatshuffle','lagranger'])
27+
cfg.add_and_assign("extensions", description="list of extensions", domain=pyofig.ListOf(str), default=None, value= ['fixer'])
28+
cfg.add_and_assign("write_solution", description="list of extensions", domain=None, default=None, value=solution_files)
29+
30+
ama = amalgamator.from_module("uc_funcs", cfg)
2931
ama.run()
3032
if ama.on_hub:
3133
print("first_stage_solution=", ama.first_stage_solution)

examples/uc/uc_funcs.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,10 +276,10 @@ def scenario_names_creator(scnt,start=0):
276276
return [F"Scenario{i+1}" for i in range(start,scnt+start)]
277277

278278
#=========
279-
def inparser_adder():
279+
def inparser_adder(cfg):
280280
# (only for Amalgamator): add command options unique to uc
281-
config.num_scens_required()
282-
config.add_to_config("UC_count_for_path",
281+
cfg.num_scens_required()
282+
cfg.add_to_config("UC_count_for_path",
283283
description="Mainly for confidence intervals to give a prefix for the directory providing the scenario data but will be overridden if scen_count is greater (default 0)",
284284
domain=int,
285285
default=0)

mpisppy/confidence_intervals/multi_seqsampling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def run(self, maxit=200):
7979
xhat_scenario_names = refmodel.scenario_names_creator(mk)
8080

8181
xgo = self.xhat_gen_kwargs.copy()
82-
xgo["solvername"] = self.cfg.solvername
82+
xgo["solvername"] = self.solvername
8383
xgo.pop("solver_options", None) # it will be given explicitly
8484
xgo.pop("scenario_names", None) # it will be given explicitly
8585
xgo["branching_factors"] = xhat_branching_factors

mpisppy/cylinders/cross_scen_spoke.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ def _got_kill_signal(self):
4040
''' returns True if a kill signal was received,
4141
and refreshes the array and _locals'''
4242
self._new_locals = self.spoke_from_hub(self._locals)
43-
kill = (self._locals[-1] == -1)
44-
return kill
43+
return self.remote_write_id == -1
4544

4645
def prep_cs_cuts(self):
4746
# create a map scenario -> index, this index is used for various lists containing scenario dependent info.

mpisppy/cylinders/hub.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ def hub_to_spoke(self, values, spoke_strata_rank):
346346
f"Attempting to put array of length {len(values)} "
347347
f"into local buffer of length {expected_length}"
348348
)
349+
# this is so the spoke ranks all get the same write_id at approximately the same time
350+
self.cylinder_comm.Barrier()
349351
self.local_write_ids[spoke_strata_rank - 1] += 1
350352
values[-1] = self.local_write_ids[spoke_strata_rank - 1]
351353
window = self.windows[spoke_strata_rank - 1]
@@ -366,13 +368,26 @@ def hub_from_spoke(self, values, spoke_num):
366368
f"Hub trying to get buffer of length {expected_length} "
367369
f"from spoke, but provided buffer has length {len(values)}."
368370
)
371+
# so the window in each rank gets read at approximately the same time,
372+
# and so has the same write_id
373+
self.cylinder_comm.Barrier()
369374
window = self.windows[spoke_num - 1]
370375
window.Lock(spoke_num)
371376
window.Get((values, len(values), MPI.DOUBLE), spoke_num)
372377
window.Unlock(spoke_num)
373378

374-
if values[-1] > self.remote_write_ids[spoke_num - 1]:
375-
self.remote_write_ids[spoke_num - 1] = values[-1]
379+
new_id = int(values[-1])
380+
local_val = np.array((new_id,), 'i')
381+
sum_ids = np.zeros(1, 'i')
382+
self.cylinder_comm.Allreduce((local_val, MPI.INT),
383+
(sum_ids, MPI.INT),
384+
op=MPI.SUM)
385+
386+
if new_id != sum_ids[0] / self.cylinder_comm.size:
387+
return False
388+
389+
if (new_id > self.remote_write_ids[spoke_num - 1]) or (new_id < 0):
390+
self.remote_write_ids[spoke_num - 1] = new_id
376391
return True
377392
return False
378393

0 commit comments

Comments
 (0)