
Added to_kafka directly from a Dask worker #279

Open: jsmaupin wants to merge 1 commit into master

Conversation

jsmaupin (Contributor) commented Oct 9, 2019

Currently, we must gather() all the results from a Dask stream back to the master script and then push the results to Kafka. This removes all the benefits of parallel processing we get with Dask and Kafka. It would be much more efficient if we could push data directly from the Dask workers into Kafka.

One issue I had getting this to work is that the Producer class from the Confluent Kafka Python library is not picklable. The workaround is to hide the Producer from pickle by creating it on the worker side via "reflection"-style indirection. However, I believe this adds a requirement that the confluent_kafka library be installed on the workers.

Also, this implementation is serial, but Dask itself is parallel.
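
A minimal sketch of the workaround described above, with illustrative names (produce here stands for the function submitted to the workers): the Producer is built lazily on the worker side, so only the picklable config dict ever crosses the wire.

```python
from confluent_kafka import Producer

_producer = None  # built lazily on the worker; never pickled


def produce(topic, value, producer_config):
    # Runs on a Dask worker, so confluent_kafka must be installed there.
    global _producer
    if _producer is None:
        _producer = Producer(producer_config)
    _producer.produce(topic, value)
    _producer.flush()  # block until delivery; this is the serial part noted above
    return value
```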

codecov bot commented Oct 9, 2019

Codecov Report

Merging #279 into master will decrease coverage by 1.07%.
The diff coverage is 26.92%.


@@            Coverage Diff             @@
##           master     #279      +/-   ##
==========================================
- Coverage   94.69%   93.61%   -1.08%     
==========================================
  Files          13       13              
  Lines        1620     1644      +24     
==========================================
+ Hits         1534     1539       +5     
- Misses         86      105      +19
Impacted Files     Coverage Δ
streamz/dask.py    85.49% <26.92%> (-14.51%) ⬇️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update c6719c0...4860aee.

jsmaupin (Contributor, Author) commented Oct 9, 2019

I'll add tests to get coverage now.

chinmaychandak (Contributor) commented

@jsmaupin, did you get a chance to write some tests?

@martindurant, @CJ-Wright, Any thoughts on this?

jsmaupin (Contributor, Author) commented
@chinmaychandak I suspect this can be done with the existing to_kafka method. We just need to figure out the difference between this implementation and the existing one and add an if dask: branch. Would you have time to take a look at this?

chinmaychandak (Contributor) commented

Okay, let me take a look

chinmaychandak (Contributor) commented Nov 22, 2019

> We just need to figure out the difference between this implementation and the existing one and add an if dask: branch.

@jsmaupin The if dask: implementation worked in from_kafka_batched because that was the first node in the pipeline, the point where we needed to scatter the stream. In this case, however, the input would already be a DaskStream.

Hence, I'm thinking this current implementation of yours may be the best way out?


Inline review on the new code in streamz/dask.py:

client = default_client()
result = client.submit(produce, self.topic, x, self.producer_config)
self._emit(result)

Member commented

Should this call self.emit? That way it integrates with the async support?

Contributor replied

Agreed. Anything else that comes to mind that needs change, @CJ-Wright?

chinmaychandak (Contributor) commented

@martindurant Could you also please take a look at this?

jsmaupin (Contributor, Author) commented

Circling back to this. After becoming more familiar with everything here, I'm thinking that this should support backpressure the way the existing to_kafka function does.
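
For reference, a common way to get producer-side backpressure from confluent_kafka itself (a sketch of the general technique, not necessarily how to_kafka implements it): retry when the client's local queue fills up.

```python
def produce_with_backpressure(producer, topic, value):
    # confluent_kafka raises BufferError when its local queue is full.
    # poll() serves delivery callbacks and drains the queue, so looping
    # here blocks the caller and propagates backpressure upstream.
    while True:
        try:
            producer.produce(topic, value)
            return
        except BufferError:
            producer.poll(1)
```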

martindurant (Member) commented

This PR got left behind. Does anyone remember the status here?

jsmaupin (Contributor, Author) commented

I proposed a solution here: https://stackoverflow.com/questions/60764361/write-to-kafka-from-a-dask-worker that I felt was a bit of a hack. I haven't found a better solution. I would be happy to follow through with this implementation if there are no objections.
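
For context, the linked answer amounts to caching the producer on the Worker object itself. A sketch of that approach, assuming confluent_kafka is installed on the workers (the attribute name is illustrative):

```python
from confluent_kafka import Producer
from distributed import get_worker


def produce(topic, value, producer_config):
    # Runs inside a task, where get_worker() returns the current Worker.
    # The unpicklable Producer is created once per worker and cached.
    worker = get_worker()
    producer = getattr(worker, "_kafka_producer", None)
    if producer is None:
        producer = Producer(producer_config)
        worker._kafka_producer = producer
    producer.produce(topic, value)
    producer.poll(0)  # serve delivery callbacks without blocking
    return value
```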

martindurant (Member) commented

I think that approach is totally fine, but maybe I would improve it by having a dict of producers, with the key being a hash of the connection kwargs, because you could have different Kafka jobs live in a cluster at the same time.

Also, the attribute could just as easily be a global variable in a module, especially if it's mutable like the dict I'm suggesting above. This seems cleaner to my mind, but I can't think of any technical reason that it's different (there is only one worker instance).
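
A sketch of that suggestion (assuming the config values are hashable; a sorted items tuple stands in for "a hash of the connection kwargs"):

```python
from confluent_kafka import Producer

# Module-global on each worker: one Producer per distinct connection config,
# so different Kafka jobs can share the same cluster.
_producers = {}


def get_producer(producer_config):
    key = tuple(sorted(producer_config.items()))
    if key not in _producers:
        _producers[key] = Producer(producer_config)
    return _producers[key]
```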

4 participants