-
Notifications
You must be signed in to change notification settings - Fork 111
/
09-task-pull.py
executable file
·77 lines (65 loc) · 2.63 KB
/
09-task-pull.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python
"""Demonstrate the task-pull paradigm for high-throughput computing
using mpi4py. Task pull is an efficient way to perform a large number of
independent tasks when there are more tasks than processors, especially
when the run times vary for each task.
This code is over-commented for instructional purposes.
This example was contributed by Craig Finch ([email protected]).
Inspired by http://math.acadiau.ca/ACMMaC/Rmpi/index.html
"""
from __future__ import print_function
from mpi4py import MPI
def enum(*sequential, **named):
    """Build a lightweight enumeration class.

    Each positional name is mapped to its zero-based position; keyword
    arguments add members with explicitly chosen values and override a
    positional member of the same name.

    Recipe from
    http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python

    Args:
        *sequential: Member names, numbered 0, 1, 2, ... in the order given.
        **named: Additional members as ``name=value`` pairs.

    Returns:
        A new class named ``Enum`` whose class attributes are the members.
    """
    members = {name: index for index, name in enumerate(sequential)}
    # Applied last so explicit values win over positional numbering.
    members.update(named)
    return type('Enum', (), members)
# Message tags that label the purpose of every message exchanged
# between the master and the workers
tags = enum('READY', 'DONE', 'EXIT', 'START')

# MPI bookkeeping shared by the master and worker branches below
comm = MPI.COMM_WORLD      # communicator object used for all send/recv calls
rank = comm.Get_rank()     # rank of this process
size = comm.Get_size()     # total number of processes
status = MPI.Status()      # status object filled in by each recv call
if rank == 0:
    # Master process: hands out tasks on request and counts worker exits.
    tasks = range(2*size)
    task_index = 0          # index of the next task that has not been sent
    num_workers = size - 1  # every rank except this one is a worker
    closed_workers = 0
    print("Master starting with %d workers" % num_workers)
    # Keep serving messages until every worker has reported that it exited.
    while closed_workers < num_workers:
        # Accept a message from any worker with any tag; the sender and the
        # tag are read back from the status object afterwards.
        data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        source = status.Get_source()
        tag = status.Get_tag()
        if tag == tags.READY:
            # Worker is ready, so send it a task
            if task_index < len(tasks):
                comm.send(tasks[task_index], dest=source, tag=tags.START)
                print("Sending task %d to worker %d" % (task_index, source))
                task_index += 1
            else:
                # No tasks left: tell this worker to shut down.
                comm.send(None, dest=source, tag=tags.EXIT)
        elif tag == tags.DONE:
            # NOTE: this is overwritten on every DONE message, so only the
            # most recently received result is kept.
            results = data
            print("Got data from worker %d" % source)
        elif tag == tags.EXIT:
            print("Worker %d exited." % source)
            closed_workers += 1

    print("Master finishing")
else:
    # Worker processes: repeatedly ask the master for work until told to exit.
    name = MPI.Get_processor_name()
    print("I am a worker with rank %d on %s." % (rank, name))
    while True:
        # Announce availability, then wait for the master's reply.
        comm.send(None, dest=0, tag=tags.READY)
        task = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        tag = status.Get_tag()

        if tag == tags.START:
            # Do the work here
            result = task**2
            comm.send(result, dest=0, tag=tags.DONE)
        elif tag == tags.EXIT:
            break

    # Acknowledge the shutdown so the master can count this worker as closed.
    comm.send(None, dest=0, tag=tags.EXIT)