diff --git a/kazoo/client.py b/kazoo/client.py index 27b7c384..57f338a1 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -120,6 +120,7 @@ def __init__( ca=None, use_ssl=False, verify_certs=True, + concurrent_request_limit=0, **kwargs, ): """Create a :class:`KazooClient` instance. All time arguments @@ -241,6 +242,18 @@ def __init__( self.keyfile = keyfile self.keyfile_password = keyfile_password self.ca = ca + if concurrent_request_limit > 0: + self.logger.info( + "Zookeeper client rate-limited to %d concurrent requests", + concurrent_request_limit, + ) + self.rate_limiting_sem = self.handler.semaphore_impl( + concurrent_request_limit + ) + + else: + self.rate_limiting_sem = None + # Curator like simplified state tracking, and listeners for # state transitions self._state = KeeperState.CLOSED @@ -635,6 +648,16 @@ def _call(self, request, async_object): async_object.set_exception(SessionExpiredError()) return False + if self.rate_limiting_sem: + if not self.rate_limiting_sem.acquire(blocking=False): + self.logger.info( + "Limiting concurrent requests. Waiting for completion." + ) + # Actually block on the sempahore here + self.rate_limiting_sem.acquire(blocking=True) + # Register the release of the semaphore on async request completion + async_object.rawlink(lambda _res: self.rate_limiting_sem.release()) + self._queue.append((request, async_object)) # wake the connection, guarding against a race with close() diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py index 8869cc57..cb08d05d 100644 --- a/kazoo/handlers/eventlet.py +++ b/kazoo/handlers/eventlet.py @@ -11,6 +11,7 @@ from eventlet.green import threading as green_threading from eventlet.green import selectors as green_selectors from eventlet import queue as green_queue +from eventlet import semaphore as green_semaphore from kazoo.handlers import utils from kazoo.handlers.utils import selector_select @@ -80,6 +81,7 @@ class SequentialEventletHandler(object): name = "sequential_eventlet_handler" queue_impl = green_queue.LightQueue queue_empty = green_queue.Empty + semaphore_impl = green_semaphore.BoundedSemaphore def __init__(self): """Create a :class:`SequentialEventletHandler` instance""" diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py index f36389aa..2bfde4cf 100644 --- a/kazoo/handlers/gevent.py +++ b/kazoo/handlers/gevent.py @@ -1,4 +1,5 @@ """A gevent based handler.""" + from __future__ import absolute_import import atexit @@ -14,7 +15,10 @@ from kazoo.handlers.utils import selector_select -from gevent.lock import Semaphore, RLock +from gevent.lock import ( + BoundedSemaphore as Semaphore, + RLock as RLock, +) from kazoo.handlers import utils @@ -52,6 +56,7 @@ class SequentialGeventHandler(object): queue_impl = gevent.queue.Queue queue_empty = gevent.queue.Empty sleep_func = staticmethod(gevent.sleep) + semaphore_impl = Semaphore def __init__(self): """Create a :class:`SequentialGeventHandler` instance""" diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py index b9acd875..1f3dff12 100644 --- a/kazoo/handlers/threading.py +++ b/kazoo/handlers/threading.py @@ -10,6 +10,7 @@ :class:`~kazoo.handlers.gevent.SequentialGeventHandler` instead. """ + from __future__ import absolute_import import atexit @@ -95,6 +96,7 @@ class SequentialThreadingHandler(object): sleep_func = staticmethod(time.sleep) queue_impl = queue.Queue queue_empty = queue.Empty + semaphore_impl = threading.BoundedSemaphore def __init__(self): """Create a :class:`SequentialThreadingHandler` instance"""