Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 51 additions & 56 deletions python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,53 +30,15 @@
import unittest
import uuid

from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils
from pyspark.testing.sqlutils import ReusedSQLTestCase, search_jar, read_classpath

# Setup Kafka JAR on classpath before SparkSession is created
# This follows the same pattern as streamingutils.py for Kinesis
kafka_sql_jar = search_jar(
"connector/kafka-0-10-sql",
"spark-sql-kafka-0-10_",
"spark-sql-kafka-0-10_",
return_first=True,
from pyspark.testing.utils import (
have_kafka,
have_testcontainers,
kafka_requirement_message,
testcontainers_requirement_message,
)

if kafka_sql_jar is None:
raise RuntimeError(
"Kafka SQL connector JAR was not found. "
"To run these tests, you need to build Spark with "
"'build/mvn package' or 'build/sbt Test/package' "
"before running this test."
)

# Read the full classpath including all dependencies
# This works for both Maven builds (reads classpath.txt) and SBT builds (queries SBT)
# Define the project name mapping for SBT builds
kafka_project_name_map = {
"connector/kafka-0-10-sql": "sql-kafka-0-10",
}
kafka_classpath = read_classpath("connector/kafka-0-10-sql", kafka_project_name_map)
all_jars = f"{kafka_sql_jar},{kafka_classpath}"

# Add Kafka JAR to PYSPARK_SUBMIT_ARGS before SparkSession is created
existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
jars_args = "--jars %s" % all_jars

os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])

from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils


# Check if required Python dependencies are available
try:
import testcontainers # noqa: F401
import kafka # noqa: F401
except ImportError as e:
raise ImportError(
"Kafka test dependencies not available. "
"Install with: pip install testcontainers[kafka] kafka-python"
) from e


class StreamingKafkaTestsMixin:
"""
Expand All @@ -87,12 +49,52 @@ class StreamingKafkaTestsMixin:
@classmethod
def setUpClass(cls):
super().setUpClass()

# Setup Kafka JAR on classpath before SparkSession is created
# This follows the same pattern as streamingutils.py for Kinesis
kafka_sql_jar = search_jar(
"connector/kafka-0-10-sql",
"spark-sql-kafka-0-10_",
"spark-sql-kafka-0-10_",
return_first=True,
)

if kafka_sql_jar is None:
raise RuntimeError(
"Kafka SQL connector JAR was not found. "
"To run these tests, you need to build Spark with "
"'build/mvn package' or 'build/sbt Test/package' "
"before running this test."
)

# Read the full classpath including all dependencies
# This works for both Maven builds (reads classpath.txt) and SBT builds (queries SBT)
# Define the project name mapping for SBT builds
kafka_project_name_map = {
"connector/kafka-0-10-sql": "sql-kafka-0-10",
}
kafka_classpath = read_classpath("connector/kafka-0-10-sql", kafka_project_name_map)
all_jars = f"{kafka_sql_jar},{kafka_classpath}"

# Add Kafka JAR to PYSPARK_SUBMIT_ARGS before SparkSession is created
cls.original_pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")

if cls.original_pyspark_submit_args is None:
pyspark_submit_args = "pyspark-shell"
else:
pyspark_submit_args = cls.original_pyspark_submit_args
jars_args = "--jars %s" % all_jars

os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, pyspark_submit_args])

# Start Kafka container - this may take 10-30 seconds on first run
cls.kafka_utils = KafkaUtils()
cls.kafka_utils.setup()

@classmethod
def tearDownClass(cls):
os.environ["PYSPARK_SUBMIT_ARGS"] = cls.original_pyspark_submit_args

# Stop Kafka container and clean up resources
if hasattr(cls, "kafka_utils"):
cls.kafka_utils.teardown()
Expand Down Expand Up @@ -122,17 +124,14 @@ def _is_docker_available():
return False


@unittest.skipIf(not have_kafka, kafka_requirement_message)
@unittest.skipIf(not have_testcontainers, testcontainers_requirement_message)
@unittest.skipIf(not _is_docker_available(), "Docker is not available")
class StreamingKafkaTests(StreamingKafkaTestsMixin, ReusedSQLTestCase):
"""
Tests for Kafka streaming integration with PySpark.
"""

@classmethod
def setUpClass(cls):
if not _is_docker_available():
raise unittest.SkipTest("Docker is not available")
super().setUpClass()

def test_streaming_stateless(self):
"""
Test stateless rtm query with earliest offset.
Expand Down Expand Up @@ -177,10 +176,6 @@ def test_streaming_stateless(self):


if __name__ == "__main__":
    # Reconstructed from the diff's new side: the old xmlrunner-based runner
    # was replaced by pyspark.testing.main. The import is deferred into the
    # guard so merely importing this module has no side effects.
    from pyspark.testing import main

    main()
8 changes: 8 additions & 0 deletions python/pyspark/testing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ def have_package(name: str) -> bool:
have_zstandard = have_package("zstandard")
zstandard_requirement_message = "" if have_zstandard else "No module named 'zstandard'"

# Optional Kafka-test dependencies: availability flags plus the skip messages
# used by @unittest.skipIf in the Kafka streaming tests.
have_kafka = have_package("kafka")
have_testcontainers = have_package("testcontainers")

kafka_requirement_message = "" if have_kafka else "No module named 'kafka'"
testcontainers_requirement_message = (
    "" if have_testcontainers else "No module named 'testcontainers'"
)


googleapis_common_protos_requirement_message = ""

Expand Down