Networking Issues: Zeppelin cannot fetch dependencies #3

Open · gregbrowndev opened this issue Feb 20, 2020 · 5 comments

@gregbrowndev

Hi, and thank you for this repo; it is incredibly useful!

However, I'm having problems running your notebook. I'm not really sure what exactly the problem is (sorry for the rambling below), as I've tried a number of different things and hit different issues even after reverting everything back to your exact code.

Initially, upon executing the notebook, everything apart from the consumer seemed to work. I could see messages being written to the Kafka topic in Grafana. However, the consumer complained that it didn't have the Kafka dependency available.

I found that the Kafka JAR hadn't been downloaded, as shown in the screenshot of Zeppelin's interpreter page below:
[screenshot: Zeppelin's interpreter settings page showing the Kafka dependency failed to resolve]

I couldn't even wget the JAR from inside the Zeppelin container (I believe the request timed out). However, now (after reverting my changes) it seems I can wget inside the container, but Zeppelin is still unable to fetch the dependencies itself.
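
For anyone debugging the same thing, the check I ran inside the container was roughly this (a quick sanity check, nothing project-specific):

%sh
# can the Zeppelin container reach Maven Central at all?
wget -T 10 -O /dev/null https://repo1.maven.org/maven2/ && echo "network OK"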

Putting aside the network issue, I've tried to manually add the dependency using:

%sh
# download the Spark Streaming <-> Kafka integration JAR into the container
mkdir -p /zeppelin/dep
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8_2.11/2.0.2/spark-streaming-kafka-0-8_2.11-2.0.2.jar

%producer.dep
// load the local JAR into the producer interpreter's dependency resolver
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar")

%consumer.dep
// same again for the consumer interpreter
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar")

but now the producer times out when connecting to Kafka:

KafkaTimeoutErrorTraceback (most recent call last)
<ipython-input-6-49d1d1bf1849> in <module>()
     20         topic=KAFKA_TOPIC,
     21         key=str(row_dict["_c0"]).encode("utf-8"),
---> 22         value=json.dumps(row_dict).encode("utf-8"))
     23 
     24     try:

/opt/conda/lib/python2.7/site-packages/kafka/producer/kafka.pyc in send(self, topic, value, key, headers, partition, timestamp_ms)
    562         key_bytes = value_bytes = None
    563         try:
--> 564             self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
    565 
    566             key_bytes = self._serialize(

/opt/conda/lib/python2.7/site-packages/kafka/producer/kafka.pyc in _wait_on_metadata(self, topic, max_wait)
    689             if not metadata_event.is_set():
    690                 raise Errors.KafkaTimeoutError(
--> 691                     "Failed to update metadata after %.1f secs." % (max_wait,))
    692             elif topic in self._metadata.unauthorized_topics:
    693                 raise Errors.TopicAuthorizationFailedError(topic)

KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
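
For context, the producer paragraph uses kafka-python (visible in the traceback paths). A minimal sketch of the relevant setup — KAFKA_BROKERS and KAFKA_TOPIC are the notebook's own variables; the rest here is illustrative:

from kafka import KafkaProducer
import json

# kafka-python blocks inside send() while fetching topic metadata from the
# cluster; "Failed to update metadata after 60.0 secs" means no broker
# responded within max_block_ms (default 60000 ms), i.e. the brokers are
# unreachable or advertise listeners the client cannot resolve.
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKERS,
    max_block_ms=60000,
)
producer.send(
    topic=KAFKA_TOPIC,
    key=b"example-key",
    value=json.dumps({"example": "payload"}).encode("utf-8"),
)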

This is all on a fresh set of containers (docker-compose down && docker-compose up -d).

Any help would be greatly appreciated!

@gregbrowndev (Author)

I think the Kafka brokers may have gotten into a bad state from being brought up and down so many times. Deleting the kafka/kafka_zookeeper Docker volumes seems to have fixed the KafkaTimeoutError raised by the producer.
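
For anyone else hitting this, the reset I ran on the host was roughly the following (the volume names come from this project's docker-compose.yml and may differ; run docker volume ls to find yours):

docker-compose down
# wipe the stale broker/zookeeper state (volume names may differ per project)
docker volume rm kafka kafka_zookeeper
docker-compose up -d

Alternatively, docker-compose down -v removes every named volume declared in the compose file in one step. With fresh volumes the producer works, but now I'm getting an error again with the consumer: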

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/zeppelin/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/zeppelin/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
Py4JNetworkError: Error while receiving

Py4JErrorTraceback (most recent call last)
<ipython-input-8-63c862b08c08> in <module>()
     17                             ssc,
     18                             [REDDIT_TOPIC],
---> 19                             {"metadata.broker.list": KAFKA_BROKERS})
     20 
     21 stream = stream.map(lambda x: json.loads(x[1]))

/zeppelin/interpreter/spark/pyspark/pyspark.zip/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
    128             func = funcWithoutMessageHandler
    129             jstream = helper.createDirectStreamWithoutMessageHandler(
--> 130                 ssc._jssc, kafkaParams, set(topics), jfromOffsets)
    131         else:
    132             ser = AutoBatchedSerializer(PickleSerializer())

/zeppelin/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/zeppelin/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/zeppelin/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    325             raise Py4JError(
    326                 "An error occurred while calling {0}{1}{2}".
--> 327                 format(target_id, ".", name))
    328     else:
    329         type = answer[1]

Py4JError: An error occurred while calling o66.createDirectStreamWithoutMessageHandler

Note this is using my code above to add the JAR to the consumer.dep interpreter.

I've seen other people with what looks like the same problem (see https://stackoverflow.com/questions/58975545/azure-databricks-kafkautils-createdirectstream-causes-py4jnetworkerroranswer), where the suggestion was to upgrade the JAR. I've tried editing the Zeppelin interpreters to use the newer "spark-streaming-kafka-0-8_2.11-2.4.5.jar", but this results in the same set of issues.
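
If downloading it manually instead of going through the interpreter settings, the equivalent wget would be (assuming the 2.4.5 build of the 0-8 connector follows the same Maven Central path pattern as above):

%sh
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8_2.11/2.4.5/spark-streaming-kafka-0-8_2.11-2.4.5.jar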

@dotdothu commented Mar 9, 2020

I am facing the same issues. Have you resolved it yet, @gregbrowndev?

@dotdothu

Using the assembly solved the issue:

%sh
# the assembly JAR bundles the Kafka client and its transitive dependencies
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.0.2/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar

%consumer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar")

@gregbrowndev (Author)

@dotdothu sorry for the late reply! Glad you managed to fix it.

I added the JAR manually, just like you did, but never got it fully working. I still had issues with the consumer: "Py4JError: An error occurred while calling o66.createDirectStreamWithoutMessageHandler".

@puneethrai

@gregbrowndev I added your library files to the Spark interpreter's dep directory as well. Below is my entire sequence of operations:

%sh
mkdir -p /zeppelin/dep
# fetch both the plain and the assembly JARs
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.0.2/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8_2.11/2.0.2/spark-streaming-kafka-0-8_2.11-2.0.2.jar
# also place them on the Spark interpreter's local dependency path
cp /zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar /zeppelin/interpreter/spark/dep
cp /zeppelin/dep/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar /zeppelin/interpreter/spark/dep

and

%producer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar")

%consumer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar")
