Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Mar 29, 2024
1 parent 94b1224 commit caf9ed7
Showing 1 changed file with 39 additions and 71 deletions.
110 changes: 39 additions & 71 deletions src/ustreamer/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,38 +164,22 @@ void us_stream_loop(us_stream_s *stream) {
US_THREAD_CREATE(ctx->tid, _releaser_thread, ctx);
}

_worker_context_s jpeg_ctx = {
.queue = us_queue_init(cap->run->n_bufs),
.stream = stream,
.stop = &threads_stop,
};
US_THREAD_CREATE(jpeg_ctx.tid, _jpeg_thread, &jpeg_ctx);

_worker_context_s h264_ctx;
if (run->h264 != NULL) {
h264_ctx.queue = us_queue_init(cap->run->n_bufs);
h264_ctx.stream = stream;
h264_ctx.stop = &threads_stop;
US_THREAD_CREATE(h264_ctx.tid, _h264_thread, &h264_ctx);
}

_worker_context_s raw_ctx;
if (stream->raw_sink != NULL) {
raw_ctx.queue = us_queue_init(2);
raw_ctx.stream = stream;
raw_ctx.stop = &threads_stop;
US_THREAD_CREATE(raw_ctx.tid, _raw_thread, &raw_ctx);
}

# define CREATE_WORKER(x_cond, x_ctx, x_thread, x_capacity) \
_worker_context_s *x_ctx = NULL; \
if (x_cond) { \
US_CALLOC(x_ctx, 1); \
x_ctx->queue = us_queue_init(x_capacity); \
x_ctx->stream = stream; \
x_ctx->stop = &threads_stop; \
US_THREAD_CREATE(x_ctx->tid, (x_thread), x_ctx); \
}
CREATE_WORKER(true, jpeg_ctx, _jpeg_thread, cap->run->n_bufs);
CREATE_WORKER((run->h264 != NULL), h264_ctx, _h264_thread, cap->run->n_bufs);
CREATE_WORKER((stream->raw_sink != NULL), raw_ctx, _raw_thread, 2);
# ifdef WITH_V4P
_worker_context_s drm_ctx;
if (run->drm != NULL) {
drm_ctx.queue = us_queue_init(cap->run->n_bufs);
drm_ctx.stream = stream;
drm_ctx.stop = &threads_stop;
US_THREAD_CREATE(drm_ctx.tid, _drm_thread, &drm_ctx); // cppcheck-suppress assertWithSideEffect
}
CREATE_WORKER((run->drm != NULL), drm_ctx, _drm_thread, cap->run->n_bufs); // cppcheck-suppress assertWithSideEffect
# endif
# undef CREATE_WORKER

uint captured_fps_accum = 0;
sll captured_fps_ts = 0;
Expand Down Expand Up @@ -226,22 +210,17 @@ void us_stream_loop(us_stream_s *stream) {
us_gpio_set_stream_online(true);
# endif

us_capture_hwbuf_incref(hw); // JPEG
us_queue_put(jpeg_ctx.queue, hw, 0);
if (run->h264 != NULL) {
us_capture_hwbuf_incref(hw); // H264
us_queue_put(h264_ctx.queue, hw, 0);
}
if (stream->raw_sink != NULL) {
us_capture_hwbuf_incref(hw); // RAW
us_queue_put(raw_ctx.queue, hw, 0);
}
# define QUEUE_HW(x_ctx) if (x_ctx != NULL) { \
us_capture_hwbuf_incref(hw); \
us_queue_put(x_ctx->queue, hw, 0); \
}
QUEUE_HW(jpeg_ctx);
QUEUE_HW(h264_ctx);
QUEUE_HW(raw_ctx);
# ifdef WITH_V4P
if (run->drm != NULL) {
us_capture_hwbuf_incref(hw); // DRM
us_queue_put(drm_ctx.queue, hw, 0);
}
QUEUE_HW(drm_ctx);
# endif
# undef QUEUE_HW
us_queue_put(releasers[hw->buf.index].queue, hw, 0); // Plan to release

// Мы не обновляем здесь состояние синков, потому что это происходит внутри обслуживающих их потоков
Expand All @@ -258,25 +237,18 @@ void us_stream_loop(us_stream_s *stream) {
close:
atomic_store(&threads_stop, true);

# define DELETE_WORKER(x_ctx) if (x_ctx != NULL) { \
US_THREAD_JOIN(x_ctx->tid); \
us_queue_destroy(x_ctx->queue); \
free(x_ctx); \
}
# ifdef WITH_V4P
if (run->drm != NULL) {
US_THREAD_JOIN(drm_ctx.tid);
us_queue_destroy(drm_ctx.queue);
}
DELETE_WORKER(drm_ctx);
# endif

if (stream->raw_sink != NULL) {
US_THREAD_JOIN(raw_ctx.tid);
us_queue_destroy(raw_ctx.queue);
}

if (run->h264 != NULL) {
US_THREAD_JOIN(h264_ctx.tid);
us_queue_destroy(h264_ctx.queue);
}

US_THREAD_JOIN(jpeg_ctx.tid);
us_queue_destroy(jpeg_ctx.queue);
DELETE_WORKER(raw_ctx);
DELETE_WORKER(h264_ctx);
DELETE_WORKER(jpeg_ctx);
# undef DELETE_WORKER

for (uint index = 0; index < n_releasers; ++index) {
US_THREAD_JOIN(releasers[index].tid);
Expand Down Expand Up @@ -430,15 +402,12 @@ static void *_h264_thread(void *v_ctx) {
}

if (!us_memsink_server_check(h264->sink, NULL)) {
us_capture_hwbuf_decref(hw);
US_LOG_VERBOSE("H264: Passed encoding because nobody is watching");
continue;
goto decref;
}

if (hw->raw.grab_ts < grab_after_ts) {
us_capture_hwbuf_decref(hw);
US_LOG_VERBOSE("H264: Passed encoding for FPS limit: %u", h264->enc->run->fps_limit);
continue;
goto decref;
}

// Форсим кейфрейм, если от захвата давно не было фреймов
Expand All @@ -454,6 +423,7 @@ static void *_h264_thread(void *v_ctx) {
const ldf frame_interval = (ldf)1 / h264->enc->run->fps_limit;
grab_after_ts = hw->raw.grab_ts + frame_interval - 0.01;

decref:
us_capture_hwbuf_decref(hw);
}
return NULL;
Expand All @@ -469,13 +439,11 @@ static void *_raw_thread(void *v_ctx) {
continue;
}

if (!us_memsink_server_check(ctx->stream->raw_sink, NULL)) {
us_capture_hwbuf_decref(hw);
if (us_memsink_server_check(ctx->stream->raw_sink, NULL)) {
us_memsink_server_put(ctx->stream->raw_sink, &hw->raw, false);
} else {
US_LOG_VERBOSE("RAW: Passed publishing because nobody is watching");
continue;
}

us_memsink_server_put(ctx->stream->raw_sink, &hw->raw, false);
us_capture_hwbuf_decref(hw);
}
return NULL;
Expand Down

0 comments on commit caf9ed7

Please sign in to comment.