[FEATURE] Changes for capturing metrics #900

Merged · 10 commits · Dec 18, 2024
8 changes: 8 additions & 0 deletions backend/api_v2/api_deployment_views.py
@@ -51,6 +51,7 @@ def post(
serializer.is_valid(raise_exception=True)
timeout = serializer.validated_data.get(ApiExecution.TIMEOUT_FORM_DATA)
include_metadata = serializer.validated_data.get(ApiExecution.INCLUDE_METADATA)
include_metrics = serializer.validated_data.get(ApiExecution.INCLUDE_METRICS)
use_file_history = serializer.validated_data.get(ApiExecution.USE_FILE_HISTORY)
if not file_objs or len(file_objs) == 0:
raise InvalidAPIRequest("File shouldn't be empty")
@@ -60,6 +61,7 @@ def post(
file_objs=file_objs,
timeout=timeout,
include_metadata=include_metadata,
include_metrics=include_metrics,
use_file_history=use_file_history,
)
if "error" in response and response["error"]:
@@ -78,6 +80,10 @@ def get(
request.query_params.get(ApiExecution.INCLUDE_METADATA, "false").lower()
== "true"
)
include_metrics = (
request.query_params.get(ApiExecution.INCLUDE_METRICS, "false").lower()
== "true"
)
if not execution_id:
raise InvalidAPIRequest("execution_id shouldn't be empty")
response: ExecutionResponse = DeploymentHelper.get_execution_status(
@@ -88,6 +94,8 @@
response_status = status.HTTP_200_OK
if not include_metadata:
response.remove_result_metadata_keys()
if not include_metrics:
response.remove_result_metrics()
if response.result_acknowledged:
response_status = status.HTTP_406_NOT_ACCEPTABLE
response.result = "Result already acknowledged"
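For context, a minimal client sketch (not part of this diff) showing the new include_metrics flag on both endpoints; the base URL, API key, file name, and execution-id placeholder are assumptions for illustration:

import requests  # any HTTP client works; requests is assumed for brevity

API_URL = "https://example.com/deployment/api/my_org/my_api/"  # hypothetical deployment URL
HEADERS = {"Authorization": "Bearer <api-key>"}  # hypothetical API key

# POST: execute the workflow and ask for metrics in the result payload.
with open("sample.pdf", "rb") as f:
    exec_response = requests.post(
        API_URL,
        headers=HEADERS,
        files={"files": f},
        data={"timeout": 300, "include_metrics": "true"},
    )

# GET: poll an async execution's status with the same flag as a query param.
status_response = requests.get(
    API_URL,
    headers=HEADERS,
    params={"execution_id": "<execution-id>", "include_metrics": "true"},
)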
1 change: 1 addition & 0 deletions backend/api_v2/constants.py
@@ -4,4 +4,5 @@ class ApiExecution:
FILES_FORM_DATA: str = "files"
TIMEOUT_FORM_DATA: str = "timeout"
INCLUDE_METADATA: str = "include_metadata"
INCLUDE_METRICS: str = "include_metrics"
USE_FILE_HISTORY: str = "use_file_history" # Undocumented parameter
3 changes: 3 additions & 0 deletions backend/api_v2/deployment_helper.py
@@ -136,6 +136,7 @@ def execute_workflow(
file_objs: list[UploadedFile],
timeout: int,
include_metadata: bool = False,
include_metrics: bool = False,
use_file_history: bool = False,
) -> ReturnDict:
"""Execute workflow by api.
@@ -180,6 +181,8 @@
)
if not include_metadata:
result.remove_result_metadata_keys()
if not include_metrics:
result.remove_result_metrics()
except Exception as error:
DestinationConnector.delete_api_storage_dir(
workflow_id=workflow_id, execution_id=execution_id
2 changes: 2 additions & 0 deletions backend/api_v2/postman_collection/dto.py
@@ -118,6 +118,7 @@ def get_form_data_items(self) -> list[FormDataItem]:
value=ApiExecution.MAXIMUM_TIMEOUT_IN_SEC,
),
FormDataItem(key=ApiExecution.INCLUDE_METADATA, type="text", value="False"),
FormDataItem(key=ApiExecution.INCLUDE_METRICS, type="text", value="False"),
]

def get_api_key(self) -> str:
@@ -131,6 +132,7 @@ def _get_status_api_request(self) -> RequestItem:
status_query_param = {
"execution_id": CollectionKey.STATUS_EXEC_ID_DEFAULT,
ApiExecution.INCLUDE_METADATA: "False",
ApiExecution.INCLUDE_METRICS: "False",
}
status_query_str = urlencode(status_query_param)
abs_api_endpoint = urljoin(settings.WEB_APP_ORIGIN_URL, self.api_endpoint)
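For reference, the encoded status query built above comes out like this (a sketch with a placeholder execution id; the real default comes from CollectionKey.STATUS_EXEC_ID_DEFAULT):

from urllib.parse import urlencode

status_query_param = {
    "execution_id": "<execution-id>",  # placeholder for CollectionKey.STATUS_EXEC_ID_DEFAULT
    "include_metadata": "False",
    "include_metrics": "False",
}
print(urlencode(status_query_param))
# execution_id=%3Cexecution-id%3E&include_metadata=False&include_metrics=False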
2 changes: 2 additions & 0 deletions backend/api_v2/serializers.py
@@ -103,6 +103,7 @@ class ExecutionRequestSerializer(Serializer):
timeout (int): Timeout for the API deployment, maximum value can be 300s.
If -1 it corresponds to async execution. Defaults to -1
include_metadata (bool): Flag to include metadata in API response
include_metrics (bool): Flag to include metrics in API response
use_file_history (bool): Flag to use FileHistory to save and retrieve
responses quickly. This is undocumented to the user and can be
helpful for demos.
@@ -112,6 +113,7 @@
min_value=-1, max_value=ApiExecution.MAXIMUM_TIMEOUT_IN_SEC, default=-1
)
include_metadata = BooleanField(default=False)
include_metrics = BooleanField(default=False)
use_file_history = BooleanField(default=False)


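As a quick check of the default behaviour, a sketch of how the new field validates under DRF (illustrative data; import path assumed):

from api_v2.serializers import ExecutionRequestSerializer  # assumed import path

serializer = ExecutionRequestSerializer(data={"timeout": 60})
serializer.is_valid(raise_exception=True)
# include_metrics falls back to its declared default when omitted
assert serializer.validated_data["include_metrics"] is False

serializer = ExecutionRequestSerializer(data={"timeout": 60, "include_metrics": True})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["include_metrics"] is True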
15 changes: 15 additions & 0 deletions backend/workflow_manager/workflow_v2/dto.py
@@ -63,6 +63,21 @@ def remove_result_metadata_keys(self, keys_to_remove: list[str] = []) -> None:

self._remove_specific_keys(result=result, keys_to_remove=keys_to_remove)

def remove_result_metrics(self) -> None:
"""Removes the 'metrics' key from the 'result' dictionary within each
'result' dictionary in the 'result' list attribute of the instance.
"""
if not isinstance(self.result, list):
return

for item in self.result:
if not isinstance(item, dict):
continue

result = item.get("result")
if isinstance(result, dict):
result.pop("metrics", None)

def _remove_specific_keys(self, result: dict, keys_to_remove: list[str]) -> None:
"""Removes specified keys from the 'metadata' dictionary within the
provided 'result' dictionary. If 'keys_to_remove' is empty, the
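To make the new method's behaviour concrete, a standalone sketch of the shape it walks (sample data assumed from the code above):

result = [
    {"file": "a.pdf", "result": {"output": "ok", "metrics": {"time_taken": 1.2}}},
    "not-a-dict",  # non-dict items are skipped
    {"file": "b.pdf", "result": None},  # non-dict nested results are skipped too
]
for item in result:
    if isinstance(item, dict):
        inner = item.get("result")
        if isinstance(inner, dict):
            inner.pop("metrics", None)  # mirrors remove_result_metrics()
assert "metrics" not in result[0]["result"]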
2 changes: 2 additions & 0 deletions prompt-service/src/unstract/prompt_service/constants.py
@@ -72,6 +72,8 @@ class PromptServiceContants:
FILE_PATH = "file_path"
HIGHLIGHT_DATA = "highlight_data"
CONFIDENCE_DATA = "confidence_data"
TIME_TAKEN = "time_taken"
METRICS = "metrics"


class RunLevel(Enum):
24 changes: 21 additions & 3 deletions prompt-service/src/unstract/prompt_service/main.py
@@ -109,6 +109,7 @@ def prompt_processor() -> Any:
PSKeys.FILE_NAME: doc_name,
PSKeys.CONTEXT: {},
}
metrics: dict = {}
variable_names: list[str] = []
publish_log(
log_events_id,
@@ -126,8 +127,7 @@
prompt_text = output[PSKeys.PROMPT]
chunk_size = output[PSKeys.CHUNK_SIZE]
util = PromptServiceBaseTool(platform_key=platform_key)
index = Index(tool=util)

index = Index(tool=util, run_id=run_id, capture_metrics=True)
if VariableExtractor.is_variables_present(prompt_text=prompt_text):
prompt_text = VariableExtractor.replace_variables_in_prompt(
prompt=output,
@@ -188,6 +188,7 @@
**usage_kwargs,
PSKeys.LLM_USAGE_REASON: PSKeys.EXTRACTION,
},
capture_metrics=True,
)

embedding = Embedding(
@@ -489,6 +490,7 @@ def prompt_processor() -> Any:
**usage_kwargs,
PSKeys.LLM_USAGE_REASON: PSKeys.CHALLENGE,
},
capture_metrics=True,
)
challenge = challenge_plugin["entrypoint_cls"](
llm=llm,
@@ -592,6 +594,18 @@ def prompt_processor() -> Any:
f"No eval plugin found to evaluate prompt: {output[PSKeys.NAME]}" # noqa: E501
)
finally:
challenge_metrics = (
{f"{challenge_llm.get_usage_reason()}_llm": challenge_llm.get_metrics()}
if enable_challenge
else {}
)
metrics.setdefault(prompt_name, {}).update(
{
"context_retrieval": index.get_metrics(),
f"{llm.get_usage_reason()}_llm": llm.get_metrics(),
**challenge_metrics,
}
)
vector_db.close()
publish_log(
log_events_id,
@@ -624,7 +638,11 @@
"Execution complete",
)
metadata = query_usage_metadata(token=platform_key, metadata=metadata)
response = {PSKeys.METADATA: metadata, PSKeys.OUTPUT: structured_output}
response = {
PSKeys.METADATA: metadata,
PSKeys.OUTPUT: structured_output,
PSKeys.METRICS: metrics,
}
return response


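For orientation, the per-prompt metrics dict assembled in the finally block above ends up shaped roughly like this (prompt name and timing values are illustrative; the keys follow the code and the TIME_TAKEN constant added in this PR):

metrics = {
    "invoice_number": {  # keyed by prompt name
        "context_retrieval": {"time_taken": 0.42},  # Index.get_metrics()
        "extraction_llm": {"time_taken": 1.87},     # f"{llm.get_usage_reason()}_llm"
        "challenge_llm": {"time_taken": 0.95},      # present only when challenge is enabled
    }
}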
1 change: 1 addition & 0 deletions tools/structure/src/constants.py
@@ -75,3 +75,4 @@ class SettingsKeys:
CONFIDENCE_DATA = "confidence_data"
EXECUTION_RUN_DATA_FOLDER = "EXECUTION_RUN_DATA_FOLDER"
FILE_PATH = "file_path"
METRICS = "metrics"
17 changes: 16 additions & 1 deletion tools/structure/src/main.py
@@ -82,7 +82,6 @@ def run(
self.stream_update(output_log, state=LogState.OUTPUT_UPDATE)

file_hash = self.get_exec_metadata.get(MetadataKey.SOURCE_HASH)
index = Index(tool=self)
tool_id = tool_metadata[SettingsKeys.TOOL_ID]
tool_settings = tool_metadata[SettingsKeys.TOOL_SETTINGS]
outputs = tool_metadata[SettingsKeys.OUTPUTS]
@@ -105,6 +104,12 @@
self.get_env_or_die(SettingsKeys.EXECUTION_RUN_DATA_FOLDER)
)
run_id = CommonUtils.generate_uuid()
index = Index(
tool=self,
run_id=run_id,
capture_metrics=True,
)
index_metrics = {}
extracted_input_file = str(execution_run_data_folder / SettingsKeys.EXTRACT)
# TODO : Resolve and pass log events ID
payload = {
@@ -187,6 +192,8 @@ def run(
usage_kwargs=usage_kwargs,
process_text=process_text,
)
index_metrics[output[SettingsKeys.NAME]] = index.get_metrics()
index.clear_metrics()

if summarize_as_source:
summarize_file_hash = self._summarize_and_index(
@@ -242,6 +249,14 @@
structured_output_dict[SettingsKeys.METADATA] = metadata

metrics = structured_output_dict.get(SettingsKeys.METRICS, {})
# Merge the prompt-service metrics with the per-prompt indexing metrics,
# taking the union of keys from both dictionaries. The merge must happen
# on the dict before serializing, not on the JSON string.
new_metrics = {
    key: {**metrics.get(key, {}), **index_metrics.get(key, {})}
    for key in set(metrics) | set(index_metrics)
}
structured_output_dict[SettingsKeys.METRICS] = new_metrics
structured_output = json.dumps(structured_output_dict)
# Update GUI
output_log = (
f"## Result\n**NOTE:** In case of a deployed pipeline, the result would "
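To see the merge above in isolation, a runnable sketch with illustrative values:

metrics = {"invoice_number": {"extraction_llm": {"time_taken": 1.9}}}
index_metrics = {
    "invoice_number": {"context_retrieval": {"time_taken": 0.3}},
    "po_date": {"context_retrieval": {"time_taken": 0.2}},
}
new_metrics = {
    key: {**metrics.get(key, {}), **index_metrics.get(key, {})}
    for key in set(metrics) | set(index_metrics)
}
# {'invoice_number': {'extraction_llm': ..., 'context_retrieval': ...},
#  'po_date': {'context_retrieval': ...}}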