diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index cb7dfa5552600..ab3de2fe01682 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -186,18 +186,22 @@ def toPandas(self) -> "PandasDataFrameLike":
 
         # Below is toPandas without Arrow optimization.
         rows = self.collect()
-        if len(rows) > 0:
-            pdf = pd.DataFrame.from_records(
-                rows, index=range(len(rows)), columns=self.columns  # type: ignore[arg-type]
-            )
-        else:
-            pdf = pd.DataFrame(columns=self.columns)
 
-        if len(pdf.columns) > 0:
+        if len(self.columns) > 0:
             timezone = jconf.sessionLocalTimeZone()
             struct_in_pandas = jconf.pandasStructHandlingMode()
 
-            return pd.concat(
+            # Avoid intermediate pandas DataFrame creation by directly converting columns
+            if len(rows) > 0:
+                # Extract columns from rows
+                columns_data = list(zip(*rows))
+                series_list = [pd.Series(col_data) for col_data in columns_data]
+            else:
+                # Empty rows - create empty DataFrame and extract empty Series
+                pdf_temp = pd.DataFrame(columns=self.columns)
+                series_list = [pser for _, pser in pdf_temp.items()]
+
+            pdf = pd.concat(
                 [
                     _create_converter_to_pandas(
                         field.dataType,
@@ -208,13 +212,15 @@ def toPandas(self) -> "PandasDataFrameLike":
                         ),
                         error_on_duplicated_field_names=False,
                         timestamp_utc_localized=False,
-                    )(pser)
-                    for (_, pser), field in zip(pdf.items(), self.schema.fields)
+                    )(series)
+                    for series, field in zip(series_list, self.schema.fields)
                 ],
                 axis="columns",
             )
-        else:
+            pdf.columns = self.columns
             return pdf
+        else:
+            return pd.DataFrame(columns=[], index=range(len(rows)))
 
     def toArrow(self) -> "pa.Table":
         from pyspark.sql.dataframe import DataFrame
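
For reference, a minimal standalone sketch of the column-wise conversion the
patch switches to, using plain pandas with no Spark session; the sample `rows`
and `columns` values below are illustrative, not taken from the patch. It shows
the same technique: transpose the row tuples with zip(*rows), build one Series
per column, concat along axis="columns", and restore the column names afterwards.

    import pandas as pd

    # Illustrative stand-ins for self.collect() and self.columns.
    rows = [(1, "a"), (2, "b"), (3, "c")]
    columns = ["id", "letter"]

    if len(rows) > 0:
        # zip(*rows) transposes the list of row tuples into per-column tuples.
        columns_data = list(zip(*rows))
        series_list = [pd.Series(col_data) for col_data in columns_data]
    else:
        # No rows: one empty Series per column so the concat still works.
        series_list = [pd.Series(dtype=object) for _ in columns]

    # Each Series becomes one column; names are reassigned in a second step,
    # mirroring the patch's pdf.columns = self.columns.
    pdf = pd.concat(series_list, axis="columns")
    pdf.columns = columns
    print(pdf)
    #    id letter
    # 0   1      a
    # 1   2      b
    # 2   3      c

Compared with pd.DataFrame.from_records, this avoids materializing an
intermediate row-oriented DataFrame before the per-field converters run.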