diff --git a/Cargo.lock b/Cargo.lock index 72441430240..4b4e7670ee7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,18 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-rustls" version = "0.3.0" @@ -758,6 +770,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.8" @@ -1158,6 +1179,27 @@ dependencies = [ "cc", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "exr" version = "1.72.0" @@ -2732,6 +2774,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + [[package]] name = "opentelemetry-otlp" version = "0.13.0" @@ -2849,6 +2905,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry 0.26.0", + "percent-encoding", + "rand", + "thiserror", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -2890,6 +2964,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -4183,6 +4263,35 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "text-generation-backend-llamacpp" +version = "2.4.1-dev0" +dependencies = [ + "async-channel", + "async-trait", + "clap 4.5.20", + "cmake", + "cxx", + "cxx-build", + "hf-hub", + "image", + "log", + "metrics", + "metrics-exporter-prometheus", + "num_cpus", + "pkg-config", + "serde_json", + "text-generation-router", + "thiserror", + "tokenizers", + "tokio", + "tokio-stream", + "tracing", + "tracing-opentelemetry 0.27.0", + "tracing-subscriber", + "utoipa", +] + [[package]] name = "text-generation-backends-trtllm" version = "2.4.1-dev0" @@ -4925,6 
+5034,24 @@ dependencies = [ "web-time 1.1.0", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.26.0", + "opentelemetry_sdk 0.26.0", + "smallvec", + "tracing", + "tracing-core", + "tracing-log 0.2.0", + "tracing-subscriber", + "web-time 1.1.0", +] + [[package]] name = "tracing-opentelemetry-instrumentation-sdk" version = "0.16.0" diff --git a/Cargo.toml b/Cargo.toml index 9a7e76c412b..f3ab5ee546f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ "backends/trtllm", "launcher", "router" -] +, "backends/llamacpp"] default-members = [ "benchmark", "backends/v2", diff --git a/Dockerfile.llamacpp b/Dockerfile.llamacpp new file mode 100644 index 00000000000..78b3636bcf4 --- /dev/null +++ b/Dockerfile.llamacpp @@ -0,0 +1,101 @@ +# Build dependencies resolver stage +FROM lukemathwalker/cargo-chef:latest AS chef +WORKDIR /usr/src/text-generation-inference/ + +FROM chef AS planner +COPY Cargo.lock Cargo.lock +COPY Cargo.toml Cargo.toml +COPY rust-toolchain.toml rust-toolchain.toml +COPY backends backends +COPY benchmark benchmark +COPY clients clients +COPY launcher launcher +COPY router router + +RUN cargo chef prepare --recipe-path recipe.json + +FROM chef AS builder +ENV CMAKE_INSTALL_PREFIX=/usr/src/text-generation-inference/dist +RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + apt update && DEBIAN_FRONTEND=noninteractive apt install -y \ + clang \ + cmake \ + gcc g++ \ + libc++-dev \ + libnuma-dev \ + libopenmpi-dev \ + libssl-dev \ + ninja-build \ + openssl \ + python3-dev + +RUN update-alternatives --install /usr/bin/cc cc /usr/bin/clang 10 \ + && update-alternatives --install /usr/bin/c++ c++ /usr/bin/clang 10 \ + && update-alternatives --auto cc \ + && update-alternatives --auto c++ \ + && update-alternatives --display cc \ + && update-alternatives --display c++ \ + && cc --version \ + && c++ --version + +COPY --from=planner /usr/src/text-generation-inference/recipe.json recipe.json +RUN cargo chef cook --profile release-opt --package text-generation-backend-llamacpp --bin text-generation-backend-llamacpp --recipe-path recipe.json + +COPY Cargo.lock Cargo.lock +COPY Cargo.toml Cargo.toml +COPY rust-toolchain.toml rust-toolchain.toml +COPY backends backends +COPY benchmark benchmark +COPY launcher launcher +COPY router router + +ENV RUSTFLAGS="-L/usr/lib" +ENV CMAKE_INSTALL_PREFIX=/usr/src/text-generation-inference/dist +RUN cargo build --profile release-opt --package text-generation-backend-llamacpp --bin text-generation-backend-llamacpp --frozen + +FROM ubuntu:22.04 AS mimalloc-builder +ENV DEBIAN_FRONTEND=noninteractive +ENV MIMALLOC_VERSION=2.1.7 +RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + apt update && \ + apt upgrade -y && \ + apt install -y \ + clang \ + cmake \ + ninja-build \ + wget + +RUN wget https://github.com/microsoft/mimalloc/archive/refs/tags/v${MIMALLOC_VERSION}.tar.gz -O mimalloc-${MIMALLOC_VERSION}.tar.gz && \ + tar -xzf mimalloc-${MIMALLOC_VERSION}.tar.gz && \ + cd mimalloc-${MIMALLOC_VERSION} && \ + cmake -G Ninja -DCMAKE_BUILD_TYPE=Release -B build . 
&& \ + cmake --build build --parallel && \ + cmake --install build + +FROM ubuntu:22.04 +ENV DEBIAN_FRONTEND=noninteractive + +RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + apt update && \ + apt upgrade -y && \ + apt install -y \ + libopenmpi3 \ + numactl \ + openssl \ + python3.11-dev \ + python3.11-venv + +COPY --from=builder /usr/src/text-generation-inference/target/release-opt/text-generation-backend-llamacpp /usr/src/text-generation-inference/text-generation-launcher +COPY --from=builder /usr/src/text-generation-inference/dist /usr/ +COPY --from=builder /usr/src/text-generation-inference/backends/llamacpp/requirements.txt requirements.txt +COPY --from=mimalloc-builder /usr/local/lib/libmimalloc.so.2.1 /usr/lib/libmimalloc.so.2.1 + +RUN /usr/bin/python3.11 -m venv /usr/src/text-generation-inference/venv +ENV PATH="/usr/src/text-generation-inference/venv/bin:$PATH" +RUN pip3 install --no-cache-dir -r requirements.txt +ENV PORT=8080 +WORKDIR /usr/src/text-generation-inference +ENTRYPOINT ["/usr/bin/env", "LD_PRELOAD=/usr/lib/libmimalloc.so.2.1", "text-generation-launcher"] \ No newline at end of file diff --git a/LICENSE b/LICENSE index 7d0e80345c7..faa86e9b0a6 100644 --- a/LICENSE +++ b/LICENSE @@ -1,3 +1,4 @@ + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ @@ -186,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2022 Hugging Face + Copyright 2024 Hugging Face Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/backends/llamacpp/CMakeLists.txt b/backends/llamacpp/CMakeLists.txt new file mode 100644 index 00000000000..6599fd692e9 --- /dev/null +++ b/backends/llamacpp/CMakeLists.txt @@ -0,0 +1,68 @@ +cmake_minimum_required(VERSION 3.24) + +project(tgi-llama-cpp-backend VERSION 1.0.0) +set(CMAKE_CXX_STANDARD 23) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +include(FetchContent) + +set(LLAMA_CPP_TARGET_VERSION "b3837" CACHE STRING "Version of llama.cpp to build against") +set(LLAMA_CPP_TARGET_CUDA_ARCHS "75-real;80-real;86-real;89-real;90-real" CACHE STRING "CUDA arch(s) to build") +option(LLAMA_CPP_BUILD_OFFLINE_RUNNER "Flag to build the standalone c++ backend runner") +option(LLAMA_CPP_BUILD_CUDA "Flag to build CUDA enabled inference through llama.cpp") + +if (${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang" AND (${CMAKE_SYSTEM_NAME} STREQUAL "Linux" OR ${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")) + message(STATUS "Targeting libc++") + set(CMAKE_CXX_FLAGS -stdlib=libc++ ${CMAKE_CXX_FLAGS}) +else () + message(STATUS "Not using libc++ ${CMAKE_CXX_COMPILER_ID} ${CMAKE_SYSTEM_NAME}") +endif () + +# add linker options for Darwin +if (${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -L$HOMEBREW_PREFIX/opt/llvm/lib/c++ -L$HOMEBREW_PREFIX/opt/llvm/lib/unwind -lunwind") +endif () + +# Add dependencies +include(cmake/numa.cmake) +include(cmake/spdlog.cmake) + +if (${LLAMA_CPP_BUILD_CUDA}) + message(STATUS "Enabling llama.cpp CUDA support") + + if (NOT DEFINED CMAKE_CUDA_ARCHITECTURES) + set(CMAKE_CUDA_ARCHITECTURES ${LLAMA_CPP_TARGET_CUDA_ARCHS}) + endif () + set(GGML_CUDA ON) +endif () + +# Download llama.cpp repo at the specific version +fetchcontent_declare( + llama + URL https://github.com/ggerganov/llama.cpp/archive/refs/tags/b4215.tar.gz +) + +fetchcontent_makeavailable(llama) + 
+add_library(tgi_llamacpp_backend_impl STATIC csrc/backend.hpp csrc/backend.cpp) +target_compile_features(tgi_llamacpp_backend_impl PRIVATE cxx_std_11) +target_link_libraries(tgi_llamacpp_backend_impl PUBLIC spdlog::spdlog llama) + +if (NUMA_FOUND) + target_link_libraries(tgi_llamacpp_backend_impl PUBLIC numa) +endif () + +install(TARGETS tgi_llamacpp_backend_impl spdlog llama) + +if (${CMAKE_BUILD_TYPE} STREQUAL "Debug") + target_compile_definitions(tgi_llamacpp_backend_impl PRIVATE TGI_LLAMACPP_BACKEND_DEBUG=1) +endif () + +if (${LLAMA_CPP_BUILD_OFFLINE_RUNNER}) + message(STATUS "Building llama.cpp offline runner") + add_executable(tgi_llamacpp_offline_runner offline/main.cpp) + + target_link_libraries(tgi_llamacpp_offline_runner PUBLIC tgi_llamacpp_backend_impl llama spdlog::spdlog) +endif () + + diff --git a/backends/llamacpp/Cargo.toml b/backends/llamacpp/Cargo.toml new file mode 100644 index 00000000000..df2c3421866 --- /dev/null +++ b/backends/llamacpp/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "text-generation-backend-llamacpp" +version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true + +[dependencies] +async-trait = "0.1" +async-channel = "2.3" +clap = { version = "4.5.19", features = ["derive"] } +cxx = "1.0" +num_cpus = "1" +hf-hub = { workspace = true } +image = { version = "0.25.1", features = ["default-formats"] } +metrics = { workspace = true } +metrics-exporter-prometheus = { workspace = true } +serde_json = "1.0.128" +text-generation-router = { path = "../../router" } +thiserror = "1.0.64" +tokio = "1.40.0" +tokio-stream = "0.1.16" +tokenizers = { workspace = true } +tracing = "0.1" +tracing-opentelemetry = "0.27.0" +tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } +utoipa = { version = "4.2.3", features = ["axum_extras"] } +log = "0.4.22" + +[build-dependencies] +cmake = "0.1" +cxx-build = { version = "1.0", features = ["parallel"] } +pkg-config = "0.3" \ No newline at end of file diff --git a/backends/llamacpp/README.md b/backends/llamacpp/README.md new file mode 100644 index 00000000000..0931339c40d --- /dev/null +++ b/backends/llamacpp/README.md @@ -0,0 +1,17 @@ +## Compiling on macOS + +To compile the llama.cpp backend on macOS, you need to install `clang` and `cmake` via Homebrew: + +```bash +brew install llvm cmake +``` + +You then need to configure CMakeLists.txt to use the newly installed Clang compiler. +You can do this by configuring your IDE or by adding the following lines to the top of the file: + +```cmake +set(CMAKE_C_COMPILER /opt/homebrew/opt/llvm/bin/clang) +set(CMAKE_CXX_COMPILER /opt/homebrew/opt/llvm/bin/clang++) +``` + +CMakeLists.txt assumes that Homebrew installs libc++ in `$HOMEBREW_PREFIX/opt/llvm/lib/c++`. \ No newline at end of file diff --git a/backends/llamacpp/build.rs b/backends/llamacpp/build.rs new file mode 100644 index 00000000000..b5fd7bc0463 --- /dev/null +++ b/backends/llamacpp/build.rs @@ -0,0 +1,128 @@ +use cxx_build::CFG; +use std::env; +use std::path::{Path, PathBuf}; + +const CMAKE_LLAMA_CPP_DEFAULT_CUDA_ARCHS: &str = "75-real;80-real;86-real;89-real;90-real"; +const CMAKE_LLAMA_CPP_TARGET: &str = "tgi_llamacpp_backend_impl"; +const CMAKE_LLAMA_CPP_FFI_TARGET: &str = "tgi_llamacpp_backend"; +const MPI_REQUIRED_VERSION: &str = "4.1"; +
const BACKEND_DEPS: [&str; 2] = [CMAKE_LLAMA_CPP_TARGET, CMAKE_LLAMA_CPP_FFI_TARGET]; + +macro_rules!
probe { + ($name: expr, $version: expr) => { + if let Err(_) = pkg_config::probe_library($name) { + match pkg_config::probe_library(&format!("{}-{}", $name, $version)) { + Ok(_) => Ok(()), + Err(_) => Err(()), + } + } else { + Ok(()) + } + }; +} + +fn build_backend( + is_debug: bool, + opt_level: &str, + out_dir: &Path, + install_path: &PathBuf, +) -> PathBuf { + let build_cuda = option_env!("LLAMA_CPP_BUILD_CUDA").unwrap_or("OFF"); + let cuda_archs = + option_env!("LLAMA_CPP_TARGET_CUDA_ARCHS").unwrap_or(CMAKE_LLAMA_CPP_DEFAULT_CUDA_ARCHS); + + let _ = cmake::Config::new(".") + .uses_cxx11() + .generator("Ninja") + .profile(match is_debug { + true => "Debug", + false => "Release", + }) + .env("OPT_LEVEL", opt_level) + .define("CMAKE_INSTALL_PREFIX", &install_path) + .define("LLAMA_CPP_BUILD_CUDA", build_cuda) + .define("LLAMA_CPP_TARGET_CUDA_ARCHS", cuda_archs) + .build(); + + // On some platforms (mainly x64 and ARM) the lib install destination is "lib" rather than "lib64" + let lib_path = if install_path.join("lib64").exists() { + install_path.join("lib64") + } else { + install_path.join("lib") + }; + println!("cargo:rustc-link-search=native={}", lib_path.display()); + + let deps_folder = out_dir.join("build").join("_deps"); + deps_folder +} + +fn build_ffi_layer(is_debug: bool, install_prefix: &Path) { + CFG.include_prefix = "backends/llamacpp"; + + let mut bridge = cxx_build::bridge("src/lib.rs"); + + bridge + .static_flag(true) + .std("c++23") + .include(install_prefix.join("include")) + .include("csrc") + .file("csrc/ffi.hpp"); + + if is_debug { + bridge.define("TGI_LLAMACPP_BACKEND_DEBUG", ""); + } + + if probe!("numa", "2.0").is_ok() { + bridge.define("NUMA_AVAILABLE", ""); + }; + + bridge.compile(CMAKE_LLAMA_CPP_FFI_TARGET); // Make sure this target is not the same as cmake above +} + +fn main() { + // Misc variables + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + let build_profile = env::var("PROFILE").unwrap(); + let (is_debug, opt_level) = match build_profile.as_ref() { + "debug" => (true, "0"), + _ => (false, "3"), + }; + + let install_path = env::var("CMAKE_INSTALL_PREFIX") + .map(|val| PathBuf::from(val)) + .unwrap_or(out_dir.join("dist")); + + // Build the backend + let _ = build_backend(is_debug, opt_level, out_dir.as_path(), &install_path); + + // Build the FFI layer calling the backend above + build_ffi_layer(is_debug, &install_path); + + // Make sure an MPI implementation is available (probed through pkg-config) + if probe!("ompi", MPI_REQUIRED_VERSION).is_err() { + panic!("An implementation of MPI is required"); + } + + // Backend + BACKEND_DEPS.iter().for_each(|name| { + println!("cargo:rustc-link-lib=static={}", name); + }); + + // Linkage info + println!("cargo:rustc-link-search=native={}", out_dir.display()); + + let spdlog_linkage_target = if is_debug { "spdlogd" } else { "spdlog" }; + println!("cargo:rustc-link-lib=dylib={spdlog_linkage_target}"); + println!("cargo:rustc-link-lib=dylib=ggml"); + println!("cargo:rustc-link-lib=dylib=ggml-base"); + println!("cargo:rustc-link-lib=dylib=ggml-cpu"); + println!("cargo:rustc-link-lib=dylib=ggml-amx"); + println!("cargo:rustc-link-lib=dylib=llama"); + + // Rerun if one of these files changes + println!("cargo:rerun-if-changed=CMakeLists.txt"); + println!("cargo:rerun-if-changed=csrc/backend.hpp"); + println!("cargo:rerun-if-changed=csrc/backend.cpp"); + println!("cargo:rerun-if-changed=csrc/ffi.hpp"); +} diff --git a/backends/llamacpp/cmake/numa.cmake b/backends/llamacpp/cmake/numa.cmake new file mode 100644 index 00000000000..94dfddc2779 --- /dev/null +++ 
b/backends/llamacpp/cmake/numa.cmake @@ -0,0 +1,20 @@ +# Find the numa policy library. +# Output variables: +# NUMA_INCLUDE_DIR : e.g., /usr/include/. +# NUMA_LIBRARY : Library path of numa library +# NUMA_FOUND : True if found. +FIND_PATH(NUMA_INCLUDE_DIR NAME numa.h + HINTS $ENV{HOME}/local/include /opt/local/include /usr/local/include /usr/include) + +FIND_LIBRARY(NUMA_LIBRARY NAME numa + HINTS $ENV{HOME}/local/lib64 $ENV{HOME}/local/lib /usr/local/lib64 /usr/local/lib /opt/local/lib64 /opt/local/lib /usr/lib64 /usr/lib +) + +IF (NUMA_INCLUDE_DIR AND NUMA_LIBRARY) + SET(NUMA_FOUND TRUE) + MESSAGE(STATUS "Found numa library: inc=${NUMA_INCLUDE_DIR}, lib=${NUMA_LIBRARY}") + add_compile_definitions(NUMA_AVAILABLE) +ELSE () + SET(NUMA_FOUND FALSE) + MESSAGE(STATUS "WARNING: Numa library not found.") +ENDIF () \ No newline at end of file diff --git a/backends/llamacpp/cmake/spdlog.cmake b/backends/llamacpp/cmake/spdlog.cmake new file mode 100644 index 00000000000..f9d590a7847 --- /dev/null +++ b/backends/llamacpp/cmake/spdlog.cmake @@ -0,0 +1,23 @@ +set(SPDLOG_USE_FMT ON) +set(SPDLOG_BUILD_SHARED OFF) +set(SPDLOG_FMT_EXTERNAL OFF) +set(SPDLOG_INSTALL ON) +set(SPDLOG_NO_ATOMIC_LEVELS ON) # We are not modifying log levels concurrently + +if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux") + set(SPDLOG_CLOCK_COARSE ON) +endif () + +# Define the level at which SPDLOG_ compilation level is defined +if (CMAKE_BUILD_TYPE STREQUAL "Debug") + message(STATUS "Verbose logging is enabled in debug build") + add_compile_definitions(SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_DEBUG) +else () + add_compile_definitions(SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_INFO) +endif () + +fetchcontent_declare( + spdlog + URL https://github.com/gabime/spdlog/archive/refs/tags/v1.14.1.tar.gz +) +fetchcontent_makeavailable(spdlog) diff --git a/backends/llamacpp/csrc/backend.cpp b/backends/llamacpp/csrc/backend.cpp new file mode 100644 index 00000000000..d3f89adca61 --- /dev/null +++ b/backends/llamacpp/csrc/backend.cpp @@ -0,0 +1,144 @@ +// +// Created by Morgan Funtowicz on 9/28/2024. 
+// + +#include + +#include +#include +#include +#include + +#include "backend.hpp" + +namespace huggingface::tgi::backends::llamacpp { + + llama_sampler_ptr sampling_params_t::into_llama_sampler(const llama_model *model) const { + auto *sampler = llama_sampler_chain_init({.no_perf = false}); + + // Penalties + llama_sampler_chain_add(sampler, llama_sampler_init_penalties( + llama_n_vocab(model), + llama_token_eos(model), + llama_token_nl(model), + 0.0f, + repetition_penalty, + frequency_penalty, + 0.0f, + false, + false + )); + llama_sampler_chain_add(sampler, llama_sampler_init_top_k(static_cast(top_k))); + + if (0 < top_p && top_p < 1) { + llama_sampler_chain_add(sampler, llama_sampler_init_top_p(top_p, 0)); + } + + llama_sampler_chain_add(sampler, llama_sampler_init_temp(temperature)); + llama_sampler_chain_add(sampler, llama_sampler_init_dist(seed)); + return llama_sampler_ptr(sampler); + } + + std::expected get_batch_from_prompt(std::span prompt) { + auto batch = llama_batch_init(static_cast(prompt.size()), 0, 1); + batch.n_tokens = 0; + + std::for_each(prompt.begin(), prompt.end(), [&batch](const llama_token token) { + batch.token[batch.n_tokens] = token; + batch.pos[batch.n_tokens] = batch.n_tokens; + batch.n_seq_id[batch.n_tokens] = 1; + batch.seq_id[batch.n_tokens][0] = 0; + batch.logits[batch.n_tokens] = false; + batch.n_tokens++; + }); + + batch.logits[batch.n_tokens - 1] = true; + return batch; + } + + int32_t update_batch_for_decoding(llama_batch &batch, llama_token token, size_t position) { + batch.token[0] = token; + batch.pos[0] = static_cast(position); + batch.logits[0] = true; + batch.n_tokens = 1; + return 0; // Decoding will always happen at position 0 + } + + worker_t::worker_t(std::shared_ptr model, const llama_context_params &¶ms) + : model_(model), context_(llama_new_context_with_model(model_.get(), params)) { + +#ifdef TGI_LLAMACPP_BACKEND_DEBUG + char modelName[256]; + llama_model_meta_val_str(model.get(), "general.name", modelName, sizeof(modelName)); + SPDLOG_DEBUG(FMT_STRING("Created llama.cpp backend for model: '{}'"), std::string_view(modelName)); +#endif + } + + std::expected + worker_t::generate(const generation_context_t &generation_context, + const std::optional &callback) const { + // Store information about context and generation size + const auto callback_ = callback.value_or(llama_void_callback); + auto max_new_tokens = generation_context.generation_params.max_new_tokens; + + // Convert sampling params to what llama.cpp is looking for + auto sampler = generation_context.sampling_params.into_llama_sampler(model_.get()); + + // Set up the prompt + if (auto maybe_batch = get_batch_from_prompt(generation_context.input_tokens); maybe_batch.has_value()) { + auto batch = *maybe_batch; + + // Keep track of where we are + auto n_decoded_tokens = 0; + auto position = batch.n_tokens; + auto sampling_index = batch.n_tokens - 1; + + // Decode + for (bool generating = true; generating; ++n_decoded_tokens) { + +#ifdef TGI_LLAMACPP_BACKEND_DEBUG + const auto start = std::chrono::steady_clock::now(); + const auto status = llama_decode(context_.get(), batch); + const auto end = std::chrono::steady_clock::now(); + const auto latency = std::chrono::duration_cast(end - start); + SPDLOG_DEBUG(FMT_STRING("Successfully decoded {:d} token(s) in {}"), batch.n_tokens, latency); +#else + const auto status = llama_decode(context_.get(), batch); +#endif + if (LLAMA_SUCCESS(status)) [[likely]] { + // Sample the new token + auto new_token_id = llama_sampler_sample(sampler.get(), 
context_.get(), sampling_index); + const auto is_eog = llama_token_is_eog(model_.get(), new_token_id); + const auto *new_token_logits = llama_get_logits_ith(context_.get(), sampling_index) + new_token_id; + + // Handle termination cases + const bool has_reach_max_tokens = n_decoded_tokens >= max_new_tokens - 1; + const bool has_reach_eog = !generation_context.generation_params.ignore_eos_token & is_eog; + const bool is_final = has_reach_max_tokens | has_reach_eog; + + // Bubble up the generated token if a callback is provided + const auto should_stop = callback_(new_token_id, *new_token_logits, is_final, n_decoded_tokens + 1); + + // Compute the continuation flag + generating = !(should_stop | is_final); + + // Update the batch for the next generation + sampling_index = update_batch_for_decoding(batch, new_token_id, position); + position += 1; + } else { + if (status == 1) { + return backend_error_t::NO_KV_SLOT_AVAILABLE; + } else { + return backend_error_t::DECODING_ERROR; + } + } + } + + llama_batch_free(batch); + + return n_decoded_tokens; + } else { + return maybe_batch.error(); + } + } +} \ No newline at end of file diff --git a/backends/llamacpp/csrc/backend.hpp b/backends/llamacpp/csrc/backend.hpp new file mode 100644 index 00000000000..84602e77d08 --- /dev/null +++ b/backends/llamacpp/csrc/backend.hpp @@ -0,0 +1,105 @@ +// +// Created by Morgan Funtowicz on 9/28/2024. +// +#ifndef TGI_LLAMA_CPP_BACKEND_BACKEND_HPP +#define TGI_LLAMA_CPP_BACKEND_BACKEND_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#define LLAMA_SUCCESS(x) x == 0 + +namespace huggingface::tgi::backends::llamacpp { + typedef std::function llama_decode_callback; + static constexpr auto llama_void_callback = [](llama_token, float_t, bool, size_t) -> bool { return false; }; + + /** + * Represent an error which can be returned as part of an std::expected + */ + enum backend_error_t : uint8_t { + // Provided model filepath doesnt exist + MODEL_FILE_DOESNT_EXIST = 1, + NO_KV_SLOT_AVAILABLE = 2, + DECODING_ERROR = 3 + }; + + /** + * Hold all the parameters provided by TGI to sample from the final distribution of tokens + */ + struct sampling_params_t { + uint32_t top_k = std::numeric_limits::max(); + float_t top_p = 1.0f; + float_t frequency_penalty = 0.0f; + float_t repetition_penalty = 0.0f; + float_t temperature = 0.0f; + uint64_t seed = 2014; + + /** + * Convert this GenerationParams to the respective llama_sampler structure + * @param Pointer to the model data + * @return + */ + llama_sampler_ptr into_llama_sampler(const llama_model *pModel) const; + }; + + /** + * Hold all the parameters provided by TGI to control the generation process + */ + struct generation_params_t { + uint32_t max_new_tokens = std::numeric_limits::max(); + bool ignore_eos_token = false; + }; + + /** + * Container structure wrapping up the current generation context composed by: + * - a non-owning view over the prompt tokens + * - the sampling parameters + * - the generation parameters + */ + struct generation_context_t { + generation_params_t generation_params; + sampling_params_t sampling_params; + std::span input_tokens; + }; + + /** + * Represent the actual model execution (i.e. 
"forward") and generation loop for llama.cpp + */ + class worker_t { + private: + std::shared_ptr model_; + llama_context_ptr context_; + + public: + /** + * Create a new llama.cpp worker from the provided llama_model and the context parameters + * @param model Previously allocated `llama_model` holding the weights of the neural network + * @param params Parameters to allocate the execution context of the model + */ + worker_t(std::shared_ptr, const llama_context_params &&); + + /** + * Generate multiple successive tokens, sampled from the distribution generated by executing a forward pass + * over the neural network operations and matrices + * @param generation_context The generation context holding sampling and generation parameters along with prompt tokens + * @param callback An optional callback function which would be called everytime a new token is sampled + */ + [[nodiscard]] std::expected + generate(const generation_context_t &, const std::optional &) const; + }; +} + +#endif //TGI_LLAMA_CPP_BACKEND_BACKEND_HPP diff --git a/backends/llamacpp/csrc/ffi.hpp b/backends/llamacpp/csrc/ffi.hpp new file mode 100644 index 00000000000..2f1437397ca --- /dev/null +++ b/backends/llamacpp/csrc/ffi.hpp @@ -0,0 +1,263 @@ +// +// Created by mfuntowicz on 10/23/24. +// + +#ifndef TGI_LLAMA_CPP_BACKEND_FFI_HPP +#define TGI_LLAMA_CPP_BACKEND_FFI_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#ifdef NUMA_AVAILABLE +#define CURRENT_THREAD 0 +#include +#include +#include +#endif + +namespace huggingface::tgi::backends::llamacpp { + class llama_cpp_worker_frontend_t; +} + +#include "backend.hpp" +#include "backends/llamacpp/src/lib.rs.h" +#include "rust/cxx.h" + +namespace huggingface::tgi::backends::llamacpp { + + /** + * Smart pointer to drop a llama_model when going out of scope + */ + auto llama_model_deleter = [](llama_model *model) { llama_free_model(model); }; + auto make_shared_llama_model = [](llama_model *model) { + return std::shared_ptr(model, llama_model_deleter); + }; + + auto get_llama_context_params = [](size_t num_threads) { + auto params = llama_context_default_params(); + params.n_threads = num_threads; + params.n_threads_batch = num_threads; + params.flash_attn = true; + params.no_perf = false; + return params; + }; + + /** + * llama.cpp backend specific exception mapped from `backend_exception_t` to throw at the FFI level and + * allow automatic implementation of Result<_, Exception> from C++ to Rust + */ + class llama_cpp_backend_exception_t : std::exception { + public: + backend_error_t error; + + llama_cpp_backend_exception_t(const backend_error_t error): error(error) {}; + }; + + /** + * Llama.cpp frontend over the worker interfacing with Rust FFI layer + */ + class llama_cpp_worker_frontend_t { + private: + std::shared_ptr model_; + worker_t worker_; + + public: + /** + * Create a new llama.cpp worker frontend allowing to map custom Rust FFI types from CXX crate to c++ boundary + * @param model The `llama_model` to use on the worker + * @param num_threads The number of threads the worker is allowed to spawn accross for its threadpool + */ + explicit llama_cpp_worker_frontend_t(llama_model *model, int32_t num_threads): + model_{ make_shared_llama_model(model) }, worker_(model_, get_llama_context_params(num_threads)) {} + + /** + * Generate a new set of tokens from the provided `input_tokens`, streaming each individual token generated + * through the `callback`. 
+ * Individual tokens are generated using the sampling parameters provided through `sampling_params` and the + * generation parameters, provided through `generation_params` allowing to define the behaviour of the generation loop. + * `ctx` is an opaque structure defined on Rust side which holds stream information to send tokens back to the originating client. + * @param input_tokens Prompt input tokens originating from the tokenization of the request's text input + * @param generation_params Parameters controlling the generation loop such as ignoring the end of sentence token or + * the maximum number of tokens to generate + * @param sampling_params Parameters controlling the sampling process on the final token distribution + * @param ctx Opaque structure from Rust holding HTTP channel to stream back response to the client + * @param callback Function pointer called everytime a new token is generated during the generation loop. + * If this callback returns `true` it signals an early termination request on the Rust side. + * @return Number of generated tokens + */ + size_t stream( + rust::Slice input_tokens, + const generation_params_t generation_params, + const sampling_params_t &sampling_params, + InferContext *ctx, + rust::Fn callback + ) { + // Wrapper around the provided Rust callback to inject the InferContext when returning from the C++ FFI boundaries + // It captures the context (ctx) using reference and will automatically call the Rust callback forwarding the InferContext + auto context_forwarding_callback = + [=, &ctx](uint32_t new_token_id, float_t logits, bool is_eos, size_t n_generated_tokens) -> bool { + return callback(ctx, new_token_id, logits, is_eos, n_generated_tokens); + }; + + // Ask the compiler to create view over Rust slice transmuting from uint32_t* to llama_token* + static auto as_llama_token = [](const uint32_t x){ return static_cast(x); }; + +#ifdef __cpp_lib_ranges_to_container + auto input_tokens_v = input_tokens | std::views::transform(as_llama_token) | std::ranges::to(); +#else + auto input_tokens_ = input_tokens | std::views::transform(as_llama_token); + auto input_tokens_v = std::vector(input_tokens_.begin(), input_tokens_.end()); +#endif + + // Defer the generation to the actual worker_t + const auto generation_context = generation_context_t {generation_params, sampling_params, input_tokens_v}; + if(const auto result = worker_.generate(generation_context, context_forwarding_callback); result.has_value()) [[likely]] { + return *result; + } else { + throw llama_cpp_backend_exception_t(result.error()); + } + } + }; + + /** + * Utility method to allocate a new worker frontend from Rust + * @param modelPath The GGUF model path as an UTF-8 string from Rust + * @param num_threads Integer greater than zero representing the number of threads the worker is allowed to use for computations + * @return unique ownership of `llama_cpp_worker_frontend_t` pointer + */ + std::unique_ptr create_worker_frontend(rust::Str modelPath, uint32_t num_threads) { +#ifdef TGI_LLAMACPP_BACKEND_DEBUG + spdlog::set_level(spdlog::level::debug); +#endif + + // Initialize the numa context from numactl + static const bool INITIALIZED_NUMA_CONTEXT_ONCE = [](){ + llama_numa_init(GGML_NUMA_STRATEGY_NUMACTL); + return true; + }(); + + // Allocate model weights parameters + auto params = llama_model_default_params(); + params.use_mmap = true; + + // Allocate the model from the Rust provided, string path + auto *model = (llama_load_model_from_file(static_cast(modelPath).c_str(), params)); + return 
std::make_unique(model, static_cast(num_threads)); + } + + /** + * Smart pointer to automatically destroy the underlying numa_bitset * when going out of scope + */ + struct numa_cpumask_deleter { void operator()(struct bitmask* cpumask){ numa_free_cpumask(cpumask); }}; + typedef std::unique_ptr unique_cpumask_ptr; + + /** + * Define the NUMA core and memory affinity for the current thread by binding cores and memory to respective NUMA node(s) + * @param affinity The set of allowed execution cores to inform the scheduler for the current thread + */ + void set_numa_core_affinity(rust::Slice affinity) { +// void set_numactl_core_affinity(std::vector affinity) { +#ifdef NUMA_AVAILABLE + if(numa_available()) { + SPDLOG_INFO("Setting numactl cores affinity to {} for thread {}", affinity, std::this_thread::get_id()); + + auto cpumask = unique_cpumask_ptr(numa_allocate_cpumask()); + std::ranges::for_each(affinity, [&cpumask](size_t cpu) { numa_bitmask_setbit(cpumask.get(), cpu); }); + numa_sched_setaffinity(CURRENT_THREAD, cpumask.get()); + + // Retrieve some information about the current setup + if(const auto numa_num_nodes = numa_num_configured_nodes(); numa_num_nodes > 1) { + const auto *numa_all_cpus = numa_all_cpus_ptr; + SPDLOG_INFO(FMT_STRING("All CPUs: {:b} (# Nodes: {:d}"), *numa_all_cpus->maskp, numa_num_nodes); + + // Retrieve the cpumask specific for the current node + auto cpus_per_node = unique_cpumask_ptr(numa_allocate_cpumask()); + + // Allocate a set which keeps track of which nodes is being targeted + auto numa_spawning_nodes = std::unordered_set(); + for(auto node = 0; node < numa_num_nodes; ++node) { + // Retrieve the cpumask for the target node + numa_node_to_cpus(node, cpus_per_node.get()); + + // intersect which cores on the nodes are targeted, in no one on that specific node + // the value of allocated_cpus_on_node will be 0 as the result of the AND operation. + const auto allocated_cpus_on_node = *cpus_per_node->maskp & *cpumask->maskp; + if(allocated_cpus_on_node > 0) { + + // If we have some cores on the node, attempt to insert in the set of targeted node + if(const auto [_, was_inserted] = numa_spawning_nodes.emplace(node); was_inserted) { + SPDLOG_DEBUG("Allocated thread spawning node: {:d}", node); + } + } + + // Clear all the bits relative to the current node + numa_bitmask_clearall(cpus_per_node.get()); + } + + // Bind the memory if we spawn a single node, otherwise, let's display a warning + if(numa_spawning_nodes.size() == 1) { + SPDLOG_INFO(FMT_STRING("Setting memory affinity to node: {:d}"), *numa_spawning_nodes.begin()); + numa_set_preferred(*numa_spawning_nodes.begin()); + } else { + SPDLOG_WARN(FMT_STRING("Specified thread affinity spawn multiple NUMA nodes: {}"), numa_spawning_nodes); + } + } + +#ifdef TGI_LLAMACPP_BACKEND_DEBUG + // Sanity check in the logs... 
+ auto *cpumask_check = numa_allocate_cpumask(); + numa_sched_getaffinity(CURRENT_THREAD, cpumask_check); + SPDLOG_DEBUG( + FMT_STRING("numa_sched_affinity for thread {} -> {:b}"), + std::this_thread::get_id(), *cpumask_check->maskp); + numa_free_cpumask(cpumask_check); +#endif + } +#else + SPDLOG_WARN("TGI's llama.cpp backend was compiled without NUMA support"); +#endif + } + + /** + * Force an update of the llama.cpp/ggml threadpool, reading from NUMA cores affinity + */ + void update_numa_affinity() { + SPDLOG_INFO("Rebinding NUMA affinity for current worker on thread: {}", std::this_thread::get_id()); + llama_numa_init(ggml_numa_strategy::GGML_NUMA_STRATEGY_NUMACTL); + } +} + +// Error handler converting C++ exceptions to a Rust Result +template <typename Try, typename Fail> +static void trycatch(Try &&func, Fail &&fail) noexcept try { + func(); +} catch (const huggingface::tgi::backends::llamacpp::llama_cpp_backend_exception_t &e) { + switch (e.error) { + case huggingface::tgi::backends::llamacpp::backend_error_t::MODEL_FILE_DOESNT_EXIST: { + fail("Specified model path doesn't exist."); + break; + } + case huggingface::tgi::backends::llamacpp::backend_error_t::NO_KV_SLOT_AVAILABLE: { + fail("Keys/Values cache is full, no slot available for the new batch."); + break; + } + case huggingface::tgi::backends::llamacpp::backend_error_t::DECODING_ERROR: { + fail("An error was detected during the generation."); + break; + } + } + fail(); +} + + + +#endif //TGI_LLAMA_CPP_BACKEND_FFI_HPP diff --git a/backends/llamacpp/offline/main.cpp b/backends/llamacpp/offline/main.cpp new file mode 100644 index 00000000000..fad97b3a1ed --- /dev/null +++ b/backends/llamacpp/offline/main.cpp @@ -0,0 +1,63 @@ +// +// Created by mfuntowicz on 10/3/24. +// +#include + +#include +#include +#include +#include "../csrc/backend.hpp" + +using namespace huggingface::tgi::backends::llamacpp; + +const auto llama_model_deleter = [](llama_model *model) { llama_free_model(model); }; + +int main(int argc, char **argv) { + if (argc < 2) { + fmt::print("No model folder provided"); + return 1; + } + + spdlog::set_level(spdlog::level::debug); + + const auto modelPath = absolute(std::filesystem::path(argv[1])); + const auto params = llama_model_default_params(); + auto model = std::shared_ptr<llama_model>( + llama_load_model_from_file(modelPath.c_str(), params), + llama_model_deleter + ); + + auto prompt = std::string("My name is Morgan"); + auto tokens = std::vector<llama_token>(128); + const auto nb_tokens = llama_tokenize(model.get(), prompt.c_str(), prompt.size(), tokens.data(), tokens.size(), + true, + false); + tokens.resize(nb_tokens); + llama_numa_init(ggml_numa_strategy::GGML_NUMA_STRATEGY_DISTRIBUTE); + auto backend = worker_t(model, llama_context_default_params()); + + fmt::println("Tokenized: {}", tokens); + + // generate + auto generated_tokens = std::vector<llama_token>(32); + const auto n_generated_tokens = backend.generate( + {{.max_new_tokens = 32}, {.top_k = 40, .top_p = 0.95, .temperature = 0.8}, + tokens}, + [&generated_tokens](llama_token new_token_id, float_t logit, bool is_eos, size_t step) -> bool { + generated_tokens.emplace(generated_tokens.begin() + (step - 1), new_token_id); + return false; + } + ); + generated_tokens.resize(n_generated_tokens.value()); + + std::string decoded = std::string(256, 'a'); + const size_t length = llama_detokenize(model.get(), + generated_tokens.data(), + generated_tokens.size(), + decoded.data(), + decoded.size(), + false, false); + decoded.resize(std::min(length, decoded.size())); + fmt::println("Generated tokens: {}", generated_tokens); + 
fmt::println("Generated text: {}", decoded); +} diff --git a/backends/llamacpp/requirements.txt b/backends/llamacpp/requirements.txt new file mode 100644 index 00000000000..2372d58ba53 --- /dev/null +++ b/backends/llamacpp/requirements.txt @@ -0,0 +1 @@ +transformers==4.45.2 ; python_version >= "3.9" and python_version < "3.13" \ No newline at end of file diff --git a/backends/llamacpp/src/backend.rs b/backends/llamacpp/src/backend.rs new file mode 100644 index 00000000000..e1575b1d027 --- /dev/null +++ b/backends/llamacpp/src/backend.rs @@ -0,0 +1,496 @@ +use crate::ffi::{ + create_worker_frontend, set_numa_core_affinity, update_numa_affinity, GenerationParams, + LlamaCppWorkerFrontend, SamplingParams, +}; +use async_channel::{unbounded as mpmc_unbounded, Receiver as MpmcReceiver, Sender as MpmcSender}; +use async_trait::async_trait; +use cxx::UniquePtr; +use log::warn; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::thread::spawn; +use text_generation_router::infer::InferError::GenerationError; +use text_generation_router::infer::{Backend, GeneratedText, InferError, InferStreamResponse}; +use text_generation_router::validation::{ + ValidGenerateRequest, ValidParameters, ValidStoppingParameters, +}; +use text_generation_router::{FinishReason, Token}; +use thiserror::Error; +use tokenizers::Tokenizer; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::task::JoinHandle; +use tokio::time::Instant; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{debug, error, info}; + +/// Detect the number of CPU cores on the machine +/// +/// returns: usize Integer greater than 0 representing the number of CPU cores on the machine +/// +#[cfg(not(test))] +fn get_num_cores() -> usize { + match option_env!("TGI_USE_PHYSICAL_CORES") + .unwrap_or("OFF") + .to_uppercase() + .as_str() + { + "ON" => { + info!("Using only physical cores on the machine"); + num_cpus::get_physical() + } + _ => { + info!("Using physical and logical cores on the machine"); + num_cpus::get() + } + } +} + +#[cfg(test)] +fn get_num_cores() -> usize { + match option_env!("TGI_USE_PHYSICAL_CORES") + .unwrap_or("OFF") + .to_uppercase() + .as_str() + { + "ON" => 16, + _ => 32, + } +} + +/// Subdivide the set of CPU cores available on the system to equal, non-overlapping, subsets of CPU cores +/// +/// # Arguments +/// +/// * `num_cores_per_instance`: Minimum number of cores for each instance +/// +/// returns: Vec, Global> +/// +/// # Examples +/// +/// ``` +/// +/// ``` +fn get_cores_allocation(num_cores_per_instance: usize) -> Vec> { + // Get the total number of cores on the CPU + let cores_count = get_num_cores(); + + // Make sure each instance has some cores available + let mut effective_num_cores_per_instance = match num_cores_per_instance { + 0 => cores_count, + _ => num_cores_per_instance, + }; + + // If we have spare cores, let's see if we can give everyone one more core + let num_instances = cores_count / effective_num_cores_per_instance; + + (0..num_instances) + .map(|ordinal| { + let start = ordinal * effective_num_cores_per_instance; + let end = (ordinal + 1) * effective_num_cores_per_instance - 1; + start..end + }) + .collect() +} + +type InferResult = Result; + +unsafe impl Send for LlamaCppWorkerFrontend {} + +impl From<&ValidParameters> for SamplingParams { + fn from(v: &ValidParameters) -> Self { + Self { + top_k: v.top_k, + top_p: v.top_p, + frequency_penalty: v.frequency_penalty, + repetition_penalty: v.repetition_penalty, + 
temperature: v.temperature, + seed: v.seed, + } + } +} + +impl From<&ValidStoppingParameters> for GenerationParams { + fn from(v: &ValidStoppingParameters) -> Self { + Self { + max_new_tokens: v.max_new_tokens, + ignore_eos_token: v.ignore_eos_token, + } + } +} + +#[cfg_attr(debug_assertions, derive(Debug))] +pub(crate) struct GenerationContext { + pub(crate) input_tokens: Arc>, + pub(crate) generated_tokens: Vec, + pub(crate) generation_params: GenerationParams, + pub(crate) sampling_params: SamplingParams, +} + +pub(crate) struct InferContext<'a> { + pub(crate) start: Instant, + pub(crate) stream: UnboundedSender, + pub(crate) tokenizer: &'a Tokenizer, + pub(crate) generation: GenerationContext, +} + +#[derive(Debug, Error)] +pub enum LlamaCppBackendError { + #[error("Provided GGUF model path {0} doesn't exist")] + ModelFileDoesntExist(String), + + #[error("Failed to initialize model from GGUF file {0}: {1}")] + ModelInitializationFailed(PathBuf, String), +} + +pub struct LlamaCppBackend { + scheduler_sender: UnboundedSender<(GenerationContext, UnboundedSender)>, + scheduler_handle: JoinHandle<()>, +} + +impl LlamaCppBackend { + /// Attempt to create a new llama.cpp worker from the provided model path + /// + /// # Arguments + /// + /// * `path`: Path to the GGUF model file to load + /// * `num_threads`: Number of cores the model is allowed to spawn for its computations + /// + /// returns: Result, LlamaCppBackendError> + /// + /// # Examples + /// + /// ``` + /// + /// ``` + fn allocate_worker( + path: &Path, + num_threads: u32, + ) -> Result, LlamaCppBackendError> { + create_worker_frontend(&path.display().to_string(), num_threads).map_err(|ref err| { + LlamaCppBackendError::ModelInitializationFailed(path.to_path_buf(), err.to_string()) + }) + } + + pub fn new>( + model_path: P, + tokenizer: Arc, + num_cores_per_instance: u16, + ) -> Result { + let path = model_path.as_ref(); + if !path.exists() { + return Err(LlamaCppBackendError::ModelFileDoesntExist( + path.display().to_string(), + )); + } + + // Allocate the multi-consumer queue to orchestrate all the workers + let (backlog_submitter, backlog_receiver) = mpmc_unbounded(); + + // Allocate all the workers + let cores_allocation = get_cores_allocation(num_cores_per_instance as usize); + cores_allocation.iter().for_each(|affinity| { + match Self::allocate_worker(path, affinity.len() as u32) { + Ok(worker) => { + let tokenizer = Arc::clone(&tokenizer); + let affinity = affinity.clone().collect::>(); + let backlog_receiver = backlog_receiver.clone(); + spawn(move || worker_loop(worker, affinity, tokenizer, backlog_receiver)); + } + Err(e) => {} + } + }); + + // Start the scheduler loop + let (scheduler_sender, scheduler_receiver) = unbounded_channel(); + let scheduler_handle = tokio::spawn(scheduler_loop(scheduler_receiver, backlog_submitter)); + Ok(Self { + scheduler_sender, + scheduler_handle, + }) + } +} + +/// llama.cpp worker actual streaming callback, called everytime a new token is being generated +/// +/// # Arguments +/// +/// * `ctx`: InferContext holding the channel to stream back generated token to the client. +/// *UNSAFE* This parameter is unsafe and represented as a mutable pointer to avoid automatic drop of its +/// referenced resources after the first iteration step. +/// It's the responsibility of the caller to ensure a `Box::from_raw` is taking back full ownership of the pointer +/// for correct deletion. 
+/// * `new_token_id`: The sampled token identifier +/// * `new_token_logit`: the sampled token identifier log probability +/// * `is_final`: Flag indicating if the sampled token is a final one +/// * `n_generated_tokens`: Counter representing the actual number of token generated at this stage +/// +/// returns: bool `true` if the worker should stop the generation at this stage, `false` to continue +/// +/// # Examples +/// +/// ``` +/// +/// ``` +fn llama_generate_callback( + ctx: *mut InferContext, + new_token_id: u32, + new_token_logit: f32, + is_final: bool, + n_generated_tokens: usize, +) -> bool { + debug!("Generated token: {new_token_id} -> logits={new_token_logit}, is_final={is_final} ({n_generated_tokens})"); + + let ctx = unsafe { &mut *ctx }; + + // Append the new token to the generated ones + ctx.generation.generated_tokens.push(new_token_id); + + // Generate response + let response = match ctx.tokenizer.decode(&[new_token_id], false) { + Ok(text) => { + let special = ctx.tokenizer.get_added_vocabulary().is_special_token(&text); + let token = Token { + id: new_token_id, + text, + logprob: new_token_logit, + special, + }; + + // Should we generate an ending or intermediate response? + match is_final { + false => Ok(InferStreamResponse::Intermediate { + token, + top_tokens: vec![], + }), + true => { + // Decode the whole text + match ctx + .tokenizer + .decode(&ctx.generation.generated_tokens, false) + { + Ok(text) => Ok(InferStreamResponse::End { + token, + top_tokens: vec![], + generated_text: GeneratedText { + text, + generated_tokens: n_generated_tokens as u32, + finish_reason: FinishReason::Length, + seed: Some(ctx.generation.sampling_params.seed), + }, + start: ctx.start, + queued: ctx.start, + }), + Err(err) => Err(GenerationError(err.to_string())), + } + } + } + } + Err(ref err) => Err(GenerationError(err.to_string())), + }; + + // Send back to the client + let status = ctx.stream.send(response).inspect_err(|err| { + error!("Failed to send back the response: {}", err); + }); + status.is_err() +} + +/// Main loop allowing scheduling incoming requests without blocking the main execution thread +/// +/// # Arguments +/// +/// * `queue`: Synchronized container to receive new request +/// * `backlog`: Synchronized container to dispatch new request towards all the workers for one to pick it up. +/// +/// returns: () +/// +/// # Examples +/// +/// ``` +/// +/// ``` +async fn scheduler_loop( + mut queue: UnboundedReceiver<(GenerationContext, UnboundedSender)>, + backlog: MpmcSender<(GenerationContext, UnboundedSender)>, +) { + // Let's receive incoming requests + loop { + match queue.recv().await { + None => break, + Some((ctx, sender)) => { + if let Err(e) = backlog.send((ctx, sender)).await { + todo!("What do we do") + } + } + } + } +} + +/// llama.cpp worker thread receiving incoming requests from the scheduler and handling all generation +/// process along with the streaming logic back to the client. 
+/// +/// # Arguments +/// +/// * `backend`: Owned llama.cpp worker with allocated execution resources +/// * `affinity`: Set of CPUs to bind the worker's thread for scheduling +/// * `tokenizer`: Tokenizer to use to decode generated token +/// * `backlog`: Multi-consumers queue holding the requests waiting to be handled by a worker +/// +/// returns: () +/// +/// # Examples +/// +/// ``` +/// +/// ``` +fn worker_loop( + mut backend: UniquePtr, + affinity: Vec, + tokenizer: Arc, + backlog: MpmcReceiver<(GenerationContext, UnboundedSender)>, +) { + // This loop will mostly decode single token at every step, so no need to rely on parallelism + tokenizers::utils::parallelism::set_parallelism(false); + + // Bind cores for the current thread and make sure it's taken into account + set_numa_core_affinity(&affinity); + update_numa_affinity(); + + loop { + if let Ok((generation, stream)) = backlog.recv_blocking() { + let start = Instant::now(); + let generation_params = generation.generation_params; // copy + let sampling_params = generation.sampling_params; // copy + let input_tokens = Arc::clone(&generation.input_tokens); + + // Creating the whole InferContext and pushing it to the heap + let ctx = Box::new(InferContext { + start, + stream, + tokenizer: &tokenizer, + generation, + }); + + // We leak the box to avoid it being freed after the first callback call + // when going out of scope + unsafe { + let boxed_ctx = Box::into_raw(ctx); + if let Err(e) = backend.pin_mut().stream( + &input_tokens, + generation_params, + &sampling_params, + boxed_ctx, + llama_generate_callback, + ) { + error!("Error while decoding tokens... {}", e.what()); + // TODO: What error to give back to the user? + } + + // Make sure we re-keep track of the OpaqueStream box + let _ = Box::from_raw(boxed_ctx); + } + } else { + info!("IPC channel is closed, exiting the scheduler loop"); + break; + } + } +} + +#[async_trait] +impl Backend for LlamaCppBackend { + fn schedule( + &self, + request: ValidGenerateRequest, + ) -> Result, InferError> { + if let Some(input_ids) = request.input_ids { + let (sx, rx) = unbounded_channel(); + let sampling_params = SamplingParams::from(&request.parameters); + let generation_params = GenerationParams::from(&request.stopping_parameters); + + let ctx = GenerationContext { + input_tokens: Arc::clone(&input_ids), + generated_tokens: Vec::with_capacity(generation_params.max_new_tokens as usize), + generation_params, + sampling_params, + }; + + // We send the workload to the scheduler + if let Err(e) = self.scheduler_sender.send((ctx, sx)) { + Err(InferError::IncompleteGenerationStream) + } else { + // We are returning the associated channel as early as we can, potentially closing it up + Ok(UnboundedReceiverStream::new(rx)) + } + } else { + Err(GenerationError("Unsupported modalities".to_string())) + } + } + + async fn health(&self, _: bool) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use crate::backend::{get_cores_allocation, get_num_cores}; + + fn test_get_num_cores() { + std::env::set_var("TGI_USE_PHYSICAL_CORES", "OFF"); + assert_eq!(get_num_cores(), 32); + + std::env::set_var("TGI_USE_PHYSICAL_CORES", "ON"); + assert_eq!(get_num_cores(), 16); + } + + fn test_get_cores_allocation_single_instance() { + std::env::set_var("TGI_USE_PHYSICAL_CORES", "OFF"); + let smt_allocation = get_cores_allocation(0); + assert_eq!(smt_allocation.len(), 1); + assert_eq!( + smt_allocation[0].clone().collect::>(), + (0..32).collect::>() + ); + + std::env::set_var("TGI_USE_PHYSICAL_CORES", "ON"); + let 
smt_allocation = get_cores_allocation(0); + assert_eq!(smt_allocation.len(), 1); + assert_eq!( + smt_allocation[0].clone().collect::>(), + (0..16).collect::>() + ); + } + + fn test_get_cores_allocation_multi_instances() { + for cores_per_instance in [1, 2, 4, 8, 16, 3, 7] { + std::env::set_var("TGI_USE_PHYSICAL_CORES", "OFF"); + + let num_instances = 32 / cores_per_instance; + let smt_allocation = get_cores_allocation(cores_per_instance); + + for i in 0..num_instances { + let start = i * cores_per_instance; + let end = start + cores_per_instance; + assert_eq!( + smt_allocation[i].clone().collect::>(), + (start..end).collect::>() + ); + } + + std::env::set_var("TGI_USE_PHYSICAL_CORES", "ON"); + let num_instances = 16 / cores_per_instance; + let smt_allocation = get_cores_allocation(cores_per_instance); + assert_eq!(smt_allocation.len(), num_instances); + + for i in 0..num_instances { + let start = i * cores_per_instance; + let end = start + cores_per_instance; + assert_eq!( + smt_allocation[i].clone().collect::>(), + (start..end).collect::>() + ); + } + } + } +} diff --git a/backends/llamacpp/src/lib.rs b/backends/llamacpp/src/lib.rs new file mode 100644 index 00000000000..3507217ff86 --- /dev/null +++ b/backends/llamacpp/src/lib.rs @@ -0,0 +1,131 @@ +use crate::backend::InferContext; +use crate::ffi::SamplingParams; + +pub mod backend; + +impl Default for SamplingParams { + fn default() -> Self { + Self { + top_k: u32::MAX, + top_p: 1.0f32, + frequency_penalty: 0.0f32, + repetition_penalty: 0.0f32, + temperature: 1.0f32, + seed: 2014u64, + } + } +} + +#[cxx::bridge(namespace = "huggingface::tgi::backends::llamacpp")] +mod ffi { + #[derive(Debug, Copy, Clone)] + struct GenerationParams { + max_new_tokens: u32, + ignore_eos_token: bool, + } + + #[derive(Debug, Copy, Clone)] + struct SamplingParams { + top_k: u32, + top_p: f32, + frequency_penalty: f32, + repetition_penalty: f32, + temperature: f32, + seed: u64, + } + + extern "Rust" { + type InferContext<'a>; + } + + unsafe extern "C++" { + include!("backends/llamacpp/csrc/ffi.hpp"); + + #[cxx_name = "generation_params_t"] + type GenerationParams; + + #[cxx_name = "sampling_params_t"] + type SamplingParams; + + /// Represent an instance of the llama.cpp backend instance on C++ side + #[cxx_name = "llama_cpp_worker_frontend_t"] + type LlamaCppWorkerFrontend; + + /// Create a new llama.cpp worker + /// + /// # Arguments + /// + /// * `modelPath`: Path to the GGUF model file to load + /// * `num_threads`: Number of threads the worker is allowed to spawn to run computations + /// + /// returns: Result<, > + /// + /// # Examples + /// + /// ``` + /// + /// ``` + fn create_worker_frontend( + modelPath: &str, + num_threads: u32, + ) -> Result>; + + /// Define the NUMA cores affinity on which the current thread is allowed to be scheduled. + /// + /// # Arguments + /// + /// * `affinity`: Set of CPU cores allowed for scheduling + /// + /// returns: () + /// + /// # Examples + /// + /// ``` + /// // Bind the current thread for execution on cores 0, 1, 2, 3 + /// set_numa_core_affinity(&[0, 1, 2, 3]); + /// ``` + fn set_numa_core_affinity(affinity: &[usize]); + + /// Force llama.cpp to reevaluate the allowed NUMA context (core and memory affinity) for + /// its internal threads scheduling. 
+ /// This method can potentially cause llama.cpp / ggml to reallocate its internal threadpool to + /// match the new affinity constraints + /// + /// returns: () + /// + /// # Examples + /// + /// ``` + /// set_numa_core_affinity(&[0, 1, 2, 3]); + /// update_numa_affinity(); + /// ``` + fn update_numa_affinity(); + + /// Generate new tokens from the provided prompt input `tokens` and generation and sampling parameters, + /// streaming back each generated individual token through the `callback`. + /// + /// # Arguments + /// + /// * `tokens`: Prompt input tokenized from the request's text input + /// * `generation_params`: Parameters controling the generation loop + /// * `sampling_params`: Parameters controling the sampling from the token distribution + /// * `stream`: Opaque structure mapping HTTP client transport to stream back token + /// * `callback`: Function pointer called everytime a new token is generated + /// + /// returns: Result> + /// + /// # Examples + /// + /// ``` + /// + /// ``` + unsafe fn stream( + self: Pin<&mut LlamaCppWorkerFrontend>, + tokens: &[u32], + generation_params: GenerationParams, + sampling_params: &SamplingParams, + stream: *mut InferContext, + callback: unsafe fn(*mut InferContext, u32, f32, bool, usize) -> bool, + ) -> Result; + } +} diff --git a/backends/llamacpp/src/main.rs b/backends/llamacpp/src/main.rs new file mode 100644 index 00000000000..adc183edc5b --- /dev/null +++ b/backends/llamacpp/src/main.rs @@ -0,0 +1,211 @@ +use clap::{Parser, Subcommand}; +use std::path::PathBuf; +use std::sync::Arc; +use text_generation_backend_llamacpp::backend::{LlamaCppBackend, LlamaCppBackendError}; +use text_generation_router::server::ApiDoc; +use text_generation_router::{server, usage_stats}; +use thiserror::Error; +use tokenizers::FromPretrainedParameters; + +/// App Configuration +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +struct Args { + #[command(subcommand)] + command: Option, + + #[clap(default_value = "128", long, env)] + max_concurrent_requests: usize, + #[clap(default_value = "2", long, env)] + max_best_of: usize, + #[clap(default_value = "4", long, env)] + max_stop_sequences: usize, + #[clap(default_value = "5", long, env)] + max_top_n_tokens: u32, + #[clap(default_value = "1024", long, env)] + max_input_tokens: usize, + #[clap(default_value = "2048", long, env)] + max_total_tokens: usize, + #[clap(default_value = "4096", long, env)] + max_batch_prefill_tokens: u32, + #[clap(long, env)] + max_batch_total_tokens: Option, + #[clap(long, env)] + max_batch_size: Option, + #[clap(default_value = "0.0.0.0", long, env)] + hostname: String, + #[clap(default_value = "3000", long, short, env)] + port: u16, + #[clap(long, env, help = "Path to GGUF model file(s) to load")] + gguf_path: PathBuf, + #[clap(long, env, help = "Number of CPU core per instance(s)")] + num_cores_per_instance: Option, + #[clap(long, env, required = true)] + tokenizer_name: String, + #[clap(long, env)] + tokenizer_config_path: Option, + #[clap(long, env)] + revision: Option, + #[clap(default_value = "2", long, env)] + validation_workers: usize, + #[clap(long, env)] + api_key: Option, + #[clap(long, env)] + json_output: bool, + #[clap(long, env)] + otlp_endpoint: Option, + #[clap(default_value = "text-generation-inference.router", long, env)] + otlp_service_name: String, + #[clap(long, env)] + cors_allow_origin: Option>, + #[clap(long, env)] + ngrok: bool, + #[clap(long, env)] + ngrok_authtoken: Option, + #[clap(long, env)] + ngrok_edge: Option, + 
+    #[clap(long, env, default_value_t = false)]
+    disable_grammar_support: bool,
+    #[clap(default_value = "4", long, env)]
+    max_client_batch_size: usize,
+    #[clap(default_value = "on", long, env)]
+    usage_stats: usage_stats::UsageStatsLevel,
+}
+
+#[derive(Debug, Subcommand)]
+enum Commands {
+    PrintSchema,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), RouterError> {
+    // Get args
+    let args = Args::parse();
+    // Pattern match configuration
+    let Args {
+        command,
+        max_concurrent_requests,
+        max_best_of,
+        max_stop_sequences,
+        max_top_n_tokens,
+        max_input_tokens,
+        max_total_tokens,
+        max_batch_prefill_tokens,
+        max_batch_total_tokens,
+        max_batch_size,
+        hostname,
+        port,
+        gguf_path,
+        num_cores_per_instance,
+        tokenizer_name,
+        tokenizer_config_path,
+        revision,
+        validation_workers,
+        api_key,
+        json_output,
+        otlp_endpoint,
+        otlp_service_name,
+        cors_allow_origin,
+        ngrok,
+        ngrok_authtoken,
+        ngrok_edge,
+        disable_grammar_support,
+        max_client_batch_size,
+        usage_stats,
+    } = args;
+
+    if let Some(Commands::PrintSchema) = command {
+        use utoipa::OpenApi;
+        let api_doc = ApiDoc::openapi().to_pretty_json().unwrap();
+        println!("{}", api_doc);
+        std::process::exit(0);
+    };
+    text_generation_router::logging::init_logging(otlp_endpoint, otlp_service_name, json_output);
+
+    // Validate args
+    if max_input_tokens >= max_total_tokens {
+        return Err(RouterError::ArgumentValidation(
+            "`max_input_tokens` must be < `max_total_tokens`".to_string(),
+        ));
+    }
+    if max_input_tokens as u32 > max_batch_prefill_tokens {
+        return Err(RouterError::ArgumentValidation(format!("`max_batch_prefill_tokens` must be >= `max_input_tokens`. Given: {max_batch_prefill_tokens} and {max_input_tokens}")));
+    }
+
+    if validation_workers == 0 {
+        return Err(RouterError::ArgumentValidation(
+            "`validation_workers` must be > 0".to_string(),
+        ));
+    }
+
+    if let Some(max_batch_total_tokens) = max_batch_total_tokens {
+        if max_batch_prefill_tokens > max_batch_total_tokens {
+            return Err(RouterError::ArgumentValidation(format!("`max_batch_prefill_tokens` must be <= `max_batch_total_tokens`. Given: {max_batch_prefill_tokens} and {max_batch_total_tokens}")));
+        }
+        if max_total_tokens as u32 > max_batch_total_tokens {
+            return Err(RouterError::ArgumentValidation(format!("`max_total_tokens` must be <= `max_batch_total_tokens`. Given: {max_total_tokens} and {max_batch_total_tokens}")));
+        }
+    }
+
+    if let Some(max_batch_size) = max_batch_size {
+        if max_batch_size == 0 {
+            return Err(RouterError::ArgumentValidation(
+                "`max_batch_size` must be > 0".to_string(),
+            ));
+        }
+    }
+
+    let auth_token = std::env::var("HF_TOKEN")
+        .or_else(|_| std::env::var("HUGGING_FACE_HUB_TOKEN"))
+        .ok();
+    let options = FromPretrainedParameters {
+        revision: revision.clone().unwrap_or("main".to_string()),
+        user_agent: Default::default(),
+        auth_token,
+    };
+    let tokenizer = Arc::new(
+        tokenizers::Tokenizer::from_pretrained(tokenizer_name.clone(), Some(options))
+            .expect("Failed to retrieve tokenizer"),
+    );
+    let backend = LlamaCppBackend::new(gguf_path, tokenizer, num_cores_per_instance.unwrap_or(0))?;
+
+    // Run server
+    server::run(
+        backend,
+        max_concurrent_requests,
+        max_best_of,
+        max_stop_sequences,
+        max_top_n_tokens,
+        max_input_tokens,
+        max_total_tokens,
+        validation_workers,
+        api_key,
+        tokenizer_name,
+        tokenizer_config_path,
+        revision,
+        false,
+        hostname,
+        port,
+        cors_allow_origin,
+        ngrok,
+        ngrok_authtoken,
+        ngrok_edge,
+        disable_grammar_support,
+        max_client_batch_size,
+        usage_stats,
+    )
+    .await?;
+    Ok(())
+}
+
+#[derive(Debug, Error)]
+enum RouterError {
+    #[error("Argument validation error: {0}")]
+    ArgumentValidation(String),
+    #[error("Backend failed: {0}")]
+    Backend(#[from] LlamaCppBackendError),
+    #[error("WebServer error: {0}")]
+    WebServer(#[from] server::WebServerError),
+    #[error("Tokio runtime failed to start: {0}")]
+    Tokio(#[from] std::io::Error),
+}
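// A minimal sketch, not part of the patch above: it shows how the two cxx shared structs
// from the lib.rs bridge (`GenerationParams`, `SamplingParams`) and the `Default` impl
// could be exercised from inside the `text-generation-backend-llamacpp` crate, e.g. as a
// unit test appended to lib.rs. The module name `sampling_defaults_sketch` is hypothetical;
// reaching the private `mod ffi` through `crate::ffi` is an assumption based on the bridge
// being declared at the crate root.
#[cfg(test)]
mod sampling_defaults_sketch {
    use crate::ffi::{GenerationParams, SamplingParams};

    #[test]
    fn default_sampling_params_leave_distribution_untouched() {
        // Per `impl Default for SamplingParams`: top-k filtering is effectively disabled
        // (u32::MAX), nucleus sampling keeps the whole distribution (top_p = 1.0),
        // temperature is neutral, and the seed is pinned to 2014.
        let sampling = SamplingParams::default();
        assert_eq!(sampling.top_k, u32::MAX);
        assert_eq!(sampling.top_p, 1.0f32);
        assert_eq!(sampling.temperature, 1.0f32);
        assert_eq!(sampling.seed, 2014u64);

        // `GenerationParams` is a plain Copy struct; both structs are handed to
        // `LlamaCppWorkerFrontend::stream` along with the tokenized prompt and a callback.
        let generation = GenerationParams {
            max_new_tokens: 128,
            ignore_eos_token: false,
        };
        assert_eq!(generation.max_new_tokens, 128);
    }
}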