Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Subscribe Announces #303

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
25 changes: 24 additions & 1 deletion cmd/examples/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,26 @@ namespace qclient_vars {
bool publish_clock{ false };
}

class MySubscribeAnnouncesHandler : public quicr::SubscribeAnnouncesHandler
{

public:
MySubscribeAnnouncesHandler(const quicr::TrackNamespace& namespace_prefix)
: SubscribeAnnouncesHandler(namespace_prefix)
{
}

void TrackNamespaceReceived(const quicr::TrackNamespace& track_namespace)
{
auto data = std::string{ track_namespace.begin(), track_namespace.end() };
SPDLOG_INFO("SubscribeAnnounceHandler: Matching Announce Received: {0}", data);
}

void StatusChanged([[maybe_unused]] Status status)
{
SPDLOG_INFO("SubscribeAnnounceHandler: Status Changed: {0}", uint8_t(status));
}
};
/**
* @brief Subscribe track handler
* @details Subscribe track handler used for the subscribe command line option.
Expand Down Expand Up @@ -214,13 +234,16 @@ DoSubscriber(const quicr::FullTrackName& full_track_name,
const bool& stop)
{
auto track_handler = std::make_shared<MySubscribeTrackHandler>(full_track_name);

auto announces_handler = std::make_shared<MySubscribeAnnouncesHandler>(full_track_name.name_space);
SPDLOG_INFO("Started subscriber");

bool subscribe_track{ false };

while (not stop) {
if ((!subscribe_track) && (client->GetStatus() == MyClient::Status::kReady)) {
SPDLOG_INFO("Subscribing to announces");
client->SubscribeAnnounces(announces_handler);

SPDLOG_INFO("Subscribing to track");
client->SubscribeTrack(track_handler);
subscribe_track = true;
Expand Down
31 changes: 31 additions & 0 deletions cmd/examples/server.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: Copyright (c) 2024 Cisco Systems
// SPDX-License-Identifier: BSD-2-Clause

#include <algorithm>
#include <condition_variable>
#include <oss/cxxopts.hpp>
#include <set>
Expand All @@ -19,6 +20,12 @@ using FullTrackNameHash = uint64_t;
namespace qserver_vars {
std::mutex state_mutex;

/**
* Map of subscribers who want to learn about announces
*
*/
std::map<quicr::TrackNamespace, std::set<quicr::ConnectionHandle>> announce_subscribers{};

