diff --git a/runner/Dockerfile b/runner/Dockerfile index f4138e8e..d9ed5cd9 100644 --- a/runner/Dockerfile +++ b/runner/Dockerfile @@ -1,19 +1,14 @@ # Based on https://github.com/huggingface/api-inference-community/blob/main/docker_images/diffusers/Dockerfile - -FROM nvidia/cuda:12.1.1-cudnn8-runtime-ubuntu20.04 -LABEL maintainer="Yondon Fu " - -# Add any system dependency here -# RUN apt-get update -y && apt-get install libXXX -y +FROM nvidia/cuda:12.2.2-cudnn8-devel-ubuntu22.04 ENV DEBIAN_FRONTEND=noninteractive # Install prerequisites RUN apt-get update && \ - apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \ - libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \ - xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git \ - ffmpeg + apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \ + libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \ + xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git runit libzmq3-dev && \ + rm -rf /var/lib/apt/lists/* # Install pyenv RUN curl https://pyenv.run | bash @@ -25,32 +20,25 @@ ENV PATH $PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH # Install your desired Python version ARG PYTHON_VERSION=3.11 RUN pyenv install $PYTHON_VERSION && \ - pyenv global $PYTHON_VERSION && \ - pyenv rehash + pyenv global $PYTHON_VERSION && \ + pyenv rehash # Upgrade pip and install your desired packages ARG PIP_VERSION=23.3.2 -RUN pip install --no-cache-dir --upgrade pip==${PIP_VERSION} setuptools==69.5.1 wheel==0.43.0 && \ - pip install --no-cache-dir torch==2.1.1 torchvision==0.16.1 torchaudio==2.1.1 - -WORKDIR /app -COPY ./requirements.txt /app -RUN pip install --no-cache-dir -r requirements.txt - -RUN pip install https://github.com/chengzeyi/stable-fast/releases/download/v1.0.3/stable_fast-1.0.3+torch211cu121-cp311-cp311-manylinux2014_x86_64.whl +RUN pip install --no-cache-dir --upgrade pip==${PIP_VERSION} setuptools==69.5.1 wheel==0.43.0 -# Most DL models are quite large in terms of memory, using workers is a HUGE -# slowdown because of the fork and GIL with python. -# Using multiple pods seems like a better default strategy. -# Feel free to override if it does not make sense for your library. 
-ARG max_workers=1 -ENV MAX_WORKERS=$max_workers +# Set environment variables +ENV MAX_WORKERS=1 ENV HUGGINGFACE_HUB_CACHE=/models ENV DIFFUSERS_CACHE=/models ENV MODEL_DIR=/models -COPY app/ /app/app -COPY images/ /app/images -COPY bench.py /app/bench.py +# The following steps have been moved to the app Dockerfile: +# - Copying application files +# - Setting up and compiling Go application +# - Setting up runit service directories +# - Creating log directories +# - Setting the init system to runit -CMD ["uvicorn", "app.main:app", "--log-config", "app/cfg/uvicorn_logging_config.json", "--host", "0.0.0.0", "--port", "8000"] +# We're keeping the WORKDIR /app here as it's a good default +WORKDIR /app \ No newline at end of file diff --git a/runner/app/go/ingress/go.mod b/runner/app/go/ingress/go.mod new file mode 100644 index 00000000..e8f0d390 --- /dev/null +++ b/runner/app/go/ingress/go.mod @@ -0,0 +1,13 @@ +module github.com/livepeer/ai-worker/runner/app/go/ingress + +go 1.22 + +toolchain go1.22.8 + +require ( + github.com/go-gst/go-glib v1.0.0 + github.com/go-gst/go-gst v1.0.0 + github.com/pebbe/zmq4 v1.2.11 +) + +require github.com/mattn/go-pointer v0.0.1 // indirect diff --git a/runner/app/go/ingress/go.sum b/runner/app/go/ingress/go.sum new file mode 100644 index 00000000..3d9113a3 --- /dev/null +++ b/runner/app/go/ingress/go.sum @@ -0,0 +1,8 @@ +github.com/go-gst/go-glib v1.0.0 h1:/Gl3lk3M3MmWoSEtOyH3bpUxMUvO1gUL35A2drbr/K0= +github.com/go-gst/go-glib v1.0.0/go.mod h1:7Ehl6klsMBT94bf+Bic9qRyEkXARhhqpiZnU2PXeO6I= +github.com/go-gst/go-gst v1.0.0 h1:YBzE3JVZvbrnWWb/iGCXuiaOvHQ7HW+xXUBR++EgEtQ= +github.com/go-gst/go-gst v1.0.0/go.mod h1:sQMWMnR98s2B4w52e4IXyGvz75rXV8CZ1bejdPT3KIs= +github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= +github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= +github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q= +github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= diff --git a/runner/app/go/ingress/main.go b/runner/app/go/ingress/main.go new file mode 100644 index 00000000..b1a9dcaf --- /dev/null +++ b/runner/app/go/ingress/main.go @@ -0,0 +1,173 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/go-gst/go-gst/gst" + "github.com/go-gst/go-gst/gst/app" + "github.com/go-gst/go-glib/glib" + zmq "github.com/pebbe/zmq4" +) + +func main() { + // Initialize GStreamer + gst.Init(nil) + + // Create a new pipeline + pipeline, err := gst.NewPipeline("") + if err != nil { + log.Fatalf("Failed to create pipeline: %s", err) + } + + // Create elements + src, err := gst.NewElement("filesrc") + if err != nil { + log.Fatalf("Failed to create filesrc: %s", err) + } + src.SetProperty("location", "10s.mp4") // Replace with your MP4 file path + + decodebin, err := gst.NewElement("decodebin") + if err != nil { + log.Fatalf("Failed to create decodebin: %s", err) + } + + queue, err := gst.NewElement("queue") + if err != nil { + log.Fatalf("Failed to create queue: %s", err) + } + queue.SetProperty("max-size-buffers", uint(1)) + queue.SetProperty("max-size-time", uint64(0)) + queue.SetProperty("max-size-bytes", uint(0)) + + videoconvert, err := gst.NewElement("videoconvert") + if err != nil { + log.Fatalf("Failed to create videoconvert: %s", err) + } + + videoscale, err := gst.NewElement("videoscale") + if err != nil { + log.Fatalf("Failed to create videoscale: %s", err) + } + + capsfilter, err := gst.NewElement("capsfilter") + if err != nil { + 
log.Fatalf("Failed to create capsfilter: %s", err) + } + capsfilter.SetProperty("caps", gst.NewCapsFromString("video/x-raw,width=512,height=512")) + + jpegenc, err := gst.NewElement("jpegenc") + if err != nil { + log.Fatalf("Failed to create jpegenc: %s", err) + } + + appsink, err := app.NewAppSink() + if err != nil { + log.Fatalf("Failed to create appsink: %s", err) + } + appsink.SetProperty("max-buffers", uint(1)) + appsink.SetProperty("drop", true) + appsink.SetProperty("sync", false) + + // Add elements to pipeline + pipeline.AddMany(src, decodebin, queue, videoconvert, videoscale, capsfilter, jpegenc, appsink.Element) + + // Link elements + src.Link(decodebin) + queue.Link(videoconvert) + videoconvert.Link(videoscale) + videoscale.Link(capsfilter) + capsfilter.Link(jpegenc) + jpegenc.Link(appsink.Element) + + // Connect pad-added signal for decodebin + decodebin.Connect("pad-added", func(element *gst.Element, pad *gst.Pad) { + sinkpad := queue.GetStaticPad("sink") + pad.Link(sinkpad) + }) + + // Set up ZMQ PUB socket + publisher, err := zmq.NewSocket(zmq.PUB) + if err != nil { + log.Fatalf("Failed to create ZMQ socket: %s", err) + } + defer publisher.Close() + + // Set high water mark + err = publisher.SetSndhwm(1) + if err != nil { + log.Fatalf("Failed to set high water mark: %s", err) + } + + err = publisher.Bind("tcp://*:5555") + if err != nil { + log.Fatalf("Failed to bind ZMQ socket: %s", err) + } + + // Start playing + pipeline.SetState(gst.StatePlaying) + + // Variables for frame rate calculation + frameCount := 0 + startTime := time.Now() + + // Callback function for new samples + appsink.SetCallbacks(&app.SinkCallbacks{ + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { + sample := sink.PullSample() + if sample == nil { + log.Printf("Failed to pull sample") + return gst.FlowError + } + + buffer := sample.GetBuffer() + data := buffer.Bytes() + + // Send frame via ZMQ + _, err = publisher.SendBytes(data, zmq.DONTWAIT) + if err != nil { + log.Printf("Failed to send frame: %s", err) + } + + // Update frame count and calculate FPS + frameCount++ + elapsed := time.Since(startTime) + if elapsed >= time.Second { + fps := float64(frameCount) / elapsed.Seconds() + log.Printf("Producer FPS: %.2f", fps) + frameCount = 0 + startTime = time.Now() + } + + return gst.FlowOK + }, + }) + + // Create a main loop + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) + + // Add a message handler to the pipeline bus + pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { + switch msg.Type() { + case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop + pipeline.BlockSetState(gst.StateNull) + mainLoop.Quit() + case gst.MessageError: // Error messages are always fatal + err := msg.ParseError() + fmt.Println("ERROR:", err.Error()) + if debug := err.DebugString(); debug != "" { + fmt.Println("DEBUG:", debug) + } + mainLoop.Quit() + default: + // All messages implement a Stringer. However, this is + // typically an expensive thing to do and should be avoided. 
+ fmt.Println(msg) + } + return true + }) + + // Run the main loop + mainLoop.Run() +} \ No newline at end of file diff --git a/runner/app/python/infer/StreamDiffusionWrapper/__init__.py b/runner/app/python/infer/StreamDiffusionWrapper/__init__.py new file mode 100644 index 00000000..f53940ec --- /dev/null +++ b/runner/app/python/infer/StreamDiffusionWrapper/__init__.py @@ -0,0 +1,3 @@ +from .wrapper import StreamDiffusionWrapper + +__all__ = ["StreamDiffusionWrapper"] diff --git a/runner/app/python/infer/StreamDiffusionWrapper/wrapper.py b/runner/app/python/infer/StreamDiffusionWrapper/wrapper.py new file mode 100644 index 00000000..558848ee --- /dev/null +++ b/runner/app/python/infer/StreamDiffusionWrapper/wrapper.py @@ -0,0 +1,665 @@ +# Copied from StreamDiffusion/utils/wrapper.py + +import gc +import os +from pathlib import Path +import traceback +from typing import List, Literal, Optional, Union, Dict + +import numpy as np +import torch +from diffusers import AutoencoderTiny, StableDiffusionPipeline +from PIL import Image + +from streamdiffusion import StreamDiffusion +from streamdiffusion.image_utils import postprocess_image + + +torch.set_grad_enabled(False) +torch.backends.cuda.matmul.allow_tf32 = True +torch.backends.cudnn.allow_tf32 = True + + +class StreamDiffusionWrapper: + def __init__( + self, + model_id_or_path: str, + t_index_list: List[int], + lora_dict: Optional[Dict[str, float]] = None, + mode: Literal["img2img", "txt2img"] = "img2img", + output_type: Literal["pil", "pt", "np", "latent"] = "pil", + lcm_lora_id: Optional[str] = None, + vae_id: Optional[str] = None, + device: Literal["cpu", "cuda"] = "cuda", + dtype: torch.dtype = torch.float16, + frame_buffer_size: int = 1, + width: int = 512, + height: int = 512, + warmup: int = 10, + acceleration: Literal["none", "xformers", "tensorrt"] = "tensorrt", + do_add_noise: bool = True, + device_ids: Optional[List[int]] = None, + use_lcm_lora: bool = True, + use_tiny_vae: bool = True, + enable_similar_image_filter: bool = False, + similar_image_filter_threshold: float = 0.98, + similar_image_filter_max_skip_frame: int = 10, + use_denoising_batch: bool = True, + cfg_type: Literal["none", "full", "self", "initialize"] = "self", + seed: int = 2, + use_safety_checker: bool = False, + engine_dir: Optional[Union[str, Path]] = "engines", + ): + """ + Initializes the StreamDiffusionWrapper. + + Parameters + ---------- + model_id_or_path : str + The model id or path to load. + t_index_list : List[int] + The t_index_list to use for inference. + lora_dict : Optional[Dict[str, float]], optional + The lora_dict to load, by default None. + Keys are the LoRA names and values are the LoRA scales. + Example: {'LoRA_1' : 0.5 , 'LoRA_2' : 0.7 ,...} + mode : Literal["img2img", "txt2img"], optional + txt2img or img2img, by default "img2img". + output_type : Literal["pil", "pt", "np", "latent"], optional + The output type of image, by default "pil". + lcm_lora_id : Optional[str], optional + The lcm_lora_id to load, by default None. + If None, the default LCM-LoRA + ("latent-consistency/lcm-lora-sdv1-5") will be used. + vae_id : Optional[str], optional + The vae_id to load, by default None. + If None, the default TinyVAE + ("madebyollin/taesd") will be used. + device : Literal["cpu", "cuda"], optional + The device to use for inference, by default "cuda". + dtype : torch.dtype, optional + The dtype for inference, by default torch.float16. + frame_buffer_size : int, optional + The frame buffer size for denoising batch, by default 1. 
+ width : int, optional + The width of the image, by default 512. + height : int, optional + The height of the image, by default 512. + warmup : int, optional + The number of warmup steps to perform, by default 10. + acceleration : Literal["none", "xformers", "tensorrt"], optional + The acceleration method, by default "tensorrt". + do_add_noise : bool, optional + Whether to add noise for following denoising steps or not, + by default True. + device_ids : Optional[List[int]], optional + The device ids to use for DataParallel, by default None. + use_lcm_lora : bool, optional + Whether to use LCM-LoRA or not, by default True. + use_tiny_vae : bool, optional + Whether to use TinyVAE or not, by default True. + enable_similar_image_filter : bool, optional + Whether to enable similar image filter or not, + by default False. + similar_image_filter_threshold : float, optional + The threshold for similar image filter, by default 0.98. + similar_image_filter_max_skip_frame : int, optional + The max skip frame for similar image filter, by default 10. + use_denoising_batch : bool, optional + Whether to use denoising batch or not, by default True. + cfg_type : Literal["none", "full", "self", "initialize"], + optional + The cfg_type for img2img mode, by default "self". + You cannot use anything other than "none" for txt2img mode. + seed : int, optional + The seed, by default 2. + use_safety_checker : bool, optional + Whether to use safety checker or not, by default False. + """ + self.sd_turbo = "turbo" in model_id_or_path + + if mode == "txt2img": + if cfg_type != "none": + raise ValueError( + f"txt2img mode accepts only cfg_type = 'none', but got {cfg_type}" + ) + if use_denoising_batch and frame_buffer_size > 1: + if not self.sd_turbo: + raise ValueError( + "txt2img mode cannot use denoising batch with frame_buffer_size > 1." + ) + + if mode == "img2img": + if not use_denoising_batch: + raise NotImplementedError( + "img2img mode must use denoising batch for now." + ) + + self.device = device + self.dtype = dtype + self.width = width + self.height = height + self.mode = mode + self.output_type = output_type + self.frame_buffer_size = frame_buffer_size + self.batch_size = ( + len(t_index_list) * frame_buffer_size + if use_denoising_batch + else frame_buffer_size + ) + + self.use_denoising_batch = use_denoising_batch + self.use_safety_checker = use_safety_checker + + self.stream: StreamDiffusion = self._load_model( + model_id_or_path=model_id_or_path, + lora_dict=lora_dict, + lcm_lora_id=lcm_lora_id, + vae_id=vae_id, + t_index_list=t_index_list, + acceleration=acceleration, + warmup=warmup, + do_add_noise=do_add_noise, + use_lcm_lora=use_lcm_lora, + use_tiny_vae=use_tiny_vae, + cfg_type=cfg_type, + seed=seed, + engine_dir=engine_dir, + ) + + if device_ids is not None: + self.stream.unet = torch.nn.DataParallel( + self.stream.unet, device_ids=device_ids + ) + + if enable_similar_image_filter: + self.stream.enable_similar_image_filter(similar_image_filter_threshold, similar_image_filter_max_skip_frame) + + def prepare( + self, + prompt: str, + negative_prompt: str = "", + num_inference_steps: int = 50, + guidance_scale: float = 1.2, + delta: float = 1.0, + ) -> None: + """ + Prepares the model for inference. + + Parameters + ---------- + prompt : str + The prompt to generate images from. + num_inference_steps : int, optional + The number of inference steps to perform, by default 50. + guidance_scale : float, optional + The guidance scale to use, by default 1.2. 
+ delta : float, optional + The delta multiplier of virtual residual noise, + by default 1.0. + """ + self.stream.prepare( + prompt, + negative_prompt, + num_inference_steps=num_inference_steps, + guidance_scale=guidance_scale, + delta=delta, + ) + + def __call__( + self, + image: Optional[Union[str, Image.Image, torch.Tensor]] = None, + prompt: Optional[str] = None, + ) -> Union[Image.Image, List[Image.Image]]: + """ + Performs img2img or txt2img based on the mode. + + Parameters + ---------- + image : Optional[Union[str, Image.Image, torch.Tensor]] + The image to generate from. + prompt : Optional[str] + The prompt to generate images from. + + Returns + ------- + Union[Image.Image, List[Image.Image]] + The generated image. + """ + if self.mode == "img2img": + return self.img2img(image, prompt) + else: + return self.txt2img(prompt) + + def txt2img( + self, prompt: Optional[str] = None + ) -> Union[Image.Image, List[Image.Image], torch.Tensor, np.ndarray]: + """ + Performs txt2img. + + Parameters + ---------- + prompt : Optional[str] + The prompt to generate images from. + + Returns + ------- + Union[Image.Image, List[Image.Image]] + The generated image. + """ + if prompt is not None: + self.stream.update_prompt(prompt) + + if self.sd_turbo: + image_tensor = self.stream.txt2img_sd_turbo(self.batch_size) + else: + image_tensor = self.stream.txt2img(self.frame_buffer_size) + image = self.postprocess_image(image_tensor, output_type=self.output_type) + + if self.use_safety_checker: + safety_checker_input = self.feature_extractor( + image, return_tensors="pt" + ).to(self.device) + _, has_nsfw_concept = self.safety_checker( + images=image_tensor.to(self.dtype), + clip_input=safety_checker_input.pixel_values.to(self.dtype), + ) + image = self.nsfw_fallback_img if has_nsfw_concept[0] else image + + return image + + def img2img( + self, image: Union[str, Image.Image, torch.Tensor], prompt: Optional[str] = None + ) -> Union[Image.Image, List[Image.Image], torch.Tensor, np.ndarray]: + """ + Performs img2img. + + Parameters + ---------- + image : Union[str, Image.Image, torch.Tensor] + The image to generate from. + + Returns + ------- + Image.Image + The generated image. + """ + if prompt is not None: + self.stream.update_prompt(prompt) + + if isinstance(image, str) or isinstance(image, Image.Image): + image = self.preprocess_image(image) + + image_tensor = self.stream(image) + image = self.postprocess_image(image_tensor, output_type=self.output_type) + + if self.use_safety_checker: + safety_checker_input = self.feature_extractor( + image, return_tensors="pt" + ).to(self.device) + _, has_nsfw_concept = self.safety_checker( + images=image_tensor.to(self.dtype), + clip_input=safety_checker_input.pixel_values.to(self.dtype), + ) + image = self.nsfw_fallback_img if has_nsfw_concept[0] else image + + return image + + def preprocess_image(self, image: Union[str, Image.Image]) -> torch.Tensor: + """ + Preprocesses the image. + + Parameters + ---------- + image : Union[str, Image.Image, torch.Tensor] + The image to preprocess. + + Returns + ------- + torch.Tensor + The preprocessed image. 
+ """ + if isinstance(image, str): + image = Image.open(image).convert("RGB").resize((self.width, self.height)) + if isinstance(image, Image.Image): + image = image.convert("RGB").resize((self.width, self.height)) + + return self.stream.image_processor.preprocess( + image, self.height, self.width + ).to(device=self.device, dtype=self.dtype) + + def postprocess_image( + self, image_tensor: torch.Tensor, output_type: str = "pil" + ) -> Union[Image.Image, List[Image.Image], torch.Tensor, np.ndarray]: + """ + Postprocesses the image. + + Parameters + ---------- + image_tensor : torch.Tensor + The image tensor to postprocess. + + Returns + ------- + Union[Image.Image, List[Image.Image]] + The postprocessed image. + """ + if self.frame_buffer_size > 1: + return postprocess_image(image_tensor.cpu(), output_type=output_type) + else: + return postprocess_image(image_tensor.cpu(), output_type=output_type)[0] + + def _load_model( + self, + model_id_or_path: str, + t_index_list: List[int], + lora_dict: Optional[Dict[str, float]] = None, + lcm_lora_id: Optional[str] = None, + vae_id: Optional[str] = None, + acceleration: Literal["none", "xformers", "tensorrt"] = "tensorrt", + warmup: int = 10, + do_add_noise: bool = True, + use_lcm_lora: bool = True, + use_tiny_vae: bool = True, + cfg_type: Literal["none", "full", "self", "initialize"] = "self", + seed: int = 2, + engine_dir: Optional[Union[str, Path]] = "engines", + ) -> StreamDiffusion: + """ + Loads the model. + + This method does the following: + + 1. Loads the model from the model_id_or_path. + 2. Loads and fuses the LCM-LoRA model from the lcm_lora_id if needed. + 3. Loads the VAE model from the vae_id if needed. + 4. Enables acceleration if needed. + 5. Prepares the model for inference. + 6. Load the safety checker if needed. + + Parameters + ---------- + model_id_or_path : str + The model id or path to load. + t_index_list : List[int] + The t_index_list to use for inference. + lora_dict : Optional[Dict[str, float]], optional + The lora_dict to load, by default None. + Keys are the LoRA names and values are the LoRA scales. + Example: {'LoRA_1' : 0.5 , 'LoRA_2' : 0.7 ,...} + lcm_lora_id : Optional[str], optional + The lcm_lora_id to load, by default None. + vae_id : Optional[str], optional + The vae_id to load, by default None. + acceleration : Literal["none", "xfomers", "sfast", "tensorrt"], optional + The acceleration method, by default "tensorrt". + warmup : int, optional + The number of warmup steps to perform, by default 10. + do_add_noise : bool, optional + Whether to add noise for following denoising steps or not, + by default True. + use_lcm_lora : bool, optional + Whether to use LCM-LoRA or not, by default True. + use_tiny_vae : bool, optional + Whether to use TinyVAE or not, by default True. + cfg_type : Literal["none", "full", "self", "initialize"], + optional + The cfg_type for img2img mode, by default "self". + You cannot use anything other than "none" for txt2img mode. + seed : int, optional + The seed, by default 2. + + Returns + ------- + StreamDiffusion + The loaded model. + """ + + try: # Load from local directory + pipe: StableDiffusionPipeline = StableDiffusionPipeline.from_pretrained( + model_id_or_path, + ).to(device=self.device, dtype=self.dtype) + + except ValueError: # Load from huggingface + pipe: StableDiffusionPipeline = StableDiffusionPipeline.from_single_file( + model_id_or_path, + ).to(device=self.device, dtype=self.dtype) + except Exception: # No model found + traceback.print_exc() + print("Model load has failed. 
Doesn't exist.") + exit() + + stream = StreamDiffusion( + pipe=pipe, + t_index_list=t_index_list, + torch_dtype=self.dtype, + width=self.width, + height=self.height, + do_add_noise=do_add_noise, + frame_buffer_size=self.frame_buffer_size, + use_denoising_batch=self.use_denoising_batch, + cfg_type=cfg_type, + ) + if not self.sd_turbo: + if use_lcm_lora: + if lcm_lora_id is not None: + stream.load_lcm_lora( + pretrained_model_name_or_path_or_dict=lcm_lora_id + ) + else: + stream.load_lcm_lora() + stream.fuse_lora() + + if lora_dict is not None: + for lora_name, lora_scale in lora_dict.items(): + stream.load_lora(lora_name) + stream.fuse_lora(lora_scale=lora_scale) + print(f"Use LoRA: {lora_name} in weights {lora_scale}") + + if use_tiny_vae: + if vae_id is not None: + stream.vae = AutoencoderTiny.from_pretrained(vae_id).to( + device=pipe.device, dtype=pipe.dtype + ) + else: + stream.vae = AutoencoderTiny.from_pretrained("madebyollin/taesd").to( + device=pipe.device, dtype=pipe.dtype + ) + + try: + if acceleration == "xformers": + stream.pipe.enable_xformers_memory_efficient_attention() + if acceleration == "tensorrt": + from polygraphy import cuda + from streamdiffusion.acceleration.tensorrt import ( + TorchVAEEncoder, + compile_unet, + compile_vae_decoder, + compile_vae_encoder, + ) + from streamdiffusion.acceleration.tensorrt.engine import ( + AutoencoderKLEngine, + UNet2DConditionModelEngine, + ) + from streamdiffusion.acceleration.tensorrt.models import ( + VAE, + UNet, + VAEEncoder, + ) + + def create_prefix( + model_id_or_path: str, + max_batch_size: int, + min_batch_size: int, + ): + maybe_path = Path(model_id_or_path) + if maybe_path.exists(): + return f"{maybe_path.stem}--lcm_lora-{use_lcm_lora}--tiny_vae-{use_tiny_vae}--max_batch-{max_batch_size}--min_batch-{min_batch_size}--mode-{self.mode}" + else: + return f"{model_id_or_path}--lcm_lora-{use_lcm_lora}--tiny_vae-{use_tiny_vae}--max_batch-{max_batch_size}--min_batch-{min_batch_size}--mode-{self.mode}" + + engine_dir = Path(engine_dir) + unet_path = os.path.join( + engine_dir, + create_prefix( + model_id_or_path=model_id_or_path, + max_batch_size=stream.trt_unet_batch_size, + min_batch_size=stream.trt_unet_batch_size, + ), + "unet.engine", + ) + vae_encoder_path = os.path.join( + engine_dir, + create_prefix( + model_id_or_path=model_id_or_path, + max_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + min_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + ), + "vae_encoder.engine", + ) + vae_decoder_path = os.path.join( + engine_dir, + create_prefix( + model_id_or_path=model_id_or_path, + max_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + min_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + ), + "vae_decoder.engine", + ) + + if not os.path.exists(unet_path): + os.makedirs(os.path.dirname(unet_path), exist_ok=True) + unet_model = UNet( + fp16=True, + device=stream.device, + max_batch_size=stream.trt_unet_batch_size, + min_batch_size=stream.trt_unet_batch_size, + embedding_dim=stream.text_encoder.config.hidden_size, + unet_dim=stream.unet.config.in_channels, + ) + compile_unet( + stream.unet, + unet_model, + unet_path + ".onnx", + unet_path + ".opt.onnx", + unet_path, + opt_batch_size=stream.trt_unet_batch_size, + ) + + if not os.path.exists(vae_decoder_path): + os.makedirs(os.path.dirname(vae_decoder_path), exist_ok=True) + stream.vae.forward = stream.vae.decode + vae_decoder_model = 
VAE( + device=stream.device, + max_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + min_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + ) + compile_vae_decoder( + stream.vae, + vae_decoder_model, + vae_decoder_path + ".onnx", + vae_decoder_path + ".opt.onnx", + vae_decoder_path, + opt_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + ) + delattr(stream.vae, "forward") + + if not os.path.exists(vae_encoder_path): + os.makedirs(os.path.dirname(vae_encoder_path), exist_ok=True) + vae_encoder = TorchVAEEncoder(stream.vae).to(torch.device("cuda")) + vae_encoder_model = VAEEncoder( + device=stream.device, + max_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + min_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + ) + compile_vae_encoder( + vae_encoder, + vae_encoder_model, + vae_encoder_path + ".onnx", + vae_encoder_path + ".opt.onnx", + vae_encoder_path, + opt_batch_size=self.batch_size + if self.mode == "txt2img" + else stream.frame_bff_size, + ) + + cuda_stream = cuda.Stream() + + vae_config = stream.vae.config + vae_dtype = stream.vae.dtype + + stream.unet = UNet2DConditionModelEngine( + unet_path, cuda_stream, use_cuda_graph=False + ) + stream.vae = AutoencoderKLEngine( + vae_encoder_path, + vae_decoder_path, + cuda_stream, + stream.pipe.vae_scale_factor, + use_cuda_graph=False, + ) + setattr(stream.vae, "config", vae_config) + setattr(stream.vae, "dtype", vae_dtype) + + gc.collect() + torch.cuda.empty_cache() + + print("TensorRT acceleration enabled.") + if acceleration == "sfast": + from streamdiffusion.acceleration.sfast import ( + accelerate_with_stable_fast, + ) + + stream = accelerate_with_stable_fast(stream) + print("StableFast acceleration enabled.") + except Exception: + traceback.print_exc() + print("Acceleration has failed. 
Falling back to normal mode.") + + if seed < 0: # Random seed + seed = np.random.randint(0, 1000000) + + stream.prepare( + "", + "", + num_inference_steps=50, + guidance_scale=1.1 + if stream.cfg_type in ["full", "self", "initialize"] + else 1.0, + generator=torch.manual_seed(seed), + seed=seed, + ) + + if self.use_safety_checker: + from transformers import CLIPFeatureExtractor + from diffusers.pipelines.stable_diffusion.safety_checker import ( + StableDiffusionSafetyChecker, + ) + + self.safety_checker = StableDiffusionSafetyChecker.from_pretrained( + "CompVis/stable-diffusion-safety-checker" + ).to(pipe.device) + self.feature_extractor = CLIPFeatureExtractor.from_pretrained( + "openai/clip-vit-base-patch32" + ) + self.nsfw_fallback_img = Image.new("RGB", (512, 512), (0, 0, 0)) + + return stream diff --git a/runner/app/python/infer/infer.py b/runner/app/python/infer/infer.py new file mode 100644 index 00000000..b871c26f --- /dev/null +++ b/runner/app/python/infer/infer.py @@ -0,0 +1,296 @@ +import argparse +import asyncio +import logging +import os +import io +import signal +import time +import traceback +from typing import Callable, List + +import watchdog.events +import watchdog.observers +import zmq.asyncio +from PIL import Image +from aiohttp import web + +from transmorgrifiers import Transmorgrifier +import multiprocessing as mp +import queue +prompt_file = "./prompt.txt" +fps_log_interval = 10 + +def to_jpeg_bytes(frame: Image.Image): + buffer = io.BytesIO() + frame.save(buffer, format='JPEG') + bytes = buffer.getvalue() + buffer.close() + return bytes + +def from_jpeg_bytes(frame_bytes: bytes): + image = Image.open(io.BytesIO(frame_bytes)) + if image.mode != 'RGBA': + image = image.convert('RGBA') + return image + +class FileWatcher(watchdog.events.FileSystemEventHandler): + def __init__(self, filename: str, callback: Callable[[str], None]): + self.filename = filename + self.callback = callback + self.setup_observer() + + def setup_observer(self): + self.load_file() + observer = watchdog.observers.Observer() + observer.schedule(self, path=os.path.dirname(self.filename), recursive=False) + observer.start() + + def on_modified(self, event): + if event.src_path == self.filename: + self.load_file() + + def load_file(self): + try: + with open(self.filename, 'r') as f: + contents = f.read().strip() + if contents: + self.callback(contents) + except FileNotFoundError: + pass + +def load_transmorgrifier(name: str, **params) -> Transmorgrifier: + if name == "streamkohaku": + from transmorgrifiers.streamkohaku import StreamKohaku + return StreamKohaku(**params) + elif name == "liveportrait": + from transmorgrifiers.liveportrait import LivePortrait + return LivePortrait(**params) + raise ValueError(f"Unknown transmorgrifier: {name}") + +class TransmorgrifierProcess: + def __init__(self, transmorgrifier_name: str): + self.transmorgrifier_name = transmorgrifier_name + + self.ctx = mp.get_context('spawn') + self.input_queue = self.ctx.Queue(maxsize=5) + self.output_queue = self.ctx.Queue() + self.param_update_queue = self.ctx.Queue() + self.done = self.ctx.Event() + + self.process = self.ctx.Process(target=self.process_loop, args=()) + + def start(self): + self.process.start() + + def stop(self): + self.done.set() + self.process.join(timeout=5) + if self.process.is_alive(): + self.process.terminate() + + def send_input(self, frame: Image.Image): + while not self.is_done(): + try: + self.input_queue.put_nowait(frame) + break + except queue.Full: + try: + # remove oldest frame from queue to add new 
one + self.input_queue.get_nowait() + except queue.Empty: + continue + + async def recv_output(self): + # we cannot do a long get with timeout as that would block the asyncio + # event loop, so we loop with nowait and sleep async instead. + while not self.is_done(): + try: + return self.output_queue.get_nowait() + except queue.Empty: + await asyncio.sleep(0.005) + continue + return None + + def is_done(self): + return self.done.is_set() + + def process_loop(self): + logging.basicConfig(level=logging.INFO) + try: + try: + params = self.param_update_queue.get_nowait() + except queue.Empty: + params = {} + transmorgrifier = load_transmorgrifier(self.transmorgrifier_name, **params) + logging.info("Transmorgrifier loaded successfully") + + while not self.is_done(): + if not self.param_update_queue.empty(): + params = self.param_update_queue.get_nowait() + transmorgrifier.update_params(**params) + logging.info(f"Updated params: {params}") + + try: + input_image = self.input_queue.get(timeout=0.1) + except queue.Empty: + logging.debug(f"Input queue empty") + continue + + try: + output_image = transmorgrifier.process_frame(input_image) + self.output_queue.put(output_image) + except Exception as e: + logging.error(f"Error processing frame: {e}") + except Exception as e: + logging.error(f"Error in process run method: {e}") + +class SocketHandler: + def __init__(self, input_socket: zmq.asyncio.Socket, output_socket: zmq.asyncio.Socket, pipeline: str): + self.input_socket = input_socket + self.output_socket = output_socket + self.process = TransmorgrifierProcess(pipeline) + self.last_prompt = None + self.prompt_watcher = FileWatcher(prompt_file, self.set_prompt) + + def start(self): + self.input_task = asyncio.create_task(self.input_loop()) + self.process.start() + self.output_task = asyncio.create_task(self.output_loop()) + + async def stop(self): + if self.input_task: + self.input_task.cancel() + try: + await self.input_task + finally: + self.input_task = None + + self.process.stop() + + if self.output_task: + self.output_task.cancel() + try: + await self.output_task + finally: + self.output_task = None + + def set_prompt(self, prompt: str): + if prompt != self.last_prompt: + self.update_params({'prompt': prompt}) + logging.info(f"Prompt: {prompt}") + + def update_params(self, params: dict): + self.last_prompt = params.get('prompt', None) + self.process.param_update_queue.put(params) + + async def input_loop(self): + frame_count = 0 + start_time = time.time() + while not self.process.is_done(): + frame_bytes = await self.input_socket.recv() + frame = from_jpeg_bytes(frame_bytes) + + self.process.send_input(frame) + + # Increment frame count and measure FPS + frame_count += 1 + elapsed_time = time.time() - start_time + if elapsed_time >= fps_log_interval: + fps = frame_count / elapsed_time + logging.info(f"Input FPS: {fps:.2f}") + frame_count = 0 + start_time = time.time() + + async def output_loop(self): + frame_count = 0 + start_time = time.time() + while True: + output_image = await self.process.recv_output() + if not output_image: + break + logging.debug(f"Output image received out_width: {output_image.width}, out_height: {output_image.height}") + + await self.output_socket.send(to_jpeg_bytes(output_image)) + + # Increment frame count and measure FPS + frame_count += 1 + elapsed_time = time.time() - start_time + if elapsed_time >= fps_log_interval: + fps = frame_count / elapsed_time + logging.info(f"Output FPS: {fps:.2f}") + frame_count = 0 + start_time = time.time() + +async def 
handle_params_update(request): + try: + params = await request.json() + request.app['handler'].update_params(params) + return web.Response(text="Params updated successfully") + except Exception as e: + logging.error(f"Error updating params: {e}") + return web.Response(text=f"Error updating params: {str(e)}", status=400) + +async def start_http_server(handler: SocketHandler, port: int): + app = web.Application() + app['handler'] = handler + app.router.add_post('/params', handle_params_update) + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, '0.0.0.0', port) + await site.start() + logging.info(f"HTTP server started on port {port}") + return runner + +async def main(http_port: int, input_address: str, output_address: str, pipeline: str): + context = zmq.asyncio.Context() + + input_socket = context.socket(zmq.SUB) + input_socket.connect(input_address) + input_socket.setsockopt_string(zmq.SUBSCRIBE, '') # Subscribe to all messages + input_socket.set_hwm(10) + + output_socket = context.socket(zmq.PUB) + output_socket.connect(output_address) + output_socket.set_hwm(10) + + handler = SocketHandler(input_socket, output_socket, pipeline) + runner: web.AppRunner + try: + handler.start() + runner = await start_http_server(handler, http_port) + except Exception as e: + logging.error(f"Error starting socket handler or HTTP server: {e}") + logging.error(f"Stack trace:\n{traceback.format_exc()}") + raise e + + await block_until_signal([signal.SIGINT, signal.SIGTERM]) + try: + await runner.cleanup() + await handler.stop() + except Exception as e: + logging.error(f"Error stopping room handler: {e}") + logging.error(f"Stack trace:\n{traceback.format_exc()}") + raise e + +async def block_until_signal(sigs: List[signal.Signals]): + loop = asyncio.get_running_loop() + future: asyncio.Future[signal.Signals] = loop.create_future() + + def signal_handler(sig, _): + logging.info(f"Received signal: {sig}") + loop.call_soon_threadsafe(future.set_result, sig) + for sig in sigs: + signal.signal(sig, signal_handler) + return await future + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Infer process to run the AI pipeline") + parser.add_argument("--http-port", type=int, default=8888, help="Port for the HTTP server") + parser.add_argument("--input-address", type=str, default="tcp://localhost:5555", help="Address for the input socket") + parser.add_argument("--output-address", type=str, default="tcp://localhost:5556", help="Address for the output socket") + parser.add_argument("--pipeline", type=str, default="streamkohaku", help="Pipeline to use") + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO) + + asyncio.run(main(args.http_port, args.input_address, args.output_address, args.pipeline)) diff --git a/runner/app/python/infer/transmorgrifiers/__init__.py b/runner/app/python/infer/transmorgrifiers/__init__.py new file mode 100644 index 00000000..995f16c6 --- /dev/null +++ b/runner/app/python/infer/transmorgrifiers/__init__.py @@ -0,0 +1,3 @@ +from .interface import Transmorgrifier + +__all__ = ["Transmorgrifier"] diff --git a/runner/app/python/infer/transmorgrifiers/interface.py b/runner/app/python/infer/transmorgrifiers/interface.py new file mode 100644 index 00000000..ee2de943 --- /dev/null +++ b/runner/app/python/infer/transmorgrifiers/interface.py @@ -0,0 +1,14 @@ +from abc import ABC, abstractmethod +from PIL import Image + +class Transmorgrifier(ABC): + def __init__(self, **params): + pass + + @abstractmethod + def process_frame(self, 
frame: Image.Image) -> Image.Image: + pass + + @abstractmethod + def update_params(self, **params): + pass diff --git a/runner/app/python/infer/transmorgrifiers/liveportrait.py b/runner/app/python/infer/transmorgrifiers/liveportrait.py new file mode 100644 index 00000000..05596a4a --- /dev/null +++ b/runner/app/python/infer/transmorgrifiers/liveportrait.py @@ -0,0 +1,69 @@ +from typing import Literal, Optional, List, Dict +from PIL import Image +from pydantic import BaseModel, Field + +from omegaconf import OmegaConf +import cv2 +import numpy as np + + +import sys +import os + +# FasterLivePotrait modules imports files from the root of the project, so we need to monkey patch the sys path +base_flip_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "FasterLivePortrait")) +sys.path.append(base_flip_dir) +from FasterLivePortrait.src.pipelines.faster_live_portrait_pipeline import FasterLivePortraitPipeline + +from .interface import Transmorgrifier + + +def make_flip_path(rel_path): + return os.path.normpath(os.path.join(base_flip_dir, rel_path)) + +class LivePortraitParams(BaseModel): + src_image: str = 'assets/examples/source/s12.jpg' + animal: bool = False + cfg: str = 'configs/trt_infer.yaml' + +class LivePortrait(Transmorgrifier): + def __init__(self, **params): + super().__init__(**params) + self.update_params(**params) + + def process_frame(self, image: Image.Image) -> Image.Image: + cv2_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) + _, out_crop, _ = self.pipe.run(cv2_image, self.pipe.src_imgs[0], self.pipe.src_infos[0], first_frame=self.first_frame) + self.first_frame = False + + if out_crop is None: + print(f"No face in driving frame") + return image + + return Image.fromarray(out_crop) + + def update_params(self, **params): + print(f"params: {params}") + self.params = LivePortraitParams(**params) + self.params.cfg = make_flip_path(self.params.cfg) + self.params.src_image = make_flip_path(self.params.src_image) + + print(f"params.cfg: {self.params}") + + self.infer_cfg = OmegaConf.load(self.params.cfg) + self.infer_cfg.infer_params.mask_crop_path = make_flip_path(self.infer_cfg.infer_params.mask_crop_path) + for model_name in self.infer_cfg.models: + model_params = self.infer_cfg.models[model_name] + if isinstance(model_params.model_path, str): + model_params.model_path = make_flip_path(model_params.model_path) + else: + model_params.model_path = [make_flip_path(path) for path in model_params.model_path] + + self.pipe: Optional[FasterLivePortraitPipeline] = FasterLivePortraitPipeline(cfg=self.infer_cfg, is_animal=self.params.animal) + self.first_frame = True + + prepared_src = self.pipe.prepare_source(self.params.src_image) + if not prepared_src: + raise ValueError(f"no face in {self.params.src_image}!") + else: + self.prepared_src = prepared_src diff --git a/runner/app/python/infer/transmorgrifiers/streamkohaku.py b/runner/app/python/infer/transmorgrifiers/streamkohaku.py new file mode 100644 index 00000000..39fe0c35 --- /dev/null +++ b/runner/app/python/infer/transmorgrifiers/streamkohaku.py @@ -0,0 +1,83 @@ +from typing import Literal, Optional, List, Dict +from PIL import Image +from pydantic import BaseModel, Field + +from StreamDiffusionWrapper import StreamDiffusionWrapper + +from .interface import Transmorgrifier + +class StreamKohakuParams(BaseModel): + prompt: str = "anime drawing style" + model_id: str = "KBlueLeaf/kohaku-v2.1" + lora_dict: Optional[Dict[str, float]] = None + use_lcm_lora: bool = True + num_inference_steps: int = 50 + 
t_index_list: Optional[List[int]] = None + t_index_ratio_list: Optional[List[float]] = [0.75, 0.9, 0.975] + scale: float = 1.0 + acceleration: Literal["none", "xformers", "tensorrt"] = "tensorrt" + use_denoising_batch: bool = True + enable_similar_image_filter: bool = True + seed: int = 2 + guidance_scale: float = 1.2 + + def __init__(self, **data): + super().__init__(**data) + if self.t_index_ratio_list is not None: + self.t_index_list = [int(i * self.num_inference_steps) for i in self.t_index_ratio_list] + +class StreamKohaku(Transmorgrifier): + def __init__(self, **params): + super().__init__(**params) + self.params = StreamKohakuParams(**params) + self.pipe: Optional[StreamDiffusionWrapper] = None + self.last_prompt = self.params.prompt + + def process_frame(self, image: Image.Image) -> Image.Image: + first = self.pipe is None + if self.pipe is None: + self.pipe = StreamDiffusionWrapper( + model_id_or_path=self.params.model_id, + lora_dict=self.params.lora_dict, + use_lcm_lora=self.params.use_lcm_lora, + t_index_list=self.params.t_index_list, + frame_buffer_size=1, + width=image.width, + height=image.height, + warmup=10, + acceleration=self.params.acceleration, + do_add_noise=False, + mode="img2img", + # output_type="pt", + enable_similar_image_filter=self.params.enable_similar_image_filter, + similar_image_filter_threshold=0.98, + use_denoising_batch=self.params.use_denoising_batch, + seed=self.params.seed, + ) + self.pipe.prepare( + prompt=self.params.prompt, + num_inference_steps=self.params.num_inference_steps, + guidance_scale=self.params.guidance_scale, + ) + + if self.last_prompt != self.params.prompt: + self.pipe.stream.update_prompt(self.params.prompt) + self.last_prompt = self.params.prompt + + img_tensor = self.pipe.preprocess_image(image) + img_tensor = self.pipe.stream.image_processor.denormalize(img_tensor) + + if first: + for _ in range(self.pipe.batch_size): + self.pipe(image=img_tensor) + + return self.pipe(image=img_tensor) + + def update_params(self, **params): + new_params = StreamKohakuParams(**params) + # reset the pipe if anything changed other than the prompt + only_prompt = self.params.model_copy(update={'prompt': new_params.prompt}) + if new_params != only_prompt: + self.pipe = None + print(f"Reset diffuser for params change") + self.params = new_params diff --git a/runner/app/python/ingress/main.py b/runner/app/python/ingress/main.py new file mode 100644 index 00000000..e874002f --- /dev/null +++ b/runner/app/python/ingress/main.py @@ -0,0 +1,130 @@ +import sys +import gi +import zmq +import time + +gi.require_version('Gst', '1.0') +from gi.repository import Gst, GLib + +class VideoIngress: + def __init__(self): + Gst.init(None) + + self.pipeline = Gst.Pipeline.new("video-ingress") + + # Create elements + self.src = Gst.ElementFactory.make("filesrc", "file-source") + self.decodebin = Gst.ElementFactory.make("decodebin", "decode-bin") + self.queue = Gst.ElementFactory.make("queue", "queue") + self.videoconvert = Gst.ElementFactory.make("videoconvert", "video-convert") + self.videoscale = Gst.ElementFactory.make("videoscale", "video-scale") + self.capsfilter = Gst.ElementFactory.make("capsfilter", "caps-filter") + self.jpegenc = Gst.ElementFactory.make("jpegenc", "jpeg-encoder") + self.appsink = Gst.ElementFactory.make("appsink", "app-sink") + + # Set properties + self.src.set_property("location", "10s.mp4") # Replace with your MP4 file path + self.queue.set_property("max-size-buffers", 1) + self.queue.set_property("max-size-time", 0) + 
self.queue.set_property("max-size-bytes", 0) + self.capsfilter.set_property("caps", Gst.Caps.from_string("video/x-raw,width=512,height=512")) + self.appsink.set_property("max-buffers", 1) + self.appsink.set_property("drop", True) + self.appsink.set_property("sync", False) + + # Add elements to pipeline + self.pipeline.add(self.src) + self.pipeline.add(self.decodebin) + self.pipeline.add(self.queue) + self.pipeline.add(self.videoconvert) + self.pipeline.add(self.videoscale) + self.pipeline.add(self.capsfilter) + self.pipeline.add(self.jpegenc) + self.pipeline.add(self.appsink) + + # Link elements + self.src.link(self.decodebin) + self.queue.link(self.videoconvert) + self.videoconvert.link(self.videoscale) + self.videoscale.link(self.capsfilter) + self.capsfilter.link(self.jpegenc) + self.jpegenc.link(self.appsink) + + # Connect pad-added signal for decodebin + self.decodebin.connect("pad-added", self.on_pad_added) + + # Set up ZMQ PUB socket + self.context = zmq.Context() + self.publisher = self.context.socket(zmq.PUB) + self.publisher.set_hwm(1) + self.publisher.bind("tcp://*:5555") + + # Set up appsink callbacks + self.appsink.set_property('emit-signals', True) + self.appsink.connect("new-sample", self.on_new_sample) + + # Variables for frame rate calculation + self.frame_count = 0 + self.start_time = time.time() + + def on_pad_added(self, element, pad): + sink_pad = self.queue.get_static_pad("sink") + pad.link(sink_pad) + + def on_new_sample(self, sink): + sample = sink.emit("pull-sample") + if sample: + buffer = sample.get_buffer() + data = buffer.extract_dup(0, buffer.get_size()) + + # Send frame via ZMQ + try: + self.publisher.send(data, zmq.NOBLOCK) + except zmq.error.Again: + print("Failed to send frame: High water mark reached") + + # Update frame count and calculate FPS + self.frame_count += 1 + elapsed = time.time() - self.start_time + if elapsed >= 1.0: + fps = self.frame_count / elapsed + print(f"Producer FPS: {fps:.2f}") + self.frame_count = 0 + self.start_time = time.time() + + return Gst.FlowReturn.OK + + def run(self): + # Start playing + self.pipeline.set_state(Gst.State.PLAYING) + + # Run the main loop + loop = GLib.MainLoop() + + # Add a message handler to the pipeline bus + bus = self.pipeline.get_bus() + bus.add_signal_watch() + bus.connect("message", self.on_message, loop) + + try: + loop.run() + except KeyboardInterrupt: + pass + finally: + self.pipeline.set_state(Gst.State.NULL) + + def on_message(self, bus, message, loop): + t = message.type + if t == Gst.MessageType.EOS: + print("End-of-stream") + loop.quit() + elif t == Gst.MessageType.ERROR: + err, debug = message.parse_error() + print(f"Error: {err.message}") + if debug: + print(f"Debug info: {debug}") + loop.quit() + +if __name__ == "__main__": + ingress = VideoIngress() + ingress.run() \ No newline at end of file diff --git a/runner/app/runit/infer/log/run b/runner/app/runit/infer/log/run new file mode 100755 index 00000000..7087e482 --- /dev/null +++ b/runner/app/runit/infer/log/run @@ -0,0 +1,3 @@ +#!/bin/sh + +exec svlogd -tt /var/log/infer diff --git a/runner/app/runit/infer/run b/runner/app/runit/infer/run new file mode 100755 index 00000000..5c985a03 --- /dev/null +++ b/runner/app/runit/infer/run @@ -0,0 +1,13 @@ +#!/bin/sh + +#cd /app/streamdiffusion/examples/img2img/ +echo "Starting single.py" +#exec python single.py 2>&1 +echo "Done single.py" +#sv stop /etc/service/infer + +# just a sample bash loop to verify runit launches this script: +#while true; do +# echo "infer service is running" +# sleep 5 
+#done diff --git a/runner/app/runit/uvicorn/log/run b/runner/app/runit/uvicorn/log/run new file mode 100755 index 00000000..84d4d591 --- /dev/null +++ b/runner/app/runit/uvicorn/log/run @@ -0,0 +1,3 @@ +#!/bin/sh + +exec svlogd -tt /var/log/uvicorn diff --git a/runner/app/runit/uvicorn/run b/runner/app/runit/uvicorn/run new file mode 100755 index 00000000..ed0ec412 --- /dev/null +++ b/runner/app/runit/uvicorn/run @@ -0,0 +1,4 @@ +#!/bin/sh + +cd /app +exec uvicorn app.main:app --log-config app/cfg/uvicorn_logging_config.json --host 0.0.0.0 --port 8000 diff --git a/runner/docker/Dockerfile.apps b/runner/docker/Dockerfile.apps new file mode 100644 index 00000000..3d1592ef --- /dev/null +++ b/runner/docker/Dockerfile.apps @@ -0,0 +1,99 @@ +ARG BASE_IMAGE=livepeer/ai-runner:stream-diffusion +FROM ${BASE_IMAGE} + +# Install latest stable Go version and system dependencies +RUN apt-get update && apt-get install -y \ + wget \ + python3-dev \ + python3-gi \ + libcairo2-dev \ + libgirepository1.0-dev \ + pkg-config \ + && \ + wget https://go.dev/dl/go1.21.5.linux-amd64.tar.gz && \ + tar -C /usr/local -xzf go1.21.5.linux-amd64.tar.gz && \ + rm go1.21.5.linux-amd64.tar.gz + +ENV PATH=$PATH:/usr/local/go/bin + +# Set a default value for APP_DIRS +ARG APP_DIRS="ingress" +# Set environment variable for the app directories +# ENV APP_DIRS=${APP_DIRS} + +# Add a build argument for installing Go apps +ARG INSTALL_GO_APPS=false + +# Create the app directory and build Go apps if INSTALL_GO_APPS is true +RUN if [ "$INSTALL_GO_APPS" = "true" ]; then \ + mkdir -p /app/go && \ + cd /app/go && \ + for app in ${APP_DIRS}; do \ + if [ -d "$app" ]; then \ + cd $app && \ + if [ ! -f go.mod ]; then \ + go mod init github.com/livepeer/ai-worker/runner/app/go/$app; \ + fi && \ + go mod tidy && \ + if [ -f main.go ]; then \ + CGO_ENABLED=1 go build -o main; \ + elif [ -f Makefile ]; then \ + make; \ + else \ + echo "No recognized build process for ${app}"; \ + exit 1; \ + fi && \ + cd ..; \ + else \ + echo "Directory not found for ${app}"; \ + exit 1; \ + fi; \ + done && \ + for app in ${APP_DIRS}; do \ + if [ -f ${app}/main ]; then \ + mv ${app}/main /usr/local/bin/${app}; \ + elif [ -f ${app}/${app} ]; then \ + mv ${app}/${app} /usr/local/bin/${app}; \ + else \ + echo "No compiled binary found for ${app}"; \ + exit 1; \ + fi; \ + done; \ +fi + +# Copy Go app source files if INSTALL_GO_APPS is true +COPY --chown=root:root app/go/. 
/app/go/ +# Return to the app directory +WORKDIR /app +# Install any additional Python packages +COPY requirements.txt /app/ +RUN pip install --upgrade pip && \ + pip install --no-cache-dir -r /app/requirements.txt + +# Install StreamDiffusion +RUN pip install streamdiffusion[tensorrt] + +# Move StreamDiffusion to /app/streamdiffusion instead of copying +RUN mv /streamdiffusion /app/streamdiffusion +# Set up PYTHONPATH +ENV PYTHONPATH="${PYTHONPATH}:/app:/app/streamdiffusion" + +# Copy application files +COPY app/ /app/ +COPY images/ /app/images +COPY bench.py /app/bench.py + +# Set up runit service directories and copy run scripts +COPY app/runit /etc/service +# Create log directories for each service +RUN mkdir -p /var/log/uvicorn /var/log/infer +# Ensure all run scripts are executable +RUN chmod +x /etc/service/*/run /etc/service/*/log/run + +# Set working directory to /app +WORKDIR /app +# Clean up +RUN apt-get clean && rm -rf /var/lib/apt/lists/* + +# Set the init system to runit +CMD ["runsvdir", "/etc/service"] diff --git a/runner/docker/Dockerfile.multimedia b/runner/docker/Dockerfile.multimedia new file mode 100644 index 00000000..ceec30b5 --- /dev/null +++ b/runner/docker/Dockerfile.multimedia @@ -0,0 +1,84 @@ +# Stage 1: Build FFmpeg with the necessary libraries +ARG BASE_IMAGE=livepeer/ai-runner:base +FROM ${BASE_IMAGE} + +# Install necessary packages +RUN apt-get update && apt-get install -y --no-install-recommends \ + autoconf automake build-essential cmake git-core libtool pkg-config wget \ + nasm yasm zlib1g-dev libpng-dev && \ + rm -rf /var/lib/apt/lists/* + +# Install NVIDIA headers for NVENC/NVDEC +RUN apt-get update && apt-get install -y --no-install-recommends \ + nvidia-cuda-dev nvidia-cuda-toolkit nvidia-cuda-toolkit-gcc && \ + rm -rf /var/lib/apt/lists/* + +# Set up environment variables +ENV NV_CODEC_HEADERS=/usr/local/cuda/include/ffnvcodec/ +RUN git clone -b n12.2.72.0 --depth 1 https://git.videolan.org/git/ffmpeg/nv-codec-headers.git && \ + cd nv-codec-headers && \ + make && \ + make install && \ + cd .. 
&& rm -rf nv-codec-headers + +# Clone the FFmpeg repository and checkout the latest release +RUN git clone --branch n7.0.2 --depth 1 https://github.com/FFmpeg/FFmpeg.git ffmpeg + +# Build FFmpeg with static linking and the desired hardware-accelerated features +RUN cd ffmpeg && \ + ./configure --prefix=/compiled \ + --pkg-config-flags="--static" \ + --extra-cflags="-I/ffmpeg_build/include -I/usr/local/cuda/include" \ + --extra-ldflags="-L/ffmpeg_build/lib -L/usr/local/cuda/lib64" \ + --extra-libs="-lpthread -lm" \ + --enable-nonfree \ + --enable-cuda-nvcc \ + --enable-cuda \ + --enable-libnpp \ + --enable-cuvid \ + --enable-nvenc \ + --enable-nvdec \ + --enable-static \ + --disable-shared \ + --disable-debug \ + --disable-doc && \ + make -j$(nproc) && \ + make install && \ + make distclean + +# Copy the compiled FFmpeg binaries to /usr/local +RUN cp -R /compiled/* /usr/local/ + +# Install necessary dependencies +RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ + libgstreamer1.0-dev \ + libgstreamer-plugins-base1.0-dev \ + libgstreamer-plugins-bad1.0-dev \ + gstreamer1.0-plugins-base \ + gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-ugly \ + gstreamer1.0-libav \ + gstreamer1.0-tools \ + gstreamer1.0-x \ + gstreamer1.0-alsa \ + gstreamer1.0-gl \ + gstreamer1.0-gtk3 \ + gstreamer1.0-qt5 \ + gstreamer1.0-pulseaudio \ + libgstrtspserver-1.0-dev \ + libgirepository1.0-dev \ + python3-gst-1.0 + +# Install NVIDIA codec libraries for hardware acceleration +# Note: The exact package names might vary depending on your NVIDIA driver version +RUN apt-get install -y --no-install-recommends \ + libnvidia-decode-535-server \ + libnvidia-encode-535-server + +# Clean up APT cache to reduce image size +RUN apt-get clean && rm -rf /var/lib/apt/lists/* + +# Set environment variables for GStreamer +ENV GST_PLUGIN_PATH=/usr/lib/x86_64-linux-gnu/gstreamer-1.0 +ENV GST_PLUGIN_SCANNER=/usr/lib/x86_64-linux-gnu/gstreamer1.0/gstreamer-1.0/gst-plugin-scanner diff --git a/runner/docker/Dockerfile.rtinference b/runner/docker/Dockerfile.rtinference new file mode 100644 index 00000000..dbafe821 --- /dev/null +++ b/runner/docker/Dockerfile.rtinference @@ -0,0 +1,42 @@ +ARG BASE_IMAGE=livepeer/ai-runner:base +FROM ${BASE_IMAGE} + +ENV PIP_PREFER_BINARY=1 + +# Install NVIDIA Container Toolkit +RUN apt-get update && apt-get install -y --no-install-recommends \ + nvidia-container-toolkit \ + && rm -rf /var/lib/apt/lists/* + +# Install StreamDiffusion dependencies +RUN pip install --no-cache-dir \ + torch==2.1.0 \ + torchvision==0.16.0 \ + xformers \ + --index-url https://download.pytorch.org/whl/cu118 + +# Clone StreamDiffusion repository +RUN git clone https://github.com/cumulo-autumn/StreamDiffusion.git /streamdiffusion +WORKDIR /streamdiffusion + +# Install StreamDiffusion +RUN python setup.py develop easy_install streamdiffusion[tensorrt] + +# Install TensorRT extension +RUN python -m streamdiffusion.tools.install-tensorrt + +# Set working directory to /app +WORKDIR /app + +# Copy entire StreamDiffusion directory to /app/streamdiffusion +RUN cp -r /streamdiffusion /app/streamdiffusion + +# Add /app and /app/streamdiffusion to PYTHONPATH +ENV PYTHONPATH="${PYTHONPATH}:/app:/app/streamdiffusion" + +# Set environment variables for NVIDIA drivers +ENV NVIDIA_VISIBLE_DEVICES all +ENV NVIDIA_DRIVER_CAPABILITIES compute,utility + +# Overwrite the CMD to start a bash shell +CMD ["/bin/bash"] diff --git a/runner/docker/Dockerfile.stream-diffusion 
b/runner/docker/Dockerfile.stream-diffusion new file mode 100644 index 00000000..d0aaa79c --- /dev/null +++ b/runner/docker/Dockerfile.stream-diffusion @@ -0,0 +1,30 @@ +ARG BASE_IMAGE=livepeer/ai-runner:multimedia +FROM ${BASE_IMAGE} + +ENV PIP_PREFER_BINARY=1 + +# Install NVIDIA Container Toolkit +RUN apt-get update && apt-get install -y --no-install-recommends \ + nvidia-container-toolkit \ + && rm -rf /var/lib/apt/lists/* + +# Install StreamDiffusion dependencies +RUN pip install --no-cache-dir \ + torch==2.1.0 \ + torchvision==0.16.0 \ + xformers \ + --index-url https://download.pytorch.org/whl/cu121 + +# Clone StreamDiffusion repository +RUN git clone https://github.com/cumulo-autumn/StreamDiffusion.git /streamdiffusion +WORKDIR /streamdiffusion + +# Install StreamDiffusion +RUN python setup.py develop easy_install streamdiffusion[tensorrt] + +# Install TensorRT extension +RUN python -m streamdiffusion.tools.install-tensorrt + +# Set environment variables for NVIDIA drivers +ENV NVIDIA_VISIBLE_DEVICES all +ENV NVIDIA_DRIVER_CAPABILITIES compute,utility,video diff --git a/runner/requirements.txt b/runner/requirements.txt index 87f72e43..bc1922d1 100644 --- a/runner/requirements.txt +++ b/runner/requirements.txt @@ -19,3 +19,9 @@ sentencepiece== 0.2.0 protobuf==5.27.2 bitsandbytes==0.43.3 psutil==6.0.0 +pyzmq==26.2.0 +opencv-python==4.10.0.84 +PyGObject==3.50.0 +onnxruntime==1.19.2 +watchdog==5.0.2 +aiohttp==3.10.9
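
The Transmorgrifier interface introduced above (process_frame / update_params, dispatched by name in load_transmorgrifier) is the extension point for adding new pipelines. Below is a minimal sketch of a custom implementation; it is not part of the patch, and the "grayscale" name, file path, and registration branch are hypothetical illustrations of the same pattern used by streamkohaku and liveportrait.

# transmorgrifiers/grayscale.py -- hypothetical example, not part of this patch
from PIL import Image, ImageOps

from .interface import Transmorgrifier


class Grayscale(Transmorgrifier):
    """Minimal pass-through pipeline: converts each frame to grayscale."""

    def __init__(self, **params):
        super().__init__(**params)
        self.update_params(**params)

    def process_frame(self, frame: Image.Image) -> Image.Image:
        # infer.py hands the pipeline RGBA frames; return RGB so that
        # to_jpeg_bytes() can encode the result as JPEG.
        return ImageOps.grayscale(frame.convert("RGB")).convert("RGB")

    def update_params(self, **params):
        # No tunable parameters here; a real pipeline would validate them
        # (see StreamKohakuParams for the pydantic-based approach).
        self.params = params

To make the name resolvable, load_transmorgrifier() in infer.py would also need an extra branch that imports Grayscale and returns Grayscale(**params) when name == "grayscale".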
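
For end-to-end testing of the ZMQ/HTTP plumbing in infer.py, a downstream client can bind the SUB side of the output socket (infer.py connects its PUB socket to tcp://localhost:5556 by default) and push prompt updates through the /params endpoint on port 8888. The sketch below assumes those default addresses and is not part of the patch; consumer.py and the output file names are hypothetical.

# consumer.py -- hypothetical sketch of a downstream client for infer.py
import io
import json
import urllib.request

import zmq
from PIL import Image

# Update the prompt at runtime via the params HTTP endpoint.
req = urllib.request.Request(
    "http://localhost:8888/params",
    data=json.dumps({"prompt": "watercolor painting style"}).encode(),
    headers={"Content-Type": "application/json"},
)
print(urllib.request.urlopen(req).read().decode())

# Receive processed JPEG frames published by the infer process. infer.py
# connect()s its output PUB socket, so the consumer bind()s the matching SUB.
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.bind("tcp://*:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to all messages

for i in range(100):
    frame_bytes = subscriber.recv()
    Image.open(io.BytesIO(frame_bytes)).save(f"frame_{i:04d}.jpg")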