diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index f2f8b08b9da7..ab70e47aec3f 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -44,6 +44,23 @@ Information about a barrier task. - :class:`InheritableThread`: A inheritable thread to use in Spark when the pinned thread mode is on. + +Spark Connect compatibility annotations +======================================= + +The following RST directives annotate PySpark modules, classes, and methods with +their Spark Connect compatibility status: + +- ``.. classic:: true`` -- the API is only available in Classic Spark (not Spark Connect). +- ``.. connect:: true`` -- the API is available in Spark Connect. +- ``.. connect_migration:: <guidance>`` -- migration guidance for users transitioning + from Classic Spark to Spark Connect. + +Annotations are resolved by inheriting from the nearest annotated ancestor. A child +annotation overrides the parent's. + +.. classic:: true +.. connect:: true """ import sys diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index cd2eea5258c8..119d04427b38 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -15,6 +15,13 @@ # limitations under the License. # +""" +.. classic:: true + +.. connect_migration:: Use `df.observe(name, *exprs)` to collect named metrics during + query execution. +""" + import os import sys import select diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index 2f109b007b9c..2981db647ca0 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -15,6 +15,13 @@ # limitations under the License. # +""" +.. classic:: true + +.. connect_migration:: Read Spark SQL configuration values using `spark.conf.get(key)` + and write them using `spark.conf.set(key, value)`.
+""" + __all__ = ["SparkConf"] import sys diff --git a/python/pyspark/core/__init__.py b/python/pyspark/core/__init__.py index cce3acad34a4..763b47972874 100644 --- a/python/pyspark/core/__init__.py +++ b/python/pyspark/core/__init__.py @@ -14,3 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +""" +.. classic:: true +""" diff --git a/python/pyspark/core/context.py b/python/pyspark/core/context.py index 8bdea00dad4e..a464565003b3 100644 --- a/python/pyspark/core/context.py +++ b/python/pyspark/core/context.py @@ -558,6 +558,9 @@ def setLogLevel(self, logLevel: str) -> None: Examples -------- >>> sc.setLogLevel("WARN") # doctest :+SKIP + + .. connect_migration:: Replace sc.setLogLevel(level) with + spark.conf.set("spark.log.level", level) """ self._jsc.setLogLevel(logLevel) @@ -630,6 +633,8 @@ def applicationId(self) -> str: -------- >>> sc.applicationId # doctest: +ELLIPSIS 'local-...' + + .. connect_migration:: Replace spark.sparkContext.applicationId with spark.conf.get("spark.app.id") """ return self._jsc.sc().applicationId() @@ -675,6 +680,10 @@ def defaultParallelism(self) -> int: -------- >>> sc.defaultParallelism > 0 True + + .. connect_migration:: Replace spark.sparkContext.defaultParallelism with + int(spark.conf.get("spark.default.parallelism")) if set, otherwise + the default depends on the cluster executor configuration. """ return self._jsc.sc().defaultParallelism() @@ -734,6 +743,9 @@ def emptyRDD(self) -> RDD[Any]: EmptyRDD... >>> sc.emptyRDD().count() 0 + + .. connect_migration:: Replace sc.emptyRDD with an empty list. When used with + createDataFrame: spark.createDataFrame([], schema) """ return RDD(self._jsc.emptyRDD(), self, NoOpSerializer()) @@ -828,6 +840,9 @@ def parallelize(self, c: Iterable[T], numSlices: Optional[int] = None) -> RDD[T] >>> strings = ["a", "b", "c"] >>> sc.parallelize(strings, 2).glom().collect() [['a'], ['b', 'c']] + + .. 
connect_migration:: Replace sc.parallelize(data) with the Python collection directly. + When used with createDataFrame: spark.createDataFrame(data, schema) """ numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism if isinstance(c, range): @@ -2212,6 +2227,11 @@ def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool = >>> suppress = lock.acquire() >>> print(result) Cancelled + + .. connect_migration:: Replace sc.setJobGroup(groupId, desc) with + spark.addTag(groupId) for grouping and cancellation via + spark.interruptTag(groupId). Job description and interruptOnCancel + are not directly supported in Spark Connect. """ self._jsc.setJobGroup(groupId, description, interruptOnCancel) @@ -2410,6 +2430,11 @@ def setLocalProperty(self, key: str, value: str) -> None: ----- If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread local inheritance. + + .. connect_migration:: Replace spark.sparkContext.setLocalProperty(key, value) with + spark.conf.set(key, value) for SQL configuration keys. Note that + spark.conf.set is session-global; thread-local isolation is not + supported in Spark Connect. """ self._jsc.setLocalProperty(key, value) @@ -2441,6 +2466,9 @@ def setJobDescription(self, value: str) -> None: ----- If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread local inheritance. + + .. connect_migration:: Replace sc.setJobDescription(desc) with + spark.conf.set("spark.job.description", desc) """ self._jsc.setJobDescription(value) @@ -2610,6 +2638,9 @@ def getConf(self) -> SparkConf: """Return a copy of this SparkContext's configuration :class:`SparkConf`. .. versionadded:: 2.1.0 + + .. connect_migration:: Replace sc.getConf() with spark.conf. 
For a specific key use + spark.conf.get(key) """ conf = SparkConf() conf.setAll(self._conf.getAll()) diff --git a/python/pyspark/core/rdd.py b/python/pyspark/core/rdd.py index 8ec147cd9049..2b395a8c4e5d 100644 --- a/python/pyspark/core/rdd.py +++ b/python/pyspark/core/rdd.py @@ -609,6 +609,9 @@ def map(self: "RDD[T]", f: Callable[[T], U], preservesPartitioning: bool = False >>> rdd = sc.parallelize(["b", "a", "c"]) >>> sorted(rdd.map(lambda x: (x, 1)).collect()) [('a', 1), ('b', 1), ('c', 1)] + + .. connect_migration:: Replace rdd.map() with DataFrame operations. Use + df.withColumn(), df.select() with a UDF, or a pandas UDF instead. """ def func(_: int, iterator: Iterable[T]) -> Iterable[U]: @@ -697,6 +700,9 @@ def mapPartitions( ... >>> rdd.mapPartitions(f).collect() [3, 7] + + .. connect_migration:: Replace rdd.mapPartitions() with df.mapInPandas() or + df.mapInArrow(), which process each partition as an iterator of batches. """ def func(_: int, iterator: Iterable[T]) -> Iterable[U]: diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py index 90537c2cc364..37bee7062864 100644 --- a/python/pyspark/errors/exceptions/connect.py +++ b/python/pyspark/errors/exceptions/connect.py @@ -14,6 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +""" +.. connect:: true +""" + import grpc import json from grpc import StatusCode diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 6303a4361857..ab926489f653 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -15,6 +15,10 @@ # limitations under the License. # +""" +..
classic:: true +""" + import atexit import os import signal diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index c3964724dd3a..87310d5c2d8d 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -1429,6 +1429,10 @@ class LDAModel(JavaModel, _LDAParams): including local and distributed data structures. .. versionadded:: 2.0.0 + + .. classic:: true + + .. connect_migration:: LDA model family is not supported in Spark Connect. """ @since("3.0.0") @@ -1530,6 +1534,10 @@ class DistributedLDAModel(LDAModel, JavaMLReadable["DistributedLDAModel"], JavaM for each training document. .. versionadded:: 2.0.0 + + .. classic:: true + + .. connect_migration:: LDA model family is not supported in Spark Connect. """ @functools.cache @@ -1608,6 +1616,10 @@ class LocalLDAModel(LDAModel, JavaMLReadable["LocalLDAModel"], JavaMLWritable): This model stores the inferred topics only; it does not store info about the training dataset. .. versionadded:: 2.0.0 + + .. classic:: true + + .. connect_migration:: LDA model family is not supported in Spark Connect. """ pass @@ -1682,6 +1694,10 @@ class LDA(JavaEstimator[LDAModel], _LDAParams, JavaMLReadable["LDA"], JavaMLWrit >>> sameLocalModel = LocalLDAModel.load(local_model_path) >>> model.transform(df).take(1) == sameLocalModel.transform(df).take(1) True + + .. classic:: true + + .. connect_migration:: LDA model family is not supported in Spark Connect. """ _input_kwargs: Dict[str, Any] diff --git a/python/pyspark/ml/connect/__init__.py b/python/pyspark/ml/connect/__init__.py index c4bc8c9d84d2..a06c22cf3233 100644 --- a/python/pyspark/ml/connect/__init__.py +++ b/python/pyspark/ml/connect/__init__.py @@ -15,7 +15,11 @@ # limitations under the License. # -"""Spark Connect Python Client - ML module""" +""" +Spark Connect Python Client - ML module + +.. 
connect:: true +""" from pyspark.sql.connect.utils import check_dependencies diff --git a/python/pyspark/ml/deepspeed/__init__.py b/python/pyspark/ml/deepspeed/__init__.py index cce3acad34a4..6a545526600b 100644 --- a/python/pyspark/ml/deepspeed/__init__.py +++ b/python/pyspark/ml/deepspeed/__init__.py @@ -14,3 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +""" +.. classic:: true + +.. connect_migration:: Migrate to pyspark.ml.connect +""" diff --git a/python/pyspark/ml/torch/__init__.py b/python/pyspark/ml/torch/__init__.py index cce3acad34a4..6a545526600b 100644 --- a/python/pyspark/ml/torch/__init__.py +++ b/python/pyspark/ml/torch/__init__.py @@ -14,3 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +""" +.. classic:: true + +.. connect_migration:: Migrate to pyspark.ml.connect +""" diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index cd8df055daa8..fef8b571534d 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -15,6 +15,10 @@ # limitations under the License. # +""" +.. classic:: true +""" + from abc import ABCMeta, abstractmethod from typing import Any, Generic, Optional, List, Type, TypeVar, TYPE_CHECKING diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py index fdf35368ade9..55b9ddcc8c26 100644 --- a/python/pyspark/mllib/__init__.py +++ b/python/pyspark/mllib/__init__.py @@ -20,6 +20,13 @@ The `pyspark.mllib` package is in maintenance mode as of the Spark 2.0.0 release to encourage migration to the DataFrame-based APIs under the `pyspark.ml` package. + +.. classic:: true + +.. connect_migration:: Migrate to pyspark.ml.connect where supported (classification, + evaluation, feature, pipeline, summarizer, tuning). 
Modules without a Connect + equivalent (clustering, fpm, linalg, random, recommendation, regression, stat, + tree) have no direct replacement. """ # MLlib currently needs NumPy 1.4+, so complain if lower diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py index 8f797b5f21fc..f3180369bbf5 100644 --- a/python/pyspark/profiler.py +++ b/python/pyspark/profiler.py @@ -15,6 +15,10 @@ # limitations under the License. # +""" +.. classic:: true +""" + from typing import ( Any, Callable, diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 14db5af9e653..9576f8b7f4b1 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -15,6 +15,10 @@ # limitations under the License. # +""" +.. classic:: true +""" + import sys import random import math diff --git a/python/pyspark/sql/classic/__init__.py b/python/pyspark/sql/classic/__init__.py index f7ae391c3186..c3a8cd510d89 100644 --- a/python/pyspark/sql/classic/__init__.py +++ b/python/pyspark/sql/classic/__init__.py @@ -15,4 +15,8 @@ # limitations under the License. # -"""Spark Classic specific""" +""" +Spark Classic specific + +.. classic:: true +""" diff --git a/python/pyspark/sql/connect/__init__.py b/python/pyspark/sql/connect/__init__.py index cff3174b321d..84856e0e834d 100644 --- a/python/pyspark/sql/connect/__init__.py +++ b/python/pyspark/sql/connect/__init__.py @@ -15,7 +15,11 @@ # limitations under the License. # -"""Spark Connect client""" +""" +Spark Connect client + +.. connect:: true +""" from pyspark.sql.connect.utils import check_dependencies diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 7145b27f2cf3..5dcaad4ab3ef 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -15,6 +15,12 @@ # limitations under the License. # +""" +.. classic:: true + +.. connect_migration:: Use SparkSession instead. 
+""" + import sys import warnings from typing import ( diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index b87eefd9ca4e..dd33af10764c 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -187,6 +187,9 @@ def rdd(self) -> "RDD[Row]": >>> df = spark.range(1) >>> type(df.rdd) <class 'pyspark.core.rdd.RDD'> + + .. connect_migration:: Use DataFrame operations directly as RDD is not supported + in Spark Connect. """ ... diff --git a/python/pyspark/sql/metrics.py b/python/pyspark/sql/metrics.py index a258a4f1db70..bba4b6a4b06c 100644 --- a/python/pyspark/sql/metrics.py +++ b/python/pyspark/sql/metrics.py @@ -14,6 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +""" +.. connect:: true +""" + import abc import dataclasses from typing import Optional, List, Tuple, Dict, Any, Union, TYPE_CHECKING, Sequence diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py index 88f48070f205..d94511011893 100644 --- a/python/pyspark/sql/profiler.py +++ b/python/pyspark/sql/profiler.py @@ -14,6 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +""" +.. classic:: true +""" + from abc import ABC, abstractmethod from io import StringIO import cProfile diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index bed87788d2c1..e89a59511040 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -434,6 +434,12 @@ def json( +----+---+ | Bob| 30| +----+---+ + + .. connect_migration:: Does not support RDD arguments in Spark Connect. For a list + of JSON strings, parse them first: import json; + spark.createDataFrame([json.loads(s) for s in json_strings]). + Alternatively, write the strings to a file path and use + spark.read.json(path).
""" self._set_opts( schema=schema, diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index e586c560deeb..47c6b0be47bb 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -772,6 +772,11 @@ def registerJavaFunction( ... # doctest: +SKIP >>> spark.sql("SELECT javaStringLength3('test')").collect() # doctest: +SKIP [Row(javaStringLength3(test)=4)] + + .. connect_migration:: Java UDF registration is not supported in Spark Connect. + In mixed-language notebooks, register the UDF from a Scala cell using + spark.udf.register; it will then be accessible from Python in the same + session. For standalone applications, rewrite the UDF in Python. """ jdt = None diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py index 92bf0b11a3e1..224144a3077a 100644 --- a/python/pyspark/statcounter.py +++ b/python/pyspark/statcounter.py @@ -15,6 +15,13 @@ # limitations under the License. # +""" +.. classic:: true + +.. connect_migration:: Use `df.describe()` for summary statistics, or + `df.observe(name, *exprs)` for named metrics. +""" + # This file is ported from spark/util/StatCounter.scala import copy diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py index 7a7dc98c0b6d..3e5b9d62136b 100644 --- a/python/pyspark/streaming/__init__.py +++ b/python/pyspark/streaming/__init__.py @@ -15,6 +15,10 @@ # limitations under the License. # +""" +.. classic:: true +""" + from pyspark.streaming.context import StreamingContext from pyspark.streaming.dstream import DStream from pyspark.streaming.listener import StreamingListener diff --git a/python/pyspark/testing/mllibutils.py b/python/pyspark/testing/mllibutils.py index 3edb7f03088d..34d576305c98 100644 --- a/python/pyspark/testing/mllibutils.py +++ b/python/pyspark/testing/mllibutils.py @@ -15,6 +15,10 @@ # limitations under the License. # +""" +.. classic:: true +""" + import unittest from pyspark import SparkContext