ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
/*
 * Forward declarations of the file-local helpers used by the public
 * ABT_xstream_* routines in this file.  Helpers marked ABTU_ret_err return an
 * int error code that callers in this file compare against ABT_SUCCESS.
 */
8 ABTU_ret_err static int xstream_create(ABTI_global *p_global,
9  ABTI_sched *p_sched,
10  ABTI_xstream_type xstream_type, int rank,
11  ABT_bool start,
12  ABTI_xstream **pp_xstream);
13 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
14  ABTI_xstream *p_xstream);
15 static ABT_bool xstream_set_new_rank(ABTI_global *p_global,
16  ABTI_xstream *p_newxstream, int rank);
17 static ABT_bool xstream_change_rank(ABTI_global *p_global,
18  ABTI_xstream *p_xstream, int rank);
19 static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream);
20 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
21  ABTI_sched *p_sched);
22 ABTU_ret_err static int
23 xstream_update_main_sched(ABTI_global *p_global,
24  ABTI_xstream **pp_local_xstream,
25  ABTI_xstream *p_xstream, ABTI_sched *p_sched);
26 static void *xstream_launch_root_ythread(void *p_xstream);
27 
/*
 * Create a secondary execution stream driven by `sched` and return its handle
 * through `newxstream`.
 *
 * When `sched` does not resolve to a scheduler object, a default basic
 * scheduler is created internally; that internal scheduler is the only one
 * freed here if xstream_create() fails.  xstream_create() is called with the
 * rank argument -1 and with `start` set to ABT_TRUE.
 *
 * Returns ABT_SUCCESS on success; on failure the error code is handed to the
 * ABTI_CHECK_* / ABTI_HANDLE_ERROR macros.
 *
 * NOTE(review): this listing embeds source line numbers and skips source
 * line 99, i.e. the remaining argument of the ABTI_CHECK_TRUE call in the
 * non-2.0 branch below.  Restore it from the upstream file before compiling.
 */
76 int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
77 {
78  ABTI_UB_ASSERT(ABTI_initialized());
79  ABTI_UB_ASSERT(newxstream);
80 
81 #ifndef ABT_CONFIG_ENABLE_VER_20_API
82  /* Argobots 1.x sets newxstream to NULL on error. */
83  *newxstream = ABT_XSTREAM_NULL;
84 #endif
85  int abt_errno;
86  ABTI_xstream *p_newxstream;
87 
88  ABTI_global *p_global;
89  ABTI_SETUP_GLOBAL(&p_global);
90 
91  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
92  if (!p_sched) {
      /* No scheduler object behind the handle: build a default one. */
93  abt_errno =
94  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
95  ABTI_CHECK_ERROR(abt_errno);
96  } else {
      /* A caller-provided scheduler must not already be in use. */
97 #ifndef ABT_CONFIG_ENABLE_VER_20_API
98  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
100 #else
101  ABTI_UB_ASSERT(p_sched->used == ABTI_SCHED_NOT_USED);
102 #endif
103  }
104 
105  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
106  -1, ABT_TRUE, &p_newxstream);
107  if (abt_errno != ABT_SUCCESS) {
      /* Only the scheduler created above is freed; a caller-provided
       * scheduler is left alone. */
108  if (!ABTI_sched_get_ptr(sched)) {
109  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
110  ABT_FALSE);
111  }
112  ABTI_HANDLE_ERROR(abt_errno);
113  }
114 
115  /* Return value */
116  *newxstream = ABTI_xstream_get_handle(p_newxstream);
117  return ABT_SUCCESS;
118 }
119 
173 int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools,
174  ABT_pool *pools, ABT_sched_config config,
175  ABT_xstream *newxstream)
176 {
177  ABTI_UB_ASSERT(ABTI_initialized());
178  ABTI_UB_ASSERT(pools || num_pools <= 0);
179  ABTI_UB_ASSERT(newxstream);
180 
181 #ifndef ABT_CONFIG_ENABLE_VER_20_API
182  /* Argobots 1.x sets newxstream to NULL on error. */
183  *newxstream = ABT_XSTREAM_NULL;
184 #endif
185  ABTI_CHECK_TRUE(num_pools >= 0, ABT_ERR_INV_ARG);
186 
187  int abt_errno;
188  ABTI_xstream *p_newxstream;
189  ABTI_sched_config *p_config = ABTI_sched_config_get_ptr(config);
190 
191  ABTI_global *p_global;
192  ABTI_SETUP_GLOBAL(&p_global);
193 
194  ABTI_sched *p_sched;
195  abt_errno =
196  ABTI_sched_create_basic(predef, num_pools, pools, p_config, &p_sched);
197  ABTI_CHECK_ERROR(abt_errno);
198 
199  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
200  -1, ABT_TRUE, &p_newxstream);
201  if (abt_errno != ABT_SUCCESS) {
202  int i;
203  for (i = 0; i < num_pools; i++) {
204  if (pools[i] != ABT_POOL_NULL) {
205  /* Avoid freeing user-given pools. */
206  ABTI_pool_release(ABTI_pool_get_ptr(p_sched->pools[i]));
207  p_sched->pools[i] = ABT_POOL_NULL;
208  }
209  }
210  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
211  ABT_FALSE);
212  ABTI_HANDLE_ERROR(abt_errno);
213  }
214 
215  *newxstream = ABTI_xstream_get_handle(p_newxstream);
216  return ABT_SUCCESS;
217 }
218 
271  ABT_xstream *newxstream)
272 {
273  ABTI_UB_ASSERT(ABTI_initialized());
274  ABTI_UB_ASSERT(newxstream);
275 
276 #ifndef ABT_CONFIG_ENABLE_VER_20_API
277  /* Argobots 1.x sets newxstream to NULL on error. */
278  *newxstream = ABT_XSTREAM_NULL;
279 #endif
280  int abt_errno;
281  ABTI_xstream *p_newxstream;
282 
283  ABTI_global *p_global;
284  ABTI_SETUP_GLOBAL(&p_global);
285 
286  ABTI_CHECK_TRUE(rank >= 0, ABT_ERR_INV_XSTREAM_RANK);
287 
288  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
289  if (!p_sched) {
290  abt_errno =
291  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
292  ABTI_CHECK_ERROR(abt_errno);
293  } else {
294 #ifndef ABT_CONFIG_ENABLE_VER_20_API
295  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
297 #else
298  ABTI_UB_ASSERT(p_sched->used == ABTI_SCHED_NOT_USED);
299 #endif
300  }
301 
302  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_SECONDARY,
303  rank, ABT_TRUE, &p_newxstream);
304  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
305  if (!ABTI_sched_get_ptr(sched)) {
306  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
307  ABT_FALSE);
308  }
309  ABTI_HANDLE_ERROR(abt_errno);
310  }
311 
312  /* Return value */
313  *newxstream = ABTI_xstream_get_handle(p_newxstream);
314  return ABT_SUCCESS;
315 }
316 
348 {
349  ABTI_UB_ASSERT(ABTI_initialized());
350 
351  ABTI_global *p_global = ABTI_global_get_global();
352  ABTI_local *p_local = ABTI_local_get_local();
353  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
354  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
355 
356  /* Revives the main scheduler thread. */
357  ABTI_sched *p_main_sched = p_xstream->p_main_sched;
358  ABTI_ythread *p_main_sched_ythread = p_main_sched->p_ythread;
359  /* TODO: should we check the thread state instead of the xstream state? */
360  ABTI_CHECK_TRUE(ABTD_atomic_relaxed_load_int(
361  &p_main_sched_ythread->thread.state) ==
364 
365  ABTD_atomic_relaxed_store_uint32(&p_main_sched->request, 0);
366  ABTI_event_thread_join(p_local, &p_main_sched_ythread->thread,
367  ABTI_local_get_xstream_or_null(p_local)
368  ? ABTI_local_get_xstream(p_local)->p_thread
369  : NULL);
370 
371  int abt_errno =
372  ABTI_thread_revive(p_global, p_local, p_xstream->p_root_pool,
373  p_main_sched_ythread->thread.f_thread,
374  p_main_sched_ythread->thread.p_arg,
375  &p_main_sched_ythread->thread);
376  /* ABTI_thread_revive() never fails since it does not update an associated
377  * pool.*/
378  assert(abt_errno == ABT_SUCCESS);
379 
380  ABTD_atomic_relaxed_store_int(&p_xstream->state, ABT_XSTREAM_STATE_RUNNING);
381  ABTD_xstream_context_revive(&p_xstream->ctx);
382  return ABT_SUCCESS;
383 }
384 
/*
 * Join and free the execution stream referenced by `*xstream`, then reset the
 * caller's handle to ABT_XSTREAM_NULL.
 *
 * The request is rejected when the handle is invalid, when the target is the
 * execution stream the caller is running on, or when the target is the
 * primary execution stream.  Otherwise the target is joined with
 * xstream_join() and released with ABTI_xstream_free() (force_free is
 * ABT_FALSE).
 *
 * NOTE(review): this listing skips source lines 443 and 447, i.e. the
 * error-code argument of each ABTI_CHECK_TRUE_MSG call below.  Restore them
 * from the upstream file before compiling.
 */
426 int ABT_xstream_free(ABT_xstream *xstream)
427 {
428  ABTI_UB_ASSERT(ABTI_initialized());
429  ABTI_UB_ASSERT(xstream);
430 
431  ABTI_global *p_global;
432  ABTI_SETUP_GLOBAL(&p_global);
433 
434  ABTI_local *p_local = ABTI_local_get_local();
435  ABT_xstream h_xstream = *xstream;
436 
437  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(h_xstream);
438  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
439 
440  /* We first need to check whether p_local_xstream is NULL because this
441  * routine might be called by external threads. */
442  ABTI_CHECK_TRUE_MSG(p_xstream != ABTI_local_get_xstream_or_null(p_local),
444  "The current xstream cannot be freed.");
445 
446  ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
448  "The primary xstream cannot be freed explicitly.");
449 
450  /* Wait until xstream terminates */
451  int abt_errno = xstream_join(&p_local, p_xstream);
452  ABTI_CHECK_ERROR(abt_errno);
453 
454  /* Free the xstream object */
455  ABTI_xstream_free(p_global, p_local, p_xstream, ABT_FALSE);
456 
457  /* Return value */
458  *xstream = ABT_XSTREAM_NULL;
459  return ABT_SUCCESS;
460 }
461 
493 int ABT_xstream_join(ABT_xstream xstream)
494 {
495  ABTI_UB_ASSERT(ABTI_initialized());
496 
497  ABTI_local *p_local = ABTI_local_get_local();
498  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
499  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
500 
501  int abt_errno = xstream_join(&p_local, p_xstream);
502  ABTI_CHECK_ERROR(abt_errno);
503  return ABT_SUCCESS;
504 }
505 
/*
 * Terminate the execution stream the caller is running on.
 *
 * The cancel request bit (ABTI_THREAD_REQ_CANCEL) is set on the thread of the
 * local execution stream's main scheduler, and the calling ULT is then ended
 * with ABTI_ythread_exit().  The primary execution stream is rejected by the
 * ABTI_CHECK_TRUE check.
 *
 * NOTE(review): this listing skips source line 552 (the error-code argument of
 * the ABTI_CHECK_TRUE call) and source line 560 (a statement between
 * ABTI_ythread_exit() and the return).  Restore both from the upstream file
 * before compiling.
 */
