Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Subscribe Announces #303

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
25 changes: 24 additions & 1 deletion cmd/examples/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,26 @@ namespace qclient_vars {
bool publish_clock{ false };
}

class MySubscribeAnnouncesHandler : public quicr::SubscribeAnnouncesHandler
{

public:
MySubscribeAnnouncesHandler(const quicr::TrackNamespace& namespace_prefix)
: SubscribeAnnouncesHandler(namespace_prefix)
{
}

void MatchingTrackNamespaceReceived(const quicr::TrackNamespace& track_namespace)
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
{
auto data = std::string{ track_namespace.begin(), track_namespace.end() };
SPDLOG_INFO("SubscribeAnnounceHandler: Matching Announce Received: {0}", data);
}

void StatusChanged([[maybe_unused]] Status status)
{
SPDLOG_INFO("SubscribeAnnounceHandler: Status Changed: {0}", uint8_t(status));
}
};
/**
* @brief Subscribe track handler
* @details Subscribe track handler used for the subscribe command line option.
Expand Down Expand Up @@ -214,13 +234,16 @@ DoSubscriber(const quicr::FullTrackName& full_track_name,
const bool& stop)
{
auto track_handler = std::make_shared<MySubscribeTrackHandler>(full_track_name);

auto announces_handler = std::make_shared<MySubscribeAnnouncesHandler>(full_track_name.name_space);
SPDLOG_INFO("Started subscriber");

bool subscribe_track{ false };

while (not stop) {
if ((!subscribe_track) && (client->GetStatus() == MyClient::Status::kReady)) {
SPDLOG_INFO("Subscribing to announces");
client->SubscribeAnnounces(announces_handler);

SPDLOG_INFO("Subscribing to track");
client->SubscribeTrack(track_handler);
subscribe_track = true;
Expand Down
46 changes: 46 additions & 0 deletions cmd/examples/server.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: Copyright (c) 2024 Cisco Systems
// SPDX-License-Identifier: BSD-2-Clause

#include <algorithm>
#include <condition_variable>
#include <oss/cxxopts.hpp>
#include <set>
Expand All @@ -19,6 +20,12 @@ using FullTrackNameHash = uint64_t;
namespace qserver_vars {
std::mutex state_mutex;

/**
* Map of subscribers who want to learn about announces
*
*/
std::map<quicr::TrackNamespace, std::set<quicr::ConnectionHandle>> announce_subscribers{};

