ARGOBOTS  1.1
stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 ABTU_ret_err static int xstream_create(ABTI_global *p_global,
9  ABTI_sched *p_sched,
10  ABTI_xstream_type xstream_type, int rank,
11  ABT_bool start,
12  ABTI_xstream **pp_xstream);
13 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
14  ABTI_xstream *p_xstream);
16  ABTI_xstream *p_newxstream, int rank);
18  ABTI_xstream *p_xstream, int rank);
19 static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream);
20 static inline void xstream_schedule_ythread(ABTI_global *p_global,
21  ABTI_xstream **pp_local_xstream,
22  ABTI_ythread *p_ythread);
23 static inline void xstream_schedule_task(ABTI_global *p_global,
24  ABTI_xstream *p_local_xstream,
25  ABTI_thread *p_task);
26 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
27  ABTI_sched *p_sched);
28 ABTU_ret_err static int
30  ABTI_xstream **pp_local_xstream,
31  ABTI_xstream *p_xstream, ABTI_sched *p_sched);
32 static void *xstream_launch_root_ythread(void *p_xstream);
33 #ifndef ABT_CONFIG_DISABLE_MIGRATION
35  ABTI_local *p_local,
36  ABTI_thread *p_thread);
37 #endif
38 
/*
 * Create a secondary execution stream that uses the scheduler "sched" as its
 * main scheduler and return the new handle through "newxstream".
 *
 * - Unless ABT_CONFIG_ENABLE_VER_20_API is defined, "*newxstream" is set to
 *   ABT_XSTREAM_NULL before anything else, so it holds that value on every
 *   error path.
 * - If "sched" does not resolve to a scheduler object, a default basic
 *   scheduler with no pools is created here.
 * - xstream_create() is called with type ABTI_XSTREAM_TYPE_SECONDARY, rank -1
 *   and start == ABT_TRUE (presumably -1 asks for an automatically chosen
 *   rank -- TODO confirm against xstream_set_new_rank()).
 * - If xstream_create() fails, a scheduler created by this function is freed;
 *   a scheduler supplied by the caller is left untouched.
 *
 * Returns ABT_SUCCESS on success.  Error codes are routed through
 * ABTI_CHECK_ERROR() / ABTI_HANDLE_ERROR().
 *
 * NOTE(review): listing lines 106-107 (the 1.x-only handling of a
 * caller-supplied scheduler inside the "else" branch) are absent from this
 * extract.
 */
87 int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
88 {
89 #ifndef ABT_CONFIG_ENABLE_VER_20_API
90  /* Argobots 1.x sets newxstream to NULL on error. */
91  *newxstream = ABT_XSTREAM_NULL;
92 #endif
93  int abt_errno;
94  ABTI_xstream *p_newxstream;
95 
96  ABTI_global *p_global;
97  ABTI_SETUP_GLOBAL(&p_global);
98 
99  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
100  if (!p_sched) {
 /* No scheduler object behind the handle: fall back to a default one. */
101  abt_errno =
102  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
103  ABTI_CHECK_ERROR(abt_errno);
104  } else {
105 #ifndef ABT_CONFIG_ENABLE_VER_20_API
108 #endif
109  }
110 
111  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
112  -1, ABT_TRUE, &p_newxstream);
113  if (abt_errno != ABT_SUCCESS) {
 /* Re-resolving the handle tells us whether p_sched was created above;
  * only that internally created scheduler is freed here. */
114  if (!ABTI_sched_get_ptr(sched)) {
115  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
116  ABT_FALSE);
117  }
118  ABTI_HANDLE_ERROR(abt_errno);
119  }
120 
121  /* Return value */
122  *newxstream = ABTI_xstream_get_handle(p_newxstream);
123  return ABT_SUCCESS;
124 }
125 
/*
 * Create a secondary execution stream whose main scheduler is a basic
 * scheduler built from "predef", "num_pools", "pools" and "config", and return
 * the new handle through "newxstream".
 *
 * - Unless ABT_CONFIG_ENABLE_VER_20_API is defined, "*newxstream" is set to
 *   ABT_XSTREAM_NULL first.
 * - A negative "num_pools" is rejected with ABT_ERR_INV_ARG.
 * - If xstream_create() fails, scheduler pool slots that correspond to
 *   caller-provided pools (pools[i] != ABT_POOL_NULL) are reset to
 *   ABT_POOL_NULL before the scheduler is freed, so that those pools are not
 *   freed together with it.
 *
 * Returns ABT_SUCCESS on success.
 *
 * NOTE(review): listing line 208 (a statement between the "Avoid freeing
 * user-given pools" comment and the slot reset) is absent from this extract.
 * NOTE(review): the failure loop reads pools[i] for i < num_pools -- confirm
 * that "pools" may not be NULL when num_pools > 0.
 */
179 int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools,
180  ABT_pool *pools, ABT_sched_config config,
181  ABT_xstream *newxstream)
182 {
183 #ifndef ABT_CONFIG_ENABLE_VER_20_API
184  /* Argobots 1.x sets newxstream to NULL on error. */
185  *newxstream = ABT_XSTREAM_NULL;
186 #endif
187  ABTI_CHECK_TRUE(num_pools >= 0, ABT_ERR_INV_ARG);
188 
189  int abt_errno;
190  ABTI_xstream *p_newxstream;
191  ABTI_sched_config *p_config = ABTI_sched_config_get_ptr(config);
192 
193  ABTI_global *p_global;
194  ABTI_SETUP_GLOBAL(&p_global);
195 
196  ABTI_sched *p_sched;
197  abt_errno =
198  ABTI_sched_create_basic(predef, num_pools, pools, p_config, &p_sched);
199  ABTI_CHECK_ERROR(abt_errno);
200 
201  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
202  -1, ABT_TRUE, &p_newxstream);
203  if (abt_errno != ABT_SUCCESS) {
 /* Roll back: detach caller-owned pools, then free the scheduler that
  * was created above. */
204  int i;
205  for (i = 0; i < num_pools; i++) {
206  if (pools[i] != ABT_POOL_NULL) {
207  /* Avoid freeing user-given pools. */
209  p_sched->pools[i] = ABT_POOL_NULL;
210  }
211  }
212  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
213  ABT_FALSE);
214  ABTI_HANDLE_ERROR(abt_errno);
215  }
216 
217  *newxstream = ABTI_xstream_get_handle(p_newxstream);
218  return ABT_SUCCESS;
219 }
220 
273  ABT_xstream *newxstream)
274 {
275 #ifndef ABT_CONFIG_ENABLE_VER_20_API
276  /* Argobots 1.x sets newxstream to NULL on error. */
277  *newxstream = ABT_XSTREAM_NULL;
278 #endif
279  int abt_errno;
280  ABTI_xstream *p_newxstream;
281 
282  ABTI_global *p_global;
283  ABTI_SETUP_GLOBAL(&p_global);
284 
286 
287  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
288  if (!p_sched) {
289  abt_errno =
290  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
291  ABTI_CHECK_ERROR(abt_errno);
292  } else {
293 #ifndef ABT_CONFIG_ENABLE_VER_20_API
296 #endif
297  }
298 
299  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
300  rank, ABT_TRUE, &p_newxstream);
301  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
302  if (!ABTI_sched_get_ptr(sched)) {
303  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
304  ABT_FALSE);
305  }
306  ABTI_HANDLE_ERROR(abt_errno);
307  }
308 
309  /* Return value */
310  *newxstream = ABTI_xstream_get_handle(p_newxstream);
311  return ABT_SUCCESS;
312 }
313 
345 {
346  ABTI_global *p_global = ABTI_global_get_global();
347  ABTI_local *p_local = ABTI_local_get_local();
348  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
349  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
350 
351  /* Revives the main scheduler thread. */
352  ABTI_sched *p_main_sched = p_xstream->p_main_sched;
353  ABTI_ythread *p_main_sched_ythread = p_main_sched->p_ythread;
354  /* TODO: should we check the thread state instead of the xstream state? */
356  &p_main_sched_ythread->thread.state) ==
359 
360  ABTD_atomic_relaxed_store_uint32(&p_main_sched->request, 0);
361  ABTI_tool_event_thread_join(p_local, &p_main_sched_ythread->thread,
363  ? ABTI_local_get_xstream(p_local)->p_thread
364  : NULL);
365 
366  int abt_errno =
367  ABTI_thread_revive(p_global, p_local, p_xstream->p_root_pool,
368  p_main_sched_ythread->thread.f_thread,
369  p_main_sched_ythread->thread.p_arg,
370  &p_main_sched_ythread->thread);
371  /* ABTI_thread_revive() never fails since it does not update an associated
372  * pool.*/
373  assert(abt_errno == ABT_SUCCESS);
374 
376  ABTD_xstream_context_revive(&p_xstream->ctx);
377  return ABT_SUCCESS;
378 }
379 
/*
 * Join and then free the execution stream referenced by "*xstream".
 *
 * The handle is resolved and checked with ABTI_CHECK_NULL_XSTREAM_PTR().  Two
 * further checks follow whose messages state that the current execution
 * stream and the primary execution stream cannot be freed.  The stream is then
 * joined with xstream_join(), released with ABTI_xstream_free() (force_free ==
 * ABT_FALSE), and "*xstream" is overwritten with ABT_XSTREAM_NULL.
 *
 * Returns ABT_SUCCESS on success; a join error is passed to
 * ABTI_CHECK_ERROR().
 *
 * NOTE(review): listing lines 434-435 and 438-439 (the conditions and error
 * codes of the two checks) are absent from this extract; only their message
 * strings are visible.
 */
421 int ABT_xstream_free(ABT_xstream *xstream)
422 {
423  ABTI_global *p_global;
424  ABTI_SETUP_GLOBAL(&p_global);
425 
426  ABTI_local *p_local = ABTI_local_get_local();
427  ABT_xstream h_xstream = *xstream;
428 
429  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(h_xstream);
430  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
431 
432  /* We first need to check whether p_local_xstream is NULL because this
433  * routine might be called by external threads. */
436  "The current xstream cannot be freed.");
437 
440  "The primary xstream cannot be freed explicitly.");
441 
442  /* Wait until xstream terminates */
443  int abt_errno = xstream_join(&p_local, p_xstream);
444  ABTI_CHECK_ERROR(abt_errno);
445 
446  /* Free the xstream object */
447  ABTI_xstream_free(p_global, p_local, p_xstream, ABT_FALSE);
448 
449  /* Return value */
450  *xstream = ABT_XSTREAM_NULL;
451  return ABT_SUCCESS;
452 }
453 
485 int ABT_xstream_join(ABT_xstream xstream)
486 {
487  ABTI_local *p_local = ABTI_local_get_local();
488  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
489  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
490 
491  int abt_errno = xstream_join(&p_local, p_xstream);
492  ABTI_CHECK_ERROR(abt_errno);
493  return ABT_SUCCESS;
494 }
495 
/*
 * Terminate the execution stream on which the calling ULT is running.
 *
 * ABTI_SETUP_LOCAL_YTHREAD() obtains the local execution stream and the
 * calling ULT.  The call is checked against the primary execution stream
 * (type must differ from ABTI_XSTREAM_TYPE_PRIMARY), a request is written to
 * the "thread.request" field associated with the main scheduler, and the
 * calling ULT is ended with ABTI_ythread_exit().
 *
 * In 1.x builds (ABT_CONFIG_ENABLE_VER_20_API undefined) ABTI_SETUP_GLOBAL() is
 * invoked first.
 *
 * NOTE(review): listing lines 540, 543, 545 and 548 are absent from this
 * extract, so the error code of the primary check, the exact request that is
 * set, and whatever precedes the final return are not visible here.
 */
530 int ABT_xstream_exit(void)
531 {
532  ABTI_xstream *p_local_xstream;
533  ABTI_ythread *p_ythread;
534 #ifndef ABT_CONFIG_ENABLE_VER_20_API
535  ABTI_SETUP_GLOBAL(NULL);
536 #endif
537  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_ythread);
538  /* Check if the target is the primary execution stream. */
539  ABTI_CHECK_TRUE(p_local_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
541 
542  /* Terminate the main scheduler. */
544  ->thread.request,
546  /* Terminate this ULT */
547  ABTI_ythread_exit(p_local_xstream, p_ythread);
549  return ABT_SUCCESS;
550 }
551 
/*
 * Request termination of the main scheduler of the execution stream "xstream".
 *
 * The handle is resolved and validated with ABTI_CHECK_NULL_XSTREAM_PTR(); a
 * request is then written to a ".request" field belonging to the target's main
 * scheduler.  Returns ABT_SUCCESS.
 *
 * NOTE(review): listing lines 584-585, 588 and 590 are absent from this
 * extract, so any additional check after the NULL test and the exact request
 * value are not visible here.
 */
580 int ABT_xstream_cancel(ABT_xstream xstream)
581 {
582  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
583  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
586 
587  /* Terminate the main scheduler of the target xstream. */
589  .request,
591  return ABT_SUCCESS;
592 }
593 
625 int ABT_xstream_self(ABT_xstream *xstream)
626 {
627  ABTI_xstream *p_local_xstream;
628 #ifndef ABT_CONFIG_ENABLE_VER_20_API
629  *xstream = ABT_XSTREAM_NULL;
630  ABTI_SETUP_GLOBAL(NULL);
631 #endif
632  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
633 
634  /* Return value */
635  *xstream = ABTI_xstream_get_handle(p_local_xstream);
636  return ABT_SUCCESS;
637 }
638 
668 int ABT_xstream_self_rank(int *rank)
669 {
670  ABTI_xstream *p_local_xstream;
671 #ifndef ABT_CONFIG_ENABLE_VER_20_API
672  ABTI_SETUP_GLOBAL(NULL);
673 #endif
674  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
675  /* Return value */
676  *rank = (int)p_local_xstream->rank;
677  return ABT_SUCCESS;
678 }
679 
/*
 * Change the rank of the execution stream "xstream" to "rank".
 *
 * The handle is resolved and validated, xstream_change_rank() is asked to
 * apply the new rank, and, when the global "set_affinity" flag is ABT_TRUE,
 * the default CPU affinity for the (new) rank is applied to the stream's
 * context.  Returns ABT_SUCCESS.
 *
 * NOTE(review): listing lines 722-724 and 727 are absent from this extract;
 * any checks after the NULL test and the handling of "is_changed" (which is
 * otherwise unused in the visible code) are therefore not shown.
 */
715 int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
716 {
717  ABTI_global *p_global;
718  ABTI_SETUP_GLOBAL(&p_global);
719 
720  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
721  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
725 
726  ABT_bool is_changed = xstream_change_rank(p_global, p_xstream, rank);
728 
729  /* Set the CPU affinity for the ES */
730  if (p_global->set_affinity == ABT_TRUE) {
731  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
732  }
733  return ABT_SUCCESS;
734 }
735 
760 int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
761 {
762  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
763  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
764 
765  *rank = (int)p_xstream->rank;
766  return ABT_SUCCESS;
767 }
768 
848 {
849  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
851 
852  ABTI_global *p_global;
853  ABTI_SETUP_GLOBAL(&p_global);
854 
855  ABTI_xstream *p_local_xstream;
856  ABTI_ythread *p_self;
857  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_self);
858 
859 #ifndef ABT_CONFIG_ENABLE_VER_20_API
862  p_local_xstream == p_xstream,
864 #else
867  p_local_xstream == p_xstream,
869 #endif
870 
871  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
872  if (!p_sched) {
873  int abt_errno =
874  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
875  ABTI_CHECK_ERROR(abt_errno);
876  } else {
877 #ifndef ABT_CONFIG_ENABLE_VER_20_API
880 #endif
881  }
882 
883  int abt_errno = xstream_update_main_sched(p_global, &p_local_xstream,
884  p_xstream, p_sched);
885  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
886  if (!ABTI_sched_get_ptr(sched)) {
887  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
888  ABT_FALSE);
889  }
890  ABTI_HANDLE_ERROR(abt_errno);
891  }
892  return ABT_SUCCESS;
893 }
894 
953  ABT_sched_predef predef, int num_pools,
954  ABT_pool *pools)
955 {
956  int abt_errno;
957  ABTI_global *p_global;
958  ABTI_SETUP_GLOBAL(&p_global);
959 
960  ABTI_xstream *p_local_xstream;
961  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
962 
963  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
964  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
965 
966  ABTI_sched *p_sched;
967  abt_errno =
968  ABTI_sched_create_basic(predef, num_pools, pools, NULL, &p_sched);
969  ABTI_CHECK_ERROR(abt_errno);
970 
971  abt_errno = xstream_update_main_sched(p_global, &p_local_xstream, p_xstream,
972  p_sched);
973  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
974  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
975  ABT_FALSE);
976  ABTI_HANDLE_ERROR(abt_errno);
977  }
978  return ABT_SUCCESS;
979 }
980 
1004 {
1005  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1007 
1008  *sched = ABTI_sched_get_handle(p_xstream->p_main_sched);
1009  return ABT_SUCCESS;
1010 }
1011 
1040 int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools,
1041  ABT_pool *pools)
1042 {
1043  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1044  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1045 
1046  ABTI_sched *p_sched = p_xstream->p_main_sched;
1047  max_pools = ABTU_min_int(p_sched->num_pools, max_pools);
1048  memcpy(pools, p_sched->pools, sizeof(ABT_pool) * max_pools);
1049  return ABT_SUCCESS;
1050 }
1051 
1077 {
1078  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1080 
1081  *state = (ABT_xstream_state)ABTD_atomic_acquire_load_int(&p_xstream->state);
1082  return ABT_SUCCESS;
1083 }
1084 
1112 int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2,
1113  ABT_bool *result)
1114 {
1115  ABTI_xstream *p_xstream1 = ABTI_xstream_get_ptr(xstream1);
1116  ABTI_xstream *p_xstream2 = ABTI_xstream_get_ptr(xstream2);
1117  *result = (p_xstream1 == p_xstream2) ? ABT_TRUE : ABT_FALSE;
1118  return ABT_SUCCESS;
1119 }
1120 
1149 int ABT_xstream_get_num(int *num_xstreams)
1150 {
1151  ABTI_global *p_global;
1152  ABTI_SETUP_GLOBAL(&p_global);
1153 
1154  *num_xstreams = p_global->num_xstreams;
1155  return ABT_SUCCESS;
1156 }
1157 
/*
 * Report whether the execution stream "xstream" is the primary one.
 *
 * "*is_primary" is set to ABT_TRUE when the stream's type equals
 * ABTI_XSTREAM_TYPE_PRIMARY and to ABT_FALSE otherwise.  Returns ABT_SUCCESS.
 *
 * NOTE(review): listing line 1185 (between the handle lookup and the type
 * test) is absent from this extract; in the visible code p_xstream is
 * dereferenced without a preceding NULL check.
 */
1182 int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *is_primary)
1183 {
1184  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1186 
1187  /* Return value */
1188  *is_primary =
1189  (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) ? ABT_TRUE : ABT_FALSE;
1190  return ABT_SUCCESS;
1191 }
1192 
/*
 * Associate the work unit "unit" with the pool "pool" and run it on the
 * calling execution stream.
 *
 * ABTI_unit_set_associated_pool() binds the unit to the pool and yields the
 * internal thread object, which is then executed through
 * ABTI_xstream_run_thread().  The caller's execution stream is obtained with
 * ABTI_SETUP_LOCAL_YTHREAD().
 *
 * Returns ABT_SUCCESS on success; an association error is passed to
 * ABTI_CHECK_ERROR().
 *
 * NOTE(review): listing lines 1224-1225 (directly after the pool lookup) are
 * absent from this extract, so any validation of p_pool is not visible here.
 */
1221 int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
1222 {
1223  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
1226  ABTI_global *p_global;
1227  ABTI_SETUP_GLOBAL(&p_global);
1228 
1229  ABTI_xstream *p_local_xstream;
1230  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
1231 
1232  ABTI_thread *p_thread;
1233  int abt_errno =
1234  ABTI_unit_set_associated_pool(p_global, unit, p_pool, &p_thread);
1235  ABTI_CHECK_ERROR(abt_errno);
1236  ABTI_xstream_run_thread(p_global, &p_local_xstream, p_thread);
1237  return ABT_SUCCESS;
1238 }
1239 
1273 {
1274  ABTI_xstream *p_local_xstream;
1275 #ifndef ABT_CONFIG_ENABLE_VER_20_API
1276  ABTI_SETUP_GLOBAL(NULL);
1277 #endif
1278  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
1279 
1280  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
1281  ABTI_CHECK_NULL_SCHED_PTR(p_sched);
1282  ABTI_CHECK_TRUE(p_local_xstream->p_thread == &p_sched->p_ythread->thread,
1284 
1285  ABTI_xstream_check_events(p_local_xstream, p_sched);
1286  return ABT_SUCCESS;
1287 }
1288 
/*
 * Bind the execution stream "xstream" to the single CPU "cpuid".
 *
 * A one-element ABTD_affinity_cpuset is built on the stack whose "cpuids"
 * member points at this function's "cpuid" parameter, and it is applied to the
 * stream's context with ABTD_affinity_cpuset_apply().  The cpuset owns no heap
 * memory, so nothing is freed afterwards.
 *
 * Returns ABT_SUCCESS on success; an apply error is passed to
 * ABTI_CHECK_ERROR().
 *
 * NOTE(review): listing line 1320 (directly after the handle lookup) is absent
 * from this extract; in the visible code p_xstream is dereferenced without a
 * preceding NULL check.
 */
1317 int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
1318 {
1319  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1321 
1322  ABTD_affinity_cpuset cpuset;
1323  cpuset.num_cpuids = 1;
1324  cpuset.cpuids = &cpuid;
1325  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &cpuset);
1326  /* Do not free cpuset since cpuids points to a user pointer. */
1327  ABTI_CHECK_ERROR(abt_errno);
1328  return ABT_SUCCESS;
1329 }
1330 
/*
 * Store one CPU ID to which the execution stream "xstream" is bound in
 * "*cpuid".
 *
 * ABTD_affinity_cpuset_read() is asked for at most one CPU ID.  If it reports
 * that no CPU ID is available (num_cpuid <= 0), ABT_ERR_CPUID is raised via
 * ABTI_CHECK_TRUE(); otherwise the first ID read is returned.
 *
 * Returns ABT_SUCCESS on success; a read error is passed to
 * ABTI_CHECK_ERROR().
 *
 * NOTE(review): listing line 1370 is absent from this extract.
 */
1366 int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
1367 {
1368  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1369  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1371  int num_cpuid;
1372  int cpuids[1];
1373  int abt_errno =
1374  ABTD_affinity_cpuset_read(&p_xstream->ctx, 1, cpuids, &num_cpuid);
1375  ABTI_CHECK_ERROR(abt_errno);
1376  ABTI_CHECK_TRUE(num_cpuid > 0, ABT_ERR_CPUID);
1377 
1378  *cpuid = cpuids[0];
1379  return ABT_SUCCESS;
1380 }
1381 
/*
 * Bind the execution stream "xstream" to the CPU set given by the array
 * "cpuids" of length "num_cpuids".
 *
 * A stack ABTD_affinity_cpuset is filled with the caller's count and array
 * pointer (the array is not copied) and applied to the stream's context with
 * ABTD_affinity_cpuset_apply().  The cpuset is not freed because the array
 * belongs to the caller.
 *
 * Returns ABT_SUCCESS on success; an apply error is passed to
 * ABTI_CHECK_ERROR().
 *
 * NOTE(review): listing line 1419 (after the NULL check) is absent from this
 * extract, so any validation of "num_cpuids" / "cpuids" is not visible here.
 */
1415 int ABT_xstream_set_affinity(ABT_xstream xstream, int num_cpuids, int *cpuids)
1416 {
1417  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1418  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1420 
1421  ABTD_affinity_cpuset affinity;
1422  affinity.num_cpuids = num_cpuids;
1423  affinity.cpuids = cpuids;
1424  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &affinity);
1425  /* Do not free affinity since cpuids may not be freed. */
1426  ABTI_CHECK_ERROR(abt_errno);
1427  return ABT_SUCCESS;
1428 }
1429 
/*
 * Read the CPU affinity of the execution stream "xstream".
 *
 * The request is forwarded to ABTD_affinity_cpuset_read() together with the
 * caller's buffer "cpuids", its capacity "max_cpuids", and the output counter
 * "num_cpuids".
 *
 * The value produced by ABTD_affinity_cpuset_read() is passed to
 * ABTI_CHECK_ERROR() and is also this function's return value (unlike most
 * functions in this file, which return the ABT_SUCCESS constant on their final
 * line).
 *
 * NOTE(review): listing line 1483 (after the NULL check) is absent from this
 * extract, so any validation of "max_cpuids" is not visible here.
 */
1478 int ABT_xstream_get_affinity(ABT_xstream xstream, int max_cpuids, int *cpuids,
1479  int *num_cpuids)
1480 {
1481  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1482  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1484 
1485  int abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, max_cpuids,
1486  cpuids, num_cpuids);
1487  ABTI_CHECK_ERROR(abt_errno);
1488  return abt_errno;
1489 }
1490 
1491 /*****************************************************************************/
1492 /* Private APIs */
1493 /*****************************************************************************/
1494 
1496  ABTI_xstream **pp_xstream)
1497 {
1498  int abt_errno;
1499  ABTI_xstream *p_newxstream;
1500  ABTI_sched *p_sched;
1501 
1502  /* For the primary ES, a default scheduler is created. */
1503  abt_errno =
1504  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
1505  ABTI_CHECK_ERROR(abt_errno);
1506 
1507  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_PRIMARY, -1,
1508  ABT_FALSE, &p_newxstream);
1509  if (abt_errno != ABT_SUCCESS) {
1510  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
1511  ABT_TRUE);
1512  ABTI_HANDLE_ERROR(abt_errno);
1513  }
1514 
1515  *pp_xstream = p_newxstream;
1516  return ABT_SUCCESS;
1517 }
1518 
1519 /* This routine starts the primary ES. It should be called in ABT_init. */
1521  ABTI_xstream **pp_local_xstream,
1522  ABTI_xstream *p_xstream,
1523  ABTI_ythread *p_ythread)
1524 {
1525  /* p_ythread must be the main thread. */
1527  /* The ES's state must be running here. */
1530 
1531  LOG_DEBUG("[E%d] start\n", p_xstream->rank);
1532 
1533  ABTD_xstream_context_set_self(&p_xstream->ctx);
1534 
1535  /* Set the CPU affinity for the ES */
1536  if (p_global->set_affinity == ABT_TRUE) {
1537  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
1538  }
1539 
1540  /* Context switch to the root thread. */
1541  p_xstream->p_root_ythread->thread.p_last_xstream = p_xstream;
1542  ABTD_ythread_context_switch(&p_ythread->ctx,
1543  &p_xstream->p_root_ythread->ctx);
1544  /* Come back to the primary thread. Now this thread is executed on top of
1545  * the main scheduler, which is running on the root thread. */
1546  (*pp_local_xstream)->p_thread = &p_ythread->thread;
1547 }
1548 
1549 void ABTI_xstream_run_thread(ABTI_global *p_global,
1550  ABTI_xstream **pp_local_xstream,
1551  ABTI_thread *p_thread)
1552 {
1553  if (p_thread->type & ABTI_THREAD_TYPE_YIELDABLE) {
1554  ABTI_ythread *p_ythread = ABTI_thread_get_ythread(p_thread);
1555  /* Execute a ULT */
1556  xstream_schedule_ythread(p_global, pp_local_xstream, p_ythread);
1557  } else {
1558  /* Execute a tasklet */
1559  xstream_schedule_task(p_global, *pp_local_xstream, p_thread);
1560  }
1561 }
1562 
/*
 * Act on pending requests for the scheduler "p_sched" running on "p_xstream".
 *
 * A 32-bit request word is loaded with acquire semantics.  If it contains
 * ABTI_THREAD_REQ_JOIN, ABTI_sched_finish() is called; if it contains
 * ABTI_THREAD_REQ_TERMINATE or ABTI_THREAD_REQ_CANCEL, ABTI_sched_exit() is
 * called.  Both actions are taken when both kinds of bits are set.
 *
 * NOTE(review): listing lines 1565 and 1568 are absent from this extract, so
 * the statement at the top of the body and the address the request word is
 * loaded from are not visible here.
 */
1563 void ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
1564 {
1566 
1567  uint32_t request = ABTD_atomic_acquire_load_uint32(
1569  if (request & ABTI_THREAD_REQ_JOIN) {
1570  ABTI_sched_finish(p_sched);
1571  }
1572 
1573  if ((request & ABTI_THREAD_REQ_TERMINATE) ||
1574  (request & ABTI_THREAD_REQ_CANCEL)) {
1575  ABTI_sched_exit(p_sched);
1576  }
1577 }
1578 
/*
 * Release the execution stream object "p_xstream" and everything it owns.
 *
 * Order of teardown, as written below:
 *   1. the stream-local memory pool (ABTI_mem_finalize_local),
 *   2. the rank (xstream_return_rank),
 *   3. the main scheduler, if any: a thread-join tool event is emitted for the
 *      scheduler's ULT, then ABTI_sched_discard_and_free() is called with
 *      "force_free",
 *   4. the root ULT and the root pool,
 *   5. the underlying context, only for ABTI_XSTREAM_TYPE_SECONDARY streams,
 *   6. the ABTI_xstream structure itself.
 *
 * NOTE(review): listing line 1595 (the condition of the ?: expression that
 * selects the caller thread for the tool event) is absent from this extract.
 */
1579 void ABTI_xstream_free(ABTI_global *p_global, ABTI_local *p_local,
1580  ABTI_xstream *p_xstream, ABT_bool force_free)
1581 {
1582  LOG_DEBUG("[E%d] freed\n", p_xstream->rank);
1583 
1584  /* Clean up memory pool. */
1585  ABTI_mem_finalize_local(p_xstream);
1586  /* Return rank for reuse. rank must be returned prior to other free
1587  * functions so that other xstreams cannot refer to this xstream. */
1588  xstream_return_rank(p_global, p_xstream);
1589 
1590  /* Free the scheduler */
1591  ABTI_sched *p_cursched = p_xstream->p_main_sched;
1592  if (p_cursched != NULL) {
1593  /* Join a scheduler thread. */
1594  ABTI_tool_event_thread_join(p_local, &p_cursched->p_ythread->thread,
1596  ? ABTI_local_get_xstream(p_local)
1597  ->p_thread
1598  : NULL);
1599  ABTI_sched_discard_and_free(p_global, p_local, p_cursched, force_free);
1600  /* The main scheduler thread is also freed. */
1601  }
1602 
1603  /* Free the root thread and pool. */
1604  ABTI_ythread_free_root(p_global, p_local, p_xstream->p_root_ythread);
1605  ABTI_pool_free(p_xstream->p_root_pool);
1606 
1607  /* Free the context if a given xstream is secondary. */
1608  if (p_xstream->type == ABTI_XSTREAM_TYPE_SECONDARY) {
1609  ABTD_xstream_context_free(&p_xstream->ctx);
1610  }
1611 
1612  ABTU_free(p_xstream);
1613 }
1614 
/*
 * Write a human-readable description of "p_xstream" to the stream "p_os".
 *
 * Every output line is prefixed with "indent" spaces (via the "%*s" / ""
 * pairs).  A NULL "p_xstream" prints a single "NULL ES" line.  Otherwise the
 * rank, type name, state name, and the addresses of the root ULT, root pool,
 * current thread and main scheduler are printed, followed by the context
 * (ABTD_xstream_context_print, indented by a further ABTI_INDENT).  When
 * "print_sub" is ABT_TRUE the main scheduler is printed as well.  "p_os" is
 * flushed before returning in both cases.
 *
 * NOTE(review): listing lines 1623, 1626, 1634 and 1637 (the "case" labels of
 * both switch statements) are absent from this extract; only the strings they
 * select and the "default" arms are visible.
 */
1615 void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent,
1616  ABT_bool print_sub)
1617 {
1618  if (p_xstream == NULL) {
1619  fprintf(p_os, "%*s== NULL ES ==\n", indent, "");
1620  } else {
1621  const char *type, *state;
 /* Map the stream type to a printable name. */
1622  switch (p_xstream->type) {
1624  type = "PRIMARY";
1625  break;
1627  type = "SECONDARY";
1628  break;
1629  default:
1630  type = "UNKNOWN";
1631  break;
1632  }
 /* Map the (atomically loaded) stream state to a printable name. */
1633  switch (ABTD_atomic_acquire_load_int(&p_xstream->state)) {
1635  state = "RUNNING";
1636  break;
1638  state = "TERMINATED";
1639  break;
1640  default:
1641  state = "UNKNOWN";
1642  break;
1643  }
1644 
1645  fprintf(p_os,
1646  "%*s== ES (%p) ==\n"
1647  "%*srank : %d\n"
1648  "%*stype : %s\n"
1649  "%*sstate : %s\n"
1650  "%*sroot_ythread : %p\n"
1651  "%*sroot_pool : %p\n"
1652  "%*sthread : %p\n"
1653  "%*smain_sched : %p\n",
1654  indent, "", (void *)p_xstream, indent, "", p_xstream->rank,
1655  indent, "", type, indent, "", state, indent, "",
1656  (void *)p_xstream->p_root_ythread, indent, "",
1657  (void *)p_xstream->p_root_pool, indent, "",
1658  (void *)p_xstream->p_thread, indent, "",
1659  (void *)p_xstream->p_main_sched);
1660 
1661  if (print_sub == ABT_TRUE) {
1662  ABTI_sched_print(p_xstream->p_main_sched, p_os,
1663  indent + ABTI_INDENT, ABT_TRUE);
1664  }
1665  fprintf(p_os, "%*sctx :\n", indent, "");
1666  ABTD_xstream_context_print(&p_xstream->ctx, p_os, indent + ABTI_INDENT);
1667  }
1668  fflush(p_os);
1669 }
1670 
1671 static void *xstream_launch_root_ythread(void *p_xstream)
1672 {
1673  ABTI_xstream *p_local_xstream = (ABTI_xstream *)p_xstream;
1674 
1675  /* Initialization of the local variables */
1676  ABTI_local_set_xstream(p_local_xstream);
1677 
1678  LOG_DEBUG("[E%d] start\n", p_local_xstream->rank);
1679 
1680  /* Set the root thread as the current thread */
1681  ABTI_ythread *p_root_ythread = p_local_xstream->p_root_ythread;
1682  p_local_xstream->p_thread = &p_local_xstream->p_root_ythread->thread;
1683  p_root_ythread->thread.f_thread(p_root_ythread->thread.p_arg);
1684 
1685  LOG_DEBUG("[E%d] end\n", p_local_xstream->rank);
1686 
1687  /* Reset the current ES and its local info. */
1688  ABTI_local_set_xstream(NULL);
1689  return NULL;
1690 }
1691 
1692 /*****************************************************************************/
1693 /* Internal static functions */
1694 /*****************************************************************************/
1695 
/*
 * Allocate and initialize a new execution stream of type "xstream_type" with
 * main scheduler "p_sched", store it in "*pp_xstream", and optionally start
 * it.
 *
 * Initialization proceeds in stages tracked by "init_stage":
 *   1 - rank assigned (xstream_set_new_rank; failure gives
 *       ABT_ERR_INV_XSTREAM_RANK),
 *   2 - stream-local memory initialized,
 *   3 - root ULT created (after p_sched was installed as main scheduler),
 *   4 - root pool created,
 *   5 - main scheduler ULT created,
 *   6 - underlying context created (only when "start" is true).
 * When "start" is true and the global "set_affinity" flag is ABT_TRUE, the
 * default CPU affinity for the new rank is applied as well.
 *
 * On failure the FAILED path undoes the completed stages in reverse order,
 * frees the structure and returns the error code; "p_sched" itself is not
 * freed here, only marked ABTI_SCHED_NOT_USED (from stage 2 on).  On success
 * ABT_SUCCESS is returned.
 *
 * NOTE(review): listing lines 1701 (the opening brace), 1719, 1739, 1747, 1758
 * and 1761 are absent from this extract, so the initial state value, the
 * root-pool and scheduler-ULT creation calls, and the context creation call
 * are only partly visible.
 */
1696 ABTU_ret_err static int xstream_create(ABTI_global *p_global,
1697  ABTI_sched *p_sched,
1698  ABTI_xstream_type xstream_type, int rank,
1699  ABT_bool start,
1700  ABTI_xstream **pp_xstream)
1702  int abt_errno, init_stage = 0;
1703  ABTI_xstream *p_newxstream;
1704 
1705  abt_errno = ABTU_malloc(sizeof(ABTI_xstream), (void **)&p_newxstream);
1706  ABTI_CHECK_ERROR(abt_errno);
1707 
1708  p_newxstream->p_prev = NULL;
1709  p_newxstream->p_next = NULL;
1710 
1711  if (xstream_set_new_rank(p_global, p_newxstream, rank) == ABT_FALSE) {
1712  abt_errno = ABT_ERR_INV_XSTREAM_RANK;
1713  goto FAILED;
1714  }
1715  init_stage = 1;
1716 
1717  p_newxstream->type = xstream_type;
1718  ABTD_atomic_relaxed_store_int(&p_newxstream->state,
1720  p_newxstream->p_main_sched = NULL;
1721  p_newxstream->p_thread = NULL;
1722  abt_errno = ABTI_mem_init_local(p_global, p_newxstream);
1723  if (abt_errno != ABT_SUCCESS)
1724  goto FAILED;
1725  init_stage = 2;
1726 
1727  /* Set the main scheduler */
1728  xstream_init_main_sched(p_newxstream, p_sched);
1729 
1730  /* Create the root thread. */
1731  abt_errno =
1732  ABTI_ythread_create_root(p_global, ABTI_xstream_get_local(p_newxstream),
1733  p_newxstream, &p_newxstream->p_root_ythread);
1734  if (abt_errno != ABT_SUCCESS)
1735  goto FAILED;
1736  init_stage = 3;
1737 
1738  /* Create the root pool. */
1740  ABT_FALSE, &p_newxstream->p_root_pool);
1741  if (abt_errno != ABT_SUCCESS)
1742  goto FAILED;
1743  init_stage = 4;
1744 
1745  /* Create the main scheduler thread. */
1746  abt_errno =
1748  ABTI_xstream_get_local(p_newxstream),
1749  p_newxstream,
1750  p_newxstream->p_main_sched);
1751  if (abt_errno != ABT_SUCCESS)
1752  goto FAILED;
1753  init_stage = 5;
1754 
1755  if (start) {
1756  /* The ES's state must be RUNNING */
1757  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_newxstream->state) ==
1759  ABTI_ASSERT(p_newxstream->type != ABTI_XSTREAM_TYPE_PRIMARY);
1760  /* Start the main scheduler on a different ES */
1762  (void *)p_newxstream,
1763  &p_newxstream->ctx);
1764  if (abt_errno != ABT_SUCCESS)
1765  goto FAILED;
1766  init_stage = 6;
1767 
1768  /* Set the CPU affinity for the ES */
1769  if (p_global->set_affinity == ABT_TRUE) {
1770  ABTD_affinity_cpuset_apply_default(&p_newxstream->ctx,
1771  p_newxstream->rank);
1772  }
1773  }
1774 
1775  LOG_DEBUG("[E%d] created\n", p_newxstream->rank);
1776 
1777  /* Return value */
1778  *pp_xstream = p_newxstream;
1779  return ABT_SUCCESS;
1780 FAILED:
 /* Undo completed stages from the latest to the earliest. */
1781  if (init_stage >= 5) {
1782  ABTI_thread_free(p_global, ABTI_xstream_get_local(p_newxstream),
1783  &p_newxstream->p_main_sched->p_ythread->thread);
1784  p_newxstream->p_main_sched->p_ythread = NULL;
1785  }
1786  if (init_stage >= 4) {
1787  ABTI_pool_free(p_newxstream->p_root_pool);
1788  }
1789  if (init_stage >= 3) {
1790  ABTI_ythread_free_root(p_global, ABTI_xstream_get_local(p_newxstream),
1791  p_newxstream->p_root_ythread);
1792  }
1793  if (init_stage >= 2) {
 /* Release the scheduler back to its owner; it is not freed here. */
1794  p_sched->used = ABTI_SCHED_NOT_USED;
1795  ABTI_mem_finalize_local(p_newxstream);
1796  }
1797  if (init_stage >= 1) {
1798  xstream_return_rank(p_global, p_newxstream);
1799  }
1800  ABTU_free(p_newxstream);
1801  return abt_errno;
1802 }
1803 
/*
 * Wait until the execution stream "p_xstream" has terminated.
 *
 * After the precondition checks, the target's main scheduler is asked to
 * finish (ABTI_sched_finish), the caller joins the main scheduler's ULT
 * (ABTI_thread_join, which receives "pp_local" by address), and finally the
 * underlying context is joined (ABTD_xstream_context_join).
 *
 * Returns ABT_SUCCESS on success.
 *
 * NOTE(review): listing lines 1808-1809, 1811, 1814 and 1823-1824 are absent
 * from this extract.  The precondition checks (primary stream, scheduler
 * joining itself) are visible only as comments and one comparison, and the
 * statements between the context join and the return are not shown.
 */
1804 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
1805  ABTI_xstream *p_xstream)
1806 {
1807  /* The primary ES cannot be joined. */
1810  /* The main scheduler cannot join itself. */
1812  &p_xstream->p_main_sched->p_ythread->thread !=
1813  ABTI_local_get_xstream(*pp_local)->p_thread,
1815 
1816  /* Wait until the target ES terminates */
1817  ABTI_sched_finish(p_xstream->p_main_sched);
1818  ABTI_thread_join(pp_local, &p_xstream->p_main_sched->p_ythread->thread);
1819 
1820  /* Normal join request */
1821  ABTD_xstream_context_join(&p_xstream->ctx);
1822 
1825  return ABT_SUCCESS;
1826 }
1827 
/*
 * Run the ULT "p_ythread" from the scheduler that is current on
 * *pp_local_xstream, then handle the request bits of the ULT that hands
 * control back.
 *
 * Before the switch, two optional early exits exist: a cancellation path
 * (compiled unless ABT_CONFIG_DISABLE_THREAD_CANCEL) and a migration path
 * (compiled unless ABT_CONFIG_DISABLE_MIGRATION) that returns without running
 * the ULT when xstream_migrate_thread() succeeds or error checking is
 * disabled.
 *
 * After ABTI_ythread_context_switch_to_child() returns, "p_ythread" is
 * re-bound to the ULT that came back and the local execution stream pointer
 * is re-read from *pp_local_xstream.  The request word is then dispatched in
 * this priority order: TERMINATE, CANCEL (if compiled in), "no NON_YIELD bit"
 * (the ULT is pushed back with ABTI_pool_add_thread), BLOCK, MIGRATE (if
 * compiled in; failure is ignored), ORPHAN.  Any other combination trips
 * ABTI_ASSERT(0).
 *
 * NOTE(review): many listing lines are absent from this extract (1833,
 * 1835-1836, 1841, 1849-1850, 1866-1867, 1891, 1897, 1906, 1919, 1936).  The
 * conditions guarding the cancel and migrate early exits, the state update
 * before the switch, the load of "request", and the actions taken in the
 * TERMINATE / CANCEL / BLOCK branches are therefore only partly visible.
 */
1828 static inline void xstream_schedule_ythread(ABTI_global *p_global,
1829  ABTI_xstream **pp_local_xstream,
1830  ABTI_ythread *p_ythread)
1831 {
1832  ABTI_xstream *p_local_xstream = *pp_local_xstream;
1834 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1837  LOG_DEBUG("[U%" PRIu64 ":E%d] canceled\n",
1838  ABTI_thread_get_id(&p_ythread->thread),
1839  p_local_xstream->rank);
1840  ABTD_ythread_cancel(p_local_xstream, p_ythread);
1842  ABTI_xstream_get_local(p_local_xstream),
1843  &p_ythread->thread);
1844  return;
1845  }
1846 #endif
1847 
1848 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1851  int abt_errno =
1852  xstream_migrate_thread(p_global,
1853  ABTI_xstream_get_local(p_local_xstream),
1854  &p_ythread->thread);
1855  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
1856  /* Migration succeeded, so we do not need to schedule p_ythread. */
1857  return;
1858  }
1859  }
1860 #endif
1861 
1862  /* Change the last ES */
1863  p_ythread->thread.p_last_xstream = p_local_xstream;
1864 
1865  /* Change the ULT state */
1868 
1869  /* Switch the context */
1870  LOG_DEBUG("[U%" PRIu64 ":E%d] start running\n",
1871  ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1872 
1873  /* Since the argument is pp_local_xstream, p_local_xstream->p_thread must be
1874  * yieldable. */
1875  ABTI_ythread *p_self = ABTI_thread_get_ythread(p_local_xstream->p_thread);
1876  p_ythread = ABTI_ythread_context_switch_to_child(pp_local_xstream, p_self,
1877  p_ythread);
1878  /* The previous ULT (p_ythread) may not be the same as one to which the
1879  * context has been switched. */
1880  /* The scheduler continues from here. */
1881  p_local_xstream = *pp_local_xstream;
1882 
1883  LOG_DEBUG("[U%" PRIu64 ":E%d] stopped\n",
1884  ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1885 
1886  /* Request handling. */
1887  /* We do not need to acquire-load request since all critical requests
1888  * (BLOCK, ORPHAN, STOP, and NOPUSH) are written by p_ythread. CANCEL might
1889  * be delayed. */
1890  uint32_t request =
1892  if (request & ABTI_THREAD_REQ_TERMINATE) {
1893  /* The ULT has completed its execution or it called the exit request. */
1894  LOG_DEBUG("[U%" PRIu64 ":E%d] finished\n",
1895  ABTI_thread_get_id(&p_ythread->thread),
1896  p_local_xstream->rank);
1898  ABTI_xstream_get_local(p_local_xstream),
1899  &p_ythread->thread);
1900 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1901  } else if (request & ABTI_THREAD_REQ_CANCEL) {
1902  LOG_DEBUG("[U%" PRIu64 ":E%d] canceled\n",
1903  ABTI_thread_get_id(&p_ythread->thread),
1904  p_local_xstream->rank);
1905  ABTD_ythread_cancel(p_local_xstream, p_ythread);
1907  ABTI_xstream_get_local(p_local_xstream),
1908  &p_ythread->thread);
1909 #endif
1910  } else if (!(request & ABTI_THREAD_REQ_NON_YIELD)) {
1911  /* The ULT did not finish its execution.
1912  * Change the state of current running ULT and
1913  * add it to the pool again. */
1914  ABTI_pool_add_thread(&p_ythread->thread);
1915  } else if (request & ABTI_THREAD_REQ_BLOCK) {
1916  LOG_DEBUG("[U%" PRIu64 ":E%d] check blocked\n",
1917  ABTI_thread_get_id(&p_ythread->thread),
1918  p_local_xstream->rank);
1920 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1921  } else if (request & ABTI_THREAD_REQ_MIGRATE) {
1922  /* This is the case when the ULT requests migration of itself. */
1923  int abt_errno =
1924  xstream_migrate_thread(p_global,
1925  ABTI_xstream_get_local(p_local_xstream),
1926  &p_ythread->thread);
1927  /* Migration is optional, so it is okay if it fails. */
1928  (void)abt_errno;
1929 #endif
1930  } else if (request & ABTI_THREAD_REQ_ORPHAN) {
1931  /* The ULT is not pushed back to the pool and is disconnected from any
1932  * pool. */
1933  LOG_DEBUG("[U%" PRIu64 ":E%d] orphaned\n",
1934  ABTI_thread_get_id(&p_ythread->thread),
1935  p_local_xstream->rank);
1937  ABTI_thread_unset_associated_pool(p_global, &p_ythread->thread);
1938  } else {
1939  ABTI_ASSERT(0);
1940  ABTU_unreachable();
1941  }
1942 }
1943 
/*
 * Run the tasklet "p_task" to completion on "p_local_xstream".
 *
 * Unless ABT_CONFIG_DISABLE_TASK_CANCEL is defined, a cancellation path may
 * emit a thread-cancel tool event and return without running the task.
 * Otherwise the task's last execution stream is recorded, the task becomes the
 * stream's current thread with the previously current (scheduler) thread as
 * its parent, the task function is called between the thread-run and
 * thread-finish tool events, and the scheduler thread is restored as the
 * current thread afterwards.
 *
 * NOTE(review): listing lines 1949-1950, 1952, 1960 and 1987 are absent from
 * this extract, so the cancel condition, the state update before execution,
 * and the termination call at the end are only partly visible.
 * NOTE(review): the "running" debug message is logged twice, once before and
 * once after the thread-run tool event.
 */