/**
* Map of subscribes (e.g., track alias) sent to announcements
*
Expand Down Expand Up @@ -245,6 +252,15 @@ class MyServer : public quicr::Server
SPDLOG_INFO("New connection handle {0} accepted from {1}:{2}", connection_handle, remote.ip, remote.port);
}

void SubscribeForAnnouncesReceived(quicr::ConnectionHandle connection_handle,
quicr::TrackNamespace& track_namespace_prefix) override
{
SPDLOG_INFO("SubscribeAnnounces received {0} accepted from {1}:{2}",
connection_handle,
track_namespace_prefix.ToString());
qserver_vars::announce_subscribers[track_namespace_prefix].insert(connection_handle);
}

void MetricsSampled(quicr::ConnectionHandle connection_handle, const quicr::ConnectionMetrics& metrics) override
{
SPDLOG_DEBUG("Metrics sample time: {0}"
Expand Down Expand Up @@ -319,6 +335,21 @@ class MyServer : public quicr::Server
announce_response.reason_code = quicr::Server::AnnounceResponse::ReasonCode::kOk;
ResolveAnnounce(connection_handle, track_namespace, announce_response);

// check to see if there are subscribers who need this announce
for (const auto& [ns, conns] : qserver_vars::announce_subscribers) {
if (track_namespace.Contains(ns)) {
SPDLOG_INFO(
"Found a match for announced namespace. prefix: {0}, namespace: {1}", ns.ToString(), ns.ToString());
// Send announce to the subscribers
for (const auto& conn : conns) {
ForwardAnnounce(conn, track_namespace);
}
} else {
SPDLOG_INFO(
"No match Found for announced namespace. prefix: {0}, namespace: {1}", ns.ToString(), ns.ToString());
}
}

auto& anno_tracks = qserver_vars::announce_active[th.track_namespace_hash][connection_handle];

// Check if there are any subscribes. If so, send subscribe to announce for all tracks matching namespace
Expand Down
7 changes: 7 additions & 0 deletions include/quicr/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,13 @@ namespace quicr {
}
}

void SubscribeAnnounces(std::shared_ptr<SubscribeAnnouncesHandler> handler)
{
if (connection_handle_) {
Transport::SubscribeAnnounces(*connection_handle_, std::move(handler));
}
}

private:
bool ProcessCtrlMessage(ConnectionContext& conn_ctx, BytesSpan stream_buffer) override;

Expand Down
45 changes: 45 additions & 0 deletions include/quicr/detail/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ namespace quicr::messages {

GOAWAY = 0x10,

SUBSCRIBE_ANNOUNCES = 0x11,
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
SUBSCRIBE_ANNOUNCES_OK,
SUBSCRIBE_ANNOUNCES_ERROR,
UNSUBSCRIBE_ANNOUNCES,
MAX_SUBSCRIBE_ID = 0x15,
FETCH,
FETCH_CANCEL,
Expand Down Expand Up @@ -324,6 +328,7 @@ namespace quicr::messages {
//
// GoAway
//

struct MoqGoaway
{
Bytes new_session_uri;
Expand All @@ -332,6 +337,45 @@ namespace quicr::messages {
BytesSpan operator>>(BytesSpan buffer, MoqGoaway& msg);
Bytes& operator<<(Bytes& buffer, const MoqGoaway& msg);

//
// Subscribe Announces
//

struct MoqSubscribeAnnounces
{
TrackNamespace track_namespace_prefix;
std::vector<MoqParameter> params;
};

BytesSpan operator>>(BytesSpan buffer, MoqSubscribeAnnounces& msg);
Bytes& operator<<(Bytes& buffer, const MoqSubscribeAnnounces& msg);

struct MoqSubscribeAnnouncesOk
{
TrackNamespace track_namespace_prefix;
};

BytesSpan operator>>(BytesSpan buffer, MoqSubscribeAnnouncesOk& msg);
Bytes& operator<<(Bytes& buffer, const MoqSubscribeAnnouncesOk& msg);

struct MoqSubscribeAnnouncesError
{
TrackNamespace track_namespace_prefix;
std::optional<ErrorCode> err_code;
std::optional<ReasonPhrase> reason_phrase;
};

BytesSpan operator>>(BytesSpan buffer, MoqSubscribeAnnouncesError& msg);
Bytes& operator<<(Bytes& buffer, const MoqSubscribeAnnouncesError& msg);

struct MoqUnsubscribeAnnounces
{
TrackNamespace track_namespace_prefix;
};

BytesSpan operator>>(BytesSpan buffer, MoqUnsubscribeAnnounces& msg);
Bytes& operator<<(Bytes& buffer, const MoqUnsubscribeAnnounces& msg);

//
// Fetch
//
Expand Down Expand Up @@ -388,6 +432,7 @@ namespace quicr::messages {
// Data Streams
//

// Datagrams
struct MoqObjectDatagram
{
SubscribeId subscribe_id;
Expand Down
21 changes: 21 additions & 0 deletions include/quicr/detail/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <quicr/config.h>
#include <quicr/metrics.h>
#include <quicr/publish_track_handler.h>
#include <quicr/subscribe_announces_handler.h>
#include <quicr/subscribe_track_handler.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
Expand Down Expand Up @@ -147,6 +148,16 @@ namespace quicr {
void UnpublishTrack(ConnectionHandle connection_handle,
const std::shared_ptr<PublishTrackHandler>& track_handler);

/**
*
* @param connection_handle
* @param handler
*/
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
void SubscribeAnnounces(ConnectionHandle connection_handle,
const std::shared_ptr<SubscribeAnnouncesHandler>& handler);

void SendAnnounce(ConnectionHandle connection_handle, const TrackNamespace& track_namespace);

/**
* @brief Get the status of the Client
*
Expand Down Expand Up @@ -229,6 +240,9 @@ namespace quicr {
/// Published tracks by quic transport data context ID.
std::map<DataContextId, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_data_ctx_id;

// Tracknamespaces prefixes active subscriptions
std::map<TrackNamespace, std::shared_ptr<SubscribeAnnouncesHandler>> announce_subscriptions{};

ConnectionMetrics metrics; ///< Connection metrics
};

Expand Down Expand Up @@ -268,6 +282,13 @@ namespace quicr {
uint64_t track_alias,
messages::SubscribeError error,
const std::string& reason);
void SendSubscribeAnnounces(ConnectionContext& conn_ctx, const TrackNamespace& prefix);
void SendSubscribeAnnouncesOk(ConnectionContext& conn_ctx, const TrackNamespace& prefix);
void SendSubscribeAnnouncesError(ConnectionContext& conn_ctx,
const TrackNamespace& prefix,
messages::SubscribeError error,
const std::string& reason);
void SendUnsubscribeAnnounces(ConnectionContext& conn_ctx, const TrackNamespace& prefix);
void SendFetchError(ConnectionContext& conn_ctx,
uint64_t subscribe_id,
messages::FetchError error,
Expand Down
48 changes: 48 additions & 0 deletions include/quicr/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ namespace quicr {
std::optional<Bytes> reason_phrase;
};

// TODO (suhas): Add details.
struct SubscribeAnnouncesResponse
{};

/**
* @brief MoQ Server constructor to create the MOQ server mode instance
*
Expand Down Expand Up @@ -214,6 +218,50 @@ namespace quicr {
*/
virtual void UnsubscribeReceived(ConnectionHandle connection_handle, uint64_t subscribe_id) = 0;