/**
* Map of subscribes (e.g., track alias) sent to announcements
*
Expand Down Expand Up @@ -86,6 +93,15 @@ namespace qserver_vars {
std::unordered_map<quicr::messages::TrackAlias,
std::unordered_map<quicr::ConnectionHandle, std::shared_ptr<quicr::SubscribeTrackHandler>>>
pub_subscribes;

static std::string ToAscii(const std::vector<Span<const uint8_t>>& data)
{
std::stringstream output;
for (const auto& entry : data) {
output << entry.data() << ",";
}
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
return output.str();
}
}

/**
Expand Down Expand Up @@ -245,6 +261,15 @@ class MyServer : public quicr::Server
SPDLOG_INFO("New connection handle {0} accepted from {1}:{2}", connection_handle, remote.ip, remote.port);
}

void SubscribeAnnouncesReceived(quicr::ConnectionHandle connection_handle,
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
quicr::TrackNamespace& track_namespace_prefix) override
{
SPDLOG_INFO("SubscribeAnnounces received {0} accepted from {1}:{2}",
connection_handle,
qserver_vars::ToAscii(track_namespace_prefix.GetEntries()));
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
qserver_vars::announce_subscribers[track_namespace_prefix].insert(connection_handle);
}

void MetricsSampled(quicr::ConnectionHandle connection_handle, const quicr::ConnectionMetrics& metrics) override
{
SPDLOG_DEBUG("Metrics sample time: {0}"
Expand Down Expand Up @@ -319,6 +344,23 @@ class MyServer : public quicr::Server
announce_response.reason_code = quicr::Server::AnnounceResponse::ReasonCode::kOk;
ResolveAnnounce(connection_handle, track_namespace, announce_response);

// check to see if there are subscribers who need this announce
for (const auto& entry : qserver_vars::announce_subscribers) {
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
if (entry.first.Contains(track_namespace)) {
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
SPDLOG_INFO("Found a match for announced namespace. prefix: {0}, namespace: {1}",
qserver_vars::ToAscii(entry.first.GetEntries()),
qserver_vars::ToAscii(track_namespace.GetEntries()));
// Send announce to the subscribers
for (const auto& conn : entry.second) {
ForwardAnnounce(conn, track_namespace);
}
} else {
SPDLOG_INFO("No match Found for announced namespace. prefix: {0}, namespace: {1}",
qserver_vars::ToAscii(entry.first.GetEntries()),
qserver_vars::ToAscii(track_namespace.GetEntries()));
}
}

auto& anno_tracks = qserver_vars::announce_active[th.track_namespace_hash][connection_handle];

// Check if there are any subscribes. If so, send subscribe to announce for all tracks matching namespace
Expand Down Expand Up @@ -496,6 +538,10 @@ class MyServer : public quicr::Server
}
}
}

private:
// Set of prefixes for which announces needs to be resolved.
std::set<quicr::TrackNamespace> announce_prefixes_;
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
};

/* -------------------------------------------------------------------------------------------------
Expand Down
7 changes: 7 additions & 0 deletions include/quicr/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,13 @@ namespace quicr {
}
}

void SubscribeAnnounces(std::shared_ptr<SubscribeAnnouncesHandler> handler)
{
if (connection_handle_) {
Transport::SubscribeAnnounces(*connection_handle_, std::move(handler));
}
}

private:
bool ProcessCtrlMessage(ConnectionContext& conn_ctx, BytesSpan stream_buffer) override;

Expand Down
45 changes: 45 additions & 0 deletions include/quicr/detail/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ namespace quicr::messages {

GOAWAY = 0x10,

SUBSCRIBE_ANNOUNCES = 0x11,
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
SUBSCRIBE_ANNOUNCES_OK,
SUBSCRIBE_ANNOUNCES_ERROR,
UNSUBSCRIBE_ANNOUNCES,
MAX_SUBSCRIBE_ID = 0x15,
FETCH,
FETCH_CANCEL,
Expand Down Expand Up @@ -324,6 +328,7 @@ namespace quicr::messages {
//
// GoAway
//

struct MoqGoaway
{
Bytes new_session_uri;
Expand All @@ -332,6 +337,45 @@ namespace quicr::messages {
BytesSpan operator>>(BytesSpan buffer, MoqGoaway& msg);
Bytes& operator<<(Bytes& buffer, const MoqGoaway& msg);

//
// Subscribe Announces
//

struct MoqSubscribeAnnounces
{
TrackNamespace track_namespace_prefix;
std::vector<MoqParameter> params;
};

BytesSpan operator>>(BytesSpan buffer, MoqSubscribeAnnounces& msg);
Bytes& operator<<(Bytes& buffer, const MoqSubscribeAnnounces& msg);

struct MoqSubscribeAnnouncesOk
{
TrackNamespace track_namespace_prefix;
};

BytesSpan operator>>(BytesSpan buffer, MoqSubscribeAnnouncesOk& msg);
Bytes& operator<<(Bytes& buffer, const MoqSubscribeAnnouncesOk& msg);

struct MoqSubscribeAnnouncesError
{
TrackNamespace track_namespace_prefix;
std::optional<ErrorCode> err_code;
std::optional<ReasonPhrase> reason_phrase;
};

BytesSpan operator>>(BytesSpan buffer, MoqSubscribeAnnouncesError& msg);
Bytes& operator<<(Bytes& buffer, const MoqSubscribeAnnouncesError& msg);

struct MoqUnsubscribeAnnounces
{
TrackNamespace track_namespace_prefix;
};

BytesSpan operator>>(BytesSpan buffer, MoqUnsubscribeAnnounces& msg);
Bytes& operator<<(Bytes& buffer, const MoqUnsubscribeAnnounces& msg);

//
// Fetch
//
Expand Down Expand Up @@ -388,6 +432,7 @@ namespace quicr::messages {
// Data Streams
//

// Datagrams
struct MoqObjectDatagram
{
SubscribeId subscribe_id;
Expand Down
21 changes: 21 additions & 0 deletions include/quicr/detail/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <quicr/config.h>
#include <quicr/metrics.h>
#include <quicr/publish_track_handler.h>
#include <quicr/subscribe_announces_handler.h>
#include <quicr/subscribe_track_handler.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
Expand Down Expand Up @@ -147,6 +148,16 @@ namespace quicr {
void UnpublishTrack(ConnectionHandle connection_handle,
const std::shared_ptr<PublishTrackHandler>& track_handler);

/**
*
* @param connection_handle
* @param handler
*/
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
void SubscribeAnnounces(ConnectionHandle connection_handle,
const std::shared_ptr<SubscribeAnnouncesHandler>& handler);

