
Merge pull request #11736 from bosilca/topic/fix_multi_spawn
Retrieve the modex of all jobs during accept/connect
bosilca authored Aug 17, 2023
2 parents 8514e71 + ba0bce4 commit 8e656d9
Showing 1 changed file with 69 additions and 55 deletions.
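Before the diff itself, a minimal sketch of the control flow this change introduces: instead of querying PMIX_LOCAL_PEERS once, for the jobid of the first proc on the pending list, the list is now drained one jobid at a time, with one modex query per job. This sketch is illustrative only and not OMPI code: a plain singly-linked list stands in for opal_list_t, and fetch_local_peers() is a hypothetical stub for the OPAL_MODEX_RECV_VALUE_OPTIONAL(PMIX_LOCAL_PEERS) query.

    /* Illustrative sketch of the per-jobid draining loop (names are hypothetical). */
    #include <stdio.h>
    #include <stdint.h>

    typedef struct proc_item {
        uint32_t jobid;
        uint32_t vpid;
        struct proc_item *next;
    } proc_item_t;

    /* Stub: in the real code this is the per-job PMIX_LOCAL_PEERS modex query. */
    static void fetch_local_peers(uint32_t jobid) {
        printf("query local peers for jobid %u\n", (unsigned)jobid);
    }

    int main(void) {
        /* Procs from two different jobs, interleaved, as after a multi-spawn. */
        proc_item_t d = {7, 1, NULL}, c = {5, 1, &d}, b = {7, 0, &c}, a = {5, 0, &b};
        proc_item_t *ilist = &a;

        /* Drain the list one jobid at a time: take the first element's jobid,
         * query that job's peer info once, then move every proc of that jobid
         * out of the list.  Repeat until the list is empty. */
        while (ilist != NULL) {
            uint32_t jobid = ilist->jobid;
            fetch_local_peers(jobid);

            proc_item_t **cur = &ilist;
            while (*cur != NULL) {
                if ((*cur)->jobid == jobid) {
                    proc_item_t *taken = *cur;
                    *cur = taken->next;   /* unlink, like opal_list_remove_item() */
                    printf("  add proc %u.%u\n", (unsigned)taken->jobid, (unsigned)taken->vpid);
                } else {
                    cur = &(*cur)->next;  /* different job: handled in a later pass */
                }
            }
        }
        return 0;
    }

The committed code does the same grouping with a do/while over the remaining jobids and OPAL_LIST_FOREACH_SAFE plus opal_list_remove_item(), as the diff below shows.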
124 changes: 69 additions & 55 deletions ompi/dpm/dpm.c
@@ -259,6 +259,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
      * fail. */
     if (0 >= rportlen) {
         rc = rportlen;
+        /* no need to free here, the root has already done it and everyone else has not yet allocated the rport array */
         goto exit;
     }

@@ -406,72 +407,85 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
         OPAL_LIST_DESTRUCT(&rlist);
         goto exit;
     }
-    if (0 < opal_list_get_size(&ilist)) {
-        uint32_t *peer_ranks = NULL;
+    if (!opal_list_is_empty(&ilist)) {
         int prn, nprn = 0;
         char *val;
-        uint16_t u16;
         opal_process_name_t wildcard_rank;
+        i = 0; /* start from the begining */

         /* convert the list of new procs to a proc_t array */
         new_proc_list = (ompi_proc_t**)calloc(opal_list_get_size(&ilist),
                                               sizeof(ompi_proc_t *));
-        /* get the list of local peers for the new procs */
-        cd = (ompi_dpm_proct_caddy_t*)opal_list_get_first(&ilist);
-        proc = cd->p;
-        wildcard_rank.jobid = proc->super.proc_name.jobid;
-        wildcard_rank.vpid = OMPI_NAME_WILDCARD->vpid;
-        /* retrieve the local peers */
-        OPAL_MODEX_RECV_VALUE_OPTIONAL(rc, PMIX_LOCAL_PEERS,
-                                       &wildcard_rank, &val, PMIX_STRING);
-        if (OPAL_SUCCESS == rc && NULL != val) {
-            char **peers = opal_argv_split(val, ',');
-            free(val);
-            nprn = opal_argv_count(peers);
-            peer_ranks = (uint32_t*)calloc(nprn, sizeof(uint32_t));
-            for (prn = 0; NULL != peers[prn]; prn++) {
-                peer_ranks[prn] = strtoul(peers[prn], NULL, 10);
-            }
-            opal_argv_free(peers);
-        }

-        i = 0;
-        OPAL_LIST_FOREACH(cd, &ilist, ompi_dpm_proct_caddy_t) {
+        /* Extract the modex info for the first proc on the ilist, and then
+         * remove all processors in the same jobid from the list by getting
+         * their connection information and moving them into the proc array.
+         */
+        do {
+            uint32_t *local_ranks_in_jobid = NULL;
+            ompi_dpm_proct_caddy_t* next = NULL;
+            cd = (ompi_dpm_proct_caddy_t*)opal_list_get_first(&ilist);
             proc = cd->p;
-            new_proc_list[i] = proc ;
-            /* ompi_proc_complete_init_single() initializes and optionally retrieves
-             * OPAL_PMIX_LOCALITY and OPAL_PMIX_HOSTNAME. since we can live without
-             * them, we are just fine */
-            ompi_proc_complete_init_single(proc);
-            /* if this proc is local, then get its locality */
-            if (NULL != peer_ranks) {
-                for (prn=0; prn < nprn; prn++) {
-                    if (peer_ranks[prn] == proc->super.proc_name.vpid) {
-                        /* get their locality string */
-                        val = NULL;
-                        OPAL_MODEX_RECV_VALUE_IMMEDIATE(rc, PMIX_LOCALITY_STRING,
-                                                        &proc->super.proc_name, &val, PMIX_STRING);
-                        if (OPAL_SUCCESS == rc && NULL != ompi_process_info.locality) {
-                            u16 = opal_hwloc_compute_relative_locality(ompi_process_info.locality, val);
-                            free(val);
-                        } else {
-                            /* all we can say is that it shares our node */
-                            u16 = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
+            wildcard_rank.jobid = proc->super.proc_name.jobid;
+            wildcard_rank.vpid = OMPI_NAME_WILDCARD->vpid;
+            /* retrieve the local peers for the specified jobid */
+            OPAL_MODEX_RECV_VALUE_OPTIONAL(rc, PMIX_LOCAL_PEERS,
+                                           &wildcard_rank, &val, PMIX_STRING);
+            if (OPAL_SUCCESS == rc && NULL != val) {
+                char **peers = opal_argv_split(val, ',');
+                free(val);
+                nprn = opal_argv_count(peers);
+                local_ranks_in_jobid = (uint32_t*)calloc(nprn, sizeof(uint32_t));
+                for (prn = 0; NULL != peers[prn]; prn++) {
+                    local_ranks_in_jobid[prn] = strtoul(peers[prn], NULL, 10);
+                }
+                opal_argv_free(peers);
+            }
+
+            OPAL_LIST_FOREACH_SAFE(cd, next, &ilist, ompi_dpm_proct_caddy_t) {
+                proc = cd->p;
+                if( proc->super.proc_name.jobid != wildcard_rank.jobid )
+                    continue; /* not a proc from this jobid */
+
+                new_proc_list[i] = proc;
+                opal_list_remove_item(&ilist, (opal_list_item_t*)cd); // TODO: do we need to release cd ?
+                OBJ_RELEASE(cd);
+                /* ompi_proc_complete_init_single() initializes and optionally retrieves
+                 * OPAL_PMIX_LOCALITY and OPAL_PMIX_HOSTNAME. since we can live without
+                 * them, we are just fine */
+                ompi_proc_complete_init_single(proc);
+                /* if this proc is local, then get its locality */
+                if (NULL != local_ranks_in_jobid) {
+                    uint16_t u16;
+                    for (prn=0; prn < nprn; prn++) {
+                        if (local_ranks_in_jobid[prn] == proc->super.proc_name.vpid) {
+                            /* get their locality string */
+                            val = NULL;
+                            OPAL_MODEX_RECV_VALUE_IMMEDIATE(rc, PMIX_LOCALITY_STRING,
+                                                            &proc->super.proc_name, &val, PMIX_STRING);
+                            if (OPAL_SUCCESS == rc && NULL != ompi_process_info.locality) {
+                                u16 = opal_hwloc_compute_relative_locality(ompi_process_info.locality, val);
+                                free(val);
+                            } else {
+                                /* all we can say is that it shares our node */
+                                u16 = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
                             }
-                        proc->super.proc_flags = u16;
-                        /* save the locality for later */
-                        OPAL_PMIX_CONVERT_NAME(&pxproc, &proc->super.proc_name);
-                        pval.type = PMIX_UINT16;
-                        pval.data.uint16 = proc->super.proc_flags;
-                        PMIx_Store_internal(&pxproc, PMIX_LOCALITY, &pval);
-                        break;
-                    }
+                            proc->super.proc_flags = u16;
+                            /* save the locality for later */
+                            OPAL_PMIX_CONVERT_NAME(&pxproc, &proc->super.proc_name);
+                            pval.type = PMIX_UINT16;
+                            pval.data.uint16 = proc->super.proc_flags;
+                            PMIx_Store_internal(&pxproc, PMIX_LOCALITY, &pval);
+                            break;
+                        }
                     }
                 }
-            ++i;
                 }
+                ++i;
+            }
-        if (NULL != peer_ranks) {
-            free(peer_ranks);
-        }
+            if (NULL != local_ranks_in_jobid) {
+                free(local_ranks_in_jobid);
+            }
+        } while (!opal_list_is_empty(&ilist));

         /* call add_procs on the new ones */
         rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist)));
         free(new_proc_list);
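A side note on the peer-list handling this patch keeps: PMIX_LOCAL_PEERS arrives as a comma-separated string of vpids, which the code splits and converts with strtoul() before matching each new proc against it. Below is a rough standalone sketch of that step, not the OMPI implementation; strtok_r() stands in for opal_argv_split()/opal_argv_count(), and the input string is made up.

    /* Illustrative sketch: parse a PMIX_LOCAL_PEERS-style string into ranks. */
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <stdint.h>

    int main(void) {
        char val[] = "0,3,4,7";   /* example of what the modex query might return */
        uint32_t ranks[16];
        int nprn = 0;

        char *save = NULL;
        for (char *tok = strtok_r(val, ",", &save);
             tok != NULL && nprn < 16;
             tok = strtok_r(NULL, ",", &save)) {
            ranks[nprn++] = (uint32_t)strtoul(tok, NULL, 10);
        }

        /* A spawned proc counts as local only if its vpid appears in this
         * per-job list; only then is its PMIX_LOCALITY_STRING fetched. */
        for (int prn = 0; prn < nprn; prn++) {
            printf("local peer vpid %u\n", (unsigned)ranks[prn]);
        }
        return 0;
    }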
