Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pass request-headers as metadata #6126

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion jina/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime as _datetime
import os as _os
import sys as _sys
from pathlib import Path as _Path
import datetime as _datetime

__windows__ = _sys.platform == 'win32'
__uptime__ = _datetime.datetime.now().isoformat()
Expand Down Expand Up @@ -53,6 +53,7 @@
__args_executor_func__ = {
'docs',
'parameters',
'headers',
'docs_matrix',
}
__args_executor_init__ = {'metas', 'requests', 'runtime_args'}
Expand Down
5 changes: 4 additions & 1 deletion jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ def add_post_route(
)
app_kwargs['response_class'] = DocArrayResponse

from fastapi import Request

@app.api_route(**app_kwargs)
async def post(body: input_model, response: Response):
async def post(body: input_model, response: Response, request: Request):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this means the header will only be available when gateway is enabled?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway - having access to HTTP header is very important for our production application - we are an infra team supporting different businesses, and we use headers to communicate extra information like auth, tracing id, etc.

@JoanFM

Copy link
Contributor Author

@NarekA NarekA Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should work for standalone-fast-api deployments too. See this change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @cenhao ,

The feature should be available with and without gateway.

Btw, would be cool to know about ur usecase and how u use/plan to use Jina

target_executor = None
req_id = None
if body.header is not None:
Expand All @@ -208,6 +210,7 @@ async def post(body: input_model, response: Response):
docs,
exec_endpoint=endpoint_path,
parameters=body.parameters,
headers=request.headers,
target_executor=target_executor,
request_id=req_id,
return_results=True,
Expand Down
18 changes: 18 additions & 0 deletions jina/serve/runtimes/gateway/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
AsyncIterator,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -209,6 +210,7 @@ async def stream(
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
results_in_order: bool = False,
return_type: Type[DocumentArray] = DocumentArray,
) -> AsyncIterator[Tuple[Union[DocumentArray, 'Request'], 'ExecutorError']]:
Expand All @@ -221,6 +223,7 @@ async def stream(
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param results_in_order: return the results in the same order as the request_iterator
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
:yield: tuple of Documents or Responses and unpacked error from Executors if any
Expand All @@ -232,6 +235,7 @@ async def stream(
exec_endpoint=exec_endpoint,
target_executor=target_executor,
parameters=parameters,
headers=headers,
results_in_order=results_in_order,
return_type=return_type,
):
Expand All @@ -256,6 +260,7 @@ async def stream_doc(
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
request_id: Optional[str] = None,
return_type: Type[DocumentArray] = DocumentArray,
) -> AsyncIterator[Tuple[Union[DocumentArray, 'Request'], 'ExecutorError']]:
Expand All @@ -267,6 +272,7 @@ async def stream_doc(
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param request_id: Request ID to add to the request streamed to Executor. Only applicable if request_size is equal or less to the length of the docs
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
:yield: tuple of Documents or Responses and unpacked error from Executors if any
Expand All @@ -282,6 +288,8 @@ async def stream_doc(
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

async for result in self.rpc_stream_doc(request=req, return_type=return_type):
error = None
Expand All @@ -306,6 +314,7 @@ async def stream_docs(
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
results_in_order: bool = False,
request_id: Optional[str] = None,
return_type: Type[DocumentArray] = DocumentArray,
Expand All @@ -319,6 +328,7 @@ async def stream_docs(
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param results_in_order: return the results in the same order as the request_iterator
:param request_id: Request ID to add to the request streamed to Executor. Only applicable if request_size is equal or less to the length of the docs
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
Expand All @@ -339,6 +349,8 @@ def _req_generator():
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers
yield req
else:
from docarray import BaseDoc
Expand All @@ -361,6 +373,8 @@ def batch(iterable, n=1):
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers
yield req
else:
req = DataRequest()
Expand All @@ -374,6 +388,8 @@ def batch(iterable, n=1):
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers
yield req

async for resp in self.rpc_stream(
Expand Down Expand Up @@ -438,6 +454,7 @@ async def post(
request_size: int = 100,
on: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
return_type: Type[DocumentArray] = DocumentArray,
**kwargs,
):
Expand Down Expand Up @@ -505,6 +522,7 @@ async def stream_doc(
inputs: 'Document',
on: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
**kwargs,
):
req: SingleDocumentRequest = SingleDocumentRequest(inputs.to_protobuf())
Expand Down
6 changes: 5 additions & 1 deletion jina/serve/runtimes/worker/http_fastapi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,18 @@ def add_post_route(

app_kwargs['response_class'] = DocArrayResponse

from fastapi import Request

@app.api_route(**app_kwargs)
async def post(body: input_model, response: Response):
async def post(body: input_model, response: Response, request: Request):

req = DataRequest()
if body.header is not None:
req.header.request_id = body.header.request_id

if body.parameters is not None:
req.parameters = body.parameters
req.headers = request.headers
req.header.exec_endpoint = endpoint_path
data = body.data
if isinstance(data, list):
Expand Down Expand Up @@ -149,6 +152,7 @@ async def streaming_get(request: Request = None, body: input_doc_model = None):
body = Document.from_pydantic_model(body)
req = DataRequest()
req.header.exec_endpoint = endpoint_path
req.headers = request.headers
if not docarray_v2:
req.data.docs = DocumentArray([body])
else:
Expand Down
Loading