diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 5885d58d6c..d60e25bcc4 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-06-15, Merge [#4089](https://github.com/ossrs/srs/pull/4089): SmartPtr: Support shared ptr for live source. v6.0.129 (#4089) * v6.0, 2024-06-14, Merge [#4085](https://github.com/ossrs/srs/pull/4085): SmartPtr: Support shared ptr for RTC source. v6.0.128 (#4085) * v6.0, 2024-06-13, Merge [#4083](https://github.com/ossrs/srs/pull/4083): SmartPtr: Use shared ptr in RTC TCP connection. v6.0.127 (#4083) * v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080) diff --git a/trunk/src/app/srs_app_coworkers.cpp b/trunk/src/app/srs_app_coworkers.cpp index 114ec0a703..1c33640ff3 100644 --- a/trunk/src/app/srs_app_coworkers.cpp +++ b/trunk/src/app/srs_app_coworkers.cpp @@ -122,7 +122,7 @@ SrsRequest* SrsCoWorkers::find_stream_info(string vhost, string app, string stre return it->second; } -srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r) +srs_error_t SrsCoWorkers::on_publish(SrsRequest* r) { srs_error_t err = srs_success; @@ -140,7 +140,7 @@ srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r) return err; } -void SrsCoWorkers::on_unpublish(SrsLiveSource* s, SrsRequest* r) +void SrsCoWorkers::on_unpublish(SrsRequest* r) { string url = r->get_stream_url(); diff --git a/trunk/src/app/srs_app_coworkers.hpp b/trunk/src/app/srs_app_coworkers.hpp index ce0322e6e8..cd284d410f 100644 --- a/trunk/src/app/srs_app_coworkers.hpp +++ b/trunk/src/app/srs_app_coworkers.hpp @@ -33,8 +33,8 @@ class SrsCoWorkers private: virtual SrsRequest* find_stream_info(std::string vhost, std::string app, std::string stream); public: - virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r); - virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r); + virtual srs_error_t on_publish(SrsRequest* r); + virtual void on_unpublish(SrsRequest* r); }; #endif diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index f4a52115c3..ba9e12f21d 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -391,7 +391,7 @@ void SrsEdgeFlvUpstream::kbps_sample(const char* label, srs_utime_t age) SrsEdgeIngester::SrsEdgeIngester() { - source = NULL; + source_ = NULL; edge = NULL; req = NULL; #ifdef SRS_APM @@ -415,9 +415,11 @@ SrsEdgeIngester::~SrsEdgeIngester() srs_freep(trd); } -srs_error_t SrsEdgeIngester::initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r) +srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr s, SrsPlayEdge* e, SrsRequest* r) { - source = s; + // Because source references to this object, so we should directly use the source ptr. + source_ = s.get(); + edge = e; req = r; @@ -435,7 +437,7 @@ srs_error_t SrsEdgeIngester::start() { srs_error_t err = srs_success; - if ((err = source->on_publish()) != srs_success) { + if ((err = source_->on_publish()) != srs_success) { return srs_error_wrap(err, "notify source"); } @@ -455,8 +457,8 @@ void SrsEdgeIngester::stop() upstream->close(); // Notify source to un-publish if exists. - if (source) { - source->on_unpublish(); + if (source_) { + source_->on_unpublish(); } } @@ -549,7 +551,7 @@ srs_error_t SrsEdgeIngester::do_cycle() upstream = new SrsEdgeRtmpUpstream(redirect); } - if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) { + if ((err = source_->on_source_id_changed(_srs_context->get_id())) != srs_success) { return srs_error_wrap(err, "on source id changed"); } @@ -635,21 +637,21 @@ srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, stri // process audio packet if (msg->header.is_audio()) { - if ((err = source->on_audio(msg)) != srs_success) { + if ((err = source_->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "source consume audio"); } } // process video packet if (msg->header.is_video()) { - if ((err = source->on_video(msg)) != srs_success) { + if ((err = source_->on_video(msg)) != srs_success) { return srs_error_wrap(err, "source consume video"); } } // process aggregate packet if (msg->header.is_aggregate()) { - if ((err = source->on_aggregate(msg)) != srs_success) { + if ((err = source_->on_aggregate(msg)) != srs_success) { return srs_error_wrap(err, "source consume aggregate"); } return err; @@ -665,7 +667,7 @@ srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, stri if (dynamic_cast(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast(pkt); - if ((err = source->on_meta_data(msg, metadata)) != srs_success) { + if ((err = source_->on_meta_data(msg, metadata)) != srs_success) { return srs_error_wrap(err, "source consume metadata"); } return err; @@ -725,7 +727,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() edge = NULL; req = NULL; send_error_code = ERROR_SUCCESS; - source = NULL; + source_ = NULL; sdk = NULL; lb = new SrsLbRoundRobin(); @@ -747,9 +749,11 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size) return queue->set_queue_size(queue_size); } -srs_error_t SrsEdgeForwarder::initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r) +srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr s, SrsPublishEdge* e, SrsRequest* r) { - source = s; + // Because source references to this object, so we should directly use the source ptr. + source_ = s.get(); + edge = e; req = r; @@ -956,7 +960,7 @@ SrsPlayEdge::~SrsPlayEdge() srs_freep(ingester); } -srs_error_t SrsPlayEdge::initialize(SrsLiveSource* source, SrsRequest* req) +srs_error_t SrsPlayEdge::initialize(SrsSharedPtr source, SrsRequest* req) { srs_error_t err = srs_success; @@ -1048,7 +1052,7 @@ void SrsPublishEdge::set_queue_size(srs_utime_t queue_size) return forwarder->set_queue_size(queue_size); } -srs_error_t SrsPublishEdge::initialize(SrsLiveSource* source, SrsRequest* req) +srs_error_t SrsPublishEdge::initialize(SrsSharedPtr source, SrsRequest* req) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index efb5705204..a1f049c49f 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -137,7 +138,9 @@ class SrsEdgeFlvUpstream : public SrsEdgeUpstream class SrsEdgeIngester : public ISrsCoroutineHandler { private: - SrsLiveSource* source; + // Because source references to this object, so we should directly use the source ptr. + SrsLiveSource* source_; +private: SrsPlayEdge* edge; SrsRequest* req; SrsCoroutine* trd; @@ -150,7 +153,7 @@ class SrsEdgeIngester : public ISrsCoroutineHandler SrsEdgeIngester(); virtual ~SrsEdgeIngester(); public: - virtual srs_error_t initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r); + virtual srs_error_t initialize(SrsSharedPtr s, SrsPlayEdge* e, SrsRequest* r); virtual srs_error_t start(); virtual void stop(); virtual std::string get_curr_origin(); @@ -172,7 +175,9 @@ class SrsEdgeIngester : public ISrsCoroutineHandler class SrsEdgeForwarder : public ISrsCoroutineHandler { private: - SrsLiveSource* source; + // Because source references to this object, so we should directly use the source ptr. + SrsLiveSource* source_; +private: SrsPublishEdge* edge; SrsRequest* req; SrsCoroutine* trd; @@ -191,7 +196,7 @@ class SrsEdgeForwarder : public ISrsCoroutineHandler public: virtual void set_queue_size(srs_utime_t queue_size); public: - virtual srs_error_t initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r); + virtual srs_error_t initialize(SrsSharedPtr s, SrsPublishEdge* e, SrsRequest* r); virtual srs_error_t start(); virtual void stop(); // Interface ISrsReusableThread2Handler @@ -216,7 +221,7 @@ class SrsPlayEdge // Always use the req of source, // For we assume all client to edge is invalid, // if auth open, edge must valid it from origin, then service it. - virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req); + virtual srs_error_t initialize(SrsSharedPtr source, SrsRequest* req); // When client play stream on edge. virtual srs_error_t on_client_play(); // When all client stopped play, disconnect to origin. @@ -239,7 +244,7 @@ class SrsPublishEdge public: virtual void set_queue_size(srs_utime_t queue_size); public: - virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req); + virtual srs_error_t initialize(SrsSharedPtr source, SrsRequest* req); virtual bool can_publish(); // When client publish stream on edge. virtual srs_error_t on_client_publish(); diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 721f038a67..f7bab8a4c8 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -547,13 +547,13 @@ srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage return http_static->mux.serve_http(w, r); } -srs_error_t SrsHttpServer::http_mount(SrsLiveSource* s, SrsRequest* r) +srs_error_t SrsHttpServer::http_mount(SrsRequest* r) { - return http_stream->http_mount(s, r); + return http_stream->http_mount(r); } -void SrsHttpServer::http_unmount(SrsLiveSource* s, SrsRequest* r) +void SrsHttpServer::http_unmount(SrsRequest* r) { - http_stream->http_unmount(s, r); + http_stream->http_unmount(r); } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 7952b40a37..de78c8b3c0 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -187,8 +187,8 @@ class SrsHttpServer : public ISrsHttpServeMux public: virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); public: - virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r); - virtual void http_unmount(SrsLiveSource* s, SrsRequest* r); + virtual srs_error_t http_mount(SrsRequest* r); + virtual void http_unmount(SrsRequest* r); }; #endif diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 181ee14c09..914cefd5c6 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -40,10 +40,9 @@ using namespace std; #include #include -SrsBufferCache::SrsBufferCache(SrsLiveSource* s, SrsRequest* r) +SrsBufferCache::SrsBufferCache(SrsRequest* r) { req = r->copy()->as_http(); - source = s; queue = new SrsMessageQueue(true); trd = new SrsSTCoroutine("http-stream", this); @@ -59,12 +58,11 @@ SrsBufferCache::~SrsBufferCache() srs_freep(req); } -srs_error_t SrsBufferCache::update_auth(SrsLiveSource* s, SrsRequest* r) +srs_error_t SrsBufferCache::update_auth(SrsRequest* r) { srs_freep(req); req = r->copy(); - source = s; - + return srs_success; } @@ -107,15 +105,20 @@ srs_error_t SrsBufferCache::cycle() srs_usleep(SRS_STREAM_CACHE_CYCLE); return err; } + + SrsSharedPtr live_source = _srs_sources->fetch(req); + if (!live_source.get()) { + return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); + } // the stream cache will create consumer to cache stream, // which will trigger to fetch stream from origin for edge. SrsLiveConsumer* consumer = NULL; SrsAutoFree(SrsLiveConsumer, consumer); - if ((err = source->create_consumer(consumer)) != srs_success) { + if ((err = live_source->create_consumer(consumer)) != srs_success) { return srs_error_wrap(err, "create consumer"); } - if ((err = source->consumer_dumps(consumer, false, false, true)) != srs_success) { + if ((err = live_source->consumer_dumps(consumer, false, false, true)) != srs_success) { return srs_error_wrap(err, "dumps consumer"); } @@ -553,9 +556,8 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri return writer->writev(iov, iovcnt, pnwrite); } -SrsLiveStream::SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c) +SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c) { - source = s; cache = c; req = r->copy()->as_http(); security_ = new SrsSecurity(); @@ -567,10 +569,8 @@ SrsLiveStream::~SrsLiveStream() srs_freep(security_); } -srs_error_t SrsLiveStream::update_auth(SrsLiveSource* s, SrsRequest* r) +srs_error_t SrsLiveStream::update_auth(SrsRequest* r) { - source = s; - srs_freep(req); req = r->copy()->as_http(); @@ -660,14 +660,19 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // Enter chunked mode, because we didn't set the content-length. w->write_header(SRS_CONSTS_HTTP_OK); + + SrsSharedPtr live_source = _srs_sources->fetch(req); + if (!live_source.get()) { + return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); + } // create consumer of souce, ignore gop cache, use the audio gop cache. SrsLiveConsumer* consumer = NULL; SrsAutoFree(SrsLiveConsumer, consumer); - if ((err = source->create_consumer(consumer)) != srs_success) { + if ((err = live_source->create_consumer(consumer)) != srs_success) { return srs_error_wrap(err, "create consumer"); } - if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) { + if ((err = live_source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) { return srs_error_wrap(err, "dumps consumer"); } @@ -689,7 +694,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // if gop cache enabled for encoder, dump to consumer. if (enc->has_cache()) { - if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) { + if ((err = enc->dump_cache(consumer, live_source->jitter())) != srs_success) { return srs_error_wrap(err, "encoder dump cache"); } } @@ -876,7 +881,6 @@ SrsLiveEntry::SrsLiveEntry(std::string m) cache = NULL; req = NULL; - source = NULL; std::string ext = srs_path_filext(m); _is_flv = (ext == ".flv"); @@ -954,7 +958,7 @@ srs_error_t SrsHttpStreamServer::initialize() } // TODO: FIXME: rename for HTTP FLV mount. -srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r) +srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r) { srs_error_t err = srs_success; @@ -982,10 +986,9 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r) entry = new SrsLiveEntry(mount); - entry->source = s; entry->req = r->copy()->as_http(); - entry->cache = new SrsBufferCache(s, r); - entry->stream = new SrsLiveStream(s, r, entry->cache); + entry->cache = new SrsBufferCache(r); + entry->stream = new SrsLiveStream(r, entry->cache); // TODO: FIXME: maybe refine the logic of http remux service. // if user push streams followed: @@ -994,8 +997,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r) // and they will using the same template, such as: [vhost]/[app]/[stream].flv // so, need to free last request object, otherwise, it will cause memory leak. srs_freep(tmpl->req); - - tmpl->source = s; + tmpl->req = r->copy()->as_http(); sflvs[sid] = entry; @@ -1015,8 +1017,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r) } else { // The entry exists, we reuse it and update the request of stream and cache. entry = sflvs[sid]; - entry->stream->update_auth(s, r); - entry->cache->update_auth(s, r); + entry->stream->update_auth(r); + entry->cache->update_auth(r); } if (entry->stream) { @@ -1027,7 +1029,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r) return err; } -void SrsHttpStreamServer::http_unmount(SrsLiveSource* s, SrsRequest* r) +void SrsHttpStreamServer::http_unmount(SrsRequest* r) { std::string sid = r->get_stream_url(); @@ -1133,20 +1135,20 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle } } } - - SrsLiveSource* s = NULL; - if ((err = _srs_sources->fetch_or_create(r, server, &s)) != srs_success) { + + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(r, server, live_source)) != srs_success) { return srs_error_wrap(err, "source create"); } - srs_assert(s != NULL); + srs_assert(live_source.get() != NULL); bool enabled_cache = _srs_config->get_gop_cache(r->vhost); int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost); - s->set_cache(enabled_cache); - s->set_gop_cache_max_frames(gcmf); + live_source->set_cache(enabled_cache); + live_source->set_gop_cache_max_frames(gcmf); // create http streaming handler. - if ((err = http_mount(s, r)) != srs_success) { + if ((err = http_mount(r)) != srs_success) { return srs_error_wrap(err, "http mount"); } @@ -1161,7 +1163,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle // trigger edge to fetch from origin. bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost); srs_trace("flv: source url=%s, is_edge=%d, source_id=%s/%s", - r->get_stream_url().c_str(), vhost_is_edge, s->source_id().c_str(), s->pre_source_id().c_str()); + r->get_stream_url().c_str(), vhost_is_edge, live_source->source_id().c_str(), live_source->pre_source_id().c_str()); return err; } diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 138eb003c4..b3962d072c 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -23,13 +23,12 @@ class SrsBufferCache : public ISrsCoroutineHandler srs_utime_t fast_cache; private: SrsMessageQueue* queue; - SrsLiveSource* source; SrsRequest* req; SrsCoroutine* trd; public: - SrsBufferCache(SrsLiveSource* s, SrsRequest* r); + SrsBufferCache(SrsRequest* r); virtual ~SrsBufferCache(); - virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r); + virtual srs_error_t update_auth(SrsRequest* r); public: virtual srs_error_t start(); virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter); @@ -178,13 +177,12 @@ class SrsLiveStream : public ISrsHttpHandler { private: SrsRequest* req; - SrsLiveSource* source; SrsBufferCache* cache; SrsSecurity* security_; public: - SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c); + SrsLiveStream(SrsRequest* r, SrsBufferCache* c); virtual ~SrsLiveStream(); - virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r); + virtual srs_error_t update_auth(SrsRequest* r); public: virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); private: @@ -205,8 +203,6 @@ struct SrsLiveEntry public: // We will free the request. SrsRequest* req; - // Shared source. - SrsLiveSource* source; public: // For template, the mount contains variables. // For concrete stream, the mount is url to access. @@ -244,8 +240,8 @@ class SrsHttpStreamServer : public ISrsReloadHandler virtual srs_error_t initialize(); public: // HTTP flv/ts/mp3/aac stream - virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r); - virtual void http_unmount(SrsLiveSource* s, SrsRequest* r); + virtual srs_error_t http_mount(SrsRequest* r); + virtual void http_unmount(SrsRequest* r); // Interface ISrsHttpMatchHijacker public: virtual srs_error_t hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph); diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 5e77245fac..b25fdc9f1b 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -259,13 +259,13 @@ void SrsQueueRecvThread::on_stop() } SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, - int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid) + int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr source, SrsContextId parent_cid) : trd(this, rtmp_sdk, tm, parent_cid) { rtmp = rtmp_sdk; _conn = conn; - _source = source; + source_ = source; nn_msgs_for_yield_ = 0; recv_error = srs_success; @@ -370,7 +370,7 @@ srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg) srs_update_system_time(), msg->header.timestamp, msg->size); // the rtmp connection will handle this message - err = _conn->handle_publish_message(_source, msg); + err = _conn->handle_publish_message(source_, msg); // must always free it, // the source will copy it if need to use. diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index fdef672702..5fa011db57 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -16,6 +16,7 @@ #include #include #include +#include class SrsRtmpServer; class SrsCommonMessage; @@ -146,7 +147,7 @@ class SrsPublishRecvThread : public ISrsMessagePumper, public ISrsReloadHandler srs_error_t recv_error; SrsRtmpConn* _conn; // The params for conn callback. - SrsLiveSource* _source; + SrsSharedPtr source_; // The error timeout cond srs_cond_t error; // The merged context id. @@ -154,7 +155,7 @@ class SrsPublishRecvThread : public ISrsMessagePumper, public ISrsReloadHandler SrsContextId ncid; public: SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, - int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid); + int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr source, SrsContextId parent_cid); virtual ~SrsPublishRecvThread(); public: // Wait for error for some timeout. diff --git a/trunk/src/app/srs_app_rtc_api.cpp b/trunk/src/app/srs_app_rtc_api.cpp index 393f3869f3..1f2880633f 100644 --- a/trunk/src/app/srs_app_rtc_api.cpp +++ b/trunk/src/app/srs_app_rtc_api.cpp @@ -224,8 +224,8 @@ srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa // For RTMP to RTC, fail if disabled and RTMP is active, see https://github.com/ossrs/srs/issues/2728 if (!is_rtc_stream_active && !_srs_config->get_rtc_from_rtmp(ruc->req_->vhost)) { - SrsLiveSource* rtmp = _srs_sources->fetch(ruc->req_); - if (rtmp && !rtmp->inactive()) { + SrsSharedPtr live_source = _srs_sources->fetch(ruc->req_); + if (live_source.get() && !live_source->inactive()) { return srs_error_new(ERROR_RTC_DISABLED, "Disabled rtmp_to_rtc of %s, see #2728", ruc->req_->vhost.c_str()); } } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 1ef65a8caf..52d142a4ca 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -646,7 +646,7 @@ srs_error_t SrsRtcPlayStream::cycle() SrsRtcConsumer* consumer = NULL; SrsAutoFree(SrsRtcConsumer, consumer); - if ((err = source->create_consumer(source_, consumer)) != srs_success) { + if ((err = source->create_consumer(consumer)) != srs_success) { return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str()); } @@ -1202,8 +1202,8 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti source_->set_publish_stream(this); // TODO: FIMXE: Check it in SrsRtcConnection::add_publisher? - SrsLiveSource *rtmp = _srs_sources->fetch(r); - if (rtmp && !rtmp->can_publish(false)) { + SrsSharedPtr live_source = _srs_sources->fetch(r); + if (live_source.get() && !live_source->can_publish(false)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str()); } @@ -1227,16 +1227,16 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost); if (rtc_to_rtmp) { - if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { + if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), live_source)) != srs_success) { return srs_error_wrap(err, "create source"); } // Disable GOP cache for RTC2RTMP bridge, to keep the streams in sync, // especially for stream merging. - rtmp->set_cache(false); + live_source->set_cache(false); SrsCompositeBridge* bridge = new SrsCompositeBridge(); - bridge->append(new SrsFrameToRtmpBridge(rtmp)); + bridge->append(new SrsFrameToRtmpBridge(live_source)); if ((err = bridge->initialize(r)) != srs_success) { srs_freep(bridge); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index c8abd37981..7957903d99 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -154,7 +154,7 @@ ISrsRtcSourceChangeCallback::~ISrsRtcSourceChangeCallback() { } -SrsRtcConsumer::SrsRtcConsumer(SrsSharedPtr s) +SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s) { source_ = s; should_update_source_id = false; @@ -486,11 +486,11 @@ void SrsRtcSource::set_bridge(ISrsStreamBridge* bridge) #endif } -srs_error_t SrsRtcSource::create_consumer(SrsSharedPtr source, SrsRtcConsumer*& consumer) +srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; - consumer = new SrsRtcConsumer(source); + consumer = new SrsRtcConsumer(this); consumers.push_back(consumer); // TODO: FIXME: Implements edge cluster. diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index f3b904556b..00328527b0 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -80,7 +80,9 @@ class ISrsRtcSourceChangeCallback class SrsRtcConsumer { private: - SrsSharedPtr source_; + // Because source references to this object, so we should directly use the source ptr. + SrsRtcSource* source_; +private: std::vector queue; // when source id changed, notice all consumers bool should_update_source_id; @@ -92,7 +94,7 @@ class SrsRtcConsumer // The callback for stream change event. ISrsRtcSourceChangeCallback* handler_; public: - SrsRtcConsumer(SrsSharedPtr s); + SrsRtcConsumer(SrsRtcSource* s); virtual ~SrsRtcConsumer(); public: // When source id changed, notice client to print. @@ -215,7 +217,7 @@ class SrsRtcSource : public ISrsFastTimer public: // Create consumer // @param consumer, output the create consumer. - virtual srs_error_t create_consumer(SrsSharedPtr source, SrsRtcConsumer*& consumer); + virtual srs_error_t create_consumer(SrsRtcConsumer*& consumer); // Dumps packets in cache to consumer. // @param ds, whether dumps the sequence header. // @param dm, whether dumps the metadata. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 58ed3e3caf..fd01a831ef 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -571,19 +571,19 @@ srs_error_t SrsRtmpConn::stream_service_cycle() rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); // find a source to serve. - SrsLiveSource* source = NULL; - if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) { + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(req, server, live_source)) != srs_success) { return srs_error_wrap(err, "rtmp: fetch source"); } - srs_assert(source != NULL); + srs_assert(live_source.get() != NULL); bool enabled_cache = _srs_config->get_gop_cache(req->vhost); int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d/%d, is_edge=%d, source_id=%s/%s", - req->get_stream_url().c_str(), ip.c_str(), enabled_cache, gcmf, info->edge, source->source_id().c_str(), - source->pre_source_id().c_str()); - source->set_cache(enabled_cache); - source->set_gop_cache_max_frames(gcmf); + req->get_stream_url().c_str(), ip.c_str(), enabled_cache, gcmf, info->edge, live_source->source_id().c_str(), + live_source->pre_source_id().c_str()); + live_source->set_cache(enabled_cache); + live_source->set_gop_cache_max_frames(gcmf); switch (info->type) { case SrsRtmpConnPlay: { @@ -610,7 +610,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle() span_main_->end(); #endif - err = playing(source); + err = playing(live_source); http_hooks_on_stop(); return err; @@ -627,7 +627,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle() span_main_->end(); #endif - return publishing(source); + return publishing(live_source); } case SrsRtmpConnHaivisionPublish: { if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) { @@ -641,7 +641,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle() span_main_->end(); #endif - return publishing(source); + return publishing(live_source); } case SrsRtmpConnFlashPublish: { if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) { @@ -655,7 +655,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle() span_main_->end(); #endif - return publishing(source); + return publishing(live_source); } default: { return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type); @@ -699,7 +699,7 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost) return err; } -srs_error_t SrsRtmpConn::playing(SrsLiveSource* source) +srs_error_t SrsRtmpConn::playing(SrsSharedPtr source) { srs_error_t err = srs_success; @@ -786,7 +786,7 @@ srs_error_t SrsRtmpConn::playing(SrsLiveSource* source) return err; } -srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd) +srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd) { srs_error_t err = srs_success; @@ -923,7 +923,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons return err; } -srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source) +srs_error_t SrsRtmpConn::publishing(SrsSharedPtr source) { srs_error_t err = srs_success; @@ -969,7 +969,7 @@ srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source) return err; } -srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd) +srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr source, SrsPublishRecvThread* rtrd) { srs_error_t err = srs_success; @@ -1073,7 +1073,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre return err; } -srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) +srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) { srs_error_t err = srs_success; @@ -1141,7 +1141,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) return err; } -void SrsRtmpConn::release_publish(SrsLiveSource* source) +void SrsRtmpConn::release_publish(SrsSharedPtr source) { // when edge, notice edge to change state. // when origin, notice all service to unpublish. @@ -1152,7 +1152,7 @@ void SrsRtmpConn::release_publish(SrsLiveSource* source) } } -srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg) +srs_error_t SrsRtmpConn::handle_publish_message(SrsSharedPtr& source, SrsCommonMessage* msg) { srs_error_t err = srs_success; @@ -1193,7 +1193,7 @@ srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommon return err; } -srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg) +srs_error_t SrsRtmpConn::process_publish_message(SrsSharedPtr& source, SrsCommonMessage* msg) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 9b86f4fab2..8c86627fec 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -16,6 +16,7 @@ #include #include #include +#include class SrsServer; class SrsRtmpServer; @@ -145,14 +146,14 @@ class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsRelo // The stream(play/publish) service cycle, identify client first. virtual srs_error_t stream_service_cycle(); virtual srs_error_t check_vhost(bool try_default_vhost); - virtual srs_error_t playing(SrsLiveSource* source); - virtual srs_error_t do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd); - virtual srs_error_t publishing(SrsLiveSource* source); - virtual srs_error_t do_publishing(SrsLiveSource* source, SrsPublishRecvThread* trd); - virtual srs_error_t acquire_publish(SrsLiveSource* source); - virtual void release_publish(SrsLiveSource* source); - virtual srs_error_t handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg); - virtual srs_error_t process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg); + virtual srs_error_t playing(SrsSharedPtr source); + virtual srs_error_t do_playing(SrsSharedPtr source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd); + virtual srs_error_t publishing(SrsSharedPtr source); + virtual srs_error_t do_publishing(SrsSharedPtr source, SrsPublishRecvThread* trd); + virtual srs_error_t acquire_publish(SrsSharedPtr source); + virtual void release_publish(SrsSharedPtr source); + virtual srs_error_t handle_publish_message(SrsSharedPtr& source, SrsCommonMessage* msg); + virtual srs_error_t process_publish_message(SrsSharedPtr& source, SrsCommonMessage* msg); virtual srs_error_t process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg); virtual void set_sock_options(); private: diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index af962f38b1..2db8738f88 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1302,28 +1302,28 @@ srs_error_t SrsServer::on_reload_listen() return err; } -srs_error_t SrsServer::on_publish(SrsLiveSource* s, SrsRequest* r) +srs_error_t SrsServer::on_publish(SrsRequest* r) { srs_error_t err = srs_success; - if ((err = http_server->http_mount(s, r)) != srs_success) { + if ((err = http_server->http_mount(r)) != srs_success) { return srs_error_wrap(err, "http mount"); } SrsCoWorkers* coworkers = SrsCoWorkers::instance(); - if ((err = coworkers->on_publish(s, r)) != srs_success) { + if ((err = coworkers->on_publish(r)) != srs_success) { return srs_error_wrap(err, "coworkers"); } return err; } -void SrsServer::on_unpublish(SrsLiveSource* s, SrsRequest* r) +void SrsServer::on_unpublish(SrsRequest* r) { - http_server->http_unmount(s, r); + http_server->http_unmount(r); SrsCoWorkers* coworkers = SrsCoWorkers::instance(); - coworkers->on_unpublish(s, r); + coworkers->on_unpublish(r); } SrsServerAdapter::SrsServerAdapter() diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 795e84f148..07fb52cc44 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -234,8 +234,8 @@ class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler, public virtual srs_error_t on_reload_listen(); // Interface ISrsLiveSourceHandler public: - virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r); - virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r); + virtual srs_error_t on_publish(SrsRequest* r); + virtual void on_unpublish(SrsRequest* r); }; // The SRS server adapter, the master server. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 7e6368d727..8e84d24aab 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -407,7 +407,7 @@ ISrsWakable::~ISrsWakable() SrsLiveConsumer::SrsLiveConsumer(SrsLiveSource* s) { - source = s; + source_ = s; paused = false; jitter = new SrsRtmpJitter(); queue = new SrsMessageQueue(); @@ -423,7 +423,7 @@ SrsLiveConsumer::SrsLiveConsumer(SrsLiveSource* s) SrsLiveConsumer::~SrsLiveConsumer() { - source->on_consumer_destroy(this); + source_->on_consumer_destroy(this); srs_freep(jitter); srs_freep(queue); @@ -506,7 +506,7 @@ srs_error_t SrsLiveConsumer::dump_packets(SrsMessageArray* msgs, int& count) count = 0; if (should_update_source_id) { - srs_trace("update source_id=%s/%s", source->source_id().c_str(), source->pre_source_id().c_str()); + srs_trace("update source_id=%s/%s", source_->source_id().c_str(), source_->pre_source_id().c_str()); should_update_source_id = false; } @@ -822,7 +822,7 @@ SrsSharedPtrMessage* SrsMixQueue::pop() SrsOriginHub::SrsOriginHub() { - source = NULL; + source_ = NULL; req_ = NULL; is_active = false; @@ -861,12 +861,13 @@ SrsOriginHub::~SrsOriginHub() #endif } -srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r) +srs_error_t SrsOriginHub::initialize(SrsSharedPtr s, SrsRequest* r) { srs_error_t err = srs_success; req_ = r; - source = s; + // Because source references to this object, so we should directly use the source ptr. + source_ = s.get(); if ((err = hls->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "hls initialize"); @@ -936,7 +937,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_audio; - SrsRtmpFormat* format = source->format_; + SrsRtmpFormat* format = source_->format_; // Handle the metadata when got sequence header. if (format->is_aac_sequence_header() || format->is_mp3_sequence_header()) { @@ -973,7 +974,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) hls->on_unpublish(); srs_error_reset(err); } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) { - if (srs_hls_can_continue(srs_error_code(err), source->meta->ash(), msg)) { + if (srs_hls_can_continue(srs_error_code(err), source_->meta->ash(), msg)) { srs_error_reset(err); } else { return srs_error_wrap(err, "hls: audio"); @@ -1022,7 +1023,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_video; - SrsRtmpFormat* format = source->format_; + SrsRtmpFormat* format = source_->format_; // cache the sequence header if h264 // donot cache the sequence header to gop_cache, return here. @@ -1066,7 +1067,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se hls->on_unpublish(); srs_error_reset(err); } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) { - if (srs_hls_can_continue(srs_error_code(err), source->meta->vsh(), msg)) { + if (srs_hls_can_continue(srs_error_code(err), source_->meta->vsh(), msg)) { srs_error_reset(err); } else { return srs_error_wrap(err, "hls: video"); @@ -1177,9 +1178,9 @@ srs_error_t SrsOriginHub::on_forwarder_start(SrsForwarder* forwarder) { srs_error_t err = srs_success; - SrsSharedPtrMessage* cache_metadata = source->meta->data(); - SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); - SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); + SrsSharedPtrMessage* cache_metadata = source_->meta->data(); + SrsSharedPtrMessage* cache_sh_video = source_->meta->vsh(); + SrsSharedPtrMessage* cache_sh_audio = source_->meta->ash(); // feed the forwarder the metadata/sequence header, // when reload to enable the forwarder. @@ -1200,9 +1201,9 @@ srs_error_t SrsOriginHub::on_dvr_request_sh() { srs_error_t err = srs_success; - SrsSharedPtrMessage* cache_metadata = source->meta->data(); - SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); - SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); + SrsSharedPtrMessage* cache_metadata = source_->meta->data(); + SrsSharedPtrMessage* cache_sh_video = source_->meta->vsh(); + SrsSharedPtrMessage* cache_sh_audio = source_->meta->ash(); // feed the dvr the metadata/sequence header, // when reload to start dvr, dvr will never get the sequence header in stream, @@ -1212,13 +1213,13 @@ srs_error_t SrsOriginHub::on_dvr_request_sh() } if (cache_sh_video) { - if ((err = dvr->on_video(cache_sh_video, source->meta->vsh_format())) != srs_success) { + if ((err = dvr->on_video(cache_sh_video, source_->meta->vsh_format())) != srs_success) { return srs_error_wrap(err, "dvr video"); } } if (cache_sh_audio) { - if ((err = dvr->on_audio(cache_sh_audio, source->meta->ash_format())) != srs_success) { + if ((err = dvr->on_audio(cache_sh_audio, source_->meta->ash_format())) != srs_success) { return srs_error_wrap(err, "dvr audio"); } } @@ -1230,16 +1231,16 @@ srs_error_t SrsOriginHub::on_hls_request_sh() { srs_error_t err = srs_success; - SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); + SrsSharedPtrMessage* cache_sh_video = source_->meta->vsh(); if (cache_sh_video) { - if ((err = hls->on_video(cache_sh_video, source->meta->vsh_format())) != srs_success) { + if ((err = hls->on_video(cache_sh_video, source_->meta->vsh_format())) != srs_success) { return srs_error_wrap(err, "hls video"); } } - SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); + SrsSharedPtrMessage* cache_sh_audio = source_->meta->ash(); if (cache_sh_audio) { - if ((err = hls->on_audio(cache_sh_audio, source->meta->ash_format())) != srs_success) { + if ((err = hls->on_audio(cache_sh_audio, source_->meta->ash_format())) != srs_success) { return srs_error_wrap(err, "hls audio"); } } @@ -1295,9 +1296,9 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost) return srs_error_wrap(err, "dash start publish"); } - SrsRtmpFormat* format = source->format_; + SrsRtmpFormat* format = source_->format_; - SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); + SrsSharedPtrMessage* cache_sh_video = source_->meta->vsh(); if (cache_sh_video) { if ((err = format->on_video(cache_sh_video)) != srs_success) { return srs_error_wrap(err, "format on_video"); @@ -1307,7 +1308,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost) } } - SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); + SrsSharedPtrMessage* cache_sh_audio = source_->meta->ash(); if (cache_sh_audio) { if ((err = format->on_audio(cache_sh_audio)) != srs_success) { return srs_error_wrap(err, "format on_audio"); @@ -1759,67 +1760,64 @@ srs_error_t SrsLiveSourceManager::initialize() return setup_ticks(); } -srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps) +srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsSharedPtr& pps) { srs_error_t err = srs_success; // Use lock to protect coroutine switch. // @bug https://github.com/ossrs/srs/issues/1230 - // TODO: FIXME: Use smaller lock. + // TODO: FIXME: Use smaller scope lock. SrsLocker(lock); - - SrsLiveSource* source = NULL; - if ((source = fetch(r)) != NULL) { + + string stream_url = r->get_stream_url(); + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); + + if (it != pool.end()) { + SrsSharedPtr& source = it->second; + // we always update the request of resource, // for origin auth is on, the token in request maybe invalid, // and we only need to update the token of request, it's simple. source->update_auth(r); - *pps = source; + pps = source; return err; } - - string stream_url = r->get_stream_url(); - string vhost = r->vhost; - - // should always not exists for create a source. - srs_assert (pool.find(stream_url) == pool.end()); + SrsSharedPtr source = new SrsLiveSource(); srs_trace("new live source, stream_url=%s", stream_url.c_str()); - source = new SrsLiveSource(); - if ((err = source->initialize(r, h)) != srs_success) { - err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); - goto failed; + if ((err = source->initialize(source, r, h)) != srs_success) { + return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); } pool[stream_url] = source; - *pps = source; - return err; - -failed: - srs_freep(source); + pps = source; return err; } -SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r) +SrsSharedPtr SrsLiveSourceManager::fetch(SrsRequest* r) { - SrsLiveSource* source = NULL; + // Use lock to protect coroutine switch. + // @bug https://github.com/ossrs/srs/issues/1230 + // TODO: FIXME: Use smaller scope lock. + SrsLocker(lock); string stream_url = r->get_stream_url(); - if (pool.find(stream_url) == pool.end()) { - return NULL; + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); + + if (it == pool.end()) { + return SrsSharedPtr(NULL); } - - source = pool[stream_url]; - + + SrsSharedPtr& source = it->second; return source; } void SrsLiveSourceManager::dispose() { - std::map::iterator it; + std::map< std::string, SrsSharedPtr >::iterator it; for (it = pool.begin(); it != pool.end(); ++it) { - SrsLiveSource* source = it->second; + SrsSharedPtr& source = it->second; source->dispose(); } return; @@ -1844,9 +1842,9 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut { srs_error_t err = srs_success; - std::map::iterator it; + std::map< std::string, SrsSharedPtr >::iterator it; for (it = pool.begin(); it != pool.end();) { - SrsLiveSource* source = it->second; + SrsSharedPtr& source = it->second; // Do cycle source to cleanup components, such as hls dispose. if ((err = source->cycle()) != srs_success) { @@ -1856,19 +1854,11 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut // See SrsSrtSource::on_consumer_destroy // TODO: FIXME: support source cleanup. // @see https://github.com/ossrs/srs/issues/713 -#if 0 +#if 1 // When source expired, remove it. if (source->stream_is_dead()) { - int cid = source->source_id(); - if (cid == -1 && source->pre_source_id() > 0) { - cid = source->pre_source_id(); - } - if (cid > 0) { - _srs_context->set_id(cid); - } - srs_trace("cleanup die source, total=%d", (int)pool.size()); - - srs_freep(source); + const SrsContextId& cid = source->source_id(); + srs_trace("cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size()); pool.erase(it++); } else { ++it; @@ -1883,11 +1873,6 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut void SrsLiveSourceManager::destroy() { - std::map::iterator it; - for (it = pool.begin(); it != pool.end(); ++it) { - SrsLiveSource* source = it->second; - srs_freep(source); - } pool.clear(); } @@ -1995,7 +1980,7 @@ bool SrsLiveSource::publisher_is_idle_for(srs_utime_t timeout) return false; } -srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h) +srs_error_t SrsLiveSource::initialize(SrsSharedPtr wrapper, SrsRequest* r, ISrsLiveSourceHandler* h) { srs_error_t err = srs_success; @@ -2013,14 +1998,14 @@ srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h) // Setup the SPS/PPS parsing strategy. format_->try_annexb_first = _srs_config->try_annexb_first(r->vhost); - if ((err = hub->initialize(this, req)) != srs_success) { + if ((err = hub->initialize(wrapper, req)) != srs_success) { return srs_error_wrap(err, "hub"); } - if ((err = play_edge->initialize(this, req)) != srs_success) { + if ((err = play_edge->initialize(wrapper, req)) != srs_success) { return srs_error_wrap(err, "edge(play)"); } - if ((err = publish_edge->initialize(this, req)) != srs_success) { + if ((err = publish_edge->initialize(wrapper, req)) != srs_success) { return srs_error_wrap(err, "edge(publish)"); } @@ -2600,7 +2585,7 @@ srs_error_t SrsLiveSource::on_publish() // notify the handler. srs_assert(handler); - if ((err = handler->on_publish(this, req)) != srs_success) { + if ((err = handler->on_publish(req)) != srs_success) { return srs_error_wrap(err, "handle publish"); } @@ -2652,7 +2637,7 @@ void SrsLiveSource::on_unpublish() SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_close(req); - handler->on_unpublish(this, req); + handler->on_unpublish(req); if (bridge_) { bridge_->on_unpublish(); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 75a0b6a765..802c2cb202 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -19,6 +19,7 @@ #include #include #include +#include class SrsFormat; class SrsRtmpFormat; @@ -168,9 +169,11 @@ class ISrsWakable // The consumer for SrsLiveSource, that is a play client. class SrsLiveConsumer : public ISrsWakable { +private: + // Because source references to this object, so we should directly use the source ptr. + SrsLiveSource* source_; private: SrsRtmpJitter* jitter; - SrsLiveSource* source; SrsMessageQueue* queue; bool paused; // when source id changed, notice all consumers @@ -288,9 +291,9 @@ class ISrsLiveSourceHandler virtual ~ISrsLiveSourceHandler(); public: // when stream start publish, mount stream. - virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r) = 0; + virtual srs_error_t on_publish(SrsRequest* r) = 0; // when stream stop publish, unmount stream. - virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r) = 0; + virtual void on_unpublish(SrsRequest* r) = 0; }; // The mix queue to correct the timestamp for mix_correct algorithm. @@ -315,7 +318,9 @@ class SrsMixQueue class SrsOriginHub : public ISrsReloadHandler { private: - SrsLiveSource* source; + // Because source references to this object, so we should directly use the source ptr. + SrsLiveSource* source_; +private: SrsRequest* req_; bool is_active; private: @@ -341,7 +346,7 @@ class SrsOriginHub : public ISrsReloadHandler public: // Initialize the hub with source and request. // @param r The request object, managed by source. - virtual srs_error_t initialize(SrsLiveSource* s, SrsRequest* r); + virtual srs_error_t initialize(SrsSharedPtr s, SrsRequest* r); // Dispose the hub, release utilities resource, // For example, delete all HLS pieces. virtual void dispose(); @@ -443,7 +448,7 @@ class SrsLiveSourceManager : public ISrsHourGlass { private: srs_mutex_t lock; - std::map pool; + std::map< std::string, SrsSharedPtr > pool; SrsHourGlass* timer_; public: SrsLiveSourceManager(); @@ -454,10 +459,10 @@ class SrsLiveSourceManager : public ISrsHourGlass // @param r the client request. // @param h the event handler for source. // @param pps the matched source, if success never be NULL. - virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps); + virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsSharedPtr& pps); public: // Get the exists source, NULL when not exists. - virtual SrsLiveSource* fetch(SrsRequest* r); + virtual SrsSharedPtr fetch(SrsRequest* r); public: // dispose and cycle all sources. virtual void dispose(); @@ -539,7 +544,7 @@ class SrsLiveSource : public ISrsReloadHandler bool publisher_is_idle_for(srs_utime_t timeout); public: // Initialize the hls with handlers. - virtual srs_error_t initialize(SrsRequest* r, ISrsLiveSourceHandler* h); + virtual srs_error_t initialize(SrsSharedPtr wrapper, SrsRequest* r, ISrsLiveSourceHandler* h); // Bridge to other source, forward packets to it. void set_bridge(ISrsStreamBridge* v); // Interface ISrsReloadHandler diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index a1a4673381..47d3428953 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -368,16 +368,16 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() } // Check rtmp stream is busy. - SrsLiveSource *live_source = _srs_sources->fetch(req_); - if (live_source && !live_source->can_publish(false)) { + SrsSharedPtr live_source = _srs_sources->fetch(req_); + if (live_source.get() && !live_source->can_publish(false)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str()); } - if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) { + if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), live_source)) != srs_success) { return srs_error_wrap(err, "create source"); } - srs_assert(live_source != NULL); + srs_assert(live_source.get() != NULL); bool enabled_cache = _srs_config->get_gop_cache(req_->vhost); int gcmf = _srs_config->get_gop_cache_max_frames(req_->vhost); @@ -489,7 +489,7 @@ srs_error_t SrsMpegtsSrtConn::do_playing() SrsSrtConsumer* consumer = NULL; SrsAutoFree(SrsSrtConsumer, consumer); - if ((err = srt_source_->create_consumer(srt_source_, consumer)) != srs_success) { + if ((err = srt_source_->create_consumer(consumer)) != srs_success) { return srs_error_wrap(err, "create consumer, ts source=%s", req_->get_stream_url().c_str()); } srs_assert(consumer); diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index a0dfa60204..6b65c34773 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -152,7 +152,7 @@ void SrsSrtSourceManager::eliminate(SrsRequest* r) SrsSrtSourceManager* _srs_srt_sources = NULL; -SrsSrtConsumer::SrsSrtConsumer(SrsSharedPtr s) +SrsSrtConsumer::SrsSrtConsumer(SrsSrtSource* s) { source_ = s; should_update_source_id = false; @@ -942,11 +942,11 @@ void SrsSrtSource::set_bridge(ISrsStreamBridge* bridge) frame_builder_ = new SrsSrtFrameBuilder(bridge); } -srs_error_t SrsSrtSource::create_consumer(SrsSharedPtr source, SrsSrtConsumer*& consumer) +srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer) { srs_error_t err = srs_success; - consumer = new SrsSrtConsumer(source); + consumer = new SrsSrtConsumer(this); consumers.push_back(consumer); return err; diff --git a/trunk/src/app/srs_app_srt_source.hpp b/trunk/src/app/srs_app_srt_source.hpp index 46d5b8d7e2..4bb3a014bb 100644 --- a/trunk/src/app/srs_app_srt_source.hpp +++ b/trunk/src/app/srs_app_srt_source.hpp @@ -70,10 +70,12 @@ extern SrsSrtSourceManager* _srs_srt_sources; class SrsSrtConsumer { public: - SrsSrtConsumer(SrsSharedPtr source); + SrsSrtConsumer(SrsSrtSource* source); virtual ~SrsSrtConsumer(); private: - SrsSharedPtr source_; + // Because source references to this object, so we should directly use the source ptr. + SrsSrtSource* source_; +private: std::vector queue; // when source id changed, notice all consumers bool should_update_source_id; @@ -167,7 +169,7 @@ class SrsSrtSource public: // Create consumer // @param consumer, output the create consumer. - virtual srs_error_t create_consumer(SrsSharedPtr source, SrsSrtConsumer*& consumer); + virtual srs_error_t create_consumer(SrsSrtConsumer*& consumer); // Dumps packets in cache to consumer. virtual srs_error_t consumer_dumps(SrsSrtConsumer* consumer); virtual void on_consumer_destroy(SrsSrtConsumer* consumer); diff --git a/trunk/src/app/srs_app_stream_bridge.cpp b/trunk/src/app/srs_app_stream_bridge.cpp index c024974ad8..d043384159 100644 --- a/trunk/src/app/srs_app_stream_bridge.cpp +++ b/trunk/src/app/srs_app_stream_bridge.cpp @@ -25,9 +25,9 @@ ISrsStreamBridge::~ISrsStreamBridge() { } -SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsLiveSource *src) +SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsSharedPtr source) { - source_ = src; + source_ = source; } SrsFrameToRtmpBridge::~SrsFrameToRtmpBridge() diff --git a/trunk/src/app/srs_app_stream_bridge.hpp b/trunk/src/app/srs_app_stream_bridge.hpp index de9378669d..e78aad96b8 100644 --- a/trunk/src/app/srs_app_stream_bridge.hpp +++ b/trunk/src/app/srs_app_stream_bridge.hpp @@ -42,9 +42,9 @@ class ISrsStreamBridge class SrsFrameToRtmpBridge : public ISrsStreamBridge { private: - SrsLiveSource *source_; + SrsSharedPtr source_; public: - SrsFrameToRtmpBridge(SrsLiveSource *src); + SrsFrameToRtmpBridge(SrsSharedPtr source); virtual ~SrsFrameToRtmpBridge(); public: srs_error_t initialize(SrsRequest* r); diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index a84aaeca1f..7bc3935c83 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -11,6 +11,9 @@ #include +// The auto free helper, which is actually the unique ptr, without the move feature, +// see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107 +// // To free the instance in the current scope, for instance, MyClass* ptr, // which is a ptr and this class will: // 1. free the ptr. @@ -81,7 +84,9 @@ class impl_SrsAutoFree } }; -// Shared ptr smart pointer, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107 +// Shared ptr smart pointer, only support shared ptr, no weak ptr, no shared from this, no inheritance, +// no comparing, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107 +// // Usage: // SrsSharedPtr ptr(new MyClass()); // ptr->do_something(); diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index 30399a8b48..50ebfc6477 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 128 +#define VERSION_REVISION 129 #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 46a94f436a..2b1fa01c1d 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -107,6 +107,7 @@ XX(ERROR_BACKTRACE_ADDR2LINE , 1094, "BacktraceAddr2Line", "Backtrace addr2line failed") \ XX(ERROR_SYSTEM_FILE_NOT_OPEN , 1095, "FileNotOpen", "File is not opened") \ XX(ERROR_SYSTEM_FILE_SETVBUF , 1096, "FileSetVBuf", "Failed to set file vbuf") \ + XX(ERROR_NO_SOURCE , 1097, "NoSource", "No source found") /**************************************************/ /* RTMP protocol error. */ @@ -334,7 +335,7 @@ XX(ERROR_STREAM_CASTER_HEVC_FORMAT , 4057, "CasterTsHevcFormat", "Invalid ts HEVC Format for stream caster") \ XX(ERROR_HTTP_JSONP , 4058, "HttpJsonp", "Invalid callback for JSONP") \ XX(ERROR_HEVC_NALU_UEV , 4059, "HevcNaluUev", "Failed to read UEV for HEVC NALU") \ - XX(ERROR_HEVC_NALU_SEV , 4060, "HevcNaluSev", "Failed to read SEV for HEVC NALU") \ + XX(ERROR_HEVC_NALU_SEV , 4060, "HevcNaluSev", "Failed to read SEV for HEVC NALU") /**************************************************/ diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 2344e911ef..155621be0b 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -139,6 +139,17 @@ VOID TEST(CoreLogger, SharedPtrReset) } } +SrsSharedPtr mock_create_from_ptr(SrsSharedPtr p) { + return p; +} + +VOID TEST(CoreLogger, SharedPtrContructor) +{ + int* p = new int(100); + SrsSharedPtr q = mock_create_from_ptr(p); + EXPECT_EQ(100, *q); +} + VOID TEST(CoreLogger, SharedPtrObject) { SrsSharedPtr p(new MyNormalObject(100));