Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/delayed stream events #300

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pulseaudio_dlna/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

'''
Usage:
pulseaudio-dlna pulseaudio-dlna [--host <host>] [--port <port>][--encoder <encoders> | --codec <codec>] [--bit-rate=<rate>]
pulseaudio-dlna [--host <host>] [--port <port>][--encoder <encoders> | --codec <codec>] [--bit-rate=<rate>]
[--encoder-backend <encoder-backend>]
[--filter-device=<filter-device>]
[--renderer-urls <urls>]
Expand Down
4 changes: 3 additions & 1 deletion pulseaudio_dlna/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ def run(self, options):

if options['--chunk-size']:
chunk_size = int(options['--chunk-size'])
pulseaudio_dlna.streamserver.ProcessStream.CHUNK_SIZE = chunk_size
if chunk_size > 0:
pulseaudio_dlna.streamserver.ProcessThread.CHUNK_SIZE = \
chunk_size

if options['--ssdp-ttl']:
ssdp_ttl = int(options['--ssdp-ttl'])
Expand Down
6 changes: 3 additions & 3 deletions pulseaudio_dlna/pulseaudio.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ def on_device_updated(self, sink_path):
logger.info('on_device_updated "{path}"'.format(
path=sink_path))
self.update()
self._delayed_handle_sink_update(sink_path)
self._delayed_handle_sink_update(sink_path, 100)

def on_fallback_sink_updated(self, sink_path):
self.default_sink = PulseSinkFactory.new(self.bus, sink_path)
Expand All @@ -674,11 +674,11 @@ def on_playback_stream_removed(self, stream_path):
self._delayed_handle_sink_update(sink.object_path)
return

def _delayed_handle_sink_update(self, sink_path):
def _delayed_handle_sink_update(self, sink_path, delay=3000):
if self.signal_timers.get(sink_path, None):
GObject.source_remove(self.signal_timers[sink_path])
self.signal_timers[sink_path] = GObject.timeout_add(
1000, self._handle_sink_update, sink_path)
delay, self._handle_sink_update, sink_path)

def _handle_sink_update(self, sink_path):
if not self.ASYNC_EXECUTION:
Expand Down
198 changes: 125 additions & 73 deletions pulseaudio_dlna/streamserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import pkg_resources
import BaseHTTPServer
import SocketServer
import Queue
import threading

import pulseaudio_dlna.encoders
import pulseaudio_dlna.codecs
Expand All @@ -47,38 +49,96 @@
PROTOCOL_VERSION_V11 = 'HTTP/1.1'


class ProcessStream(object):
class ProcessQueue(Queue.Queue):

CHUNK_SIZE = 1024 * 4
RUNNING = True
def data(self):
data = self.get()
if not self.empty():
data = [data]
while not self.empty():
data.append(self.get())
data = b''.join(data)
return data

def __init__(self, path, sock, recorder, encoder, bridge):

class ProcessThread(threading.Thread):

CHUNK_SIZE = 1024 * 32

def __init__(self, path, encoder, recorder, queue, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
self.path = path
self.sock = sock
self.recorder = recorder
self.encoder = encoder
self.bridge = bridge

self.id = hex(id(self))
self.recorder = recorder
self.recorder_process = None
self.encoder_process = None
self.queue = queue

self.reinitialize_count = 0
self.stop_event = threading.Event()

GObject.timeout_add(
10000, self._on_regenerate_reinitialize_count)

def _on_regenerate_reinitialize_count(self):
if self.reinitialize_count > 0:
self.reinitialize_count -= 1
return True

def stop(self):
self.stop_event.set()

@property
def is_stopped(self):
return self.stop_event.isSet()

def run(self):
while self.RUNNING:
if not self.do_processes_exist():
self.create_processes()
logger.info(
'Processes of {path} initialized ...'.format(
path=self.path))
if not self.do_processes_respond():

def create_processes():
logger.info('Starting processes "{recorder} | {encoder}"'.format(
recorder=' '.join(self.recorder.command),
encoder=' '.join(self.encoder.command)))
rec_process = subprocess.Popen(
self.recorder.command,
stdout=subprocess.PIPE)
enc_process = subprocess.Popen(
self.encoder.command,
stdin=rec_process.stdout,
stdout=subprocess.PIPE,
bufsize=-1)
rec_process.stdout.close()
return rec_process, enc_process, enc_process.stdout.read

def do_processes_respond(rec_process, enc_process):
return (rec_process.poll() is None and
enc_process.poll() is None)

def terminate_processes(processes):
for process in processes:
pid = process.pid
logger.debug('Terminating process {} ...'.format(pid))
try:
os.kill(pid, signal.SIGTERM)
_pid, return_code = os.waitpid(pid, 0)
except:
try:
os.kill(pid, signal.SIGKILL)
except:
pass

chunk_size = self.CHUNK_SIZE
queue = self.queue

rec_process, enc_process, enc_read = create_processes()
logger.info(
'Processes of {path} initialized ...'.format(
path=self.path))
while not self.is_stopped:
if not do_processes_respond(rec_process, enc_process):
if self.reinitialize_count < 3:
self.reinitialize_count += 1
self.terminate_processes()
self.create_processes()
terminate_processes([rec_process, enc_process])
rec_process, enc_process, enc_read = create_processes()
logger.info(
'Processes of {path} reinitialized ...'.format(
path=self.path))
Expand All @@ -89,72 +149,64 @@ def run(self):
self.reinitialize_count))
break

data = self.encoder_process.stdout.read(self.CHUNK_SIZE)
r, w, e = select.select([self.sock], [self.sock], [], 0)
data = enc_read(chunk_size)
if len(data) > 0:
queue.put(data)

terminate_processes([rec_process, enc_process])
queue.put(b'')


class ProcessStream(object):

RUNNING = True

def __init__(self, path, sock, recorder, encoder, bridge):
self.path = path
self.sock = sock
self.recorder = recorder
self.encoder = encoder
self.bridge = bridge

self.id = hex(id(self))

def run(self):

queue = ProcessQueue()
process_thread = ProcessThread(
self.path, self.encoder, self.recorder, queue)
process_thread.daemon = True
process_thread.start()

if self.sock in w:
empty_list = []
select_select = select.select
sock = self.sock
sock_list = [self.sock]
sock_sendall = self.sock.sendall
sock_recv = self.sock.recv
queue_data = queue.data

while self.RUNNING:
r, w, e = select_select(sock_list, sock_list, empty_list, 0)

if sock in w:
data = queue_data()
if len(data) == 0:
break
try:
self._send_data(self.sock, data)
sock_sendall(data)
except socket.error:
break

if self.sock in r:
if sock in r:
try:
data = self.sock.recv(1024)
data = sock_recv(1024)
if len(data) == 0:
break
except socket.error:
break
self.terminate_processes()

def _send_data(self, sock, data):
bytes_total = len(data)
bytes_sent = 0
while bytes_sent < bytes_total:
bytes_sent += sock.send(data[bytes_sent:])

def _on_regenerate_reinitialize_count(self):
if self.reinitialize_count > 0:
self.reinitialize_count -= 1
return True

def do_processes_exist(self):
return (self.encoder_process is not None and
self.recorder_process is not None)

def do_processes_respond(self):
return (self.recorder_process.poll() is None and
self.encoder_process.poll() is None)

def terminate_processes(self):

def _kill_process(process):
pid = process.pid
logger.debug('Terminating process {} ...'.format(pid))
try:
os.kill(pid, signal.SIGTERM)
_pid, return_code = os.waitpid(pid, 0)
except:
try:
os.kill(pid, signal.SIGKILL)
except:
pass

_kill_process(self.encoder_process)
_kill_process(self.recorder_process)

def create_processes(self):
logger.info('Starting processes "{recorder} | {encoder}"'.format(
recorder=' '.join(self.recorder.command),
encoder=' '.join(self.encoder.command)))
self.recorder_process = subprocess.Popen(
self.recorder.command,
stdout=subprocess.PIPE)
self.encoder_process = subprocess.Popen(
self.encoder.command,
stdin=self.recorder_process.stdout,
stdout=subprocess.PIPE)
self.recorder_process.stdout.close()
process_thread.stop()

def __str__(self):
return '<{} id="{}">\n'.format(
Expand Down