diff --git a/Lib/concurrent/interpreters/_queues.py b/Lib/concurrent/interpreters/_queues.py index ee159d7de63827..0272fcd66dbac9 100644 --- a/Lib/concurrent/interpreters/_queues.py +++ b/Lib/concurrent/interpreters/_queues.py @@ -172,7 +172,6 @@ def qsize(self): def put(self, obj, block=True, timeout=None, *, unbounditems=None, - _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. @@ -226,7 +225,6 @@ def put(self, obj, block=True, timeout=None, *, except QueueFull: if timeout is not None and time.time() >= end: raise # re-raise - time.sleep(_delay) else: break @@ -235,11 +233,9 @@ def put_nowait(self, obj, *, unbounditems=None): unboundop = -1 else: unboundop, = _serialize_unbound(unbounditems) - _queues.put(self._id, obj, unboundop) + _queues.put(self._id, obj, unboundop, False) - def get(self, block=True, timeout=None, *, - _delay=10 / 1000, # 10 milliseconds - ): + def get(self, block=True, timeout=None): """Return the next object from the queue. If "block" is true, this blocks while the queue is empty. @@ -261,7 +257,6 @@ def get(self, block=True, timeout=None, *, except QueueEmpty: if timeout is not None and time.time() >= end: raise # re-raise - time.sleep(_delay) else: break if unboundop is not None: @@ -276,7 +271,7 @@ def get_nowait(self): is the same as get(). """ try: - obj, unboundop = _queues.get(self._id) + obj, unboundop = _queues.get(self._id, False) except QueueEmpty: raise # re-raise if unboundop is not None: diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 417c5fbcee2645..1197a6cb1ba0a9 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -7,6 +7,7 @@ #include "Python.h" #include "pycore_crossinterp.h" // _PyXIData_t +#include "pycore_lock.h" // PyEvent #define REGISTERS_HEAP_TYPES #define HAS_FALLBACK @@ -532,6 +533,8 @@ typedef struct _queue { xidata_fallback_t fallback; int unboundop; } defaults; + PyEvent space_available; + PyEvent has_item; } _queue; static int @@ -549,6 +552,8 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults) .maxsize = maxsize, }, .defaults = defaults, + .space_available = (PyEvent){1}, + .has_item = (PyEvent){0} }; return 0; } @@ -642,6 +647,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop) } if (queue->items.count >= maxsize) { _queue_unlock(queue); + queue->space_available = (PyEvent){0}; return ERR_QUEUE_FULL; } @@ -661,6 +667,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop) queue->items.last = item; _queue_unlock(queue); + queue->has_item = (PyEvent){1}; return 0; } @@ -676,6 +683,7 @@ _queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop) _queueitem *item = queue->items.first; if (item == NULL) { _queue_unlock(queue); + queue->has_item = (PyEvent){0}; return ERR_QUEUE_EMPTY; } queue->items.first = item->next; @@ -1124,7 +1132,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop, - xidata_fallback_t fallback) + xidata_fallback_t fallback, int block) { PyThreadState *tstate = PyThreadState_Get(); @@ -1136,6 +1144,11 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop, } assert(queue != NULL); + // Wait for the queue to have space + if (block == 1) { + PyEvent_Wait(&queue->space_available); + } + // Convert the object to cross-interpreter data. _PyXIData_t *xidata = _PyXIData_New(); if (xidata == NULL) { @@ -1168,7 +1181,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop, // XXX Support a "wait" mutex? static int queue_get(_queues *queues, int64_t qid, - PyObject **res, int *p_unboundop) + PyObject **res, int *p_unboundop, int block) { int err; *res = NULL; @@ -1182,6 +1195,11 @@ queue_get(_queues *queues, int64_t qid, // Past this point we are responsible for releasing the mutex. assert(queue != NULL); + // Wait for the queue to have some value + if (block == 1) { + PyEvent_Wait(&queue->has_item); + } + // Pop off the next item from the queue. _PyXIData_t *data = NULL; err = _queue_next(queue, &data, p_unboundop); @@ -1613,13 +1631,14 @@ _interpqueues.put obj: object unboundop as unboundarg: int = -1 fallback as fallbackarg: int = -1 + block: bool = True Add the object's data to the queue. [clinic start generated code]*/ static PyObject * _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj, - int unboundarg, int fallbackarg) + int unboundarg, int fallbackarg, int block) /*[clinic end generated code: output=2e0b31c6eaec29c9 input=4906550ab5c73be3]*/ { struct _queuedefaults defaults = {-1, -1}; @@ -1639,7 +1658,7 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj, } /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback); + int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback, block); // This is the only place that raises QueueFull. if (handle_queue_error(err, module, qid)) { return NULL; @@ -1651,6 +1670,7 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj, /*[clinic input] _interpqueues.get qid: qidarg + block: bool = True Return the (object, unbound op) from the front of the queue. @@ -1658,12 +1678,12 @@ If there is nothing to receive then raise QueueEmpty. [clinic start generated code]*/ static PyObject * -_interpqueues_get_impl(PyObject *module, int64_t qid) +_interpqueues_get_impl(PyObject *module, int64_t qid, int block) /*[clinic end generated code: output=b0988a0e29194f05 input=c5bccbc409ad0190]*/ { PyObject *obj = NULL; int unboundop = 0; - int err = queue_get(&_globals.queues, qid, &obj, &unboundop); + int err = queue_get(&_globals.queues, qid, &obj, &unboundop, block); // This is the only place that raises QueueEmpty. if (handle_queue_error(err, module, qid)) { return NULL; diff --git a/Modules/clinic/_interpqueuesmodule.c.h b/Modules/clinic/_interpqueuesmodule.c.h index 3f08a0cb6d36ab..eb21267fc355a8 100644 --- a/Modules/clinic/_interpqueuesmodule.c.h +++ b/Modules/clinic/_interpqueuesmodule.c.h @@ -196,7 +196,7 @@ PyDoc_STRVAR(_interpqueues_put__doc__, static PyObject * _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj, - int unboundarg, int fallbackarg); + int unboundarg, int fallbackarg, int block); static PyObject * _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) @@ -204,7 +204,7 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO PyObject *return_value = NULL; #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) - #define NUM_KEYWORDS 4 + #define NUM_KEYWORDS 5 static struct { PyGC_Head _this_is_not_used; PyObject_VAR_HEAD @@ -213,7 +213,7 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO } _kwtuple = { .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) .ob_hash = -1, - .ob_item = { &_Py_ID(qid), &_Py_ID(obj), &_Py_ID(unboundop), &_Py_ID(fallback), }, + .ob_item = { &_Py_ID(qid), &_Py_ID(obj), &_Py_ID(unboundop), &_Py_ID(fallback), &_Py_ID(block)}, }; #undef NUM_KEYWORDS #define KWTUPLE (&_kwtuple.ob_base.ob_base) @@ -222,7 +222,7 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO # define KWTUPLE NULL #endif // !Py_BUILD_CORE - static const char * const _keywords[] = {"qid", "obj", "unboundop", "fallback", NULL}; + static const char * const _keywords[] = {"qid", "obj", "unboundop", "fallback", "block", NULL}; static _PyArg_Parser _parser = { .keywords = _keywords, .fname = "put", @@ -235,9 +235,10 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO PyObject *obj; int unboundarg = -1; int fallbackarg = -1; + int block = 1; args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, - /*minpos*/ 2, /*maxpos*/ 4, /*minkw*/ 0, /*varpos*/ 0, argsbuf); + /*minpos*/ 2, /*maxpos*/ 5, /*minkw*/ 0, /*varpos*/ 0, argsbuf); if (!args) { goto exit; } @@ -261,8 +262,14 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO if (fallbackarg == -1 && PyErr_Occurred()) { goto exit; } + if (args[4]) { + block = PyLong_AsInt(args[4]); + if (block == 1 && PyErr_Occurred()) { + goto exit; + } + } skip_optional_pos: - return_value = _interpqueues_put_impl(module, qid, obj, unboundarg, fallbackarg); + return_value = _interpqueues_put_impl(module, qid, obj, unboundarg, fallbackarg, block); exit: return return_value; @@ -280,7 +287,7 @@ PyDoc_STRVAR(_interpqueues_get__doc__, {"get", _PyCFunction_CAST(_interpqueues_get), METH_FASTCALL|METH_KEYWORDS, _interpqueues_get__doc__}, static PyObject * -_interpqueues_get_impl(PyObject *module, int64_t qid); +_interpqueues_get_impl(PyObject *module, int64_t qid, int block); static PyObject * _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) @@ -288,7 +295,7 @@ _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO PyObject *return_value = NULL; #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) - #define NUM_KEYWORDS 1 + #define NUM_KEYWORDS 2 static struct { PyGC_Head _this_is_not_used; PyObject_VAR_HEAD @@ -297,7 +304,7 @@ _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO } _kwtuple = { .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) .ob_hash = -1, - .ob_item = { &_Py_ID(qid), }, + .ob_item = { &_Py_ID(qid), &_Py_ID(block)}, }; #undef NUM_KEYWORDS #define KWTUPLE (&_kwtuple.ob_base.ob_base) @@ -306,7 +313,7 @@ _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO # define KWTUPLE NULL #endif // !Py_BUILD_CORE - static const char * const _keywords[] = {"qid", NULL}; + static const char * const _keywords[] = {"qid", "block", NULL}; static _PyArg_Parser _parser = { .keywords = _keywords, .fname = "get", @@ -315,16 +322,24 @@ _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO #undef KWTUPLE PyObject *argsbuf[1]; int64_t qid; + int block = 1; args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, - /*minpos*/ 1, /*maxpos*/ 1, /*minkw*/ 0, /*varpos*/ 0, argsbuf); + /*minpos*/ 1, /*maxpos*/ 2, /*minkw*/ 0, /*varpos*/ 0, argsbuf); if (!args) { goto exit; } if (!qidarg_converter(args[0], &qid)) { goto exit; } - return_value = _interpqueues_get_impl(module, qid); + + if (args[1]) { + block = PyLong_AsInt(args[1]); + if (block == 1 && PyErr_Occurred()) { + goto exit; + } + } + return_value = _interpqueues_get_impl(module, qid, block); exit: return return_value;