Skip to content

PgListener interface improvements #1102

@Diggsey

Description

@Diggsey

Whilst you can use try_recv to be notified of the connection dropping (and possible lost messages) you get the notification before the connection has been re-established, making it fairly useless. This is because more messages could be dropped between receiving None and the next time you call recv.

Here is some pseudocode for how I think a PgListener could be used:

let mut listener = PgListener::new(pool);
let mut maybe_work_item = None;

listener.listen("work_channel").await;

loop {
    if let Some(work_item) = maybe_work_item {
        // Busy state
        spawn_with_limit(work_item).await;

        maybe_work_item  = look_for_work().await;
        if maybe_work_item.is_none() {
            listener.listen("work_channel").await;
        }
    } else {
        // Idle state
        maybe_work_item  = look_for_work().await;
        if maybe_work_item.is_none() {
            // Wait to be notified of new work
            listener.recv().await;
        } else {
            listener.unlisten("work_channel");
        }
    }
}

Here, recv would return an enum with the following variants:

enum ListenerEvent {
    ConnectionLost,
    ConnectionEstablished,
    Notified(Notification),
}

ConnectionEstablished would only be sent after the LISTEN request, so that you can guarantee at-least-once delivery by waiting for either a Notified event or a ConnectionEstablished event.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions