Stdout/stderr pause/resume #53
@vietj I haven't tried it, but this should work: if you simply don't read any data when you are called back, the pipe will fill up and the child process itself will block. You would need to keep track of the fact that you have pending reads, and read the ByteBuffer to create room in the pipe. That will unblock the child process. Let me know if it works.
OK, you mean I leave the byte buffer untouched. I will let you know.
So that would work to pause the stream, but how do I unpause it and request data again? That is, how do I get a new callback to drain the pipe?
You won't get called back again. Presumably you somehow know when you want to pause and resume, so your callback will need to retain a reference to the ByteBuffer, and you will need some other thread or timer to drain the buffer. It's not exactly clear to me why Vert.x would need to suspend a stream in the first place...
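To make the pause-by-not-draining idea concrete, here is a toy, single-threaded model in plain Java. `ToyPipeModel`, `PausableHandler` and `deliver` are hypothetical names, not NuProcess APIs. `deliver` loosely imitates the behaviour described in this thread: data is copied into the buffer, the handler is called, and the buffer is compacted afterwards. The model shows two things. First, a handler that leaves the bytes in place eventually leaves no room for more data, which is where the OS pipe would start filling up. Second, a later drain of the retained buffer frees that room again. In real NuProcess, draining from another thread would race with the event loop.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Toy model only: deliver() imitates an event loop feeding one ByteBuffer; it is not NuProcess code.
final class ToyPipeModel {

    // Hypothetical handler that stops consuming while paused.
    static final class PausableHandler {
        boolean paused;
        ByteBuffer retained; // kept so the data can be drained later, outside the callback
        final StringBuilder received = new StringBuilder();

        // Same shape as a stdout callback: the buffer arrives flipped (read mode).
        void onStdout(ByteBuffer buffer) {
            retained = buffer;
            if (!paused) {
                consume(buffer);
            }
            // When paused, the position is left untouched, so the bytes stay in the buffer.
        }

        // Resume by draining the retained buffer directly; no further callback arrives.
        // Safe here only because this model is single-threaded.
        void resume() {
            paused = false;
            if (retained == null) {
                return;
            }
            retained.flip();    // after compact() the buffer is in write mode again
            consume(retained);
            retained.compact(); // back to write mode, now with free space
        }

        private void consume(ByteBuffer buffer) {
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes); // advancing the position marks the bytes as consumed
            received.append(new String(bytes, StandardCharsets.US_ASCII));
        }
    }

    // Copies as much pipe data as fits, calls the handler, then compacts.
    // Returns how many bytes were accepted; the rest would stay in the OS pipe.
    static int deliver(ByteBuffer buffer, byte[] pipeData, PausableHandler handler) {
        int accepted = Math.min(buffer.remaining(), pipeData.length);
        buffer.put(pipeData, 0, accepted);
        buffer.flip();
        handler.onStdout(buffer);
        buffer.compact();
        return accepted;
    }
}
```

With an 8-byte buffer, delivering "abc" unpaused and then "defgh", "ijklmn" and "xyz" while paused accepts 3, 5, 3 and finally 0 bytes. Calling `resume()` then yields "abcdefghijk" and restores all 8 bytes of room.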
I see, that sounds doable and I will try. Vert.x would do that to propagate flow control to another stream; you can read this short part of the docs: http://vertx.io/docs/vertx-core/java/#streams
Hi, I'm looking further at this, and the Javadoc says "You do not own the ByteBuffer provided to you. You should not retain a reference to this buffer.", which seems to contradict retaining a reference to the ByteBuffer.
@vietj Oops, I apologize, the Javadoc is out of date. It used to be that buffers were reused across processes, but after refactors by @bhamiltoncx this is no longer true. Each process instance has its own stdout, stderr, and stdin ByteBuffer instances.
@brettwooldridge Does NuProcess have any plans to implement Reactive Streams?
@brettwooldridge How can we ensure that the buffer is not modified concurrently by the NuProcess event loop and by a possible drain from another thread?
@brettwooldridge Also, when I now access the buffer outside the NuProcess event loop, I get buffer.remaining() == 0 (the buffer is full), which means the buffer was flipped. So when using it outside, should we flip this buffer again?
@pfxuan This is the first I've heard of the Reactive Streams initiative, but NuProcess would certainly welcome any contribution. If someone were to make such an effort, I would recommend the implementation go into a
@vietj Can you post your
@brettwooldridge Sounds like a good plan! If @vietj can do this, the integration is going to become very natural: http://vertx.io/docs/vertx-reactive-streams/java/
@brettwooldridge Here is the handler: https://github.com/vietj/vertx-childprocess/blob/stdout-control-flow/src/main/java/io/vertx/ext/childprocess/impl/ChildProcessImpl.java and here is a test: https://github.com/vietj/vertx-childprocess/blob/stdout-control-flow/src/test/java/io/vertx/ext/childprocess/SpawnTest.java#L214. If you run this test (on OS X at least), the buffer size is 0 after resume.
@pfxuan That's possible; however, it would be best to first come up with a solution that works with the current API :-) (unless I'm not using the current API correctly).
@vietj Agreed! I'm looking forward to seeing the first prototype. This integration is very useful.
@vietj OK, looking at the code, this is going to be a little tricky. NuProcess is going to compact the buffer after calling your handler. The contract is that the handler should read as much as it can. If you want to create back-pressure, to block the spawned process from writing more data, you need to let that buffer fill up, which you seem to be doing. If you flip the buffer, the back-pressure goes away, so don't do that. If your handler is called and remaining != 0, it is not safe to interact with the ByteBuffer from another thread. Once your handler is called with remaining == 0, it should be safe to interact with the ByteBuffer. Note that once remaining == 0, you may never get called again (but you may be called multiple times with remaining == 0). My concern now is that, without a change in NuProcess, the NuProcess thread may spin, consuming lots of CPU.
@vietj The more I think about it, the more I think NuProcess will need to support a
@brettwooldridge Indeed, using the ByteBuffer outside of the NuProcess event loop sounds like an abuse, and pause/resume should be supported natively. Whenever you come up with such a feature, I can contribute a reactive-streams implementation on top of it. WDYT?
I'm reviewing the specifications now. I'll let you know shortly.
@vietj @pfxuan I will implement the Reactive Streams specification. Please allow a week or so for me to cleanly refactor some of the internals. @bhamiltoncx As I get closer, I may ask for your input. In particular, NuProcess does not elegantly implement back-pressure, though I have a reasonably well-formed idea of how to do it. What is missing from NuProcess is something that more closely mirrors the epoll/kqueue model that it is wrapping, i.e. while there is This is the part that will change. I will attempt to do so in the least intrusive way possible to the API.
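The "epoll/kqueue model" mentioned above can be illustrated with Java NIO. On Linux and macOS, the JDK's default `Selector` is built on epoll and kqueue respectively. The sketch below is not NuProcess's implementation; `InterestPause` and `runDemo` are hypothetical names. It shows one way to pause a readiness-driven reader without spinning. When the read buffer is full, the reader removes `OP_READ` from the key's interest set, so `select()` stops reporting the still-readable pipe and simply waits. Resuming re-adds `OP_READ`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Sketch: pausing a readiness-based reader by dropping OP_READ interest instead of spinning.
final class InterestPause {
    private final Selector selector;
    private final SelectionKey key;
    final ByteBuffer buffer;

    InterestPause(Pipe.SourceChannel source, int bufferSize) throws IOException {
        selector = Selector.open();
        source.configureBlocking(false);
        key = source.register(selector, SelectionKey.OP_READ);
        buffer = ByteBuffer.allocate(bufferSize);
    }

    void pause()  { key.interestOps(0); }                    // stop asking the OS for readiness
    void resume() { key.interestOps(SelectionKey.OP_READ); } // ask again

    // One loop iteration: returns the number of bytes read, or 0 if nothing was selected.
    int pollOnce(long timeoutMillis) throws IOException {
        int ready = selector.select(timeoutMillis);
        selector.selectedKeys().clear();
        if (ready == 0) {
            return 0; // paused (or no data): the loop waits in select() instead of spinning
        }
        int n = ((Pipe.SourceChannel) key.channel()).read(buffer);
        if (!buffer.hasRemaining()) {
            pause(); // consumer is behind: leave further data in the OS pipe
        }
        return Math.max(n, 0);
    }

    // Writes 100 bytes into a pipe and reads them through a 16-byte buffer.
    // Returns {first read, read while paused, read after resume}.
    static int[] runDemo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.sink().write(ByteBuffer.wrap(new byte[100]));
            InterestPause reader = new InterestPause(pipe.source(), 16);
            int first = reader.pollOnce(1000);     // fills the buffer, then pauses
            int whilePaused = reader.pollOnce(50); // data is still in the pipe, but nothing is selected
            reader.buffer.clear();                 // the consumer caught up
            reader.resume();
            int afterResume = reader.pollOnce(1000);
            reader.selector.close();
            pipe.sink().close();
            pipe.source().close();
            return new int[] {first, whilePaused, afterResume};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the demo, the second poll returns 0 even though 84 bytes are still unread. While paused, the loop blocks in `select()` rather than waking up repeatedly for data it cannot accept.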
@brettwooldridge I'm so excited to receive your direct support!
Yeah, happy to help brainstorm this. I think we'll want to generalize and extend the
On Fri, Nov 6, 2015 at 8:01 AM Pengfei Xuan
I have completed the initial implementation of Reactive Streams in a branch called @pfxuan @vietj Would you mind creating some JUnit tests for @bhamiltoncx There is a "substantial" (not really) change in the core API. Oh, and the
@brettwooldridge Good job, I will have a look soon. I see that the implementation itself does not need the reactive-streams library out of the box (i.e. one can use it without the reactive-streams library): would you mind declaring the reactive-streams dependency as optional=true?
No problem on the optional dependency.
@brettwooldridge So awesome! I'll look at it and see if I can do something.
@brettwooldridge I cannot find the implementation of specification rule 3.10 in your
@pfxuan I pushed a minor change to
@brettwooldridge Got it. Do you know how to set up the number of elements on
@pfxuan Oh, maybe I missed it, but the user (you) does not construct a
Construction goes something like this:

NuProcessBuilder builder = new NuProcessBuilder(commands);
NuStreamProcessBuilder streamBuilder = new NuStreamProcessBuilder(builder);
NuStreamProcess process = streamBuilder.start();
NuStreamPublisher stdoutPublisher = process.getStdoutPublisher();
NaiveSubscriber subscriber = new NaiveSubscriber();
stdoutPublisher.subscribe(subscriber);
...
process.waitFor(0, TimeUnit.SECONDS); // wait until the process exits

NaiveSubscriber:

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class NaiveSubscriber implements Subscriber<ByteBuffer> {
    private final Executor executor;
    private volatile Subscription subscription;

    public NaiveSubscriber() {
        executor = Executors.newSingleThreadExecutor();
    }

    @Override
    public void onSubscribe(Subscription sub) {
        subscription = sub;
        // signal demand for one buffer from the executor thread
        executor.execute(() -> subscription.request(1));
    }

    @Override
    public void onNext(ByteBuffer buffer) {
        // consume buffer
        executor.execute(() -> subscription.request(1));
    }
    ...
}
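To experiment without the NuProcess branch, the same one-buffer-at-a-time demand pattern can be tried with `java.util.concurrent.Flow` (Java 9+), whose interfaces mirror the Reactive Streams ones, together with the JDK's `SubmissionPublisher`. This is an analogue of `NaiveSubscriber`, not NuProcess code; `OneAtATime` and `run` are hypothetical names. It requests the next buffer directly from `onNext`. Reactive Streams rule 3.2 allows `request` to be called synchronously from within `onNext`, so the executor hop in `NaiveSubscriber` is one option rather than a requirement.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Analogue of NaiveSubscriber using the JDK's Flow API instead of org.reactivestreams.
final class OneAtATime {

    static final class OneAtATimeSubscriber implements Flow.Subscriber<ByteBuffer> {
        final List<Integer> sizes = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: exactly one buffer
        }

        @Override
        public void onNext(ByteBuffer buffer) {
            sizes.add(buffer.remaining()); // "consume" the buffer
            subscription.request(1);       // ask for the next buffer only after this one is handled
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes one buffer per chunk size and returns the sizes the subscriber saw, in order.
    static List<Integer> run(int... chunkSizes) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int size : chunkSizes) {
                publisher.submit(ByteBuffer.allocate(size)); // blocks if the subscriber falls far behind
            }
        } // close() sends onComplete after the items already submitted
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.sizes;
    }
}
```

`OneAtATime.run(3, 5, 7)` should return `[3, 5, 7]`. The subscriber never has more than one buffer requested at a time.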
@brettwooldridge How does NuProcess handle back-pressure? Say the command 'cat in.txt' is able to produce 100 MB/s of textual traffic, but my NaiveSubscriber can only accept/process 30 MB/s of textual data. Can NuProcess pause or stall the "cat" process internally when receiving a back-pressure signal from the Subscriber?
@pfxuan Another minor set of commits to streams. Also, minor edits to the above example code (the no-arg With respect to a unit test with a bounded number of published elements, consider something like the
@brettwooldridge Yeah! This is a hot feature I need now.
@pfxuan Yes, there is a pipe (operating system pipe) between the process and NuProcess. This OS pipe has limited capacity, typically a few kilobytes. When the process (for example
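The limited capacity of an OS pipe is easy to observe from Java. The sketch below opens a `java.nio.channels.Pipe`, makes the write end non-blocking, and writes until a write call accepts 0 bytes. That is the point at which a blocking writer, such as a child process writing to stdout, would stall. `PipeCapacity.measure` is a hypothetical helper. The exact number is platform-dependent: on Linux with 4 KiB pages the default pipe capacity is 64 KiB, and on Windows the JDK implements `Pipe` with sockets rather than an anonymous pipe. Only the existence of a limit should be taken from the result.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

// Fills a pipe that nobody reads from, to show that its capacity is finite.
final class PipeCapacity {
    // Stop eventually even if the OS keeps accepting data.
    private static final long SAFETY_CAP = 256L * 1024 * 1024;

    static long measure() {
        try {
            Pipe pipe = Pipe.open();
            Pipe.SinkChannel sink = pipe.sink();
            sink.configureBlocking(false); // a blocking writer would hang here instead
            ByteBuffer chunk = ByteBuffer.allocate(1024);
            long total = 0;
            while (total < SAFETY_CAP) {
                chunk.clear();
                int written = sink.write(chunk);
                if (written == 0) {
                    break; // pipe full: nothing more fits until the reader drains it
                }
                total += written;
            }
            sink.close();
            pipe.source().close();
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On a typical x86-64 Linux machine this should report 65536. This is the effect the back-pressure discussion relies on: once the pipe is full, the writer's next write cannot proceed until the reader drains it.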
To ease the implementation of subscribers, wouldn't it make sense to configure NuProcess to use buffers of a fixed (configurable) size?
@vietj It may make sense, but it also adds additional configuration and testing vectors that I would rather not have unless absolutely necessary. I tend to lean toward a very small API surface area, and well-published capabilities and limitations. Most programmers are sophisticated enough to work within constraints if they are aware of them. Note that
@brettwooldridge I see. Under the hood, NuProcess still uses Unix pipes for interprocess communication. This is why you haven't implemented the Windows version yet. Is that correct?
@pfxuan Mostly correct. Windows works fine (pre- However, in order to support back-pressure, I had to change how NuProcess used epoll and kqueue slightly. Windows can do it as well, I just haven't taken the time to change it yet. It is probably about 4-8 hours of work, and I'll probably get it done this week.
@pfxuan Just wanted to let you know: if something is not working like it seems it should, there is not necessarily something wrong with your unit test. Because the
@pfxuan reactive-streams has a TCK: https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck/src/main/java/org/reactivestreams/tck Perhaps it can be implemented in this project. I know it is implemented in the vertx-reactive-streams project.
@brettwooldridge Got it! I'll let you know when I find the problem.
I will soon update the vertx-childprocess project to use it and provide feedback. It will mainly consume the original API, as Vert.x is based on a pause/resume back-pressure system and not a request system.
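Vert.x streams use pause()/resume() rather than request(n), so an adapter between the two models is needed somewhere. The sketch below shows one possible shape. It is written against `java.util.concurrent.Flow` so it runs without extra dependencies. `PauseResumeSubscriber` is a hypothetical class, not part of Vert.x, NuProcess, or vertx-childprocess. It keeps at most one request outstanding. After each delivery it requests the next item unless paused, and it re-requests on resume.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical adapter: exposes pause()/resume() on top of Reactive-Streams-style request(n).
final class PauseResumeSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<T> handler;
    private Flow.Subscription subscription;
    private boolean paused;
    private boolean outstanding; // true while a request(1) has not yet been answered

    PauseResumeSubscriber(Consumer<T> handler) {
        this.handler = handler;
    }

    @Override
    public synchronized void onSubscribe(Flow.Subscription s) {
        subscription = s;
        requestIfWanted();
    }

    @Override
    public void onNext(T item) {
        synchronized (this) {
            outstanding = false;
        }
        handler.accept(item); // one item requested before pause() may still arrive
        synchronized (this) {
            requestIfWanted();
        }
    }

    public synchronized void pause() {
        paused = true; // simply stop requesting; a publisher may not send more than requested
    }

    public synchronized void resume() {
        paused = false;
        requestIfWanted();
    }

    // Caller holds the lock. Keeps at most one item in flight.
    private void requestIfWanted() {
        if (!paused && !outstanding && subscription != null) {
            outstanding = true;
            subscription.request(1);
        }
    }

    @Override
    public void onError(Throwable t) {
        // a real adapter would forward this to an exception handler
    }

    @Override
    public void onComplete() {
        // a real adapter would forward this to an end handler
    }
}
```

Requesting one item at a time trades throughput for simplicity. A real adapter might request a small batch and count it down, but the pause/resume mapping stays the same: pausing only stops further `request` calls.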
@brettwooldridge @vietj I integrated JUnit tests with the TCK test to get a further verification on You can review my changes at pfxuan@d9adac6.
@pfxuan Thanks for the start. Note that the requirements of the TCK framework itself are different from those of the Specification per se. The specification does not require that a Publisher accept a number of elements during construction. However, the TCK requires that a Publisher used within the TCK support that, to simplify testing by the TCK harness. I have modified the TckFiniteStdoutPublisherTest class to contain a publisher that is suitable for use within the TCK. It accepts a number of elements during construction, and proxies to the NuProcess publisher. Note that the tests are still failing, at least on OS X (I have not tried the TCK on Linux yet). The tests have uncovered further errors on OS X that will need to be fixed before they can pass.
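The proxy idea can be sketched independently of NuProcess and of the TCK harness. `LimitedPublisher` below is a hypothetical wrapper over `java.util.concurrent.Flow.Publisher`, the JDK mirror of the Reactive Streams interfaces. It is not the class from `TckFiniteStdoutPublisherTest`. It forwards at most `limit` elements from an upstream publisher, then cancels upstream and completes the downstream subscriber. This is the kind of "publisher of N elements" a test harness can ask for.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical proxy: republishes at most `limit` elements of an upstream publisher.
final class LimitedPublisher<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> upstream;
    private final long limit;

    LimitedPublisher(Flow.Publisher<T> upstream, long limit) {
        this.upstream = upstream;
        this.limit = limit;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> downstream) {
        upstream.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstreamSubscription;
            private long remaining = limit;
            private boolean done;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                upstreamSubscription = s;
                downstream.onSubscribe(s); // demand and cancel pass straight through
                if (remaining <= 0) {
                    finish();
                }
            }

            @Override
            public void onNext(T item) {
                if (done) {
                    return; // ignore anything that races with cancellation
                }
                downstream.onNext(item);
                if (--remaining == 0) {
                    finish();
                }
            }

            @Override
            public void onError(Throwable t) {
                if (!done) { done = true; downstream.onError(t); }
            }

            @Override
            public void onComplete() {
                if (!done) { done = true; downstream.onComplete(); }
            }

            private void finish() {
                done = true;
                upstreamSubscription.cancel();
                downstream.onComplete();
            }
        });
    }

    // Demo: a SubmissionPublisher emits 0..produced-1; the proxy lets `limit` of them through.
    static List<Integer> take(int limit, int produced) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch completed = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            new LimitedPublisher<>(source, limit).subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) { received.add(item); }
                @Override public void onError(Throwable t) { completed.countDown(); }
                @Override public void onComplete() { completed.countDown(); }
            });
            for (int i = 0; i < produced; i++) {
                source.submit(i);
            }
        }
        try {
            completed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

Passing the upstream subscription straight through keeps the proxy small. The `done` flag ensures the downstream subscriber sees exactly one terminal signal, even if upstream completes or delivers late after cancellation.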
@brettwooldridge Your modification is way better than my original version. In particular, the use of the proxy pattern made everything so clear. I was trying to write a self-contained test, but didn't get it. Thanks for pointing it out! In addition, I have tried the modified and original tests on OS X and Linux and summarized the failed cases as follows:
Any thoughts and suggestions for the next steps?
@pfxuan Until I get a chance to revisit some of the NuProcess internals, there may not be much to be done. It will take a day or three.
@brettwooldridge Take your time, and feel free to assign a task to me if you need any help.
@pfxuan All tests are now passing on Linux and OS X.
@brettwooldridge It's super awesome!!! I'll test it and let you know if there is a problem.
@brettwooldridge I want to do blackbox and whitebox verifications on NuProcess's subscriber. But I found that STDIN is actually working as a publisher. It seems STDIN should work as a subscriber rather than a publisher, right?

void setSubscriber(final Stream stream, final Subscriber<? super ByteBuffer> subscriber)
{
    switch (stream)
    {
    case STDIN:
        stdinSubscriber = subscriber;
        stdinRequests.set(0);
        break;
    case STDOUT:
        stdoutSubscriber = subscriber;
        stdoutRequests.set(0);
        break;
    case STDERR:
        stderrSubscriber = subscriber;
        stderrRequests.set(0);
        break;
    }
}
@brettwooldridge Currently I see a bug with the stderr stream that prevents the stderr close and exit callbacks from being called on OS X; here is a reproducer:
The EchoStderr is:
Exit is never called, nor is a callback with closed=true. This does not happen with stdout.
I think this issue has conflated both Reactive Streams support and support for back-pressure. I'd like to draw it back to its original discussion around just providing support for back-pressure and let #71 deal with Reactive Streams. My view is that it'd be great to just have a
Another (perhaps better) solution would be to improve NuProcess so that it waits up to 1 second (say) before calling the stdout/err callbacks again. Meanwhile, if
There is no way to pause/resume the standard output/error when the NuProcessHandler cannot handle the buffers.
It would be a useful addition for the Vert.x 3 integration https://github.com/vietj/vertx-childprocess
Are there plans to implement such a feature?