[kqueue] hits unreachable code with high completion loads #122

Open
bhansconnect opened this issue Oct 15, 2024 · 7 comments

@bhansconnect
I might well be using the library incorrectly. I am just starting out with it, trying to set it up to function as an HTTP server; right now I just have the basics hardcoded. If I benchmark it with wrk using a high number of connections (512), I end up seeing error(libxev_kqueue): invalid state in submission queue state=backend.kqueue.Completion.State.active, and eventually a crash with the following stack trace:

thread 7376789 panic: reached unreachable code
/opt/homebrew/Cellar/zig/0.13.0/lib/zig/std/debug.zig:412:14: 0x100ff6c63 in assert (platform)
    if (!ok) unreachable; // assertion failure
             ^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/queue.zig:24:19: 0x100ff515f in push (platform)
            assert(v.next == null);
                  ^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:225:38: 0x100ff2763 in submit (platform)
                self.completions.push(c);
                                     ^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:333:24: 0x100ff6dd7 in tick (platform)
        try self.submit();
                       ^
/Users/bren077s/.cache/zig/p/1220612bc023c21d75234882ec9a8c6a1cbd9d642da3dfb899297f14bb5bd7b6cd78/src/backend/kqueue.zig:264:62: 0x100ff8b47 in run (platform)
            .until_done => while (!self.done()) try self.tick(1),
                                                             ^
/Users/bren077s/Projects/roc-coro/platform/src/main.zig:63:22: 0x100ff8fcb in main (platform)
    try root_loop.run(.until_done);
                     ^
/opt/homebrew/Cellar/zig/0.13.0/lib/zig/std/start.zig:524:37: 0x100ff96af in main (platform)
            const result = root.main() catch |err| {
                                    ^
???:?:?: 0x18e714273 in ??? (???)
???:?:?: 0x0 in ??? (???)
run
└─ run platform failure
error: the following command terminated unexpectedly:
/Users/bren077s/Projects/roc-coro/platform/zig-out/bin/platform 
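
For reference, the wrk invocation is roughly the following (the thread count and duration here are my own placeholders; only the 512 connections and the port match what I described above):

wrk -t8 -c512 -d30s http://127.0.0.1:8000/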

I don't think this is the same as #111, but I might be wrong.
Any general advice would be greatly appreciated (even if the advice is that libxev is not ready and I should switch to something else). I'm not trying to build anything production-ready, just working on a prototype of a coroutine-based webserver to use with Roc (a new programming language).

The full repo at the current state is here: https://github.com/bhansconnect/roc-coro-webserver/tree/9b07612a95d1b937b0b8e7ec90a14c1a310f5b2b/platform

Here is a slightly reduced version of the main file that is leading to the errors:

const std = @import("std");
const xev = @import("xev");
const log = std.log.scoped(.platform);
const Allocator = std.mem.Allocator;

pub const std_options: std.Options = .{
    .log_level = .info,
};

var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
var allocator: Allocator = gpa.allocator();

pub fn main() !void {
    var root_loop = try xev.Loop.init(.{});
    defer root_loop.deinit();

    var address = try std.net.Address.parseIp4("127.0.0.1", 8000);
    const server = try xev.TCP.init(address);

    // Bind and listen
    try server.bind(address);
    try server.listen(128);

    // Is this needed? I think it is getting the port.
    // But we specify a specific port...
    const fd = if (xev.backend == .iocp) @as(std.os.windows.ws2_32.SOCKET, @ptrCast(server.fd)) else server.fd;
    var sock_len = address.getOsSockLen();
    try std.posix.getsockname(fd, &address.any, &sock_len);
    log.info("Starting server at: http://{}", .{address});

    // Setup accepting connections
    var c_accept: xev.Completion = undefined;
    server.accept(&root_loop, &c_accept, void, null, (struct {
        fn callback(
            _: ?*void,
            loop: *xev.Loop,
            _: *xev.Completion,
            accept_result: xev.AcceptError!xev.TCP,
        ) xev.CallbackAction {
            const socket = accept_result catch |err| {
                log.err("Failed to accept connection: {}", .{err});
                return .rearm;
            };
            log.debug("Accepting new TCP connection", .{});
            const handler = allocator.create(Handler) catch unreachable;
            socket.read(loop, &handler.completion, .{ .slice = &handler.buffer }, Handler, handler, Handler.read_callback);
            return .rearm;
        }
    }).callback);

    try root_loop.run(.until_done);
}

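// Per-connection state, allocated in the accept callback above. Note that a
// single completion is reused for the read, write, and close operations on a
// connection, and the same buffer backs both the request and the response.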
const Handler = struct {
    const Self = @This();

    completion: xev.Completion,
    buffer: [4096]u8,

    fn read_callback(
        self: ?*Self,
        loop: *xev.Loop,
        _: *xev.Completion,
        socket: xev.TCP,
        rb: xev.ReadBuffer,
        len_result: xev.ReadError!usize,
    ) xev.CallbackAction {
        const len = len_result catch |err| {
            // I'm not sure this is correct, but I think we need to retry on would block.
            // Feels like something that libxev should handle on its own.
            if (err == error.WouldBlock) {
                return .rearm;
            }
            if (err != error.ConnectionResetByPeer and err != error.EOF) {
                log.warn("Failed to read from tcp connection: {}", .{err});
            }
            self.?.close(loop, socket);
            return .disarm;
        };
        // TODO: handle partial reads.
        // I'm a bit surprised that reading 0 bytes doesn't count as would block.
        if (len == 0) {
            return .rearm;
        }
        log.debug("Request: \n{s}\n", .{rb.slice[0..len]});

        // TODO: This is where we should parse the header, make sure it is valid.
        // Check the full length and keep polling if more is to come.
        // Also should check keep-alive.

        const response =
            "HTTP/1.1 200 OK\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: 13\r\n\r\nHello, World!";
        const slice = std.fmt.bufPrint(&self.?.buffer, response, .{}) catch |err| {
            log.warn("Failed to write to io buffer: {}", .{err});
            self.?.close(loop, socket);
            return .disarm;
        };
        socket.write(loop, &self.?.completion, .{ .slice = slice }, Self, self, Self.write_callback);
        return .disarm;
    }

    fn write_callback(
        self: ?*Self,
        loop: *xev.Loop,
        _: *xev.Completion,
        socket: xev.TCP,
        wb: xev.WriteBuffer,
        len_result: xev.WriteError!usize,
    ) xev.CallbackAction {
        const len = len_result catch |err| {
            // I'm not sure this is correct, but I think we need to retry on would block.
            // Feels like something that libxev should handle on its own.
            if (err == error.WouldBlock) {
                return .rearm;
            }
            if (err != error.ConnectionResetByPeer and err != error.EOF) {
                log.warn("Failed to write to tcp connection: {}", .{err});
            }
            self.?.close(loop, socket);
            return .disarm;
        };
        // TODO: handle partial writes.
        // I'm a bit surprised that writing 0 bytes doesn't count as would block.
        if (len == 0) {
            return .rearm;
        }
        log.debug("Response: \n{s}\n", .{wb.slice[0..len]});

        // Switch back to reading. Just assuming keep-alive for now.
        socket.read(loop, &self.?.completion, .{ .slice = &self.?.buffer }, Self, self, Self.read_callback);
        return .disarm;
    }

    fn close(self: *Self, loop: *xev.Loop, socket: xev.TCP) void {
        socket.close(loop, &self.completion, Self, self, close_callback);
    }

    fn close_callback(
        self: ?*Self,
        _: *xev.Loop,
        _: *xev.Completion,
        _: xev.TCP,
        _: xev.ShutdownError!void,
    ) xev.CallbackAction {
        // If the shutdown fails, should this retry?
        allocator.destroy(self.?);
        return .disarm;
    }
};

Thanks for creating libxev, and for any help. Initial perf tests of libxev are looking really good.

@bhansconnect
Author

As an extra note: I did a quick test on a Linux machine. Everything seems to work without a hiccup on Linux.

@mitchellh
Owner

Thanks for the report! I will check this out soon.

bhansconnect changed the title from [kqueue] to [kqueue] hits unreachable code with high completion loads on Oct 17, 2024
@bhansconnect
Author

One other note: I realized that this is probably the same as #105.

@bhansconnect
Author

bhansconnect commented Oct 18, 2024

OK, I'm not sure of the best solution yet, but I think I found the core issue. If loop.run(.no_wait) is used, the kqueue backend will hit this break:

// If we ran through the loop once we break if we don't care.
if (wait == 0) break;

This break can happen when there are still changes left from here:

events[changes] = ev;
events[changes].flags = posix.system.EV_DELETE;
events[changes].udata = 0;
changes += 1;
assert(changes <= events.len);

If there are any changes left, they are never passed to kevent to clear things up. So those extra changes either need to be delayed to the next call to loop.run(), or they need to be cleaned up when breaking out of the loop here.
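
To illustrate the second option, here is a rough, untested sketch of what cleaning up on the early exit could look like. The variable names follow the snippets above, but the kqueue fd field (self.kqueue_fd here) and the exact kevent call are my guesses rather than the actual libxev internals:

// Sketch only: before breaking out early, apply any queued changes
// (e.g. the EV_DELETE entries added above) so they are not dropped.
if (wait == 0) {
    if (changes > 0) {
        // A zero timeout means kevent just applies the changelist and
        // returns immediately; no events are collected here.
        const ts = std.mem.zeroes(posix.timespec);
        _ = try posix.kevent(self.kqueue_fd, events[0..changes], events[0..0], &ts);
        changes = 0;
    }
    break;
}

Alternatively, carrying the leftover changes over to the next submit() call would avoid the extra syscall on this path.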

@bhansconnect
Author

Note: a minimal change that works around the bug (but obviously leads to waiting) is to replace the break quoted above with:

if (wait == 0 and changes == 0) break; 

@steeve

steeve commented Nov 19, 2024

Can you try #115?

@bhansconnect
Author

No, #115 is not enough to fix the issue above. I have #115 along with a few other kqueue changes here, and I still haven't fully fixed the issues I'm hitting with kqueue under high concurrency: main...bhansconnect:libxev:main#diff-caa3f8691afca2b16eaf30614ff2616aa0d6cb38bbe2750f39e7669e4902ff81