540 int ABT_xstream_exit(void)
541 {
542  ABTI_xstream *p_local_xstream;
543  ABTI_ythread *p_ythread;
544 #ifndef ABT_CONFIG_ENABLE_VER_20_API
545  ABTI_SETUP_GLOBAL(NULL);
546 #else
547  ABTI_UB_ASSERT(ABTI_initialized());
548 #endif
549  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_ythread);
550  /* Check if the target is the primary execution stream. */
551  ABTI_CHECK_TRUE(p_local_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
553 
554  /* Terminate the main scheduler. */
555  ABTD_atomic_fetch_or_uint32(&p_local_xstream->p_main_sched->p_ythread
556  ->thread.request,
557  ABTI_THREAD_REQ_CANCEL);
558  /* Terminate this ULT */
559  ABTI_ythread_exit(p_local_xstream, p_ythread);
561  return ABT_SUCCESS;
562 }
563 
/*
 * Request cancellation of the execution stream `xstream`.
 *
 * After validating the handle and rejecting the primary execution stream,
 * the routine only sets the ABTI_THREAD_REQ_CANCEL bit on the request word of
 * the target's main-scheduler thread; it does not wait for the target.
 *
 * NOTE(review): this listing skips source line 599, the error-code argument
 * of the ABTI_CHECK_TRUE call.  Restore it from the upstream file before
 * compiling.
 */
