Skip to content

Commit f0384ad

Browse files
authored
TL/UCP: Split single and multithreaded send/receive (#1109)
* TL/UCP: completion callback st/mt * TL/UCP: ucc_tl_ucp_send_nb callback * TL/UCP: recv implementation * TL/UCP: fix conflict * TL/UCP: disable clang tidy error * TL/UCP: non zero versions * TL/UCP: rename and format
1 parent d3eca2e commit f0384ad

File tree

4 files changed

+248
-64
lines changed

4 files changed

+248
-64
lines changed

src/components/tl/ucp/tl_ucp.h

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,59 @@ typedef ucc_status_t (*ucc_tl_ucp_copy_test_fn_t)(ucc_tl_ucp_context_t *ctx,
143143
ucc_tl_ucp_copy_task_t *copy_task);
144144
typedef ucc_status_t (*ucc_tl_ucp_copy_finalize_fn_t)(ucc_tl_ucp_copy_task_t *copy_task);
145145

146+
typedef struct ucc_tl_ucp_team {
147+
ucc_tl_team_t super;
148+
ucc_status_t status;
149+
uint32_t seq_num;
150+
ucc_tl_ucp_task_t *preconnect_task;
151+
void * va_base[MAX_NR_SEGMENTS];
152+
size_t base_length[MAX_NR_SEGMENTS];
153+
ucc_tl_ucp_worker_t * worker;
154+
ucc_tl_ucp_team_config_t cfg;
155+
const char * tuning_str;
156+
ucc_topo_t *topo;
157+
ucc_ep_map_t ctx_map;
158+
ucc_rank_t opt_radix; /* generic opt radix */
159+
ucc_rank_t opt_radix_host; /* host specific opt radix */
160+
} ucc_tl_ucp_team_t;
161+
UCC_CLASS_DECLARE(ucc_tl_ucp_team_t, ucc_base_context_t *,
162+
const ucc_base_team_params_t *);
163+
164+
typedef ucc_status_t (*ucc_tl_ucp_send_nb_fn_t)(void *buffer, size_t msglen,
165+
ucc_memory_type_t mtype,
166+
ucc_rank_t dest_group_rank,
167+
ucc_tl_ucp_team_t *team,
168+
ucc_tl_ucp_task_t *task);
169+
170+
typedef ucc_status_t (*ucc_tl_ucp_recv_nb_fn_t)(void *buffer, size_t msglen,
171+
ucc_memory_type_t mtype,
172+
ucc_rank_t dest_group_rank,
173+
ucc_tl_ucp_team_t *team,
174+
ucc_tl_ucp_task_t *task);
175+
176+
typedef ucc_status_t (*ucc_tl_ucp_recv_nz_fn_t)(void *buffer, size_t msglen,
177+
ucc_memory_type_t mtype,
178+
ucc_rank_t dest_group_rank,
179+
ucc_tl_ucp_team_t *team,
180+
ucc_tl_ucp_task_t *task);
181+
182+
typedef ucc_status_t (*ucc_tl_ucp_send_nz_fn_t)(void *buffer, size_t msglen,
183+
ucc_memory_type_t mtype,
184+
ucc_rank_t dest_group_rank,
185+
ucc_tl_ucp_team_t *team,
186+
ucc_tl_ucp_task_t *task);
187+
146188
typedef struct ucc_tl_ucp_context {
147189
ucc_tl_context_t super;
148190
ucc_tl_ucp_context_config_t cfg;
149191
ucc_tl_ucp_worker_t worker;
150192
ucc_tl_ucp_worker_t service_worker;
193+
struct {
194+
ucc_tl_ucp_send_nb_fn_t ucc_tl_ucp_send_nb;
195+
ucc_tl_ucp_recv_nb_fn_t ucc_tl_ucp_recv_nb;
196+
ucc_tl_ucp_send_nz_fn_t ucc_tl_ucp_send_nz;
197+
ucc_tl_ucp_recv_nz_fn_t ucc_tl_ucp_recv_nz;
198+
} sendrecv_cbs;
151199
uint32_t service_worker_throttling_count;
152200
ucc_mpool_t req_mp;
153201
ucc_tl_ucp_remote_info_t * remote_info;
@@ -162,26 +210,8 @@ typedef struct ucc_tl_ucp_context {
162210
} copy;
163211
} ucc_tl_ucp_context_t;
164212
UCC_CLASS_DECLARE(ucc_tl_ucp_context_t, const ucc_base_context_params_t *,
165-
const ucc_base_config_t *);
166-
167-
typedef struct ucc_tl_ucp_team {
168-
ucc_tl_team_t super;
169-
ucc_status_t status;
170-
uint32_t seq_num;
171-
ucc_tl_ucp_task_t *preconnect_task;
172-
void * va_base[MAX_NR_SEGMENTS];
173-
size_t base_length[MAX_NR_SEGMENTS];
174-
ucc_tl_ucp_worker_t * worker;
175-
ucc_tl_ucp_team_config_t cfg;
176-
const char * tuning_str;
177-
ucc_topo_t *topo;
178-
ucc_ep_map_t ctx_map;
179-
ucc_rank_t opt_radix; /* generic opt radix */
180-
ucc_rank_t opt_radix_host; /* host specific opt radix */
181-
} ucc_tl_ucp_team_t;
182-
UCC_CLASS_DECLARE(ucc_tl_ucp_team_t, ucc_base_context_t *,
183-
const ucc_base_team_params_t *);
184-
213+
const ucc_base_config_t *);
214+
185215
extern ucc_config_field_t ucc_tl_ucp_lib_config_table[];
186216

