Skip to content

UCT/EFA/SRD: Add pending and control request and response #10604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 16, 2025

Conversation

tvegas1
Copy link
Contributor

@tvegas1 tvegas1 commented Apr 4, 2025

What?

Add pending management and control message exchange to remotely add AH, before posting RMA operations.

How?

Tracking remote AH state on iface or MD could cause issues as there could be multiple MD (protection domain) opened on the same remote HCA, and while locally iface or MD could remain, the remote entities could have been released and reopened with same QP id, making it difficult to identify if sending another control request is needed.

Steps

All posts are anyways heavily reordered so:

  1. Only one pending queue for an endpoint
  2. We put RMA on pending if confirmation has not been received, as they cannot be posted
  3. We put AM on pending if our pending is not empty (we could probably simply post but this could cause prio issues wrt RMA)
  4. We allow the sending of AM if pending is empty, even when still expecting confirmation of remote AH add
  5. We can only start dequeuing pending after receiving confirmation of remote AH add, since we do not know which type of request is in the queue

@dmitry-monakhov
Copy link

Hi, how do you test it? I've tried ucx_iperf on p5e.48xlarge instance

env UCX_TLS=self,srd UCX_NET_DEVICES='rdmap113s0:1' /opt/ucx/bin/ucx_perftest -t stream_bw localhost
[1743838571.902753] [dmonakhov-dev:2912039:0]        perftest.c:800  UCX  WARN  CPU affinity is not set (bound to 192 cpus). Performance may be impacted.
+--------------+--------------+------------------------------+---------------------+-----------------------+
|              |              |       overhead (usec)        |   bandwidth (MB/s)  |  message rate (msg/s) |
+--------------+--------------+----------+---------+---------+----------+----------+-----------+-----------+
|    Stage     | # iterations | 50.0%ile | average | overall |  average |  overall |  average  |  overall  |
+--------------+--------------+----------+---------+---------+----------+----------+-----------+-----------+
Final:               1000000      0.610     0.771     0.771        9.90       9.90     1297377     1297377
[1743838582.024622] [dmonakhov-dev:2912039:0]           flush.c:58   UCX  ERROR req 0x627bc1fc5280: error during flush: Unsupported operation
[1743838582.024646] [dmonakhov-dev:2912039:0]         libperf.c:951  UCX  WARN  failed to close ep 0x79cae0f3e000 on thread 0: Unsupported operation

I've tried naive approach, simply ignore unsupported flush, it seems to work.

