Skip to content

Commit

Permalink
Rename metadata to headers
Browse files Browse the repository at this point in the history
  • Loading branch information
NarekA committed Dec 13, 2023
1 parent 1c2a7c2 commit e0b54de
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 102 deletions.
3 changes: 2 additions & 1 deletion jina/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime as _datetime
import os as _os
import sys as _sys
from pathlib import Path as _Path
import datetime as _datetime

__windows__ = _sys.platform == 'win32'
__uptime__ = _datetime.datetime.now().isoformat()
Expand Down Expand Up @@ -53,6 +53,7 @@
__args_executor_func__ = {
'docs',
'parameters',
'headers',
'docs_matrix',
}
__args_executor_init__ = {'metas', 'requests', 'runtime_args'}
Expand Down
5 changes: 4 additions & 1 deletion jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ def add_post_route(
)
app_kwargs['response_class'] = DocArrayResponse

from fastapi import Request

Check warning on line 190 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L190

Added line #L190 was not covered by tests

@app.api_route(**app_kwargs)
async def post(body: input_model, response: Response):
async def post(body: input_model, response: Response, request: Request):

Check warning on line 193 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L193

Added line #L193 was not covered by tests
target_executor = None
req_id = None
if body.header is not None:
Expand All @@ -208,6 +210,7 @@ async def post(body: input_model, response: Response):
docs,
exec_endpoint=endpoint_path,
parameters=body.parameters,
headers=request.headers,
target_executor=target_executor,
request_id=req_id,
return_results=True,
Expand Down
73 changes: 31 additions & 42 deletions jina/serve/runtimes/gateway/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,48 +157,37 @@ async def _load_balance(self, request):
try:
async with aiohttp.ClientSession() as session:

if request.method == 'GET':
request_kwargs = {}
try:
payload = await request.json()
if payload:
request_kwargs['json'] = payload
except Exception:
self.logger.debug('No JSON payload found in request')

async with session.get(
url=target_url, **request_kwargs
) as response:
# Create a StreamResponse with the same headers and status as the target response
stream_response = web.StreamResponse(
status=response.status,
headers=response.headers,
)

# Prepare the response to send headers
await stream_response.prepare(request)

# Stream the response from the target server to the client
async for chunk in response.content.iter_any():
await stream_response.write(chunk)

# Close the stream response once all chunks are sent
await stream_response.write_eof()
return stream_response

elif request.method == 'POST':
d = await request.read()
import json

async with session.post(
url=target_url, json=json.loads(d.decode())
) as response:
content = await response.read()
return web.Response(
body=content,
status=response.status,
content_type=response.content_type,
)
request_kwargs = {}
try:
payload = await request.json()
if payload:
request_kwargs['json'] = payload
except Exception:
self.logger.debug('No JSON payload found in request')

Check warning on line 166 in jina/serve/runtimes/gateway/request_handling.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/request_handling.py#L160-L166

Added lines #L160 - L166 were not covered by tests

async with session.request(

Check warning on line 168 in jina/serve/runtimes/gateway/request_handling.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/request_handling.py#L168

Added line #L168 was not covered by tests
request.method,
url=target_url,
auto_decompress=False,
**request_kwargs,
) as response:
# Create a StreamResponse with the same headers and status as the target response
stream_response = web.StreamResponse(

Check warning on line 175 in jina/serve/runtimes/gateway/request_handling.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/request_handling.py#L175

Added line #L175 was not covered by tests
status=response.status,
headers=response.headers,
)

# Prepare the response to send headers
await stream_response.prepare(request)

Check warning on line 181 in jina/serve/runtimes/gateway/request_handling.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/request_handling.py#L181

Added line #L181 was not covered by tests

# Stream the response from the target server to the client
async for chunk in response.content.iter_any():
await stream_response.write(chunk)

Check warning on line 185 in jina/serve/runtimes/gateway/request_handling.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/request_handling.py#L184-L185

Added lines #L184 - L185 were not covered by tests

# Close the stream response once all chunks are sent
await stream_response.write_eof()
return stream_response

Check warning on line 189 in jina/serve/runtimes/gateway/request_handling.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/request_handling.py#L188-L189

Added lines #L188 - L189 were not covered by tests

except aiohttp.ClientError as e:
return web.Response(text=f'Error: {str(e)}', status=500)

Expand Down
18 changes: 18 additions & 0 deletions jina/serve/runtimes/gateway/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
AsyncIterator,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -209,6 +210,7 @@ async def stream(
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
results_in_order: bool = False,
return_type: Type[DocumentArray] = DocumentArray,
) -> AsyncIterator[Tuple[Union[DocumentArray, 'Request'], 'ExecutorError']]:
Expand All @@ -221,6 +223,7 @@ async def stream(
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param results_in_order: return the results in the same order as the request_iterator
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
:yield: tuple of Documents or Responses and unpacked error from Executors if any
Expand All @@ -232,6 +235,7 @@ async def stream(
exec_endpoint=exec_endpoint,
target_executor=target_executor,
parameters=parameters,
headers=headers,
results_in_order=results_in_order,
return_type=return_type,
):
Expand All @@ -256,6 +260,7 @@ async def stream_doc(
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
request_id: Optional[str] = None,
return_type: Type[DocumentArray] = DocumentArray,
) -> AsyncIterator[Tuple[Union[DocumentArray, 'Request'], 'ExecutorError']]:
Expand All @@ -267,6 +272,7 @@ async def stream_doc(
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param request_id: Request ID to add to the request streamed to Executor. Only applicable if request_size is equal or less to the length of the docs
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
:yield: tuple of Documents or Responses and unpacked error from Executors if any
Expand All @@ -282,6 +288,8 @@ async def stream_doc(
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

Check warning on line 292 in jina/serve/runtimes/gateway/streamer.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/streamer.py#L291-L292

Added lines #L291 - L292 were not covered by tests

async for result in self.rpc_stream_doc(request=req, return_type=return_type):
error = None
Expand All @@ -306,6 +314,7 @@ async def stream_docs(
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
results_in_order: bool = False,
request_id: Optional[str] = None,
return_type: Type[DocumentArray] = DocumentArray,
Expand All @@ -319,6 +328,7 @@ async def stream_docs(
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param results_in_order: return the results in the same order as the request_iterator
:param request_id: Request ID to add to the request streamed to Executor. Only applicable if request_size is equal or less to the length of the docs
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
Expand All @@ -339,6 +349,8 @@ def _req_generator():
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

Check warning on line 353 in jina/serve/runtimes/gateway/streamer.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/streamer.py#L352-L353

Added lines #L352 - L353 were not covered by tests
yield req
else:
from docarray import BaseDoc
Expand All @@ -361,6 +373,8 @@ def batch(iterable, n=1):
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

Check warning on line 377 in jina/serve/runtimes/gateway/streamer.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/streamer.py#L376-L377

Added lines #L376 - L377 were not covered by tests
yield req
else:
req = DataRequest()
Expand All @@ -374,6 +388,8 @@ def batch(iterable, n=1):
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

Check warning on line 392 in jina/serve/runtimes/gateway/streamer.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/streamer.py#L391-L392

Added lines #L391 - L392 were not covered by tests
yield req

async for resp in self.rpc_stream(
Expand Down Expand Up @@ -438,6 +454,7 @@ async def post(
request_size: int = 100,
on: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
return_type: Type[DocumentArray] = DocumentArray,
**kwargs,
):
Expand Down Expand Up @@ -505,6 +522,7 @@ async def stream_doc(
inputs: 'Document',
on: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
**kwargs,
):
req: SingleDocumentRequest = SingleDocumentRequest(inputs.to_protobuf())
Expand Down
6 changes: 5 additions & 1 deletion jina/serve/runtimes/worker/http_fastapi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,18 @@ def add_post_route(

app_kwargs['response_class'] = DocArrayResponse

from fastapi import Request

Check warning on line 89 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L89

Added line #L89 was not covered by tests

@app.api_route(**app_kwargs)
async def post(body: input_model, response: Response):
async def post(body: input_model, response: Response, request: Request):

Check warning on line 92 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L92

Added line #L92 was not covered by tests

req = DataRequest()
if body.header is not None:
req.header.request_id = body.header.request_id

if body.parameters is not None:
req.parameters = body.parameters
req.headers = request.headers

Check warning on line 100 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L100

Added line #L100 was not covered by tests
req.header.exec_endpoint = endpoint_path
data = body.data
if isinstance(data, list):
Expand Down Expand Up @@ -149,6 +152,7 @@ async def streaming_get(request: Request = None, body: input_doc_model = None):
body = Document.from_pydantic_model(body)
req = DataRequest()
req.header.exec_endpoint = endpoint_path
req.headers = request.headers

Check warning on line 155 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L155

Added line #L155 was not covered by tests
if not docarray_v2:
req.data.docs = DocumentArray([body])
else:
Expand Down
Loading

0 comments on commit e0b54de

Please sign in to comment.