Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions Lib/concurrent/interpreters/_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ def qsize(self):

def put(self, obj, block=True, timeout=None, *,
unbounditems=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.

Expand Down Expand Up @@ -226,7 +225,6 @@ def put(self, obj, block=True, timeout=None, *,
except QueueFull:
if timeout is not None and time.time() >= end:
raise # re-raise
time.sleep(_delay)
else:
break

Expand All @@ -235,11 +233,9 @@ def put_nowait(self, obj, *, unbounditems=None):
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
_queues.put(self._id, obj, unboundop)
_queues.put(self._id, obj, unboundop, False)

def get(self, block=True, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
):
def get(self, block=True, timeout=None):
"""Return the next object from the queue.

If "block" is true, this blocks while the queue is empty.
Expand All @@ -261,7 +257,6 @@ def get(self, block=True, timeout=None, *,
except QueueEmpty:
if timeout is not None and time.time() >= end:
raise # re-raise
time.sleep(_delay)
else:
break
if unboundop is not None:
Expand All @@ -276,7 +271,7 @@ def get_nowait(self):
is the same as get().
"""
try:
obj, unboundop = _queues.get(self._id)
obj, unboundop = _queues.get(self._id, False)
except QueueEmpty:
raise # re-raise
if unboundop is not None:
Expand Down
32 changes: 26 additions & 6 deletions Modules/_interpqueuesmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "Python.h"
#include "pycore_crossinterp.h" // _PyXIData_t
#include "pycore_lock.h" // PyEvent

#define REGISTERS_HEAP_TYPES
#define HAS_FALLBACK
Expand Down Expand Up @@ -532,6 +533,8 @@ typedef struct _queue {
xidata_fallback_t fallback;
int unboundop;
} defaults;
PyEvent space_available;
PyEvent has_item;
} _queue;

static int
Expand All @@ -549,6 +552,8 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
.maxsize = maxsize,
},
.defaults = defaults,
.space_available = (PyEvent){1},
.has_item = (PyEvent){0}
};
return 0;
}
Expand Down Expand Up @@ -642,6 +647,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
}
if (queue->items.count >= maxsize) {
_queue_unlock(queue);
queue->space_available = (PyEvent){0};
return ERR_QUEUE_FULL;
}

Expand All @@ -661,6 +667,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
queue->items.last = item;

_queue_unlock(queue);
queue->has_item = (PyEvent){1};
return 0;
}

Expand All @@ -676,6 +683,7 @@ _queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
_queueitem *item = queue->items.first;
if (item == NULL) {
_queue_unlock(queue);
queue->has_item = (PyEvent){0};
return ERR_QUEUE_EMPTY;
}
queue->items.first = item->next;
Expand Down Expand Up @@ -1124,7 +1132,7 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
xidata_fallback_t fallback)
xidata_fallback_t fallback, int block)
{
PyThreadState *tstate = PyThreadState_Get();

Expand All @@ -1136,6 +1144,11 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
}
assert(queue != NULL);

// Wait for the queue to have space
if (block == 1) {
PyEvent_Wait(&queue->space_available);
}

// Convert the object to cross-interpreter data.
_PyXIData_t *xidata = _PyXIData_New();
if (xidata == NULL) {
Expand Down Expand Up @@ -1168,7 +1181,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
// XXX Support a "wait" mutex?
static int
queue_get(_queues *queues, int64_t qid,
PyObject **res, int *p_unboundop)
PyObject **res, int *p_unboundop, int block)
{
int err;
*res = NULL;
Expand All @@ -1182,6 +1195,11 @@ queue_get(_queues *queues, int64_t qid,
// Past this point we are responsible for releasing the mutex.
assert(queue != NULL);

// Wait for the queue to have some value
if (block == 1) {
PyEvent_Wait(&queue->has_item);
}

// Pop off the next item from the queue.
_PyXIData_t *data = NULL;
err = _queue_next(queue, &data, p_unboundop);
Expand Down Expand Up @@ -1613,13 +1631,14 @@ _interpqueues.put
obj: object
unboundop as unboundarg: int = -1
fallback as fallbackarg: int = -1
block: bool = True

Add the object's data to the queue.
[clinic start generated code]*/

static PyObject *
_interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
int unboundarg, int fallbackarg)
int unboundarg, int fallbackarg, int block)
/*[clinic end generated code: output=2e0b31c6eaec29c9 input=4906550ab5c73be3]*/
{
struct _queuedefaults defaults = {-1, -1};
Expand All @@ -1639,7 +1658,7 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
}

/* Queue up the object. */
int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback);
int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback, block);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, module, qid)) {
return NULL;
Expand All @@ -1651,19 +1670,20 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
/*[clinic input]
_interpqueues.get
qid: qidarg
block: bool = True

Return the (object, unbound op) from the front of the queue.

If there is nothing to receive then raise QueueEmpty.
[clinic start generated code]*/

static PyObject *
_interpqueues_get_impl(PyObject *module, int64_t qid)
_interpqueues_get_impl(PyObject *module, int64_t qid, int block)
/*[clinic end generated code: output=b0988a0e29194f05 input=c5bccbc409ad0190]*/
{
PyObject *obj = NULL;
int unboundop = 0;
int err = queue_get(&_globals.queues, qid, &obj, &unboundop);
int err = queue_get(&_globals.queues, qid, &obj, &unboundop, block);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, module, qid)) {
return NULL;
Expand Down
39 changes: 27 additions & 12 deletions Modules/clinic/_interpqueuesmodule.c.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading