Skip to content

Commit

Permalink
fix(flow): pea/pod start on the Flow level should not block (#1902)
Browse files Browse the repository at this point in the history
* fix(flow): pea/pod start on the Flow level should not block
  • Loading branch information
hanxiao authored Feb 8, 2021
1 parent 6dae7b5 commit 7a4d894
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 23 deletions.
6 changes: 5 additions & 1 deletion jina/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,13 @@ def start(self):
for k, v in self._env.items():
os.environ[k] = str(v)

for k, v in self:
v.args.noblock_on_start = True
self.enter_context(v)

for k, v in self:
try:
self.enter_context(v)
v.wait_start_success()
except Exception as ex:
self.logger.error(f'{k}:{v!r} can not be started due to {ex!r}, Flow is aborted')
self.close()
Expand Down
5 changes: 4 additions & 1 deletion jina/logging/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,15 @@ def _enter_msg(self):
print(self.task_name, end=' ...\t', flush=True)

def __exit__(self, typ, value, traceback):
self.duration = time.perf_counter() - self.start
self.duration = self.now()

self.readable_duration = get_readable_time(seconds=self.duration)

self._exit_msg()

def now(self) -> float:
return time.perf_counter() - self.start

def _exit_msg(self):
if self._logger:
self._logger.info(f'{self.task_name} takes {self.readable_duration} ({self.duration:.2f}s)')
Expand Down
5 changes: 5 additions & 0 deletions jina/parsers/peapods/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,8 @@ def mixin_pea_parser(parser):
gp.add_argument('--pea-role', type=PeaRoleType.from_string, choices=list(PeaRoleType),
default=PeaRoleType.SINGLETON,
help='The role of this Pea in a Pod' if _SHOW_ALL_ARGS else argparse.SUPPRESS)

gp.add_argument('--noblock-on-start', action='store_true', default=False,
help='If set, starting a Pea/Pod does not block the thread/process. It then relies on '
'`wait_start_success` at outer function for the postpone check.'
if _SHOW_ALL_ARGS else argparse.SUPPRESS)
13 changes: 10 additions & 3 deletions jina/peapods/peas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,21 @@ def start(self):
"""

super().start() #: required here to call process/thread method
if not self.args.noblock_on_start:
self.wait_start_success()

return self

def wait_start_success(self):
"""Block until all peas starts successfully.
If not success, it will raise an error hoping the outer function to catch it
"""
_timeout = self.args.timeout_ready
if _timeout <= 0:
_timeout = None
else:
_timeout /= 1e3

if self.ready_or_shutdown.wait(_timeout):
if self.is_shutdown.is_set():
# return too early and the shutdown is set, means something fails!!
Expand All @@ -124,8 +133,6 @@ def start(self):
raise TimeoutError(
f'{typename(self)}:{self.name} can not be initialized after {_timeout * 1e3}ms')

return self

def close(self) -> None:
""" Close the Pea
Expand Down
32 changes: 26 additions & 6 deletions jina/peapods/pods/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self, args: Union['argparse.Namespace', Dict], needs: Set[str] = No
"""
super().__init__()
self.args = args
BasePod._set_conditional_args(self.args)
self._set_conditional_args(self.args)
self.needs = needs if needs else set() #: used in the :class:`jina.flow.Flow` to build the graph

self.peas = [] # type: List['BasePea']
Expand All @@ -38,9 +38,6 @@ def __init__(self, args: Union['argparse.Namespace', Dict], needs: Set[str] = No
else:
self.peas_args = self._parse_args(args)

for a in self.all_args:
BasePod._set_conditional_args(a)

@property
def role(self) -> 'PodRoleType':
"""Return the role of this :class:`BasePod`."""
Expand Down Expand Up @@ -195,13 +192,36 @@ def start(self) -> 'BasePod':
If one of the :class:`BasePea` fails to start, make sure that all of them
are properly closed.
"""
try:
if getattr(self.args, 'noblock_on_start', False):
for _args in self.all_args:
_args.noblock_on_start = True
self._enter_pea(BasePea(_args))
# now rely on higher level to call `wait_start_success`
return self
else:
try:
for _args in self.all_args:
self._enter_pea(BasePea(_args))
except:
self.close()
raise
return self

def wait_start_success(self) -> None:
"""Block until all peas starts successfully.
If not success, it will raise an error hoping the outer function to catch it
"""

if not self.args.noblock_on_start:
raise ValueError(f'{self.wait_start_success!r} should only be called when `noblock_on_start` is set to True')

try:
for p in self.peas:
p.wait_start_success()
except:
self.close()
raise
return self

def _enter_pea(self, pea: 'BasePea') -> None:
self.peas.append(pea)
Expand Down
3 changes: 2 additions & 1 deletion jina/peapods/runtimes/jinad/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ def __init__(self, args: 'argparse.Namespace'):
self.timeout_ctrl = args.timeout_ctrl
self.host = args.host
self.port_expose = args.port_expose
self.api = PeaDaemonClient(host=self.host, port=self.port_expose, logger=self.logger)
self.api = PeaDaemonClient(host=self.host, port=self.port_expose, logger=self.logger,
timeout=self.args.timeout_ready)

def setup(self):
"""
Expand Down
1 change: 1 addition & 0 deletions jina/peapods/runtimes/jinad/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def _mask_args(self, args: 'argparse.Namespace'):

_args.log_config = '' # do not use local log_config
_args.upload_files = [] # reset upload files
_args.noblock_on_start = False # wait until start success

changes = []
for k, v in vars(_args).items():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import time
from typing import Any


from jina.executors.decorators import as_ndarray
from jina.executors.encoders import BaseEncoder


class DelayedExecutor(BaseEncoder):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = 'delayed-executor'

def post_init(self):
self.logger.info('sleeping for 8 secs')
time.sleep(8)

def encode(self, data: Any, *args, **kwargs) -> Any:
return data
@as_ndarray
def encode(self, data, *args, **kwargs):
return [[1, 2]] * len(data)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from jina import Flow
from tests import random_docs

cur_dir = os.path.dirname(os.path.abspath(__file__))

Expand Down Expand Up @@ -145,15 +146,15 @@ def test_l_r_l_with_upload(silent_log, parallels, docker_image, mocker):
response_mock.assert_called()


@pytest.mark.parametrize('parallels', [1, 2])
@pytest.mark.parametrize('parallels', [2])
def test_create_pea_timeout(parallels):
f = (Flow()
.add()
.add(uses='delayed_executor.yml',
host=CLOUD_HOST,
parallel=parallels,
upload_files=['delayed_executor.py'],
timeout_ready=10000)
timeout_ready=20000)
.add())
with f:
f.index_ndarray(['abc'])
f.index(random_docs(10))
38 changes: 38 additions & 0 deletions tests/unit/flow/test_flow_start_noblock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import time

import pytest

from jina.excepts import RuntimeFailToStart
from jina.executors import BaseExecutor
from jina.flow import Flow
from jina.logging.profile import TimeContext


class SlowExecutor(BaseExecutor):

def post_init(self):
time.sleep(4)


def test_flow_slow_executor_intra():
f = Flow().add(uses='SlowExecutor', parallel=2)

with f, TimeContext('start flow') as tc:
assert tc.now() < 8


def test_flow_slow_executor_inter():
f = (Flow().add(uses='SlowExecutor', parallel=3)
.add(uses='SlowExecutor', parallel=3))

with f, TimeContext('start flow') as tc:
assert tc.now() < 8


def test_flow_slow_executor_bad_fail_early():
f = (Flow().add(uses='SlowExecutor', parallel=3)
.add(uses='BADNAME_EXECUTOR', parallel=3))

with pytest.raises(RuntimeFailToStart):
with f, TimeContext('start flow') as tc:
assert tc.now() < 8

0 comments on commit 7a4d894

Please sign in to comment.