Skip to content

Commit

Permalink
feat: added gst-interpipe, decoupled Gstreamer video pipelines, suppo…
Browse files Browse the repository at this point in the history
…rts coop
  • Loading branch information
ABeltramo committed Oct 1, 2024
1 parent 888d2f0 commit 1a78e85
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 94 deletions.
7 changes: 7 additions & 0 deletions docker/gstreamer.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ RUN <<_GSTREAMER_INSTALL
equivs-build $SOURCE_PATH/gstreamer.control
dpkg -i gstreamer-wolf_${GSTREAMER_VERSION}_all.deb

# Add GstInterpipe
git clone https://github.com/RidgeRun/gst-interpipe.git $SOURCE_PATH/gst-interpipe
cd $SOURCE_PATH/gst-interpipe
mkdir build
meson build -Denable-gtk-doc=false
meson install -C build

# Final cleanup stage
apt-mark auto $DEV_PACKAGES
apt-get autoremove -y --purge
Expand Down
21 changes: 21 additions & 0 deletions docs/modules/user/pages/configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,27 @@ In order to automatically pick up the right encoder at runtime based on the user

You can read more about gstreamer and custom pipelines in the xref:gstreamer.adoc[] page.

== CO-OP sessions

There's experimental support for CO-OP sessions; this allows multiple clients to connect to the same virtual session and play together. +

In order to enable this you'll have to add the following app entry in the `config.toml` file:

[source,toml]
....
[[apps]]
title = "CO-OP session"
# Get the parent session ID from the Wolf logs
runner = { type = "child_session", parent_session_id = "4135727842959053255" }
video = { source = "interpipesrc listen-to=4135727842959053255 is-live=true stream-sync=restart-ts" }
audio = { source = "pulsesrc device=\"virtual_sink_4135727842959053255.monitor\" server=\"{server_name}\"" }
start_virtual_compositor = false
....

In this example, the `parent_session_id` is the session ID of the parent session that you want to connect to. +
The process will be simplified once we finish implementing a custom UI for Wolf, see: https://github.com/games-on-whales/wolf/issues/80[games-on-whales/wolf#80].

[#_multiple_gpu]
== Multiple GPUs

Expand Down
7 changes: 4 additions & 3 deletions src/moonlight-server/runners/child_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ class RunChildSession : public events::Runner {
std::shared_ptr<std::atomic_bool> is_over = std::make_shared<std::atomic<bool>>(false);

auto stop_handler = ev_bus->register_handler<immer::box<events::StopStreamEvent>>(
[session_id, is_over](const immer::box<events::StopStreamEvent> &terminate_ev) {
if (terminate_ev->session_id == session_id) {
[session_id, parent_session_id = parent_session_id, is_over](
const immer::box<events::StopStreamEvent> &terminate_ev) {
if (terminate_ev->session_id == session_id || terminate_ev->session_id == parent_session_id) {
*is_over = true;
}
});
Expand Down Expand Up @@ -68,7 +69,7 @@ class RunChildSession : public events::Runner {

rfl::TaggedUnion<"type", wolf::config::AppCMD, wolf::config::AppDocker, wolf::config::AppChildSession>
serialize() override {
return wolf::config::AppChildSession{.parent_session_id = parent_session_id};
return wolf::config::AppChildSession{.parent_session_id = std::to_string(parent_session_id)};
}

private:
Expand Down
8 changes: 5 additions & 3 deletions src/moonlight-server/state/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,18 @@ inline std::string gen_uuid() {
return boost::lexical_cast<std::string>(uuid);
}

static std::shared_ptr<events::Runner> get_runner(const rfl::TaggedUnion<"type", AppCMD, AppDocker, AppChildSession> &runner,
const std::shared_ptr<events::EventBusType> &ev_bus) {
static std::shared_ptr<events::Runner>
get_runner(const rfl::TaggedUnion<"type", AppCMD, AppDocker, AppChildSession> &runner,
const std::shared_ptr<events::EventBusType> &ev_bus) {
if (rfl::holds_alternative<AppCMD>(runner.variant())) {
auto run_cmd = rfl::get<AppCMD>(runner.variant()).run_cmd;
return std::make_shared<process::RunProcess>(ev_bus, run_cmd);
} else if (rfl::holds_alternative<AppDocker>(runner.variant())) {
return std::make_shared<docker::RunDocker>(
docker::RunDocker::from_cfg(ev_bus, rfl::get<AppDocker>(runner.variant())));
} else if (rfl::holds_alternative<AppChildSession>(runner.variant())) {
return std::make_shared<coop::RunChildSession>(rfl::get<AppChildSession>(runner.variant()).parent_session_id, ev_bus);
auto session_id = rfl::get<AppChildSession>(runner.variant()).parent_session_id;
return std::make_shared<coop::RunChildSession>(std::stoul(session_id), ev_bus);
} else {
logs::log(logs::error, "Found runner of unknown type");
throw std::runtime_error("Unknown runner type");
Expand Down
2 changes: 1 addition & 1 deletion src/moonlight-server/state/default/config.v4.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ source = "audiotestsrc wave=ticks is-live=true"

[gstreamer.video]

default_source = "appsrc name=wolf_wayland_source is-live=true block=false format=3 stream-type=0"
default_source = "interpipesrc listen-to={session_id} is-live=true stream-sync=restart-ts"
default_sink = """
rtpmoonlightpay_video name=moonlight_pay \
payload_size={payload_size} fec_percentage={fec_percentage} min_required_fec_packets={min_required_fec_packets} !
Expand Down
4 changes: 2 additions & 2 deletions src/moonlight-server/state/serialised_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ struct AppDocker {
};

struct AppChildSession {
using Tag = rfl::Literal<"coop", "COOP">;
std::size_t parent_session_id;
using Tag = rfl::Literal<"child_session">;
std::string parent_session_id;
};

struct BaseAppVideoOverride {
Expand Down
106 changes: 60 additions & 46 deletions src/moonlight-server/streaming/streaming.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
#include <control/control.hpp>
#include <core/gstreamer.hpp>
#include <events/events.hpp>
#include <functional>
#include <gst-plugin/video.hpp>
#include <gstreamer-1.0/gst/app/gstappsink.h>
#include <gstreamer-1.0/gst/app/gstappsrc.h>
#include <immer/array.hpp>
#include <immer/array_transient.hpp>
#include <immer/box.hpp>
Expand All @@ -13,22 +8,13 @@

namespace streaming {

struct GstAppDataState {
wolf::core::gstreamer::gst_element_ptr app_src;
wolf::core::virtual_display::wl_state_ptr wayland_state;
GMainContext *context;
GSource *source;
int framerate;
GstClockTime timestamp = 0;
};

namespace custom_src {

std::shared_ptr<GstAppDataState> setup_app_src(const immer::box<events::VideoSession> &video_session,
std::shared_ptr<GstAppDataState> setup_app_src(const wolf::core::virtual_display::DisplayMode &video_session,
wolf::core::virtual_display::wl_state_ptr wl_ptr) {
return std::shared_ptr<GstAppDataState>(new GstAppDataState{.wayland_state = std::move(wl_ptr),
.source = nullptr,
.framerate = video_session->display_mode.refreshRate},
.framerate = video_session.refreshRate},
[](const auto &app_data_state) {
logs::log(logs::trace, "~GstAppDataState");
if (app_data_state->source) {
Expand All @@ -39,7 +25,7 @@ std::shared_ptr<GstAppDataState> setup_app_src(const immer::box<events::VideoSes
});
}

static bool push_data(GstAppDataState *data) {
bool push_data(GstAppDataState *data) {
GstFlowReturn ret;

if (data->wayland_state) {
Expand Down Expand Up @@ -69,7 +55,7 @@ static bool push_data(GstAppDataState *data) {
return false;
}

static void app_src_need_data(GstElement *pipeline, guint size, GstAppDataState *data) {
void app_src_need_data(GstElement *pipeline, guint size, GstAppDataState *data) {
if (!data->source) {
logs::log(logs::debug, "[WAYLAND] Start feeding app-src");
data->source = g_idle_source_new();
Expand All @@ -78,7 +64,7 @@ static void app_src_need_data(GstElement *pipeline, guint size, GstAppDataState
}
}

static void app_src_enough_data(GstElement *pipeline, guint size, GstAppDataState *data) {
void app_src_enough_data(GstElement *pipeline, guint size, GstAppDataState *data) {
if (data->source) {
logs::log(logs::trace, "app_src_enough_data");
g_source_destroy(data->source);
Expand All @@ -90,12 +76,64 @@ static void app_src_enough_data(GstElement *pipeline, guint size, GstAppDataStat
using namespace wolf::core::gstreamer;
using namespace wolf::core;

/**
 * Starts the producer half of the decoupled video path: it captures frames from the
 * virtual Wayland display via an appsrc and publishes them on an interpipesink named
 * after the session, so that any number of consumer pipelines (one per connected
 * client) can attach with `interpipesrc listen-to={session_id}`.
 *
 * Runs until a StopStreamEvent carrying this session_id is seen on the event bus.
 *
 * @param session_id   used both as the interpipesink name and to match stop events
 * @param wl_state     ownership of the Wayland virtual display state is moved into
 *                     the appsrc state and kept alive for the pipeline's lifetime
 * @param display_mode resolution/refresh rate used to configure the appsrc caps
 * @param event_bus    bus on which the StopStreamEvent handler is registered
 */
void start_video_producer(std::size_t session_id,
                          wolf::core::virtual_display::wl_state_ptr wl_state,
                          const wolf::core::virtual_display::DisplayMode &display_mode,
                          const std::shared_ptr<events::EventBusType> &event_bus) {
  auto appsrc_state = streaming::custom_src::setup_app_src(display_mode, std::move(wl_state));
  // sync=true/async=false on the interpipesink: render in clock time without waiting
  // for a preroll, since this is a live source with no consumers guaranteed yet
  auto pipeline = fmt::format(
      "appsrc is-live=true name=wolf_wayland_source ! queue ! interpipesink name={} sync=true async=false",
      session_id);
  logs::log(logs::debug, "Starting pipeline: {}", pipeline);
  // NOTE: [=] copies appsrc_state (shared_ptr) into the lambda, keeping the Wayland
  // state alive for as long as the pipeline runs
  run_pipeline(pipeline, [=](auto pipeline, auto loop) {
    if (auto app_src_el = gst_bin_get_by_name(GST_BIN(pipeline.get()), "wolf_wayland_source")) {
      // The GMainContext of the pipeline's thread; push_data's idle source is attached to it
      appsrc_state->context = g_main_context_get_thread_default();
      logs::log(logs::debug, "Setting up wolf_wayland_source");
      g_assert(GST_IS_APP_SRC(app_src_el));

      // gst_bin_get_by_name returns a new reference; hand it to a smart pointer
      auto app_src_ptr = wolf::core::gstreamer::gst_element_ptr(app_src_el, ::gst_object_unref);

      auto caps = set_resolution(*appsrc_state->wayland_state, display_mode, app_src_ptr);
      g_object_set(app_src_ptr.get(), "caps", caps.get(), NULL);
      // No seeking is supported, this is a live stream
      g_object_set(app_src_el, "stream-type", GST_APP_STREAM_TYPE_STREAM, NULL);
      // appsrc will drop any buffers that are pushed into it once its internal queue is full
      g_object_set(app_src_el, "leaky-type", GST_APP_LEAKY_TYPE_DOWNSTREAM, NULL);
      // sometimes the encoder or the network sink might lag behind, we'll keep up to 3 buffers in the queue
      g_object_set(app_src_el, "max-buffers", 3, NULL);

      /* Adapted from the tutorial at:
       * https://gstreamer.freedesktop.org/documentation/tutorials/basic/short-cutting-the-pipeline.html?gi-language=c*/
      g_signal_connect(app_src_el,
                       "need-data",
                       G_CALLBACK(streaming::custom_src::app_src_need_data),
                       appsrc_state.get());
      g_signal_connect(app_src_el,
                       "enough-data",
                       G_CALLBACK(streaming::custom_src::app_src_enough_data),
                       appsrc_state.get());
      appsrc_state->app_src = std::move(app_src_ptr);
    }

    // TODO: pause and resume? Should we do it?

    // Quit the pipeline's main loop when this session is stopped
    auto stop_handler = event_bus->register_handler<immer::box<events::StopStreamEvent>>(
        [session_id, loop](const immer::box<events::StopStreamEvent> &ev) {
          if (ev->session_id == session_id) {
            logs::log(logs::debug, "[GSTREAMER] Stopping pipeline: {}", session_id);
            g_main_loop_quit(loop.get());
          }
        });

    // Returned handlers stay registered for the lifetime of the pipeline
    return immer::array<immer::box<events::EventBusHandlers>>{std::move(stop_handler)};
  });
}

/**
* Start VIDEO pipeline
*/
void start_streaming_video(const immer::box<events::VideoSession> &video_session,
const std::shared_ptr<events::EventBusType> &event_bus,
wolf::core::virtual_display::wl_state_ptr wl_ptr,
unsigned short client_port) {
std::string color_range = (video_session->color_range == events::ColorRange::JPEG) ? "jpeg" : "mpeg2";
std::string color_space;
Expand All @@ -112,6 +150,7 @@ void start_streaming_video(const immer::box<events::VideoSession> &video_session
}

auto pipeline = fmt::format(fmt::runtime(video_session->gst_pipeline),
fmt::arg("session_id", video_session->session_id),
fmt::arg("width", video_session->display_mode.width),
fmt::arg("height", video_session->display_mode.height),
fmt::arg("fps", video_session->display_mode.refreshRate),
Expand All @@ -127,32 +166,7 @@ void start_streaming_video(const immer::box<events::VideoSession> &video_session
fmt::arg("host_port", video_session->port));
logs::log(logs::debug, "Starting video pipeline: \n{}", pipeline);

auto appsrc_state = custom_src::setup_app_src(video_session, std::move(wl_ptr));

run_pipeline(pipeline, [video_session, event_bus, appsrc_state](auto pipeline, auto loop) {
if (auto app_src_el = gst_bin_get_by_name(GST_BIN(pipeline.get()), "wolf_wayland_source")) {
appsrc_state->context = g_main_context_get_thread_default();
logs::log(logs::debug, "Setting up wolf_wayland_source");
g_assert(GST_IS_APP_SRC(app_src_el));

auto app_src_ptr = wolf::core::gstreamer::gst_element_ptr(app_src_el, ::gst_object_unref);

auto caps = set_resolution(*appsrc_state->wayland_state, video_session->display_mode, app_src_ptr);
g_object_set(app_src_ptr.get(), "caps", caps.get(), NULL);
// No seeking is supported, this is a live stream
g_object_set(app_src_el, "stream-type", GST_APP_STREAM_TYPE_STREAM, NULL);
// appsrc will drop any buffers that are pushed into it once its internal queue is full
g_object_set(app_src_el, "leaky-type", GST_APP_LEAKY_TYPE_DOWNSTREAM, NULL);
// sometimes the encoder or the network sink might lag behind, we'll keep up to 3 buffers in the queue
g_object_set(app_src_el, "max-buffers", 3, NULL);

/* Adapted from the tutorial at:
* https://gstreamer.freedesktop.org/documentation/tutorials/basic/short-cutting-the-pipeline.html?gi-language=c*/
g_signal_connect(app_src_el, "need-data", G_CALLBACK(custom_src::app_src_need_data), appsrc_state.get());
g_signal_connect(app_src_el, "enough-data", G_CALLBACK(custom_src::app_src_enough_data), appsrc_state.get());
appsrc_state->app_src = std::move(app_src_ptr);
}

run_pipeline(pipeline, [video_session, event_bus](auto pipeline, auto loop) {
/*
* The force IDR event will be triggered by the control stream.
* We have to pass this back into the gstreamer pipeline
Expand Down
29 changes: 28 additions & 1 deletion src/moonlight-server/streaming/streaming.hpp
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
#pragma once
#include "moonlight/fec.hpp"
#include <boost/asio.hpp>
#include <core/gstreamer.hpp>
#include <core/virtual-display.hpp>
#include <events/events.hpp>
#include <fmt/core.h>
#include <fmt/format.h>
#include <gst-plugin/gstrtpmoonlightpay_audio.hpp>
#include <gst-plugin/gstrtpmoonlightpay_video.hpp>
#include <gst-plugin/video.hpp>
#include <gst/gst.h>
#include <gstreamer-1.0/gst/app/gstappsink.h>
#include <gstreamer-1.0/gst/app/gstappsrc.h>
#include <immer/box.hpp>
#include <memory>

namespace streaming {

using namespace wolf::core;

void start_video_producer(std::size_t session_id,
wolf::core::virtual_display::wl_state_ptr wl_state,
const wolf::core::virtual_display::DisplayMode &display_mode,
const std::shared_ptr<events::EventBusType> &event_bus);

void start_streaming_video(const immer::box<events::VideoSession> &video_session,
const std::shared_ptr<events::EventBusType> &event_bus,
wolf::core::virtual_display::wl_state_ptr wl_state,
unsigned short client_port);

void start_streaming_audio(const immer::box<events::AudioSession> &audio_session,
Expand All @@ -26,6 +34,25 @@ void start_streaming_audio(const immer::box<events::AudioSession> &audio_session
const std::string &sink_name,
const std::string &server_name);

namespace custom_src {

/**
 * State shared between the appsrc element and its GLib signal callbacks
 * (need-data / enough-data) for the Wayland frame producer.
 *
 * A single instance is created by setup_app_src() and handed to the callbacks
 * as a raw pointer, so it must outlive the pipeline it is attached to.
 */
struct GstAppDataState {
  wolf::core::gstreamer::gst_element_ptr app_src;          // the appsrc element, set once the pipeline is up
  wolf::core::virtual_display::wl_state_ptr wayland_state; // owns the virtual display frames are pulled from
  // Default the raw pointers to nullptr: the callbacks branch on `source`
  // (and attach the idle source to `context`), so an instance created without
  // designated initializers must not carry indeterminate pointers.
  GMainContext *context = nullptr; // main context of the pipeline thread; idle source is attached here
  GSource *source = nullptr;       // idle source driving push_data(); non-null only while feeding
  int framerate;                   // refresh rate used to compute buffer durations/timestamps
  GstClockTime timestamp = 0;      // running PTS of the next buffer to push
};

// Allocates the shared callback state; the returned shared_ptr's deleter
// tears down the idle source if it's still active.
std::shared_ptr<GstAppDataState> setup_app_src(const wolf::core::virtual_display::DisplayMode &video_session,
                                               wolf::core::virtual_display::wl_state_ptr wl_ptr);

// Idle-source callback: pulls one frame from Wayland and pushes it into the appsrc.
bool push_data(GstAppDataState *data);
// "need-data" signal handler: starts the idle source that feeds the appsrc.
void app_src_need_data(GstElement *pipeline, guint size, GstAppDataState *data);
// "enough-data" signal handler: stops and destroys the idle source.
void app_src_enough_data(GstElement *pipeline, guint size, GstAppDataState *data);
} // namespace custom_src

/**
* @return the Gstreamer version we are linked to
*/
Expand Down
Loading

0 comments on commit 1a78e85

Please sign in to comment.