diff --git a/pulseaudio_dlna/__main__.py b/pulseaudio_dlna/__main__.py index 9257f8b3..a14e1878 100644 --- a/pulseaudio_dlna/__main__.py +++ b/pulseaudio_dlna/__main__.py @@ -17,7 +17,7 @@ ''' Usage: - pulseaudio-dlna pulseaudio-dlna [--host ] [--port ][--encoder | --codec ] [--bit-rate=] + pulseaudio-dlna [--host ] [--port ][--encoder | --codec ] [--bit-rate=] [--encoder-backend ] [--filter-device=] [--renderer-urls ] diff --git a/pulseaudio_dlna/application.py b/pulseaudio_dlna/application.py index fc2439c3..20290679 100644 --- a/pulseaudio_dlna/application.py +++ b/pulseaudio_dlna/application.py @@ -130,7 +130,9 @@ def run(self, options): if options['--chunk-size']: chunk_size = int(options['--chunk-size']) - pulseaudio_dlna.streamserver.ProcessStream.CHUNK_SIZE = chunk_size + if chunk_size > 0: + pulseaudio_dlna.streamserver.ProcessThread.CHUNK_SIZE = \ + chunk_size if options['--ssdp-ttl']: ssdp_ttl = int(options['--ssdp-ttl']) diff --git a/pulseaudio_dlna/pulseaudio.py b/pulseaudio_dlna/pulseaudio.py index 7d3026f4..31fdb159 100644 --- a/pulseaudio_dlna/pulseaudio.py +++ b/pulseaudio_dlna/pulseaudio.py @@ -648,7 +648,7 @@ def on_device_updated(self, sink_path): logger.info('on_device_updated "{path}"'.format( path=sink_path)) self.update() - self._delayed_handle_sink_update(sink_path) + self._delayed_handle_sink_update(sink_path, 100) def on_fallback_sink_updated(self, sink_path): self.default_sink = PulseSinkFactory.new(self.bus, sink_path) @@ -674,11 +674,11 @@ def on_playback_stream_removed(self, stream_path): self._delayed_handle_sink_update(sink.object_path) return - def _delayed_handle_sink_update(self, sink_path): + def _delayed_handle_sink_update(self, sink_path, delay=3000): if self.signal_timers.get(sink_path, None): GObject.source_remove(self.signal_timers[sink_path]) self.signal_timers[sink_path] = GObject.timeout_add( - 1000, self._handle_sink_update, sink_path) + delay, self._handle_sink_update, sink_path) def _handle_sink_update(self, sink_path): if not self.ASYNC_EXECUTION: diff --git a/pulseaudio_dlna/streamserver.py b/pulseaudio_dlna/streamserver.py index b7e285b8..23669781 100644 --- a/pulseaudio_dlna/streamserver.py +++ b/pulseaudio_dlna/streamserver.py @@ -34,6 +34,8 @@ import pkg_resources import BaseHTTPServer import SocketServer +import Queue +import threading import pulseaudio_dlna.encoders import pulseaudio_dlna.codecs @@ -47,38 +49,96 @@ PROTOCOL_VERSION_V11 = 'HTTP/1.1' -class ProcessStream(object): +class ProcessQueue(Queue.Queue): - CHUNK_SIZE = 1024 * 4 - RUNNING = True + def data(self): + data = self.get() + if not self.empty(): + data = [data] + while not self.empty(): + data.append(self.get()) + data = b''.join(data) + return data - def __init__(self, path, sock, recorder, encoder, bridge): + +class ProcessThread(threading.Thread): + + CHUNK_SIZE = 1024 * 32 + + def __init__(self, path, encoder, recorder, queue, *args, **kwargs): + threading.Thread.__init__(self, *args, **kwargs) self.path = path - self.sock = sock - self.recorder = recorder self.encoder = encoder - self.bridge = bridge - - self.id = hex(id(self)) + self.recorder = recorder self.recorder_process = None self.encoder_process = None + self.queue = queue + self.reinitialize_count = 0 + self.stop_event = threading.Event() GObject.timeout_add( 10000, self._on_regenerate_reinitialize_count) + def _on_regenerate_reinitialize_count(self): + if self.reinitialize_count > 0: + self.reinitialize_count -= 1 + return True + + def stop(self): + self.stop_event.set() + + @property + def is_stopped(self): + return self.stop_event.isSet() + def run(self): - while self.RUNNING: - if not self.do_processes_exist(): - self.create_processes() - logger.info( - 'Processes of {path} initialized ...'.format( - path=self.path)) - if not self.do_processes_respond(): + + def create_processes(): + logger.info('Starting processes "{recorder} | {encoder}"'.format( + recorder=' '.join(self.recorder.command), + encoder=' '.join(self.encoder.command))) + rec_process = subprocess.Popen( + self.recorder.command, + stdout=subprocess.PIPE) + enc_process = subprocess.Popen( + self.encoder.command, + stdin=rec_process.stdout, + stdout=subprocess.PIPE, + bufsize=-1) + rec_process.stdout.close() + return rec_process, enc_process, enc_process.stdout.read + + def do_processes_respond(rec_process, enc_process): + return (rec_process.poll() is None and + enc_process.poll() is None) + + def terminate_processes(processes): + for process in processes: + pid = process.pid + logger.debug('Terminating process {} ...'.format(pid)) + try: + os.kill(pid, signal.SIGTERM) + _pid, return_code = os.waitpid(pid, 0) + except: + try: + os.kill(pid, signal.SIGKILL) + except: + pass + + chunk_size = self.CHUNK_SIZE + queue = self.queue + + rec_process, enc_process, enc_read = create_processes() + logger.info( + 'Processes of {path} initialized ...'.format( + path=self.path)) + while not self.is_stopped: + if not do_processes_respond(rec_process, enc_process): if self.reinitialize_count < 3: self.reinitialize_count += 1 - self.terminate_processes() - self.create_processes() + terminate_processes([rec_process, enc_process]) + rec_process, enc_process, enc_read = create_processes() logger.info( 'Processes of {path} reinitialized ...'.format( path=self.path)) @@ -89,72 +149,64 @@ def run(self): self.reinitialize_count)) break - data = self.encoder_process.stdout.read(self.CHUNK_SIZE) - r, w, e = select.select([self.sock], [self.sock], [], 0) + data = enc_read(chunk_size) + if len(data) > 0: + queue.put(data) + + terminate_processes([rec_process, enc_process]) + queue.put(b'') + + +class ProcessStream(object): + + RUNNING = True + + def __init__(self, path, sock, recorder, encoder, bridge): + self.path = path + self.sock = sock + self.recorder = recorder + self.encoder = encoder + self.bridge = bridge + + self.id = hex(id(self)) + + def run(self): + + queue = ProcessQueue() + process_thread = ProcessThread( + self.path, self.encoder, self.recorder, queue) + process_thread.daemon = True + process_thread.start() - if self.sock in w: + empty_list = [] + select_select = select.select + sock = self.sock + sock_list = [self.sock] + sock_sendall = self.sock.sendall + sock_recv = self.sock.recv + queue_data = queue.data + + while self.RUNNING: + r, w, e = select_select(sock_list, sock_list, empty_list, 0) + + if sock in w: + data = queue_data() + if len(data) == 0: + break try: - self._send_data(self.sock, data) + sock_sendall(data) except socket.error: break - if self.sock in r: + if sock in r: try: - data = self.sock.recv(1024) + data = sock_recv(1024) if len(data) == 0: break except socket.error: break - self.terminate_processes() - - def _send_data(self, sock, data): - bytes_total = len(data) - bytes_sent = 0 - while bytes_sent < bytes_total: - bytes_sent += sock.send(data[bytes_sent:]) - - def _on_regenerate_reinitialize_count(self): - if self.reinitialize_count > 0: - self.reinitialize_count -= 1 - return True - - def do_processes_exist(self): - return (self.encoder_process is not None and - self.recorder_process is not None) - - def do_processes_respond(self): - return (self.recorder_process.poll() is None and - self.encoder_process.poll() is None) - - def terminate_processes(self): - - def _kill_process(process): - pid = process.pid - logger.debug('Terminating process {} ...'.format(pid)) - try: - os.kill(pid, signal.SIGTERM) - _pid, return_code = os.waitpid(pid, 0) - except: - try: - os.kill(pid, signal.SIGKILL) - except: - pass - _kill_process(self.encoder_process) - _kill_process(self.recorder_process) - - def create_processes(self): - logger.info('Starting processes "{recorder} | {encoder}"'.format( - recorder=' '.join(self.recorder.command), - encoder=' '.join(self.encoder.command))) - self.recorder_process = subprocess.Popen( - self.recorder.command, - stdout=subprocess.PIPE) - self.encoder_process = subprocess.Popen( - self.encoder.command, - stdin=self.recorder_process.stdout, - stdout=subprocess.PIPE) - self.recorder_process.stdout.close() + process_thread.stop() def __str__(self): return '<{} id="{}">\n'.format(