592 int ABT_xstream_cancel(ABT_xstream xstream)
593 {
594  ABTI_UB_ASSERT(ABTI_initialized());
595 
596  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
597  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
598  ABTI_CHECK_TRUE(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
600 
601  /* Terminate the main scheduler of the target xstream. */
602  ABTD_atomic_fetch_or_uint32(&p_xstream->p_main_sched->p_ythread->thread
603  .request,
604  ABTI_THREAD_REQ_CANCEL);
605  return ABT_SUCCESS;
606 }
607 
639 int ABT_xstream_self(ABT_xstream *xstream)
640 {
641  ABTI_UB_ASSERT(xstream);
642 
643  ABTI_xstream *p_local_xstream;
644 #ifndef ABT_CONFIG_ENABLE_VER_20_API
645  *xstream = ABT_XSTREAM_NULL;
646  ABTI_SETUP_GLOBAL(NULL);
647 #else
648  ABTI_UB_ASSERT(ABTI_initialized());
649 #endif
650  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
651 
652  /* Return value */
653  *xstream = ABTI_xstream_get_handle(p_local_xstream);
654  return ABT_SUCCESS;
655 }
656 
686 int ABT_xstream_self_rank(int *rank)
687 {
688  ABTI_UB_ASSERT(rank);
689 
690  ABTI_xstream *p_local_xstream;
691 #ifndef ABT_CONFIG_ENABLE_VER_20_API
692  ABTI_SETUP_GLOBAL(NULL);
693 #else
694  ABTI_UB_ASSERT(ABTI_initialized());
695 #endif
696  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
697  /* Return value */
698  *rank = (int)p_local_xstream->rank;
699  return ABT_SUCCESS;
700 }
701 
/*
 * Change the rank of the execution stream `xstream` to `rank`.
 *
 * The primary execution stream is rejected.  A negative `rank`, or a rank
 * that xstream_change_rank() refuses, is reported as
 * ABT_ERR_INV_XSTREAM_RANK.  When p_global->set_affinity is ABT_TRUE, the
 * default CPU set for the resulting rank is applied to the stream's context.
 *
 * NOTE(review): this listing skips source line 747, the error-code argument
 * of the first ABTI_CHECK_TRUE call.  Restore it from the upstream file
 * before compiling.
 */
737 int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
738 {
739  ABTI_UB_ASSERT(ABTI_initialized());
740 
741  ABTI_global *p_global;
742  ABTI_SETUP_GLOBAL(&p_global);
743 
744  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
745  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
746  ABTI_CHECK_TRUE(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
748  ABTI_CHECK_TRUE(rank >= 0, ABT_ERR_INV_XSTREAM_RANK);
749 
750  ABT_bool is_changed = xstream_change_rank(p_global, p_xstream, rank);
751  ABTI_CHECK_TRUE(is_changed, ABT_ERR_INV_XSTREAM_RANK);
752 
753  /* Set the CPU affinity for the ES */
754  if (p_global->set_affinity == ABT_TRUE) {
755  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
756  }
757  return ABT_SUCCESS;
758 }
759 
784 int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
785 {
786  ABTI_UB_ASSERT(ABTI_initialized());
787  ABTI_UB_ASSERT(rank);
788 
789  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
790  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
791 
792  *rank = (int)p_xstream->rank;
793  return ABT_SUCCESS;
794 }
795 
875 {
876  ABTI_UB_ASSERT(ABTI_initialized());
877 
878  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
879  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
880 
881  ABTI_global *p_global;
882  ABTI_SETUP_GLOBAL(&p_global);
883 
884  ABTI_xstream *p_local_xstream;
885  ABTI_ythread *p_self;
886  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_self);
887 
888 #ifndef ABT_CONFIG_ENABLE_VER_20_API
889  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_xstream->state) !=
891  p_local_xstream == p_xstream,
893 #else
894  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_xstream->state) !=
896  p_local_xstream == p_xstream,
898 #endif
899 
900  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
901  if (!p_sched) {
902  int abt_errno =
903  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
904  ABTI_CHECK_ERROR(abt_errno);
905  } else {
906 #ifndef ABT_CONFIG_ENABLE_VER_20_API
907  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
909 #else
910  ABTI_UB_ASSERT(p_sched->used == ABTI_SCHED_NOT_USED);
911 #endif
912  }
913 
914  int abt_errno = xstream_update_main_sched(p_global, &p_local_xstream,
915  p_xstream, p_sched);
916  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
917  if (!ABTI_sched_get_ptr(sched)) {
918  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
919  ABT_FALSE);
920  }
921  ABTI_HANDLE_ERROR(abt_errno);
922  }
923  return ABT_SUCCESS;
924 }
925 
980  ABT_sched_predef predef, int num_pools,
981  ABT_pool *pools)
982 {
983  ABTI_UB_ASSERT(ABTI_initialized());
984  ABTI_UB_ASSERT(pools || num_pools <= 0);
985 
986  int abt_errno;
987  ABTI_global *p_global;
988  ABTI_SETUP_GLOBAL(&p_global);
989 
990  ABTI_xstream *p_local_xstream;
991  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
992 
993  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
994  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
995 
996  ABTI_sched *p_sched;
997  abt_errno =
998  ABTI_sched_create_basic(predef, num_pools, pools, NULL, &p_sched);
999  ABTI_CHECK_ERROR(abt_errno);
1000 
1001  abt_errno = xstream_update_main_sched(p_global, &p_local_xstream, p_xstream,
1002  p_sched);
1003  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
1004  int i;
1005  for (i = 0; i < num_pools; i++) {
1006  if (pools[i] != ABT_POOL_NULL) {
1007  /* Avoid freeing user-given pools. */
1008  ABTI_pool_release(ABTI_pool_get_ptr(p_sched->pools[i]));
1009  p_sched->pools[i] = ABT_POOL_NULL;
1010  }
1011  }
1012  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
1013  ABT_FALSE);
1014  ABTI_HANDLE_ERROR(abt_errno);
1015  }
1016  return ABT_SUCCESS;
1017 }
1018 
1042 {
1043  ABTI_UB_ASSERT(ABTI_initialized());
1044  ABTI_UB_ASSERT(sched);
1045 
1046  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1047  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1048 
1049  *sched = ABTI_sched_get_handle(p_xstream->p_main_sched);
1050  return ABT_SUCCESS;
1051 }
1052 
1081 int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools,
1082  ABT_pool *pools)
1083 {
1084  ABTI_UB_ASSERT(ABTI_initialized());
1085  ABTI_UB_ASSERT(pools || max_pools <= 0);
1086 
1087  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1088  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1089 
1090  ABTI_sched *p_sched = p_xstream->p_main_sched;
1091  max_pools = ABTU_min_int(p_sched->num_pools, max_pools);
1092  memcpy(pools, p_sched->pools, sizeof(ABT_pool) * max_pools);
1093  return ABT_SUCCESS;
1094 }
1095 
1121 {
1122  ABTI_UB_ASSERT(ABTI_initialized());
1123  ABTI_UB_ASSERT(state);
1124 
1125  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1126  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1127 
1128  *state = (ABT_xstream_state)ABTD_atomic_acquire_load_int(&p_xstream->state);
1129  return ABT_SUCCESS;
1130 }
1131 
1159 int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2,
1160  ABT_bool *result)
1161 {
1162  ABTI_UB_ASSERT(result);
1163 
1164  ABTI_xstream *p_xstream1 = ABTI_xstream_get_ptr(xstream1);
1165  ABTI_xstream *p_xstream2 = ABTI_xstream_get_ptr(xstream2);
1166  *result = (p_xstream1 == p_xstream2) ? ABT_TRUE : ABT_FALSE;
1167  return ABT_SUCCESS;
1168 }
1169 
1198 int ABT_xstream_get_num(int *num_xstreams)
1199 {
1200  ABTI_UB_ASSERT(num_xstreams);
1201 #ifdef ABT_CONFIG_ENABLE_VER_20_API
1202  ABTI_UB_ASSERT(ABTI_initialized());
1203 #endif
1204 
1205  ABTI_global *p_global;
1206  ABTI_SETUP_GLOBAL(&p_global);
1207 
1208  *num_xstreams = p_global->num_xstreams;
1209  return ABT_SUCCESS;
1210 }
1211 
1236 int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *is_primary)
1237 {
1238  ABTI_UB_ASSERT(ABTI_initialized());
1239  ABTI_UB_ASSERT(is_primary);
1240 
1241  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1242  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1243 
1244  /* Return value */
1245  *is_primary =
1246  (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) ? ABT_TRUE : ABT_FALSE;
1247  return ABT_SUCCESS;
1248 }
1249 
1278 int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
1279 {
1280  ABTI_UB_ASSERT(ABTI_initialized());
1282  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
1283  ABTI_CHECK_NULL_POOL_PTR(p_pool);
1284  ABTI_CHECK_TRUE(unit != ABT_UNIT_NULL, ABT_ERR_INV_UNIT);
1285  ABTI_global *p_global;
1286  ABTI_SETUP_GLOBAL(&p_global);
1287 
1288  ABTI_xstream *p_local_xstream;
1289  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
1290 
1291  ABTI_thread *p_thread;
1292  int abt_errno =
1293  ABTI_unit_set_associated_pool(p_global, unit, p_pool, &p_thread);
1294  ABTI_CHECK_ERROR(abt_errno);
1295  ABTI_ythread_schedule(p_global, &p_local_xstream, p_thread);
1296  return ABT_SUCCESS;
1297 }
1298 
1332 {
1333  ABTI_xstream *p_local_xstream;
1334 #ifndef ABT_CONFIG_ENABLE_VER_20_API
1335  ABTI_SETUP_GLOBAL(NULL);
1336 #else
1337  ABTI_UB_ASSERT(ABTI_initialized());
1338 #endif
1339  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
1340 
1341  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
1342  ABTI_CHECK_NULL_SCHED_PTR(p_sched);
1343  ABTI_CHECK_TRUE(p_local_xstream->p_thread == &p_sched->p_ythread->thread,
1345 
1346  ABTI_xstream_check_events(p_local_xstream, p_sched);
1347  return ABT_SUCCESS;
1348 }
1349 
1378 int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
1379 {
1380  ABTI_UB_ASSERT(ABTI_initialized());
1382  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1383  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1384 
1385  ABTD_affinity_cpuset cpuset;
1386  cpuset.num_cpuids = 1;
1387  cpuset.cpuids = &cpuid;
1388  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &cpuset);
1389  /* Do not free cpuset since cpuids points to a user pointer. */
1390  ABTI_CHECK_ERROR(abt_errno);
1391  return ABT_SUCCESS;
1392 }
1393 
1429 int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
1430 {
1431  ABTI_UB_ASSERT(ABTI_initialized());
1432  ABTI_UB_ASSERT(cpuid);
1434  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1435  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1436 
1437  int num_cpuid;
1438  int cpuids[1];
1439  int abt_errno =
1440  ABTD_affinity_cpuset_read(&p_xstream->ctx, 1, cpuids, &num_cpuid);
1441  ABTI_CHECK_ERROR(abt_errno);
1442  ABTI_CHECK_TRUE(num_cpuid > 0, ABT_ERR_CPUID);
1443 
1444  *cpuid = cpuids[0];
1445  return ABT_SUCCESS;
1446 }
1447 
1481 int ABT_xstream_set_affinity(ABT_xstream xstream, int num_cpuids, int *cpuids)
1482 {
1483  ABTI_UB_ASSERT(ABTI_initialized());
1484  ABTI_UB_ASSERT(cpuids || num_cpuids <= 0);
1486  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1487  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1488  ABTI_CHECK_TRUE(num_cpuids >= 0, ABT_ERR_INV_ARG);
1489 
1490  ABTD_affinity_cpuset affinity;
1491  affinity.num_cpuids = num_cpuids;
1492  affinity.cpuids = cpuids;
1493  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &affinity);
1494  /* Do not free affinity since cpuids may not be freed. */
1495  ABTI_CHECK_ERROR(abt_errno);
1496  return ABT_SUCCESS;
1497 }
1498 
1547 int ABT_xstream_get_affinity(ABT_xstream xstream, int max_cpuids, int *cpuids,
1548  int *num_cpuids)
1549 {
1550  ABTI_UB_ASSERT(ABTI_initialized());
1551  ABTI_UB_ASSERT(cpuids || max_cpuids <= 0);
1553  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1554  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1555  ABTI_CHECK_TRUE(max_cpuids >= 0, ABT_ERR_INV_ARG);
1556 
1557  int abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, max_cpuids,
1558  cpuids, num_cpuids);
1559  ABTI_CHECK_ERROR(abt_errno);
1560  return abt_errno;
1561 }
1562 
1563 /*****************************************************************************/
1564 /* Private APIs */
1565 /*****************************************************************************/
1566 
1567 ABTU_ret_err int ABTI_xstream_create_primary(ABTI_global *p_global,
1568  ABTI_xstream **pp_xstream)
1569 {
1570  int abt_errno;
1571  ABTI_xstream *p_newxstream;
1572  ABTI_sched *p_sched;
1573 
1574  /* For the primary ES, a default scheduler is created. */
1575  abt_errno =
1576  ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL, NULL, &p_sched);
1577  ABTI_CHECK_ERROR(abt_errno);
1578 
1579  abt_errno = xstream_create(p_global, p_sched, ABTI_XSTREAM_TYPE_PRIMARY, -1,
1580  ABT_FALSE, &p_newxstream);
1581  if (abt_errno != ABT_SUCCESS) {
1582  ABTI_sched_free(p_global, ABTI_local_get_local_uninlined(), p_sched,
1583  ABT_TRUE);
1584  ABTI_HANDLE_ERROR(abt_errno);
1585  }
1586 
1587  *pp_xstream = p_newxstream;
1588  return ABT_SUCCESS;
1589 }
1590 
1591 /* This routine starts the primary ES. It should be called in ABT_init. */
/*
 * The routine records the calling OS-level context as the stream's context,
 * applies the default CPU set for the stream's rank when
 * p_global->set_affinity is ABT_TRUE, and then context-switches from
 * `p_ythread` to the stream's root thread.  When control comes back,
 * `p_ythread` is recorded as the current thread of `*pp_local_xstream`.
 *
 * NOTE(review): this listing skips source line 1601, the right-hand side of
 * the state comparison inside the second ABTI_ASSERT.  Restore it from the
 * upstream file before compiling.
 */
