diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5c484fd29..e4bd36c1d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -151,3 +151,185 @@ jobs: run: | rm -rf /tmp/.buildx-cache mv /tmp/.buildx-cache-new /tmp/.buildx-cache + + model-api: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - name: Prepare + id: prep + run: | + TAG=$(echo $GITHUB_SHA | head -c7) + IMAGE="ukpsquare/square-model-api" + echo ::set-output name=image::${IMAGE} + echo ::set-output name=tag::${TAG} + - name: Set up Docker Buildx + id: buildx + uses: docker/setup-buildx-action@v1 + with: + install: true + + - name: Cache Docker layers + uses: actions/cache@v2 + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx-model_api-${{ github.sha }} + restore-keys: | + ${{ runner.os }}-buildx-model_api- + ${{ runner.os }}-buildx- + + - name: Build test image + uses: docker/build-push-action@v2 + with: + builder: ${{ steps.buildx.outputs.name }} + context: ./square-model-inference-api/inference_server + target: test + load: true + tags: ${{ steps.prep.outputs.image }}:${{ steps.prep.outputs.tag }}-test + cache-from: type=local,src=/tmp/.buildx-cache + cache-to: type=local,mode=max,dest=/tmp/.buildx-cache-new + + # Temp fix + # https://github.com/docker/build-push-action/issues/252 + # https://github.com/moby/buildkit/issues/1896 + - name: Move cache + run: | + rm -rf /tmp/.buildx-cache + mv /tmp/.buildx-cache-new /tmp/.buildx-cache + + - name: Retrieve Test Reports + id: extract + uses: shrink/actions-docker-extract@v1 + with: + image: ${{ steps.prep.outputs.image }}:${{ steps.prep.outputs.tag }}-test + path: /app/test-reports + + - uses: actions/upload-artifact@v2 + with: + name: model_api-test-reports + path: ${{ steps.extract.outputs.destination }}/test-reports + + - name: Publish Test Report + uses: mikepenz/action-junit-report@v2 + with: + report_paths: ${{ steps.extract.outputs.destination }}/test-reports/junit.xml + check_name: Model API Test Report + fail_on_failure: true + + - name: Login to Docker Hub + uses: docker/login-action@v1 + with: + username: ${{ secrets.DOCKER_HUB_USERNAME }} + password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} + + - name: Build deployable image + uses: docker/build-push-action@v2 + with: + builder: ${{ steps.buildx.outputs.name }} + context: ./square-model-inference-api/inference_server + target: build + push: ${{github.ref == 'refs/heads/master'}} + tags: ${{ steps.prep.outputs.image }}:${{ steps.prep.outputs.tag }}, ${{ steps.prep.outputs.image }}:latest + cache-from: type=local,src=/tmp/.buildx-cache + cache-to: type=local,mode=max,dest=/tmp/.buildx-cache-new + + # Temp fix + # https://github.com/docker/build-push-action/issues/252 + # https://github.com/moby/buildkit/issues/1896 + - name: Move cache + run: | + rm -rf /tmp/.buildx-cache + mv /tmp/.buildx-cache-new /tmp/.buildx-cache + + model-api-auth: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - name: Prepare + id: prep + run: | + TAG=$(echo $GITHUB_SHA | head -c7) + IMAGE="ukpsquare/square-model-api-auth" + echo ::set-output name=image::${IMAGE} + echo ::set-output name=tag::${TAG} + - name: Set up Docker Buildx + id: buildx + uses: docker/setup-buildx-action@v1 + with: + install: true + + - name: Cache Docker layers + uses: actions/cache@v2 + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx-model_api_auth-${{ github.sha }} + restore-keys: | + ${{ runner.os }}-buildx-model_api_auth- + ${{ runner.os }}-buildx- + + - name: 
Build test image + uses: docker/build-push-action@v2 + with: + builder: ${{ steps.buildx.outputs.name }} + context: ./square-model-inference-api/auth_server + target: test + load: true + tags: ${{ steps.prep.outputs.image }}:${{ steps.prep.outputs.tag }}-test + cache-from: type=local,src=/tmp/.buildx-cache + cache-to: type=local,mode=max,dest=/tmp/.buildx-cache-new + + # Temp fix + # https://github.com/docker/build-push-action/issues/252 + # https://github.com/moby/buildkit/issues/1896 + - name: Move cache + run: | + rm -rf /tmp/.buildx-cache + mv /tmp/.buildx-cache-new /tmp/.buildx-cache + + - name: Retrieve Test Reports + id: extract + uses: shrink/actions-docker-extract@v1 + with: + image: ${{ steps.prep.outputs.image }}:${{ steps.prep.outputs.tag }}-test + path: /app/test-reports + + - uses: actions/upload-artifact@v2 + with: + name: model_api_auth-test-reports + path: ${{ steps.extract.outputs.destination }}/test-reports + + - name: Publish Test Report + uses: mikepenz/action-junit-report@v2 + with: + report_paths: ${{ steps.extract.outputs.destination }}/test-reports/junit.xml + check_name: Model API Auth Test Report + fail_on_failure: true + + - name: Login to Docker Hub + uses: docker/login-action@v1 + with: + username: ${{ secrets.DOCKER_HUB_USERNAME }} + password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} + + - name: Build deployable image + uses: docker/build-push-action@v2 + with: + builder: ${{ steps.buildx.outputs.name }} + context: ./square-model-inference-api/auth_server + target: build + push: ${{github.ref == 'refs/heads/master'}} + tags: ${{ steps.prep.outputs.image }}:${{ steps.prep.outputs.tag }}, ${{ steps.prep.outputs.image }}:latest + cache-from: type=local,src=/tmp/.buildx-cache + cache-to: type=local,mode=max,dest=/tmp/.buildx-cache-new + + # Temp fix + # https://github.com/docker/build-push-action/issues/252 + # https://github.com/moby/buildkit/issues/1896 + - name: Move cache + run: | + rm -rf /tmp/.buildx-cache + mv /tmp/.buildx-cache-new /tmp/.buildx-cache \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index c093325f6..8d4d17ac9 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -29,6 +29,44 @@ services: ports: - 80:80 +### MODEL API + model_auth: + image: ukpsquare/square-model-api-auth:latest + container_name: square_model_auth + ports: + - 8081:8081 + env_file: + - ./square-model-inference-api/auth_server/.env + + model_nginx: + image: nginx + ports: + - 8080:8080 + volumes: + - ./square-model-inference-api/nginx/nginx.conf:/etc/nginx/nginx.conf:ro + + inference_bert_adapter: + image: ukpsquare/square-model-api:latest + ports: + - 8000:8000 + env_file: + - ./square-model-inference-api/inference_server/.env.bert_adapter + container_name: square_model_inference_bert_adapter + volumes: + - ./.cache/:/etc/huggingface/.cache/ + + inference_dpr: + image: ukpsquare/square-model-api:latest + ports: + - 8001:8000 + env_file: + - ./square-model-inference-api/inference_server/.env.dpr + container_name: square_model_inference_dpr + volumes: + - ./.cache/:/etc/huggingface/.cache/ + +### / MODEL API Finished + #adminer: # image: adminer # restart: always diff --git a/square-model-inference-api/.dockerignore b/square-model-inference-api/.dockerignore new file mode 100644 index 000000000..cc3361cfa --- /dev/null +++ b/square-model-inference-api/.dockerignore @@ -0,0 +1,134 @@ +/images +/tests +.ascii-art + +*.yml +*.ini + +# Hidden files +.DS_store + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + 
+# C extensions +*.so + +# data is not logged +data/ + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +.vscode/ + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +htmlcov-py36/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.model_testing_cache + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +.vscode/ +.pytest-cache/ +.pytest_cache/ +.empty/ + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# virtualenv +.venv +venv/ +ENV/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.idea/ + + + +Child_Watch_API/ +htmlcov-*/ + +# remove large model +ml_model/*.joblib + + +#venv +venv/ + +.idea/ \ No newline at end of file diff --git a/square-model-inference-api/.gitignore b/square-model-inference-api/.gitignore new file mode 100644 index 000000000..ed7883485 --- /dev/null +++ b/square-model-inference-api/.gitignore @@ -0,0 +1,134 @@ +.idea/ + +/ml_model/models + +# Hidden files +.DS_store + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# data is not logged +data/ + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +.vscode/ + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +htmlcov-py36/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.model_testing_cache + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +.vscode/ +.pytest-cache/ +.pytest_cache/ +.empty/ + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# dotenv +.env + +# virtualenv +.venv +venv/ +ENV/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.idea/ + + + +Child_Watch_API/ +htmlcov-*/ + +# remove large model +ml_model/*.joblib + + +#venv +venv/ + + diff --git a/square-model-inference-api/Makefile b/square-model-inference-api/Makefile new file mode 100644 index 000000000..39899afde --- /dev/null +++ b/square-model-inference-api/Makefile @@ -0,0 +1,42 @@ +SHELL := /bin/bash + +# Target section and Global definitions +# ----------------------------------------------------------------------------- +.PHONY: all clean test install run deploy down + +all: clean test install run deploy down + +test: + python -m pip install --upgrade pip && pip install pytest pytest-cov pytest-asyncio + PYTHONPATH=auth_server/ pytest --cov + PYTHONPATH=inference_server/ pytest --cov + +install: + pip install --upgrade pip + pip install -r inference_server/requirements1.txt + pip uninstall -y -r inference_server/uninstall_requirements.txt + pip install -r inference_server/requirements2.txt + +run: + PYTHONPATH=inference_server/ uvicorn inference_server.main:app --reload --host 0.0.0.0 --port 8000 --env-file inference_server/.env + +build: + docker-compose build + +deploy: + docker-compose build + docker-compose up -d + +down: + docker-compose down + +clean: + -find . -name '*.pyc' -exec rm -rf {} \; + -find . -name '__pycache__' -exec rm -rf {} \; + -find . -name 'Thumbs.db' -exec rm -rf {} \; + -find . -name '*~' -exec rm -rf {} \; + -rm -rf build + -rm -rf dist + -rm -rf *.egg-info + -rm -rf docs/_build + -rm -rf .pytest_cache \ No newline at end of file diff --git a/square-model-inference-api/README.md b/square-model-inference-api/README.md new file mode 100644 index 000000000..31eabd254 --- /dev/null +++ b/square-model-inference-api/README.md @@ -0,0 +1,106 @@ +# SQuARE Model API +Inference API that supports SOTA (QA) models & adapters. +Receives input and returns prediction and other artifacts (e.g. attention scores) + +## On the API Path +The 'true' path of the API for the model server is of the form `/api/$endpoint` where the endpoint +is embeddings, question-answering, etc. This is the path you use if you just run a model server locally. + +However, to run and distinguish multiple models, we use an API gateway with nginx so we extend +the path to `/api/$modelname/$endpoint` which is then resolved by nginx to the correct model server and forwarded +to this server's `/api/$endpoint` endpoint. This is the path you use with Docker. +This requires you to setup the docker-compose and nginx config as described below. 
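+
+As a purely illustrative example, the heartbeat endpoint can be reached both ways as shown below. The model-name segment `bert`, the ports, and the API-key check are assumptions based on the example configs in this repository and may differ in your setup.
+```bash
+# Direct path against a single local model server (no nginx, no auth server)
+curl http://localhost:8000/api/health/heartbeat
+
+# Same endpoint through the nginx gateway: the model name is an extra path segment,
+# and the API key from auth_server/.env is expected if the gateway checks authorization
+curl -H "Authorization: example_key" http://localhost:8080/api/bert/health/heartbeat
+```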
+
+## Project structure
+
+The Model API consists of 3 components:
+1 authorization server, n inference servers (each with its own model),
+and an nginx server that serves as API gateway to forward requests to the correct inference server and
+to handle authorization of requests with the auth server.
+```
+├───auth_server                 # FastAPI Authorization Server
+│   ├───main.py                 # Entry point in server
+│   ├───Dockerfile              # Dockerfile for server
+│   ├───tests                   # Unit Tests
+│   │   ├───test_api
+│   └───auth_api
+├───inference_server            # FastAPI Model API Server
+│   ├───tests                   # Unit Tests
+│   │   ├───test_api
+│   │   ├───test_inference
+│   ├───main.py                 # Entry point in server
+│   ├───Dockerfile              # Dockerfile for server
+│   └───square_model_inference  # Server Logic
+│       ├───api                 # API Routes
+│       │   ├───routes
+│       ├───core                # Server config, Startup logic, etc.
+│       ├───models              # Input/ output modelling for API
+│       └───inference           # Deep Model implementation and inference code for NLP tasks
+├───nginx                       # nginx config for API Gateway & Authorization
+│   └───nginx.conf
+├───locust                      # Load testing configuration with Locust
+└───example_docker-compose.yml  # Example docker-compose setup for the Model API
+```
+
+### Logging
+The components use the JSON-formatted logging for the ELK Stack that is defined in square-core/logging.
+
+## Requirements
+
+Python 3.7+, Docker (optional), Make (optional)
+
+## Installation
+Install the required packages in your local environment (ideally virtualenv, conda, etc.).
+```bash
+pip install -r inference_server/requirements1.txt
+pip uninstall -y -r inference_server/uninstall_requirements.txt
+pip install -r inference_server/requirements2.txt
+```
+or
+```sh
+make install
+```
+**Why two requirements.txt files and why the uninstall?**
+`sentence-transformers` depends on `transformers`, so `transformers` is installed along with it.
+However, we use `adapter-transformers` (a fork of `transformers`) in this project.
+Both `transformers` and `adapter-transformers` use the same package namespace, so they conflict.
+Thus, we first install `sentence-transformers` along with `transformers`,
+uninstall `transformers`, and finally install `adapter-transformers`.
+
+
+## Setup
+### Docker
+1. Create `auth_server/.env` with a secret API key. See [here](auth_server/.env.example) for an example.
+2. For each model server that should run, create a `.env.$model` to configure it.
+   See [here](inference_server/.env.example) for an example.
+3. Configure `nginx/nginx.conf` to correctly forward requests to each server. The server DNS name has to
+   match the `container_name` of each server in the `docker-compose.yaml` (an illustrative sketch is included at the end of this README).
+4. Configure `docker-compose.yaml` by adding services for the auth server, nginx (with the config), and the
+   model servers (each with its `.env` file). See [example_docker-compose.yml](example_docker-compose.yml) for an example.
+### Local
+Create `inference_server/.env` and configure it as needed for your local model server.
+You do not need nginx and the authorization server for local testing.
+
+## Running
+
+#### Running Localhost
+
+```sh
+make run
+```
+This *only* starts one inference server using `inference_server/.env`. No nginx, no auth server.
+For debugging, `inference_server/main.py` can also be used as entry point.
+
+
+#### Running Via Docker
+
+```sh
+make deploy
+```
+
+#### Running Tests
+For unit tests:
+```sh
+make test
+```
+For load testing with Locust, see [this README](locust/README.md).
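+
+### Illustrative nginx forwarding sketch
+The repository's actual gateway configuration lives in `nginx/nginx.conf` and is not reproduced here. Purely as a sketch of the forwarding and authorization flow described above, a minimal gateway config could look roughly like the following; the `/api/bert/` prefix is an assumption, while the container names and ports are taken from the example compose files:
+```
+events {}
+
+http {
+    server {
+        listen 8080;
+
+        # Subrequest to the auth server to validate the API key header
+        location = /auth {
+            internal;
+            proxy_pass http://square_model_auth:8081/auth;
+            proxy_pass_request_body off;
+            proxy_set_header Content-Length "";
+        }
+
+        # /api/bert/$endpoint  ->  square_model_inference_bert:8000/api/$endpoint
+        location /api/bert/ {
+            auth_request /auth;
+            rewrite ^/api/bert/(.*)$ /api/$1 break;
+            proxy_pass http://square_model_inference_bert:8000;
+        }
+    }
+}
+```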
diff --git a/square-model-inference-api/auth_server/.env.example b/square-model-inference-api/auth_server/.env.example new file mode 100644 index 000000000..6f37be409 --- /dev/null +++ b/square-model-inference-api/auth_server/.env.example @@ -0,0 +1,2 @@ +API_KEY=example_key +API_KEY_HEADER_NAME=Authorization \ No newline at end of file diff --git a/square-model-inference-api/auth_server/Dockerfile b/square-model-inference-api/auth_server/Dockerfile new file mode 100644 index 000000000..394ce24d8 --- /dev/null +++ b/square-model-inference-api/auth_server/Dockerfile @@ -0,0 +1,30 @@ +FROM python:3.7.6-slim-buster as base + +ENV PYTHONUNBUFFERED 1 + +EXPOSE 8081 +WORKDIR /app + +COPY ElkJsonFormatter.tar.gz ./ElkJsonFormatter.tar.gz +RUN pip install --upgrade pip +COPY requirements.txt ./ +RUN pip install -r requirements.txt + +COPY main.py main.py +COPY ./auth_api auth_api +COPY logging.conf logging.conf + +FROM base as test +RUN pip install pytest pytest-cov pytest-asyncio +RUN mkdir test-reports +RUN PYTHONPATH=./ pytest \ + --junitxml=test-reports/junit.xml \ + --cov \ + --cov-report=xml:test-reports/coverage.xml \ + --cov-report=html:test-reports/coverage.html; \ + echo $? > test-reports/pytest.existcode + +# Deployment stage +FROM base as build + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8081", "--log-config", "logging.conf"] \ No newline at end of file diff --git a/square-model-inference-api/auth_server/ElkJsonFormatter.tar.gz b/square-model-inference-api/auth_server/ElkJsonFormatter.tar.gz new file mode 100644 index 000000000..19a8ecae0 Binary files /dev/null and b/square-model-inference-api/auth_server/ElkJsonFormatter.tar.gz differ diff --git a/square-model-inference-api/auth_server/__init__.py b/square-model-inference-api/auth_server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/auth_server/auth_api/__init__.py b/square-model-inference-api/auth_server/auth_api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/auth_server/auth_api/config.py b/square-model-inference-api/auth_server/auth_api/config.py new file mode 100644 index 000000000..ec1987091 --- /dev/null +++ b/square-model-inference-api/auth_server/auth_api/config.py @@ -0,0 +1,9 @@ +from starlette.config import Config +from starlette.datastructures import Secret + +config = Config(".env") + +# The API key required to get authorization +API_KEY: Secret = config("API_KEY", cast=Secret) +# Name of the header in the request that contains the API key +API_KEY_HEADER_NAME: str = config("API_KEY_HEADER_NAME", cast=str, default="Authorization") diff --git a/square-model-inference-api/auth_server/auth_api/messages.py b/square-model-inference-api/auth_server/auth_api/messages.py new file mode 100644 index 000000000..16425b6c8 --- /dev/null +++ b/square-model-inference-api/auth_server/auth_api/messages.py @@ -0,0 +1,3 @@ +NO_API_KEY = "No API key provided in header {}." +AUTH_REQ = "Authentication required." 
+ diff --git a/square-model-inference-api/auth_server/auth_api/security.py b/square-model-inference-api/auth_server/auth_api/security.py new file mode 100644 index 000000000..9fd328025 --- /dev/null +++ b/square-model-inference-api/auth_server/auth_api/security.py @@ -0,0 +1,29 @@ +import secrets + +from fastapi import HTTPException, Security +from fastapi.security.api_key import APIKeyHeader +from starlette.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED + +from .config import API_KEY, API_KEY_HEADER_NAME +from .messages import AUTH_REQ, NO_API_KEY + +import logging + +logger = logging.getLogger(__name__) + +api_key_header = APIKeyHeader(name=API_KEY_HEADER_NAME, auto_error=False) + + +def validate_request(header: str = Security(api_key_header),) -> bool: + if header is None: + logger.info("Attempted access without API Key") + raise HTTPException( + status_code=HTTP_400_BAD_REQUEST, detail=NO_API_KEY.format(API_KEY_HEADER_NAME), headers={} + ) + elif not secrets.compare_digest(header, str(API_KEY)): + logger.info(f"Attempted access with wrong API Key {header}") + raise HTTPException( + status_code=HTTP_401_UNAUTHORIZED, detail=AUTH_REQ, headers={} + ) + logger.info("Successful authorization with API Key") + return True diff --git a/square-model-inference-api/auth_server/logging.conf b/square-model-inference-api/auth_server/logging.conf new file mode 100644 index 000000000..d62db8187 --- /dev/null +++ b/square-model-inference-api/auth_server/logging.conf @@ -0,0 +1,21 @@ +[loggers] +keys = root + +[logger_root] +level = DEBUG +handlers = root + +[handlers] +keys = root + +[handler_root] +class = StreamHandler +level = DEBUG +formatter = json + +[formatters] +keys = json + +[formatter_json] +class = ElkJsonFormatter.ElkJsonFormatter + diff --git a/square-model-inference-api/auth_server/main.py b/square-model-inference-api/auth_server/main.py new file mode 100644 index 000000000..b091250e9 --- /dev/null +++ b/square-model-inference-api/auth_server/main.py @@ -0,0 +1,18 @@ +from fastapi import FastAPI, Depends +import auth_api.security as security +from logging.config import fileConfig +import logging + +logger = logging.getLogger(__name__) +try: + fileConfig("logging.conf", disable_existing_loggers=False) +except: + logger.info("Failed to load 'logging.conf'. 
Continuing without configuring the server logger") +app = FastAPI() + + +@app.get("/auth") +async def auth(authenticated: bool = Depends(security.validate_request)): + # authenticated is always True because security.validate_request raises an exception if validation fails + # and it does not return False + return {"authenticated": authenticated} diff --git a/square-model-inference-api/auth_server/requirements.txt b/square-model-inference-api/auth_server/requirements.txt new file mode 100644 index 000000000..a7128b727 --- /dev/null +++ b/square-model-inference-api/auth_server/requirements.txt @@ -0,0 +1,4 @@ +uvicorn==0.13.4 # ASGI server +fastapi==0.65.1 # REST API Framework +pydantic==1.8.2 # Input/ output modelling +ElkJsonFormatter.tar.gz # Logging Formatter \ No newline at end of file diff --git a/square-model-inference-api/auth_server/tests/conftest.py b/square-model-inference-api/auth_server/tests/conftest.py new file mode 100644 index 000000000..b06b27e94 --- /dev/null +++ b/square-model-inference-api/auth_server/tests/conftest.py @@ -0,0 +1,24 @@ +import pytest + +from starlette.config import environ +API_KEY = "test_key" +API_KEY_HEADER_NAME = "Authorization" +environ["API_KEY"] = API_KEY +environ["API_KEY_HEADER_NAME"] = API_KEY_HEADER_NAME + +from main import app + + +@pytest.fixture() +def test_app(): + return app + + +@pytest.fixture() +def test_key(): + return API_KEY + + +@pytest.fixture() +def test_header(): + return API_KEY_HEADER_NAME \ No newline at end of file diff --git a/square-model-inference-api/auth_server/tests/test_api/__init__.py b/square-model-inference-api/auth_server/tests/test_api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/auth_server/tests/test_api/test_auth.py b/square-model-inference-api/auth_server/tests/test_api/test_auth.py new file mode 100644 index 000000000..10c33ed56 --- /dev/null +++ b/square-model-inference-api/auth_server/tests/test_api/test_auth.py @@ -0,0 +1,34 @@ +from starlette.testclient import TestClient + +def test_api_correct_auth(test_app, test_key, test_header) -> None: + test_client = TestClient(test_app) + response = test_client.get( + "/auth", + headers={test_header: test_key} + ) + assert response.status_code == 200 + assert response.json()["authenticated"] == True + + +def test_api_no_header(test_app) -> None: + test_client = TestClient(test_app) + response = test_client.get( + "/auth" + ) + assert response.status_code == 400 + +def test_api_wrong_header(test_app, test_key, test_header) -> None: + test_client = TestClient(test_app) + response = test_client.get( + "/auth", + headers={test_header+"wrong": test_key} + ) + assert response.status_code == 400 + +def test_api_wrong_key(test_app, test_key, test_header) -> None: + test_client = TestClient(test_app) + response = test_client.get( + "/auth", + headers={test_header: test_key+"wrong"} + ) + assert response.status_code == 401 \ No newline at end of file diff --git a/square-model-inference-api/example_docker-compose.yml b/square-model-inference-api/example_docker-compose.yml new file mode 100644 index 000000000..dbfa7e022 --- /dev/null +++ b/square-model-inference-api/example_docker-compose.yml @@ -0,0 +1,44 @@ +version: "3" + +services: + auth: + #build: ./auth_server/. 
+ image: ukpsquare/square-model-api-auth:latest + container_name: square_model_auth + ports: + - 8081:8081 + env_file: + - ./auth_server/.env.example + + nginx: + image: nginx + ports: + - 8080:8080 + volumes: + - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro + + # Example config for one model server + inference_bert: + #build: ./inference_server/. + image: ukpsquare/square-model-api:latest + ports: + - 8000:8000 + env_file: + - ./inference_server/.env.bert + container_name: square_model_inference_bert + volumes: + - ./.cache/:/etc/huggingface/.cache/ + #- ./inference_server/:/app/ #for developement + + # Example config for another model server + inference_roberta: + #build: ./inference_server/. + image: ukpsquare/square-model-api:latest + ports: + - 8001:8000 + env_file: + - ./inference_server/.env.roberta + container_name: square_model_inference_roberta + volumes: + - ./.cache/:/etc/huggingface/.cache/ + #- ./inference_server/:/app/ #for developement diff --git a/square-model-inference-api/inference_server/.env.bert_adapter b/square-model-inference-api/inference_server/.env.bert_adapter new file mode 100644 index 000000000..6453fb42e --- /dev/null +++ b/square-model-inference-api/inference_server/.env.bert_adapter @@ -0,0 +1,23 @@ +# Corresponds to the Huggingface name for finetuned Transformers or the name of a finetuned SentenceTransformers +MODEL_NAME=bert-base-uncased +# Type of the model, e.g. Transformers, Adapter, ... +# See square_model_inference.core.event_handlers.MODEL_MAPPING for all available names with corresponding model +MODEL_TYPE=adapter + +# Disable CUDA even if available +DISABLE_GPU=True +# Batch size used for many inputs +BATCH_SIZE=32 +# Inputs larger than this size are rejected +MAX_INPUT_SIZE=1024 + +# Cache directory where model weights are stored +# This is the name for the env variable used by transformers and sentence-transformers package +TRANSFORMERS_CACHE=/etc/huggingface/.cache/ + + +# Flag that decides if returned numpy arrays are returned +# as lists or encoded to base64 (smaller but not easily human readable). +# See the comment in square_model_inference.models.prediction._encode_numpy on information on how to decode +# the base64 string back to the numpy array +RETURN_PLAINTEXT_ARRAYS=False \ No newline at end of file diff --git a/square-model-inference-api/inference_server/.env.dpr b/square-model-inference-api/inference_server/.env.dpr new file mode 100644 index 000000000..a1e3784a4 --- /dev/null +++ b/square-model-inference-api/inference_server/.env.dpr @@ -0,0 +1,26 @@ +# Corresponds to the Huggingface name for finetuned Transformers or the name of a finetuned SentenceTransformers +MODEL_NAME=facebook/dpr-question_encoder-single-nq-base +# Type of the model, e.g. Transformers, Adapter, ... 
+# See square_model_inference.core.event_handlers.MODEL_MAPPING for all available names with corresponding model +MODEL_TYPE=transformer + +# Disable CUDA even if available +DISABLE_GPU=False +# Batch size used for many inputs +BATCH_SIZE=32 +# Inputs larger than this size are rejected +MAX_INPUT_SIZE=1024 + +# Cache directory where model weights are stored +# This is the name for the env variable used by transformers and sentence-transformers package +TRANSFORMERS_CACHE=/etc/huggingface/.cache/ + +# For MODEL_TYPE=transformers: decides the AutoModel* class used +# See square_model_inference.inference.transformer.CLASS_MAPPING for valid names and corresponding class +MODEL_CLASS=base + +# Flag that decides if returned numpy arrays are returned +# as lists or encoded to base64 (smaller but not easily human readable). +# See the comment in square_model_inference.models.prediction._encode_numpy on information on how to decode +# the base64 string back to the numpy array +RETURN_PLAINTEXT_ARRAYS=False \ No newline at end of file diff --git a/square-model-inference-api/inference_server/.env.example b/square-model-inference-api/inference_server/.env.example new file mode 100644 index 000000000..cff919980 --- /dev/null +++ b/square-model-inference-api/inference_server/.env.example @@ -0,0 +1,26 @@ +# Corresponds to the Huggingface name for finetuned Transformers or the name of a finetuned SentenceTransformers +MODEL_NAME=bert-base-uncased +# Type of the model, e.g. Transformers, Adapter, ... +# See square_model_inference.core.event_handlers.MODEL_MAPPING for all available names with corresponding model +MODEL_TYPE=adapter + +# Disable CUDA even if available +DISABLE_GPU=True +# Batch size used for many inputs +BATCH_SIZE=32 +# Inputs larger than this size are rejected +MAX_INPUT_SIZE=1024 + +# Cache directory where model weights are stored +# This is the name for the env variable used by transformers and sentence-transformers package +TRANSFORMERS_CACHE=../.cache + +# For MODEL_TYPE=transformers: decides the AutoModel* class used +# See square_model_inference.inference.transformer.CLASS_MAPPING for valid names and corresponding class +MODEL_CLASS=base + +# Flag that decides if returned numpy arrays are returned +# as lists or encoded to base64 (smaller but not easily human readable). +# See the comment in square_model_inference.models.prediction._encode_numpy on information on how to decode +# the base64 string back to the numpy array +RETURN_PLAINTEXT_ARRAYS=False \ No newline at end of file diff --git a/square-model-inference-api/inference_server/Dockerfile b/square-model-inference-api/inference_server/Dockerfile new file mode 100644 index 000000000..041a92c18 --- /dev/null +++ b/square-model-inference-api/inference_server/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.7.6-slim-buster as base + +ENV PYTHONUNBUFFERED 1 + +EXPOSE 8000 +WORKDIR /app + +COPY ElkJsonFormatter.tar.gz ./ElkJsonFormatter.tar.gz +RUN pip install --upgrade pip +COPY requirements1.txt requirements2.txt uninstall_requirements.txt ./ +RUN pip install -r requirements1.txt +RUN pip uninstall -y -r uninstall_requirements.txt +RUN pip install -r requirements2.txt + +# Testing stage. We first pre-download any models separately for caching (pre_test_setup_for_docker_caching.py) and then +# run the tests +FROM base as test + +COPY ./tests/pre_test_setup_for_docker_caching.py ./tests/pre_test_setup_for_docker_caching.py +RUN python ./tests/pre_test_setup_for_docker_caching.py +COPY . 
./ +RUN pip install pytest pytest-cov pytest-asyncio +RUN mkdir test-reports +RUN PYTHONPATH=./ pytest \ + --junitxml=test-reports/junit.xml \ + --cov \ + --cov-report=xml:test-reports/coverage.xml \ + --cov-report=html:test-reports/coverage.html; \ + echo $? > test-reports/pytest.existcode + +# Deployment stage +FROM base as build + +COPY main.py main.py +COPY ./square_model_inference square_model_inference +COPY logging.conf logging.conf + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--log-config", "logging.conf"] \ No newline at end of file diff --git a/square-model-inference-api/inference_server/ElkJsonFormatter.tar.gz b/square-model-inference-api/inference_server/ElkJsonFormatter.tar.gz new file mode 100644 index 000000000..19a8ecae0 Binary files /dev/null and b/square-model-inference-api/inference_server/ElkJsonFormatter.tar.gz differ diff --git a/square-model-inference-api/inference_server/__init__.py b/square-model-inference-api/inference_server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/inference_server/logging.conf b/square-model-inference-api/inference_server/logging.conf new file mode 100644 index 000000000..d62db8187 --- /dev/null +++ b/square-model-inference-api/inference_server/logging.conf @@ -0,0 +1,21 @@ +[loggers] +keys = root + +[logger_root] +level = DEBUG +handlers = root + +[handlers] +keys = root + +[handler_root] +class = StreamHandler +level = DEBUG +formatter = json + +[formatters] +keys = json + +[formatter_json] +class = ElkJsonFormatter.ElkJsonFormatter + diff --git a/square-model-inference-api/inference_server/main.py b/square-model-inference-api/inference_server/main.py new file mode 100644 index 000000000..849fa23a2 --- /dev/null +++ b/square-model-inference-api/inference_server/main.py @@ -0,0 +1,28 @@ +from fastapi import FastAPI +from square_model_inference.api.routes.router import api_router +from square_model_inference.core.config import API_PREFIX, APP_NAME, APP_VERSION +from square_model_inference.core.event_handlers import start_app_handler, stop_app_handler +from logging.config import fileConfig +import logging + +logger = logging.getLogger(__name__) + +def get_app() -> FastAPI: + # Set logging config. + try: + fileConfig("logging.conf", disable_existing_loggers=False) + except: + logger.info("Failed to load 'logging.conf'. 
Continuing without configuring the server logger") + fast_app = FastAPI(title=APP_NAME, version=APP_VERSION) + fast_app.include_router(api_router, prefix=API_PREFIX) + + fast_app.add_event_handler("startup", start_app_handler(fast_app)) + fast_app.add_event_handler("shutdown", stop_app_handler(fast_app)) + + return fast_app + +app = get_app() + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8002) diff --git a/square-model-inference-api/inference_server/requirements1.txt b/square-model-inference-api/inference_server/requirements1.txt new file mode 100644 index 000000000..6db7ba44c --- /dev/null +++ b/square-model-inference-api/inference_server/requirements1.txt @@ -0,0 +1,8 @@ +uvicorn==0.13.4 # ASGI server +fastapi==0.65.1 # REST API Framework +pydantic==1.8.2 # Input/ output modelling +python-dotenv==0.17.1 # Required for .env configs +sentencepiece==0.1.95 +torch==1.8.1 +sentence-transformers==1.2.0 +ElkJsonFormatter.tar.gz # Logging Formatter \ No newline at end of file diff --git a/square-model-inference-api/inference_server/requirements2.txt b/square-model-inference-api/inference_server/requirements2.txt new file mode 100644 index 000000000..02424bec4 --- /dev/null +++ b/square-model-inference-api/inference_server/requirements2.txt @@ -0,0 +1 @@ +adapter-transformers==2.1.2 \ No newline at end of file diff --git a/square-model-inference-api/inference_server/square_model_inference/__init__.py b/square-model-inference-api/inference_server/square_model_inference/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/inference_server/square_model_inference/api/__init__.py b/square-model-inference-api/inference_server/square_model_inference/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/inference_server/square_model_inference/api/routes/__init__.py b/square-model-inference-api/inference_server/square_model_inference/api/routes/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/inference_server/square_model_inference/api/routes/heartbeat.py b/square-model-inference-api/inference_server/square_model_inference/api/routes/heartbeat.py new file mode 100644 index 000000000..67c0570f2 --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/api/routes/heartbeat.py @@ -0,0 +1,11 @@ +from fastapi import APIRouter + +from square_model_inference.models.heartbeat import HeartbeatResult + +router = APIRouter() + + +@router.get("/heartbeat", response_model=HeartbeatResult, name="heartbeat") +def get_hearbeat() -> HeartbeatResult: + heartbeat = HeartbeatResult(is_alive=True) + return heartbeat diff --git a/square-model-inference-api/inference_server/square_model_inference/api/routes/prediction.py b/square-model-inference-api/inference_server/square_model_inference/api/routes/prediction.py new file mode 100644 index 000000000..1de004a0d --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/api/routes/prediction.py @@ -0,0 +1,77 @@ +from fastapi import APIRouter +from starlette.requests import Request + +from square_model_inference.models.request import PredictionRequest, Task +from square_model_inference.models.prediction import PredictionOutputForSequenceClassification, PredictionOutputForTokenClassification, \ + PredictionOutputForQuestionAnswering, PredictionOutputForGeneration, PredictionOutputForEmbedding + +import logging + +logger = 
logging.getLogger(__name__) + +router = APIRouter() + + +@router.post("/sequence-classification", response_model=PredictionOutputForSequenceClassification, name="sequence classification") +async def sequence_classification( + request: Request, + prediction_request: PredictionRequest, +) -> PredictionOutputForSequenceClassification: + + logger.info(f"Sequence Classification Request: {prediction_request.dict()}") + model = request.app.state.model # Access model from global app state + prediction = await model.predict(prediction_request, Task.sequence_classification) + + return prediction + + +@router.post("/token-classification", response_model=PredictionOutputForTokenClassification, name="token classification") +async def token_classification( + request: Request, + prediction_request: PredictionRequest, +) -> PredictionOutputForTokenClassification: + + logger.info(f"Token Classification Request: {prediction_request.dict()}") + model = request.app.state.model + prediction = await model.predict(prediction_request, Task.token_classification) + + return prediction + + +@router.post("/embedding", response_model=PredictionOutputForEmbedding, name="embedding") +async def embedding( + request: Request, + prediction_request: PredictionRequest, +) -> PredictionOutputForEmbedding: + + logger.info(f"Embedding Request: {prediction_request.dict()}") + model = request.app.state.model + prediction = await model.predict(prediction_request, Task.embedding) + + return prediction + + +@router.post("/question-answering", response_model=PredictionOutputForQuestionAnswering, name="question answering") +async def question_answering( + request: Request, + prediction_request: PredictionRequest, +) -> PredictionOutputForQuestionAnswering: + + logger.info(f"Question Answering Request: {prediction_request.dict()}") + model = request.app.state.model + prediction = await model.predict(prediction_request, Task.question_answering) + + return prediction + + +@router.post("/generation", response_model=PredictionOutputForGeneration, name="generation") +async def generation( + request: Request, + prediction_request: PredictionRequest, +) -> PredictionOutputForGeneration: + + logger.info(f"Generation Request: {prediction_request.dict()}") + model = request.app.state.model + prediction = await model.predict(prediction_request, Task.generation) + + return prediction \ No newline at end of file diff --git a/square-model-inference-api/inference_server/square_model_inference/api/routes/router.py b/square-model-inference-api/inference_server/square_model_inference/api/routes/router.py new file mode 100644 index 000000000..67d60b65e --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/api/routes/router.py @@ -0,0 +1,7 @@ +from fastapi import APIRouter + +from square_model_inference.api.routes import heartbeat, prediction + +api_router = APIRouter() +api_router.include_router(heartbeat.router, tags=["health"], prefix="/health") +api_router.include_router(prediction.router, tags=["prediction"]) diff --git a/square-model-inference-api/inference_server/square_model_inference/core/__init__.py b/square-model-inference-api/inference_server/square_model_inference/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/inference_server/square_model_inference/core/config.py b/square-model-inference-api/inference_server/square_model_inference/core/config.py new file mode 100644 index 000000000..aba698196 --- /dev/null +++ 
b/square-model-inference-api/inference_server/square_model_inference/core/config.py @@ -0,0 +1,34 @@ +from starlette.config import Config + +APP_VERSION = "0.1.0" +APP_NAME = "SQuARE Model Inference API" +API_PREFIX = "/api" + +config = Config(".env") + +# Corresponds to the Huggingface name for finetuned Transformers or the name of a finetuned SentenceTransformers +MODEL_NAME: str = config("MODEL_NAME") +# Type of the model, e.g. Transformers, Adapter, ... +# See square_model_inference.core.event_handlers.MODEL_MAPPING for all available names with corresponding model +MODEL_TYPE: str = config("MODEL_TYPE") + +# Disable CUDA even if available +DISABLE_GPU: bool = config("DISABLE_GPU", cast=bool, default=False) +# Batch size used for many inputs +BATCH_SIZE: int = config("BATCH_SIZE", cast=int, default=32) +# Inputs larger than this size are rejected +MAX_INPUT_SIZE: int = config("MAX_INPUT_SIZE", cast=int, default=1024) + +# Cache directory where model weights are stored +# This is the name for the env variable used by transformers and sentence-transformers package +TRANSFORMERS_CACHE: str = config("TRANSFORMERS_CACHE") + +# For MODEL_TYPE=transformers: decides the AutoModel* class used +# See square_model_inference.inference.transformer.CLASS_MAPPING for valid names and corresponding class +MODEL_CLASS: str = config("MODEL_CLASS", default="base") + +# Flag that decides if returned numpy arrays are returned +# as lists or encoded to base64 (smaller but not easily human readable). +# See the comment in square_model_inference.models.prediction._encode_numpy on information on how to decode +# the base64 string back to the numpy array +RETURN_PLAINTEXT_ARRAYS = config("RETURN_PLAINTEXT_ARRAYS", cast=bool, default=False) diff --git a/square-model-inference-api/inference_server/square_model_inference/core/event_handlers.py b/square-model-inference-api/inference_server/square_model_inference/core/event_handlers.py new file mode 100644 index 000000000..2a664f050 --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/core/event_handlers.py @@ -0,0 +1,58 @@ +from typing import Callable + +from fastapi import FastAPI + +from square_model_inference.inference.adaptertransformer import AdapterTransformer +from square_model_inference.core.config import MODEL_TYPE, MODEL_NAME, MODEL_CLASS, DISABLE_GPU, BATCH_SIZE, \ + TRANSFORMERS_CACHE, MAX_INPUT_SIZE +from square_model_inference.inference.sentencetransformer import SentenceTransformer +from square_model_inference.inference.transformer import Transformer + +import logging + +logger = logging.getLogger(__name__) + +MODEL_MAPPING = { + "adapter": AdapterTransformer, + "transformer": Transformer, + "sentence-transformer": SentenceTransformer +} + +MODEL_KWARGS = { + "model_name": MODEL_NAME, + "model_class": MODEL_CLASS, + "disable_gpu": DISABLE_GPU, + "batch_size": BATCH_SIZE, + "transformers_cache": TRANSFORMERS_CACHE, + "max_input_size": MAX_INPUT_SIZE +} + + +def _startup_model(app: FastAPI) -> None: + """ + Initialize the model used by the server and set it to the app state for global access + """ + if MODEL_TYPE not in MODEL_MAPPING: + raise RuntimeError(f"Unknown MODEL_MAPPING. 
Must be one of {MODEL_MAPPING.keys()}") + model_instance = MODEL_MAPPING[MODEL_TYPE](**MODEL_KWARGS) + app.state.model = model_instance + + +def _shutdown_model(app: FastAPI) -> None: + app.state.model = None + + +def start_app_handler(app: FastAPI) -> Callable: + def startup() -> None: + logger.info("Running app start handler.") + _startup_model(app) + + return startup + + +def stop_app_handler(app: FastAPI) -> Callable: + def shutdown() -> None: + logger.info("Running app shutdown handler.") + _shutdown_model(app) + + return shutdown diff --git a/square-model-inference-api/inference_server/square_model_inference/core/messages.py b/square-model-inference-api/inference_server/square_model_inference/core/messages.py new file mode 100644 index 000000000..b1251e7fa --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/core/messages.py @@ -0,0 +1 @@ +HTTP_500_DETAIL = "Internal server error." diff --git a/square-model-inference-api/inference_server/square_model_inference/inference/__init__.py b/square-model-inference-api/inference_server/square_model_inference/inference/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/inference_server/square_model_inference/inference/adaptertransformer.py b/square-model-inference-api/inference_server/square_model_inference/inference/adaptertransformer.py new file mode 100644 index 000000000..2ad247089 --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/inference/adaptertransformer.py @@ -0,0 +1,101 @@ +from transformers import AutoModelWithHeads, list_adapters + +from square_model_inference.inference.transformer import Transformer +from square_model_inference.models.request import PredictionRequest, Task + +from square_model_inference.models.prediction import PredictionOutput + +import logging + +logger = logging.getLogger(__name__) + +class AdapterTransformer(Transformer): + """ + The class for all adapter-based models using the adapter-transformers package + """ + def __init__(self, model_name, batch_size, disable_gpu, transformers_cache, max_input_size, **kwargs): + """ + Initialize the Adapter with its underlying Transformer and pre-load all available adapters from adapterhub.ml + :param model_name: the Huggingface model name + :param batch_size: batch size used for inference + :param disable_gpu: do not move model to GPU even if CUDA is available + :param transformers_cache: Should be same as TRANSFORMERS_CACHE env variable. + This folder will be used to store the adapters + :param max_input_size: requests with a larger input are rejected + :param kwargs: Not used + """ + self._load_model(AutoModelWithHeads, model_name, disable_gpu) + self._load_adapter(model_name, transformers_cache) + self.batch_size = batch_size + self.max_input_size = max_input_size + + def _load_adapter(self, model_name, transformers_cache): + """ + Pre-load all available adapters for MODEL_NAME from adapterhub.ml. + We parse the hub index to extract all names and then load each model. 
+ """ + logger.info("Loading all available adapters") + adapter_infos = [info for info in list_adapters(source="ah") if info.model_name==model_name] + adapters = set(f"{adapter_info.task}/{adapter_info.subtask}@{adapter_info.username}" for adapter_info in adapter_infos) + for adapter in adapters: + logger.debug(f"Loading adapter {adapter}") + try: + self.model.load_adapter(adapter, load_as=adapter, with_head=True, cache_dir=transformers_cache) + except RuntimeError as e: + if "Error(s) in loading state_dict" in e.args[0]: + logger.debug(f"Could not load {adapter} due to missing label_ids in config resulting in exception:\n{e.args[0]}") + else: + raise e + # Move all freshly loaded adapter weights to the same device as the model + self.model.to(self.model.device) + +# def _load_single_adapter(self, adapter_name: str): + # if adapter_name not in self.model.config.adapters.adapters: + # logger.info(f"Loading new adapter {adapter_name}") + # self.model.load_adapter(adapter_name, with_head=True, load_as=adapter_name) + # else: + # logger.debug(f"Adapter {adapter_name} is already loaded. Not loading again") + + def _token_classification(self, request: PredictionRequest) -> PredictionOutput: + # We only have to change the label2id mapping from config.label2id (what super() uses) to the mapping + # of the chosen head + prediction = super()._token_classification(request) + + label2id = self.model.config.prediction_heads[request.adapter_name]["label2id"] + id2label = {v:k for k,v in label2id.items()} + prediction.id2label = id2label + + return prediction + + def _sequence_classification(self, request: PredictionRequest) -> PredictionOutput: + # We only have to change the label2id mapping from config.label2id (what super() uses) to the mapping + # of the chosen head + prediction = super()._sequence_classification(request) + + label2id = self.model.config.prediction_heads[request.adapter_name]["label2id"] + id2label = {v:k for k,v in label2id.items()} + prediction.id2label = id2label + + return prediction + + async def predict(self, request: PredictionRequest, task: Task) -> PredictionOutput: + if request.is_preprocessed: + raise ValueError("is_preprocessed=True is not supported for this model. Please use text as input.") + if len(request.input) > self.max_input_size: + raise ValueError(f"Input is too large. Max input size is {self.max_input_size}") + if not request.adapter_name or request.adapter_name not in self.model.config.adapters.adapters: + raise ValueError(f"Unknown or missing adapter {request.adapter_name}. 
" + f"Please provider a fully specified adapter name from adapterhub.ml") + self.model.set_active_adapters(request.adapter_name) + + if task == Task.sequence_classification: + return self._sequence_classification(request) + elif task == Task.token_classification: + return self._token_classification(request) + elif task == Task.question_answering: + return self._question_answering(request) + elif task == Task.embedding: + return self._embedding(request) + elif task == Task.generation: + return self._generation(request) + diff --git a/square-model-inference-api/inference_server/square_model_inference/inference/model.py b/square-model-inference-api/inference_server/square_model_inference/inference/model.py new file mode 100644 index 000000000..f8fbed524 --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/inference/model.py @@ -0,0 +1,19 @@ +from square_model_inference.models.request import PredictionRequest, Task +from square_model_inference.models.prediction import PredictionOutput + + +class Model: + """ + Base class for all models. + __init__ is supposed to load all weights and other necessary files (e.g. tokenizer) + so that the model can directly perform inference on request + """ + async def predict(self, payload: PredictionRequest, task: Task) -> PredictionOutput: + """ + Take an input, pre-process it accordingly, perform inference according to the task, + post-process the result and return it + :param payload: the prediction request containing the input and any other parameters required + :param task: The task that the model should perform with the payload + :return: the result of the prediction + """ + raise NotImplementedError diff --git a/square-model-inference-api/inference_server/square_model_inference/inference/sentencetransformer.py b/square-model-inference-api/inference_server/square_model_inference/inference/sentencetransformer.py new file mode 100644 index 000000000..b07de3bd0 --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/inference/sentencetransformer.py @@ -0,0 +1,55 @@ +import torch +from sentence_transformers import SentenceTransformer as SentenceTransformerModel + +from square_model_inference.inference.model import Model +from square_model_inference.models.request import PredictionRequest, Task + +from square_model_inference.models.prediction import PredictionOutput, PredictionOutputForEmbedding + +import logging + +logger = logging.getLogger(__name__) + +class SentenceTransformer(Model): + """ + The class for all sentence-transformers models + """ + + def __init__(self, model_name, batch_size, disable_gpu, max_input_size, **kwargs): + """ + Initialize the SentenceTransformer + :param model_name: the sentence-transformer model name (https://sbert.net/docs/pretrained_models.html) + :param batch_size: batch size used for inference + :param disable_gpu: do not move model to GPU even if CUDA is available + :param max_input_size: requests with a larger input are rejected + :param kwargs: Not used + """ + self._load_model(model_name, disable_gpu) + self.batch_size = batch_size + self.max_input_size = max_input_size + + def _load_model(self, model_name, disable_gpu): + """ + Load the Transformer model model_name and its tokenizer with Huggingface. + Model will be moved to GPU unless CUDA is unavailable or disable_gpu is true. 
+ """ + logger.debug(f"Loading model {model_name}") + device = "cuda" if torch.cuda.is_available() and not disable_gpu else "cpu" + model = SentenceTransformerModel(model_name_or_path=model_name, device=device) + logger.info(f"Model {model_name} loaded on {device}") + self.model = model + + def _embedding(self, request: PredictionRequest) -> PredictionOutput: + embeddings = self.model.encode(request.input, batch_size=self.batch_size, show_progress_bar=False) + return PredictionOutputForEmbedding(model_outputs={"embeddings": embeddings}) + + + async def predict(self, request: PredictionRequest, task: Task) -> PredictionOutput: + if request.is_preprocessed: + raise ValueError("is_preprocessed=True is not supported for this model. Please use text as input.") + if len(request.input) > self.max_input_size: + raise ValueError(f"Input is too large. Max input size is {self.max_input_size}") + if task != Task.embedding: + raise ValueError("Only embedding task supported by this model") + return self._embedding(request) + diff --git a/square-model-inference-api/inference_server/square_model_inference/inference/transformer.py b/square-model-inference-api/inference_server/square_model_inference/inference/transformer.py new file mode 100644 index 000000000..5b013d8a1 --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/inference/transformer.py @@ -0,0 +1,333 @@ +import json +from collections import defaultdict +from typing import Union, Tuple + +import torch +import numpy as np +from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification, \ + AutoModelForTokenClassification, AutoModelForQuestionAnswering, AutoModelForCausalLM + +from square_model_inference.inference.model import Model +from square_model_inference.models.request import PredictionRequest, Task + +from square_model_inference.models.prediction import PredictionOutput, PredictionOutputForSequenceClassification, PredictionOutputForTokenClassification, \ + PredictionOutputForQuestionAnswering, PredictionOutputForGeneration, PredictionOutputForEmbedding + +import logging + +logger = logging.getLogger(__name__) + +CLASS_MAPPING = { + "base": AutoModel, + "sequence_classification": AutoModelForSequenceClassification, + "token_classification": AutoModelForTokenClassification, + "question_answering": AutoModelForQuestionAnswering, + "generation": AutoModelForCausalLM +} + +class Transformer(Model): + """ + The class for all Huggingface transformer-based models + """ + SUPPORTED_EMBEDDING_MODES = ["mean", "max", "cls", "token", "pooler"] + + def __init__(self, model_name, model_class, batch_size, disable_gpu, max_input_size, **kwargs): + """ + Initialize the Transformer + :param model_name: the Huggingface model name + :param model_class: the class name (according to CLASS_MAPPING) to use + :param batch_size: batch size used for inference + :param disable_gpu: do not move model to GPU even if CUDA is available + :param max_input_size: requests with a larger input are rejected + :param kwargs: Not used + """ + if model_class not in CLASS_MAPPING: + raise RuntimeError(f"Unknown MODEL_CLASS. Must be one of {CLASS_MAPPING.keys()}") + self._load_model(CLASS_MAPPING[model_class], model_name, disable_gpu) + self.batch_size = batch_size + self.max_input_size = max_input_size + + def _load_model(self, model_cls, model_name, disable_gpu): + """ + Load the Transformer model model_name and its tokenizer with Huggingface. + Model will be moved to GPU unless CUDA is unavailable or disable_gpu is true. 
+ """ + logger.debug(f"Loading model {model_name}") + tokenizer = AutoTokenizer.from_pretrained(model_name) + # Check if GPU is available + device = "cuda" if torch.cuda.is_available() and not disable_gpu else "cpu" + model = model_cls.from_pretrained(model_name).to(device) + logger.info(f"Model {model_name} loaded on {device}") + + self.model = model + self.tokenizer = tokenizer + + def _ensure_tensor_on_device(self, **inputs): + """ + Ensure PyTorch tensors are on the specified device. + + Args: + inputs (keyword arguments that should be :obj:`torch.Tensor`): The tensors to place on :obj:`self.device`. + + Return: + :obj:`Dict[str, torch.Tensor]`: The same as :obj:`inputs` but on the proper device. + """ + return {name: tensor.to(self.model.device) for name, tensor in inputs.items()} + + def _predict(self, request: PredictionRequest, output_features=False) \ + -> Union[dict, Tuple[dict, dict]]: + """ + Inference on the input. + :param request: the request with the input and optional kwargs + :param output_features: return the features of the input. + Necessary if, e.g., attention mask is needed for post-processing. + :return: The model outputs and optionally the input features + """ + all_predictions = [] + request.preprocessing_kwargs["padding"] = request.preprocessing_kwargs.get("padding", True) + request.preprocessing_kwargs["truncation"] = request.preprocessing_kwargs.get("truncation", True) + features = self.tokenizer(request.input, + return_tensors="pt", + **request.preprocessing_kwargs) + for start_idx in range(0, len(request.input), self.batch_size): + with torch.no_grad(): + input_features = {k: features[k][start_idx:start_idx+self.batch_size] for k in features.keys()} + input_features = self._ensure_tensor_on_device(**input_features) + predictions = self.model(**input_features, **request.model_kwargs) + all_predictions.append(predictions) + keys = all_predictions[0].keys() + final_prediction = {} + for key in keys: + # HuggingFace outputs for 'attentions' and more is returned as tuple of tensors + # Tuple of tuples only exists for 'past_key_values' which is only relevant for generation. + # Generation should NOT use this function + if isinstance(all_predictions[0][key], tuple): + tuple_of_lists = list(zip(*[[p.cpu() for p in tpl[key]] for tpl in all_predictions])) + final_prediction[key] = tuple(torch.cat(l) for l in tuple_of_lists) + else: + final_prediction[key] = torch.cat([p[key].cpu() for p in all_predictions]) + if output_features: + return final_prediction, features + return final_prediction + + def _embedding(self, request: PredictionRequest) -> PredictionOutput: + request.model_kwargs["output_hidden_states"] = True + predictions, features = self._predict(request, output_features=True) + # We remove hidden_states from predictions! 
+ hidden_state = predictions.pop("hidden_states")[-1] + attention_mask = features["attention_mask"] + + embedding_mode = request.task_kwargs.get("embedding_mode", "mean") + task_outputs = { + "embedding_mode": embedding_mode + } + + if embedding_mode not in self.SUPPORTED_EMBEDDING_MODES: + raise ValueError(f"Embedding mode {embedding_mode} not in list of supported modes {self.SUPPORTED_EMBEDDING_MODES}") + + if embedding_mode == "cls": + emb = hidden_state[:, 0, :] + elif embedding_mode == "pooler": + emb = predictions["pooler_output"] + # copied from sentence-transformers pooling + elif embedding_mode == "max": + input_mask_expanded = attention_mask.unsqueeze(-1).expand(hidden_state.size()).float() + hidden_state[input_mask_expanded == 0] = -1e9 # Set padding tokens to large negative value + emb = torch.max(hidden_state, 1)[0] + # copied from sentence-transformers pooling + elif embedding_mode == "mean": + input_mask_expanded = attention_mask.unsqueeze(-1).expand(hidden_state.size()).float() + sum_embeddings = torch.sum(hidden_state * input_mask_expanded, 1) + sum_mask = input_mask_expanded.sum(1) + emb = sum_embeddings / sum_mask + elif embedding_mode == "token": + emb = hidden_state + task_outputs["word_ids"] = [features.word_ids(i) for i in range(len(request.input))] + predictions["embeddings"] = emb + + return PredictionOutputForEmbedding(model_outputs=predictions, **task_outputs) + + def _token_classification(self, request: PredictionRequest) -> PredictionOutput: + predictions, features = self._predict(request, output_features=True) + # If logits dim > 1 or if the 'is_regression' flag is not set, we assume classification: + # We replace the logits by the softmax and add labels chosen with argmax + label2id = self.model.config.label2id + id2label = {v:k for k,v in label2id.items()} + task_outputs = { + "id2label": id2label, + "word_ids": [features.word_ids(i) for i in range(len(request.input))] + } + if predictions["logits"].size()[-1] != 1 and not request.task_kwargs.get("is_regression", False): + probabilities = torch.softmax(predictions["logits"], dim=-1) + predictions["logits"] = probabilities + task_outputs["labels"] = torch.argmax(predictions["logits"], dim=-1).tolist() + + return PredictionOutputForTokenClassification(model_outputs=predictions, **task_outputs) + + def _sequence_classification(self, request: PredictionRequest) -> PredictionOutput: + predictions = self._predict(request) + label2id = self.model.config.label2id + id2label = {v:k for k,v in label2id.items()} + task_outputs = { + "id2label": id2label + } + # If logits dim > 1 or if the 'is_regression' flag is not set, we assume classification: + # We replace the logits by the softmax and add labels chosen with argmax + if predictions["logits"].size()[-1] != 1 and not request.task_kwargs.get("is_regression", False): + probabilities = torch.softmax(predictions["logits"], dim=-1) + predictions["logits"] = probabilities + task_outputs["labels"] = torch.argmax(predictions["logits"], dim=-1).tolist() + + return PredictionOutputForSequenceClassification(model_outputs=predictions, **task_outputs) + + def _generation(self, request: PredictionRequest) -> PredictionOutput: + request.preprocessing_kwargs["padding"] = request.preprocessing_kwargs.get("padding", False) + request.preprocessing_kwargs["add_special_tokens"] = request.preprocessing_kwargs.get("add_special_tokens", False) + task_outputs = {"generated_texts": []} + model_outputs = defaultdict(list) + # We cannot batch generate so we have to to it separately for each 
input prompt. + for prompt in request.input: + features = self.tokenizer(prompt, return_tensors="pt", **request.preprocessing_kwargs) + input_ids = features["input_ids"] + input_ids = self._ensure_tensor_on_device(input_ids=input_ids)["input_ids"] + request.model_kwargs.update(request.task_kwargs) + request.model_kwargs["return_dict_in_generate"] = True + res = self.model.generate(input_ids, **request.model_kwargs) + + # put everything on CPU and add it to model_outputs + for key in res.keys(): + if isinstance(res[key], tuple): + if isinstance(res[key][0], tuple): + res[key] = tuple((tuple(tensor.cpu() for tensor in tpl)) for tpl in res[key]) + else: + res[key] = tuple(tensor.cpu() for tensor in res[key]) + else: + res[key] = res[key].cpu() + model_outputs[key].append(res[key]) + + generated_texts = [self.tokenizer.decode(seq, skip_special_tokens=True, + clean_up_tokenization_spaces=request.task_kwargs.get("clean_up_tokenization_spaces", False)) + for seq in res["sequences"]] + task_outputs["generated_texts"].append(generated_texts) + return PredictionOutputForGeneration(model_outputs=model_outputs, **task_outputs) + + def _question_answering(self, request: PredictionRequest) -> PredictionOutput: + """ + Span-based question answering for a given question and context. + + We expect the input to use the (question, context) format for the text pairs. + :param request: + :return: + """ + # Making heavy use of https://huggingface.co/transformers/_modules/transformers/pipelines/question_answering.html#QuestionAnsweringPipeline + def decode(start_: np.ndarray, end_: np.ndarray, topk: int, max_answer_len: int, undesired_tokens_: np.ndarray) -> Tuple: + """ + Take the output of any :obj:`ModelForQuestionAnswering` and will generate probabilities for each span to be the + actual answer. + + In addition, it filters out some unwanted/impossible cases like answer len being greater than max_answer_len or + answer end position being before the starting position. The method supports output the k-best answer through + the topk argument. + + Args: + start_ (:obj:`np.ndarray`): Individual start probabilities for each token. + end (:obj:`np.ndarray`): Individual end_ probabilities for each token. + topk (:obj:`int`): Indicates how many possible answer span(s) to extract from the model output. + max_answer_len (:obj:`int`): Maximum size of the answer to extract from the model's output. + undesired_tokens_ (:obj:`np.ndarray`): Mask determining tokens that can be part of the answer + """ + # Ensure we have batch axis + if start_.ndim == 1: + start_ = start_[None] + + if end_.ndim == 1: + end_ = end_[None] + + # Compute the score of each tuple(start_, end_) to be the real answer + outer = np.matmul(np.expand_dims(start_, -1), np.expand_dims(end_, 1)) + + # Remove candidate with end_ < start_ and end_ - start_ > max_answer_len + candidates = np.tril(np.triu(outer), max_answer_len - 1) + + # Inspired by Chen & al. 
(https://github.com/facebookresearch/DrQA) + scores_flat = candidates.flatten() + if topk == 1: + idx_sort = [np.argmax(scores_flat)] + elif len(scores_flat) < topk: + idx_sort = np.argsort(-scores_flat) + else: + idx = np.argpartition(-scores_flat, topk)[0:topk] + idx_sort = idx[np.argsort(-scores_flat[idx])] + + starts_, ends_ = np.unravel_index(idx_sort, candidates.shape)[1:] + desired_spans = np.isin(starts_, undesired_tokens_.nonzero()) & np.isin(ends_, undesired_tokens_.nonzero()) + starts_ = starts_[desired_spans] + ends_ = ends_[desired_spans] + scores_ = candidates[0, starts_, ends_] + + return starts_, ends_, scores_ + + request.preprocessing_kwargs["truncation"] = "only_second" + predictions, features = self._predict(request, output_features=True) + + task_outputs = {"answers": []} + for idx, (start, end, (_, context)) in enumerate(zip(predictions["start_logits"], predictions["end_logits"], request.input)): + start = start.numpy() + end = end.numpy() + # Ensure padded tokens & question tokens cannot belong to the set of candidate answers. + question_tokens = np.abs(np.array([s != 1 for s in features.sequence_ids(idx)]) - 1) + # Unmask CLS token for 'no answer' + question_tokens[0] = 1 + undesired_tokens = question_tokens & features["attention_mask"][idx].numpy() + + # Generate mask + undesired_tokens_mask = undesired_tokens == 0.0 + + # Make sure non-context indexes in the tensor cannot contribute to the softmax + start = np.where(undesired_tokens_mask, -10000.0, start) + end = np.where(undesired_tokens_mask, -10000.0, end) + + start = np.exp(start - np.log(np.sum(np.exp(start), axis=-1, keepdims=True))) + end = np.exp(end - np.log(np.sum(np.exp(end), axis=-1, keepdims=True))) + + # Get score for 'no answer' then mask for decoding step (CLS token + no_answer_score = (start[0] * end[0]).item() + start[0] = end[0] = 0.0 + + starts, ends, scores = decode( + start, end, request.task_kwargs.get("topk", 1), request.task_kwargs.get("max_answer_len", 128), undesired_tokens + ) + enc = features[idx] + answers = [ + { + "score": score.item(), + "start": enc.word_to_chars( + enc.token_to_word(s), sequence_index=1)[0], + "end": enc.word_to_chars(enc.token_to_word(e), sequence_index=1)[1], + "answer": context[ + enc.word_to_chars(enc.token_to_word(s), sequence_index=1)[0] : + enc.word_to_chars(enc.token_to_word(e), sequence_index=1)[1]], + } + for s, e, score in zip(starts, ends, scores)] + answers.append({"score": no_answer_score, "start": 0, "end": 0, "answer": ""}) + answers = sorted(answers, key=lambda x: x["score"], reverse=True)[: request.task_kwargs.get("topk", 1)] + task_outputs["answers"].append(answers) + return PredictionOutputForQuestionAnswering(model_outputs=predictions, **task_outputs) + + async def predict(self, request: PredictionRequest, task: Task) -> PredictionOutput: + if request.is_preprocessed: + raise ValueError("is_preprocessed=True is not supported for this model. Please use text as input.") + if len(request.input) > self.max_input_size: + raise ValueError(f"Input is too large. 
Max input size is {self.max_input_size}") + + if task == Task.sequence_classification: + return self._sequence_classification(request) + elif task == Task.token_classification: + return self._token_classification(request) + elif task == Task.embedding: + return self._embedding(request) + elif task == Task.question_answering: + return self._question_answering(request) + elif task == Task.generation: + return self._generation(request) + diff --git a/square-model-inference-api/inference_server/square_model_inference/models/__init__.py b/square-model-inference-api/inference_server/square_model_inference/models/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/square-model-inference-api/inference_server/square_model_inference/models/heartbeat.py b/square-model-inference-api/inference_server/square_model_inference/models/heartbeat.py new file mode 100644 index 000000000..55f1fea20 --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/models/heartbeat.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class HeartbeatResult(BaseModel): + is_alive: bool diff --git a/square-model-inference-api/inference_server/square_model_inference/models/prediction.py b/square-model-inference-api/inference_server/square_model_inference/models/prediction.py new file mode 100644 index 000000000..f6a10712f --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/models/prediction.py @@ -0,0 +1,160 @@ +from typing import Dict, Union, Tuple, List, Optional, Iterable + +import torch +from io import BytesIO +import base64 +import numpy as np +from pydantic import Field, BaseModel +from square_model_inference.core.config import RETURN_PLAINTEXT_ARRAYS + + +def _encode_numpy(obj: Dict[str, Union[torch.Tensor, Tuple[torch.Tensor]]], return_plaintext: bool=RETURN_PLAINTEXT_ARRAYS) -> Dict[str, Union[list, str]]: + """ + Encodes the Torch Tensors first to Numpy arrays and then encodes them either as plain lists or base64 string + depending on the flag RETURN_PLAINTEXT_ARRAYS + :param obj: the objects whose tensors will be encoded + :return: the same dictionary with all tensors replaced by lists or base64-encoded array strings. 
+ """ + # Encode numpy array either as lists or base64 string + def encode(arr): + if isinstance(arr, torch.Tensor): + arr = arr.numpy() + if return_plaintext: + return arr.tolist() + else: + # np.save expects a file which we emulate with BytesIO + with BytesIO() as b: + np.save(b, arr) + arr_binary = b.getvalue() + arr_binary_b64 = base64.b64encode(arr_binary) + arr_string_b64 = arr_binary_b64.decode("latin1") + return arr_string_b64 + + # DECODE THE VALUE WITH + # arr_binary_b64 = arr_string_b64.encode() + # arr_binary = base64.decodebytes(arr_binary_b64) + # arr = np.load(BytesIO(arr_binary)) + + # Recursively go through a value and encode leaves (=tensors) it or iterate over values and encode them + def enc_or_iterate(val): + # Stop attempt to encode an already encoded array + # This can happen because PredictionOutput is initialized twice + # - once by Model and once when request response is serialized by fastAPI + if isinstance(val, int) or isinstance(val, float) or isinstance(val, str): + raise ValueError("Array is already encoded") + if isinstance(val, Iterable) and not isinstance(val, torch.Tensor) and not isinstance(val, np.ndarray): + return [enc_or_iterate(v) for v in val] + else: + return encode(val) + + for k, v in obj.items(): + try: + v = enc_or_iterate(v) + except ValueError: + break + obj[k] = v + return obj + + +class PredictionOutput(BaseModel): + """ + The results of the prediction of the model on the given input for the requested task. + """ + model_outputs: Dict = Field( + {}, + description="Dictionary containing the model tensor outputs either as plain list or as base64-encoded numpy array.

" + "Decode the base64 string 'arr_string_b64' back to an array in Python like this:
" + "arr_binary_b64 = arr_string_b64.encode()
" + "arr_binary = base64.decodebytes(arr_binary_b64)
" + "arr = np.load(BytesIO(arr_binary))

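Taken together, the decoding steps above amount to a small client-side helper. A minimal sketch, assuming a prediction response already parsed into a dict; the helper name `decode_model_output` and the `response_json` variable are illustrative, while the field names `model_outputs` and `model_output_is_encoded` are the ones defined below:

```python
import base64
from io import BytesIO

import numpy as np


def decode_model_output(response_json: dict, key: str) -> np.ndarray:
    # Illustrative sketch, not part of the API code: decode one 'model_outputs' entry.
    value = response_json["model_outputs"][key]
    if not response_json["model_output_is_encoded"]:
        # RETURN_PLAINTEXT_ARRAYS was enabled, so the value is already a plain (nested) list.
        return np.array(value)
    arr_binary_b64 = value.encode()
    arr_binary = base64.decodebytes(arr_binary_b64)
    return np.load(BytesIO(arr_binary))


# e.g. embeddings = decode_model_output(response.json(), "embeddings")
```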
" + "SentenceTransformer:
" + "'embedding':
" + "- 'embeddings: Embedding tensors.
" + "Transformer/ Adapter:
" + "Optional tensor depend on request's 'model_kwargs' parameters, e.g. 'output_attentions'. " + "See the Huggingface documentation for information like shape etc.
" + + "'sentence_classification':
" + "- 'logits': (Softmax) logits of the classifier.
" + "'token_classification':
" + "- 'logits': (Softmax) logits of the classifier.
" + "'embedding':
" + "- 'embeddings: Embedding tensors.
" + "'question_answering':
" + "- 'start_logits': Logits for the beginning of the span
" + "- 'end_logits': Logits for the end of the span
" + "'generation':
" + "- 'sequences': The generated vocab ids for the sequence
" + "Task 'generation' does not concatenate the tensors of the inputs together but instead creates a list" + "of the tensors for each input." + ) + model_output_is_encoded: bool = Field( + not RETURN_PLAINTEXT_ARRAYS, + description="Flag indicating that 'model_output' is a base64-encoded numpy array and not a human-readable list." + "See the field description for 'model_output' on information on how to decode the array.") + + def __init__(self, **data): + """ + Data model for the model and task outputs. + The model outputs (,i.e., tensors) will be encoded as base64 strings or as plain lists depending on the flag + RETURN_PLAINTEXT_ARRAYS. + :param data: + 'model_outputs': dict[str: Union[torch.Tensor, Tuple[torch.Tensor]]]. All tensor results of the model + 'task_outputs': dict[str: Any]. All non-tensor results of the processed task like the predicted labels, extracted spans, etc. + """ + super().__init__(**data) + self.model_outputs = _encode_numpy(self.model_outputs) + + +class PredictionOutputForSequenceClassification(PredictionOutput): + labels: List[int] = Field([], description="List of the predicted label ids for the input. Not set for regression.") + id2label: Dict[int, str] = Field({}, description="Mapping from label id to the label name. Not set for regression.") + + def __init__(self, **data): + super().__init__(**data) + + +class PredictionOutputForTokenClassification(PredictionOutput): + labels: List[List[int]] = Field([], description="List of the predicted label ids for the input. Not set for regression.") + id2label: Dict[int, str] = Field({}, description="Mapping from label id to the label name. Not set for regression.") + word_ids: List[List[Optional[int]]] = Field(..., description="Mapping from each token to the corresponding word in the input. " + "'None' represents special tokens added by tokenizer") + def __init__(self, **data): + super().__init__(**data) + + +class PredictionOutputForEmbedding(PredictionOutput): + embedding_mode: str = Field("", description="Only used by Transformers/ Adapters.
One of 'mean', 'max', 'cls', 'pooler', 'token'. The pooling mode used (or not used for 'token')") + word_ids: List[List[Optional[int]]] = Field([], description="Only used by Transformers/ Adapters.
" + "Only set with embedding_mode='token'." + " Mapping from each token to the corresponding word in the input. " + "'None' represents special tokens added by tokenizer") + def __init__(self, **data): + super().__init__(**data) + + +class PredictionOutputForGeneration(PredictionOutput): + generated_texts: List[List[str]] = Field(..., description="List of list of the generated texts. Length of outer list is the number of inputs, " + "length of inner list is parameter 'num_return_sequences' in request's 'task_kwargs'") + def __init__(self, **data): + super().__init__(**data) + + +class QAAnswer(BaseModel): + """ + A span answer for question_answering with a score, the start and end character index and the extracted span answer. + """ + score: float + start: int + end: int + answer: str + + +class PredictionOutputForQuestionAnswering(PredictionOutput): + answers: List[List[QAAnswer]] = Field(..., description="List of lists of answers. Length of outer list is the number of inputs, " + "length of inner list is parameter 'topk' from the request's 'task_kwargs' (default 1). " + "Each answer is a dictionary with 'score', 'start' (span start index in context), 'end' (span end index in context), " + "and 'answer' (the extracted span). The inner list is sorted by score. If no answer span was extracted, " + "the empty span is returned (start and end both 0)") + def __init__(self, **data): + super().__init__(**data) \ No newline at end of file diff --git a/square-model-inference-api/inference_server/square_model_inference/models/request.py b/square-model-inference-api/inference_server/square_model_inference/models/request.py new file mode 100644 index 000000000..89678d9a6 --- /dev/null +++ b/square-model-inference-api/inference_server/square_model_inference/models/request.py @@ -0,0 +1,80 @@ +from enum import Enum +from typing import Union, List, Optional + +from pydantic import BaseModel, Field + + +class Task(str, Enum): + """ + The available tasks that can be handled. Note that most model actually supports only one task. + Requesting a task that a model cannot handle might fail or produce incorrect results. + """ + sequence_classification = "sequence_classification" + token_classification = "token_classification" + question_answering = "question_answering" + embedding = "embedding" + generation = "generation" + + +class PredictionRequest(BaseModel): + """ + Prediction request containing the input, pre-processing parameters, parameters for the model forward pass, + the task with task-specific parameters, and parameters for any post-processing + """ + input: Union[List[str], List[List[str]], dict] = Field( + ..., + description="Input for the model. Supports Huggingface Transformer inputs (i.e., list of sentences, or " + "list of pairs of sentences), a dictionary with Transformer inputs, or a dictionary containing " + "numpy arrays (as lists). For the numpy arrays, also set is_preprocessed=True. " + "

" + "Transformer/ Adapter:
" + "Task 'question_answering' expects the input to be in the (question, context) format." + ) + is_preprocessed: bool = Field( + default=False, + description="Flag indicating that the input contains already pre-processed numpy arrays " + "as list and that it needs no further pre-processing.

" + "Transformer/ Adapter/ SentenceTransformer: 'is_preprocessed' is not supported." + ) + preprocessing_kwargs: dict = Field( + default={}, + description="Optional dictionary containing additional parameters for the pre-processing step.

" + "SentenceTransformer: This is ignored.
" + "Transformer/ Adapter: See the Huggingface tokenizer for possible parameters." + ) + model_kwargs: dict = Field( + default={}, + description="Optional dictionary containing parameters that are passed to the model for the forward pass " + "to control what additional tensors are returned.

" + "SentenceTransformer: This is ignored.
" + "Transformer/ Adapter: See the forward method of the Huggingface models for possible parameters" + "For example, set ‘output_attentions=True’ to receive the attention results in the output." + ) + #task: Task = Field(...) + task_kwargs: dict = Field( + default={}, + description="Optional dictionary containing additional parameters for handling of the task and " + "task-related post-processing.

" + "SentenceTransformer: This is ignored.
" + "Transformer/ Adapter:
" + "'sentence_classification':
" + "- 'is_regression': Flag to treat output of models with num_labels>1 as regression, too, i.e., no softmax and no labels are returned
" + "'token_classification':
" + "- 'is_regression': Flag to treat output of models with num_labels>1 as regression, too, i.e., no softmax and no labels are returned
" + "'embedding':
" + "- 'embedding_mode: One of 'mean', 'max', 'cls', 'pooler', 'token'. The pooling mode used (or not used for 'token'). " + "'pooler' uses the pooler_output of a Transformer, i.e. the processed CLS token. Default value 'mean'.
" + "'question_answering':
" + "- 'topk': Return the top-k most likely spans. Default 1.
" + "- 'max_answer_len': Maximal token length of answers. Default 128.
" + "'generation':
" + "- 'clean_up_tokenization_spaces': See parameter in Huggingface tokenizer.decode(). Default False
" + "- See Huggingface model.generate() for all possible parameters that can be used. " + "Note, 'model_kwargs' and 'task_kwargs' are merged for generation." + + ) + adapter_name: Optional[str] = Field( + default="", + description="Only necessary for Adapter. " + "The fully specified name of the to-be-used adapter from adapterhub.ml" + ) \ No newline at end of file diff --git a/square-model-inference-api/inference_server/tests/conftest.py b/square-model-inference-api/inference_server/tests/conftest.py new file mode 100644 index 000000000..fc8984e78 --- /dev/null +++ b/square-model-inference-api/inference_server/tests/conftest.py @@ -0,0 +1,99 @@ +import pytest + +from fastapi.testclient import TestClient +from pre_test_setup_for_docker_caching import TRANSFORMERS_TESTING_CACHE, TRANSFORMER_MODEL, SENTENCE_MODEL +import torch + +## Due to import and config reasons, the environ is set in pre_test_setup_for_docker_caching ! +## (because we import Transformer, which imports Model, imports PredictionOutput, which imports RETURN_PLAINTEXT_ARRAYS and this creates the starlette config. +## The config is read by this point and starlette forbids overwriting it then) +# from starlette.config import environ +# environ["TRANSFORMERS_CACHE"] = TRANSFORMERS_TESTING_CACHE +# environ["MODEL_NAME"] = "test" +# environ["MODEL_TYPE"] = "test" +# environ["DISABLE_GPU"] = "True" +# environ["BATCH_SIZE"] = "1" +# environ["RETURN_PLAINTEXT_ARRAYS"] = "False" + +from main import get_app +from square_model_inference.inference.model import Model +from square_model_inference.models.prediction import PredictionOutput, PredictionOutputForGeneration, \ + PredictionOutputForEmbedding, PredictionOutputForTokenClassification, PredictionOutputForSequenceClassification, PredictionOutputForQuestionAnswering +from square_model_inference.models.request import PredictionRequest, Task +from square_model_inference.inference.transformer import Transformer +from square_model_inference.inference.adaptertransformer import AdapterTransformer +from square_model_inference.inference.sentencetransformer import SentenceTransformer + + +@pytest.fixture(scope="session") +def test_app(): + app = get_app() + app.state.model = TestModel() + return app + + +class TestModel(Model): + async def predict(self, payload, task) -> PredictionOutput: + if task == Task.generation: + return PredictionOutputForGeneration(generated_texts=[[""]]) + elif task == Task.question_answering: + return PredictionOutputForQuestionAnswering(answers=[[{"score": 0, "start": 0, "end": 0, "answer": ""}]]) + elif task == Task.embedding: + return PredictionOutputForEmbedding(word_ids=[[0]]) + elif task == Task.token_classification: + return PredictionOutputForTokenClassification(word_ids=[[0]]) + elif task == Task.sequence_classification: + return PredictionOutputForSequenceClassification() + + +# We only load bert-base-uncased, so we fix the random seed to always get the same randomly generated heads on top +@pytest.fixture(scope="class") +def test_transformer_sequence_classification(): + torch.manual_seed(987654321) + return Transformer(TRANSFORMER_MODEL, "sequence_classification", 1, True, 50) + + +@pytest.fixture(scope="class") +def test_transformer_embedding(): + torch.manual_seed(987654321) + return Transformer(TRANSFORMER_MODEL, "base", 1, True, 50) + + +@pytest.fixture(scope="class") +def test_transformer_token_classification(): + torch.manual_seed(987654321) + return Transformer(TRANSFORMER_MODEL, "token_classification", 1, True, 50) + + 
+@pytest.fixture(scope="class") +def test_transformer_question_answering(): + torch.manual_seed(987654321) + return Transformer(TRANSFORMER_MODEL, "question_answering", 1, True, 50) + + +@pytest.fixture(scope="class") +def test_transformer_generation(): + torch.manual_seed(987654321) + return Transformer(TRANSFORMER_MODEL, "generation", 1, True, 50) + + +@pytest.fixture(scope="class") +def test_adapter(): + return AdapterTransformer(TRANSFORMER_MODEL, 1, True, TRANSFORMERS_TESTING_CACHE, 50) + + +@pytest.fixture(scope="class") +def test_sentence_transformer(): + return SentenceTransformer(SENTENCE_MODEL, 1, True, 50) + +@pytest.fixture() +def prediction_request(): + request = PredictionRequest.parse_obj({ + "input": ["test"], + "is_preprocessed": False, + "preprocessing_kwargs": {}, + "model_kwargs": {}, + "task_kwargs": {}, + "adapter_name": "" + }) + return request \ No newline at end of file diff --git a/square-model-inference-api/inference_server/tests/pre_test_setup_for_docker_caching.py b/square-model-inference-api/inference_server/tests/pre_test_setup_for_docker_caching.py new file mode 100644 index 000000000..8460db236 --- /dev/null +++ b/square-model-inference-api/inference_server/tests/pre_test_setup_for_docker_caching.py @@ -0,0 +1,49 @@ +# This file is used for the Docker image to cache long-running setup for the tests, +# i.e., downloading finetuned Transformer models and so on. +# This way, adding new tests or even changing the server code does NOT trigger a new download during building +from sentence_transformers import SentenceTransformer +from starlette.config import environ +from transformers import AutoTokenizer, AutoModelWithHeads, list_adapters +import logging + +logger = logging.getLogger(__name__) + +TRANSFORMERS_TESTING_CACHE = "./.model_testing_cache" +environ["TRANSFORMERS_CACHE"] = TRANSFORMERS_TESTING_CACHE +environ["MODEL_NAME"] = "test" +environ["MODEL_TYPE"] = "test" +environ["DISABLE_GPU"] = "True" +environ["BATCH_SIZE"] = "1" +environ["RETURN_PLAINTEXT_ARRAYS"] = "True" +environ["MAX_INPUT_SIZE"] = "100" + +# Downloaded models: +TRANSFORMER_MODEL = "bert-base-uncased" +SENTENCE_MODEL = "paraphrase-albert-small-v2" + +if __name__ == "__main__": + # We pre-download all models needed for the tests. + # We have to be careful to NOT import anything from square_model_inference because this script runs in the Dockerfile + # BEFORE any other of our code is copied (so that we do not have to re-download after every code change). 
+ device = "cpu" + + # Pre-download Huggingface model for tests + _ = AutoTokenizer.from_pretrained(TRANSFORMER_MODEL) + model = AutoModelWithHeads.from_pretrained(TRANSFORMER_MODEL).to(device) + + # Pre-download adapters + logger.info("Loading all available adapters") + adapter_infos = [info for info in list_adapters(source="ah") if info.model_name==TRANSFORMER_MODEL] + adapters = set(f"{adapter_info.task}/{adapter_info.subtask}@{adapter_info.username}" for adapter_info in adapter_infos) + for adapter in adapters: + logger.debug(f"Loading adapter {adapter}") + try: + model.load_adapter(adapter, load_as=adapter, with_head=True, cache_dir=TRANSFORMERS_TESTING_CACHE) + except RuntimeError as e: + if "Error(s) in loading state_dict" in e.args[0]: + logger.debug(f"Could not load {adapter} due to missing label_ids in config resulting in exception:\n{e.args[0]}") + else: + raise(e) + + # Pre-download sentence-transformer models for tests + _ = SentenceTransformer(model_name_or_path=SENTENCE_MODEL, device=device) diff --git a/square-model-inference-api/inference_server/tests/test_api/test_heartbeat.py b/square-model-inference-api/inference_server/tests/test_api/test_heartbeat.py new file mode 100644 index 000000000..eade6ef16 --- /dev/null +++ b/square-model-inference-api/inference_server/tests/test_api/test_heartbeat.py @@ -0,0 +1,14 @@ +from starlette.testclient import TestClient + + +def test_heartbeat(test_app) -> None: + test_client = TestClient(test_app) + response = test_client.get("/api/health/heartbeat") + assert response.status_code == 200 + assert response.json() == {"is_alive": True} + + +def test_default_route(test_app) -> None: + test_client = TestClient(test_app) + response = test_client.get("/") + assert response.status_code == 404 diff --git a/square-model-inference-api/inference_server/tests/test_api/test_prediction.py b/square-model-inference-api/inference_server/tests/test_api/test_prediction.py new file mode 100644 index 000000000..77767924a --- /dev/null +++ b/square-model-inference-api/inference_server/tests/test_api/test_prediction.py @@ -0,0 +1,106 @@ +from starlette.testclient import TestClient + + +def test_api_sequence_classification(test_app) -> None: + test_client = TestClient(test_app) + response = test_client.post( + "/api/sequence-classification", + json={ + "input": [ + "this is a test" + ], + "is_preprocessed": False, + "preprocessing_kwargs": {}, + "model_kwargs": {}, + "task_kwargs": {}, + "adapter_name": "" + } + ) + assert response.status_code == 200 + + +def test_api_sequence_classification_malformed_input(test_app) -> None: + test_client = TestClient(test_app, raise_server_exceptions=False) + response = test_client.post( + "/api/sequence-classification", json={ + # "input": [ + # "this is a test" + # ], + "is_preprocessed": False, + "preprocessing_kwargs": {}, + "task_kwargs": {}, + "adapter_name": "" + } + ) + assert response.status_code == 422 + + +def test_api_token_classification(test_app) -> None: + test_client = TestClient(test_app) + response = test_client.post( + "/api/token-classification", + json={ + "input": [ + "this is a test" + ], + "is_preprocessed": False, + "preprocessing_kwargs": {}, + "model_kwargs": {}, + "task_kwargs": {}, + "adapter_name": "" + } + ) + assert response.status_code == 200 + +def test_api_embedding(test_app) -> None: + test_client = TestClient(test_app) + response = test_client.post( + "/api/embedding", + json={ + "input": [ + "this is a test" + ], + "is_preprocessed": False, + "preprocessing_kwargs": {}, + 
"model_kwargs": {}, + "task_kwargs": {}, + "adapter_name": "" + } + ) + assert response.status_code == 200 + + +def test_api_question_answering(test_app) -> None: + test_client = TestClient(test_app) + response = test_client.post( + "/api/question-answering", + json={ + "input": [ + "this is a test" + ], + "is_preprocessed": False, + "preprocessing_kwargs": {}, + "model_kwargs": {}, + "task_kwargs": {}, + "adapter_name": "" + } + ) + assert response.status_code == 200 + + +def test_api_generation(test_app) -> None: + test_client = TestClient(test_app) + response = test_client.post( + "/api/generation", + json={ + "input": [ + "this is a test" + ], + "is_preprocessed": False, + "preprocessing_kwargs": {}, + "model_kwargs": {}, + "task_kwargs": {}, + "adapter_name": "" + } + ) + assert response.status_code == 200 \ No newline at end of file diff --git a/square-model-inference-api/inference_server/tests/test_inference/test_adapter.py b/square-model-inference-api/inference_server/tests/test_inference/test_adapter.py new file mode 100644 index 000000000..b379c3e2a --- /dev/null +++ b/square-model-inference-api/inference_server/tests/test_inference/test_adapter.py @@ -0,0 +1,205 @@ +import pytest + +import numpy as np + +from square_model_inference.models.request import Task + + +@pytest.mark.usefixtures("test_adapter") +class TestTransformerAdapter: + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [(["this is a test"]), + (["this is a test", "this is a test with a longer sentence"])], + ids=["single", "batch"]) + async def test_sequence_classification(self, prediction_request, test_adapter, input): + prediction_request.input = input + prediction_request.adapter_name = "nli/rte@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.sequence_classification) + np.testing.assert_allclose(np.sum(prediction.model_outputs["logits"], axis=-1), [1.0]*len(input), err_msg="logits are softmax") + assert len(prediction.labels) == len(input) + assert all(isinstance(prediction.labels[i], int) for i in range(len(input))) + assert "logits" in prediction.model_outputs + + @pytest.mark.asyncio + async def test_sequence_classification_output_attention(self, prediction_request, test_adapter): + prediction_request.model_kwargs = {"output_attentions": True} + prediction_request.adapter_name = "nli/rte@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.sequence_classification) + assert "attentions" in prediction.model_outputs + + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [(["this is a test"]), + (["this is a test", "this is a test with a longer sentence"])], + ids=["single", "batch"]) + async def test_sequence_classification_regression(self, prediction_request, test_adapter, input): + prediction_request.input = input + prediction_request.task_kwargs = {"is_regression": True} + prediction_request.adapter_name = "nli/rte@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.sequence_classification) + assert not np.array_equal(np.sum(prediction.model_outputs["logits"], axis=-1)-1, [0.0]*len(input)), "logits are not softmax" + assert "logits" in prediction.model_outputs + + + @pytest.mark.asyncio + @pytest.mark.parametrize("input,word_ids", [(["this is a test"], [[None, 0, 1, 2, 3, None]]), + (["this is a test", "this is a test with a longer sentence"], + [[None, 0, 1, 2, 3, None, None, None, None, None], [None, 0, 1, 2, 3, 4, 5, 6, 7, None]])], + ids=["single", "batch"]) + async def test_token_classification(self, prediction_request, 
test_adapter, input, word_ids): + prediction_request.input = input + prediction_request.adapter_name = "ner/conll2003@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.token_classification) + np.testing.assert_allclose(np.sum(prediction.model_outputs["logits"], axis=-1), np.ones(shape=(len(input), len(word_ids[0]))), atol=1e-6, err_msg="logits should be softmax") + assert all(len(prediction.labels[i]) == len(word_ids[i]) for i in range(len(input))) + assert "logits" in prediction.model_outputs + assert prediction.word_ids == word_ids + + + @pytest.mark.asyncio + @pytest.mark.parametrize("input,word_ids", [(["this is a test"], + [[None, 0, 1, 2, 3, None]]), + (["this is a test", "this is a test with a longer sentence"], + [[None, 0, 1, 2, 3, None, None, None, None, None], [None, 0, 1, 2, 3, 4, 5, 6, 7, None]])], + ids=["single", "batch"]) + async def test_token_classification_regression(self, prediction_request, test_adapter, input, word_ids): + prediction_request.input = input + prediction_request.task_kwargs = {"is_regression": True} + prediction_request.adapter_name = "ner/conll2003@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.token_classification) + assert not np.array_equal((np.sum(prediction.model_outputs["logits"], axis=-1), np.ones_like(word_ids)), "logits are not softmax") + assert "logits" in prediction.model_outputs + assert prediction.word_ids == word_ids + + @pytest.mark.asyncio + @pytest.mark.parametrize("input,mode", [(["this is a test"], "mean"), + (["this is a test", "this is a test with a longer sentence"], "mean"), + (["this is a test"], "max"), + (["this is a test", "this is a test with a longer sentence"], "max"), + (["this is a test"], "cls"), + (["this is a test", "this is a test with a longer sentence"], "cls")], + ) + async def test_embedding(self, prediction_request, test_adapter, input, mode): + prediction_request.input = input + prediction_request.task_kwargs = {"embedding_mode": mode} + prediction_request.adapter_name = "sts/sts-b@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.embedding) + assert np.array(prediction.model_outputs["embeddings"]).shape[1] == 768 + assert np.array(prediction.model_outputs["embeddings"]).shape[0] == len(input) + assert "hidden_states" not in prediction.model_outputs + assert prediction.embedding_mode == mode + + @pytest.mark.asyncio + @pytest.mark.parametrize("input,word_ids", [(["this is a test"], [[None, 0, 1, 2, 3, None]]), + (["this is a test", "this is a test with a longer sentence"], + [[None, 0, 1, 2, 3, None, None, None, None, None], [None, 0, 1, 2, 3, 4, 5, 6, 7, None]])], + ids=["single", "batch"]) + async def test_embedding_token(self, prediction_request, test_adapter, input, word_ids): + prediction_request.input = input + prediction_request.task_kwargs = {"embedding_mode": "token"} + prediction_request.adapter_name = "sts/sts-b@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.embedding) + assert np.array(prediction.model_outputs["embeddings"]).shape[2] == 768 + assert np.array(prediction.model_outputs["embeddings"]).shape[1] == len(word_ids[0]) + assert np.array(prediction.model_outputs["embeddings"]).shape[0] == len(input) + assert "hidden_states" not in prediction.model_outputs + assert prediction.embedding_mode == "token" + assert prediction.word_ids == word_ids + + @pytest.mark.asyncio + async def test_embedding_unknown_mode(self, prediction_request, test_adapter): + prediction_request.task_kwargs = 
{"embedding_mode": "this mode does not exist"} + prediction_request.adapter_name = "sts/sts-b@ukp" + + with pytest.raises(ValueError): + prediction = await test_adapter.predict(prediction_request, Task.embedding) + + @pytest.mark.asyncio + async def test_forbid_is_preprocessed(self, prediction_request, test_adapter): + prediction_request.is_preprocessed = True + prediction_request.adapter_name = "sts/sts-b@ukp" + + with pytest.raises(ValueError): + prediction = await test_adapter.predict(prediction_request, Task.embedding) + + @pytest.mark.asyncio + async def test_input_too_big(self, prediction_request, test_adapter): + prediction_request.input = ["test"]*1000 + prediction_request.adapter_name = "sts/sts-b@ukp" + + with pytest.raises(ValueError): + prediction = await test_adapter.predict(prediction_request, Task.embedding) + + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [([["What is a test?", "A test is a thing where you test."]]), + ([["What is a test?", "A test is a thing where you test."], + ["What is a longer test?", "A test is a thing where you test. If it is longer you call it longer"]])], + ) + async def test_question_answering(self, prediction_request, test_adapter, input): + prediction_request.input = input + prediction_request.task_kwargs = {"topk": 1} + prediction_request.adapter_name = "qa/squad2@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.question_answering) + answers = [input[i][1][prediction.answers[i][0].start:prediction.answers[i][0].end] for i in range(len(input))] + assert "start_logits" in prediction.model_outputs and "end_logits" in prediction.model_outputs + assert len(prediction.answers) == len(input) + assert all(prediction.answers[i][0].answer == answers[i] for i in range(len(input))) + + @pytest.mark.asyncio + async def test_question_answering_topk(self, prediction_request, test_adapter): + input = [["What is a test?", "A test is a thing where you test."]] + prediction_request.input = input + prediction_request.task_kwargs = {"topk": 2} + prediction_request.adapter_name = "qa/squad2@ukp" + + prediction = await test_adapter.predict(prediction_request, Task.question_answering) + answers = [input[0][1][prediction.answers[0][i].start:prediction.answers[0][i].end] for i in range(2)] + assert "start_logits" in prediction.model_outputs and "end_logits" in prediction.model_outputs + assert len(prediction.answers) == len(input) + assert prediction.answers[0][0].score >= prediction.answers[0][1].score + assert all(prediction.answers[0][i].answer == answers[i] for i in range(2)) + + @pytest.mark.skip("No generation adapter for bert-base-uncased available currently.") + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [(["Generate text"]), + (["Generate text", "And more text"])], + ) + async def test_generation(self, prediction_request, test_adapter, input): + prediction_request.input = input + + prediction = await test_adapter.predict(prediction_request, Task.generation) + assert all(isinstance(prediction.generated_texts[i][0], str) for i in range(len(input))) + + @pytest.mark.skip("No generation adapter for bert-base-uncased available currently.") + @pytest.mark.asyncio + async def test_generation_output_attention_and_scores(self, prediction_request, test_adapter): + prediction_request.model_kwargs = { + "output_attentions": True, + "output_scores": True + } + + prediction = await test_adapter.predict(prediction_request, Task.generation) + assert "scores" in prediction.model_outputs + assert "attentions" in 
prediction.model_outputs + + @pytest.mark.skip("No generation adapter for bert-base-uncased available currently.") + @pytest.mark.asyncio + async def test_generation_beam_sample_multiple_seqs(self, prediction_request, test_adapter): + prediction_request.task_kwargs = { + "num_beams": 2, + "do_sample": True, + "top_k": 10, + "top_p": 0.5, + "no_repeat_ngram_size": 2, + "num_return_sequences": 2 + } + + prediction = await test_adapter.predict(prediction_request, Task.generation) + assert len(prediction.generated_texts[0]) == 2 \ No newline at end of file diff --git a/square-model-inference-api/inference_server/tests/test_inference/test_prediction_output_numpy.py b/square-model-inference-api/inference_server/tests/test_inference/test_prediction_output_numpy.py new file mode 100644 index 000000000..35c4309c6 --- /dev/null +++ b/square-model-inference-api/inference_server/tests/test_inference/test_prediction_output_numpy.py @@ -0,0 +1,36 @@ +from starlette.config import Environ +from square_model_inference.models.prediction import PredictionOutput, _encode_numpy +from io import BytesIO +import base64 +import torch +import numpy as np + +def test_prediction_output_numpy_encoded() -> None: + + arr = np.ones(shape=(10,10), dtype="float32") + + output = _encode_numpy({"test": torch.from_numpy(arr)}, return_plaintext=False) + + encoded_arr = output["test"] + + # reversing code + arr_binary_b64 = encoded_arr.encode() + arr_binary = base64.decodebytes(arr_binary_b64) + arr_back = np.load(BytesIO(arr_binary)) + + np.testing.assert_equal(arr, arr_back) + + +def test_prediction_output_numpy_plaintext() -> None: + + arr = np.ones(shape=(10,10), dtype="float32") + + output = _encode_numpy({"test": torch.from_numpy(arr)}, return_plaintext=True) + + plaintext_list_arr = output["test"] + + # reversing code + arr_back = np.array(plaintext_list_arr) + + np.testing.assert_equal(arr, arr_back) + diff --git a/square-model-inference-api/inference_server/tests/test_inference/test_sentence_transformer.py b/square-model-inference-api/inference_server/tests/test_inference/test_sentence_transformer.py new file mode 100644 index 000000000..1265aafb2 --- /dev/null +++ b/square-model-inference-api/inference_server/tests/test_inference/test_sentence_transformer.py @@ -0,0 +1,38 @@ +import pytest + +import numpy as np + +from square_model_inference.models.request import Task + + +@pytest.mark.usefixtures("test_sentence_transformer") +class TestSentenceTransformerEmbedding: + + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [(["this is a test"]), + (["this is a test", "this is a test with a longer sentence"])]) + async def test_embedding(self, prediction_request, test_sentence_transformer, input): + prediction_request.input = input + + prediction = await test_sentence_transformer.predict(prediction_request, Task.embedding) + assert np.array(prediction.model_outputs["embeddings"]).shape[1] == 768 + assert np.array(prediction.model_outputs["embeddings"]).shape[0] == len(input) + + @pytest.mark.asyncio + async def test_not_embedding(self, prediction_request, test_sentence_transformer): + with pytest.raises(ValueError): + prediction = await test_sentence_transformer.predict(prediction_request, Task.sequence_classification) + + @pytest.mark.asyncio + async def test_forbid_is_preprocessed(self, prediction_request, test_sentence_transformer): + prediction_request.is_preprocessed = True + + with pytest.raises(ValueError): + prediction = await test_sentence_transformer.predict(prediction_request, Task.embedding) + + 
@pytest.mark.asyncio + async def test_input_too_big(self, prediction_request, test_sentence_transformer): + prediction_request.input = ["test"]*1000 + + with pytest.raises(ValueError): + prediction = await test_sentence_transformer.predict(prediction_request, Task.embedding) \ No newline at end of file diff --git a/square-model-inference-api/inference_server/tests/test_inference/test_transformers.py b/square-model-inference-api/inference_server/tests/test_inference/test_transformers.py new file mode 100644 index 000000000..c715fd2f1 --- /dev/null +++ b/square-model-inference-api/inference_server/tests/test_inference/test_transformers.py @@ -0,0 +1,204 @@ +import pytest + +import numpy as np + +from square_model_inference.models.request import Task + + +@pytest.mark.usefixtures("test_transformer_sequence_classification") +class TestTransformerSequenceClassification: + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [(["this is a test"]), + (["this is a test", "this is a test with a longer sentence"])], + ids=["single", "batch"]) + async def test_sequence_classification(self, prediction_request, test_transformer_sequence_classification, input): + prediction_request.input = input + + prediction = await test_transformer_sequence_classification.predict(prediction_request, Task.sequence_classification) + np.testing.assert_allclose(np.sum(prediction.model_outputs["logits"], axis=-1), [1.0]*len(input), err_msg="logits are softmax") + assert len(prediction.labels) == len(input) + assert all(isinstance(prediction.labels[i], int) for i in range(len(input))) + assert "logits" in prediction.model_outputs + + @pytest.mark.asyncio + async def test_sequence_classification_output_attention(self, prediction_request, test_transformer_sequence_classification): + prediction_request.model_kwargs = {"output_attentions": True} + + prediction = await test_transformer_sequence_classification.predict(prediction_request, Task.sequence_classification) + assert "attentions" in prediction.model_outputs + + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [(["this is a test"]), + (["this is a test", "this is a test with a longer sentence"])], + ids=["single", "batch"]) + async def test_sequence_classification_regression(self, prediction_request, test_transformer_sequence_classification, input): + prediction_request.input = input + prediction_request.task_kwargs = {"is_regression": True} + + prediction = await test_transformer_sequence_classification.predict(prediction_request, Task.sequence_classification) + assert not np.array_equal(np.sum(prediction.model_outputs["logits"], axis=-1)-1, [0.0]*len(input)), "logits are not softmax" + assert "logits" in prediction.model_outputs + + +@pytest.mark.usefixtures("test_transformer_token_classification") +class TestTransformerTokenClassification: + + @pytest.mark.asyncio + @pytest.mark.parametrize("input,word_ids", [(["this is a test"], [[None, 0, 1, 2, 3, None]]), + (["this is a test", "this is a test with a longer sentence"], + [[None, 0, 1, 2, 3, None, None, None, None, None], [None, 0, 1, 2, 3, 4, 5, 6, 7, None]])], + ids=["single", "batch"]) + async def test_token_classification(self, prediction_request, test_transformer_token_classification, input, word_ids): + prediction_request.input = input + + prediction = await test_transformer_token_classification.predict(prediction_request, Task.token_classification) + np.testing.assert_allclose(np.sum(prediction.model_outputs["logits"], axis=-1), np.ones(shape=(len(input), len(word_ids[0]))), err_msg="logits are 
softmax") + assert all(len(prediction.labels[i]) == len(word_ids[i]) for i in range(len(input))) + assert "logits" in prediction.model_outputs + assert prediction.word_ids == word_ids + + + @pytest.mark.asyncio + @pytest.mark.parametrize("input,word_ids", [(["this is a test"], + [[None, 0, 1, 2, 3, None]]), + (["this is a test", "this is a test with a longer sentence"], + [[None, 0, 1, 2, 3, None, None, None, None, None], [None, 0, 1, 2, 3, 4, 5, 6, 7, None]])], + ids=["single", "batch"]) + async def test_token_classification_regression(self, prediction_request, test_transformer_token_classification, input, word_ids): + prediction_request.input = input + prediction_request.task_kwargs = {"is_regression": True} + + prediction = await test_transformer_token_classification.predict(prediction_request, Task.token_classification) + assert not np.array_equal((np.sum(prediction.model_outputs["logits"], axis=-1), np.ones_like(word_ids)), "logits are not softmax") + assert "logits" in prediction.model_outputs + assert prediction.word_ids == word_ids + + +@pytest.mark.usefixtures("test_transformer_embedding") +class TestTransformerEmbedding: + @pytest.mark.asyncio + @pytest.mark.parametrize("input,mode", [(["this is a test"], "mean"), + (["this is a test", "this is a test with a longer sentence"], "mean"), + (["this is a test"], "max"), + (["this is a test", "this is a test with a longer sentence"], "max"), + (["this is a test"], "cls"), + (["this is a test", "this is a test with a longer sentence"], "cls"), + (["this is a test"], "pooler"), + (["this is a test", "this is a test with a longer sentence"], "pooler")], + ) + async def test_embedding(self, prediction_request, test_transformer_embedding, input, mode): + prediction_request.input = input + prediction_request.task_kwargs = {"embedding_mode": mode} + + prediction = await test_transformer_embedding.predict(prediction_request, Task.embedding) + assert np.array(prediction.model_outputs["embeddings"]).shape[1] == 768 + assert np.array(prediction.model_outputs["embeddings"]).shape[0] == len(input) + assert "hidden_states" not in prediction.model_outputs + assert prediction.embedding_mode == mode + + @pytest.mark.asyncio + @pytest.mark.parametrize("input,word_ids", [(["this is a test"], [[None, 0, 1, 2, 3, None]]), + (["this is a test", "this is a test with a longer sentence"], + [[None, 0, 1, 2, 3, None, None, None, None, None], [None, 0, 1, 2, 3, 4, 5, 6, 7, None]])], + ids=["single", "batch"]) + async def test_embedding_token(self, prediction_request, test_transformer_embedding, input, word_ids): + prediction_request.input = input + prediction_request.task_kwargs = {"embedding_mode": "token"} + + prediction = await test_transformer_embedding.predict(prediction_request, Task.embedding) + assert np.array(prediction.model_outputs["embeddings"]).shape[2] == 768 + assert np.array(prediction.model_outputs["embeddings"]).shape[1] == len(word_ids[0]) + assert np.array(prediction.model_outputs["embeddings"]).shape[0] == len(input) + assert "hidden_states" not in prediction.model_outputs + assert prediction.embedding_mode == "token" + assert prediction.word_ids == word_ids + + @pytest.mark.asyncio + async def test_embedding_unknown_mode(self, prediction_request, test_transformer_embedding): + prediction_request.task_kwargs = {"embedding_mode": "this mode does not exist"} + + with pytest.raises(ValueError): + prediction = await test_transformer_embedding.predict(prediction_request, Task.embedding) + + @pytest.mark.asyncio + async def 
test_forbid_is_preprocessed(self, prediction_request, test_transformer_embedding): + prediction_request.is_preprocessed = True + + with pytest.raises(ValueError): + prediction = await test_transformer_embedding.predict(prediction_request, Task.embedding) + + @pytest.mark.asyncio + async def test_input_too_big(self, prediction_request, test_transformer_embedding): + prediction_request.input = ["test"]*1000 + + with pytest.raises(ValueError): + prediction = await test_transformer_embedding.predict(prediction_request, Task.embedding) + +@pytest.mark.usefixtures("test_transformer_question_answering") +class TestTransformerQuestionAnswering: + + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [([["What is a test?", "A test is a thing where you test."]]), + ([["What is a test?", "A test is a thing where you test."], + ["What is a longer test?", "A test is a thing where you test. If it is longer you call it longer"]])], + ) + async def test_question_answering(self, prediction_request, test_transformer_question_answering, input): + prediction_request.input = input + prediction_request.task_kwargs = {"topk": 1} + + prediction = await test_transformer_question_answering.predict(prediction_request, Task.question_answering) + answers = [input[i][1][prediction.answers[i][0].start:prediction.answers[i][0].end] for i in range(len(input))] + assert "start_logits" in prediction.model_outputs and "end_logits" in prediction.model_outputs + assert len(prediction.answers) == len(input) + assert all(prediction.answers[i][0].answer == answers[i] for i in range(len(input))) + + @pytest.mark.asyncio + async def test_question_answering_topk(self, prediction_request, test_transformer_question_answering): + input = [["What is a test?", "A test is a thing where you test."]] + prediction_request.input = input + prediction_request.task_kwargs = {"topk": 2} + + prediction = await test_transformer_question_answering.predict(prediction_request, Task.question_answering) + answers = [input[0][1][prediction.answers[0][i].start:prediction.answers[0][i].end] for i in range(2)] + assert "start_logits" in prediction.model_outputs and "end_logits" in prediction.model_outputs + assert len(prediction.answers) == len(input) + assert prediction.answers[0][0].score >= prediction.answers[0][1].score + assert all(prediction.answers[0][i].answer == answers[i] for i in range(2)) + + +@pytest.mark.usefixtures("test_transformer_generation") +class TestTransformerGeneration: + @pytest.mark.asyncio + @pytest.mark.parametrize("input", [(["Generate text"]), + (["Generate text", "And more text"])], + ) + async def test_generation(self, prediction_request, test_transformer_generation, input): + prediction_request.input = input + + prediction = await test_transformer_generation.predict(prediction_request, Task.generation) + assert all(isinstance(prediction.generated_texts[i][0], str) for i in range(len(input))) + + @pytest.mark.asyncio + async def test_generation_output_attention_and_scores(self, prediction_request, test_transformer_generation): + prediction_request.model_kwargs = { + "output_attentions": True, + "output_scores": True + } + + prediction = await test_transformer_generation.predict(prediction_request, Task.generation) + assert "scores" in prediction.model_outputs + assert "attentions" in prediction.model_outputs + + @pytest.mark.asyncio + async def test_generation_beam_sample_multiple_seqs(self, prediction_request, test_transformer_generation): + prediction_request.task_kwargs = { + "num_beams": 2, + "do_sample": True, + 
"top_k": 10, + "top_p": 0.5, + "no_repeat_ngram_size": 2, + "num_return_sequences": 2 + } + + prediction = await test_transformer_generation.predict(prediction_request, Task.generation) + assert len(prediction.generated_texts[0]) == 2 \ No newline at end of file diff --git a/square-model-inference-api/inference_server/uninstall_requirements.txt b/square-model-inference-api/inference_server/uninstall_requirements.txt new file mode 100644 index 000000000..747b7aa97 --- /dev/null +++ b/square-model-inference-api/inference_server/uninstall_requirements.txt @@ -0,0 +1 @@ +transformers \ No newline at end of file diff --git a/square-model-inference-api/locust/README.md b/square-model-inference-api/locust/README.md new file mode 100644 index 000000000..261f52f94 --- /dev/null +++ b/square-model-inference-api/locust/README.md @@ -0,0 +1,52 @@ +# Locust +We use Locust for load testing. +See their [documentation](https://docs.locust.io/en/stable/) for more infos. + +## Setup +1. Install Python (>3.6) and Locust ([requirements.txt](requirements.txt)) on your system + (does not have to be the same system as where the Model API runs). +2. Write your ``config.json`` (see below for more) +3. Start the Model API servers (with Docker or locally or however you want). This does *not* have to be +on the same computer/ server as Locust. +4. Start the Locust server in this directory (``locust -f locustfile.py``), visit the web UI and start +the load test. For alternative methods see the above documentation. + +## config.json +We describe the keys with expected values for the config.json: +```json +{ + "config": { + # Time each user waits (min, max) after a request. Default [1, 2] + "wait_time": [1, 2], + # API key for authorization + "api_key": "example_key" + # Header for API key. Default: Authorization + "api_key_header": "Authorization" + }, + # List of Locust tasks. A user randomly selects from this list each time and starts the request + "tasks": [ + { + # Request to /api/$model/$endpoint + "endpoint": "embedding", + "model": "bert-base-uncased", + # Set to integer greater 1 to increase chance that this task is chosen. + # This task appears $weight-times in the list of tasks + # (from which the next task is uniformly chosen) + "weight": 1, + # The JSON of the query that is send to the Model API. + # See the documentation of the API for more details on this. 
+ "query_json": { + "input": + [ + "test input" + ], + "is_preprocessed": false, + "preprocessing_kwargs": { }, + "model_kwargs": { }, + "task_kwargs": { }, + "adapter_name": "" + } + } + ] +} +``` diff --git a/square-model-inference-api/locust/config.json b/square-model-inference-api/locust/config.json new file mode 100644 index 000000000..94fc2f27b --- /dev/null +++ b/square-model-inference-api/locust/config.json @@ -0,0 +1,40 @@ +{ + "config": { + "wait_time": [1, 2], + "api_key": "example_key" + }, + "tasks": [ + { + "endpoint": "sequence-classification", + "model": "bert-base-uncased", + "weight": 1, + "query_json": { + "input": + [ + "test input" + ], + "is_preprocessed": false, + "preprocessing_kwargs": { }, + "model_kwargs": { }, + "task_kwargs": { }, + "adapter_name": "nli/rte@ukp" + } + }, + { + "endpoint": "embedding", + "model": "facebook/dpr-question_encoder-single-nq-base", + "weight": 1, + "query_json": { + "input": + [ + "test input" + ], + "is_preprocessed": false, + "preprocessing_kwargs": { }, + "model_kwargs": { }, + "task_kwargs": { }, + "adapter_name": "" + } + } + ] +} \ No newline at end of file diff --git a/square-model-inference-api/locust/locustfile.py b/square-model-inference-api/locust/locustfile.py new file mode 100644 index 000000000..7738f3278 --- /dev/null +++ b/square-model-inference-api/locust/locustfile.py @@ -0,0 +1,49 @@ +import json + +from locust import between +from locust.contrib.fasthttp import FastHttpUser + + +def task_query(config, endpoint): + """ + Template to make Locust tasks for queries that are generated dynamically based on the given config and endpoint. + Locust calls its task functions only with the user as argument so we create a closure and return it. + :param config: the config for the task with the model name, the API-Key, the JSON input, etc. + :param endpoint: the endpoint for the query + :return: the closure for the Locust task function that makes the specified query + """ + def query(user): + path = f"/api/{config['model']}/{endpoint}" + query_json = config["query_json"] + headers = {config.get("api_key_header", "Authorization"): config["api_key"]} + user.client.post(path, json=query_json, headers=headers) + return query + + +class ModelAPIUser(FastHttpUser): + wait_time = between(1, 2) + tasks = [] + + def __init__(self, *args, **kwargs): + # Load config + config = json.load(open("config.json")) + general_config = config["config"] + + # Setup User + wait_time = general_config.get("wait_time", [1, 2]) + # self.wait_time = between(...) 
does not work here: the callable returned by between() expects one (unused) argument that is not supplied when it is called as an instance attribute, so we wrap it in a lambda
+        self.wait_time = lambda: between(wait_time[0], wait_time[1])(None)
+
+        # Setup the Locust tasks
+        tasks = []
+        for task in config["tasks"]:
+            task.update(general_config)
+            # Endpoints in the URL use - instead of _, so we replace it in case the config was wrong
+            task_function = task_query(task, task["endpoint"].replace("_", "-"))
+            for _ in range(task.get("weight", 1)):
+                tasks.append(task_function)
+        self.tasks = tasks
+
+        super().__init__(*args, **kwargs)
+
+
diff --git a/square-model-inference-api/locust/requirements.txt b/square-model-inference-api/locust/requirements.txt
new file mode 100644
index 000000000..61ddec44d
--- /dev/null
+++ b/square-model-inference-api/locust/requirements.txt
@@ -0,0 +1 @@
+locust==2.0.0
\ No newline at end of file
diff --git a/square-model-inference-api/nginx/nginx.conf b/square-model-inference-api/nginx/nginx.conf
new file mode 100644
index 000000000..56a4d7e53
--- /dev/null
+++ b/square-model-inference-api/nginx/nginx.conf
@@ -0,0 +1,50 @@
+events {
+    worker_connections 1024;
+}
+
+http {
+    log_format json_combined escape=json
+        '{ "@timestamp": "$time_iso8601", '
+        '"remote_addr": "$remote_addr", '
+        '"http_referer": "$http_referer", '
+        '"request": "$request", '
+        '"status": $status, '
+        '"body_bytes_sent": $body_bytes_sent, '
+        '"http_user_agent": "$http_user_agent", '
+        '"http_x_forwarded_for": "$http_x_forwarded_for", '
+        '"upstream_addr": "$upstream_addr",'
+        '"upstream_http_host": "$upstream_http_host",'
+        '"upstream_response_time": "$upstream_response_time",'
+        '"request_time": "$request_time"'
+        '}';
+
+    server {
+        access_log /var/log/nginx/access.log json_combined;
+
+        listen 8080;
+        # Model API Documentation
+        location /docs {
+            proxy_pass http://square_model_inference_bert_adapter:8000/docs;
+        }
+        location /redoc {
+            proxy_pass http://square_model_inference_bert_adapter:8000/redoc;
+        }
+
+        # Model Server API Gateway
+        location /api/bert-base-uncased {
+            auth_request /auth;
+            proxy_pass http://square_model_inference_bert_adapter:8000/api;
+        }
+
+        location /api/facebook/dpr-question_encoder-single-nq-base {
+            auth_request /auth;
+            proxy_pass http://square_model_inference_dpr:8000/api;
+        }
+
+        # Auth Server
+        location /auth {
+            internal;
+            proxy_pass http://square_model_auth:8081/auth;
+        }
+    }
+}
\ No newline at end of file
diff --git a/square-model-inference-api/offline_encoding_for_data_api.py b/square-model-inference-api/offline_encoding_for_data_api.py
new file mode 100644
index 000000000..173982e6f
--- /dev/null
+++ b/square-model-inference-api/offline_encoding_for_data_api.py
@@ -0,0 +1,453 @@
+# This is a stand-alone script that does not require additional code (except the imported packages).
+# We copy-pasted code from the Model API and removed the parts that are not needed here.
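+#
+# Example invocation (illustrative only; the cache path, file names, model, and sizes below are placeholders, not from the original):
+#   python offline_encoding_for_data_api.py --transformers_cache .cache/ \
+#       --model_name bert-base-uncased --model_type transformer \
+#       --batch_size 32 --chunk_size 100000 \
+#       --input_file documents.jsonl --output_file embeddings.pkl
+# For model_type=adapter, additionally pass --adapter_name; add --hdf5 to write .h5 chunks instead of pickle.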
+import argparse +import os +import logging +import json +import pickle +import time +from dataclasses import dataclass +from typing import Union, List +import h5py +import torch +import numpy as np +# Conditionally load adapter or sentence-transformer later to simplify installation +#from sentence_transformers import SentenceTransformer as SentenceTransformerModel +import transformers +import torch.multiprocessing as mp +import queue +from transformers import AutoModel, AutoTokenizer #, AutoModelWithHeads + +logger = logging.getLogger(__name__) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter('%(processName)s-%(levelname)s-%(asctime)s: %(message)s')) +logger.addHandler(handler) +logger.setLevel(logging.INFO) + +@dataclass +class PredictionRequest: + """ + Prediction request containing the input, pre-processing parameters, parameters for the model forward pass, + the task with task-specific parameters, and parameters for any post-processing + """ + input: Union[List[str], List[List[str]], dict] + preprocessing_kwargs: dict + model_kwargs: dict + task_kwargs: dict + + +class SentenceTransformer: + """ + The class for all sentence-transformers models + """ + + def __init__(self, model_name, batch_size, disable_gpu): + """ + Initialize the SentenceTransformer + :param model_name: the sentence-transformer model name (https://sbert.net/docs/pretrained_models.html) + :param batch_size: batch size used for inference + :param disable_gpu: do not move model to GPU even if CUDA is available + """ + self._load_model(model_name, disable_gpu) + self.batch_size = batch_size + + def to(self, device): + self.model.to(device) + + def _load_model(self, model_name, disable_gpu): + """ + Load the Transformer model model_name and its tokenizer with Huggingface. + Model will be moved to GPU unless CUDA is unavailable or disable_gpu is true. + """ + import sentence_transformers + logger.debug(f"Loading model {model_name}") + device = "cuda" if torch.cuda.is_available() and not disable_gpu else "cpu" + model = sentence_transformers.SentenceTransformer(model_name_or_path=model_name, device=device) + logger.info(f"Model {model_name} loaded on {device}") + self.model = model + + def embedding(self, request): + embeddings = self.model.encode(request.input, batch_size=self.batch_size, show_progress_bar=False, convert_to_tensor=True) + return embeddings + + +class Transformer: + """ + The class for all Huggingface transformer-based models + """ + SUPPORTED_EMBEDDING_MODES = ["mean", "max", "cls", "token"] + + def __init__(self, model_name, batch_size, disable_gpu): + """ + Initialize the Transformer + :param model_name: the Huggingface model name + :param batch_size: batch size used for inference + :param disable_gpu: do not move model to GPU even if CUDA is available + """ + self._load_model(AutoModel, model_name, disable_gpu) + self.batch_size = batch_size + + def to(self, device): + self.model.to(device) + + def _load_model(self, model_cls, model_name, disable_gpu): + """ + Load the Transformer model model_name and its tokenizer with Huggingface. + Model will be moved to GPU unless CUDA is unavailable or disable_gpu is true. 
+ """ + logger.debug(f"Loading model {model_name}") + tokenizer = AutoTokenizer.from_pretrained(model_name) + # Check if GPU is available + device = "cuda" if torch.cuda.is_available() and not disable_gpu else "cpu" + model = model_cls.from_pretrained(model_name).to(device) + logger.info(f"Model {model_name} loaded on {device}") + + self.model = model + self.tokenizer = tokenizer + + def _ensure_tensor_on_device(self, **inputs): + """ + Ensure PyTorch tensors are on the specified device. + + Args: + inputs (keyword arguments that should be :obj:`torch.Tensor`): The tensors to place on :obj:`self.device`. + + Return: + :obj:`Dict[str, torch.Tensor]`: The same as :obj:`inputs` but on the proper device. + """ + return {name: tensor.to(self.model.device) for name, tensor in inputs.items()} + + def _predict(self, request, output_features=False): + """ + Inference on the input. + :param request: the request with the input and optional kwargs + :param output_features: return the features of the input. + Necessary if, e.g., attention mask is needed for post-processing. + :return: The model outputs and optionally the input features + """ + all_predictions = [] + request.preprocessing_kwargs["padding"] = request.preprocessing_kwargs.get("padding", True) + request.preprocessing_kwargs["truncation"] = request.preprocessing_kwargs.get("truncation", True) + features = self.tokenizer(request.input, + return_tensors="pt", + **request.preprocessing_kwargs) + for start_idx in range(0, len(request.input), self.batch_size): + with torch.no_grad(): + input_features = {k: features[k][start_idx:start_idx+self.batch_size] for k in features.keys()} + input_features = self._ensure_tensor_on_device(**input_features) + predictions = self.model(**input_features, **request.model_kwargs) + all_predictions.append(predictions) + keys = all_predictions[0].keys() + final_prediction = {} + for key in keys: + # HuggingFace outputs for 'attentions' and more is returned as tuple of tensors + # Tuple of tuples only exists for 'past_key_values' which is only relevant for generation. + # Generation should NOT use this function + if isinstance(all_predictions[0][key], tuple): + tuple_of_lists = list(zip(*[[p.cpu() for p in tpl[key]] for tpl in all_predictions])) + final_prediction[key] = tuple(torch.cat(l) for l in tuple_of_lists) + else: + final_prediction[key] = torch.cat([p[key].cpu() for p in all_predictions]) + if output_features: + return final_prediction, features + return final_prediction + + def embedding(self, request): + request.model_kwargs["output_hidden_states"] = True + predictions, features = self._predict(request, output_features=True) + # We remove hidden_states from predictions! 
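+        # hidden_states is a tuple with one tensor per layer (plus the embedding layer output);
+        # we keep only the last layer, shaped (batch, seq_len, hidden_dim), and pool it below
+        # according to task_kwargs["embedding_mode"].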
+ hidden_state = predictions.pop("hidden_states")[-1] + attention_mask = features["attention_mask"] + + embedding_mode = request.task_kwargs.get("embedding_mode", "mean") + + if embedding_mode not in self.SUPPORTED_EMBEDDING_MODES: + raise ValueError(f"Embedding mode {embedding_mode} not in list of supported modes {self.SUPPORTED_EMBEDDING_MODES}") + + if embedding_mode == "cls": + emb = hidden_state[:, 0, :] + # copied from sentence-transformers pooling + elif embedding_mode == "max": + input_mask_expanded = attention_mask.unsqueeze(-1).expand(hidden_state.size()).float() + hidden_state[input_mask_expanded == 0] = -1e9 # Set padding tokens to large negative value + emb = torch.max(hidden_state, 1)[0] + # copied from sentence-transformers pooling + elif embedding_mode == "mean": + input_mask_expanded = attention_mask.unsqueeze(-1).expand(hidden_state.size()).float() + sum_embeddings = torch.sum(hidden_state * input_mask_expanded, 1) + sum_mask = input_mask_expanded.sum(1) + emb = sum_embeddings / sum_mask + elif embedding_mode == "token": + emb = hidden_state + return emb + + +class AdapterTransformer(Transformer): + """ + The class for all adapter-based models using the adapter-transformers package + """ + def __init__(self, model_name, batch_size, disable_gpu, transformers_cache, adapter_name): + """ + Initialize the Adapter with its underlying Transformer and pre-load all available adapters from adapterhub.ml + :param model_name: the Huggingface model name + :param batch_size: batch size used for inference + :param disable_gpu: do not move model to GPU even if CUDA is available + :param transformers_cache: Should be same as TRANSFORMERS_CACHE env variable. + This folder will be used to store the adapters + """ + self._load_model(transformers.AutoModelWithHeads, model_name, disable_gpu) + self.model.load_adapter(adapter_name, load_as=adapter_name, cache_dir=transformers_cache) + self.model.to(self.model.device) + self.model.set_active_adapters(adapter_name) + self.batch_size = batch_size + + +def read_batch(file_pointer, batch_size): + i = 0 + lines = [] + finished = False + while i < batch_size: + line = file_pointer.readline().strip() + # empty string -> file finished + if not line: + finished = True + break + else: + lines.append(line) + i += 1 + return lines, finished + + +def encode(args): + transformers_cache = args.transformers_cache + os.environ["TRANSFORMERS_CACHE"] = transformers_cache + model_name = args.model_name + model_type = args.model_type + batch_size = args.batch_size + chunk_size = args.chunk_size + input_file = args.input_file + output_file = os.path.splitext(args.output_file)[0] # Remove .pkl from name + adapter_name = args.adapter_name + + if model_type == "sentence-transformer": + model = SentenceTransformer(model_name, batch_size, False) + elif model_type == "transformer": + model = Transformer(model_name, batch_size, False) + elif model_type == "adapter": + model = AdapterTransformer(model_name, batch_size, False, transformers_cache, adapter_name) + + logger.info(f"Reading input from {input_file}") + if os.path.dirname(output_file): + os.makedirs(os.path.dirname(output_file), exist_ok=True) + with open(input_file, "r", encoding="utf-8") as f_in: + # We do not know how large the input file is so we read it batch-wise for memory safety reasons + finished_reading = False + total_processed_lines = 0 + chunk_idx = 0 + current_processed_lines = 0 + current_output = {"ids": [], "embeddings": []} + while not finished_reading: + lines, finished_reading = read_batch(f_in, 
batch_size) + if lines: + lines = [json.loads(line) for line in lines] + texts = [line["text"] for line in lines] + ids = [line["id"] for line in lines] + + input = PredictionRequest(input=texts, preprocessing_kwargs={}, model_kwargs={}, task_kwargs={"embedding_mode": "mean"}) + embeddings = model.embedding(input) + embeddings = embeddings.cpu().numpy() + if args.float16: + embeddings = embeddings.astype("float16") + + current_output["ids"].extend(ids) + current_output["embeddings"].append(embeddings) + + total_processed_lines += len(lines) + current_processed_lines += len(lines) + + if current_processed_lines >= chunk_size or finished_reading: + logger.info(f"Processed {total_processed_lines} lines ({chunk_idx+1} chunks)") + + current_output["embeddings"] = np.concatenate(current_output["embeddings"]) + + if args.hdf5: + chunk_output_file = f"{output_file}_{chunk_idx}.h5" + with h5py.File(chunk_output_file, "w") as out_f: + logger.info(f"Writing chunk in {chunk_output_file}") + if args.hdf5_gzip_level < 0: + out_f.create_dataset("ids", data=np.array(current_output["ids"], dtype="S")) + out_f.create_dataset("embeddings", data=current_output["embeddings"]) + else: + out_f.create_dataset("ids", data=np.array(current_output["ids"], dtype="S"), compression="gzip", compression_opts=min(args.hdf5_gzip_level, 9)) + out_f.create_dataset("embeddings", data=current_output["embeddings"], compression="gzip", compression_opts=min(args.hdf5_gzip_level, 9)) + else: + chunk_output_file = f"{output_file}_{chunk_idx}.pkl" + with open(chunk_output_file, "wb") as out_f: + logger.info(f"Writing chunk in {chunk_output_file}") + pickle.dump(current_output, out_f) + current_processed_lines = 0 + current_output = {"ids": [], "embeddings": []} + chunk_idx += 1 + + +def _read_process(input_file, batch_size, input_queue): + logger.info(f"Reading input from {input_file}") + with open(input_file, "r", encoding="utf-8") as f_in: + # We do not know how large the input file is so we read it batch-wise for memory safety reasons + finished_reading = False + while not finished_reading: + lines, finished_reading = read_batch(f_in, batch_size) + if lines: + lines = [json.loads(line) for line in lines] + texts = [line["text"] for line in lines] + ids = [line["id"] for line in lines] + + input = PredictionRequest(input=texts, preprocessing_kwargs={}, model_kwargs={}, task_kwargs={"embedding_mode": "mean"}) + input_queue.put((input, ids), block=True) + + +def _encode_process(model, device, float16, input_queue, output_queue): + logger.info(f"Moving model to {device}") + model.to(device) + while True: + try: + input, ids = input_queue.get() + embeddings = model.embedding(input) + embeddings = embeddings.cpu().numpy() + if float16: + embeddings = embeddings.astype("float16") + output_queue.put((embeddings, ids)) + except queue.Empty: + break + + +def _write_process(output_file, chunk_size, args, output_queue): + chunk_idx = 0 + current_processed_lines = 0 + total_processed_lines = 0 + current_output = {"ids": [], "embeddings": []} + while True: + try: + # We do not know when we are done writing. Instead we wait 60s and if we get nothing new, we assume we are done. 
+ # i.e., after the timeout, queue.Empty exception is triggered and we write the remaining output and finish + embeddings, ids = output_queue.get(timeout=60) + current_output["ids"].extend(ids) + current_output["embeddings"].append(embeddings) + + total_processed_lines += len(ids) + current_processed_lines += len(ids) + + if current_processed_lines >= chunk_size: + logger.info(f"Processed {total_processed_lines} lines ({chunk_idx+1} chunks)") + + current_output["embeddings"] = np.concatenate(current_output["embeddings"]) + + if args.hdf5: + chunk_output_file = f"{output_file}_{chunk_idx}.h5" + with h5py.File(chunk_output_file, "w") as out_f: + logger.info(f"Writing chunk in {chunk_output_file}") + if args.hdf5_gzip_level < 0: + out_f.create_dataset("ids", data=np.array(current_output["ids"], dtype="S")) + out_f.create_dataset("embeddings", data=current_output["embeddings"]) + else: + out_f.create_dataset("ids", data=np.array(current_output["ids"], dtype="S"), compression="gzip", compression_opts=min(args.hdf5_gzip_level, 9)) + out_f.create_dataset("embeddings", data=current_output["embeddings"], compression="gzip", compression_opts=min(args.hdf5_gzip_level, 9)) + else: + chunk_output_file = f"{output_file}_{chunk_idx}.pkl" + with open(chunk_output_file, "wb") as out_f: + logger.info(f"Writing chunk in {chunk_output_file}") + pickle.dump(current_output, out_f) + current_processed_lines = 0 + current_output = {"ids": [], "embeddings": []} + chunk_idx += 1 + except queue.Empty: + logger.info(f"Processed {total_processed_lines} lines ({chunk_idx+1} chunks)") + current_output["embeddings"] = np.concatenate(current_output["embeddings"]) + if args.hdf5: + chunk_output_file = f"{output_file}_{chunk_idx}.h5" + with h5py.File(chunk_output_file, "w") as out_f: + logger.info(f"Writing chunk in {chunk_output_file}") + if args.hdf5_gzip_level < 0: + out_f.create_dataset("ids", data=np.array(current_output["ids"], dtype="S")) + out_f.create_dataset("embeddings", data=current_output["embeddings"]) + else: + out_f.create_dataset("ids", data=np.array(current_output["ids"], dtype="S"), compression="gzip", compression_opts=min(args.hdf5_gzip_level, 9)) + out_f.create_dataset("embeddings", data=current_output["embeddings"], compression="gzip", compression_opts=min(args.hdf5_gzip_level, 9)) + else: + chunk_output_file = f"{output_file}_{chunk_idx}.pkl" + with open(chunk_output_file, "wb") as out_f: + logger.info(f"Writing chunk in {chunk_output_file}") + pickle.dump(current_output, out_f) + break + + +def encode_multiprocess(args): + transformers_cache = args.transformers_cache + os.environ["TRANSFORMERS_CACHE"] = transformers_cache + model_name = args.model_name + model_type = args.model_type + batch_size = args.batch_size + chunk_size = args.chunk_size + input_file = args.input_file + output_file = os.path.splitext(args.output_file)[0] # Remove file extension from name + adapter_name = args.adapter_name + devices = args.gpus.split(",") + + ctx = mp.get_context('spawn') + + if model_type == "sentence-transformer": + model = SentenceTransformer(model_name, batch_size, True) + elif model_type == "transformer": + model = Transformer(model_name, batch_size, True) + elif model_type == "adapter": + model = AdapterTransformer(model_name, batch_size, True, transformers_cache, adapter_name) + + if os.path.dirname(output_file): + os.makedirs(os.path.dirname(output_file), exist_ok=True) + + input_queue = ctx.Queue(maxsize=2*len(devices)) + output_queue = ctx.Queue() + + read_p = ctx.Process(target=_read_process, 
args=(input_file, batch_size, input_queue), daemon=True)
+    read_p.start()
+    write_p = ctx.Process(target=_write_process, args=(output_file, chunk_size, args, output_queue), daemon=True)
+    write_p.start()
+    for cuda_id in devices:
+        p = ctx.Process(target=_encode_process, args=(model, cuda_id, args.float16, input_queue, output_queue), daemon=True)
+        p.start()
+
+    write_p.join()
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--transformers_cache", help="Cache folder where model files will be downloaded into and loaded from")
+    parser.add_argument("--model_name", help="Model name, i.e., the name used in transformers or sentence-transformers to load the pre-trained model")
+    parser.add_argument("--model_type", help="Model type, one of 'adapter', 'transformer', 'sentence-transformer'")
+    parser.add_argument("--batch_size", type=int, help="Batch size used for encoding")
+    parser.add_argument("--chunk_size", type=int, help="Chunk size used for writing out embeddings. "
+                                                       "ATTENTION: The effective chunk size is rounded up to the smallest multiple of batch_size that is >= chunk_size. "
+                                                       "Each output file contains chunk_size embeddings "
+                                                       "(except the last one if len(input) mod chunk_size != 0)")
+    parser.add_argument("--input_file", help="Input .jsonl file. Each line is a dict object: {'id': 'xxx', 'text': 'abc...'}")
+    parser.add_argument("--output_file", help="Output .pkl/.h5 file. A chunk index will be inserted between the name and extension, e.g. 'path/to/name_chunkidx.pkl'. "
+                                              "Format: {'ids': List[str], 'embeddings': ndarray}. "
+                                              "Note for hdf5, use f['ids'].asstr() to load ids as strings because the default is binary.")
+    parser.add_argument("--adapter_name", help="For model_type=adapter, the name of the adapter that should be loaded")
+    parser.add_argument("--hdf5", action="store_true", help="Save output with hdf5 instead of pickle")
+    parser.add_argument("--hdf5-gzip-level", type=int, default=4, help="GZIP compression level for HDF5 in range 0-9, default 4 (bigger is more compressed). "
+                                                                       "Set to a negative value to disable compression. "
+                                                                       "Only used when --hdf5 is also set.")
+    parser.add_argument("--float16", action="store_true", help="Save embeddings as float16")
+    parser.add_argument("--gpus", help="Set this value to use multiprocessing. "
+                                       "Comma-separated list of devices (e.g., cuda:0,cuda:1) for multi-GPU processing. "
+                                       "Reading, writing, and each GPU get their own process. "
+                                       "Can also be used with a single device to use multiprocessing for reading/writing of outputs, but this is not necessarily faster with one GPU.")
+    args = parser.parse_args()
+
+    start_time = time.time()
+    if not args.gpus:
+        encode(args)
+    else:
+        encode_multiprocess(args)
+    end_time = time.time()
+    logger.info(f"Finished encoding in {end_time-start_time}s")
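+
+# Reading the output back (a minimal sketch; "name_0.h5" and "name_0.pkl" are placeholder chunk file names):
+#   import h5py, pickle
+#   with h5py.File("name_0.h5", "r") as f:             # if --hdf5 was used
+#       ids = f["ids"].asstr()[:]                      # ids are stored as bytes; asstr() decodes them to str
+#       embeddings = f["embeddings"][:]                # ndarray, float16 if --float16 was set
+#   chunk = pickle.load(open("name_0.pkl", "rb"))      # otherwise: {'ids': List[str], 'embeddings': ndarray}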