Skip to content

Commit

Permalink
fix: use list instead of dictionary for peas-hosts (#2242)
Browse files Browse the repository at this point in the history
* fix(parser): adding type to pea hosts

* fix: use list instead of dictionary for peas-hosts

* fix: fix black

* fix: reformat to black standards

* fix: fix the distributed tests

* fix: fix the jinad tests

* fix: fix the arg type issue

Co-authored-by: Deepankar Mahapatro <[email protected]>
  • Loading branch information
nan-wang and deepankarm authored Mar 29, 2021
1 parent d184070 commit 2ad1a5e
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 19 deletions.
15 changes: 6 additions & 9 deletions jina/parsers/peapods/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def mixin_base_pod_parser(parser):
choices=list(PollingType),
default=PollingType.ANY,
help='''
The polling strategy of the Pod (when `parallel>1`)
The polling strategy of the Pod (when `parallel>1`)
- ANY: only one (whoever is idle) Pea polls the message
- ALL: all Peas poll the message (like a broadcast)
''',
Expand All @@ -64,12 +64,9 @@ def mixin_base_pod_parser(parser):
)
gp.add_argument(
'--peas-hosts',
action=KVAppendAction,
metavar='KEY: VALUE',
nargs='*',
help='''The hosts of the peas when parallel greater than 1,
pea have a new host address if the pea_id present in the map.
otherwise pea host will be identical to the host of pod.
Represented as a key value pair in argument.
key is the pea_id, and value is the host address.''',
nargs='+',
type=str,
help='''The hosts of the peas when parallel greater than 1.
Peas will be evenly distributed among the hosts. By default,
peas are running in the same host as the pod.''',
)
12 changes: 10 additions & 2 deletions jina/peapods/pods/helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
from argparse import Namespace
from typing import List, Optional
from itertools import cycle

from ... import __default_host__
from ...enums import SchedulerType, SocketType, PeaRoleType
Expand All @@ -12,16 +13,23 @@ def _set_peas_args(
args: Namespace, head_args: Optional[Namespace] = None, tail_args: Namespace = None
) -> List[Namespace]:
result = []
_host_list = (
args.peas_hosts
if args.peas_hosts
else [
args.host,
]
)

for idx in range(args.parallel):
for idx, pea_host in zip(range(args.parallel), cycle(_host_list)):
_args = copy.deepcopy(args)

if args.parallel > 1:
_args.pea_id = idx + 1 #: if it is parallel, then pea_id is 1-indexed
_args.pea_role = PeaRoleType.PARALLEL
_args.identity = random_identity()
if _args.peas_hosts:
_args.host = _args.peas_hosts.get(str(_args.pea_id), args.host)
_args.host = pea_host
if _args.name:
_args.name += f'/{_args.pea_id}'
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pods:
polling: all
host: $JINA_INDEXER_HOST
peas_hosts:
1: $JINA_ENCODER_HOST
- $JINA_ENCODER_HOST
port_expose: 8000
- name: slice
uses: slice.yml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ pods:
parallel: 3
host: $JINA_POD1_HOST
peas_hosts:
1: $JINA_POD2_HOST
- $JINA_POD2_HOST
port_expose: 8000
- name: pod2
uses: _pass
parallel: 3
host: $JINA_POD2_HOST
peas_hosts:
1: $JINA_POD1_HOST
- $JINA_POD1_HOST
port_expose: 8000
10 changes: 5 additions & 5 deletions tests/unit/peapods/pods/test_pods.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_pod_args_remove_uses_ba():

def test_pod_remote_pea_without_parallel():
args = set_pod_parser().parse_args(
['--peas-hosts', '1: 0.0.0.1', '--parallel', str(1)]
['--peas-hosts', '0.0.0.1', '--parallel', str(1)]
)
with Pod(args) as pod:
peas = pod.peas
Expand All @@ -196,7 +196,7 @@ def test_pod_remote_pea_parallel_pea_host_set_partially(
expected_host_out,
):
args = set_pod_parser().parse_args(
['--peas-hosts', f'1: {pea1_host}', '--parallel', str(2), '--host', pod_host]
['--peas-hosts', f'{pea1_host}', '--parallel', str(2), '--host', pod_host]
)
assert args.host == pod_host
pod = Pod(args)
Expand All @@ -205,7 +205,7 @@ def test_pod_remote_pea_parallel_pea_host_set_partially(
assert v.host == args.host
else:
for pea_arg in v:
if pea_arg.pea_id == 1:
if pea_arg.pea_id in (1, 2):
assert pea_arg.host == pea1_host
assert pea_arg.host_in == expected_host_in
assert pea_arg.host_out == expected_host_out
Expand All @@ -232,8 +232,8 @@ def test_pod_remote_pea_parallel_pea_host_set_completely(
args = set_pod_parser().parse_args(
[
'--peas-hosts',
f'1: {peas_hosts[0]}',
f'2: {peas_hosts[1]}',
f'{peas_hosts[0]}',
f'{peas_hosts[1]}',
'--parallel',
str(2),
'--host',
Expand Down

0 comments on commit 2ad1a5e

Please sign in to comment.