Skip to content

Commit c801d31

Browse files
authored
add checker (#4711)
1 parent 096d87d commit c801d31

File tree

3 files changed

+73
-1
lines changed

3 files changed

+73
-1
lines changed

fastdeploy/engine/common_engine.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,14 @@
4747
from fastdeploy.plugins.token_processor import load_token_processor_plugins
4848
from fastdeploy.splitwise.internal_adapter_utils import InternalAdapter
4949
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
50-
from fastdeploy.utils import EngineError, envs, get_logger, llm_logger
50+
from fastdeploy.utils import (
51+
EngineError,
52+
check_download_links,
53+
envs,
54+
get_logger,
55+
init_bos_client,
56+
llm_logger,
57+
)
5158

5259
try:
5360
TokenProcessor = load_token_processor_plugins()
@@ -128,6 +135,7 @@ def __init__(self, cfg, start_queue=True):
128135
* self.cfg.cache_config.block_size
129136
)
130137

138+
self.bos_client = None
131139
self.guided_decoding_checker = None
132140
if self.cfg.structured_outputs_config.guided_decoding_backend != "off":
133141
self.guided_decoding_checker = schema_checker(
@@ -827,6 +835,24 @@ def _insert_zmq_task_to_scheduler(self):
827835
self.llm_logger.error(f"Receive request error: {err_msg}")
828836
results.append((request.request_id, err_msg))
829837

838+
if self._has_features_info(request) and err_msg is None:
839+
if self.bos_client is None:
840+
self.bos_client = init_bos_client()
841+
842+
download_urls = []
843+
inputs = request.multimodal_inputs
844+
if inputs.get("video_feature_urls") is not None:
845+
download_urls.extend(inputs.get("video_feature_urls"))
846+
if inputs.get("image_feature_urls") is not None:
847+
download_urls.extend(inputs.get("image_feature_urls"))
848+
if inputs.get("audio_feature_urls") is not None:
849+
download_urls.extend(inputs.get("audio_feature_urls"))
850+
851+
err_msg = check_download_links(self.bos_client, download_urls)
852+
if err_msg:
853+
llm_logger.error(f"Receive request {request.request_id} download error: {err_msg}")
854+
results.append((request.request_id, err_msg))
855+
830856
if err_msg is None:
831857
insert_task.append(request)
832858

@@ -877,6 +903,19 @@ def _decode_token(self, token_ids, req_id, is_end):
877903
del self.data_processor.decode_status[req_id]
878904
return delta_text, token_ids
879905

906+
def _has_features_info(self, task):
    """
    Tell whether a task carries at least one non-empty feature URL list.

    Args:
        task: request object exposing a ``multimodal_inputs`` attribute
            (``None`` or a dict-like object supporting ``get``).

    Returns:
        bool: True when any of ``video_feature_urls``, ``image_feature_urls``
        or ``audio_feature_urls`` is present (not None) with length > 0,
        False otherwise (including when ``multimodal_inputs`` is None or empty).
    """
    mm_inputs = task.multimodal_inputs
    if mm_inputs is None or len(mm_inputs) == 0:
        return False

    # Checked in the same order as before: video, image, audio.
    for field in ("video_feature_urls", "image_feature_urls", "audio_feature_urls"):
        urls = mm_inputs.get(field)
        if urls is not None and len(urls) > 0:
            return True
    return False
918+
880919
def _zmq_send_generated_tokens(self):
881920
"""
882921
Recieve output for zmq

fastdeploy/envs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@
128128
"FD_CACHE_PROC_EXIT_TIMEOUT": lambda: int(os.getenv("FD_CACHE_PROC_EXIT_TIMEOUT", "600")),
129129
# Count for cache_transfer_manager process error
130130
"FD_CACHE_PROC_ERROR_COUNT": lambda: int(os.getenv("FD_CACHE_PROC_ERROR_COUNT", "10")),
131+
"ENCODE_FEATURE_BOS_AK": lambda: os.getenv("ENCODE_FEATURE_BOS_AK"),
132+
"ENCODE_FEATURE_BOS_SK": lambda: os.getenv("ENCODE_FEATURE_BOS_SK"),
131133
}
132134

133135

fastdeploy/utils.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,37 @@ def get_logger(name, file_name=None, without_formater=False, print_to_console=Fa
944944
return FastDeployLogger().get_logger(name, file_name, without_formater, print_to_console)
945945

946946

947+
def check_download_links(bos_client, links, timeout=1):
    """
    Check that every BOS link refers to an object with a positive content length.

    Args:
        bos_client: client exposing ``get_object_meta_data(bucket_name, object_key)``.
        links: iterable of link strings, optionally prefixed with ``bos://``.
            ``None`` is treated as "no links".
        timeout: currently unused; kept so existing callers stay compatible.

    Returns:
        None when all links pass, otherwise an error message describing the
        first link that failed (checking stops at the first failure).
    """
    scheme = "bos://"
    for link in links or ():
        try:
            # Strip only the leading scheme; str.replace would also remove any
            # later occurrence of the same text inside the path.
            if link.startswith(scheme):
                link = link[len(scheme):]

            # The first path segment is skipped, the last one is the object key,
            # and everything in between is passed as the bucket name.
            # NOTE(review): presumably the first segment is a host/endpoint - confirm.
            segments = link.split("/")
            bucket_name = "/".join(segments[1:-1])
            object_key = segments[-1]
            response = bos_client.get_object_meta_data(bucket_name, object_key)
            content_length = response.metadata.content_length
            # Raise explicitly instead of asserting: assert statements are
            # stripped when Python runs with -O, which would skip this check.
            if int(content_length) <= 0:
                raise ValueError(f"bos download length error, {content_length}")
        except Exception as e:
            # Any failure (bad link type, client error, empty object) is reported
            # to the caller as a message rather than raised.
            return f"link {link} download error: {str(e)}"
    return None
965+
966+
967+
def init_bos_client(endpoint="bj.bcebos.com"):
    """
    Build a BOS client from the ENCODE_FEATURE_BOS_AK / ENCODE_FEATURE_BOS_SK settings.

    Args:
        endpoint: BOS service endpoint. Defaults to "bj.bcebos.com", the value
            that was previously hard-coded, so existing callers are unaffected.

    Returns:
        BosClient: client configured with the credentials read from ``envs``.
    """
    # Imported inside the function so baidubce is only required when a BOS
    # client is actually created.
    from baidubce.auth.bce_credentials import BceCredentials
    from baidubce.bce_client_configuration import BceClientConfiguration
    from baidubce.services.bos.bos_client import BosClient

    credentials = BceCredentials(envs.ENCODE_FEATURE_BOS_AK, envs.ENCODE_FEATURE_BOS_SK)
    cfg = BceClientConfiguration(credentials=credentials, endpoint=endpoint)
    return BosClient(cfg)
976+
977+
947978
llm_logger = get_logger("fastdeploy", "fastdeploy.log")
948979
data_processor_logger = get_logger("data_processor", "data_processor.log")
949980
scheduler_logger = get_logger("scheduler", "scheduler.log")

0 commit comments

Comments
 (0)