Skip to content

Commit

Permalink
Avoid calling shutdown after failed write of AsyncWrite (#249)
Browse files Browse the repository at this point in the history
in `serialize_rb_stream_to_object_store`
  • Loading branch information
joroKr21 authored Jul 2, 2024
1 parent a3195c2 commit 3089216
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: WriterType,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
) -> std::result::Result<(WriterType, u64), (Option<WriterType>, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);
let serialize_task = SpawnedTask::spawn(async move {
Expand Down Expand Up @@ -82,7 +82,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
Ok(_) => (),
Err(e) => {
return Err((
writer,
None,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
Expand All @@ -93,12 +93,12 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
}
Ok(Err(e)) => {
// Return the writer along with the error
return Err((writer, e));
return Err((Some(writer), e));
}
Err(e) => {
// Handle task panic or cancellation
return Err((
writer,
Some(writer),
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
Expand All @@ -109,10 +109,10 @@ pub(crate) async fn serialize_rb_stream_to_object_store(

match serialize_task.join().await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err((writer, e)),
Ok(Err(e)) => return Err((Some(writer), e)),
Err(_) => {
return Err((
writer,
Some(writer),
internal_datafusion_err!("Unknown error writing to object store"),
))
}
Expand Down Expand Up @@ -153,7 +153,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
row_count += cnt;
}
Err((writer, e)) => {
finished_writers.push(writer);
finished_writers.extend(writer);
any_errors = true;
triggering_error = Some(e);
}
Expand Down

0 comments on commit 3089216

Please sign in to comment.