@@ -91,14 +91,14 @@ impl PyFunctionExecutor {
9191impl interface:: SimpleFunctionExecutor for Arc < PyFunctionExecutor > {
9292 async fn evaluate ( & self , input : Vec < value:: Value > ) -> Result < value:: Value > {
9393 let self = self . clone ( ) ;
94- let result_fut = Python :: with_gil ( |py| -> Result < _ > {
94+ let result_fut = Python :: attach ( |py| -> Result < _ > {
9595 let result_coro = self . call_py_fn ( py, input) ?;
9696 let task_locals =
9797 pyo3_async_runtimes:: TaskLocals :: new ( self . py_exec_ctx . event_loop . bind ( py) . clone ( ) ) ;
9898 Ok ( from_py_future ( py, & task_locals, result_coro) ?)
9999 } ) ?;
100100 let result = result_fut. await ;
101- Python :: with_gil ( |py| -> Result < _ > {
101+ Python :: attach ( |py| -> Result < _ > {
102102 let result = result. to_result_with_py_trace ( py) ?;
103103 Ok ( py:: value_from_py_object (
104104 & self . result_type . typ ,
@@ -129,7 +129,7 @@ struct PyBatchedFunctionExecutor {
129129#[ async_trait]
130130impl BatchedFunctionExecutor for PyBatchedFunctionExecutor {
131131 async fn evaluate_batch ( & self , args : Vec < Vec < value:: Value > > ) -> Result < Vec < value:: Value > > {
132- let result_fut = Python :: with_gil ( |py| -> pyo3:: PyResult < _ > {
132+ let result_fut = Python :: attach ( |py| -> pyo3:: PyResult < _ > {
133133 let py_args = PyList :: new (
134134 py,
135135 args. into_iter ( )
@@ -155,7 +155,7 @@ impl BatchedFunctionExecutor for PyBatchedFunctionExecutor {
155155 ) ?)
156156 } ) ?;
157157 let result = result_fut. await ;
158- Python :: with_gil ( |py| -> Result < _ > {
158+ Python :: attach ( |py| -> Result < _ > {
159159 let result = result. to_result_with_py_trace ( py) ?;
160160 let result_bound = result. into_bound ( py) ;
161161 let result_list = result_bound. extract :: < Vec < Bound < ' _ , PyAny > > > ( ) ?;
@@ -189,7 +189,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
189189 context : Arc < interface:: FlowInstanceContext > ,
190190 ) -> Result < interface:: SimpleFunctionBuildOutput > {
191191 let ( result_type, executor, kw_args_names, num_positional_args, behavior_version) =
192- Python :: with_gil ( |py| -> anyhow:: Result < _ > {
192+ Python :: attach ( |py| -> anyhow:: Result < _ > {
193193 let mut args = vec ! [ pythonize( py, & spec) ?] ;
194194 let mut kwargs = vec ! [ ] ;
195195 let mut num_positional_args = 0 ;
@@ -246,7 +246,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
246246 . ok_or_else ( || anyhow ! ( "Python execution context is missing" ) ) ?
247247 . clone ( ) ;
248248 let ( prepare_fut, enable_cache, timeout, batching_options) =
249- Python :: with_gil ( |py| -> anyhow:: Result < _ > {
249+ Python :: attach ( |py| -> anyhow:: Result < _ > {
250250 let prepare_coro = executor
251251 . call_method ( py, "prepare" , ( ) , None )
252252 . to_result_with_py_trace ( py)
@@ -342,10 +342,10 @@ impl interface::SourceExecutor for PySourceExecutor {
342342 options : & interface:: SourceExecutorReadOptions ,
343343 ) -> Result < BoxStream < ' async_trait , Result < Vec < interface:: PartialSourceRow > > > > {
344344 let py_exec_ctx = self . py_exec_ctx . clone ( ) ;
345- let py_source_executor = Python :: with_gil ( |py| self . py_source_executor . clone_ref ( py) ) ;
345+ let py_source_executor = Python :: attach ( |py| self . py_source_executor . clone_ref ( py) ) ;
346346
347347 // Get the Python async iterator
348- let py_async_iter = Python :: with_gil ( |py| {
348+ let py_async_iter = Python :: attach ( |py| {
349349 py_source_executor
350350 . call_method ( py, "list_async" , ( pythonize ( py, options) ?, ) , None )
351351 . to_result_with_py_trace ( py)
@@ -375,10 +375,10 @@ impl interface::SourceExecutor for PySourceExecutor {
375375 options : & interface:: SourceExecutorReadOptions ,
376376 ) -> Result < interface:: PartialSourceRowData > {
377377 let py_exec_ctx = self . py_exec_ctx . clone ( ) ;
378- let py_source_executor = Python :: with_gil ( |py| self . py_source_executor . clone_ref ( py) ) ;
378+ let py_source_executor = Python :: attach ( |py| self . py_source_executor . clone_ref ( py) ) ;
379379 let key_clone = key. clone ( ) ;
380380
381- let py_result = Python :: with_gil ( |py| -> Result < _ > {
381+ let py_result = Python :: attach ( |py| -> Result < _ > {
382382 let result_coro = py_source_executor
383383 . call_method (
384384 py,
@@ -406,7 +406,7 @@ impl interface::SourceExecutor for PySourceExecutor {
406406 } ) ?
407407 . await ;
408408
409- Python :: with_gil ( |py| -> Result < _ > {
409+ Python :: attach ( |py| -> Result < _ > {
410410 let result = py_result. to_result_with_py_trace ( py) ?;
411411 let result_bound = result. into_bound ( py) ;
412412 let data = self . parse_partial_source_row_data ( py, & result_bound) ?;
@@ -432,7 +432,7 @@ impl PySourceExecutor {
432432 py_exec_ctx : & Arc < crate :: py:: PythonExecutionContext > ,
433433 ) -> Result < Option < interface:: PartialSourceRow > > {
434434 // Call the Python method to get the next item, avoiding storing Python objects across await points
435- let next_item_coro = Python :: with_gil ( |py| -> Result < _ > {
435+ let next_item_coro = Python :: attach ( |py| -> Result < _ > {
436436 let coro = py_async_iter
437437 . call_method0 ( py, "__anext__" )
438438 . to_result_with_py_trace ( py)
@@ -446,7 +446,7 @@ impl PySourceExecutor {
446446 let py_item_result = next_item_coro. await ;
447447
448448 // Handle StopAsyncIteration and convert to Rust data immediately to avoid Send issues
449- Python :: with_gil ( |py| -> Result < Option < interface:: PartialSourceRow > > {
449+ Python :: attach ( |py| -> Result < Option < interface:: PartialSourceRow > > {
450450 match py_item_result {
451451 Ok ( item) => {
452452 let bound_item = item. into_bound ( py) ;
@@ -472,7 +472,7 @@ impl PySourceExecutor {
472472 ) -> Result < interface:: PartialSourceRow > {
473473 // Each item should be a tuple of (key, data)
474474 let tuple = bound_item
475- . downcast :: < PyTuple > ( )
475+ . cast :: < PyTuple > ( )
476476 . map_err ( |e| anyhow ! ( "Failed to downcast to PyTuple: {}" , e) ) ?;
477477 if tuple. len ( ) != 2 {
478478 api_bail ! ( "Expected tuple of length 2 from Python source iterator" ) ;
@@ -583,7 +583,7 @@ impl interface::SourceFactory for PySourceConnectorFactory {
583583 . clone ( ) ;
584584
585585 // First get the table type (this doesn't require executor)
586- let table_type = Python :: with_gil ( |py| -> Result < _ > {
586+ let table_type = Python :: attach ( |py| -> Result < _ > {
587587 let value_type_result = self
588588 . py_source_connector
589589 . call_method ( py, "get_table_type" , ( ) , None )
@@ -625,7 +625,7 @@ impl interface::SourceFactory for PySourceConnectorFactory {
625625 let source_name = source_name. to_string ( ) ;
626626 let executor_fut = async move {
627627 // Create the executor using the async create_executor method
628- let create_future = Python :: with_gil ( |py| -> Result < _ > {
628+ let create_future = Python :: attach ( |py| -> Result < _ > {
629629 let create_coro = self
630630 . py_source_connector
631631 . call_method ( py, "create_executor" , ( pythonize ( py, & spec) ?, ) , None )
@@ -645,7 +645,7 @@ impl interface::SourceFactory for PySourceConnectorFactory {
645645 let py_executor_context_result = create_future. await ;
646646
647647 let ( py_source_executor_context, provides_ordinal) =
648- Python :: with_gil ( |py| -> Result < _ > {
648+ Python :: attach ( |py| -> Result < _ > {
649649 let executor_context = py_executor_context_result
650650 . to_result_with_py_trace ( py)
651651 . with_context ( || {
@@ -748,7 +748,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
748748 . ok_or_else ( || anyhow ! ( "Python execution context is missing" ) ) ?
749749 . clone ( ) ;
750750 for data_collection in data_collections. into_iter ( ) {
751- let ( py_export_ctx, persistent_key, setup_state) = Python :: with_gil ( |py| {
751+ let ( py_export_ctx, persistent_key, setup_state) = Python :: attach ( |py| {
752752 // Deserialize the spec to Python object.
753753 let py_export_ctx = self
754754 . py_target_connector
@@ -805,7 +805,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
805805 let py_exec_ctx = py_exec_ctx. clone ( ) ;
806806 let build_output = interface:: ExportDataCollectionBuildOutput {
807807 export_context : Box :: pin ( async move {
808- Python :: with_gil ( |py| {
808+ Python :: attach ( |py| {
809809 let prepare_coro = factory
810810 . py_target_connector
811811 . call_method ( py, "prepare_async" , ( & py_export_ctx, ) , None )
@@ -872,7 +872,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
872872 desired_state : & serde_json:: Value ,
873873 existing_state : & serde_json:: Value ,
874874 ) -> Result < SetupStateCompatibility > {
875- let compatibility = Python :: with_gil ( |py| -> Result < _ > {
875+ let compatibility = Python :: attach ( |py| -> Result < _ > {
876876 let result = self
877877 . py_target_connector
878878 . call_method (
@@ -895,7 +895,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
895895 }
896896
897897 fn describe_resource ( & self , key : & serde_json:: Value ) -> Result < String > {
898- Python :: with_gil ( |py| -> Result < String > {
898+ Python :: attach ( |py| -> Result < String > {
899899 let result = self
900900 . py_target_connector
901901 . call_method ( py, "describe_resource" , ( pythonize ( py, key) ?, ) , None )
@@ -950,7 +950,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
950950 . as_ref ( )
951951 . ok_or_else ( || anyhow ! ( "Python execution context is missing" ) ) ?
952952 . clone ( ) ;
953- let py_result = Python :: with_gil ( move |py| -> Result < _ > {
953+ let py_result = Python :: attach ( move |py| -> Result < _ > {
954954 let result_coro = self
955955 . py_target_connector
956956 . call_method (
@@ -972,7 +972,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
972972 ) ?)
973973 } ) ?
974974 . await ;
975- Python :: with_gil ( move |py| {
975+ Python :: attach ( move |py| {
976976 py_result
977977 . to_result_with_py_trace ( py)
978978 . with_context ( || format ! ( "while applying setup changes in user-configured target" ) )
@@ -991,7 +991,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
991991 return Ok ( ( ) ) ;
992992 }
993993
994- let py_result = Python :: with_gil ( |py| -> Result < _ > {
994+ let py_result = Python :: attach ( |py| -> Result < _ > {
995995 // Create a `list[tuple[export_ctx, list[tuple[key, value | None]]]]` for Python, and collect `py_exec_ctx`.
996996 let mut py_args = Vec :: with_capacity ( mutations. len ( ) ) ;
997997 let mut py_exec_ctx: Option < & Arc < crate :: py:: PythonExecutionContext > > = None ;
@@ -1039,7 +1039,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
10391039 } ) ?
10401040 . await ;
10411041
1042- Python :: with_gil ( move |py| {
1042+ Python :: attach ( move |py| {
10431043 py_result
10441044 . to_result_with_py_trace ( py)
10451045 . with_context ( || format ! ( "while applying mutations in user-configured target" ) )
0 commit comments