void SendAnnounce(ConnectionHandle connection_handle, const TrackNamespace& track_namespace);

/**
* @brief Get the status of the Client
*
Expand Down Expand Up @@ -229,6 +240,9 @@ namespace quicr {
/// Published tracks by quic transport data context ID.
std::map<DataContextId, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_data_ctx_id;

// Tracknamespaces prefixes active subscriptions
std::map<TrackNamespace, std::shared_ptr<SubscribeAnnouncesHandler>> announce_subscriptions{};

ConnectionMetrics metrics; ///< Connection metrics
};

Expand Down Expand Up @@ -268,6 +282,13 @@ namespace quicr {
uint64_t track_alias,
messages::SubscribeError error,
const std::string& reason);
void SendSubscribeAnnounces(ConnectionContext& conn_ctx, const TrackNamespace& prefix);
void SendSubscribeAnnouncesOk(ConnectionContext& conn_ctx, const TrackNamespace& prefix);
void SendSubscribeAnnouncesError(ConnectionContext& conn_ctx,
const TrackNamespace& prefix,
messages::SubscribeError error,
const std::string& reason);
void SendUnsubscribeAnnounces(ConnectionContext& conn_ctx, const TrackNamespace& prefix);
void SendFetchError(ConnectionContext& conn_ctx,
uint64_t subscribe_id,
messages::FetchError error,
Expand Down
39 changes: 39 additions & 0 deletions include/quicr/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,45 @@ namespace quicr {
*/
virtual void UnsubscribeReceived(ConnectionHandle connection_handle, uint64_t subscribe_id) = 0;

/**
*
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
* @param connection_handle
* @param track_namespace_prefix
*
* TODO: Add support for receiving parameters
*/
virtual void SubscribeAnnouncesReceived(ConnectionHandle connection_handle,
TrackNamespace& track_namespace_prefix);

/**
*
* @param connection_handle
* @param track_namespace
* @param announce_response
*
* TODO: AnnounceResponse is incorrect here.
*/
void ResolveSubscribeAnnounces(ConnectionHandle connection_handle,
const TrackNamespace& track_namespace,
const AnnounceResponse& announce_response);

/**
* Send existing and new announces that match namespace prefix in SubscribeAnnounces
suhasHere marked this conversation as resolved.
Show resolved Hide resolved
* This function allows the subscriber to learn about new announces.
* @param connection_handle
* @param track_namespace
*/
void ForwardAnnounce(ConnectionHandle connection_handle, const TrackNamespace& track_namespace)
{

if (connection_handle) {
// TODO: This is a fire and forget sending announcement.
// Not sure if server should even worry about getting an OK,
// since if a subscribe follows , that implies it as OK
Transport::SendAnnounce(connection_handle, track_namespace);
}
}

///@}
// --END OF CALLBACKS ----------------------------------------------------------------------------------

Expand Down
Loading