Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions tests/netizen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
self.sock.close()
self.close()

async def __aenter__(self):
if self.loop is None:
Expand Down Expand Up @@ -112,7 +112,7 @@ async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
self.sock.close()
self.close()

def recv(self, n):
if self.sock.gettimeout() == 0:
Expand All @@ -123,7 +123,7 @@ def recv(self, n):
while len(buf) < n:
data = self.sock.recv(n - len(buf))

if data == b'':
if not data:
break

buf.extend(data)
Expand All @@ -136,6 +136,9 @@ def sendall(self, data):

return self.sock.sendall(data)

def close(self):
self.sock.close()


class HTTPClient(Client):
def __init__(self, host, port=None, **kwargs):
Expand Down
12 changes: 6 additions & 6 deletions tests/netizen/http_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def __next__(self):
1 if self._buf.endswith((b'\r', b'\n')) else 4
)

if data == b'':
if not data:
raise StopIteration

self._handle_response(data)
Expand All @@ -237,7 +237,7 @@ def __next__(self):

data = self.client.sock.recv(16384)

if data == b'':
if not data:
raise StopIteration

self._buf.extend(data)
Expand All @@ -254,7 +254,7 @@ def __next__(self):
16384 if self.content_length <= -2 else self.content_length
)

if data == b'':
if not data:
raise StopIteration

self.content_length -= len(data)
Expand All @@ -270,7 +270,7 @@ async def __anext__(self):
1 if self._buf.endswith((b'\r', b'\n')) else 4
)

if data == b'':
if not data:
raise StopAsyncIteration

self._handle_response(data)
Expand All @@ -294,7 +294,7 @@ async def __anext__(self):
data = await self.client.loop.sock_recv(self.client.sock,
16384)

if data == b'':
if not data:
raise StopAsyncIteration

self._buf.extend(data)
Expand All @@ -312,7 +312,7 @@ async def __anext__(self):
16384 if self.content_length <= -2 else self.content_length
)

if data == b'':
if not data:
raise StopAsyncIteration

self.content_length -= len(data)
Expand Down
24 changes: 20 additions & 4 deletions tremolo/lib/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ def __init__(self, context, loop=None, **kwargs):
self.context = context
self.queue = getattr(queue, 'SimpleQueue', queue.Queue)()
self.loop = loop or asyncio.get_event_loop()

self._startup = self.loop.create_future()
self._shutdown = None

def start(self):
super().start()
return self._startup

def run(self):
self._startup.set_result(None)

while self.loop.is_running():
try:
self.loop.call_soon_threadsafe( # set the last active thread
Expand Down Expand Up @@ -104,16 +112,21 @@ def __init__(self, size=5):
self.thread = None # points to the last active thread
self.counter = 1

def start(self, prefix='MultiThreadExecutor', **kwargs):
def start(self, prefix='MultiThreadExecutor', loop=None, **kwargs):
aws = []

while len(self.threads) < self.size:
self.thread = ThreadExecutor(
self, name=f'{prefix}-{self.counter}', **kwargs
self, name=f'{prefix}-{self.counter}', loop=loop, **kwargs
)
self.thread.start()
self.threads.append(self.thread)
aws.append(self.thread._startup)

self.counter += 1

return asyncio.gather(*aws)

def submit(self, func, args=(), kwargs={}, name=None):
if self.size == 0 or len(self.threads) < self.size:
raise RuntimeError('no threads are running or not ready')
Expand All @@ -130,8 +143,11 @@ def submit(self, func, args=(), kwargs={}, name=None):
self.start()
raise

async def shutdown(self):
def shutdown(self):
self.size = 0
aws = []

while self.threads:
await self.threads.pop().shutdown()
aws.append(self.threads.pop().shutdown())

return asyncio.gather(*aws)
2 changes: 1 addition & 1 deletion tremolo/tremolo.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ async def serve(self, host, port, *, sock=None, backlog=100, **kwargs):
ssl_context = None

executor = MultiThreadExecutor(size=options.get('thread_pool_size', 5))
executor.start()
await executor.start(loop=self.loop)

if '_locks' in options:
lock = ServerLock(options['_locks'], executor, loop=self.loop)
Expand Down