diff --git a/llmware/web_services.py b/llmware/web_services.py
index 409b68fc..f91cc573 100644
--- a/llmware/web_services.py
+++ b/llmware/web_services.py
@@ -968,3 +968,429 @@ def get_all_img(self, save_dir):
 
         return 0
 
+
+def get_api_catalog():
+
+    """ Representative baseline set of APIs to expose on a server-side implementation of llmware
+    behind an API server. """
+
+    api_catalog = [
+
+        # inference + model related APIs
+
+        {"api_name": "inference",
+         "methods": ["POST"],
+         "endpoint": "/inference/",
+         "function": "inference",
+         "params": ["prompt", "model_name", "max_output", "temperature", "sample", "api_key", "context",
+                    "params", "fx", "trusted_key"],
+         "response": ["llm_response"],
+         "category": "model",
+         "timeout": 60},
+
+        {"api_name": "function_call",
+         "methods": ["POST"],
+         "endpoint": "/function_call/",
+         "function": "fx",
+         "params": ["model_name", "prompt", "context", "params", "function", "api_key", "get_logits",
+                    "max_output", "temperature", "sample", "trusted_key"],
+         "response": ["llm_response"],
+         "category": "model",
+         "timeout": 60},
+
+        {"api_name": "stream",
+         "methods": ["POST"],
+         "endpoint": "/stream/",
+         "function": "stream",
+         "params": ["model_name", "prompt", "max_output", "temperature", "sample", "context", "api_key",
+                    "trusted_key"],
+         "response": ["llm_response"],
+         "category": "model",
+         "timeout": 60},
+
+        {"api_name": "document_batch_analysis",
+         "methods": ["POST"],
+         "endpoint": "/document_batch_analysis/",
+         "function": "document_batch_analysis",
+         "params": ["question_list", "uploaded_files", "trusted_key", "reranker", "rag"],
+         "response": ["analysis_report"],
+         "category": "models",
+         "timeout": 60},
+
+        {"api_name": "process_workflow",
+         "methods": ["POST"],
+         "endpoint": "/process_workflow/",
+         "function": "process_workflow",
+         "params": ["process_map", "trusted_key"],
+         "response": ["report", "research", "safety_record", "response_list", "journal"],
+         "category": "models",
+         "timeout": 60},
+
+        {"api_name": "get_api_catalog",
+         "methods": ["POST"],
+         "endpoint": "/library/get_api_catalog/",
+         "function": "get_api_catalog",
+         "params": ["trusted_key"],
+         "response": ["response"],
+         "category": "admin",
+         "timeout": 3},
+
+        {"api_name": "ping",
+         "methods": ["POST"],
+         "endpoint": "/ping/",
+         "function": "ping",
+         "params": ["trusted_key"],
+         "response": ["ping"],
+         "category": "admin",
+         "timeout": 1},
+
+        {"api_name": "model_lookup",
+         "methods": ["POST"],
+         "endpoint": "/model_lookup/",
+         "function": "model_lookup",
+         "params": ["model_name", "trusted_key"],
+         "response": ["response"],
+         "category": "admin",
+         "timeout": 3},
+
+        {"api_name": "model_load",
+         "methods": ["POST"],
+         "endpoint": "/model_load/",
+         "function": "model_load",
+         "params": ["model_name", "trusted_key", "sample", "temperature", "max_output"],
+         "response": ["response"],
+         "category": "admin",
+         "timeout": 20},
+
+        {"api_name": "model_unload",
+         "methods": ["POST"],
+         "endpoint": "/model_unload/",
+         "function": "model_unload",
+         "params": ["model_name", "trusted_key"],
+         "response": ["response"],
+         "category": "admin",
+         "timeout": 20},
+
+        {"api_name": "new_process_map",
+         "methods": ["POST"],
+         "endpoint": "/new_process_map/",
+         "function": "new_process_map",
+         "params": ["model_name", "prompt"],
+         "response": ["llm_response"],
+         "category": "model",
+         "timeout": 20}
+
+    ]
+
+    return api_catalog
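+
+
+# Illustrative sketch (a hypothetical helper, not part of llmware): one way a Flask-based
+# server could consume this catalog to wire up its routes.  The `handlers` mapping of
+# api_name -> Flask view function is an assumption for demonstration only - the actual
+# reference server implementation lives in the llmware.models module.
+def register_catalog_routes(app, handlers):
+
+    """ Registers each catalog entry as a route on a Flask app, e.g.,
+    register_catalog_routes(app, {"ping": ping_view, "inference": inference_view}). """
+
+    for api in get_api_catalog():
+
+        # 'handlers' is an assumed registry keyed by api_name - skip entries with no handler
+        view_fn = handlers.get(api["api_name"])
+
+        if view_fn:
+            # bind the endpoint path to the view function with the declared methods
+            app.add_url_rule(api["endpoint"], api["api_name"], view_fn, methods=api["methods"])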
+
+
+class LLMWareClient:
+
+    """ Representative baseline implementation of a simple client library that wraps POST
+    requests to invoke REST APIs on a server-side llmware inference server (e.g., implemented
+    with Flask or FastAPI).
+
+    Note: there is a reference API server implementation on Flask in the llmware.models module, along with two
+    separate stand-alone examples for creating a 'pop-up' inference server implementation in the Models examples.
+
+    The intent of providing this client class is as a starting point for a simple, easy-to-modify client
+    library that can be used in conjunction with a variety of different server-based implementations of llmware.
+
+    This code should be viewed as experimental, and is not intended to be used 'as-is', but rather as a
+    starting-point guide for custom implementations. """
+
+    def __init__(self, api_endpoint=None, api_key=None, account_id=None, library_name=None,
+                 api_custom_catalog=None, timeout=20):
+
+        self.URL_BASE = api_endpoint
+        self.api_key = api_key
+        self.account_id = account_id
+        self.library_name = library_name
+        self.timeout = timeout
+
+        # use the custom catalog if provided - otherwise, fall back to the default api catalog
+        if api_custom_catalog:
+            self.api_catalog = api_custom_catalog
+        else:
+            self.api_catalog = get_api_catalog()
+
+    def package_url_base(self, ip_address=None, port=None, http_type="http", endpoint_dict=None):
+
+        """ Utility to package the url base from an endpoint description of components, e.g.,
+
+            endpoint = {"api_server": "test_server_1",
+                        "ip_address": "3.83.132.29",
+                        "port": "8088",
+                        "secret_key": "abc-123",
+                        "http_type": "HTTP"}
+        """
+
+        if endpoint_dict and not ip_address and not port:
+            if isinstance(endpoint_dict, dict):
+                ip_address = endpoint_dict.get("ip_address", None)
+                port = endpoint_dict.get("port", None)
+                # fall back to 'http' if not provided - avoids packaging 'none://' as the prefix
+                http_type = endpoint_dict.get("http_type", "http")
+
+        http_type = str(http_type).lower() + "://"
+
+        self.URL_BASE = http_type + str(ip_address) + ":" + str(port)
+
+        return self.URL_BASE
+
+    def _lookup_endpoint(self, api_name):
+
+        """ Internal lookup utility to pull the api card for a selected api_name. """
+
+        for entries in self.api_catalog:
+            if entries["api_name"] == api_name:
+                return entries
+
+        return {}
+
+    def _exec_standard_api(self, api_name, timeout=None, **kwargs):
+
+        """ Internal utility to package and post a standard api request. """
+
+        api = self._lookup_endpoint(api_name)
+
+        endpoint = api["endpoint"]
+        params = api["params"]
+
+        # a timeout set on the api card takes precedence over the method argument
+        if "timeout" in api:
+            timeout = api["timeout"]
+
+        input_dict = {}
+
+        # keep only the kwargs that are recognized params for this api
+        for k, v in kwargs.items():
+            if k in params:
+                input_dict.update({k: v})
+
+        if not timeout:
+            timeout = self.timeout
+
+        url = self.URL_BASE + "{}".format(endpoint)
+
+        try:
+            import requests
+            import json
+
+            output_raw = requests.post(url, data=input_dict, timeout=timeout)
+            output = json.loads(output_raw.text)
+
+        except Exception as e:
+            print("caught exception: ", e)
+            output = {}
+
+        return output
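+
+    # Usage sketch for the standard api pattern (illustrative only - the endpoint
+    # address and trusted_key values below are placeholders, not llmware defaults):
+    #
+    #   client = LLMWareClient(api_endpoint="http://localhost:8088")
+    #   response = client.ping(trusted_key="demo-key")               # delegates to _exec_standard_api
+    #   catalog = client.get_api_catalog(trusted_key="demo-key")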
""" + + api = self._lookup_endpoint(api_name) + + endpoint = api["endpoint"] + params = api["params"] + + if "timeout" in api: + timeout = api["timeout"] + + input_dict = {} + + for k, v in kwargs.items(): + if k in params: + input_dict.update({k: v}) + + if not timeout: + timeout = self.timeout + + url = self.URL_BASE + "{}".format(endpoint) + + try: + import requests + import json + + output_raw = requests.post(url, data=input_dict, timeout=timeout) + output = json.loads(output_raw.text) + + except Exception as e: + print("caught exception: ", e) + output = {} + + return output + + def inference(self, **kwargs): + + """ Inference method """ + + api_name = "inference" + api = self._lookup_endpoint(api_name) + + endpoint = api["endpoint"] + params = api["params"] + inp = {} + for k, v in kwargs.items(): + if k in params: + inp.update({k: v}) + + url = self.URL_BASE + "{}".format(endpoint) + + try: + import requests + import json + + output_raw = requests.post(url, data=inp) + output = json.loads(output_raw.text) + + except Exception as e: + print("caught exception: ", e) + output = {} + + return output + + def stream(self, **kwargs): + + """ Streaming token API - Intended to be consumed as a generator function, e.g., + for token in client.stream('What is ...?'): + print(token, end="") + """ + + api_name = "stream" + api = self._lookup_endpoint(api_name) + endpoint = api["endpoint"] + params = api["params"] + inp = {} + for k, v in kwargs.items(): + if k in params: + inp.update({k: v}) + + url = self.URL_BASE + "{}".format(endpoint) + + try: + import requests + r = requests.post(url=url, data=inp, stream=True) + + except Exception as e: + print("caught exception: ", e) + r = {} + + output = "" + + if r: + for tokens in r.iter_content(chunk_size=1): + if tokens: + new_out = str(tokens, encoding='utf-8', errors='ignore') + # print(new_out, end="") + output += new_out + yield new_out + + response = {"llm_response": output} + + return response + + def function_call(self, **kwargs): + + """ Function call REST API to invoke a SLIM model on server side. """ + + api_name = "function_call" + api = self._lookup_endpoint(api_name) + + endpoint = api["endpoint"] + params = api["params"] + inp = {} + + for k, v in kwargs.items(): + if k in params: + inp.update({k: v}) + + try: + import requests + import json + + url = self.URL_BASE + "{}".format(endpoint) + output_raw = requests.post(url, data=inp) + output = json.loads(output_raw.text) + + except Exception as e: + print("caught exception: ", e) + output = {} + + return output + + def document_batch_analysis(self, local_folder_path, question_list, use_async=False, **kwargs): + + """ Implements a custom document batch analysis method on server side. 
""" + + if use_async: + api_name = "document_batch_analysis_async" + else: + api_name = "document_batch_analysis" + + api = self._lookup_endpoint(api_name) + + endpoint = api["endpoint"] + params = api["params"] + inp = {} + inp.update({"question_list": question_list}) + + for k, v in kwargs.items(): + if k in params: + inp.update({k: v}) + + import os + import json + + # need to prepare file input + files = os.listdir(local_folder_path) + file_list = [] + for f in list(files): + fp = os.path.join(local_folder_path, f) + new_entry = ('uploaded_files', (f, open(fp, 'rb'))) # ,'image/png')) + file_list.append(new_entry) + + try: + import requests + url = self.URL_BASE + "{}".format(endpoint) + output_raw = requests.post(url, data=inp, files=file_list, timeout=360) + output = json.loads(output_raw.text) + except Exception as e: + print("caught exception: ", e) + output = {} + + return output + + def process_workflow(self, process_map, trusted_key, **kwargs): + + """ Implements a JSON-based custom process workflow on server-side implementation. """ + + api_name = "process_workflow" + + api = self._lookup_endpoint(api_name) + + endpoint = api["endpoint"] + params = api["params"] + inp = {} + inp.update({"trusted_key": trusted_key}) + + for k, v in kwargs.items(): + if k in params: + inp.update({k: v}) + + # need to prepare file input + new_entry = [('process_map', ("process_file", open(process_map, 'rb')))] # ,'image/png')) + + url = self.URL_BASE + "{}".format(endpoint) + + try: + import requests + import json + + output_raw = requests.post(url, data=inp, files=new_entry) + output = json.loads(output_raw.text) + + except Exception as e: + print("caught exception: ", e) + output = {} + + return output + + def ping(self, **kwargs): + + """ Ping to confirm API server availability. """ + + api_name = "ping" + return self._exec_standard_api(api_name, **kwargs) + + def get_api_catalog(self, **kwargs): + + """ Posts the API catalog available on the server-side implementation. """ + + api_name = "get_api_catalog" + return self._exec_standard_api(api_name, **kwargs) + + def model_lookup(self, model_name, **kwargs): + + """ Lookup a model availability on the server. """ + + api_name = "model_lookup" + kwargs.update({"model_name": model_name}) + return self._exec_standard_api(api_name, **kwargs) + + def model_load(self, model_name, **kwargs): + + """ Load a model on the server. """ + + api_name = "model_load" + kwargs.update({"model_name": model_name}) + return self._exec_standard_api(api_name, **kwargs) + + def model_unload(self, model_name, **kwargs): + + """ Unload a model on the server. """ + + api_name = "model_unload" + kwargs.update({"model_name": model_name}) + return self._exec_standard_api(api_name, **kwargs) +