187217
#define UCC_TL_UCP_SUPPORTED_COLLS \

src/components/tl/ucp/tl_ucp_coll.c

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,21 @@ void ucc_tl_ucp_team_default_score_str_free(
9797
}
9898
}
9999

100-
void ucc_tl_ucp_send_completion_cb(void *request, ucs_status_t status,
101-
void *user_data)
100+
void ucc_tl_ucp_send_completion_cb_st(void *request, ucs_status_t status,
101+
void *user_data)
102+
{
103+
ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t *)user_data;
104+
if (ucc_unlikely(UCS_OK != status)) {
105+
tl_error(UCC_TASK_LIB(task), "failure in send completion %s",
106+
ucs_status_string(status));
107+
task->super.status = ucs_status_to_ucc_status(status);
108+
}
109+
++task->tagged.send_completed;
110+
ucp_request_free(request);
111+
}
112+
113+
void ucc_tl_ucp_send_completion_cb_mt(void *request, ucs_status_t status,
114+
void *user_data)
102115
{
103116
ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t *)user_data;
104117
if (ucc_unlikely(UCS_OK != status)) {
@@ -136,9 +149,9 @@ void ucc_tl_ucp_get_completion_cb(void *request, ucs_status_t status,
136149
ucp_request_free(request);
137150
}
138151

139-
void ucc_tl_ucp_recv_completion_cb(void *request, ucs_status_t status,
140-
const ucp_tag_recv_info_t *info, /* NOLINT */
141-
void *user_data)
152+
void ucc_tl_ucp_recv_completion_cb_mt(void *request, ucs_status_t status,
153+
const ucp_tag_recv_info_t *info, /* NOLINT */
154+
void *user_data)
142155
{
143156
ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t *)user_data;
144157
if (ucc_unlikely(UCS_OK != status)) {
@@ -150,6 +163,20 @@ void ucc_tl_ucp_recv_completion_cb(void *request, ucs_status_t status,
150163
ucp_request_free(request);
151164
}
152165

166+
void ucc_tl_ucp_recv_completion_cb_st(void *request, ucs_status_t status,
167+
const ucp_tag_recv_info_t *info, /* NOLINT */
168+
void *user_data)
169+
{
170+
ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t *)user_data;
171+
if (ucc_unlikely(UCS_OK != status)) {
172+
tl_error(UCC_TASK_LIB(task), "failure in recv completion %s",
173+
ucs_status_string(status));
174+
task->super.status = ucs_status_to_ucc_status(status);
175+
}
176+
++task->tagged.recv_completed;
177+
ucp_request_free(request);
178+
}
179+
153180
ucc_status_t ucc_tl_ucp_coll_finalize(ucc_coll_task_t *coll_task)
154181
{
155182
ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);

src/components/tl/ucp/tl_ucp_context.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include "tl_ucp_copy.h"
1616
#include <limits.h>
1717

18+
#include "tl_ucp_sendrecv.h"
19+
1820
#define UCP_CHECK(function, msg, go, ctx) \
1921
status = function; \
2022
if (UCS_OK != status) { \
@@ -215,9 +217,17 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t,
215217
case UCC_THREAD_SINGLE:
216218
case UCC_THREAD_FUNNELED:
217219
worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
220+
self->sendrecv_cbs.ucc_tl_ucp_send_nb = ucc_tl_ucp_send_nb_st;
221+
self->sendrecv_cbs.ucc_tl_ucp_recv_nb = ucc_tl_ucp_recv_nb_st;
222+
self->sendrecv_cbs.ucc_tl_ucp_send_nz = ucc_tl_ucp_send_nz_st;
223+
self->sendrecv_cbs.ucc_tl_ucp_recv_nz = ucc_tl_ucp_recv_nz_st;
218224
break;
219225
case UCC_THREAD_MULTIPLE:
220226
worker_params.thread_mode = UCS_THREAD_MODE_MULTI;
227+
self->sendrecv_cbs.ucc_tl_ucp_send_nb = ucc_tl_ucp_send_nb_mt;
228+
self->sendrecv_cbs.ucc_tl_ucp_recv_nb = ucc_tl_ucp_recv_nb_mt;
229+
self->sendrecv_cbs.ucc_tl_ucp_send_nz = ucc_tl_ucp_send_nz_mt;
230+
self->sendrecv_cbs.ucc_tl_ucp_recv_nz = ucc_tl_ucp_recv_nz_mt;
221231
break;
222232
default:
223233
/* unreachable */
@@ -294,7 +304,6 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t,
294304
ucc_free(prefix);
295305
prefix = NULL;
296306

297-
298307
switch (self->cfg.local_copy_type) {
299308
case UCC_TL_UCP_LOCAL_COPY_TYPE_MC:
300309
self->copy.post = ucc_tl_ucp_mc_copy_post;

0 commit comments

Comments
 (0)