From f44b44b751bf968047611d95f0dcceeca9e94938 Mon Sep 17 00:00:00 2001 From: Massimo Mund Date: Sat, 8 Apr 2017 20:07:40 +0200 Subject: [PATCH 1/5] Implement a separate thread for the encoder and recorder process --- pulseaudio_dlna/application.py | 4 +- pulseaudio_dlna/streamserver.py | 176 ++++++++++++++++++++------------ 2 files changed, 115 insertions(+), 65 deletions(-) diff --git a/pulseaudio_dlna/application.py b/pulseaudio_dlna/application.py index fc2439c3..20290679 100644 --- a/pulseaudio_dlna/application.py +++ b/pulseaudio_dlna/application.py @@ -130,7 +130,9 @@ def run(self, options): if options['--chunk-size']: chunk_size = int(options['--chunk-size']) - pulseaudio_dlna.streamserver.ProcessStream.CHUNK_SIZE = chunk_size + if chunk_size > 0: + pulseaudio_dlna.streamserver.ProcessThread.CHUNK_SIZE = \ + chunk_size if options['--ssdp-ttl']: ssdp_ttl = int(options['--ssdp-ttl']) diff --git a/pulseaudio_dlna/streamserver.py b/pulseaudio_dlna/streamserver.py index b7e285b8..441fa972 100644 --- a/pulseaudio_dlna/streamserver.py +++ b/pulseaudio_dlna/streamserver.py @@ -34,6 +34,8 @@ import pkg_resources import BaseHTTPServer import SocketServer +import Queue +import threading import pulseaudio_dlna.encoders import pulseaudio_dlna.codecs @@ -47,33 +49,88 @@ PROTOCOL_VERSION_V11 = 'HTTP/1.1' -class ProcessStream(object): +class ProcessQueue(Queue.Queue): + + def data(self): + data = self.get() + if not self.empty(): + data = [data] + while not self.empty(): + data.append(self.get()) + data = b''.join(data) + return data + + +class ProcessThread(threading.Thread): CHUNK_SIZE = 1024 * 4 - RUNNING = True - def __init__(self, path, sock, recorder, encoder, bridge): + def __init__(self, path, encoder, recorder, queue, *args, **kwargs): + threading.Thread.__init__(self, *args, **kwargs) self.path = path - self.sock = sock - self.recorder = recorder self.encoder = encoder - self.bridge = bridge - - self.id = hex(id(self)) + self.recorder = recorder self.recorder_process = None self.encoder_process = None + self.queue = queue + self.reinitialize_count = 0 + self.stop_event = threading.Event() GObject.timeout_add( 10000, self._on_regenerate_reinitialize_count) + def _on_regenerate_reinitialize_count(self): + if self.reinitialize_count > 0: + self.reinitialize_count -= 1 + return True + + def create_processes(self): + logger.info('Starting processes "{recorder} | {encoder}"'.format( + recorder=' '.join(self.recorder.command), + encoder=' '.join(self.encoder.command))) + self.recorder_process = subprocess.Popen( + self.recorder.command, + stdout=subprocess.PIPE) + self.encoder_process = subprocess.Popen( + self.encoder.command, + stdin=self.recorder_process.stdout, + stdout=subprocess.PIPE, + bufsize=-1) + self.recorder_process.stdout.close() + + def do_processes_exist(self): + return (self.encoder_process is not None and + self.recorder_process is not None) + + def do_processes_respond(self): + return (self.recorder_process.poll() is None and + self.encoder_process.poll() is None) + + def terminate_processes(self): + + def _kill_process(process): + pid = process.pid + logger.debug('Terminating process {} ...'.format(pid)) + try: + os.kill(pid, signal.SIGTERM) + _pid, return_code = os.waitpid(pid, 0) + except: + try: + os.kill(pid, signal.SIGKILL) + except: + pass + + _kill_process(self.encoder_process) + _kill_process(self.recorder_process) + def run(self): - while self.RUNNING: - if not self.do_processes_exist(): - self.create_processes() - logger.info( - 'Processes of {path} initialized ...'.format( - path=self.path)) + + self.create_processes() + logger.info( + 'Processes of {path} initialized ...'.format( + path=self.path)) + while not self.is_stopped: if not self.do_processes_respond(): if self.reinitialize_count < 3: self.reinitialize_count += 1 @@ -90,11 +147,50 @@ def run(self): break data = self.encoder_process.stdout.read(self.CHUNK_SIZE) + if len(data) > 0: + self.queue.put(data) + + self.terminate_processes() + self.queue.put(b'') + + def stop(self): + self.stop_event.set() + + @property + def is_stopped(self): + return self.stop_event.isSet() + + +class ProcessStream(object): + + RUNNING = True + + def __init__(self, path, sock, recorder, encoder, bridge): + self.path = path + self.sock = sock + self.recorder = recorder + self.encoder = encoder + self.bridge = bridge + + self.id = hex(id(self)) + + def run(self): + + queue = ProcessQueue() + process_thread = ProcessThread( + self.path, self.encoder, self.recorder, queue) + process_thread.daemon = True + process_thread.start() + + while self.RUNNING: r, w, e = select.select([self.sock], [self.sock], [], 0) if self.sock in w: + data = queue.data() + if len(data) == 0: + break try: - self._send_data(self.sock, data) + self.sock.sendall(data) except socket.error: break @@ -105,56 +201,8 @@ def run(self): break except socket.error: break - self.terminate_processes() - - def _send_data(self, sock, data): - bytes_total = len(data) - bytes_sent = 0 - while bytes_sent < bytes_total: - bytes_sent += sock.send(data[bytes_sent:]) - - def _on_regenerate_reinitialize_count(self): - if self.reinitialize_count > 0: - self.reinitialize_count -= 1 - return True - - def do_processes_exist(self): - return (self.encoder_process is not None and - self.recorder_process is not None) - def do_processes_respond(self): - return (self.recorder_process.poll() is None and - self.encoder_process.poll() is None) - - def terminate_processes(self): - - def _kill_process(process): - pid = process.pid - logger.debug('Terminating process {} ...'.format(pid)) - try: - os.kill(pid, signal.SIGTERM) - _pid, return_code = os.waitpid(pid, 0) - except: - try: - os.kill(pid, signal.SIGKILL) - except: - pass - - _kill_process(self.encoder_process) - _kill_process(self.recorder_process) - - def create_processes(self): - logger.info('Starting processes "{recorder} | {encoder}"'.format( - recorder=' '.join(self.recorder.command), - encoder=' '.join(self.encoder.command))) - self.recorder_process = subprocess.Popen( - self.recorder.command, - stdout=subprocess.PIPE) - self.encoder_process = subprocess.Popen( - self.encoder.command, - stdin=self.recorder_process.stdout, - stdout=subprocess.PIPE) - self.recorder_process.stdout.close() + process_thread.stop() def __str__(self): return '<{} id="{}">\n'.format( From 09269032c3976a84c2fa63f5c5eed4d24ced5820 Mon Sep 17 00:00:00 2001 From: Massimo Mund Date: Mon, 10 Apr 2017 00:17:34 +0200 Subject: [PATCH 2/5] Optimize the streaming code --- pulseaudio_dlna/streamserver.py | 120 +++++++++++++++++--------------- 1 file changed, 62 insertions(+), 58 deletions(-) diff --git a/pulseaudio_dlna/streamserver.py b/pulseaudio_dlna/streamserver.py index 441fa972..d3ae1484 100644 --- a/pulseaudio_dlna/streamserver.py +++ b/pulseaudio_dlna/streamserver.py @@ -85,57 +85,60 @@ def _on_regenerate_reinitialize_count(self): self.reinitialize_count -= 1 return True - def create_processes(self): - logger.info('Starting processes "{recorder} | {encoder}"'.format( - recorder=' '.join(self.recorder.command), - encoder=' '.join(self.encoder.command))) - self.recorder_process = subprocess.Popen( - self.recorder.command, - stdout=subprocess.PIPE) - self.encoder_process = subprocess.Popen( - self.encoder.command, - stdin=self.recorder_process.stdout, - stdout=subprocess.PIPE, - bufsize=-1) - self.recorder_process.stdout.close() - - def do_processes_exist(self): - return (self.encoder_process is not None and - self.recorder_process is not None) - - def do_processes_respond(self): - return (self.recorder_process.poll() is None and - self.encoder_process.poll() is None) - - def terminate_processes(self): - - def _kill_process(process): - pid = process.pid - logger.debug('Terminating process {} ...'.format(pid)) - try: - os.kill(pid, signal.SIGTERM) - _pid, return_code = os.waitpid(pid, 0) - except: - try: - os.kill(pid, signal.SIGKILL) - except: - pass + def stop(self): + self.stop_event.set() - _kill_process(self.encoder_process) - _kill_process(self.recorder_process) + @property + def is_stopped(self): + return self.stop_event.isSet() def run(self): - self.create_processes() + def create_processes(): + logger.info('Starting processes "{recorder} | {encoder}"'.format( + recorder=' '.join(self.recorder.command), + encoder=' '.join(self.encoder.command))) + rec_process = subprocess.Popen( + self.recorder.command, + stdout=subprocess.PIPE) + enc_process = subprocess.Popen( + self.encoder.command, + stdin=rec_process.stdout, + stdout=subprocess.PIPE, + bufsize=-1) + rec_process.stdout.close() + return rec_process, enc_process, enc_process.stdout.read + + def do_processes_respond(rec_process, enc_process): + return (rec_process.poll() is None and + enc_process.poll() is None) + + def terminate_processes(processes): + for process in processes: + pid = process.pid + logger.debug('Terminating process {} ...'.format(pid)) + try: + os.kill(pid, signal.SIGTERM) + _pid, return_code = os.waitpid(pid, 0) + except: + try: + os.kill(pid, signal.SIGKILL) + except: + pass + + chunk_size = self.CHUNK_SIZE + queue = self.queue + + rec_process, enc_process, enc_read = create_processes() logger.info( 'Processes of {path} initialized ...'.format( path=self.path)) while not self.is_stopped: - if not self.do_processes_respond(): + if not do_processes_respond(rec_process, enc_process): if self.reinitialize_count < 3: self.reinitialize_count += 1 - self.terminate_processes() - self.create_processes() + terminate_processes([rec_process, enc_process]) + rec_process, enc_process, enc_read = create_processes() logger.info( 'Processes of {path} reinitialized ...'.format( path=self.path)) @@ -146,19 +149,12 @@ def run(self): self.reinitialize_count)) break - data = self.encoder_process.stdout.read(self.CHUNK_SIZE) + data = enc_read(chunk_size) if len(data) > 0: - self.queue.put(data) - - self.terminate_processes() - self.queue.put(b'') - - def stop(self): - self.stop_event.set() + queue.put(data) - @property - def is_stopped(self): - return self.stop_event.isSet() + terminate_processes([rec_process, enc_process]) + queue.put(b'') class ProcessStream(object): @@ -182,21 +178,29 @@ def run(self): process_thread.daemon = True process_thread.start() + empty_list = [] + select_select = select.select + sock = self.sock + sock_list = [self.sock] + sock_sendall = self.sock.sendall + sock_recv = self.sock.recv + queue_data = queue.data + while self.RUNNING: - r, w, e = select.select([self.sock], [self.sock], [], 0) + r, w, e = select_select(sock_list, sock_list, empty_list, 0) - if self.sock in w: - data = queue.data() + if sock in w: + data = queue_data() if len(data) == 0: break try: - self.sock.sendall(data) + sock_sendall(data) except socket.error: break - if self.sock in r: + if sock in r: try: - data = self.sock.recv(1024) + data = sock_recv(1024) if len(data) == 0: break except socket.error: From 893992a605d78af8af8e40831a581dce3e2cfdcc Mon Sep 17 00:00:00 2001 From: Massimo Mund Date: Mon, 10 Apr 2017 00:26:33 +0200 Subject: [PATCH 3/5] Set the default chunk size to 32kb - When using 4kb as chunk size you really need a *stable* network. The large amount of smaller packets can trigger potentially more TCP retransmission and TCP reconnects when using Wifi. The downside of 32kb is an extra delay of ~1.3sec in contrast to a chunk size of 4kb. --- pulseaudio_dlna/streamserver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulseaudio_dlna/streamserver.py b/pulseaudio_dlna/streamserver.py index d3ae1484..23669781 100644 --- a/pulseaudio_dlna/streamserver.py +++ b/pulseaudio_dlna/streamserver.py @@ -63,7 +63,7 @@ def data(self): class ProcessThread(threading.Thread): - CHUNK_SIZE = 1024 * 4 + CHUNK_SIZE = 1024 * 32 def __init__(self, path, encoder, recorder, queue, *args, **kwargs): threading.Thread.__init__(self, *args, **kwargs) From 44cad397117daca0b5d4e54475c9112a4816fde7 Mon Sep 17 00:00:00 2001 From: Massimo Mund Date: Tue, 11 Apr 2017 10:13:34 +0200 Subject: [PATCH 4/5] Just delay the sink update if there was actually a stream changed --- pulseaudio_dlna/pulseaudio.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulseaudio_dlna/pulseaudio.py b/pulseaudio_dlna/pulseaudio.py index 7d3026f4..31fdb159 100644 --- a/pulseaudio_dlna/pulseaudio.py +++ b/pulseaudio_dlna/pulseaudio.py @@ -648,7 +648,7 @@ def on_device_updated(self, sink_path): logger.info('on_device_updated "{path}"'.format( path=sink_path)) self.update() - self._delayed_handle_sink_update(sink_path) + self._delayed_handle_sink_update(sink_path, 100) def on_fallback_sink_updated(self, sink_path): self.default_sink = PulseSinkFactory.new(self.bus, sink_path) @@ -674,11 +674,11 @@ def on_playback_stream_removed(self, stream_path): self._delayed_handle_sink_update(sink.object_path) return - def _delayed_handle_sink_update(self, sink_path): + def _delayed_handle_sink_update(self, sink_path, delay=3000): if self.signal_timers.get(sink_path, None): GObject.source_remove(self.signal_timers[sink_path]) self.signal_timers[sink_path] = GObject.timeout_add( - 1000, self._handle_sink_update, sink_path) + delay, self._handle_sink_update, sink_path) def _handle_sink_update(self, sink_path): if not self.ASYNC_EXECUTION: From ff48fdc03b600add77d491b007e1e31513962cae Mon Sep 17 00:00:00 2001 From: Massimo Mund Date: Tue, 11 Apr 2017 10:44:10 +0200 Subject: [PATCH 5/5] Fix the main doc string --- pulseaudio_dlna/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulseaudio_dlna/__main__.py b/pulseaudio_dlna/__main__.py index 9257f8b3..a14e1878 100644 --- a/pulseaudio_dlna/__main__.py +++ b/pulseaudio_dlna/__main__.py @@ -17,7 +17,7 @@ ''' Usage: - pulseaudio-dlna pulseaudio-dlna [--host ] [--port ][--encoder | --codec ] [--bit-rate=] + pulseaudio-dlna [--host ] [--port ][--encoder | --codec ] [--bit-rate=] [--encoder-backend ] [--filter-device=] [--renderer-urls ]