Skip to content

Commit 74405d8

Browse files
timsaucer and claude committed
refactor: keyword-only inlining flag, skip GIL on prefix mismatch
- `SessionContext.with_python_udf_inlining` now keyword-only (`*, enabled`) to match the documented call style and the existing doctests/tests. - `refuse_if_inline` and the three `try_decode_python_*` decoders short-circuit on a `starts_with(family)` check before `Python::attach`, so plans whose UDFs are not Python-defined no longer pay a GIL acquisition per decode call. Semantics preserved: `strip_wire_header` already returns `Ok(None)` when the prefix does not match. - `datafusion.ipc` module docstring wraps the `set_sender_ctx` example in `try`/`finally` and notes that the thread-local holds a strong reference to the installed `SessionContext` until cleared. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent a24daf6 commit 74405d8

3 files changed

Lines changed: 32 additions & 3 deletions

File tree

crates/core/src/codec.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,16 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
393393
/// surface *their* diagnostic instead of the strict-mode message.
394394
/// The strict message implies sender intent ("inlining is disabled"),
395395
/// so it should fire only when the bytes really would have decoded.
396+
///
397+
/// Fast path: short-circuit on the family-magic prefix before
398+
/// acquiring the GIL. Plans with many non-Python UDFs would otherwise
399+
/// pay a GIL acquisition per decode call just to confirm "not a
400+
/// Python UDF". `read_framed_payload` itself rejects buffers that
401+
/// don't start with `family`, so this is purely an optimization.
396402
fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> {
403+
if !buf.starts_with(family) {
404+
return Ok(());
405+
}
397406
Python::attach(|py| match read_framed_payload(py, buf, family, kind)? {
398407
Some(_) => Err(refuse_inline_payload(kind, name)),
399408
None => Ok(()),
@@ -578,6 +587,9 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>)
578587
/// the caller to delegate to its `inner` codec (and eventually the
579588
/// `FunctionRegistry`).
580589
pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> {
590+
if !buf.starts_with(PY_SCALAR_UDF_FAMILY) {
591+
return Ok(None);
592+
}
581593
Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> {
582594
let Some(payload) = read_framed_payload(py, buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")?
583595
else {
@@ -834,6 +846,9 @@ pub(crate) fn try_encode_python_udwf(node: &WindowUDF, buf: &mut Vec<u8>) -> Res
834846
}
835847

836848
pub(crate) fn try_decode_python_udwf(buf: &[u8]) -> Result<Option<Arc<WindowUDF>>> {
849+
if !buf.starts_with(PY_WINDOW_UDF_FAMILY) {
850+
return Ok(None);
851+
}
837852
Python::attach(|py| -> Result<Option<Arc<WindowUDF>>> {
838853
let Some(payload) = read_framed_payload(py, buf, PY_WINDOW_UDF_FAMILY, "window UDF")?
839854
else {
@@ -916,6 +931,9 @@ pub(crate) fn try_encode_python_udaf(node: &AggregateUDF, buf: &mut Vec<u8>) ->
916931
}
917932

918933
pub(crate) fn try_decode_python_udaf(buf: &[u8]) -> Result<Option<Arc<AggregateUDF>>> {
934+
if !buf.starts_with(PY_AGG_UDF_FAMILY) {
935+
return Ok(None);
936+
}
919937
Python::attach(|py| -> Result<Option<Arc<AggregateUDF>>> {
920938
let Some(payload) = read_framed_payload(py, buf, PY_AGG_UDF_FAMILY, "aggregate UDF")?
921939
else {

python/datafusion/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1770,7 +1770,7 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext:
17701770
new.ctx = new_internal
17711771
return new
17721772

1773-
def with_python_udf_inlining(self, enabled: bool) -> SessionContext:
1773+
def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
17741774
"""Control whether Python UDFs are embedded in serialized expressions.
17751775
17761776
When ``enabled=True`` (the default), serialized expressions carry

python/datafusion/ipc.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,27 @@ def init_worker():
6262
.. code-block:: python
6363
6464
from datafusion import SessionContext
65-
from datafusion.ipc import set_sender_ctx
65+
from datafusion.ipc import clear_sender_ctx, set_sender_ctx
6666
6767
driver_ctx = SessionContext().with_python_udf_inlining(enabled=False)
6868
set_sender_ctx(driver_ctx)
69-
pickle.dumps(expr) # encoded with inlining disabled
69+
try:
70+
pickle.dumps(expr) # encoded with inlining disabled
71+
finally:
72+
clear_sender_ctx()
7073
7174
Without a sender context the default codec is used (Python UDF
7275
inlining on). The sender context only affects pickle / ``to_bytes``
7376
encoding; explicit ``expr.to_bytes(ctx)`` calls still use the supplied
7477
``ctx``.
78+
79+
The thread-local holds a strong reference to the installed
80+
:class:`SessionContext` until :func:`clear_sender_ctx` is called or
81+
the thread exits. Long-running driver threads that install a sender
82+
context once and never clear it will retain that session for the
83+
lifetime of the thread; pair :func:`set_sender_ctx` with
84+
:func:`clear_sender_ctx` (e.g. in a ``try``/``finally``) when the
85+
sender context is only needed for a bounded scope.
7586
"""
7687

7788
from __future__ import annotations

0 commit comments

Comments
 (0)