diff --git a/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py b/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py index 04e1f610047cc..868481f946ff5 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py @@ -30,53 +30,15 @@ import unittest import uuid +from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils from pyspark.testing.sqlutils import ReusedSQLTestCase, search_jar, read_classpath - -# Setup Kafka JAR on classpath before SparkSession is created -# This follows the same pattern as streamingutils.py for Kinesis -kafka_sql_jar = search_jar( - "connector/kafka-0-10-sql", - "spark-sql-kafka-0-10_", - "spark-sql-kafka-0-10_", - return_first=True, +from pyspark.testing.utils import ( + have_kafka, + have_testcontainers, + kafka_requirement_message, + testcontainers_requirement_message, ) -if kafka_sql_jar is None: - raise RuntimeError( - "Kafka SQL connector JAR was not found. " - "To run these tests, you need to build Spark with " - "'build/mvn package' or 'build/sbt Test/package' " - "before running this test." 
- ) - -# Read the full classpath including all dependencies -# This works for both Maven builds (reads classpath.txt) and SBT builds (queries SBT) -# Define the project name mapping for SBT builds -kafka_project_name_map = { - "connector/kafka-0-10-sql": "sql-kafka-0-10", -} -kafka_classpath = read_classpath("connector/kafka-0-10-sql", kafka_project_name_map) -all_jars = f"{kafka_sql_jar},{kafka_classpath}" - -# Add Kafka JAR to PYSPARK_SUBMIT_ARGS before SparkSession is created -existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell") -jars_args = "--jars %s" % all_jars - -os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args]) - -from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils - - -# Check if required Python dependencies are available -try: - import testcontainers # noqa: F401 - import kafka # noqa: F401 -except ImportError as e: - raise ImportError( - "Kafka test dependencies not available. " - "Install with: pip install testcontainers[kafka] kafka-python" - ) from e - class StreamingKafkaTestsMixin: """ @@ -87,12 +49,52 @@ class StreamingKafkaTestsMixin: @classmethod def setUpClass(cls): super().setUpClass() + + # Setup Kafka JAR on classpath before SparkSession is created + # This follows the same pattern as streamingutils.py for Kinesis + kafka_sql_jar = search_jar( + "connector/kafka-0-10-sql", + "spark-sql-kafka-0-10_", + "spark-sql-kafka-0-10_", + return_first=True, + ) + + if kafka_sql_jar is None: + raise RuntimeError( + "Kafka SQL connector JAR was not found. " + "To run these tests, you need to build Spark with " + "'build/mvn package' or 'build/sbt Test/package' " + "before running this test." 
+ ) + + # Read the full classpath including all dependencies + # This works for both Maven builds (reads classpath.txt) and SBT builds (queries SBT) + # Define the project name mapping for SBT builds + kafka_project_name_map = { + "connector/kafka-0-10-sql": "sql-kafka-0-10", + } + kafka_classpath = read_classpath("connector/kafka-0-10-sql", kafka_project_name_map) + all_jars = f"{kafka_sql_jar},{kafka_classpath}" + + # Add Kafka JAR to PYSPARK_SUBMIT_ARGS before SparkSession is created + cls.original_pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") + + if cls.original_pyspark_submit_args is None: + pyspark_submit_args = "pyspark-shell" + else: + pyspark_submit_args = cls.original_pyspark_submit_args + jars_args = "--jars %s" % all_jars + + os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, pyspark_submit_args])  # NOTE(review): super().setUpClass() has already run at this point and ReusedSQLTestCase creates the SparkSession there — confirm these jars still take effect (the deleted module-level setup ran before session creation) + + # Start Kafka container - this may take 10-30 seconds on first run cls.kafka_utils = KafkaUtils() cls.kafka_utils.setup() @classmethod def tearDownClass(cls): + os.environ["PYSPARK_SUBMIT_ARGS"] = cls.original_pyspark_submit_args or "pyspark-shell"  # original is None when the var was unset; assigning None to os.environ raises TypeError + # Stop Kafka container and clean up resources if hasattr(cls, "kafka_utils"): cls.kafka_utils.teardown() @@ -122,17 +124,14 @@ def _is_docker_available(): return False +@unittest.skipIf(not have_kafka, kafka_requirement_message) +@unittest.skipIf(not have_testcontainers, testcontainers_requirement_message) +@unittest.skipIf(not _is_docker_available(), "Docker is not available") class StreamingKafkaTests(StreamingKafkaTestsMixin, ReusedSQLTestCase): """ Tests for Kafka streaming integration with PySpark. """ - @classmethod - def setUpClass(cls): - if not _is_docker_available(): - raise unittest.SkipTest("Docker is not available") - super().setUpClass() - def test_streaming_stateless(self): """ Test stateless rtm query with earliest offset. 
@@ -177,10 +176,6 @@ def test_streaming_stateless(self): if __name__ == "__main__": - try: - import xmlrunner + from pyspark.testing import main - testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) - except ImportError: - testRunner = None - unittest.main(testRunner=testRunner, verbosity=2) + main() diff --git a/python/pyspark/testing/utils.py b/python/pyspark/testing/utils.py index 504dd5b4aad9b..83009e22fb140 100644 --- a/python/pyspark/testing/utils.py +++ b/python/pyspark/testing/utils.py @@ -103,6 +103,14 @@ def have_package(name: str) -> bool: have_zstandard = have_package("zstandard") zstandard_requirement_message = "" if have_zstandard else "No module named 'zstandard'" +have_kafka = have_package("kafka") +kafka_requirement_message = "" if have_kafka else "No module named 'kafka'" + +have_testcontainers = have_package("testcontainers") +testcontainers_requirement_message = ( + "" if have_testcontainers else "No module named 'testcontainers'" +) + googleapis_common_protos_requirement_message = ""