ARGOBOTS  2202510f2bd4ba732a2ba2215171c0820320f58d
stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 ABTU_ret_err static int xstream_create(ABTI_global *p_global,
9  ABTI_sched *p_sched,
10  ABTI_xstream_type xstream_type, int rank,
11  ABT_bool start,
12  ABTI_xstream **pp_xstream);
13 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
14  ABTI_xstream *p_xstream);
15 static ABT_bool xstream_set_new_rank(ABTI_global *p_global,
16  ABTI_xstream *p_newxstream, int rank);
17 static ABT_bool xstream_change_rank(ABTI_global *p_global,
18  ABTI_xstream *p_xstream, int rank);
19 static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream);
20 static inline void xstream_schedule_ythread(ABTI_global *p_global,
21  ABTI_xstream **pp_local_xstream,
22  ABTI_ythread *p_ythread);
23 static inline void xstream_schedule_task(ABTI_global *p_global,
24  ABTI_xstream *p_local_xstream,
25  ABTI_thread *p_task);
26 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
27  ABTI_sched *p_sched);
28 ABTU_ret_err static int
29 xstream_update_main_sched(ABTI_global *p_global,
30  ABTI_xstream **pp_local_xstream,
31  ABTI_xstream *p_xstream, ABTI_sched *p_sched);
32 static void *xstream_launch_root_ythread(void *p_xstream);
33 #ifndef ABT_CONFIG_DISABLE_MIGRATION
34 ABTU_ret_err static int xstream_migrate_thread(ABTI_global *p_global,
35  ABTI_local *p_local,
36  ABTI_thread *p_thread);
37 #endif
38 
87 int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
88 {
89  ABTI_UB_ASSERT(ABTI_initialized());
90  ABTI_UB_ASSERT(newxstream);
91 
92 #ifndef ABT_CONFIG_ENABLE_VER_20_API
93  /* Argobots 1.x sets newxstream to NULL on error. */
94  *newxstream = ABT_XSTREAM_NULL;
95 #endif
96  int abt_errno;
97  ABTI_xstream *p_newxstream;
98 
99  ABTI_global *p_global;
100  ABTI_SETUP_GLOBAL(&p_global);
101 
102  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
103  if (!p_sched) {
104  abt_errno =
105  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
106  ABTI_CHECK_ERROR(abt_errno);
107  } else {
108 #ifndef ABT_CONFIG_ENABLE_VER_20_API
109  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
111 #else
112  ABTI_UB_ASSERT(p_sched->used == ABTI_SCHED_NOT_USED);
113 #endif
114  }
115 
116  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
117  -1, ABT_TRUE, &p_newxstream);
118  if (abt_errno != ABT_SUCCESS) {
119  if (!ABTI_sched_get_ptr(sched)) {
120  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
121  ABT_FALSE);
122  }
123  ABTI_HANDLE_ERROR(abt_errno);
124  }
125 
126  /* Return value */
127  *newxstream = ABTI_xstream_get_handle(p_newxstream);
128  return ABT_SUCCESS;
129 }
130 
184 int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools,
185  ABT_pool *pools, ABT_sched_config config,
186  ABT_xstream *newxstream)
187 {
188  ABTI_UB_ASSERT(ABTI_initialized());
189  ABTI_UB_ASSERT(pools || num_pools <= 0);
190  ABTI_UB_ASSERT(newxstream);
191 
192 #ifndef ABT_CONFIG_ENABLE_VER_20_API
193  /* Argobots 1.x sets newxstream to NULL on error. */
194  *newxstream = ABT_XSTREAM_NULL;
195 #endif
196  ABTI_CHECK_TRUE(num_pools >= 0, ABT_ERR_INV_ARG);
197 
198  int abt_errno;
199  ABTI_xstream *p_newxstream;
200  ABTI_sched_config *p_config = ABTI_sched_config_get_ptr(config);
201 
202  ABTI_global *p_global;
203  ABTI_SETUP_GLOBAL(&p_global);
204 
205  ABTI_sched *p_sched;
206  abt_errno =
207  ABTI_sched_create_basic(predef, num_pools, pools, p_config, &p_sched);
208  ABTI_CHECK_ERROR(abt_errno);
209 
210  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
211  -1, ABT_TRUE, &p_newxstream);
212  if (abt_errno != ABT_SUCCESS) {
213  int i;
214  for (i = 0; i < num_pools; i++) {
215  if (pools[i] != ABT_POOL_NULL) {
216  /* Avoid freeing user-given pools. */
217  ABTI_pool_release(ABTI_pool_get_ptr(p_sched->pools[i]));
218  p_sched->pools[i] = ABT_POOL_NULL;
219  }
220  }
221  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
222  ABT_FALSE);
223  ABTI_HANDLE_ERROR(abt_errno);
224  }
225 
226  *newxstream = ABTI_xstream_get_handle(p_newxstream);
227  return ABT_SUCCESS;
228 }
229 
282  ABT_xstream *newxstream)
283 {
284  ABTI_UB_ASSERT(ABTI_initialized());
285  ABTI_UB_ASSERT(newxstream);
286 
287 #ifndef ABT_CONFIG_ENABLE_VER_20_API
288  /* Argobots 1.x sets newxstream to NULL on error. */
289  *newxstream = ABT_XSTREAM_NULL;
290 #endif
291  int abt_errno;
292  ABTI_xstream *p_newxstream;
293 
294  ABTI_global *p_global;
295  ABTI_SETUP_GLOBAL(&p_global);
296 
297  ABTI_CHECK_TRUE(rank >= 0, ABT_ERR_INV_XSTREAM_RANK);
298 
299  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
300  if (!p_sched) {
301  abt_errno =
302  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
303  ABTI_CHECK_ERROR(abt_errno);
304  } else {
305 #ifndef ABT_CONFIG_ENABLE_VER_20_API
306  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
308 #else
309  ABTI_UB_ASSERT(p_sched->used == ABTI_SCHED_NOT_USED);
310 #endif
311  }
312 
313  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
314  rank, ABT_TRUE, &p_newxstream);
315  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
316  if (!ABTI_sched_get_ptr(sched)) {
317  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
318  ABT_FALSE);
319  }
320  ABTI_HANDLE_ERROR(abt_errno);
321  }
322 
323  /* Return value */
324  *newxstream = ABTI_xstream_get_handle(p_newxstream);
325  return ABT_SUCCESS;
326 }
327 
359 {
360  ABTI_UB_ASSERT(ABTI_initialized());
361 
362  ABTI_global *p_global = ABTI_global_get_global();
363  ABTI_local *p_local = ABTI_local_get_local();
364  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
365  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
366 
367  /* Revives the main scheduler thread. */
368  ABTI_sched *p_main_sched = p_xstream->p_main_sched;
369  ABTI_ythread *p_main_sched_ythread = p_main_sched->p_ythread;
370  /* TODO: should we check the thread state instead of the xstream state? */
371  ABTI_CHECK_TRUE(ABTD_atomic_relaxed_load_int(
372  &p_main_sched_ythread->thread.state) ==
375 
376  ABTD_atomic_relaxed_store_uint32(&p_main_sched->request, 0);
377  ABTI_tool_event_thread_join(p_local, &p_main_sched_ythread->thread,
378  ABTI_local_get_xstream_or_null(p_local)
379  ? ABTI_local_get_xstream(p_local)->p_thread
380  : NULL);
381 
382  int abt_errno =
383  ABTI_thread_revive(p_global, p_local, p_xstream->p_root_pool,
384  p_main_sched_ythread->thread.f_thread,
385  p_main_sched_ythread->thread.p_arg,
386  &p_main_sched_ythread->thread);
387  /* ABTI_thread_revive() never fails since it does not update an associated
388  * pool.*/
389  assert(abt_errno == ABT_SUCCESS);
390 
391  ABTD_atomic_relaxed_store_int(&p_xstream->state, ABT_XSTREAM_STATE_RUNNING);
392  ABTD_xstream_context_revive(&p_xstream->ctx);
393  return ABT_SUCCESS;
394 }
395 
437 int ABT_xstream_free(ABT_xstream *xstream)
438 {
439  ABTI_UB_ASSERT(ABTI_initialized());
440  ABTI_UB_ASSERT(xstream);
441 
442  ABTI_global *p_global;
443  ABTI_SETUP_GLOBAL(&p_global);
444 
445  ABTI_local *p_local = ABTI_local_get_local();
446  ABT_xstream h_xstream = *xstream;
447 
448  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(h_xstream);
449  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
450 
451  /* We first need to check whether p_local_xstream is NULL because this
452  * routine might be called by external threads. */
453  ABTI_CHECK_TRUE_MSG(p_xstream != ABTI_local_get_xstream_or_null(p_local),
455  "The current xstream cannot be freed.");
456 
457  ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
459  "The primary xstream cannot be freed explicitly.");
460 
461  /* Wait until xstream terminates */
462  int abt_errno = xstream_join(&p_local, p_xstream);
463  ABTI_CHECK_ERROR(abt_errno);
464 
465  /* Free the xstream object */
466  ABTI_xstream_free(p_global, p_local, p_xstream, ABT_FALSE);
467 
468  /* Return value */
469  *xstream = ABT_XSTREAM_NULL;
470  return ABT_SUCCESS;
471 }
472 
504 int ABT_xstream_join(ABT_xstream xstream)
505 {
506  ABTI_UB_ASSERT(ABTI_initialized());
507 
508  ABTI_local *p_local = ABTI_local_get_local();
509  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
510  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
511 
512  int abt_errno = xstream_join(&p_local, p_xstream);
513  ABTI_CHECK_ERROR(abt_errno);
514  return ABT_SUCCESS;
515 }
516 
551 int ABT_xstream_exit(void)
552 {
553  ABTI_xstream *p_local_xstream;
554  ABTI_ythread *p_ythread;
555 #ifndef ABT_CONFIG_ENABLE_VER_20_API
556  ABTI_SETUP_GLOBAL(NULL);
557 #else
558  ABTI_UB_ASSERT(ABTI_initialized());
559 #endif
560  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_ythread);
561  /* Check if the target is the primary execution stream. */
562  ABTI_CHECK_TRUE(p_local_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
564 
565  /* Terminate the main scheduler. */
566  ABTD_atomic_fetch_or_uint32(&p_local_xstream->p_main_sched->p_ythread
567  ->thread.request,
568  ABTI_THREAD_REQ_TERMINATE);
569  /* Terminate this ULT */
570  ABTI_ythread_exit(p_local_xstream, p_ythread);
572  return ABT_SUCCESS;
573 }
574 
603 int ABT_xstream_cancel(ABT_xstream xstream)
604 {
605  ABTI_UB_ASSERT(ABTI_initialized());
606 
607  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
608  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
609  ABTI_CHECK_TRUE(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
611 
612  /* Terminate the main scheduler of the target xstream. */
613  ABTD_atomic_fetch_or_uint32(&p_xstream->p_main_sched->p_ythread->thread
614  .request,
615  ABTI_THREAD_REQ_TERMINATE);
616  return ABT_SUCCESS;
617 }
618 
650 int ABT_xstream_self(ABT_xstream *xstream)
651 {
652  ABTI_UB_ASSERT(xstream);
653 
654  ABTI_xstream *p_local_xstream;
655 #ifndef ABT_CONFIG_ENABLE_VER_20_API
656  *xstream = ABT_XSTREAM_NULL;
657  ABTI_SETUP_GLOBAL(NULL);
658 #else
659  ABTI_UB_ASSERT(ABTI_initialized());
660 #endif
661  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
662 
663  /* Return value */
664  *xstream = ABTI_xstream_get_handle(p_local_xstream);
665  return ABT_SUCCESS;
666 }
667 
697 int ABT_xstream_self_rank(int *rank)
698 {
699  ABTI_UB_ASSERT(rank);
700 
701  ABTI_xstream *p_local_xstream;
702 #ifndef ABT_CONFIG_ENABLE_VER_20_API
703  ABTI_SETUP_GLOBAL(NULL);
704 #else
705  ABTI_UB_ASSERT(ABTI_initialized());
706 #endif
707  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
708  /* Return value */
709  *rank = (int)p_local_xstream->rank;
710  return ABT_SUCCESS;
711 }
712 
748 int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
749 {
750  ABTI_UB_ASSERT(ABTI_initialized());
751 
752  ABTI_global *p_global;
753  ABTI_SETUP_GLOBAL(&p_global);
754 
755  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
756  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
757  ABTI_CHECK_TRUE(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
759  ABTI_CHECK_TRUE(rank >= 0, ABT_ERR_INV_XSTREAM_RANK);
760 
761  ABT_bool is_changed = xstream_change_rank(p_global, p_xstream, rank);
762  ABTI_CHECK_TRUE(is_changed, ABT_ERR_INV_XSTREAM_RANK);
763 
764  /* Set the CPU affinity for the ES */
765  if (p_global->set_affinity == ABT_TRUE) {
766  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
767  }
768  return ABT_SUCCESS;
769 }
770 
795 int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
796 {
797  ABTI_UB_ASSERT(ABTI_initialized());
798  ABTI_UB_ASSERT(rank);
799 
800  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
801  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
802 
803  *rank = (int)p_xstream->rank;
804  return ABT_SUCCESS;
805 }
806 
886 {
887  ABTI_UB_ASSERT(ABTI_initialized());
888 
889  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
890  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
891 
892  ABTI_global *p_global;
893  ABTI_SETUP_GLOBAL(&p_global);
894 
895  ABTI_xstream *p_local_xstream;
896  ABTI_ythread *p_self;
897  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_self);
898 
899 #ifndef ABT_CONFIG_ENABLE_VER_20_API
900  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_xstream->state) !=
902  p_local_xstream == p_xstream,
904 #else
905  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_xstream->state) !=
907  p_local_xstream == p_xstream,
909 #endif
910 
911  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
912  if (!p_sched) {
913  int abt_errno =
914  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
915  ABTI_CHECK_ERROR(abt_errno);
916  } else {
917 #ifndef ABT_CONFIG_ENABLE_VER_20_API
918  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
920 #else
921  ABTI_UB_ASSERT(p_sched->used == ABTI_SCHED_NOT_USED);
922 #endif
923  }
924 
925  int abt_errno = xstream_update_main_sched(p_global, &p_local_xstream,
926  p_xstream, p_sched);
927  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
928  if (!ABTI_sched_get_ptr(sched)) {
929  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
930  ABT_FALSE);
931  }
932  ABTI_HANDLE_ERROR(abt_errno);
933  }
934  return ABT_SUCCESS;
935 }
936 
995  ABT_sched_predef predef, int num_pools,
996  ABT_pool *pools)
997 {
998  ABTI_UB_ASSERT(ABTI_initialized());
999  ABTI_UB_ASSERT(pools || num_pools <= 0);
1000 
1001  int abt_errno;
1002  ABTI_global *p_global;
1003  ABTI_SETUP_GLOBAL(&p_global);
1004 
1005  ABTI_xstream *p_local_xstream;
1006  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
1007 
1008  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1009  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1010 
1011  ABTI_sched *p_sched;
1012  abt_errno =
1013  ABTI_sched_create_basic(predef, num_pools, pools, NULL, &p_sched);
1014  ABTI_CHECK_ERROR(abt_errno);
1015 
1016  abt_errno = xstream_update_main_sched(p_global, &p_local_xstream, p_xstream,
1017  p_sched);
1018  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
1019  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
1020  ABT_FALSE);
1021  ABTI_HANDLE_ERROR(abt_errno);
1022  }
1023  return ABT_SUCCESS;
1024 }
1025 
1049 {
1050  ABTI_UB_ASSERT(ABTI_initialized());
1051  ABTI_UB_ASSERT(sched);
1052 
1053  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1054  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1055 
1056  *sched = ABTI_sched_get_handle(p_xstream->p_main_sched);
1057  return ABT_SUCCESS;
1058 }
1059 
1088 int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools,
1089  ABT_pool *pools)
1090 {
1091  ABTI_UB_ASSERT(ABTI_initialized());
1092  ABTI_UB_ASSERT(pools || max_pools <= 0);
1093 
1094  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1095  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1096 
1097  ABTI_sched *p_sched = p_xstream->p_main_sched;
1098  max_pools = ABTU_min_int(p_sched->num_pools, max_pools);
1099  memcpy(pools, p_sched->pools, sizeof(ABT_pool) * max_pools);
1100  return ABT_SUCCESS;
1101 }
1102 
1128 {
1129  ABTI_UB_ASSERT(ABTI_initialized());
1130  ABTI_UB_ASSERT(state);
1131 
1132  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1133  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1134 
1135  *state = (ABT_xstream_state)ABTD_atomic_acquire_load_int(&p_xstream->state);
1136  return ABT_SUCCESS;
1137 }
1138 
1166 int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2,
1167  ABT_bool *result)
1168 {
1169  ABTI_UB_ASSERT(result);
1170 
1171  ABTI_xstream *p_xstream1 = ABTI_xstream_get_ptr(xstream1);
1172  ABTI_xstream *p_xstream2 = ABTI_xstream_get_ptr(xstream2);
1173  *result = (p_xstream1 == p_xstream2) ? ABT_TRUE : ABT_FALSE;
1174  return ABT_SUCCESS;
1175 }
1176 
1205 int ABT_xstream_get_num(int *num_xstreams)
1206 {
1207  ABTI_UB_ASSERT(num_xstreams);
1208 #ifdef ABT_CONFIG_ENABLE_VER_20_API
1209  ABTI_UB_ASSERT(ABTI_initialized());
1210 #endif
1211 
1212  ABTI_global *p_global;
1213  ABTI_SETUP_GLOBAL(&p_global);
1214 
1215  *num_xstreams = p_global->num_xstreams;
1216  return ABT_SUCCESS;
1217 }
1218 
1243 int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *is_primary)
1244 {
1245  ABTI_UB_ASSERT(ABTI_initialized());
1246  ABTI_UB_ASSERT(is_primary);
1247 
1248  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1249  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1250 
1251  /* Return value */
1252  *is_primary =
1253  (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) ? ABT_TRUE : ABT_FALSE;
1254  return ABT_SUCCESS;
1255 }
1256 
1285 int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
1286 {
1287  ABTI_UB_ASSERT(ABTI_initialized());
1289  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
1290  ABTI_CHECK_NULL_POOL_PTR(p_pool);
1291  ABTI_CHECK_TRUE(unit != ABT_UNIT_NULL, ABT_ERR_INV_UNIT);
1292  ABTI_global *p_global;
1293  ABTI_SETUP_GLOBAL(&p_global);
1294 
1295  ABTI_xstream *p_local_xstream;
1296  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
1297 
1298  ABTI_thread *p_thread;
1299  int abt_errno =
1300  ABTI_unit_set_associated_pool(p_global, unit, p_pool, &p_thread);
1301  ABTI_CHECK_ERROR(abt_errno);
1302  ABTI_xstream_run_thread(p_global, &p_local_xstream, p_thread);
1303  return ABT_SUCCESS;
1304 }
1305 
1339 {
1340  ABTI_xstream *p_local_xstream;
1341 #ifndef ABT_CONFIG_ENABLE_VER_20_API
1342  ABTI_SETUP_GLOBAL(NULL);
1343 #else
1344  ABTI_UB_ASSERT(ABTI_initialized());
1345 #endif
1346  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
1347 
1348  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
1349  ABTI_CHECK_NULL_SCHED_PTR(p_sched);
1350  ABTI_CHECK_TRUE(p_local_xstream->p_thread == &p_sched->p_ythread->thread,
1352 
1353  ABTI_xstream_check_events(p_local_xstream, p_sched);
1354  return ABT_SUCCESS;
1355 }
1356 
1385 int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
1386 {
1387  ABTI_UB_ASSERT(ABTI_initialized());
1389  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1390  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1391 
1392  ABTD_affinity_cpuset cpuset;
1393  cpuset.num_cpuids = 1;
1394  cpuset.cpuids = &cpuid;
1395  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &cpuset);
1396  /* Do not free cpuset since cpuids points to a user pointer. */
1397  ABTI_CHECK_ERROR(abt_errno);
1398  return ABT_SUCCESS;
1399 }
1400 
1436 int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
1437 {
1438  ABTI_UB_ASSERT(ABTI_initialized());
1439  ABTI_UB_ASSERT(cpuid);
1441  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1442  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1443 
1444  int num_cpuid;
1445  int cpuids[1];
1446  int abt_errno =
1447  ABTD_affinity_cpuset_read(&p_xstream->ctx, 1, cpuids, &num_cpuid);
1448  ABTI_CHECK_ERROR(abt_errno);
1449  ABTI_CHECK_TRUE(num_cpuid > 0, ABT_ERR_CPUID);
1450 
1451  *cpuid = cpuids[0];
1452  return ABT_SUCCESS;
1453 }
1454 
1488 int ABT_xstream_set_affinity(ABT_xstream xstream, int num_cpuids, int *cpuids)
1489 {
1490  ABTI_UB_ASSERT(ABTI_initialized());
1491  ABTI_UB_ASSERT(cpuids || num_cpuids <= 0);
1493  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1494  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1495  ABTI_CHECK_TRUE(num_cpuids >= 0, ABT_ERR_INV_ARG);
1496 
1497  ABTD_affinity_cpuset affinity;
1498  affinity.num_cpuids = num_cpuids;
1499  affinity.cpuids = cpuids;
1500  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &affinity);
1501  /* Do not free affinity since cpuids may not be freed. */
1502  ABTI_CHECK_ERROR(abt_errno);
1503  return ABT_SUCCESS;
1504 }
1505 
1554 int ABT_xstream_get_affinity(ABT_xstream xstream, int max_cpuids, int *cpuids,
1555  int *num_cpuids)
1556 {
1557  ABTI_UB_ASSERT(ABTI_initialized());
1558  ABTI_UB_ASSERT(cpuids || max_cpuids <= 0);
1560  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1561  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1562  ABTI_CHECK_TRUE(max_cpuids >= 0, ABT_ERR_INV_ARG);
1563 
1564  int abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, max_cpuids,
1565  cpuids, num_cpuids);
1566  ABTI_CHECK_ERROR(abt_errno);
1567  return abt_errno;
1568 }
1569 
1570 /*****************************************************************************/
1571 /* Private APIs */
1572 /*****************************************************************************/
1573 
1574 ABTU_ret_err int ABTI_xstream_create_primary(ABTI_global *p_global,
1575  ABTI_xstream **pp_xstream)
1576 {
1577  int abt_errno;
1578  ABTI_xstream *p_newxstream;
1579  ABTI_sched *p_sched;
1580 
1581  /* For the primary ES, a default scheduler is created. */
1582  abt_errno =
1583  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
1584  ABTI_CHECK_ERROR(abt_errno);
1585 
1586  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_PRIMARY, -1,
1587  ABT_FALSE, &p_newxstream);
1588  if (abt_errno != ABT_SUCCESS) {
1589  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
1590  ABT_TRUE);
1591  ABTI_HANDLE_ERROR(abt_errno);
1592  }
1593 
1594  *pp_xstream = p_newxstream;
1595  return ABT_SUCCESS;
1596 }
1597 
1598 /* This routine starts the primary ES. It should be called in ABT_init. */
1599 void ABTI_xstream_start_primary(ABTI_global *p_global,
1600  ABTI_xstream **pp_local_xstream,
1601  ABTI_xstream *p_xstream,
1602  ABTI_ythread *p_ythread)
1603 {
1604  /* p_ythread must be the main thread. */
1605  ABTI_ASSERT(p_ythread->thread.type & ABTI_THREAD_TYPE_PRIMARY);
1606  /* The ES's state must be running here. */
1607  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
1609 
1610  LOG_DEBUG("[E%d] start\n", p_xstream->rank);
1611 
1612  ABTD_xstream_context_set_self(&p_xstream->ctx);
1613 
1614  /* Set the CPU affinity for the ES */
1615  if (p_global->set_affinity == ABT_TRUE) {
1616  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
1617  }
1618 
1619  /* Context switch to the root thread. */
1620  p_xstream->p_root_ythread->thread.p_last_xstream = p_xstream;
1621  ABTD_ythread_context_switch(&p_ythread->ctx,
1622  &p_xstream->p_root_ythread->ctx);
1623  /* Come back to the primary thread. Now this thread is executed on top of
1624  * the main scheduler, which is running on the root thread. */
1625  (*pp_local_xstream)->p_thread = &p_ythread->thread;
1626 }
1627 
1628 void ABTI_xstream_run_thread(ABTI_global *p_global,
1629  ABTI_xstream **pp_local_xstream,
1630  ABTI_thread *p_thread)
1631 {
1632  if (p_thread->type & ABTI_THREAD_TYPE_YIELDABLE) {
1633  ABTI_ythread *p_ythread = ABTI_thread_get_ythread(p_thread);
1634  /* Execute a ULT */
1635  xstream_schedule_ythread(p_global, pp_local_xstream, p_ythread);
1636  } else {
1637  /* Execute a tasklet */
1638  xstream_schedule_task(p_global, *pp_local_xstream, p_thread);
1639  }
1640 }
1641 
1642 void ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
1643 {
1644  ABTI_info_check_print_all_thread_stacks();
1645 
1646  uint32_t request = ABTD_atomic_acquire_load_uint32(
1647  &p_xstream->p_main_sched->p_ythread->thread.request);
1648  if (request & ABTI_THREAD_REQ_JOIN) {
1649  ABTI_sched_finish(p_sched);
1650  }
1651 
1652  if ((request & ABTI_THREAD_REQ_TERMINATE) ||
1653  (request & ABTI_THREAD_REQ_CANCEL)) {
1654  ABTI_sched_exit(p_sched);
1655  }
1656 }
1657 
1658 void ABTI_xstream_free(ABTI_global *p_global, ABTI_local *p_local,
1659  ABTI_xstream *p_xstream, ABT_bool force_free)
1660 {
1661  LOG_DEBUG("[E%d] freed\n", p_xstream->rank);
1662 
1663  /* Clean up memory pool. */
1664  ABTI_mem_finalize_local(p_xstream);
1665  /* Return rank for reuse. rank must be returned prior to other free
1666  * functions so that other xstreams cannot refer to this xstream. */
1667  xstream_return_rank(p_global, p_xstream);
1668 
1669  /* Free the scheduler */
1670  ABTI_sched *p_cursched = p_xstream->p_main_sched;
1671  if (p_cursched != NULL) {
1672  /* Join a scheduler thread. */
1673  ABTI_tool_event_thread_join(p_local, &p_cursched->p_ythread->thread,
1674  ABTI_local_get_xstream_or_null(p_local)
1675  ? ABTI_local_get_xstream(p_local)
1676  ->p_thread
1677  : NULL);
1678  ABTI_sched_discard_and_free(p_global, p_local, p_cursched, force_free);
1679  /* The main scheduler thread is also freed. */
1680  }
1681 
1682  /* Free the root thread and pool. */
1683  ABTI_ythread_free_root(p_global, p_local, p_xstream->p_root_ythread);
1684  ABTI_pool_free(p_xstream->p_root_pool);
1685 
1686  /* Free the context if a given xstream is secondary. */
1687  if (p_xstream->type == ABTI_XSTREAM_TYPE_SECONDARY) {
1688  ABTD_xstream_context_free(&p_xstream->ctx);
1689  }
1690 
1691  ABTU_free(p_xstream);
1692 }
1693 
1694 void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent,
1695  ABT_bool print_sub)
1696 {
1697  if (p_xstream == NULL) {
1698  fprintf(p_os, "%*s== NULL ES ==\n", indent, "");
1699  } else {
1700  const char *type, *state;
1701  switch (p_xstream->type) {
1702  case ABTI_XSTREAM_TYPE_PRIMARY:
1703  type = "PRIMARY";
1704  break;
1705  case ABTI_XSTREAM_TYPE_SECONDARY:
1706  type = "SECONDARY";
1707  break;
1708  default:
1709  type = "UNKNOWN";
1710  break;
1711  }
1712  switch (ABTD_atomic_acquire_load_int(&p_xstream->state)) {
1714  state = "RUNNING";
1715  break;
1717  state = "TERMINATED";
1718  break;
1719  default:
1720  state = "UNKNOWN";
1721  break;
1722  }
1723 
1724  fprintf(p_os,
1725  "%*s== ES (%p) ==\n"
1726  "%*srank : %d\n"
1727  "%*stype : %s\n"
1728  "%*sstate : %s\n"
1729  "%*sroot_ythread : %p\n"
1730  "%*sroot_pool : %p\n"
1731  "%*sthread : %p\n"
1732  "%*smain_sched : %p\n",
1733  indent, "", (void *)p_xstream, indent, "", p_xstream->rank,
1734  indent, "", type, indent, "", state, indent, "",
1735  (void *)p_xstream->p_root_ythread, indent, "",
1736  (void *)p_xstream->p_root_pool, indent, "",
1737  (void *)p_xstream->p_thread, indent, "",
1738  (void *)p_xstream->p_main_sched);
1739 
1740  if (print_sub == ABT_TRUE) {
1741  ABTI_sched_print(p_xstream->p_main_sched, p_os,
1742  indent + ABTI_INDENT, ABT_TRUE);
1743  }
1744  fprintf(p_os, "%*sctx :\n", indent, "");
1745  ABTD_xstream_context_print(&p_xstream->ctx, p_os, indent + ABTI_INDENT);
1746  }
1747  fflush(p_os);
1748 }
1749 
1750 static void *xstream_launch_root_ythread(void *p_xstream)
1751 {
1752  ABTI_xstream *p_local_xstream = (ABTI_xstream *)p_xstream;
1753 
1754  /* Initialization of the local variables */
1755  ABTI_local_set_xstream(p_local_xstream);
1756 
1757  LOG_DEBUG("[E%d] start\n", p_local_xstream->rank);
1758 
1759  /* Set the root thread as the current thread */
1760  ABTI_ythread *p_root_ythread = p_local_xstream->p_root_ythread;
1761  p_local_xstream->p_thread = &p_local_xstream->p_root_ythread->thread;
1762  p_root_ythread->thread.p_last_xstream = p_local_xstream;
1763  p_root_ythread->thread.f_thread(p_root_ythread->thread.p_arg);
1764 
1765  LOG_DEBUG("[E%d] end\n", p_local_xstream->rank);
1766 
1767  /* Reset the current ES and its local info. */
1768  ABTI_local_set_xstream(NULL);
1769  return NULL;
1770 }
1771 
1772 /*****************************************************************************/
1773 /* Internal static functions */
1774 /*****************************************************************************/
1775 
1776 ABTU_ret_err static int xstream_create(ABTI_global *p_global,
1777  ABTI_sched *p_sched,
1778  ABTI_xstream_type xstream_type, int rank,
1779  ABT_bool start,
1780  ABTI_xstream **pp_xstream)
1782  int abt_errno, init_stage = 0;
1783  ABTI_xstream *p_newxstream;
1784 
1785  abt_errno = ABTU_malloc(sizeof(ABTI_xstream), (void **)&p_newxstream);
1786  ABTI_CHECK_ERROR(abt_errno);
1787 
1788  p_newxstream->p_prev = NULL;
1789  p_newxstream->p_next = NULL;
1790 
1791  if (xstream_set_new_rank(p_global, p_newxstream, rank) == ABT_FALSE) {
1792  abt_errno = ABT_ERR_INV_XSTREAM_RANK;
1793  goto FAILED;
1794  }
1795  init_stage = 1;
1796 
1797  p_newxstream->type = xstream_type;
1798  ABTD_atomic_relaxed_store_int(&p_newxstream->state,
1800  p_newxstream->p_main_sched = NULL;
1801  p_newxstream->p_thread = NULL;
1802  abt_errno = ABTI_mem_init_local(p_global, p_newxstream);
1803  if (abt_errno != ABT_SUCCESS)
1804  goto FAILED;
1805  init_stage = 2;
1806 
1807  /* Set the main scheduler */
1808  xstream_init_main_sched(p_newxstream, p_sched);
1809 
1810  /* Create the root thread. */
1811  abt_errno =
1812  ABTI_ythread_create_root(p_global, ABTI_xstream_get_local(p_newxstream),
1813  p_newxstream, &p_newxstream->p_root_ythread);
1814  if (abt_errno != ABT_SUCCESS)
1815  goto FAILED;
1816  init_stage = 3;
1817 
1818  /* Create the root pool. */
1819  abt_errno = ABTI_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPSC,
1820  ABT_FALSE, &p_newxstream->p_root_pool);
1821  if (abt_errno != ABT_SUCCESS)
1822  goto FAILED;
1823  init_stage = 4;
1824 
1825  /* Create the main scheduler thread. */
1826  abt_errno =
1827  ABTI_ythread_create_main_sched(p_global,
1828  ABTI_xstream_get_local(p_newxstream),
1829  p_newxstream,
1830  p_newxstream->p_main_sched);
1831  if (abt_errno != ABT_SUCCESS)
1832  goto FAILED;
1833  init_stage = 5;
1834 
1835  if (start) {
1836  /* The ES's state must be RUNNING */
1837  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_newxstream->state) ==
1839  ABTI_ASSERT(p_newxstream->type != ABTI_XSTREAM_TYPE_PRIMARY);
1840  /* Start the main scheduler on a different ES */
1841  abt_errno = ABTD_xstream_context_create(xstream_launch_root_ythread,
1842  (void *)p_newxstream,
1843  &p_newxstream->ctx);
1844  if (abt_errno != ABT_SUCCESS)
1845  goto FAILED;
1846  init_stage = 6;
1847 
1848  /* Set the CPU affinity for the ES */
1849  if (p_global->set_affinity == ABT_TRUE) {
1850  ABTD_affinity_cpuset_apply_default(&p_newxstream->ctx,
1851  p_newxstream->rank);
1852  }
1853  }
1854 
1855  LOG_DEBUG("[E%d] created\n", p_newxstream->rank);
1856 
1857  /* Return value */
1858  *pp_xstream = p_newxstream;
1859  return ABT_SUCCESS;
1860 FAILED:
1861  if (init_stage >= 5) {
1862  ABTI_thread_free(p_global, ABTI_xstream_get_local(p_newxstream),
1863  &p_newxstream->p_main_sched->p_ythread->thread);
1864  p_newxstream->p_main_sched->p_ythread = NULL;
1865  }
1866  if (init_stage >= 4) {
1867  ABTI_pool_free(p_newxstream->p_root_pool);
1868  }
1869  if (init_stage >= 3) {
1870  ABTI_ythread_free_root(p_global, ABTI_xstream_get_local(p_newxstream),
1871  p_newxstream->p_root_ythread);
1872  }
1873  if (init_stage >= 2) {
1874  p_sched->used = ABTI_SCHED_NOT_USED;
1875  ABTI_mem_finalize_local(p_newxstream);
1876  }
1877  if (init_stage >= 1) {
1878  xstream_return_rank(p_global, p_newxstream);
1879  }
1880  ABTU_free(p_newxstream);
1881  return abt_errno;
1882 }
1883 
1884 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
1885  ABTI_xstream *p_xstream)
1886 {
1887  /* The primary ES cannot be joined. */
1888  ABTI_CHECK_TRUE(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
1890  /* The main scheduler cannot join itself. */
1891  ABTI_CHECK_TRUE(!ABTI_local_get_xstream_or_null(*pp_local) ||
1892  &p_xstream->p_main_sched->p_ythread->thread !=
1893  ABTI_local_get_xstream(*pp_local)->p_thread,
1895 
1896  /* Wait until the target ES terminates */
1897  ABTI_sched_finish(p_xstream->p_main_sched);
1898  ABTI_thread_join(pp_local, &p_xstream->p_main_sched->p_ythread->thread);
1899 
1900  /* Normal join request */
1901  ABTD_xstream_context_join(&p_xstream->ctx);
1902 
1903  ABTI_ASSERT(ABTD_atomic_acquire_load_int(&p_xstream->state) ==
1905  return ABT_SUCCESS;
1906 }
1907 
1908 static inline void xstream_schedule_ythread(ABTI_global *p_global,
1909  ABTI_xstream **pp_local_xstream,
1910  ABTI_ythread *p_ythread)
1911 {
1912  ABTI_xstream *p_local_xstream = *pp_local_xstream;
1914 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1915  if (ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request) &
1916  ABTI_THREAD_REQ_CANCEL) {
1917  LOG_DEBUG("[U%" PRIu64 ":E%d] canceled\n",
1918  ABTI_thread_get_id(&p_ythread->thread),
1919  p_local_xstream->rank);
1920  ABTD_ythread_cancel(p_local_xstream, p_ythread);
1921  ABTI_xstream_terminate_thread(p_global,
1922  ABTI_xstream_get_local(p_local_xstream),
1923  &p_ythread->thread);
1924  return;
1925  }
1926 #endif
1927 
1928 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1929  if (ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request) &
1930  ABTI_THREAD_REQ_MIGRATE) {
1931  int abt_errno =
1932  xstream_migrate_thread(p_global,
1933  ABTI_xstream_get_local(p_local_xstream),
1934  &p_ythread->thread);
1935  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
1936  /* Migration succeeded, so we do not need to schedule p_ythread. */
1937  return;
1938  }
1939  }
1940 #endif
1941 
1942  /* Change the last ES */
1943  p_ythread->thread.p_last_xstream = p_local_xstream;
1944 
1945  /* Change the ULT state */
1946  ABTD_atomic_release_store_int(&p_ythread->thread.state,
1948 
1949  /* Switch the context */
1950  LOG_DEBUG("[U%" PRIu64 ":E%d] start running\n",
1951  ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1952 
1953  /* Since the argument is pp_local_xstream, p_local_xstream->p_thread must be
1954  * yieldable. */
1955  ABTI_ythread *p_self = ABTI_thread_get_ythread(p_local_xstream->p_thread);
1956  p_ythread =
1957  ABTI_ythread_switch_to_child(pp_local_xstream, p_self, p_ythread);
1958  /* The previous ULT (p_ythread) may not be the same as one to which the
1959  * context has been switched. */
1960  /* The scheduler continues from here. */
1961  p_local_xstream = *pp_local_xstream;
1962 
1963  LOG_DEBUG("[U%" PRIu64 ":E%d] stopped\n",
1964  ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1965 
1966  /* Request handling. */
1967  /* We do not need to acquire-load request since all critical requests
1968  * (BLOCK, ORPHAN, STOP, and NOPUSH) are written by p_ythread. CANCEL might
1969  * be delayed. */
1970  uint32_t request =
1971  ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request);
1972  if (request & ABTI_THREAD_REQ_TERMINATE) {
1973  /* The ULT has completed its execution or it called the exit request. */
1974  LOG_DEBUG("[U%" PRIu64 ":E%d] finished\n",
1975  ABTI_thread_get_id(&p_ythread->thread),
1976  p_local_xstream->rank);
1977  ABTI_xstream_terminate_thread(p_global,
1978  ABTI_xstream_get_local(p_local_xstream),
1979  &p_ythread->thread);
1980 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1981  } else if (request & ABTI_THREAD_REQ_CANCEL) {
1982  LOG_DEBUG("[U%" PRIu64 ":E%d] canceled\n",
1983  ABTI_thread_get_id(&p_ythread->thread),
1984  p_local_xstream->rank);
1985  ABTD_ythread_cancel(p_local_xstream, p_ythread);
1986  ABTI_xstream_terminate_thread(p_global,
1987  ABTI_xstream_get_local(p_local_xstream),
1988  &p_ythread->thread);
1989 #endif
1990  } else if (!(request & ABTI_THREAD_REQ_NON_YIELD)) {
1991  /* The ULT did not finish its execution.
1992  * Change the state of current running ULT and
1993  * add it to the pool again. */
1994  ABTI_pool_add_thread(&p_ythread->thread);
1995  } else if (request & ABTI_THREAD_REQ_BLOCK) {
1996  LOG_DEBUG("[U%" PRIu64 ":E%d] check blocked\n",
1997  ABTI_thread_get_id(&p_ythread->thread),
1998  p_local_xstream->rank);
1999  ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_BLOCK);
2000 #ifndef ABT_CONFIG_DISABLE_MIGRATION
2001  } else if (request & ABTI_THREAD_REQ_MIGRATE) {
2002  /* This is the case when the ULT requests migration of itself. */
2003  int abt_errno =
2004  xstream_migrate_thread(p_global,
2005  ABTI_xstream_get_local(p_local_xstream),
2006  &p_ythread->thread);
2007  /* Migration is optional, so it is okay if it fails. */
2008  (void)abt_errno;
2009 #endif
2010  } else if (request & ABTI_THREAD_REQ_ORPHAN) {
2011  /* The ULT is not pushed back to the pool and is disconnected from any
2012  * pool. */
2013  LOG_DEBUG("[U%" PRIu64 ":E%d] orphaned\n",
2014  ABTI_thread_get_id(&p_ythread->thread),
2015  p_local_xstream->rank);
2016  ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_ORPHAN);
2017  ABTI_thread_unset_associated_pool(p_global, &p_ythread->thread);
2018  } else {
2019  ABTI_ASSERT(0);
2020  ABTU_unreachable();
2021  }
2022 }
2023 
2024 static inline void xstream_schedule_task(ABTI_global *p_global,
2025  ABTI_xstream *p_local_xstream,
2026  ABTI_thread *p_task)
2027 {
2028 #ifndef ABT_CONFIG_DISABLE_TASK_CANCEL
2029  if (ABTD_atomic_acquire_load_uint32(&p_task->request) &
2030  ABTI_THREAD_REQ_CANCEL) {
2031  ABTI_tool_event_thread_cancel(p_local_xstream, p_task);
2032  ABTI_xstream_terminate_thread(p_global,
2033  ABTI_xstream_get_local(p_local_xstream),
2034  p_task);
2035  return;
2036  }
2037 #endif
2038 
2039  /* Change the task state */
2040  ABTD_atomic_release_store_int(&p_task->state, ABT_THREAD_STATE_RUNNING);
2041 
2042  /* Set the associated ES */
2043  p_task->p_last_xstream = p_local_xstream;
2044 
2045  /* Execute the task function */
2046  LOG_DEBUG("[T%" PRIu64 ":E%d] running\n", ABTI_thread_get_id(p_task),
2047  p_local_xstream->rank);
2048 
2049  ABTI_thread *p_sched_thread = p_local_xstream->p_thread;
2050  p_local_xstream->p_thread = p_task;
2051  p_task->p_parent = p_sched_thread;
2052 
2053  /* Execute the task function */
2054  ABTI_tool_event_thread_run(p_local_xstream, p_task, p_sched_thread,
2055  p_sched_thread);
2056  LOG_DEBUG("[T%" PRIu64 ":E%d] running\n", ABTI_thread_get_id(p_task),
2057  p_local_xstream->rank);
2058  p_task->f_thread(p_task->p_arg);
2059  ABTI_tool_event_thread_finish(p_local_xstream, p_task, p_sched_thread);
2060  LOG_DEBUG("[T%" PRIu64 ":E%d] stopped\n", ABTI_thread_get_id(p_task),
2061  p_local_xstream->rank);
2062 
2063  /* Set the current running scheduler's thread */
2064  p_local_xstream->p_thread = p_sched_thread;
2065 
2066  /* Terminate the tasklet */
2067  ABTI_xstream_terminate_thread(p_global,
2068  ABTI_xstream_get_local(p_local_xstream),
2069  p_task);
2070 }
2071 
2072 #ifndef ABT_CONFIG_DISABLE_MIGRATION
2073 ABTU_ret_err static int xstream_migrate_thread(ABTI_global *p_global,
2074  ABTI_local *p_local,
2075  ABTI_thread *p_thread)
2076 {
2077  int abt_errno;
2078  ABTI_pool *p_pool;
2079 
2080  ABTI_thread_mig_data *p_mig_data;
2081  abt_errno =
2082  ABTI_thread_get_mig_data(p_global, p_local, p_thread, &p_mig_data);
2083  ABTI_CHECK_ERROR(abt_errno);
2084 
2085  /* Extracting an argument embedded in a migration request. */
2086  p_pool = ABTD_atomic_relaxed_load_ptr(&p_mig_data->p_migration_pool);
2087 
2088  /* Change the associated pool */
2089  abt_errno = ABTI_thread_set_associated_pool(p_global, p_thread, p_pool);
2090  ABTI_CHECK_ERROR(abt_errno);
2091 
2092  /* callback function */
2093  if (p_mig_data->f_migration_cb) {
2094  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
2095  if (p_ythread) {
2096  ABT_thread thread = ABTI_ythread_get_handle(p_ythread);
2097  p_mig_data->f_migration_cb(thread, p_mig_data->p_migration_cb_arg);
2098  }
2099  }
2100 
2101  /* If request is set, p_migration_pool has a valid pool pointer. */
2102  ABTI_ASSERT(ABTD_atomic_acquire_load_uint32(&p_thread->request) &
2103  ABTI_THREAD_REQ_MIGRATE);
2104 
2105  /* Unset the migration request. */
2106  ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_MIGRATE);
2107 
2108  /* Add the unit to the scheduler's pool */
2109  ABTI_pool_push(p_pool, p_thread->unit);
2110  return ABT_SUCCESS;
2111 }
2112 #endif
2113 
2114 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
2115  ABTI_sched *p_sched)
2116 {
2117  ABTI_ASSERT(p_xstream->p_main_sched == NULL);
2118  /* Set the scheduler as a main scheduler */
2119  p_sched->used = ABTI_SCHED_MAIN;
2120  /* Set the scheduler */
2121  p_xstream->p_main_sched = p_sched;
2122 }
2123 
2124 static int xstream_update_main_sched(ABTI_global *p_global,
2125  ABTI_xstream **pp_local_xstream,
2126  ABTI_xstream *p_xstream,
2127  ABTI_sched *p_sched)
2128 {
2129  ABTI_sched *p_main_sched = p_xstream->p_main_sched;
2130  if (p_main_sched == NULL) {
2131  /* Set the scheduler as a main scheduler */
2132  p_sched->used = ABTI_SCHED_MAIN;
2133  /* Set the scheduler */
2134  p_xstream->p_main_sched = p_sched;
2135  return ABT_SUCCESS;
2136  } else if (*pp_local_xstream != p_xstream) {
2137  /* Changing the scheduler of another execution stream. */
2138  ABTI_ASSERT(p_xstream->ctx.state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
2139  /* Use the original scheduler's thread. Unit creation might fail, so it
2140  * should be done first. */
2141  ABTI_pool *p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
2142  int abt_errno =
2143  ABTI_thread_set_associated_pool(p_global,
2144  &p_main_sched->p_ythread->thread,
2145  p_tar_pool);
2146  ABTI_CHECK_ERROR(abt_errno);
2147 
2148  /* Set the scheduler as a main scheduler */
2149  p_sched->used = ABTI_SCHED_MAIN;
2150  p_sched->p_ythread = p_main_sched->p_ythread;
2151  p_main_sched->p_ythread = NULL;
2152  /* p_main_sched is no longer used. */
2153  p_xstream->p_main_sched->used = ABTI_SCHED_NOT_USED;
2154  if (p_xstream->p_main_sched->automatic) {
2155  /* Free that scheduler. */
2156  ABTI_sched_free(p_global, ABTI_xstream_get_local(*pp_local_xstream),
2157  p_xstream->p_main_sched, ABT_FALSE);
2158  }
2159  p_xstream->p_main_sched = p_sched;
2160  return ABT_SUCCESS;
2161  } else {
2162  /* If the ES has a main scheduler, we have to free it */
2163  ABTI_thread *p_thread = (*pp_local_xstream)->p_thread;
2164  ABTI_ASSERT(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE);
2165  ABTI_ythread *p_ythread = ABTI_thread_get_ythread(p_thread);
2166  ABTI_pool *p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
2167 
2168  /* If the caller ULT is associated with a pool of the current main
2169  * scheduler, it needs to be associated to a pool of new scheduler. */
2170  size_t p;
2171  for (p = 0; p < p_main_sched->num_pools; p++) {
2172  if (p_ythread->thread.p_pool ==
2173  ABTI_pool_get_ptr(p_main_sched->pools[p])) {
2174  /* Associate the work unit to the first pool of new scheduler */
2175  int abt_errno =
2176  ABTI_thread_set_associated_pool(p_global,
2177  &p_ythread->thread,
2178  p_tar_pool);
2179  ABTI_CHECK_ERROR(abt_errno);
2180  break;
2181  }
2182  }
2183 
2184  if (p_main_sched->p_replace_sched) {
2185  /* We need to overwrite the scheduler. Free the existing one. */
2186  ABTI_ythread *p_waiter = p_main_sched->p_replace_waiter;
2187  ABTI_sched_discard_and_free(p_global,
2188  ABTI_xstream_get_local(
2189  *pp_local_xstream),
2190  p_main_sched->p_replace_sched,
2191  ABT_FALSE);
2192  p_main_sched->p_replace_sched = NULL;
2193  p_main_sched->p_replace_waiter = NULL;
2194  /* Resume the waiter. This waiter sees that the scheduler finished
2195  * immediately and was replaced by this new scheduler. */
2196  ABTI_ythread_set_ready(ABTI_xstream_get_local(*pp_local_xstream),
2197  p_waiter);
2198  }
2199  ABTI_ythread_set_blocked(p_ythread);
2200  /* Set the replace scheduler */
2201  p_main_sched->p_replace_sched = p_sched;
2202  p_main_sched->p_replace_waiter = p_ythread;
2203  /* Ask the current main scheduler to replace its scheduler */
2204  ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_REPLACE);
2205 
2206  /* Switch to the current main scheduler. The current ULT is pushed to
2207  * the new scheduler's pool so that when the new scheduler starts, this
2208  * ULT can be scheduled by the new scheduler. The existing main
2209  * scheduler will be freed by ABTI_SCHED_REQ_RELEASE. */
2210  ABTI_ythread_suspend(pp_local_xstream, p_ythread,
2212  return ABT_SUCCESS;
2213  }
2214 }
2215 
2216 static void xstream_update_max_xstreams(ABTI_global *p_global, int newrank)
2217 {
2218  /* The lock must be taken. */
2219  if (newrank >= p_global->max_xstreams) {
2220  static int max_xstreams_warning_once = 0;
2221  if (max_xstreams_warning_once == 0) {
2222  /* Because some Argobots functionalities depend on the runtime value
2223  * ABT_MAX_NUM_XSTREAMS (or p_global->max_xstreams), changing
2224  * this value at run-time can cause an error. For example, using
2225  * ABT_mutex created before updating max_xstreams causes an error
2226  * since ABTI_thread_htable's array size depends on
2227  * ABT_MAX_NUM_XSTREAMS. To fix this issue, please set a larger
2228  * number to ABT_MAX_NUM_XSTREAMS in advance. */
2229  char *warning_message;
2230  int abt_errno =
2231  ABTU_malloc(sizeof(char) * 1024, (void **)&warning_message);
2232  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
2233  snprintf(warning_message, 1024,
2234  "Warning: the number of execution streams exceeds "
2235  "ABT_MAX_NUM_XSTREAMS (=%d). This may cause an error.",
2236  p_global->max_xstreams);
2237  HANDLE_WARNING(warning_message);
2238  ABTU_free(warning_message);
2239  max_xstreams_warning_once = 1;
2240  }
2241  }
2242  /* Anyway. let's increase max_xstreams. */
2243  p_global->max_xstreams = newrank + 1;
2244  }
2245 }
2246 
2247 /* Add p_newxstream to the list. This does not check the rank duplication. */
2248 static void xstream_add_xstream_list(ABTI_global *p_global,
2249  ABTI_xstream *p_newxstream)
2250 {
2251  int rank = p_newxstream->rank;
2252  ABTI_xstream *p_prev_xstream = p_global->p_xstream_head;
2253  ABTI_xstream *p_xstream = p_prev_xstream;
2254  /* Check if a certain rank is available */
2255  while (p_xstream) {
2256  ABTI_ASSERT(p_xstream->rank != rank);
2257  if (p_xstream->rank > rank) {
2258  /* Use this p_xstream. */
2259  break;
2260  }
2261  p_prev_xstream = p_xstream;
2262  p_xstream = p_xstream->p_next;
2263  }
2264 
2265  if (!p_xstream) {
2266  /* p_newxstream is appended to p_prev_xstream */
2267  if (p_prev_xstream) {
2268  p_prev_xstream->p_next = p_newxstream;
2269  p_newxstream->p_prev = p_prev_xstream;
2270  p_newxstream->p_next = NULL;
2271  } else {
2272  ABTI_ASSERT(p_global->p_xstream_head == NULL);
2273  p_newxstream->p_prev = NULL;
2274  p_newxstream->p_next = NULL;
2275  p_global->p_xstream_head = p_newxstream;
2276  }
2277  } else {
2278  /* p_newxstream is inserted in the middle.
2279  * (p_xstream->p_prev) -> p_new_xstream -> p_xstream */
2280  if (p_xstream->p_prev) {
2281  p_xstream->p_prev->p_next = p_newxstream;
2282  p_newxstream->p_prev = p_xstream->p_prev;
2283  } else {
2284  /* This p_xstream is the first element */
2285  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
2286  p_global->p_xstream_head = p_newxstream;
2287  }
2288  p_xstream->p_prev = p_newxstream;
2289  p_newxstream->p_next = p_xstream;
2290  }
2291 }
2292 
2293 /* Remove p_xstream from the list. */
2294 static void xstream_remove_xstream_list(ABTI_global *p_global,
2295  ABTI_xstream *p_xstream)
2296 {
2297  if (!p_xstream->p_prev) {
2298  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
2299  p_global->p_xstream_head = p_xstream->p_next;
2300  } else {
2301  p_xstream->p_prev->p_next = p_xstream->p_next;
2302  }
2303  if (p_xstream->p_next) {
2304  p_xstream->p_next->p_prev = p_xstream->p_prev;
2305  }
2306 }
2307 
2308 /* Set a new rank to ES */
2309 static ABT_bool xstream_set_new_rank(ABTI_global *p_global,
2310  ABTI_xstream *p_newxstream, int rank)
2311 {
2312  ABTD_spinlock_acquire(&p_global->xstream_list_lock);
2313 
2314  if (rank == -1) {
2315  /* Find an unused rank from 0. */
2316  rank = 0;
2317  ABTI_xstream *p_xstream = p_global->p_xstream_head;
2318  while (p_xstream) {
2319  if (p_xstream->rank == rank) {
2320  rank++;
2321  } else {
2322  /* Use this rank. */
2323  break;
2324  }
2325  p_xstream = p_xstream->p_next;
2326  }
2327  } else {
2328  /* Check if a certain rank is available */
2329  ABTI_xstream *p_xstream = p_global->p_xstream_head;
2330  while (p_xstream) {
2331  if (p_xstream->rank == rank) {
2332  ABTD_spinlock_release(&p_global->xstream_list_lock);
2333  return ABT_FALSE;
2334  } else if (p_xstream->rank > rank) {
2335  break;
2336  }
2337  p_xstream = p_xstream->p_next;
2338  }
2339  }
2340  /* Set the rank */
2341  p_newxstream->rank = rank;
2342  xstream_add_xstream_list(p_global, p_newxstream);
2343  xstream_update_max_xstreams(p_global, rank);
2344  p_global->num_xstreams++;
2345 
2346  ABTD_spinlock_release(&p_global->xstream_list_lock);
2347  return ABT_TRUE;
2348 }
2349 
2350 /* Change the rank of ES */
2351 static ABT_bool xstream_change_rank(ABTI_global *p_global,
2352  ABTI_xstream *p_xstream, int rank)
2353 {
2354  if (p_xstream->rank == rank) {
2355  /* No need to change the rank. */
2356  return ABT_TRUE;
2357  }
2358 
2359  ABTD_spinlock_acquire(&p_global->xstream_list_lock);
2360 
2361  ABTI_xstream *p_next = p_global->p_xstream_head;
2362  /* Check if a certain rank is available. */
2363  while (p_next) {
2364  if (p_next->rank == rank) {
2365  ABTD_spinlock_release(&p_global->xstream_list_lock);
2366  return ABT_FALSE;
2367  } else if (p_next->rank > rank) {
2368  break;
2369  }
2370  p_next = p_next->p_next;
2371  }
2372  /* Let's remove p_xstream from the list first. */
2373  xstream_remove_xstream_list(p_global, p_xstream);
2374  /* Then, let's add this p_xstream. */
2375  p_xstream->rank = rank;
2376  xstream_add_xstream_list(p_global, p_xstream);
2377  xstream_update_max_xstreams(p_global, rank);
2378 
2379  ABTD_spinlock_release(&p_global->xstream_list_lock);
2380  return ABT_TRUE;
2381 }
2382 
2383 static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream)
2384 {
2385  /* Remove this xstream from the global ES list */
2386  ABTD_spinlock_acquire(&p_global->xstream_list_lock);
2387 
2388  xstream_remove_xstream_list(p_global, p_xstream);
2389  p_global->num_xstreams--;
2390 
2391  ABTD_spinlock_release(&p_global->xstream_list_lock);
2392 }
ABT_xstream_set_affinity
int ABT_xstream_set_affinity(ABT_xstream xstream, int num_cpuids, int *cpuids)
Bind an execution stream to target CPUs.
Definition: stream.c:1492
ABT_THREAD_STATE_TERMINATED
@ ABT_THREAD_STATE_TERMINATED
Definition: abt.h:423
HANDLE_WARNING
#define HANDLE_WARNING(msg)
Definition: abti_error.h:46
ABTU_min_int
static int ABTU_min_int(int a, int b)
Definition: abtu.h:45
ABT_ERR_INV_THREAD
#define ABT_ERR_INV_THREAD
Error code: invalid work unit.
Definition: abt.h:176
ABT_sched_predef
ABT_sched_predef
Predefined scheduler type.
Definition: abt.h:465
xstream_join
static ABTU_ret_err int xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
Definition: stream.c:1889
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1001
ABT_ERR_CPUID
#define ABT_ERR_CPUID
Error code: error related to CPU ID.
Definition: abt.h:398
xstream_update_max_xstreams
static void xstream_update_max_xstreams(ABTI_global *p_global, int newrank)
Definition: stream.c:2221
ABT_thread
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
Definition: abt.h:890
ABT_xstream_self
int ABT_xstream_self(ABT_xstream *xstream)
Get an execution stream that is running the calling work unit.
Definition: stream.c:651
ABT_ERR_INV_XSTREAM
#define ABT_ERR_INV_XSTREAM
Error code: invalid execution stream.
Definition: abt.h:114
xstream_change_rank
static ABT_bool xstream_change_rank(ABTI_global *p_global, ABTI_xstream *p_xstream, int rank)
Definition: stream.c:2356
ABT_xstream_set_rank
int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
Set a rank for an execution stream.
Definition: stream.c:749
ABT_xstream_get_num
int ABT_xstream_get_num(int *num_xstreams)
Get the number of current existing execution streams.
Definition: stream.c:1208
ABT_xstream_get_affinity
int ABT_xstream_get_affinity(ABT_xstream xstream, int max_cpuids, int *cpuids, int *num_cpuids)
Get CPU IDs of CPUs to which an execution stream is bound.
Definition: stream.c:1559
ABT_sched_config
struct ABT_sched_config_opaque * ABT_sched_config
Scheduler configuration handle type.
Definition: abt.h:815
ABT_SCHED_DEFAULT
@ ABT_SCHED_DEFAULT
Definition: abt.h:467
xstream_schedule_ythread
static void xstream_schedule_ythread(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread)
Definition: stream.c:1913
xstream_return_rank
static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream)
Definition: stream.c:2388
xstream_remove_xstream_list
static void xstream_remove_xstream_list(ABTI_global *p_global, ABTI_xstream *p_xstream)
Definition: stream.c:2299
ABT_xstream_cancel
int ABT_xstream_cancel(ABT_xstream xstream)
Send a cancellation request to an execution stream.
Definition: stream.c:604
ABT_xstream_create
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
Create a new execution stream.
Definition: stream.c:87
xstream_migrate_thread
static ABTU_ret_err int xstream_migrate_thread(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
Definition: stream.c:2078
ABT_xstream_get_main_sched
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched)
Retrieve the main scheduler of an execution stream.
Definition: stream.c:1051
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:841
ABT_xstream_get_rank
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
Retrieve a rank of an execution stream.
Definition: stream.c:796
ABT_xstream_self_rank
int ABT_xstream_self_rank(int *rank)
Return a rank of an execution stream associated with a caller.
Definition: stream.c:698
ABT_POOL_ACCESS_MPSC
@ ABT_POOL_ACCESS_MPSC
Definition: abt.h:535
xstream_update_main_sched
static ABTU_ret_err int xstream_update_main_sched(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:2129
ABTU_unreachable
#define ABTU_unreachable()
Definition: abtu.h:133
ABT_xstream_is_primary
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *is_primary)
Check if the target execution stream is primary.
Definition: stream.c:1246
ABT_sched
struct ABT_sched_opaque * ABT_sched
Scheduler handle type.
Definition: abt.h:808
ABT_ERR_INV_SCHED
#define ABT_ERR_INV_SCHED
Error code: invalid scheduler.
Definition: abt.h:129
ABT_POOL_NULL
#define ABT_POOL_NULL
Definition: abt.h:1059
xstream_init_main_sched
static void xstream_init_main_sched(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:2119
abti.h
xstream_create
static ABTU_ret_err int xstream_create(ABTI_global *p_global, ABTI_sched *p_sched, ABTI_xstream_type xstream_type, int rank, ABT_bool start, ABTI_xstream **pp_xstream)
Definition: stream.c:1781
ABT_SYNC_EVENT_TYPE_OTHER
@ ABT_SYNC_EVENT_TYPE_OTHER
Definition: abt.h:666
ABT_xstream
struct ABT_xstream_opaque * ABT_xstream
Execution stream handle type.
Definition: abt.h:789
ABT_xstream_create_basic
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream)
Create a new execution stream with a predefined scheduler.
Definition: stream.c:184
ABT_xstream_create_with_rank
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream)
Create a new execution stream with a specific rank.
Definition: stream.c:281
xstream_set_new_rank
static ABT_bool xstream_set_new_rank(ABTI_global *p_global, ABTI_xstream *p_newxstream, int rank)
Definition: stream.c:2314
ABTU_malloc
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:235
LOG_DEBUG
#define LOG_DEBUG(fmt,...)
Definition: abti_log.h:26
ABT_xstream_check_events
int ABT_xstream_check_events(ABT_sched sched)
Process events associated with a scheduler.
Definition: stream.c:1341
ABT_ERR_INV_UNIT
#define ABT_ERR_INV_UNIT
Error code: invalid work unit for scheduling.
Definition: abt.h:171
ABT_ERR_XSTREAM_STATE
#define ABT_ERR_XSTREAM_STATE
Error code: error related to an execution stream state.
Definition: abt.h:262
ABT_POOL_FIFO
@ ABT_POOL_FIFO
Definition: abt.h:506
ABT_xstream_set_main_sched_basic
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools)
Set the main scheduler of an execution stream to a predefined scheduler.
Definition: stream.c:997
ABT_xstream_get_main_pools
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools)
Get pools associated with the main scheduler of an execution stream.
Definition: stream.c:1091
ABT_xstream_set_cpubind
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
Bind an execution stream to a target CPU.
Definition: stream.c:1388
xstream_schedule_task
static void xstream_schedule_task(ABTI_global *p_global, ABTI_xstream *p_local_xstream, ABTI_thread *p_task)
Definition: stream.c:2029
ABT_unit
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
Definition: abt.h:869
ABT_xstream_exit
int ABT_xstream_exit(void)
Terminate an execution stream that is running the calling ULT.
Definition: stream.c:552
ABT_xstream_join
int ABT_xstream_join(ABT_xstream xstream)
Wait for an execution stream to terminate.
Definition: stream.c:505
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
ABT_xstream_get_cpubind
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
Get CPU ID of a CPU to which an execution stream is bound.
Definition: stream.c:1440
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:748
ABT_XSTREAM_STATE_RUNNING
@ ABT_XSTREAM_STATE_RUNNING
Definition: abt.h:406
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:750
ABT_xstream_free
int ABT_xstream_free(ABT_xstream *xstream)
Free an execution stream.
Definition: stream.c:438
ABT_ERR_INV_ARG
#define ABT_ERR_INV_ARG
Error code: invalid user argument.
Definition: abt.h:250
ABT_xstream_get_state
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state)
Get a state of an execution stream.
Definition: stream.c:1130
ABTU_free
static void ABTU_free(void *ptr)
Definition: abtu.h:228
ABT_xstream_set_main_sched
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched)
Set the main scheduler of an execution stream.
Definition: stream.c:888
xstream_add_xstream_list
static void xstream_add_xstream_list(ABTI_global *p_global, ABTI_xstream *p_newxstream)
Definition: stream.c:2253
ABT_XSTREAM_NULL
#define ABT_XSTREAM_NULL
Definition: abt.h:1055
xstream_launch_root_ythread
static void * xstream_launch_root_ythread(void *p_xstream)
Definition: stream.c:1755
ABT_ERR_INV_XSTREAM_RANK
#define ABT_ERR_INV_XSTREAM_RANK
Error code: invalid execution stream rank.
Definition: abt.h:119
ABT_THREAD_STATE_RUNNING
@ ABT_THREAD_STATE_RUNNING
Definition: abt.h:419
ABT_xstream_revive
int ABT_xstream_revive(ABT_xstream xstream)
Revive a terminated execution stream.
Definition: stream.c:358
ABT_xstream_run_unit
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
Execute a work unit.
Definition: stream.c:1288
ABT_UNIT_NULL
#define ABT_UNIT_NULL
Definition: abt.h:1061
ABT_xstream_equal
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result)
Compare two execution stream handles for equality.
Definition: stream.c:1169
ABT_XSTREAM_STATE_TERMINATED
@ ABT_XSTREAM_STATE_TERMINATED
Definition: abt.h:408
ABT_xstream_state
ABT_xstream_state
State of an execution stream.
Definition: abt.h:404