/**
* @brief Subscription for announces
*
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
* @param connection_handle Subscriber connection Id
* @param track_namespace_prefix TrackNamespace prefix to report matching announcements
*
* TODO: Add support for receiving parameters
*/
virtual void SubscribeForAnnouncesReceived(ConnectionHandle connection_handle,
TrackNamespace& track_namespace_prefix);

/**
* @brief Resolve received subscribe for announces by providing
* appropriate response.
*
* @param connection_handle Subscriber connection Id
* @param track_namespace Namespace prefix matching the subscribe_announces message
* @param response Response to resolve subscribe_announces
*
*/
void ResolveSubscribeAnnounces(ConnectionHandle connection_handle,
const TrackNamespace& track_namespace,
const SubscribeAnnouncesResponse& aresponse);

/**
* @brief Forward announcements to subscribers
*
* @details Send existing and new announces that match namespace prefix in SubscribeAnnounces
* This function allows the subscriber to learn about new announces.
*
* @param connection_handle
* @param track_namespace
*/
void ForwardAnnounce(ConnectionHandle connection_handle, const TrackNamespace& track_namespace)
{

if (connection_handle) {
// TODO: This is a fire and forget sending announcement.
// Not sure if server should even worry about getting an OK,
// since if a subscribe follows , that implies it as OK
Transport::SendAnnounce(connection_handle, track_namespace);
}
}

///@}
// --END OF CALLBACKS ----------------------------------------------------------------------------------

Expand Down
94 changes: 94 additions & 0 deletions include/quicr/subscribe_announces_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// SPDX-FileCopyrightText: Copyright (c) 2024 Cisco Systems
// SPDX-License-Identifier: BSD-2-Clause

#pragma once

#include <quicr/detail/base_track_handler.h>
#include <quicr/metrics.h>
#include <quicr/publish_track_handler.h>

namespace quicr {

class SubscribeAnnouncesHandler
{
public:
enum class Error : uint8_t
{
kOk = 0,
kNotAuthorized,
kNotSubscribed,
kNoData
};

enum class Status : uint8_t
{
kOk = 0,
kNotConnected,
kSubscribeError,
kNotAuthorized,
kNotSubscribed,
kPendingSubscribeResponse,
kSendingUnsubscribe ///< In this state, callbacks will not be called
};

protected:
SubscribeAnnouncesHandler(const TrackNamespace& namespace_prefix)
: track_namespace_prefix_(namespace_prefix)
{
}

public:
static std::shared_ptr<SubscribeAnnouncesHandler> Create(const TrackNamespace& namespace_prefix)
{
return std::shared_ptr<SubscribeAnnouncesHandler>(new SubscribeAnnouncesHandler(namespace_prefix));
}

constexpr Status GetStatus() const noexcept { return status_; }

TrackNamespace GetTrackNamespacePrefix() const noexcept { return track_namespace_prefix_; }

// --------------------------------------------------------------------------
// Public Virtual API callback event methods
// --------------------------------------------------------------------------
/** @name Callbacks
*/
///@{

virtual void TrackNamespaceReceived([[maybe_unused]] const quicr::TrackNamespace& track_namespace) {}

virtual void StatusChanged([[maybe_unused]] Status status) {}

virtual ~SubscribeAnnouncesHandler() = default;
///@}

// --------------------------------------------------------------------------
// Metrics
// --------------------------------------------------------------------------

// --------------------------------------------------------------------------
// Internal
// --------------------------------------------------------------------------
private:
/**
* @brief Set the subscribe status
* @param status Status of the subscribe
*/
void SetStatus(Status status) noexcept
{
status_ = status;
StatusChanged(status);
}

// --------------------------------------------------------------------------
// Member variables
// --------------------------------------------------------------------------
Status status_{ Status::kNotSubscribed };

TrackNamespace track_namespace_prefix_;

friend class Transport;
friend class Client;
friend class Server;
};

} // namespace moq
Loading