Skip to content

Commit

Permalink
Add tracing information (#24)
Browse files Browse the repository at this point in the history
Add some debug information for tracing by default server side.
  • Loading branch information
eyurtsev authored Oct 11, 2023
1 parent 6ec7cea commit 5f30428
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 41 deletions.
12 changes: 2 additions & 10 deletions langserve/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
"""Main entrypoint into package."""
from importlib import metadata

from langserve.client import RemoteRunnable
from langserve.server import add_routes
from langserve.version import __version__

__all__ = ["RemoteRunnable", "add_routes"]


try:
__version__ = metadata.version(__package__)
except metadata.PackageNotFoundError:
# Case where package metadata is not available.
__version__ = ""
del metadata # optional, avoids polluting the results of dir(__package__)
__all__ = ["RemoteRunnable", "add_routes", "__version__"]
86 changes: 61 additions & 25 deletions langserve/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
Union,
)

from fastapi import Request
from langchain.callbacks.tracers.log_stream import RunLog, RunLogPatch
from langchain.load.serializable import Serializable
from langchain.schema.runnable import Runnable
from typing_extensions import Annotated

from langserve.version import __version__

try:
from pydantic.v1 import BaseModel, create_model
except ImportError:
Expand Down Expand Up @@ -124,6 +127,24 @@ class Config:
return model_with_unique_name


def _add_tracing_info_to_metadata(config: Dict[str, Any], request: Request) -> None:
"""Add information useful for tracing and debugging purposes.
Args:
config: The config to expand with tracing information.
request: The request to use for expanding the metadata.
"""

metadata = config["metadata"] if "metadata" in config else {}

info = {
"__useragent": request.headers.get("user-agent"),
"__langserve_version": __version__,
}
metadata.update(info)
config["metadata"] = metadata


# PUBLIC API


Expand Down Expand Up @@ -201,34 +222,46 @@ def add_routes(
response_model=InvokeResponse,
)
async def invoke(
request: Annotated[InvokeRequest, InvokeRequest]
invoke_request: Annotated[InvokeRequest, InvokeRequest],
request: Request,
) -> InvokeResponse:
"""Invoke the runnable with the given input and config."""
# Request is first validated using InvokeRequest which takes into account
# config_keys as well as input_type.
config = _unpack_config(request.config, config_keys)
config = _unpack_config(invoke_request.config, config_keys)
_add_tracing_info_to_metadata(config, request)
output = await runnable.ainvoke(
_unpack_input(request.input), config=config, **request.kwargs
_unpack_input(invoke_request.input), config=config, **invoke_request.kwargs
)

return InvokeResponse(output=simple_dumpd(output))

#
@app.post(f"{namespace}/batch", response_model=BatchResponse)
async def batch(request: Annotated[BatchRequest, BatchRequest]) -> BatchResponse:
async def batch(
batch_request: Annotated[BatchRequest, BatchRequest],
request: Request,
) -> BatchResponse:
"""Invoke the runnable with the given inputs and config."""
if isinstance(request.config, list):
config = [_unpack_config(config, config_keys) for config in request.config]
if isinstance(batch_request.config, list):
config = [
_unpack_config(config, config_keys) for config in batch_request.config
]

for c in config:
_add_tracing_info_to_metadata(c, request)
else:
config = _unpack_config(request.config, config_keys)
inputs = [_unpack_input(input_) for input_ in request.inputs]
output = await runnable.abatch(inputs, config=config, **request.kwargs)
config = _unpack_config(batch_request.config, config_keys)
_add_tracing_info_to_metadata(config, request)
inputs = [_unpack_input(input_) for input_ in batch_request.inputs]
output = await runnable.abatch(inputs, config=config, **batch_request.kwargs)

return BatchResponse(output=simple_dumpd(output))

