Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions python/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@
Information about a barrier task.
- :class:`InheritableThread`:
An inheritable thread to use in Spark when the pinned thread mode is on.

Spark Connect compatibility annotations
=======================================

The following RST directives annotate PySpark modules, classes, and methods with
their Spark Connect compatibility status:

- ``.. classic:: true`` -- the API is available in Classic Spark.
- ``.. connect:: true`` -- the API is available in Spark Connect.
- ``.. connect_migration:: <message>`` -- migration guidance for users transitioning
from Classic Spark to Spark Connect.

Annotations are resolved by inheriting from the nearest annotated ancestor. A child
annotation overrides the parent's.

.. classic:: true
.. connect:: true
"""

import sys
Expand Down
7 changes: 7 additions & 0 deletions python/pyspark/accumulators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
# limitations under the License.
#

"""
.. classic:: true

.. connect_migration:: Use `df.observe(name, *exprs)` to collect named metrics during
query execution.
"""

import os
import sys
import select
Expand Down
7 changes: 7 additions & 0 deletions python/pyspark/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
# limitations under the License.
#

"""
.. classic:: true

.. connect_migration:: Read Spark SQL configuration values using `spark.conf.get(key)`
and write them using `spark.conf.set(key, value)`.
"""

__all__ = ["SparkConf"]

import sys
Expand Down
4 changes: 4 additions & 0 deletions python/pyspark/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
.. classic:: true
"""
31 changes: 31 additions & 0 deletions python/pyspark/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,9 @@ def setLogLevel(self, logLevel: str) -> None:
Examples
--------
>>> sc.setLogLevel("WARN") # doctest :+SKIP

.. connect_migration:: Replace sc.setLogLevel(level) with
spark.conf.set("spark.log.level", level)
"""
self._jsc.setLogLevel(logLevel)

Expand Down Expand Up @@ -630,6 +633,8 @@ def applicationId(self) -> str:
--------
>>> sc.applicationId # doctest: +ELLIPSIS
'local-...'

.. connect_migration:: Replace spark.sparkContext.applicationId with spark.conf.get("spark.app.id")
"""
return self._jsc.sc().applicationId()

Expand Down Expand Up @@ -675,6 +680,10 @@ def defaultParallelism(self) -> int:
--------
>>> sc.defaultParallelism > 0
True

.. connect_migration:: Replace spark.sparkContext.defaultParallelism with
int(spark.conf.get("spark.default.parallelism")) if set, otherwise
the default depends on the cluster executor configuration.
"""
return self._jsc.sc().defaultParallelism()

Expand Down Expand Up @@ -734,6 +743,9 @@ def emptyRDD(self) -> RDD[Any]:
EmptyRDD...
>>> sc.emptyRDD().count()
0

.. connect_migration:: Replace sc.emptyRDD() with an empty list. When used with
        createDataFrame: spark.createDataFrame([], schema)
"""
return RDD(self._jsc.emptyRDD(), self, NoOpSerializer())

Expand Down Expand Up @@ -828,6 +840,9 @@ def parallelize(self, c: Iterable[T], numSlices: Optional[int] = None) -> RDD[T]
>>> strings = ["a", "b", "c"]
>>> sc.parallelize(strings, 2).glom().collect()
[['a'], ['b', 'c']]

.. connect_migration:: Replace sc.parallelize(data) with the Python collection directly.
When used with createDataFrame: spark.createDataFrame(data, schema)
"""
numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism
if isinstance(c, range):
Expand Down Expand Up @@ -2212,6 +2227,11 @@ def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool =
>>> suppress = lock.acquire()
>>> print(result)
Cancelled

.. connect_migration:: Replace sc.setJobGroup(groupId, desc) with
spark.addTag(groupId) for grouping and cancellation via
spark.interruptTag(groupId). Job description and interruptOnCancel
are not directly supported in Spark Connect.
"""
self._jsc.setJobGroup(groupId, description, interruptOnCancel)

Expand Down Expand Up @@ -2410,6 +2430,11 @@ def setLocalProperty(self, key: str, value: str) -> None:
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
local inheritance.

.. connect_migration:: Replace spark.sparkContext.setLocalProperty(key, value) with
spark.conf.set(key, value) for SQL configuration keys. Note that
spark.conf.set is session-global; thread-local isolation is not
supported in Spark Connect.
"""
self._jsc.setLocalProperty(key, value)

Expand Down Expand Up @@ -2441,6 +2466,9 @@ def setJobDescription(self, value: str) -> None:
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
local inheritance.

.. connect_migration:: Replace sc.setJobDescription(desc) with
spark.conf.set("spark.job.description", desc)
"""
self._jsc.setJobDescription(value)

Expand Down Expand Up @@ -2610,6 +2638,9 @@ def getConf(self) -> SparkConf:
"""Return a copy of this SparkContext's configuration :class:`SparkConf`.

.. versionadded:: 2.1.0

.. connect_migration:: Replace sc.getConf() with spark.conf. For a specific key use
spark.conf.get(key)
"""
conf = SparkConf()
conf.setAll(self._conf.getAll())
Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/core/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,9 @@ def map(self: "RDD[T]", f: Callable[[T], U], preservesPartitioning: bool = False
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]

.. connect_migration:: Replace rdd.map() with DataFrame operations. Use
df.withColumn(), df.select() with a UDF, or a pandas UDF instead.
"""

def func(_: int, iterator: Iterable[T]) -> Iterable[U]:
Expand Down Expand Up @@ -697,6 +700,9 @@ def mapPartitions(
...
>>> rdd.mapPartitions(f).collect()
[3, 7]

.. connect_migration:: Replace rdd.mapPartitions() with a pandas UDF using
applyInPandas.
"""

def func(_: int, iterator: Iterable[T]) -> Iterable[U]:
Expand Down
5 changes: 5 additions & 0 deletions python/pyspark/errors/exceptions/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
.. connect:: true
"""

import grpc
import json
from grpc import StatusCode
Expand Down
4 changes: 4 additions & 0 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
# limitations under the License.
#

"""
.. classic:: true
"""

import atexit
import os
import signal
Expand Down
16 changes: 16 additions & 0 deletions python/pyspark/ml/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,10 @@ class LDAModel(JavaModel, _LDAParams):
including local and distributed data structures.

.. versionadded:: 2.0.0

.. classic:: true

.. connect_migration:: LDA model family is not supported in Spark Connect.
"""

@since("3.0.0")
Expand Down Expand Up @@ -1530,6 +1534,10 @@ class DistributedLDAModel(LDAModel, JavaMLReadable["DistributedLDAModel"], JavaM
for each training document.

.. versionadded:: 2.0.0

.. classic:: true

.. connect_migration:: LDA model family is not supported in Spark Connect.
"""

@functools.cache
Expand Down Expand Up @@ -1608,6 +1616,10 @@ class LocalLDAModel(LDAModel, JavaMLReadable["LocalLDAModel"], JavaMLWritable):
This model stores the inferred topics only; it does not store info about the training dataset.

.. versionadded:: 2.0.0

.. classic:: true

.. connect_migration:: LDA model family is not supported in Spark Connect.
"""

pass
Expand Down Expand Up @@ -1682,6 +1694,10 @@ class LDA(JavaEstimator[LDAModel], _LDAParams, JavaMLReadable["LDA"], JavaMLWrit
>>> sameLocalModel = LocalLDAModel.load(local_model_path)
>>> model.transform(df).take(1) == sameLocalModel.transform(df).take(1)
True

.. classic:: true

.. connect_migration:: LDA model family is not supported in Spark Connect.
"""

_input_kwargs: Dict[str, Any]
Expand Down
6 changes: 5 additions & 1 deletion python/pyspark/ml/connect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
# limitations under the License.
#

"""Spark Connect Python Client - ML module"""
"""
Spark Connect Python Client - ML module

.. connect:: true
"""

from pyspark.sql.connect.utils import check_dependencies

Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/ml/deepspeed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
.. classic:: true

.. connect_migration:: Migrate to pyspark.ml.connect
"""
6 changes: 6 additions & 0 deletions python/pyspark/ml/torch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
.. classic:: true

.. connect_migration:: Migrate to pyspark.ml.connect
"""
4 changes: 4 additions & 0 deletions python/pyspark/ml/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
# limitations under the License.
#

"""
.. classic:: true
"""

from abc import ABCMeta, abstractmethod
from typing import Any, Generic, Optional, List, Type, TypeVar, TYPE_CHECKING

Expand Down
7 changes: 7 additions & 0 deletions python/pyspark/mllib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@

The `pyspark.mllib` package is in maintenance mode as of the Spark 2.0.0 release to encourage
migration to the DataFrame-based APIs under the `pyspark.ml` package.

.. classic:: true

.. connect_migration:: Migrate to pyspark.ml.connect where supported (classification,
evaluation, feature, pipeline, summarizer, tuning). Modules without a Connect
equivalent (clustering, fpm, linalg, random, recommendation, regression, stat,
tree) have no direct replacement.
"""

# MLlib currently needs NumPy 1.4+, so complain if lower
Expand Down
4 changes: 4 additions & 0 deletions python/pyspark/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
# limitations under the License.
#

"""
.. classic:: true
"""

from typing import (
Any,
Callable,
Expand Down
4 changes: 4 additions & 0 deletions python/pyspark/rddsampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
# limitations under the License.
#

"""
.. classic:: true
"""

import sys
import random
import math
Expand Down
6 changes: 5 additions & 1 deletion python/pyspark/sql/classic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@
# limitations under the License.
#

"""Spark Classic specific"""
"""
Spark Classic specific

.. classic:: true
"""
6 changes: 5 additions & 1 deletion python/pyspark/sql/connect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
# limitations under the License.
#

"""Spark Connect client"""
"""
Spark Connect client

.. connect:: true
"""

from pyspark.sql.connect.utils import check_dependencies

Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
# limitations under the License.
#

"""
.. classic:: true

.. connect_migration:: Use SparkSession instead.
"""

import sys
import warnings
from typing import (
Expand Down
3 changes: 3 additions & 0 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ def rdd(self) -> "RDD[Row]":
>>> df = spark.range(1)
>>> type(df.rdd)
<class 'pyspark.core.rdd.RDD'>

.. connect_migration:: Use DataFrame operations directly as RDD is not supported
in Spark Connect.
"""
...

Expand Down
5 changes: 5 additions & 0 deletions python/pyspark/sql/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
.. connect:: true
"""

import abc
import dataclasses
from typing import Optional, List, Tuple, Dict, Any, Union, TYPE_CHECKING, Sequence
Expand Down
5 changes: 5 additions & 0 deletions python/pyspark/sql/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
.. classic:: true
"""

from abc import ABC, abstractmethod
from io import StringIO
import cProfile
Expand Down
Loading