1944 static inline void xstream_schedule_task(ABTI_global *p_global,
1945  ABTI_xstream *p_local_xstream,
1946  ABTI_thread *p_task)
1947 {
1948 #ifndef ABT_CONFIG_DISABLE_TASK_CANCEL
1951  ABTI_tool_event_thread_cancel(p_local_xstream, p_task);
1953  ABTI_xstream_get_local(p_local_xstream),
1954  p_task);
1955  return;
1956  }
1957 #endif
1958 
1959  /* Change the task state */
1961 
1962  /* Set the associated ES */
1963  p_task->p_last_xstream = p_local_xstream;
1964 
1965  /* Execute the task function */
1966  LOG_DEBUG("[T%" PRIu64 ":E%d] running\n", ABTI_thread_get_id(p_task),
1967  p_local_xstream->rank);
1968 
 /* Remember the scheduler thread so it can be restored after the task. */
1969  ABTI_thread *p_sched_thread = p_local_xstream->p_thread;
1970  p_local_xstream->p_thread = p_task;
1971  p_task->p_parent = p_sched_thread;
1972 
1973  /* Execute the task function */
1974  ABTI_tool_event_thread_run(p_local_xstream, p_task, p_sched_thread,
1975  p_sched_thread);
1976  LOG_DEBUG("[T%" PRIu64 ":E%d] running\n", ABTI_thread_get_id(p_task),
1977  p_local_xstream->rank);
1978  p_task->f_thread(p_task->p_arg);
1979  ABTI_tool_event_thread_finish(p_local_xstream, p_task, p_sched_thread);
1980  LOG_DEBUG("[T%" PRIu64 ":E%d] stopped\n", ABTI_thread_get_id(p_task),
1981  p_local_xstream->rank);
1982 
1983  /* Set the current running scheduler's thread */
1984  p_local_xstream->p_thread = p_sched_thread;
1985 
1986  /* Terminate the tasklet */
1988  ABTI_xstream_get_local(p_local_xstream),
1989  p_task);
1990 }
1991 
1992 #ifndef ABT_CONFIG_DISABLE_MIGRATION
/*
 * Carry out a pending migration of "p_thread" to the pool stored in its
 * migration data.
 *
 * Steps, as written below: fetch the thread's migration data, read the target
 * pool from "p_migration_pool" (relaxed load), re-associate the thread with
 * that pool, invoke the user migration callback if one is registered and the
 * thread is a ULT (ABTI_thread_get_ythread_or_null() returns non-NULL), and
 * finally push the thread's unit to the target pool.
 *
 * Returns ABT_SUCCESS on success; errors from fetching the migration data or
 * from the pool re-association are passed to ABTI_CHECK_ERROR().
 *
 * NOTE(review): listing lines 2022-2023 and 2026 are absent from this
 * extract, so the statements that follow the "p_migration_pool has a valid
 * pool pointer" comment and the statement that clears the migration request
 * are not visible here.
 */
1993 ABTU_ret_err static int xstream_migrate_thread(ABTI_global *p_global,
1994  ABTI_local *p_local,
1995  ABTI_thread *p_thread)
1996 {
1997  int abt_errno;
1998  ABTI_pool *p_pool;
1999 
2000  ABTI_thread_mig_data *p_mig_data;
2001  abt_errno =
2002  ABTI_thread_get_mig_data(p_global, p_local, p_thread, &p_mig_data);
2003  ABTI_CHECK_ERROR(abt_errno);
2004 
2005  /* Extracting an argument embedded in a migration request. */
2006  p_pool = ABTD_atomic_relaxed_load_ptr(&p_mig_data->p_migration_pool);
2007 
2008  /* Change the associated pool */
2009  abt_errno = ABTI_thread_set_associated_pool(p_global, p_thread, p_pool);
2010  ABTI_CHECK_ERROR(abt_errno);
2011 
2012  /* callback function */
2013  if (p_mig_data->f_migration_cb) {
 /* The callback takes an ABT_thread handle, so it is only invoked for
  * ULTs. */
2014  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
2015  if (p_ythread) {
2016  ABT_thread thread = ABTI_ythread_get_handle(p_ythread);
2017  p_mig_data->f_migration_cb(thread, p_mig_data->p_migration_cb_arg);
2018  }
2019  }
2020 
2021  /* If request is set, p_migration_pool has a valid pool pointer. */
2024 
2025  /* Unset the migration request. */
2027 
2028  /* Add the unit to the scheduler's pool */
2029  ABTI_pool_push(p_pool, p_thread->unit);
2030  return ABT_SUCCESS;
2031 }
2032 #endif
2033 
2034 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
2035  ABTI_sched *p_sched)
2036 {
2037  ABTI_ASSERT(p_xstream->p_main_sched == NULL);
2038  /* Set the scheduler as a main scheduler */
2039  p_sched->used = ABTI_SCHED_MAIN;
2040  /* Set the scheduler */
2041  p_xstream->p_main_sched = p_sched;
2042 }
2043 
/* Make p_sched the main scheduler of p_xstream.  Three cases are handled,
 * one per branch below:
 *  1. p_xstream has no main scheduler yet: p_sched is installed directly.
 *  2. p_xstream is not the caller's execution stream (*pp_local_xstream):
 *     the ULT of the existing main scheduler is handed over to p_sched, the
 *     old scheduler is marked unused (and freed if it is automatic), and
 *     p_sched is installed.
 *  3. The caller is running on p_xstream: p_sched is recorded as the
 *     replacement scheduler of the current main scheduler and the calling
 *     ULT is blocked and suspended.
 *
 * Returns ABT_SUCCESS on success.  Failures go through ABTI_CHECK_ERROR
 * (presumably an early return of the error code -- the macro is defined
 * elsewhere).
 *
 * NOTE(review): this listing omits source lines 2058, 2063, 2084, 2096, 2108,
 * 2116, 2124 and 2131, so several calls below show only their trailing
 * arguments.  Consult the original stream.c before changing any logic. */
2044 static int xstream_update_main_sched(ABTI_global *p_global,
2045  ABTI_xstream **pp_local_xstream,
2046  ABTI_xstream *p_xstream,
2047  ABTI_sched *p_sched)
2048 {
2049  ABTI_sched *p_main_sched = p_xstream->p_main_sched;
2050  if (p_main_sched == NULL) {
2051  /* Set the scheduler as a main scheduler */
2052  p_sched->used = ABTI_SCHED_MAIN;
2053  /* Set the scheduler */
2054  p_xstream->p_main_sched = p_sched;
2055  return ABT_SUCCESS;
2056  } else if (*pp_local_xstream != p_xstream) {
2057  /* Changing the scheduler of another execution stream.
  * (Source line 2058 is missing from this listing.) */
2059  /* Use the original scheduler's thread. Unit creation might fail, so it
2060  * should be done first. */
2061  ABTI_pool *p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
2062  int abt_errno =
 /* The callee name (source line 2063) is missing; the visible arguments are
  * the old main scheduler's thread and the first pool of p_sched. */
2064  &p_main_sched->p_ythread->thread,
2065  p_tar_pool);
2066  ABTI_CHECK_ERROR(abt_errno);
2067 
2068  /* Set the scheduler as a main scheduler; it takes over the ULT of the
  * previous main scheduler, which gives it up. */
2069  p_sched->used = ABTI_SCHED_MAIN;
2070  p_sched->p_ythread = p_main_sched->p_ythread;
2071  p_main_sched->p_ythread = NULL;
2072  /* p_main_sched is no longer used. */
2073  p_xstream->p_main_sched->used = ABTI_SCHED_NOT_USED;
2074  if (p_xstream->p_main_sched->automatic) {
2075  /* Free that scheduler. */
2076  ABTI_sched_free(p_global, ABTI_xstream_get_local(*pp_local_xstream),
2077  p_xstream->p_main_sched, ABT_FALSE);
2078  }
2079  p_xstream->p_main_sched = p_sched;
2080  return ABT_SUCCESS;
2081  } else {
2082  /* If the ES has a main scheduler, we have to free it */
2083  ABTI_thread *p_thread = (*pp_local_xstream)->p_thread;
 /* (Source line 2084 is missing from this listing.)  p_ythread is the
  * calling work unit viewed as a ULT. */
2085  ABTI_ythread *p_ythread = ABTI_thread_get_ythread(p_thread);
2086  ABTI_pool *p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
2087 
2088  /* If the caller ULT is associated with a pool of the current main
2089  * scheduler, it needs to be associated to a pool of new scheduler. */
2090  size_t p;
2091  for (p = 0; p < p_main_sched->num_pools; p++) {
2092  if (p_ythread->thread.p_pool ==
2093  ABTI_pool_get_ptr(p_main_sched->pools[p])) {
2094  /* Associate the work unit to the first pool of new scheduler
  * (the callee name, source line 2096, is missing from this listing) */
2095  int abt_errno =
2097  &p_ythread->thread,
2098  p_tar_pool);
2099  ABTI_CHECK_ERROR(abt_errno);
2100  break;
2101  }
2102  }
2103 
2104  if (p_main_sched->p_replace_sched) {
2105  /* We need to overwrite the scheduler. Free the existing one.
  * A replacement was already pending: it is discarded and its waiter is
  * taken over so that it can be resumed below. */
2106  ABTI_ythread *p_waiter = p_main_sched->p_replace_waiter;
2107  ABTI_sched_discard_and_free(p_global,
 /* (Source line 2108, the start of the second argument, is missing.) */
2109  *pp_local_xstream),
2110  p_main_sched->p_replace_sched,
2111  ABT_FALSE);
2112  p_main_sched->p_replace_sched = NULL;
2113  p_main_sched->p_replace_waiter = NULL;
2114  /* Resume the waiter. This waiter sees that the scheduler finished
2115  * immediately and was replaced by this new scheduler.
  * (The callee name, source line 2116, is missing from this listing.) */
2117  p_waiter);
2118  }
 /* Mark the calling ULT as blocked before registering it as the waiter. */
2119  ABTI_ythread_set_blocked(p_ythread);
2120  /* Set the replace scheduler */
2121  p_main_sched->p_replace_sched = p_sched;
2122  p_main_sched->p_replace_waiter = p_ythread;
2123  /* Ask the current main scheduler to replace its scheduler
  * (the statement, source line 2124, is missing from this listing) */
2125 
2126  /* Switch to the current main scheduler. The current ULT is pushed to
2127  * the new scheduler's pool so that when the new scheduler starts, this
2128  * ULT can be scheduled by the new scheduler. The existing main
2129  * scheduler will be freed by ABTI_SCHED_REQ_RELEASE.
  * (The trailing arguments of the call below, source line 2131, are missing
  * from this listing.) */
2130  ABTI_ythread_suspend(pp_local_xstream, p_ythread,
2132  return ABT_SUCCESS;
2133  }
2134 }
2135 
2136 static void xstream_update_max_xstreams(ABTI_global *p_global, int newrank)
2137 {
2138  /* The lock must be taken. */
2139  if (newrank >= p_global->max_xstreams) {
2140  static int max_xstreams_warning_once = 0;
2141  if (max_xstreams_warning_once == 0) {
2142  /* Because some Argobots functionalities depend on the runtime value
2143  * ABT_MAX_NUM_XSTREAMS (or p_global->max_xstreams), changing
2144  * this value at run-time can cause an error. For example, using
2145  * ABT_mutex created before updating max_xstreams causes an error
2146  * since ABTI_thread_htable's array size depends on
2147  * ABT_MAX_NUM_XSTREAMS. To fix this issue, please set a larger
2148  * number to ABT_MAX_NUM_XSTREAMS in advance. */
2149  char *warning_message;
2150  int abt_errno =
2151  ABTU_malloc(sizeof(char) * 1024, (void **)&warning_message);
2152  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
2153  snprintf(warning_message, 1024,
2154  "Warning: the number of execution streams exceeds "
2155  "ABT_MAX_NUM_XSTREAMS (=%d). This may cause an error.",
2156  p_global->max_xstreams);
2157  HANDLE_WARNING(warning_message);
2158  ABTU_free(warning_message);
2159  max_xstreams_warning_once = 1;
2160  }
2161  }
2162  /* Anyway. let's increase max_xstreams. */
2163  p_global->max_xstreams = newrank + 1;
2164  }
2165 }
2166 
2167 /* Add p_newxstream to the list. This does not check the rank duplication. */
2168 static void xstream_add_xstream_list(ABTI_global *p_global,
2169  ABTI_xstream *p_newxstream)
2170 {
2171  int rank = p_newxstream->rank;
2172  ABTI_xstream *p_prev_xstream = p_global->p_xstream_head;
2173  ABTI_xstream *p_xstream = p_prev_xstream;
2174  /* Check if a certain rank is available */
2175  while (p_xstream) {
2176  ABTI_ASSERT(p_xstream->rank != rank);
2177  if (p_xstream->rank > rank) {
2178  /* Use this p_xstream. */
2179  break;
2180  }
2181  p_prev_xstream = p_xstream;
2182  p_xstream = p_xstream->p_next;
2183  }
2184 
2185  if (!p_xstream) {
2186  /* p_newxstream is appended to p_prev_xstream */
2187  if (p_prev_xstream) {
2188  p_prev_xstream->p_next = p_newxstream;
2189  p_newxstream->p_prev = p_prev_xstream;
2190  p_newxstream->p_next = NULL;
2191  } else {
2192  ABTI_ASSERT(p_global->p_xstream_head == NULL);
2193  p_newxstream->p_prev = NULL;
2194  p_newxstream->p_next = NULL;
2195  p_global->p_xstream_head = p_newxstream;
2196  }
2197  } else {
2198  /* p_newxstream is inserted in the middle.
2199  * (p_xstream->p_prev) -> p_new_xstream -> p_xstream */
2200  if (p_xstream->p_prev) {
2201  p_xstream->p_prev->p_next = p_newxstream;
2202  p_newxstream->p_prev = p_xstream->p_prev;
2203  } else {
2204  /* This p_xstream is the first element */
2205  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
2206  p_global->p_xstream_head = p_newxstream;
2207  }
2208  p_xstream->p_prev = p_newxstream;
2209  p_newxstream->p_next = p_xstream;
2210  }
2211 }
2212 
2213 /* Remove p_xstream from the list. */
2214 static void xstream_remove_xstream_list(ABTI_global *p_global,
2215  ABTI_xstream *p_xstream)
2216 {
2217  if (!p_xstream->p_prev) {
2218  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
2219  p_global->p_xstream_head = p_xstream->p_next;
2220  } else {
2221  p_xstream->p_prev->p_next = p_xstream->p_next;
2222  }
2223  if (p_xstream->p_next) {
2224  p_xstream->p_next->p_prev = p_xstream->p_prev;
2225  }
2226 }
2227 
2228 /* Set a new rank to ES.
 *
 * If rank is -1, the smallest unused rank is chosen by walking the list from
 * its head and counting up from 0 (this relies on the list being ordered by
 * ascending rank, which is how xstream_add_xstream_list() inserts).
 * Otherwise the requested rank is used, unless an execution stream in the
 * list already has it, in which case ABT_FALSE is returned and nothing is
 * changed by the visible code.
 *
 * On success the rank is stored in p_newxstream, p_newxstream is inserted
 * into the global list, max_xstreams is updated, num_xstreams is incremented
 * and ABT_TRUE is returned.
 *
 * NOTE(review): this listing omits source lines 2232, 2252 and 2266;
 * presumably they acquire/release the lock protecting the list -- TODO
 * confirm against the original stream.c. */
2229 static ABT_bool xstream_set_new_rank(ABTI_global *p_global,
2230  ABTI_xstream *p_newxstream, int rank)
2231 {
2233 
2234  if (rank == -1) {
2235  /* Find an unused rank from 0. */
2236  rank = 0;
2237  ABTI_xstream *p_xstream = p_global->p_xstream_head;
2238  while (p_xstream) {
2239  if (p_xstream->rank == rank) {
 /* This rank is taken; try the next one. */
2240  rank++;
2241  } else {
2242  /* Use this rank. */
2243  break;
2244  }
2245  p_xstream = p_xstream->p_next;
2246  }
2247  } else {
2248  /* Check if a certain rank is available */
2249  ABTI_xstream *p_xstream = p_global->p_xstream_head;
2250  while (p_xstream) {
2251  if (p_xstream->rank == rank) {
 /* The requested rank is already in use.  (Source line 2252 is missing
  * from this listing.) */
2253  return ABT_FALSE;
2254  } else if (p_xstream->rank > rank) {
 /* Ranks only grow from here on, so the requested rank is free. */
2255  break;
2256  }
2257  p_xstream = p_xstream->p_next;
2258  }
2259  }
2260  /* Set the rank */
2261  p_newxstream->rank = rank;
2262  xstream_add_xstream_list(p_global, p_newxstream);
2263  xstream_update_max_xstreams(p_global, rank);
2264  p_global->num_xstreams++;
2265 
2267  return ABT_TRUE;
2268 }
2269 
2270 /* Change the rank of ES.
 *
 * Returns ABT_TRUE immediately if p_xstream already has the requested rank.
 * Returns ABT_FALSE, leaving the list as it is, if another execution stream
 * in the list already uses the rank.  Otherwise p_xstream is unlinked from
 * the global list, given the new rank, re-inserted at the position matching
 * that rank, max_xstreams is updated, and ABT_TRUE is returned.
 * num_xstreams is not modified here.
 *
 * NOTE(review): this listing omits source lines 2279, 2285 and 2299;
 * presumably they acquire/release the lock protecting the list -- TODO
 * confirm against the original stream.c. */
2271 static ABT_bool xstream_change_rank(ABTI_global *p_global,
2272  ABTI_xstream *p_xstream, int rank)
2273 {
2274  if (p_xstream->rank == rank) {
2275  /* No need to change the rank. */
2276  return ABT_TRUE;
2277  }
2278 
2280 
2281  ABTI_xstream *p_next = p_global->p_xstream_head;
2282  /* Check if a certain rank is available. */
2283  while (p_next) {
2284  if (p_next->rank == rank) {
 /* The requested rank is already in use.  (Source line 2285 is missing
  * from this listing.) */
2286  return ABT_FALSE;
2287  } else if (p_next->rank > rank) {
 /* Every remaining element has a larger rank; the rank is free. */
2288  break;
2289  }
2290  p_next = p_next->p_next;
2291  }
2292  /* Let's remove p_xstream from the list first. */
2293  xstream_remove_xstream_list(p_global, p_xstream);
2294  /* Then, let's add this p_xstream. */
2295  p_xstream->rank = rank;
2296  xstream_add_xstream_list(p_global, p_xstream);
2297  xstream_update_max_xstreams(p_global, rank);
2298 
2300  return ABT_TRUE;
2301 }
2302 
/* Give back the rank of p_xstream: unlink it from the global execution
 * stream list and decrement p_global->num_xstreams.  The rank field of
 * p_xstream itself is not modified by the visible code.
 *
 * NOTE(review): this listing omits source lines 2306 and 2311; presumably
 * they acquire/release the lock protecting the list -- TODO confirm against
 * the original stream.c. */
2303 static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream)
2304 {
2305  /* Remove this xstream from the global ES list */
2307 
2308  xstream_remove_xstream_list(p_global, p_xstream);
2309  p_global->num_xstreams--;
2310 
2312 }
ABT_xstream_set_affinity
int ABT_xstream_set_affinity(ABT_xstream xstream, int num_cpuids, int *cpuids)
Bind an execution stream to target CPUs.
Definition: stream.c:1419
ABT_THREAD_STATE_TERMINATED
@ ABT_THREAD_STATE_TERMINATED
Definition: abt.h:423
HANDLE_WARNING
#define HANDLE_WARNING(msg)
Definition: abti_error.h:30
ABTU_min_int
static int ABTU_min_int(int a, int b)
Definition: abtu.h:45
ABTI_CHECK_NULL_SCHED_PTR
#define ABTI_CHECK_NULL_SCHED_PTR(p)
Definition: abti_error.h:177
ABT_ERR_INV_THREAD
#define ABT_ERR_INV_THREAD
Error code: invalid work unit.
Definition: abt.h:176
ABT_sched_predef
ABT_sched_predef
Predefined scheduler type.
Definition: abt.h:465
ABTI_sched_get_ptr
static ABTI_sched * ABTI_sched_get_ptr(ABT_sched sched)
Definition: abti_sched.h:11
xstream_join
static ABTU_ret_err int xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
Definition: stream.c:1809
ABTI_xstream::ctx
ABTD_xstream_context ctx
Definition: abti.h:274
ABTI_thread_mig_data::f_migration_cb
void(* f_migration_cb)(ABT_thread, void *)
Definition: abti.h:400
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1001
ABTD_XSTREAM_CONTEXT_STATE_WAITING
@ ABTD_XSTREAM_CONTEXT_STATE_WAITING
Definition: abtd.h:19
ABTI_XSTREAM_TYPE_PRIMARY
@ ABTI_XSTREAM_TYPE_PRIMARY
Definition: abti.h:71
ABT_ERR_CPUID
#define ABT_ERR_CPUID
Error code: error related to CPU ID.
Definition: abt.h:398
ABTI_THREAD_REQ_TERMINATE
#define ABTI_THREAD_REQ_TERMINATE
Definition: abti.h:43
xstream_update_max_xstreams
static void xstream_update_max_xstreams(ABTI_global *p_global, int newrank)
Definition: stream.c:2141
ABTI_sched_create_basic
ABTU_ret_err int ABTI_sched_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABTI_sched_config *p_config, ABTI_sched **pp_newsched)
Definition: sched.c:655
ABTI_SCHED_NOT_USED
@ ABTI_SCHED_NOT_USED
Definition: abti.h:76
ABTI_SETUP_LOCAL_XSTREAM
#define ABTI_SETUP_LOCAL_XSTREAM(pp_local_xstream)
Definition: abti_error.h:73
ABTD_atomic_acquire_load_uint32
static uint32_t ABTD_atomic_acquire_load_uint32(const ABTD_atomic_uint32 *ptr)
Definition: abtd_atomic.h:928
ABTD_affinity_cpuset_apply
ABTU_ret_err int ABTD_affinity_cpuset_apply(ABTD_xstream_context *p_ctx, const ABTD_affinity_cpuset *p_cpuset)
Definition: abtd_affinity.c:436
ABT_thread
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
Definition: abt.h:890
ABTI_thread_mig_data::p_migration_pool
ABTD_atomic_ptr p_migration_pool
Definition: abti.h:403
ABTI_xstream_create_primary
ABTU_ret_err int ABTI_xstream_create_primary(ABTI_global *p_global, ABTI_xstream **pp_xstream)
Definition: stream.c:1500
ABTI_SETUP_GLOBAL
#define ABTI_SETUP_GLOBAL(pp_global)
Definition: abti_error.h:59
ABTI_global::xstream_list_lock
ABTD_spinlock xstream_list_lock
Definition: abti.h:201
ABTI_unit_set_associated_pool
static ABTU_ret_err int ABTI_unit_set_associated_pool(ABTI_global *p_global, ABT_unit unit, ABTI_pool *p_pool, ABTI_thread **pp_thread)
Definition: abti_unit.h:55
ABT_xstream_self
int ABT_xstream_self(ABT_xstream *xstream)
Get an execution stream that is running the calling work unit.
Definition: stream.c:626
ABTD_xstream_context_print
void ABTD_xstream_context_print(ABTD_xstream_context *p_ctx, FILE *p_os, int indent)
Definition: abtd_stream.c:143
ABT_ERR_INV_XSTREAM
#define ABT_ERR_INV_XSTREAM
Error code: invalid execution stream.
Definition: abt.h:114
ABTI_thread::type
ABTI_thread_type type
Definition: abti.h:375
ABTI_thread_revive
ABTU_ret_err int ABTI_thread_revive(ABTI_global *p_global, ABTI_local *p_local, ABTI_pool *p_pool, void(*thread_func)(void *), void *arg, ABTI_thread *p_thread)
Definition: thread.c:2257
xstream_change_rank
static ABT_bool xstream_change_rank(ABTI_global *p_global, ABTI_xstream *p_xstream, int rank)
Definition: stream.c:2276
ABTI_sched_exit
void ABTI_sched_exit(ABTI_sched *p_sched)
Definition: sched.c:650
ABTI_global_get_global
static ABTI_global * ABTI_global_get_global(void)
Definition: abti_global.h:9
ABT_xstream_set_rank
int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
Set a rank for an execution stream.
Definition: stream.c:716
ABT_xstream_get_num
int ABT_xstream_get_num(int *num_xstreams)
Get the number of current existing execution streams.
Definition: stream.c:1152
ABT_xstream_get_affinity
int ABT_xstream_get_affinity(ABT_xstream xstream, int max_cpuids, int *cpuids, int *num_cpuids)
Get CPU IDs of CPUs to which an execution stream is bound.
Definition: stream.c:1483
ABTI_CHECK_ERROR
#define ABTI_CHECK_ERROR(abt_errno)
Definition: abti_error.h:120
ABTI_sched::num_pools
size_t num_pools
Definition: abti.h:299
ABTI_sched::automatic
ABT_bool automatic
Definition: abti.h:291
ABTI_xstream::rank
int rank
Definition: abti.h:269
ABTI_thread::p_arg
void * p_arg
Definition: abti.h:380
ABTI_THREAD_REQ_ORPHAN
#define ABTI_THREAD_REQ_ORPHAN
Definition: abti.h:47
ABTI_sched_get_handle
static ABT_sched ABTI_sched_get_handle(ABTI_sched *p_sched)
Definition: abti_sched.h:26
ABTI_thread::unit
ABT_unit unit
Definition: abti.h:376
ABT_sched_config
struct ABT_sched_config_opaque * ABT_sched_config
Scheduler configuration handle type.
Definition: abt.h:815
ABTI_xstream::type
ABTI_xstream_type type
Definition: abti.h:270
ABT_SCHED_DEFAULT
@ ABT_SCHED_DEFAULT
Definition: abt.h:467
ABTI_THREAD_TYPE_YIELDABLE
#define ABTI_THREAD_TYPE_YIELDABLE
Definition: abti.h:86
ABTI_thread::request
ABTD_atomic_uint32 request
Definition: abti.h:382
ABTI_INDENT
#define ABTI_INDENT
Definition: abti.h:56
ABTI_thread_get_ythread
static ABTI_ythread * ABTI_thread_get_ythread(ABTI_thread *p_thread)
Definition: abti_thread.h:52
xstream_schedule_ythread
static void xstream_schedule_ythread(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread)
Definition: stream.c:1833
ABTD_xstream_context_free
void ABTD_xstream_context_free(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:90
ABTI_CHECK_TRUE_MSG
#define ABTI_CHECK_TRUE_MSG(cond, abt_errno, msg)
Definition: abti_error.h:151
ABTI_thread_get_id
ABT_unit_id ABTI_thread_get_id(ABTI_thread *p_thread)
Definition: thread.c:2561
xstream_return_rank
static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream)
Definition: stream.c:2308
ABTI_thread_mig_data
Definition: abti.h:399
ABTI_THREAD_REQ_NON_YIELD
#define ABTI_THREAD_REQ_NON_YIELD
Definition: abti.h:48
ABTI_pool_push
static void ABTI_pool_push(ABTI_pool *p_pool, ABT_unit unit)
Definition: abti_pool.h:53
ABTI_thread_free
void ABTI_thread_free(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
Definition: thread.c:2400
ABTI_xstream_run_thread
void ABTI_xstream_run_thread(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_thread *p_thread)
Definition: stream.c:1554
ABTI_thread_set_associated_pool
static ABTU_ret_err int ABTI_thread_set_associated_pool(ABTI_global *p_global, ABTI_thread *p_thread, ABTI_pool *p_pool)
Definition: abti_unit.h:147
xstream_remove_xstream_list
static void xstream_remove_xstream_list(ABTI_global *p_global, ABTI_xstream *p_xstream)
Definition: stream.c:2219
ABT_xstream_cancel
int ABT_xstream_cancel(ABT_xstream xstream)
Send a cancellation request to an execution stream.
Definition: stream.c:581
ABTI_thread
Definition: abti.h:371
ABTI_IS_ERROR_CHECK_ENABLED
#define ABTI_IS_ERROR_CHECK_ENABLED
Definition: abti.h:20
ABT_xstream_create
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
Create a new execution stream.
Definition: stream.c:87
ABTI_xstream_check_events
void ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:1568
xstream_migrate_thread
static ABTU_ret_err int xstream_migrate_thread(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
Definition: stream.c:1998
ABTI_xstream
Definition: abti.h:264
ABT_xstream_get_main_sched
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched)
Retrieve the main scheduler of an execution stream.
Definition: stream.c:1006
ABTI_sched_config
Definition: abti.h:323
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:841
ABTI_sched::p_ythread
ABTI_ythread * p_ythread
Definition: abti.h:300
ABT_xstream_get_rank
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
Retrieve a rank of an execution stream.
Definition: stream.c:761
ABT_xstream_self_rank
int ABT_xstream_self_rank(int *rank)
Return a rank of an execution stream associated with a caller.
Definition: stream.c:669
ABTI_xstream::p_next
ABTI_xstream * p_next
Definition: abti.h:267
ABT_POOL_ACCESS_MPSC
@ ABT_POOL_ACCESS_MPSC
Definition: abt.h:535
xstream_update_main_sched
static ABTU_ret_err int xstream_update_main_sched(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:2049
ABTD_xstream_context_revive
void ABTD_xstream_context_revive(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:128
ABT_xstream_is_primary
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *is_primary)
Check if the target execution stream is primary.
Definition: stream.c:1185
ABTI_sched::used
ABTI_sched_used used
Definition: abti.h:290
ABT_sched
struct ABT_sched_opaque * ABT_sched
Scheduler handle type.
Definition: abt.h:808
ABT_ERR_INV_SCHED
#define ABT_ERR_INV_SCHED
Error code: invalid scheduler.
Definition: abt.h:129
ABTI_pool
Definition: abti.h:327
ABTI_THREAD_TYPE_PRIMARY
#define ABTI_THREAD_TYPE_PRIMARY
Definition: abti.h:84
ABTI_xstream_get_handle
static ABT_xstream ABTI_xstream_get_handle(ABTI_xstream *p_xstream)
Definition: abti_stream.h:26
ABT_POOL_NULL
#define ABT_POOL_NULL
Definition: abt.h:1059
xstream_init_main_sched
static void xstream_init_main_sched(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:2039
abti.h
ABTI_pool_free
void ABTI_pool_free(ABTI_pool *p_pool)
Definition: pool.c:891
ABTI_THREAD_REQ_MIGRATE
#define ABTI_THREAD_REQ_MIGRATE
Definition: abti.h:45
ABTI_info_check_print_all_thread_stacks
void ABTI_info_check_print_all_thread_stacks(void)
Definition: info.c:939
ABTD_xstream_context_create
ABTU_ret_err int ABTD_xstream_context_create(void *(*f_xstream)(void *), void *p_arg, ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:48
ABTI_SCHED_REQ_REPLACE
#define ABTI_SCHED_REQ_REPLACE
Definition: abti.h:40
xstream_create
static ABTU_ret_err int xstream_create(ABTI_global *p_global, ABTI_sched *p_sched, ABTI_xstream_type xstream_type, int rank, ABT_bool start, ABTI_xstream **pp_xstream)
Definition: stream.c:1701
ABTD_atomic_relaxed_store_uint32
static void ABTD_atomic_relaxed_store_uint32(ABTD_atomic_uint32 *ptr, uint32_t val)
Definition: abtd_atomic.h:1025
ABTI_ythread_exit
ABTU_noreturn void ABTI_ythread_exit(ABTI_xstream *p_local_xstream, ABTI_ythread *p_ythread)
Definition: thread.c:2425
ABTI_sched_discard_and_free
static void ABTI_sched_discard_and_free(ABTI_global *p_global, ABTI_local *p_local, ABTI_sched *p_sched, ABT_bool force_free)
Definition: abti_sched.h:43
ABTD_atomic_relaxed_load_int
static int ABTD_atomic_relaxed_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:763
ABTI_ythread_free_root
void ABTI_ythread_free_root(ABTI_global *p_global, ABTI_local *p_local, ABTI_ythread *p_ythread)
Definition: thread.c:2419
ABT_SYNC_EVENT_TYPE_OTHER
@ ABT_SYNC_EVENT_TYPE_OTHER
Definition: abt.h:666
ABT_xstream
struct ABT_xstream_opaque * ABT_xstream
Execution stream handle type.
Definition: abt.h:789
ABTI_thread_get_ythread_or_null
static ABTI_ythread * ABTI_thread_get_ythread_or_null(ABTI_thread *p_thread)
Definition: abti_thread.h:59
ABTI_pool_add_thread
static void ABTI_pool_add_thread(ABTI_thread *p_thread)
Definition: abti_pool.h:59
ABTI_thread::state
ABTD_atomic_int state
Definition: abti.h:381
ABTD_spinlock_acquire
static void ABTD_spinlock_acquire(ABTD_spinlock *p_lock)
Definition: abtd_spinlock.h:28
ABTI_HANDLE_ERROR
#define ABTI_HANDLE_ERROR(n)
Definition: abti_error.h:114
ABT_xstream_create_basic
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream)
Create a new execution stream with a predefined scheduler.
Definition: stream.c:179
ABTD_xstream_context_set_self
void ABTD_xstream_context_set_self(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:138
ABT_xstream_create_with_rank
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream)
Create a new execution stream with a specific rank.
Definition: stream.c:272
xstream_set_new_rank
static ABT_bool xstream_set_new_rank(ABTI_global *p_global, ABTI_xstream *p_newxstream, int rank)
Definition: stream.c:2234
ABTU_malloc
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:262
LOG_DEBUG
#define LOG_DEBUG(fmt,...)
Definition: abti_log.h:26
ABTI_sched_finish
void ABTI_sched_finish(ABTI_sched *p_sched)
Definition: sched.c:645
ABTD_atomic_relaxed_store_int
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:996
ABT_xstream_check_events
int ABT_xstream_check_events(ABT_sched sched)
Process events associated with a scheduler.
Definition: stream.c:1275
ABT_ERR_INV_UNIT
#define ABT_ERR_INV_UNIT
Error code: invalid work unit for scheduling.
Definition: abt.h:171
ABT_ERR_XSTREAM_STATE
#define ABT_ERR_XSTREAM_STATE
Error code: error related to an execution stream state.
Definition: abt.h:262
ABT_POOL_FIFO
@ ABT_POOL_FIFO
Definition: abt.h:506
ABTI_pool_create_basic
ABTU_ret_err int ABTI_pool_create_basic(ABT_pool_kind kind, ABT_pool_access access, ABT_bool automatic, ABTI_pool **pp_newpool)
Definition: pool.c:858
ABTI_xstream::p_main_sched
ABTI_sched * p_main_sched
Definition: abti.h:272
ABTI_thread_mig_data::p_migration_cb_arg
void * p_migration_cb_arg
Definition: abti.h:401
ABT_xstream_set_main_sched_basic
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools)
Set the main scheduler of an execution stream to a predefined scheduler.
Definition: stream.c:955
ABT_xstream_get_main_pools
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools)
Get pools associated with the main scheduler of an execution stream.
Definition: stream.c:1043
ABTI_XSTREAM_TYPE_SECONDARY
@ ABTI_XSTREAM_TYPE_SECONDARY
Definition: abti.h:72
ABT_xstream_set_cpubind
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
Bind an execution stream to a target CPU.
Definition: stream.c:1320
ABTI_thread_join
void ABTI_thread_join(ABTI_local **pp_local, ABTI_thread *p_thread)
Definition: thread.c:2395
xstream_schedule_task
static void xstream_schedule_task(ABTI_global *p_global, ABTI_xstream *p_local_xstream, ABTI_thread *p_task)
Definition: stream.c:1949
ABT_unit
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
Definition: abt.h:869
ABTI_ythread::ctx
ABTD_ythread_context ctx
Definition: abti.h:408
ABTI_ASSERT
#define ABTI_ASSERT(cond)
Definition: abti_error.h:12
ABTI_sched_set_request
static void ABTI_sched_set_request(ABTI_sched *p_sched, uint32_t req)
Definition: abti_sched.h:60
ABTI_xstream_terminate_thread
static void ABTI_xstream_terminate_thread(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
Definition: abti_stream.h:48
ABTI_sched::pools
ABT_pool * pools
Definition: abti.h:298
ABTI_sched::p_replace_sched
ABTI_sched * p_replace_sched
Definition: abti.h:294
ABT_xstream_exit
int ABT_xstream_exit(void)
Terminate an execution stream that is running the calling ULT.
Definition: stream.c:531
ABTI_tool_event_thread_run
#define ABTI_tool_event_thread_run(p_local_xstream, p_thread, p_prev, p_parent)
Definition: abti_tool.h:288
ABTI_local_get_local
static ABTI_local * ABTI_local_get_local(void)
Definition: abti_local.h:41
ABTI_xstream::state
ABTD_atomic_int state
Definition: abti.h:271
ABTI_global::num_xstreams
int num_xstreams
Definition: abti.h:198
ABT_xstream_join
int ABT_xstream_join(ABT_xstream xstream)
Wait for an execution stream to terminate.
Definition: stream.c:486
ABTI_local_set_xstream
static void ABTI_local_set_xstream(ABTI_xstream *p_local_xstream)
Definition: abti_local.h:60
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
ABTI_tool_event_thread_join
#define ABTI_tool_event_thread_join(p_local, p_thread, p_caller)
Definition: abti_tool.h:266
ABTI_xstream::p_root_pool
ABTI_pool * p_root_pool
Definition: abti.h:278
ABTI_local_get_local_uninlined
static ABTI_local * ABTI_local_get_local_uninlined(void)
Definition: abti_local.h:51
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:146
ABTI_ythread_get_handle
static ABT_thread ABTI_ythread_get_handle(ABTI_ythread *p_thread)
ABTI_xstream_free
void ABTI_xstream_free(ABTI_global *p_global, ABTI_local *p_local, ABTI_xstream *p_xstream, ABT_bool force_free)
Definition: stream.c:1584
ABTI_SETUP_LOCAL_YTHREAD
#define ABTI_SETUP_LOCAL_YTHREAD(pp_local_xstream, pp_ythread)
Definition: abti_error.h:88
ABTI_global::p_xstream_head
ABTI_xstream * p_xstream_head
Definition: abti.h:199
ABTD_atomic_acquire_load_int
static int ABTD_atomic_acquire_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:878
ABT_xstream_get_cpubind
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
Get CPU ID of a CPU to which an execution stream is bound.
Definition: stream.c:1370
ABTI_ythread_set_blocked
void ABTI_ythread_set_blocked(ABTI_ythread *p_ythread)
Definition: ythread.c:23
ABTI_THREAD_REQ_CANCEL
#define ABTI_THREAD_REQ_CANCEL
Definition: abti.h:44
ABTD_xstream_context::state
ABTD_xstream_context_state state
Definition: abtd.h:28
ABTI_local_get_xstream_or_null
static ABTI_xstream * ABTI_local_get_xstream_or_null(ABTI_local *p_local)
Definition: abti_local.h:77
ABTI_SCHED_MAIN
@ ABTI_SCHED_MAIN
Definition: abti.h:77
ABTI_mem_init_local
ABTU_ret_err int ABTI_mem_init_local(ABTI_global *p_global, ABTI_xstream *p_local_xstream)
Definition: malloc.c:186
ABTI_xstream_get_local
static ABTI_local * ABTI_xstream_get_local(ABTI_xstream *p_xstream)
Definition: abti_stream.h:68
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:748
ABTI_pool_get_ptr
static ABTI_pool * ABTI_pool_get_ptr(ABT_pool pool)
Definition: abti_pool.h:11
ABTI_sched::request
ABTD_atomic_uint32 request
Definition: abti.h:297
ABT_XSTREAM_STATE_RUNNING
@ ABT_XSTREAM_STATE_RUNNING
Definition: abt.h:406
ABTI_sched
Definition: abti.h:289
ABTI_ythread_create_root
ABTU_ret_err int ABTI_ythread_create_root(ABTI_global *p_global, ABTI_local *p_local, ABTI_xstream *p_xstream, ABTI_ythread **pp_root_ythread)
Definition: thread.c:2328
ABTI_xstream::p_root_ythread
ABTI_ythread * p_root_ythread
Definition: abti.h:277
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:750
ABTI_ythread
Definition: abti.h:406
ABT_xstream_free
int ABT_xstream_free(ABT_xstream *xstream)
Free an execution stream.
Definition: stream.c:422
ABTI_ythread_set_ready
void ABTI_ythread_set_ready(ABTI_local *p_local, ABTI_ythread *p_ythread)
Definition: ythread.c:65
ABTD_affinity_cpuset_apply_default
int ABTD_affinity_cpuset_apply_default(ABTD_xstream_context *p_ctx, int rank)
Definition: abtd_affinity.c:442
ABTI_ythread_suspend
void ABTI_ythread_suspend(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread, ABT_sync_event_type sync_event_type, void *p_sync)
Definition: ythread.c:45
ABTD_ythread_context_switch
static void ABTD_ythread_context_switch(ABTD_ythread_context *p_old, ABTD_ythread_context *p_new)
Definition: abtd_ythread.h:73
ABT_ERR_INV_ARG
#define ABT_ERR_INV_ARG
Error code: invalid user argument.
Definition: abt.h:250
ABTI_xstream_start_primary
void ABTI_xstream_start_primary(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_ythread *p_ythread)
Definition: stream.c:1525
ABTI_CHECK_NULL_POOL_PTR
#define ABTI_CHECK_NULL_POOL_PTR(p)
Definition: abti_error.h:168
ABTD_affinity_cpuset::num_cpuids
size_t num_cpuids
Definition: abtd.h:39
ABTI_CHECK_NULL_XSTREAM_PTR
#define ABTI_CHECK_NULL_XSTREAM_PTR(p)
Definition: abti_error.h:159
ABT_xstream_get_state
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state)
Get a state of an execution stream.
Definition: stream.c:1079
ABTU_free
static void ABTU_free(void *ptr)
Definition: abtu.h:217
ABTI_thread::p_parent
ABTI_thread * p_parent
Definition: abti.h:378
ABTD_atomic_fetch_or_uint32
static uint32_t ABTD_atomic_fetch_or_uint32(ABTD_atomic_uint32 *ptr, uint32_t v)
Definition: abtd_atomic.h:631
ABTD_spinlock_release
static void ABTD_spinlock_release(ABTD_spinlock *p_lock)
Definition: abtd_spinlock.h:42
ABTI_ythread_context_switch_to_child
static ABTI_ythread * ABTI_ythread_context_switch_to_child(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_old, ABTI_ythread *p_new)
Definition: abti_ythread.h:345
ABTD_affinity_cpuset_read
ABTU_ret_err int ABTD_affinity_cpuset_read(ABTD_xstream_context *p_ctx, int max_cpuids, int *cpuids, int *p_num_cpuids)
Definition: abtd_affinity.c:428
ABTD_xstream_context_join
void ABTD_xstream_context_join(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:112
ABTI_xstream_get_ptr
static ABTI_xstream * ABTI_xstream_get_ptr(ABT_xstream xstream)
Definition: abti_stream.h:11
ABT_xstream_set_main_sched
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched)
Set the main scheduler of an execution stream.
Definition: stream.c:850
ABTI_xstream_print
void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent, ABT_bool print_sub)
Definition: stream.c:1620
ABTI_ythread_create_main_sched
ABTU_ret_err int ABTI_ythread_create_main_sched(ABTI_global *p_global, ABTI_local *p_local, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: thread.c:2355
ABTI_thread::p_last_xstream
ABTI_xstream * p_last_xstream
Definition: abti.h:377
ABTI_sched_free
void ABTI_sched_free(ABTI_global *p_global, ABTI_local *p_local, ABTI_sched *p_sched, ABT_bool force_free)
Definition: sched.c:834
xstream_add_xstream_list
static void xstream_add_xstream_list(ABTI_global *p_global, ABTI_xstream *p_newxstream)
Definition: stream.c:2173
ABTI_thread_get_mig_data
ABTU_ret_err int ABTI_thread_get_mig_data(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread, ABTI_thread_mig_data **pp_mig_data)
Definition: thread.c:2436
ABTI_ythread::thread
ABTI_thread thread
Definition: abti.h:407
ABTI_local
struct ABTI_local ABTI_local
Definition: abti.h:110
ABT_XSTREAM_NULL
#define ABT_XSTREAM_NULL
Definition: abt.h:1055
ABTI_thread_unset_associated_pool
static void ABTI_thread_unset_associated_pool(ABTI_global *p_global, ABTI_thread *p_thread)
Definition: abti_unit.h:200
ABTI_tool_event_thread_cancel
#define ABTI_tool_event_thread_cancel(p_local_xstream, p_thread)
Definition: abti_tool.h:305
ABTI_global::set_affinity
ABT_bool set_affinity
Definition: abti.h:205
ABTI_THREAD_REQ_BLOCK
#define ABTI_THREAD_REQ_BLOCK
Definition: abti.h:46
ABTD_ythread_cancel
void ABTD_ythread_cancel(ABTI_xstream *p_local_xstream, ABTI_ythread *p_ythread)
Definition: abtd_ythread.c:120
ABTI_CHECK_TRUE
#define ABTI_CHECK_TRUE(cond, abt_errno)
Definition: abti_error.h:130
ABTI_thread_unset_request
static void ABTI_thread_unset_request(ABTI_thread *p_thread, uint32_t req)
Definition: abti_thread.h:73
ABTI_sched_config_get_ptr
static ABTI_sched_config * ABTI_sched_config_get_ptr(ABT_sched_config config)
Definition: abti_config.h:12
ABTI_global
Definition: abti.h:196
ABTI_thread::f_thread
void(* f_thread)(void *)
Definition: abti.h:379
xstream_launch_root_ythread
static void * xstream_launch_root_ythread(void *p_xstream)
Definition: stream.c:1676
ABT_ERR_INV_XSTREAM_RANK
#define ABT_ERR_INV_XSTREAM_RANK
Error code: invalid execution stream rank.
Definition: abt.h:119
ABT_THREAD_STATE_RUNNING
@ ABT_THREAD_STATE_RUNNING
Definition: abt.h:419
ABT_xstream_revive
int ABT_xstream_revive(ABT_xstream xstream)
Revive a terminated execution stream.
Definition: stream.c:344
ABT_xstream_run_unit
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
Execute a work unit.
Definition: stream.c:1224
ABTI_thread::p_pool
ABTI_pool * p_pool
Definition: abti.h:383
ABT_UNIT_NULL
#define ABT_UNIT_NULL
Definition: abt.h:1061
ABTI_pool_release
static int32_t ABTI_pool_release(ABTI_pool *p_pool)
Definition: abti_pool.h:115
ABTI_xstream::p_prev
ABTI_xstream * p_prev
Definition: abti.h:266
ABTD_affinity_cpuset
Definition: abtd.h:38
ABTI_xstream_type
ABTI_xstream_type
Definition: abti.h:70
ABTI_local_get_xstream
static ABTI_xstream * ABTI_local_get_xstream(ABTI_local *p_local)
Definition: abti_local.h:86
ABT_xstream_equal
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result)
Compare two execution stream handles for equality.
Definition: stream.c:1115
ABTI_mem_finalize_local
void ABTI_mem_finalize_local(ABTI_xstream *p_local_xstream)
Definition: malloc.c:196
ABTU_unreachable
static ABTU_noreturn void ABTU_unreachable(void)
Definition: abtu.h:126
ABTI_sched::p_replace_waiter
ABTI_ythread * p_replace_waiter
Definition: abti.h:296
ABTD_atomic_release_store_int
static void ABTD_atomic_release_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:1065
ABTI_global::max_xstreams
int max_xstreams
Definition: abti.h:197
ABT_XSTREAM_STATE_TERMINATED
@ ABT_XSTREAM_STATE_TERMINATED
Definition: abt.h:408
ABTI_tool_event_thread_finish
#define ABTI_tool_event_thread_finish(p_local_xstream, p_thread, p_parent)
Definition: abti_tool.h:297
ABTD_atomic_relaxed_load_ptr
static void * ABTD_atomic_relaxed_load_ptr(const ABTD_atomic_ptr *ptr)
Definition: abtd_atomic.h:846
ABTD_affinity_cpuset::cpuids
int * cpuids
Definition: abtd.h:40
ABT_xstream_state
ABT_xstream_state
State of an execution stream.
Definition: abt.h:404
ABTI_sched_print
void ABTI_sched_print(ABTI_sched *p_sched, FILE *p_os, int indent, ABT_bool print_sub)
Definition: sched.c:959
ABTI_THREAD_REQ_JOIN
#define ABTI_THREAD_REQ_JOIN
Definition: abti.h:42