@app.post(f"{namespace}/stream")
async def stream(
request: Annotated[StreamRequest, StreamRequest],
stream_request: Annotated[StreamRequest, StreamRequest],
request: Request,
) -> EventSourceResponse:
"""Invoke the runnable stream the output.
Expand Down Expand Up @@ -264,15 +297,16 @@ async def stream(
# Request is first validated using InvokeRequest which takes into account
# config_keys as well as input_type.
# After validation, the input is loaded using LangChain's load function.
input_ = _unpack_input(request.input)
config = _unpack_config(request.config, config_keys)
input_ = _unpack_input(stream_request.input)
config = _unpack_config(stream_request.config, config_keys)
_add_tracing_info_to_metadata(config, request)

async def _stream() -> AsyncIterator[dict]:
"""Stream the output of the runnable."""
async for chunk in runnable.astream(
input_,
config=config,
**request.kwargs,
**stream_request.kwargs,
):
yield {"data": simple_dumps(chunk), "event": "data"}
yield {"event": "end"}
Expand All @@ -281,7 +315,8 @@ async def _stream() -> AsyncIterator[dict]:

@app.post(f"{namespace}/stream_log")
async def stream_log(
request: Annotated[StreamLogRequest, StreamLogRequest],
stream_log_request: Annotated[StreamLogRequest, StreamLogRequest],
request: Request,
) -> EventSourceResponse:
"""Invoke the runnable stream_log the output.
Expand Down Expand Up @@ -318,24 +353,25 @@ async def stream_log(
# Request is first validated using InvokeRequest which takes into account
# config_keys as well as input_type.
# After validation, the input is loaded using LangChain's load function.
input_ = _unpack_input(request.input)
config = _unpack_config(request.config, config_keys)
input_ = _unpack_input(stream_log_request.input)
config = _unpack_config(stream_log_request.config, config_keys)
_add_tracing_info_to_metadata(config, request)

async def _stream_log() -> AsyncIterator[dict]:
"""Stream the output of the runnable."""
async for chunk in runnable.astream_log(
input_,
config=config,
diff=request.diff,
include_names=request.include_names,
include_types=request.include_types,
include_tags=request.include_tags,
exclude_names=request.exclude_names,
exclude_types=request.exclude_types,
exclude_tags=request.exclude_tags,
**request.kwargs,
diff=stream_log_request.diff,
include_names=stream_log_request.include_names,
include_types=stream_log_request.include_types,
include_tags=stream_log_request.include_tags,
exclude_names=stream_log_request.exclude_names,
exclude_types=stream_log_request.exclude_types,
exclude_tags=stream_log_request.exclude_tags,
**stream_log_request.kwargs,
):
if request.diff: # Run log patch
if stream_log_request.diff: # Run log patch
if not isinstance(chunk, RunLogPatch):
raise AssertionError(
f"Expected a RunLog instance got {type(chunk)}"
Expand Down
9 changes: 9 additions & 0 deletions langserve/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Main entrypoint into package."""
from importlib import metadata

try:
__version__ = metadata.version(__package__)
except metadata.PackageNotFoundError:
# Case where package metadata is not available.
__version__ = ""
del metadata # optional, avoids polluting the results of dir(__package__)
18 changes: 12 additions & 6 deletions tests/unit_tests/test_server_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,18 +452,24 @@ async def add_one(x: int) -> int:
# Verify that can be invoked with valid input
# Config ignored for runnable1
assert await runnable1.ainvoke(1, config=config) == 2
assert invoke_spy_1.call_args[1]["config"] == {}
# Config should be ignored but default debug information
# will still be added
config_seen = invoke_spy_1.call_args[1]["config"]
assert "metadata" in config_seen
assert "__useragent" in config_seen["metadata"]
assert "__langserve_version" in config_seen["metadata"]

invoke_spy_2 = mocker.spy(server_runnable2, "ainvoke")
async with get_async_client(app, path="/add_one_config") as runnable2:
# Config accepted for runnable2
assert await runnable2.ainvoke(1, config=config) == 2
# Config ignored
assert invoke_spy_2.call_args[1]["config"] == {
"tags": ["test"],
"metadata": {"a": 5},
"run_name": None, # At the moment this is added by default
}

config_seen = invoke_spy_2.call_args[1]["config"]
assert config_seen["tags"] == ["test"]
assert config_seen["metadata"]["a"] == 5
assert "__useragent" in config_seen["metadata"]
assert "__langserve_version" in config_seen["metadata"]


@pytest.mark.asyncio
Expand Down

0 comments on commit 5f30428

Please sign in to comment.