1592 void ABTI_xstream_start_primary(ABTI_global *p_global,
1593  ABTI_xstream **pp_local_xstream,
1594  ABTI_xstream *p_xstream,
1595  ABTI_ythread *p_ythread)
1596 {
1597  /* p_ythread must be the main thread. */
1598  ABTI_ASSERT(p_ythread->thread.type & ABTI_THREAD_TYPE_PRIMARY);
1599  /* The ES's state must be running here. */
1600  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
1602 
1603  ABTD_xstream_context_set_self(&p_xstream->ctx);
1604 
1605  /* Set the CPU affinity for the ES */
1606  if (p_global->set_affinity == ABT_TRUE) {
1607  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
1608  }
1609 
1610  /* Context switch to the root thread. */
1611  p_xstream->p_root_ythread->thread.p_last_xstream = p_xstream;
1612  ABTI_ythread_context_switch(*pp_local_xstream, p_ythread,
1613  p_xstream->p_root_ythread);
1614  /* Come back to the primary thread. Now this thread is executed on top of
1615  * the main scheduler, which is running on the root thread. */
1616  (*pp_local_xstream)->p_thread = &p_ythread->thread;
1617 }
1618 
1619 void ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
1620 {
1621  ABTI_info_check_print_all_thread_stacks();
1622 
1623  uint32_t request = ABTD_atomic_acquire_load_uint32(
1624  &p_xstream->p_main_sched->p_ythread->thread.request);
1625  if (request & ABTI_THREAD_REQ_JOIN) {
1626  ABTI_sched_finish(p_sched);
1627  }
1628 
1629  if (request & ABTI_THREAD_REQ_CANCEL) {
1630  ABTI_sched_exit(p_sched);
1631  }
1632 }
1633 
1634 void ABTI_xstream_free(ABTI_global *p_global, ABTI_local *p_local,
1635  ABTI_xstream *p_xstream, ABT_bool force_free)
1636 {
1637  /* Clean up memory pool. */
1638  ABTI_mem_finalize_local(p_xstream);
1639  /* Return rank for reuse. rank must be returned prior to other free
1640  * functions so that other xstreams cannot refer to this xstream. */
1641  xstream_return_rank(p_global, p_xstream);
1642 
1643  /* Free the scheduler */
1644  ABTI_sched *p_cursched = p_xstream->p_main_sched;
1645  if (p_cursched != NULL) {
1646  /* Join a scheduler thread. */
1647  ABTI_event_thread_join(p_local, &p_cursched->p_ythread->thread,
1648  ABTI_local_get_xstream_or_null(p_local)
1649  ? ABTI_local_get_xstream(p_local)->p_thread
1650  : NULL);
1651  ABTI_sched_discard_and_free(p_global, p_local, p_cursched, force_free);
1652  /* The main scheduler thread is also freed. */
1653  }
1654 
1655  /* Free the root thread and pool. */
1656  ABTI_ythread_free_root(p_global, p_local, p_xstream->p_root_ythread);
1657  ABTI_pool_free(p_xstream->p_root_pool);
1658 
1659  /* Free the context if a given xstream is secondary. */
1660  if (p_xstream->type == ABTI_XSTREAM_TYPE_SECONDARY) {
1661  ABTD_xstream_context_free(&p_xstream->ctx);
1662  }
1663 
1664  ABTU_free(p_xstream);
1665 }
1666 
/*
 * Print a description of `p_xstream` to the stream `p_os`, indenting every
 * line by `indent` columns, and flush `p_os` afterwards.
 *
 * A NULL `p_xstream` prints a single "NULL ES" line.  Otherwise the rank,
 * type, state, and the root thread, root pool, current thread, and main
 * scheduler pointers are printed, followed by the context.  When `print_sub`
 * is ABT_TRUE the main scheduler is printed as well, indented by an extra
 * ABTI_INDENT.
 *
 * NOTE(review): this listing skips source lines 1686 and 1689, i.e. the two
 * `case` labels of the state switch that precede the "RUNNING" and
 * "TERMINATED" assignments.  Restore them from the upstream file before
 * compiling.
 */
1667 void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent,
1668  ABT_bool print_sub)
1669 {
1670  if (p_xstream == NULL) {
1671  fprintf(p_os, "%*s== NULL ES ==\n", indent, "");
1672  } else {
1673  const char *type, *state;
      /* Map the stream type to a printable name. */
1674  switch (p_xstream->type) {
1675  case ABTI_XSTREAM_TYPE_PRIMARY:
1676  type = "PRIMARY";
1677  break;
1678  case ABTI_XSTREAM_TYPE_SECONDARY:
1679  type = "SECONDARY";
1680  break;
1681  default:
1682  type = "UNKNOWN";
1683  break;
1684  }
      /* Map the stream state to a printable name. */
1685  switch (ABTD_atomic_acquire_load_int(&p_xstream->state)) {
1687  state = "RUNNING";
1688  break;
1690  state = "TERMINATED";
1691  break;
1692  default:
1693  state = "UNKNOWN";
1694  break;
1695  }
1696 
      /* Each "%*s" consumes an (indent, "") pair to produce the padding. */
1697  fprintf(p_os,
1698  "%*s== ES (%p) ==\n"
1699  "%*srank : %d\n"
1700  "%*stype : %s\n"
1701  "%*sstate : %s\n"
1702  "%*sroot_ythread : %p\n"
1703  "%*sroot_pool : %p\n"
1704  "%*sthread : %p\n"
1705  "%*smain_sched : %p\n",
1706  indent, "", (void *)p_xstream, indent, "", p_xstream->rank,
1707  indent, "", type, indent, "", state, indent, "",
1708  (void *)p_xstream->p_root_ythread, indent, "",
1709  (void *)p_xstream->p_root_pool, indent, "",
1710  (void *)p_xstream->p_thread, indent, "",
1711  (void *)p_xstream->p_main_sched);
1712 
1713  if (print_sub == ABT_TRUE) {
1714  ABTI_sched_print(p_xstream->p_main_sched, p_os,
1715  indent + ABTI_INDENT, ABT_TRUE);
1716  }
1717  fprintf(p_os, "%*sctx :\n", indent, "");
1718  ABTD_xstream_context_print(&p_xstream->ctx, p_os, indent + ABTI_INDENT);
1719  }
1720  fflush(p_os);
1721 }
1722 
1723 static void *xstream_launch_root_ythread(void *p_xstream)
1724 {
1725  ABTI_xstream *p_local_xstream = (ABTI_xstream *)p_xstream;
1726 
1727  /* Initialization of the local variables */
1728  ABTI_local_set_xstream(p_local_xstream);
1729 
1730  /* Set the root thread as the current thread */
1731  ABTI_ythread *p_root_ythread = p_local_xstream->p_root_ythread;
1732  p_local_xstream->p_thread = &p_local_xstream->p_root_ythread->thread;
1733  p_root_ythread->thread.p_last_xstream = p_local_xstream;
1734 
1735  /* Run the root thread. */
1736  p_root_ythread->thread.f_thread(p_root_ythread->thread.p_arg);
1737  ABTI_thread_terminate(ABTI_global_get_global(), p_local_xstream,
1738  &p_root_ythread->thread);
1739 
1740  /* Reset the current ES and its local info. */
1741  ABTI_local_set_xstream(NULL);
1742  return NULL;
1743 }
1744 
1745 /*****************************************************************************/
1746 /* Internal static functions */
1747 /*****************************************************************************/
1748 
/*
 * Allocate and initialize an execution stream and store it in `*pp_xstream`.
 *
 * `p_sched` becomes the main scheduler, `xstream_type` the stream type, and
 * `rank` is handed to xstream_set_new_rank().  When `start` is true a new
 * context running xstream_launch_root_ythread() is created for the stream.
 *
 * `init_stage` counts the construction steps completed so far; on failure
 * the code under FAILED undoes the completed steps in reverse order and
 * returns the error code.  Returns ABT_SUCCESS on success.
 *
 * NOTE(review): this listing skips source line 1754 (presumably the opening
 * brace of the body), source line 1772 (the value stored into the state), and
 * source line 1811 (the right-hand side of the state comparison in the first
 * ABTI_ASSERT).  Restore them from the upstream file before compiling.
 */
1749 ABTU_ret_err static int xstream_create(ABTI_global *p_global,
1750  ABTI_sched *p_sched,
1751  ABTI_xstream_type xstream_type, int rank,
1752  ABT_bool start,
1753  ABTI_xstream **pp_xstream)
1755  int abt_errno, init_stage = 0;
1756  ABTI_xstream *p_newxstream;
1757 
1758  abt_errno = ABTU_malloc(sizeof(ABTI_xstream), (void **)&p_newxstream);
1759  ABTI_CHECK_ERROR(abt_errno);
1760 
1761  p_newxstream->p_prev = NULL;
1762  p_newxstream->p_next = NULL;
1763 
      /* Stage 1: a rank has been assigned. */
1764  if (xstream_set_new_rank(p_global, p_newxstream, rank) == ABT_FALSE) {
1765  abt_errno = ABT_ERR_INV_XSTREAM_RANK;
1766  goto FAILED;
1767  }
1768  init_stage = 1;
1769 
1770  p_newxstream->type = xstream_type;
1771  ABTD_atomic_relaxed_store_int(&p_newxstream->state,
1773  p_newxstream->p_main_sched = NULL;
1774  p_newxstream->p_thread = NULL;
      /* Stage 2: the local memory pool has been initialized. */
1775  abt_errno = ABTI_mem_init_local(p_global, p_newxstream);
1776  if (abt_errno != ABT_SUCCESS)
1777  goto FAILED;
1778  init_stage = 2;
1779 
1780  /* Set the main scheduler */
1781  xstream_init_main_sched(p_newxstream, p_sched);
1782 
1783  /* Create the root thread. */
1784  abt_errno =
1785  ABTI_ythread_create_root(p_global, ABTI_xstream_get_local(p_newxstream),
1786  p_newxstream, &p_newxstream->p_root_ythread);
1787  if (abt_errno != ABT_SUCCESS)
1788  goto FAILED;
1789  init_stage = 3;
1790 
1791  /* Create the root pool. */
1792  abt_errno = ABTI_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPSC,
1793  ABT_FALSE, &p_newxstream->p_root_pool);
1794  if (abt_errno != ABT_SUCCESS)
1795  goto FAILED;
1796  init_stage = 4;
1797 
1798  /* Create the main scheduler thread. */
1799  abt_errno =
1800  ABTI_ythread_create_main_sched(p_global,
1801  ABTI_xstream_get_local(p_newxstream),
1802  p_newxstream,
1803  p_newxstream->p_main_sched);
1804  if (abt_errno != ABT_SUCCESS)
1805  goto FAILED;
1806  init_stage = 5;
1807 
1808  if (start) {
1809  /* The ES's state must be RUNNING */
1810  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_newxstream->state) ==
1812  ABTI_ASSERT(p_newxstream->type != ABTI_XSTREAM_TYPE_PRIMARY);
1813  /* Start the main scheduler on a different ES */
1814  abt_errno = ABTD_xstream_context_create(xstream_launch_root_ythread,
1815  (void *)p_newxstream,
1816  &p_newxstream->ctx);
1817  if (abt_errno != ABT_SUCCESS)
1818  goto FAILED;
      /* Stage 6 is recorded, but no failure path is taken after it, so the
       * cleanup below never tests for it. */
1819  init_stage = 6;
1820 
1821  /* Set the CPU affinity for the ES */
1822  if (p_global->set_affinity == ABT_TRUE) {
1823  ABTD_affinity_cpuset_apply_default(&p_newxstream->ctx,
1824  p_newxstream->rank);
1825  }
1826  }
1827 
1828  /* Return value */
1829  *pp_xstream = p_newxstream;
1830  return ABT_SUCCESS;
1831 FAILED:
      /* Undo the completed stages in reverse order. */
1832  if (init_stage >= 5) {
1833  ABTI_thread_free(p_global, ABTI_xstream_get_local(p_newxstream),
1834  &p_newxstream->p_main_sched->p_ythread->thread);
1835  p_newxstream->p_main_sched->p_ythread = NULL;
1836  }
1837  if (init_stage >= 4) {
1838  ABTI_pool_free(p_newxstream->p_root_pool);
1839  }
1840  if (init_stage >= 3) {
1841  ABTI_ythread_free_root(p_global, ABTI_xstream_get_local(p_newxstream),
1842  p_newxstream->p_root_ythread);
1843  }
1844  if (init_stage >= 2) {
      /* The scheduler is handed back as unused; it is not freed here. */
1845  p_sched->used = ABTI_SCHED_NOT_USED;
1846  ABTI_mem_finalize_local(p_newxstream);
1847  }
1848  if (init_stage >= 1) {
1849  xstream_return_rank(p_global, p_newxstream);
1850  }
1851  ABTU_free(p_newxstream);
1852  return abt_errno;
1853 }
1854 
/*
 * Wait until the execution stream `p_xstream` has terminated.
 *
 * The primary execution stream is rejected, as is a call made from the
 * target's own main-scheduler thread.  Otherwise the main scheduler is asked
 * to finish, its thread is joined, and the stream's context is joined.
 * `pp_local` is passed by address to ABTI_thread_join().  Returns ABT_SUCCESS
 * on the normal path.
 *
 * NOTE(review): this listing skips source lines 1860 and 1865 (the error-code
 * arguments of the two ABTI_CHECK_TRUE calls) and source line 1875 (the
 * right-hand side of the state comparison in the final ABTI_ASSERT).  Restore
 * them from the upstream file before compiling.
 */
1855 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
1856  ABTI_xstream *p_xstream)
1857 {
1858  /* The primary ES cannot be joined. */
1859  ABTI_CHECK_TRUE(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
1861  /* The main scheduler cannot join itself. */
1862  ABTI_CHECK_TRUE(!ABTI_local_get_xstream_or_null(*pp_local) ||
1863  &p_xstream->p_main_sched->p_ythread->thread !=
1864  ABTI_local_get_xstream(*pp_local)->p_thread,
1866 
1867  /* Wait until the target ES terminates */
1868  ABTI_sched_finish(p_xstream->p_main_sched);
1869  ABTI_thread_join(pp_local, &p_xstream->p_main_sched->p_ythread->thread);
1870 
1871  /* Normal join request */
1872  ABTD_xstream_context_join(&p_xstream->ctx);
1873 
1874  ABTI_ASSERT(ABTD_atomic_acquire_load_int(&p_xstream->state) ==
1876  return ABT_SUCCESS;
1877 }
1878 
1879 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
1880  ABTI_sched *p_sched)
1881 {
1882  ABTI_ASSERT(p_xstream->p_main_sched == NULL);
1883  /* Set the scheduler as a main scheduler */
1884  p_sched->used = ABTI_SCHED_MAIN;
1885  /* Set the scheduler */
1886  p_xstream->p_main_sched = p_sched;
1887 }
1888 
/*
 * Set or replace the main scheduler of p_xstream with p_sched.
 *
 * p_global:         global state, forwarded to the helpers called below.
 * pp_local_xstream: pointer to the caller's execution stream pointer; it is
 *                   compared against p_xstream to tell whether the caller is
 *                   changing its own execution stream.
 * p_xstream:        execution stream whose main scheduler is updated.
 * p_sched:          scheduler to install.
 *
 * Three cases are distinguished:
 *  1. p_xstream has no main scheduler: p_sched is installed directly.
 *  2. p_xstream is not the caller's execution stream: the ythread of the
 *     current main scheduler is associated with the first pool of p_sched and
 *     handed over to p_sched; the old scheduler is marked unused and freed if
 *     its "automatic" flag is set.
 *  3. p_xstream is the caller's execution stream: p_sched is registered as the
 *     replacement of the current main scheduler and the calling ULT suspends
 *     itself through ABTI_ythread_suspend_replace_sched().
 *
 * Returns ABT_SUCCESS on every path shown here.  A failure of
 * ABTI_thread_set_associated_pool() goes through ABTI_CHECK_ERROR(), which is
 * defined elsewhere (presumably it hands the error code back to the caller --
 * TODO confirm).
 */
1889 static int xstream_update_main_sched(ABTI_global *p_global,
1890  ABTI_xstream **pp_local_xstream,
1891  ABTI_xstream *p_xstream,
1892  ABTI_sched *p_sched)
1893 {
1894  ABTI_sched *p_main_sched = p_xstream->p_main_sched;
1895  if (p_main_sched == NULL) {
 /* Case 1: there is nothing to replace. */
1896  /* Set the scheduler as a main scheduler */
1897  p_sched->used = ABTI_SCHED_MAIN;
1898  /* Set the scheduler */
1899  p_xstream->p_main_sched = p_sched;
1900  return ABT_SUCCESS;
1901  } else if (*pp_local_xstream != p_xstream) {
 /* Case 2: the target is not the execution stream of the caller. */
1902  /* Changing the scheduler of another execution stream. */
1903  ABTI_ASSERT(p_xstream->ctx.state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
1904  /* Use the original scheduler's thread. Unit creation might fail, so it
1905  * should be done first. */
1906  ABTI_pool *p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
1907  int abt_errno =
1908  ABTI_thread_set_associated_pool(p_global,
1909  &p_main_sched->p_ythread->thread,
1910  p_tar_pool);
1911  ABTI_CHECK_ERROR(abt_errno);
1912 
1913  /* Set the scheduler as a main scheduler */
1914  p_sched->used = ABTI_SCHED_MAIN;
 /* Move ownership of the scheduler ythread from the old scheduler to the
  * new one. */
1915  p_sched->p_ythread = p_main_sched->p_ythread;
1916  p_main_sched->p_ythread = NULL;
1917  /* p_main_sched is no longer used. */
1918  p_xstream->p_main_sched->used = ABTI_SCHED_NOT_USED;
1919  if (p_xstream->p_main_sched->automatic) {
1920  /* Free that scheduler. */
1921  ABTI_sched_free(p_global, ABTI_xstream_get_local(*pp_local_xstream),
1922  p_xstream->p_main_sched, ABT_FALSE);
1923  }
1924  p_xstream->p_main_sched = p_sched;
1925  return ABT_SUCCESS;
1926  } else {
 /* Case 3: the caller replaces the main scheduler of its own execution
  * stream. */
1927  /* If the ES has a main scheduler, we have to free it */
1928  ABTI_thread *p_thread = (*pp_local_xstream)->p_thread;
1929  ABTI_ASSERT(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE);
1930  ABTI_ythread *p_ythread = ABTI_thread_get_ythread(p_thread);
1931  ABTI_pool *p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
1932 
1933  /* If the caller ULT is associated with a pool of the current main
1934  * scheduler, it needs to be associated to a pool of new scheduler. */
1935  size_t p;
1936  for (p = 0; p < p_main_sched->num_pools; p++) {
1937  if (p_ythread->thread.p_pool ==
1938  ABTI_pool_get_ptr(p_main_sched->pools[p])) {
1939  /* Associate the work unit to the first pool of new scheduler */
1940  int abt_errno =
1941  ABTI_thread_set_associated_pool(p_global,
1942  &p_ythread->thread,
1943  p_tar_pool);
1944  ABTI_CHECK_ERROR(abt_errno);
1945  break;
1946  }
1947  }
 /* A replacement request is already pending on the current main
  * scheduler: drop that pending scheduler, clear the request, and wake up
  * the ULT that was waiting for it before registering the new request. */
1948  if (p_main_sched->p_replace_sched) {
1949  /* We need to overwrite the scheduler. Free the existing one. */
1950  ABTI_ythread *p_waiter = p_main_sched->p_replace_waiter;
1951  ABTI_sched_discard_and_free(p_global,
1952  ABTI_xstream_get_local(
1953  *pp_local_xstream),
1954  p_main_sched->p_replace_sched,
1955  ABT_FALSE);
1956  p_main_sched->p_replace_sched = NULL;
1957  p_main_sched->p_replace_waiter = NULL;
1958  /* Resume the waiter. This waiter sees that the scheduler finished
1959  * immediately and was replaced by this new scheduler. */
1960  ABTI_ythread_resume_and_push(ABTI_xstream_get_local(
1961  *pp_local_xstream),
1962  p_waiter);
1963  }
1964  /* Set the replace scheduler */
1965  p_main_sched->p_replace_sched = p_sched;
1966  p_main_sched->p_replace_waiter = p_ythread;
1967 
1968  /* Switch to the current main scheduler. The current ULT is pushed to
1969  * the new scheduler's pool so that when the new scheduler starts, this
1970  * ULT can be scheduled by the new scheduler. The existing main
1971  * scheduler will be freed by ABTI_SCHED_REQ_RELEASE. */
 /* NOTE(review): in this listing the argument list of the call below ends
  * after p_main_sched (original line 1974 is absent); the remaining
  * arguments and the closing parenthesis must be restored from the upstream
  * source before this compiles. */
1972  ABTI_ythread_suspend_replace_sched(pp_local_xstream, p_ythread,
1973  p_main_sched,
1975  return ABT_SUCCESS;
1976  }
1977 }
1978 
1979 static void xstream_update_max_xstreams(ABTI_global *p_global, int newrank)
1980 {
1981  /* The lock must be taken. */
1982  if (newrank >= p_global->max_xstreams) {
1983  static int max_xstreams_warning_once = 0;
1984  if (max_xstreams_warning_once == 0) {
1985  /* Because some Argobots functionalities depend on the runtime value
1986  * ABT_MAX_NUM_XSTREAMS (or p_global->max_xstreams), changing
1987  * this value at run-time can cause an error. For example, using
1988  * ABT_mutex created before updating max_xstreams causes an error
1989  * since ABTI_thread_htable's array size depends on
1990  * ABT_MAX_NUM_XSTREAMS. To fix this issue, please set a larger
1991  * number to ABT_MAX_NUM_XSTREAMS in advance. */
1992  char *warning_message;
1993  int abt_errno =
1994  ABTU_malloc(sizeof(char) * 1024, (void **)&warning_message);
1995  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
1996  snprintf(warning_message, 1024,
1997  "Warning: the number of execution streams exceeds "
1998  "ABT_MAX_NUM_XSTREAMS (=%d). This may cause an error.",
1999  p_global->max_xstreams);
2000  HANDLE_WARNING(warning_message);
2001  ABTU_free(warning_message);
2002  max_xstreams_warning_once = 1;
2003  }
2004  }
2005  /* Anyway. let's increase max_xstreams. */
2006  p_global->max_xstreams = newrank + 1;
2007  }
2008 }
2009 
2010 /* Add p_newxstream to the list. This does not check the rank duplication. */
2011 static void xstream_add_xstream_list(ABTI_global *p_global,
2012  ABTI_xstream *p_newxstream)
2013 {
2014  int rank = p_newxstream->rank;
2015  ABTI_xstream *p_prev_xstream = p_global->p_xstream_head;
2016  ABTI_xstream *p_xstream = p_prev_xstream;
2017  /* Check if a certain rank is available */
2018  while (p_xstream) {
2019  ABTI_ASSERT(p_xstream->rank != rank);
2020  if (p_xstream->rank > rank) {
2021  /* Use this p_xstream. */
2022  break;
2023  }
2024  p_prev_xstream = p_xstream;
2025  p_xstream = p_xstream->p_next;
2026  }
2027 
2028  if (!p_xstream) {
2029  /* p_newxstream is appended to p_prev_xstream */
2030  if (p_prev_xstream) {
2031  p_prev_xstream->p_next = p_newxstream;
2032  p_newxstream->p_prev = p_prev_xstream;
2033  p_newxstream->p_next = NULL;
2034  } else {
2035  ABTI_ASSERT(p_global->p_xstream_head == NULL);
2036  p_newxstream->p_prev = NULL;
2037  p_newxstream->p_next = NULL;
2038  p_global->p_xstream_head = p_newxstream;
2039  }
2040  } else {
2041  /* p_newxstream is inserted in the middle.
2042  * (p_xstream->p_prev) -> p_new_xstream -> p_xstream */
2043  if (p_xstream->p_prev) {
2044  p_xstream->p_prev->p_next = p_newxstream;
2045  p_newxstream->p_prev = p_xstream->p_prev;
2046  } else {
2047  /* This p_xstream is the first element */
2048  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
2049  p_global->p_xstream_head = p_newxstream;
2050  }
2051  p_xstream->p_prev = p_newxstream;
2052  p_newxstream->p_next = p_xstream;
2053  }
2054 }
2055 
2056 /* Remove p_xstream from the list. */
2057 static void xstream_remove_xstream_list(ABTI_global *p_global,
2058  ABTI_xstream *p_xstream)
2059 {
2060  if (!p_xstream->p_prev) {
2061  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
2062  p_global->p_xstream_head = p_xstream->p_next;
2063  } else {
2064  p_xstream->p_prev->p_next = p_xstream->p_next;
2065  }
2066  if (p_xstream->p_next) {
2067  p_xstream->p_next->p_prev = p_xstream->p_prev;
2068  }
2069 }
2070 
2071 /* Set a new rank to ES */
2072 static ABT_bool xstream_set_new_rank(ABTI_global *p_global,
2073  ABTI_xstream *p_newxstream, int rank)
2074 {
2075  ABTD_spinlock_acquire(&p_global->xstream_list_lock);
2076 
2077  if (rank == -1) {
2078  /* Find an unused rank from 0. */
2079  rank = 0;
2080  ABTI_xstream *p_xstream = p_global->p_xstream_head;
2081  while (p_xstream) {
2082  if (p_xstream->rank == rank) {
2083  rank++;
2084  } else {
2085  /* Use this rank. */
2086  break;
2087  }
2088  p_xstream = p_xstream->p_next;
2089  }
2090  } else {
2091  /* Check if a certain rank is available */
2092  ABTI_xstream *p_xstream = p_global->p_xstream_head;
2093  while (p_xstream) {
2094  if (p_xstream->rank == rank) {
2095  ABTD_spinlock_release(&p_global->xstream_list_lock);
2096  return ABT_FALSE;
2097  } else if (p_xstream->rank > rank) {
2098  break;
2099  }
2100  p_xstream = p_xstream->p_next;
2101  }
2102  }
2103  /* Set the rank */
2104  p_newxstream->rank = rank;
2105  xstream_add_xstream_list(p_global, p_newxstream);
2106  xstream_update_max_xstreams(p_global, rank);
2107  p_global->num_xstreams++;
2108 
2109  ABTD_spinlock_release(&p_global->xstream_list_lock);
2110  return ABT_TRUE;
2111 }
2112 
2113 /* Change the rank of ES */
2114 static ABT_bool xstream_change_rank(ABTI_global *p_global,
2115  ABTI_xstream *p_xstream, int rank)
2116 {
2117  if (p_xstream->rank == rank) {
2118  /* No need to change the rank. */
2119  return ABT_TRUE;
2120  }
2121 
2122  ABTD_spinlock_acquire(&p_global->xstream_list_lock);
2123 
2124  ABTI_xstream *p_next = p_global->p_xstream_head;
2125  /* Check if a certain rank is available. */
2126  while (p_next) {
2127  if (p_next->rank == rank) {
2128  ABTD_spinlock_release(&p_global->xstream_list_lock);
2129  return ABT_FALSE;
2130  } else if (p_next->rank > rank) {
2131  break;
2132  }
2133  p_next = p_next->p_next;
2134  }
2135  /* Let's remove p_xstream from the list first. */
2136  xstream_remove_xstream_list(p_global, p_xstream);
2137  /* Then, let's add this p_xstream. */
2138  p_xstream->rank = rank;
2139  xstream_add_xstream_list(p_global, p_xstream);
2140  xstream_update_max_xstreams(p_global, rank);
2141 
2142  ABTD_spinlock_release(&p_global->xstream_list_lock);
2143  return ABT_TRUE;
2144 }
2145 
2146 static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream)
2147 {
2148  /* Remove this xstream from the global ES list */
2149  ABTD_spinlock_acquire(&p_global->xstream_list_lock);
2150 
2151  xstream_remove_xstream_list(p_global, p_xstream);
2152  p_global->num_xstreams--;
2153 
2154  ABTD_spinlock_release(&p_global->xstream_list_lock);
2155 }
ABT_xstream_set_affinity
int ABT_xstream_set_affinity(ABT_xstream xstream, int num_cpuids, int *cpuids)
Bind an execution stream to target CPUs.
Definition: stream.c:1485
ABT_THREAD_STATE_TERMINATED
@ ABT_THREAD_STATE_TERMINATED
Definition: abt.h:433
HANDLE_WARNING
#define HANDLE_WARNING(msg)
Definition: abti_error.h:46
ABTU_min_int
static int ABTU_min_int(int a, int b)
Definition: abtu.h:45
ABT_ERR_INV_THREAD
#define ABT_ERR_INV_THREAD
Error code: invalid work unit.
Definition: abt.h:186
ABT_sched_predef
ABT_sched_predef
Predefined scheduler type.
Definition: abt.h:475
xstream_join
static ABTU_ret_err int xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
Definition: stream.c:1860
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
ABT_ERR_CPUID
#define ABT_ERR_CPUID
Error code: error related to CPU ID.
Definition: abt.h:408
xstream_update_max_xstreams
static void xstream_update_max_xstreams(ABTI_global *p_global, int newrank)
Definition: stream.c:1984
ABT_xstream_self
int ABT_xstream_self(ABT_xstream *xstream)
Get an execution stream that is running the calling work unit.
Definition: stream.c:640
ABT_ERR_INV_XSTREAM
#define ABT_ERR_INV_XSTREAM
Error code: invalid execution stream.
Definition: abt.h:114
xstream_change_rank
static ABT_bool xstream_change_rank(ABTI_global *p_global, ABTI_xstream *p_xstream, int rank)
Definition: stream.c:2119
ABT_xstream_set_rank
int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
Set a rank for an execution stream.
Definition: stream.c:738
ABT_xstream_get_num
int ABT_xstream_get_num(int *num_xstreams)
Get the number of current existing execution streams.
Definition: stream.c:1201
ABT_xstream_get_affinity
int ABT_xstream_get_affinity(ABT_xstream xstream, int max_cpuids, int *cpuids, int *num_cpuids)
Get CPU IDs of CPUs to which an execution stream is bound.
Definition: stream.c:1552
ABT_sched_config
struct ABT_sched_config_opaque * ABT_sched_config
Scheduler configuration handle type.
Definition: abt.h:852
ABT_SCHED_DEFAULT
@ ABT_SCHED_DEFAULT
Definition: abt.h:477
xstream_return_rank
static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream)
Definition: stream.c:2151
xstream_remove_xstream_list
static void xstream_remove_xstream_list(ABTI_global *p_global, ABTI_xstream *p_xstream)
Definition: stream.c:2062
ABT_xstream_cancel
int ABT_xstream_cancel(ABT_xstream xstream)
Send a cancellation request to an execution stream.
Definition: stream.c:593
ABT_xstream_create
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
Create a new execution stream.
Definition: stream.c:76
ABT_xstream_get_main_sched
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched)
Retrieve the main scheduler of an execution stream.
Definition: stream.c:1044
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:878
ABT_xstream_get_rank
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
Retrieve a rank of an execution stream.
Definition: stream.c:785
ABT_xstream_self_rank
int ABT_xstream_self_rank(int *rank)
Return a rank of an execution stream associated with a caller.
Definition: stream.c:687
ABT_POOL_ACCESS_MPSC
@ ABT_POOL_ACCESS_MPSC
Definition: abt.h:569
xstream_update_main_sched
static ABTU_ret_err int xstream_update_main_sched(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:1894
ABTU_unreachable
#define ABTU_unreachable()
Definition: abtu.h:133
ABT_xstream_is_primary
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *is_primary)
Check if the target execution stream is primary.
Definition: stream.c:1239
ABT_sched
struct ABT_sched_opaque * ABT_sched
Scheduler handle type.
Definition: abt.h:845
ABT_ERR_INV_SCHED
#define ABT_ERR_INV_SCHED
Error code: invalid scheduler.
Definition: abt.h:129
ABT_POOL_NULL
#define ABT_POOL_NULL
Definition: abt.h:1102
xstream_init_main_sched
static void xstream_init_main_sched(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:1884
abti.h
xstream_create
static ABTU_ret_err int xstream_create(ABTI_global *p_global, ABTI_sched *p_sched, ABTI_xstream_type xstream_type, int rank, ABT_bool start, ABTI_xstream **pp_xstream)
Definition: stream.c:1754
ABT_SYNC_EVENT_TYPE_OTHER
@ ABT_SYNC_EVENT_TYPE_OTHER
Definition: abt.h:702
ABT_xstream
struct ABT_xstream_opaque * ABT_xstream
Execution stream handle type.
Definition: abt.h:826
ABT_xstream_create_basic
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream)
Create a new execution stream with a predefined scheduler.
Definition: stream.c:173
ABT_xstream_create_with_rank
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream)
Create a new execution stream with a specific rank.
Definition: stream.c:270
xstream_set_new_rank
static ABT_bool xstream_set_new_rank(ABTI_global *p_global, ABTI_xstream *p_newxstream, int rank)
Definition: stream.c:2077
ABTU_malloc
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:235
ABT_xstream_check_events
int ABT_xstream_check_events(ABT_sched sched)
Process events associated with a scheduler.
Definition: stream.c:1334
ABT_ERR_INV_UNIT
#define ABT_ERR_INV_UNIT
Error code: invalid work unit for scheduling.
Definition: abt.h:181
ABT_ERR_XSTREAM_STATE
#define ABT_ERR_XSTREAM_STATE
Error code: error related to an execution stream state.
Definition: abt.h:272
ABT_POOL_FIFO
@ ABT_POOL_FIFO
Definition: abt.h:516
ABT_xstream_set_main_sched_basic
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools)
Set the main scheduler of an execution stream to a predefined scheduler.
Definition: stream.c:982
ABT_xstream_get_main_pools
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools)
Get pools associated with the main scheduler of an execution stream.
Definition: stream.c:1084
ABT_xstream_set_cpubind
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
Bind an execution stream to a target CPU.
Definition: stream.c:1381
ABT_unit
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
Definition: abt.h:911
ABT_xstream_exit
int ABT_xstream_exit(void)
Terminate an execution stream that is running the calling ULT.
Definition: stream.c:541
ABT_xstream_join
int ABT_xstream_join(ABT_xstream xstream)
Wait for an execution stream to terminate.
Definition: stream.c:494
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
ABT_xstream_get_cpubind
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
Get CPU ID of a CPU to which an execution stream is bound.
Definition: stream.c:1433
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:784
ABT_XSTREAM_STATE_RUNNING
@ ABT_XSTREAM_STATE_RUNNING
Definition: abt.h:416
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:786
ABT_xstream_free
int ABT_xstream_free(ABT_xstream *xstream)
Free an execution stream.
Definition: stream.c:427
ABT_ERR_INV_ARG
#define ABT_ERR_INV_ARG
Error code: invalid user argument.
Definition: abt.h:260
ABT_xstream_get_state
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state)
Get a state of an execution stream.
Definition: stream.c:1123
ABTU_free
static void ABTU_free(void *ptr)
Definition: abtu.h:228
ABT_xstream_set_main_sched
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched)
Set the main scheduler of an execution stream.
Definition: stream.c:877
xstream_add_xstream_list
static void xstream_add_xstream_list(ABTI_global *p_global, ABTI_xstream *p_newxstream)
Definition: stream.c:2016
ABT_XSTREAM_NULL
#define ABT_XSTREAM_NULL
Definition: abt.h:1098
xstream_launch_root_ythread
static void * xstream_launch_root_ythread(void *p_xstream)
Definition: stream.c:1728
ABT_ERR_INV_XSTREAM_RANK
#define ABT_ERR_INV_XSTREAM_RANK
Error code: invalid execution stream rank.
Definition: abt.h:119
ABT_xstream_revive
int ABT_xstream_revive(ABT_xstream xstream)
Revive a terminated execution stream.
Definition: stream.c:347
ABT_xstream_run_unit
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
Execute a work unit.
Definition: stream.c:1281
ABT_UNIT_NULL
#define ABT_UNIT_NULL
Definition: abt.h:1104
ABT_xstream_equal
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result)
Compare two execution stream handles for equality.
Definition: stream.c:1162
ABT_XSTREAM_STATE_TERMINATED
@ ABT_XSTREAM_STATE_TERMINATED
Definition: abt.h:418
ABT_xstream_state
ABT_xstream_state
State of an execution stream.
Definition: abt.h:414