--- a/src/ucp/rma/flush.c
+++ b/src/ucp/rma/flush.c
@@ -127,7 +127,10 @@ static void ucp_ep_flush_progress(ucp_request_t *req)
                       ucs_status_string(status));
         if (status == UCS_OK) {
             ucp_ep_flush_request_update_uct_comp(req, -1, UCS_BIT(lane));
-        } else if (status == UCS_INPROGRESS) {
+        } else if (status == UCS_ERR_UNSUPPORTED) {
+         ucs_error("Ignore unsupported flush \n");
+         ucp_ep_flush_request_update_uct_comp(req, -1, UCS_BIT(lane));
+       } else if (status == UCS_INPROGRESS) {
             ucp_ep_flush_request_update_uct_comp(req, 0, UCS_BIT(lane));
         } else if (status == UCS_ERR_NO_RESOURCE) {
             if (req->send.lane != UCP_NULL_LANE) {

{
for (auto i = 0; i < 3; i++) {
m_req[i].func = [](uct_pending_req_t*) {
m_count++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if m_count is accessed only from this lambda, then you can make it local static variable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


ASSERT_UCS_OK(uct_ep_pending_add(m_e1->ep(0), &m_req[0], 0));
ASSERT_UCS_OK(uct_ep_pending_add(m_e1->ep(0), &m_req[1], 0));
uct_ep_pending_purge(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is repeated 3 times, maybe make a helper function that purges and ASSERT_EQ

void pending_purge(ep, count)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed (was being added in next pr)

Comment on lines 33 to 35
typedef enum uct_srd_ctl_id {
UCT_SRD_CTL_ID_REQ = UCT_AM_ID_MAX,
UCT_SRD_CTL_ID_RESP = UCT_SRD_CTL_ID_REQ + 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cosmetic:

Suggested change
typedef enum uct_srd_ctl_id {
UCT_SRD_CTL_ID_REQ = UCT_AM_ID_MAX,
UCT_SRD_CTL_ID_RESP = UCT_SRD_CTL_ID_REQ + 1,
typedef enum {
UCT_SRD_CTL_ID_REQ = UCT_AM_ID_MAX,
UCT_SRD_CTL_ID_RESP,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if ((hdr->id == UCT_SRD_CTL_ID_REQ) || (hdr->id == UCT_SRD_CTL_ID_RESP)) {
ctl = (uct_srd_ctl_hdr_t*)hdr;
snprintf(p, endp - p, " %s qpn %d ep_uuid %" PRIx64 " ",
(ctl->id == UCT_SRD_CTL_ID_REQ) ? "CTL_REQ" :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it makes sense to implement function "_to_string" for this enum?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

(ctl->id == UCT_SRD_CTL_ID_RESP) ? "CTL_RESP" :
"UNKNOWN",
uct_ib_unpack_uint24(ctl->qpn), ctl->ep_uuid);
p += strlen(p);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe increment by value returned from snprintf?
We can also assert on that value (>0 and <max)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, kept strlen() as in non-debug (no assert) we need to add effective size

char *p, *endp;
int am_id;

p = buffer;
endp = buffer + max;

if ((hdr->id == UCT_SRD_CTL_ID_REQ) || (hdr->id == UCT_SRD_CTL_ID_RESP)) {
ctl = (uct_srd_ctl_hdr_t*)hdr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would propose to split this block into a separate function e.g. _dump_header, because it's really independent functionality.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, but function is already small

goto out;
}

ucs_assertv(ctl->id == UCT_SRD_CTL_ID_REQ, "iface=%p id=%u", iface,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also use "_to_string" here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

ucs_assertv(ctl->id == UCT_SRD_CTL_ID_REQ, "iface=%p id=%u", iface,
ctl->id);
ucs_assertv((sizeof(*ctl) + iface->super.addr_size) == length,
"req size mismatched expected=%zu received=%u",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"req size mismatched expected=%zu received=%u",
"req size mismatch expected=%zu received=%u",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

ep->pending++;
uct_pending_req_arb_group_push(&ep->pending_group, req);
ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->pending_group);
ucs_trace_data("ep=%p: added pending req=%p psn=%u", ep, req, ep->psn);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

print iface like in other places?

Suggested change
ucs_trace_data("ep=%p: added pending req=%p psn=%u", ep, req, ep->psn);
ucs_trace_data("iface=%p ep=%p: added pending req=%p psn=%u", ep, req, ep->psn);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

uct_srd_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_srd_iface_t);
uct_purge_cb_args_t args = {cb, cb_arg};

ucs_trace_func("ep=%p", ep);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it's missing iface and some description, e.g. iface=%p ep=%p: purge

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's trace function, added some arguments

ucs_list_link_t list; /* Entry in iface tx pending control list */
struct ibv_ah *ah;
int dest_qpn;
uct_srd_ctl_hdr_t hdr[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: imo, you could also remove it, like with hdr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed all hdr[]

uint64_t ep_uuid; /* Random EP identifier */
uint32_t dest_qpn; /* Remote QP */
uint32_t inflight; /* Entries outstanding list */
int32_t pending; /* Count requests in pending queue */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it be negative?
seems like we do not really need this counter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -60,6 +63,7 @@ typedef struct uct_srd_iface {

struct {
int32_t available;
int in_pending; /* true if invoked from arbiter dispatch */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: can be for assert only

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t,
priv);

/* Purge all of them, as requests were only added with pending_add */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this comment mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed (some transports add their own pending_req_t internally, srd currently does not)

@@ -183,6 +284,10 @@ static UCS_F_ALWAYS_INLINE ucs_status_t uct_srd_ep_am_short_prepare(
uct_srd_am_short_hdr_t *am = &iface->tx.am_inl_hdr;
uct_srd_send_op_t *send_op;

if (uct_srd_ep_skip_pending(ep, iface)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do not we check for resources instead?
Specifically for uct_srd_iface_can_tx(iface) && ep->ah_added

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment on lines 102 to 95
ctl_op = ucs_malloc(sizeof(*ctl_op) + sizeof(*ctl_op->hdr) +
iface->super.addr_size,
"uct_srd_ctl_op_t");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alignment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indented back line with iface->super.addr_size

@@ -535,8 +779,15 @@ uct_srd_iface_poll_rx(uct_srd_iface_t *iface)
UCT_IB_IFACE_VERBS_FOREACH_RXWQE(&iface->super, i, packet, wc, num_wcs) {
uct_ib_log_recv_completion(&iface->super, &wc[i], packet,
wc[i].byte_len, uct_srd_dump_packet);
uct_srd_iface_process_rx(iface, (uct_srd_hdr_t*)packet, wc[i].byte_len,
(uct_srd_recv_desc_t*)wc[i].wr_id);
if ((*(uint8_t*)packet < UCT_AM_ID_MAX)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd add likely here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment on lines 84 to 89
for (auto count = 10; count > 0; count--) {
short_progress_loop();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we progress until ah_added flag on ep is set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

uct_srd_iface_process_rx(iface, (uct_srd_hdr_t*)packet, wc[i].byte_len,
(uct_srd_recv_desc_t*)wc[i].wr_id);
if ((*(uint8_t*)packet < UCT_AM_ID_MAX)) {
uct_srd_iface_process_rx(iface, (uct_srd_hdr_t*)packet,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this cast and one below are redundant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

ucs_mpool_put(desc);
return;

err:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it's better to get rid of this goto and just print error in 2 places, but up to you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trying something else

khiter_t iter;
ucs_status_t status;

iter = kh_get(uct_srd_ep_hash, &iface->ep_hash, ep->ep_uuid);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if we put unconditionally and then check the return value:

  • already exists/ error - return error, without goto
  • then handle sucsess case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment on lines 453 to 420
hdr = (uct_srd_hdr_t *)(desc + 1);
length = pack_cb(hdr + 1, arg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: align by =

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see it already aligned

Comment on lines 112 to 113
ctl_op->ah = ah;
ctl_op->dest_qpn = dest_qpn;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls remove extra whitespaces now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see it already aligned


ucs_trace_data("iface=%p ep=%p progressing pending request %p", iface, ep,
req);
iface->tx.in_pending = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems it can be done when asserts enabled only

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment on lines 242 to 247
if (!uct_srd_ep_can_tx(ep, iface)) {
UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
return UCS_ERR_NO_RESOURCE;
}

uct_srd_iface_check_pending(iface, &ep->pending_group);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we combine it to one macro, as it is used in other places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure as there is not a single macro used everywhere. maybe we could add to uct_srd_ep_can_tx()

brminich
brminich previously approved these changes Apr 15, 2025
@yosefe yosefe enabled auto-merge April 15, 2025 17:57
@yosefe yosefe merged commit 8c53ab5 into openucx:master Apr 16, 2025
151 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants