Skip to content

Commit

Permalink
fix(flow): show client progress bar when using in the flow (#2722)
Browse files Browse the repository at this point in the history
* fix(flow): show client progress bar when using in the flow

* fix(flow): show client progress bar when using in the flow
  • Loading branch information
hanxiao authored Jun 21, 2021
1 parent 80dbd82 commit ed894f3
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 93 deletions.
30 changes: 14 additions & 16 deletions cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ def _gaa(key, parser):
'--host-out',
'--socket-in',
'--socket-out',
'--dynamic-out-routing',
'--dynamic-in-routing',
'--memory-hwm',
'--on-error-strategy',
'--num-part',
'--dynamic-routing',
'--dynamic-routing-out',
'--dynamic-routing-in',
'--uses-internal',
'--entrypoint',
'--docker-kwargs',
Expand Down Expand Up @@ -130,12 +131,10 @@ def _gaa(key, parser):
'--shards',
'--replicas',
'--polling',
'--dynamic-routing',
'--no-dynamic-routing',
'--scheduling',
'--external',
'--pod-role',
'--peas-hosts',
'--pod-role',
],
'pod': [
'--help',
Expand All @@ -159,11 +158,12 @@ def _gaa(key, parser):
'--host-out',
'--socket-in',
'--socket-out',
'--dynamic-out-routing',
'--dynamic-in-routing',
'--memory-hwm',
'--on-error-strategy',
'--num-part',
'--dynamic-routing',
'--dynamic-routing-out',
'--dynamic-routing-in',
'--uses-internal',
'--entrypoint',
'--docker-kwargs',
Expand Down Expand Up @@ -191,12 +191,10 @@ def _gaa(key, parser):
'--shards',
'--replicas',
'--polling',
'--dynamic-routing',
'--no-dynamic-routing',
'--scheduling',
'--external',
'--pod-role',
'--peas-hosts',
'--pod-role',
],
'flow': [
'--help',
Expand Down Expand Up @@ -232,11 +230,12 @@ def _gaa(key, parser):
'--host-out',
'--socket-in',
'--socket-out',
'--dynamic-out-routing',
'--dynamic-in-routing',
'--memory-hwm',
'--on-error-strategy',
'--num-part',
'--dynamic-routing',
'--dynamic-routing-out',
'--dynamic-routing-in',
'--prefetch',
'--prefetch-on-recv',
'--title',
Expand Down Expand Up @@ -267,8 +266,6 @@ def _gaa(key, parser):
'--pea-role',
'--noblock-on-start',
'--routing-graph',
'--dynamic-routing',
'--no-dynamic-routing',
],
'hub push': ['--help', '--force', '--secret', '--public', '--private'],
'hub': ['--help', 'push'],
Expand All @@ -294,11 +291,12 @@ def _gaa(key, parser):
'--host-out',
'--socket-in',
'--socket-out',
'--dynamic-out-routing',
'--dynamic-in-routing',
'--memory-hwm',
'--on-error-strategy',
'--num-part',
'--dynamic-routing',
'--dynamic-routing-out',
'--dynamic-routing-in',
'--uses-internal',
'--entrypoint',
'--docker-kwargs',
Expand Down
3 changes: 2 additions & 1 deletion jina/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ def add(
:param entrypoint: The entrypoint command overrides the ENTRYPOINT in Docker image. when not set then the Docker image ENTRYPOINT takes effective.
:param env: The map of environment variables that are available inside runtime
:param expose_public: If set, expose the public IP address to remote when necessary, by default it exposesprivate IP address, which only allows accessing under the same network/subnet. Important to set this to true when the Pea will receive input connections from remote Peas
:param external: The Pod will be considered an external Pod that has been started independently from the Flow. This Pod will not be context managed by the Flow.
:param external: The Pod will be considered an external Pod that has been started independently from the Flow.This Pod will not be context managed by the Flow.
:param host: The host address of the runtime, by default it is 0.0.0.0.
:param host_in: The host address for input, by default it is 0.0.0.0
:param host_out: The host address for output, by default it is 0.0.0.0
Expand Down Expand Up @@ -1038,6 +1038,7 @@ def client(self) -> 'BaseClient':
host=self.host,
port_expose=self.port_expose,
protocol=self.protocol,
show_progress=True,
)
kwargs.update(self._common_kwargs)
return Client(**kwargs)
Expand Down
13 changes: 2 additions & 11 deletions jina/logging/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,9 @@ def used_memory(unit: int = 1024 * 1024 * 1024) -> float:
:param unit: Unit of the memory, default in Gigabytes.
:return: Memory usage of the current process.
"""
with ImportExtensions(required=False):
import resource
import resource

return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / unit

from .predefined import default_logger

default_logger.error(
'module "resource" can not be found and you are likely running it on Windows, '
'i will return 0'
)
return 0
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / unit


def used_memory_readable() -> str:
Expand Down
17 changes: 3 additions & 14 deletions jina/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from jina.parsers.client import mixin_comm_protocol_parser
from .helper import _SHOW_ALL_ARGS
import argparse


def set_pea_parser(parser=None):
Expand Down Expand Up @@ -97,21 +99,8 @@ def set_gateway_parser(parser=None):
parser.add_argument(
'--routing-graph',
default=None,
help='Routing graph for the gateway',
help='Routing graph for the gateway' if _SHOW_ALL_ARGS else argparse.SUPPRESS,
)
parser.add_argument(
'--dynamic-routing',
action='store_true',
dest='dynamic_routing',
help='Tells if the gateway should incoming and outgoing traffic as dynamic routing.',
)
parser.add_argument(
'--no-dynamic-routing',
action='store_false',
dest='dynamic_routing',
help='The Gateway should not do dynamic routing.',
)
parser.set_defaults(dynamic_routing=True)

return parser

Expand Down
30 changes: 9 additions & 21 deletions jina/parsers/peapods/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,6 @@ def mixin_base_pod_parser(parser):
''',
)

parser.add_argument(
'--dynamic-routing',
action='store_true',
dest='dynamic_routing',
help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.',
)
parser.add_argument(
'--no-dynamic-routing',
action='store_false',
dest='dynamic_routing',
help='The Gateway should not do dynamic routing.',
)
parser.set_defaults(dynamic_routing=True)
gp.add_argument(
'--scheduling',
type=SchedulerType.from_string,
Expand All @@ -82,6 +69,15 @@ def mixin_base_pod_parser(parser):
'This Pod will not be context managed by the Flow.',
)

gp.add_argument(
'--peas-hosts',
nargs='+',
type=str,
help='''The hosts of the peas when parallel greater than 1.
Peas will be evenly distributed among the hosts. By default,
peas are running on host provided by the argument ``host``''',
)

# hidden CLI used for internal only

gp.add_argument(
Expand All @@ -92,11 +88,3 @@ def mixin_base_pod_parser(parser):
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)
gp.add_argument(
'--peas-hosts',
nargs='+',
type=str,
help='''The hosts of the peas when parallel greater than 1.
Peas will be evenly distributed among the hosts. By default,
peas are running on host provided by the argument ``host``''',
)
40 changes: 28 additions & 12 deletions jina/parsers/peapods/runtimes/zed.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,7 @@ def mixin_zed_runtime_parser(parser):
default=SocketType.PUSH_BIND,
help='The socket type for output port',
)
gp.add_argument(
'--dynamic-out-routing',
action='store_true',
default=False,
help='Tells if ZEDRuntime should respect routing graph for outgoing traffic.',
)
gp.add_argument(
'--dynamic-in-routing',
action='store_true',
default=False,
help='Tells if ZEDRuntime should handle incoming traffic as dynamic routing.',
)

gp.add_argument(
'--memory-hwm',
type=int,
Expand Down Expand Up @@ -130,3 +119,30 @@ def mixin_zed_runtime_parser(parser):
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

parser.add_argument(
'--dynamic-routing',
action='store_true',
default=True,
help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

gp.add_argument(
'--dynamic-routing-out',
action='store_true',
default=False,
help='Tells if ZEDRuntime should respect routing graph for outgoing traffic.'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

gp.add_argument(
'--dynamic-routing-in',
action='store_true',
default=False,
help='Tells if ZEDRuntime should handle incoming traffic as dynamic routing.'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)
20 changes: 10 additions & 10 deletions jina/peapods/pods/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _copy_to_head_args(
else:
_head_args.socket_out = SocketType.PUB_BIND

Pod._set_dynamic_in_routing(_head_args)
Pod._set_dynamic_routing_in(_head_args)

if as_router:
_head_args.uses = args.uses_before or __default_executor__
Expand Down Expand Up @@ -229,7 +229,7 @@ def _copy_to_tail_args(
_tail_args.pea_role = PeaRoleType.TAIL
_tail_args.num_part = 1 if polling_type.is_push else args.parallel

Pod._set_dynamic_out_routing(_tail_args)
Pod._set_dynamic_routing_out(_tail_args)

return _tail_args

Expand Down Expand Up @@ -570,15 +570,15 @@ def _set_peas_args(
connect_args=_args,
)
else:
Pod._set_dynamic_in_routing(_args)
Pod._set_dynamic_routing_in(_args)
if tail_args:
_args.host_out = get_connect_host(
bind_host=tail_args.host,
bind_expose_public=tail_args.expose_public,
connect_args=_args,
)
else:
Pod._set_dynamic_out_routing(_args)
Pod._set_dynamic_routing_out(_args)

# pea workspace if not set then derive from workspace
if not _args.workspace:
Expand Down Expand Up @@ -622,23 +622,23 @@ def _parse_base_pod_args(self, args):
else:
self.is_head_router = False
self.is_tail_router = False
Pod._set_dynamic_in_routing(args)
Pod._set_dynamic_out_routing(args)
Pod._set_dynamic_routing_in(args)
Pod._set_dynamic_routing_out(args)
parsed_args['peas'] = [args]

# note that peas_args['peas'][0] exist either way and carries the original property
return parsed_args

@staticmethod
def _set_dynamic_in_routing(args):
def _set_dynamic_routing_in(args):
if args.dynamic_routing:
args.dynamic_in_routing = True
args.dynamic_routing_in = True
args.socket_in = SocketType.ROUTER_BIND

@staticmethod
def _set_dynamic_out_routing(args):
def _set_dynamic_routing_out(args):
if args.dynamic_routing:
args.dynamic_out_routing = True
args.dynamic_routing_out = True
args.socket_out = SocketType.DEALER_CONNECT

def set_routing_graph(self, routing_graph: RoutingGraph) -> None:
Expand Down
1 change: 0 additions & 1 deletion jina/peapods/runtimes/zmq/zed.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ def _post_hook(self, msg: 'Message') -> 'ZEDRuntime':
# all meta information should be stored and accessed via `msg.envelope`

self._last_active_time = time.perf_counter()
self._zmqlet.print_stats()
self._check_memory_watermark()

if self.expect_parts > 1:
Expand Down
6 changes: 3 additions & 3 deletions jina/peapods/zmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def _init_sockets(self) -> Tuple:
self.logger.debug(
f'input {self.args.host_in}:{colored(self.args.port_in, "yellow")}'
)
if not self.args.dynamic_out_routing:
if not self.args.dynamic_routing_out:
out_sock, out_addr = _init_socket(
ctx,
self.args.host_out,
Expand Down Expand Up @@ -293,7 +293,7 @@ def send_message(self, msg: 'Message'):
"""
# choose output sock
if msg.is_data_request:
if self.args.dynamic_out_routing:
if self.args.dynamic_routing_out:
self._send_message_dynamic(msg)
return
out_sock = self.out_sock
Expand Down Expand Up @@ -367,7 +367,7 @@ async def send_message(self, msg: 'Message', sleep: float = 0, **kwargs):
:param kwargs: keyword arguments
"""
# await asyncio.sleep(sleep) # preventing over-speed sending
if self.args.dynamic_out_routing:
if self.args.dynamic_routing_out:
await self._send_message_dynamic(msg)
else:
self._send_message_via(self.out_sock, msg)
Expand Down
7 changes: 3 additions & 4 deletions tests/unit/peapods/zmq/test_dynamic_routing.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import asyncio
import logging
import pytest
import threading
import time

import pytest
from google.protobuf import json_format

from jina.helper import random_identity
from jina.parsers import set_pea_parser
from jina.peapods.zmq import Zmqlet, AsyncZmqlet, ZmqStreamlet
from jina.proto import jina_pb2
from jina.types.message import Message
from jina.helper import random_identity


def get_args():
Expand All @@ -27,7 +26,7 @@ def get_args():
'DEALER_CONNECT',
'--timeout-ctrl',
'-1',
'--dynamic-out-routing',
'--dynamic-routing-out',
]
)

Expand Down

0 comments on commit ed894f3

Please sign in to comment.