enm goes into an infinite loop with a pipeline TCP connection when the TCP service is not running. #7

Open
harishdeshmukh opened this issue May 15, 2015 · 19 comments

@harishdeshmukh

enm goes into an infinite loop with a pipeline TCP connection when the TCP service is not running.

start() ->
    enm:start_link(),
    Url = "tcp://localhost:8789",
    {ok,Push} = enm:push([{connect,Url},list]),
    Send1 = "Hello, World!",
    io:format("pushing message \"~s\"~n", [Send1]),
    ok = enm:send(Push, Send1),
    Send2 = "Goodbye.",
    io:format("pushing message \"~s\"~n", [Send2]),
    ok = enm:send(Push, Send2),
    enm:close(Push),
    enm:stop().

Scenario:
a. Connect Url is tcp
b. The tcp service is not running
c. Run the code above. This is what I did:

$ erl -pa ebin deps/*/ebin
1> c("src/pipeline.erl").
{ok,pipeline}
3> pipeline:start().
pushing message "Hello, World!"
pushing message "Goodbye."

d. Look at top: the Erlang process is taking 100% CPU.
e. CentOS - Linux version 3.10.0-123.20.1.el7.x86_64 ([email protected])
f. gcc version 4.8.2 20140120

Is this a known issue? Please let me know if it's a user error.

Thanks.

@vinoski
Contributor

vinoski commented May 15, 2015

Thanks, will take a look.

@vinoski vinoski self-assigned this May 15, 2015
@harishdeshmukh
Author

Here, the rc from enm_do_send comes back as -1. This is the call sequence that repeats.

(gdb) where
#0  enm_ready_output (drv_data=0x7fae7bd0b338, event=0xa) at c_src/enm_drv.c:831
#1  0x00000000004c27eb in erts_port_task_execute ()
#2  0x00000000004ba2b4 in schedule ()
#3  0x0000000000572373 in process_main ()
#4  0x00000000004aa3ea in sched_thread_func ()
#5  0x0000000000606275 in thr_wrapper ()
#6  0x00007fae7cc15df5 in start_thread (arg=0x7fae73dbf700) at pthread_create.c:308
#7  0x00007fae7c73b1ad in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:113
(gdb) list
826     memset(&msghdr, 0, sizeof msghdr);
827     msghdr.msg_iov = (struct nn_iovec*)ev.iov;
828     msghdr.msg_iovlen = ev.vsize;
829     msghdr.msg_control = 0;
830     rc = enm_do_send(d, &msghdr);
831     if (rc > 0)
832         driver_deq(d->port, rc);
833     if (rc == total && d->b.writable) {
834         if (d->b.busy) {
835             d->b.busy = 0;
(gdb) n
833     if (rc == total && d->b.writable) {
(gdb) p rc
$14 = -1
(gdb) n
840 }

I've not done any Erlang port work and I'm new to Erlang, but I think the Erlang port somehow needs to be notified that the operation failed.

static void
enm_ready_output(ErlDrvData drv_data, ErlDrvEvent event)
{
    EnmData* d = enm_sockets[(long)event];
    struct nn_msghdr msghdr;
    ErlIOVec ev;
    ErlDrvSizeT total;
    int rc;

    d->b.writable = 1;
    total = driver_peekqv(d->port, &ev);
    if (total == 0)
        return;
    memset(&msghdr, 0, sizeof msghdr);
    msghdr.msg_iov = (struct nn_iovec*)ev.iov;
    msghdr.msg_iovlen = ev.vsize;
    msghdr.msg_control = 0;
    rc = enm_do_send(d, &msghdr);
    if (rc > 0)
        driver_deq(d->port, rc);
    if (rc == total && d->b.writable) {
        if (d->b.busy) {
            d->b.busy = 0;
            set_busy_port(d->port, d->b.busy);
        }
        enm_write_select(d, 0);
    }
}

Anyway, you'll know it best. Please let me know if I can help.

@harishdeshmukh
Author

Adding driver_failure(d->port, errno); makes it crash as opposed to hang.

static void
enm_ready_output(ErlDrvData drv_data, ErlDrvEvent event)
{
    EnmData* d = enm_sockets[(long)event];
    struct nn_msghdr msghdr;
    ErlIOVec ev;
    ErlDrvSizeT total;
    int rc;

    d->b.writable = 1;
    total = driver_peekqv(d->port, &ev);
    if (total == 0)
        return;
    memset(&msghdr, 0, sizeof msghdr);
    msghdr.msg_iov = (struct nn_iovec*)ev.iov;
    msghdr.msg_iovlen = ev.vsize;
    msghdr.msg_control = 0;
    rc = enm_do_send(d, &msghdr);
    if (rc > 0)
        driver_deq(d->port, rc);
    if (rc == total && d->b.writable) {
        if (d->b.busy) {
            d->b.busy = 0;
            set_busy_port(d->port, d->b.busy);
        }
        enm_write_select(d, 0);
    }
    if (rc < 0) {
        driver_failure(d->port, errno);
    }
}

vinoski added a commit that referenced this issue May 16, 2015
Prevent a TCP socket that's not actually connected from causing the
driver enm_ready_output function to try repeatedly to send data and
fail. Close the socket and indicate connection eof to the caller.
Add a regression test for this case.
@vinoski
Contributor

vinoski commented May 16, 2015

The problem with driver_failure is that it's too harsh for this situation. Please look at branch vinoski/fix7, which I think is a better fix, and try it out. @rzezeski can you have a look at that branch too and let me know your thoughts on this?

@harishdeshmukh
Author

Thanks Steve. I also thought driver_failure was not too harsh, but I was not sure about eof. I'll test it next week and let you know.

@harishdeshmukh
Author

Hi Steve,

This fix works for us. We effectively get the closed socket effect and are going to retry with backoff.
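
For reference, the caller-side retry with backoff we have in mind looks roughly like this. It's a minimal sketch; push_with_backoff/2,3 and the delay values are ours, not part of the enm API, and it assumes enm:send/2 returns {error, closed} once the driver has closed the socket.

%% Minimal retry-with-backoff sketch. Assumes enm:start_link/0 has
%% already been called.
push_with_backoff(Url, Msg) ->
    push_with_backoff(Url, Msg, [100, 500, 2000]).  % backoff delays in ms

push_with_backoff(_Url, _Msg, []) ->
    {error, gave_up};
push_with_backoff(Url, Msg, [Delay | Rest]) ->
    {ok, Push} = enm:push([{connect, Url}, list]),
    case enm:send(Push, Msg) of
        ok ->
            enm:close(Push),
            ok;
        {error, closed} ->
            %% enm has already closed the socket; back off and retry
            %% with a fresh one.
            timer:sleep(Delay),
            push_with_backoff(Url, Msg, Rest)
    end.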

Thanks a lot for the help. When would you be able to push it to master?

Regards,
Harish.

@vinoski
Contributor

vinoski commented May 19, 2015

Thanks for testing it. I'll need to get the change reviewed before it can go to master. I'll try to get that done as soon as possible.

vinoski added a commit that referenced this issue May 27, 2015
Fix issue #7: avoid spinning on send to unconnected socket
@supershal

I wrote a small program to test this fix. It gives me {error, closed} on the second message even if the tcp server is listening properly. I tested the same code against an older build (before the patch) and it worked fine.
Please let me know if the test is doing anything wrong.

-module(enm_stress).

-export([stress/0]).

stress() ->
  enm:start_link(),
  Url = "tcp://127.0.0.1:9090",
  %Url = "inproc://pipeline",
  spawn(fun() -> acceptor(Url) end),
  timer:sleep(1000),
  spawn(fun() -> sender(Url) end).

acceptor(Url) ->
  {ok, Socket} = enm:pull([{bind, Url}]),
  io:format("accepting on socket ~p~n", [Socket]),
  listen(Socket).

listen(Socket) ->
  receive
      {nnpull, Socket, <<"quit">>} ->
        io:format("quitting server~n"),
        enm:close(Socket);
      {nnpull, Socket, Message} ->
        io:format("~p : ~p~n", [erlang:now(), Message]),
        listen(Socket)
  after 10000 ->
        io:format("timeout. quitting server~n"),
        enm:close(Socket)
  end.

sender(Url) ->
  {ok, Client} = enm:push([{connect, Url}]),
  send(Client, 1000).

send(Socket, 0) ->
  io:format("Quitting client~n"),
  ok = enm:send(Socket, <<"quit">>),
  timer:sleep(2000),
  enm:close(Socket);
send(Socket, Tick) ->
  io:format("sending ~p ~n", [Tick]),
  ok = enm:send(Socket, integer_to_binary(Tick)),
  timer:sleep(5), % milliseconds
  send(Socket, Tick - 1).

@vinoski
Contributor

vinoski commented May 30, 2015

@supershal thanks for reporting this. Sometimes when I run your test code, it runs correctly to completion, but other times it hits the same error you describe. I'll need to look into it some more.

@vinoski
Contributor

vinoski commented May 31, 2015

I have a fix that preserves the behavior of the original change in this thread but also avoids the problem @supershal has seen. The fix involves counting the number of EAGAIN spins we take through the enm_ready_output function, and if we reach 64 (a number I admittedly chose unscientifically) without having sent any data, we close the socket. With this change the new test for @harishdeshmukh's issue still works, and I've been able to run @supershal's test program for hundreds of iterations without failure. I'll commit the change soon. Given the nanomsg API, I don't know of another alternative that works here.

vinoski added a commit that referenced this issue May 31, 2015
The initial fix for issue 7 could result in sockets getting closed
prematurely. Unfortunately the nanomsg API limits how we can handle
this, so this fix uses a counter to determine if we're spinning in a
ready_output loop, closing the socket if it hits EAGAIN 64 times with
no intervening I/O.
@vinoski
Contributor

vinoski commented May 31, 2015

@supershal can you try this fix and let me know if it works for you?

@supershal

Thanks @vinoski for looking into it. I tested the fix using the standalone module above and it works fine. However, when I tried the fix in our application, I still see the socket-closed error even when the server is running. Our application traffic comes in bursts (over 1000 messages at once, with message sizes of 5-8 KB), and that's where the socket closes prematurely. I was unable to reproduce the issue with a standalone test (sending more than 1000 messages at once, without any delay) where the message size is very small. Please allow me a day or two to write a reproducible test and play with the EAGAIN count.

@vinoski
Contributor

vinoski commented Jun 1, 2015

Thanks @supershal. I may have a better idea for fixing this, so in the meantime I'll experiment with that.

@supershal

I did some further testing and found that the issue is with message size, not with sending messages in bursts. A message size over 1 KB breaks the pipeline after a few messages.
Here is the test: https://github.com/supershal/enm/blob/vinoski/fix7-round2/examples/stress.erl
The test just crashes the client when sending fails with {error, closed}.
Output:

> erl -pa ebin
Erlang/OTP 17 [erts-6.2] [source-5c974be] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V6.2  (abort with ^G)
1> c("examples/stress.erl",[{o, "examples"}]),
1> stress:start().
accepting on socket: #Port<0.2536>
accepting on socket: #Port<0.2554>
Quitting client: #Port<0.2551>
Quitting server : #Port<0.2536>
<0.44.0>
2>
=ERROR REPORT==== 1-Jun-2015::13:09:11 ===
Error in process <0.44.0> with exit value: {{badmatch,{error,closed}},[{stress,send,3,[{file,"examples/stress.erl"},{line,54}]}]}

Timeout server : #Port<0.2554>

@supershal

@vinoski, does nanomsg close the socket immediately, or are messages queued up/dropped and EAGAIN returned while the server is unavailable? I was wondering if returning {error, eagain} upon receiving EAGAIN would be the preferred way to handle this use case. What are the conventions? Should the client crash and re-establish a new socket, or should the client drop/queue messages itself when it receives {error, eagain} and let nanomsg keep retrying to reconnect to the server?

@vinoski
Contributor

vinoski commented Jun 5, 2015

@supershal: the main problem with returning {error, eagain} is that the EAGAIN case occurs entirely inside the linked-in driver, i.e., there's no Erlang-level function call involved, so there's nowhere to return such a value. It would also be an unusual approach for an Erlang application.

I'm still experimenting with an alternative fix.

@vinoski
Contributor

vinoski commented Jun 12, 2015

@supershal and @harishdeshmukh please have a look at 5e19468 and give it a try. It adds a send_timeout option you can set on a socket; if it's set and the socket fails to send within the given timeout period, the socket is closed. If it's not set, it will just keep retrying, same as the original behavior. This provides a different way to address issue #7.
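
A rough usage sketch, assuming the option can be passed in the socket creation options as {send_timeout, Millis}; the exact option name and where it may be set should be checked against 5e19468.

%% Hypothetical send_timeout usage; the {send_timeout, 5000} shape and
%% its placement in the creation options are assumptions.
push_with_timeout(Url, Msg) ->
    {ok, Push} = enm:push([{connect, Url}, list, {send_timeout, 5000}]),
    case enm:send(Push, Msg) of
        ok ->
            enm:close(Push),
            ok;
        {error, closed} ->
            %% the send did not complete within 5 seconds, so enm
            %% closed the socket; reconnect or report the error upstream
            {error, closed}
    end.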

@supershal

Thanks @vinoski for the fix. I was able to run my test against it successfully; it no longer closes the socket prematurely. Please let me know if the following assumptions are correct.

  1. The application will have to reconnect to the endpoint (or crash) upon receiving the {error, closed} message.
  2. The application does not have to perform any cleanup when it receives {error, closed}; enm will clean up buffered messages.
  3. If enm reaches the timeout, all messages queued during the timeout window will be lost.

@vinoski
Contributor

vinoski commented Jun 15, 2015

@supershal answering your questions:

  1. Yes, if enm returns {error, closed} that means it has closed the socket, so if the application wants to communicate again with the same remote endpoint, it will need to open a new socket.
  2. Correct, enm will clean up all buffered messages if it returns {error, closed}.
  3. Correct, if enm hits a socket send timeout, it closes the socket and discards all unsent messages.
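
Putting points 1 and 2 together, a minimal caller-side sketch; send_or_reopen/3 is purely illustrative, not part of enm.

%% Illustrative wrapper: when enm reports {error, closed}, the socket
%% is already gone and its buffered messages have been dropped, so
%% open a new push socket and resend the current message once.
send_or_reopen(Push, Url, Msg) ->
    case enm:send(Push, Msg) of
        ok ->
            {ok, Push};
        {error, closed} ->
            {ok, NewPush} = enm:push([{connect, Url}, list]),
            case enm:send(NewPush, Msg) of
                ok    -> {ok, NewPush};
                Error -> Error
            end
    end.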
