9 ABTI_xstream_type xstream_type,
int rank,
10 ABTI_xstream **pp_xstream);
13 ABTI_xstream *p_xstream);
17 ABTI_ythread *p_ythread);
24 ABTI_xstream *p_xstream, ABTI_sched *p_sched);
26 #ifndef ABT_CONFIG_DISABLE_MIGRATION
28 ABTI_thread *p_thread);
50 ABTI_xstream *p_newxstream;
55 ABTI_CHECK_ERROR(abt_errno);
57 p_sched = ABTI_sched_get_ptr(sched);
58 ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
63 xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, -1, &p_newxstream);
64 ABTI_CHECK_ERROR(abt_errno);
68 ABTI_CHECK_ERROR(abt_errno);
71 *newxstream = ABTI_xstream_get_handle(p_newxstream);
96 ABTI_xstream *p_newxstream;
100 ABTI_sched_create_basic(predef, num_pools, pools, config, &p_sched);
101 ABTI_CHECK_ERROR(abt_errno);
104 xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, -1, &p_newxstream);
105 ABTI_CHECK_ERROR(abt_errno);
109 ABTI_CHECK_ERROR(abt_errno);
111 *newxstream = ABTI_xstream_get_handle(p_newxstream);
133 ABTI_xstream *p_newxstream;
140 ABTI_CHECK_ERROR(abt_errno);
142 p_sched = ABTI_sched_get_ptr(sched);
143 ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
147 abt_errno =
xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, rank,
149 if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno !=
ABT_SUCCESS) {
151 ABTI_sched_free(ABTI_local_get_local_uninlined(), p_sched,
153 ABTI_HANDLE_ERROR(abt_errno);
158 ABTI_CHECK_ERROR(abt_errno);
161 *newxstream = ABTI_xstream_get_handle(p_newxstream);
175 ABTI_local *p_local = ABTI_local_get_local();
176 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
177 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
180 ABTI_sched *p_main_sched = p_xstream->p_main_sched;
181 ABTI_ythread *p_main_sched_ythread = p_main_sched->p_ythread;
182 ABTI_CHECK_TRUE(ABTD_atomic_relaxed_load_int(
183 &p_main_sched_ythread->thread.state) ==
187 ABTD_atomic_relaxed_store_uint32(&p_main_sched->request, 0);
188 ABTI_tool_event_thread_join(p_local, &p_main_sched_ythread->thread,
189 ABTI_local_get_xstream_or_null(p_local)
190 ? ABTI_local_get_xstream(p_local)->p_thread
193 ABTI_thread_revive(p_local, p_xstream->p_root_pool,
194 p_main_sched_ythread->thread.f_thread,
195 p_main_sched_ythread->thread.p_arg,
196 &p_main_sched_ythread->thread);
199 ABTD_xstream_context_revive(&p_xstream->ctx);
219 ABTI_local *p_local = ABTI_local_get_local();
222 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(h_xstream);
223 if (p_xstream == NULL)
228 ABTI_CHECK_TRUE_MSG(p_xstream != ABTI_local_get_xstream_or_null(p_local),
230 "The current xstream cannot be freed.");
232 ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
234 "The primary xstream cannot be freed explicitly.");
238 ABTI_CHECK_ERROR(abt_errno);
241 ABTI_xstream_free(p_local, p_xstream,
ABT_FALSE);
263 ABTI_local *p_local = ABTI_local_get_local();
264 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
265 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
268 ABTI_CHECK_ERROR(abt_errno);
287 ABTI_xstream *p_local_xstream;
288 ABTI_ythread *p_ythread;
289 ABTI_SETUP_LOCAL_YTHREAD_WITH_INIT_CHECK(&p_local_xstream, &p_ythread);
292 ABTD_atomic_fetch_or_uint32(&p_local_xstream->p_main_sched->p_ythread
294 ABTI_THREAD_REQ_TERMINATE);
296 ABTI_ythread_exit(p_local_xstream, p_ythread);
311 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
313 ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
315 "The primary xstream cannot be canceled.");
318 ABTD_atomic_fetch_or_uint32(&p_xstream->p_main_sched->p_ythread->thread
320 ABTI_THREAD_REQ_TERMINATE);
345 ABTI_xstream *p_local_xstream;
346 ABTI_SETUP_LOCAL_XSTREAM_WITH_INIT_CHECK(&p_local_xstream);
349 *xstream = ABTI_xstream_get_handle(p_local_xstream);
365 ABTI_xstream *p_local_xstream;
366 ABTI_SETUP_LOCAL_XSTREAM_WITH_INIT_CHECK(&p_local_xstream);
369 *rank = (int)p_local_xstream->rank;
384 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
385 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
387 p_xstream->rank = rank;
391 ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
407 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
408 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
410 *rank = (int)p_xstream->rank;
448 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
449 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
451 ABTI_xstream *p_local_xstream;
452 ABTI_ythread *p_self;
453 ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_self);
460 if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
462 if (p_self->thread.p_last_xstream != p_xstream) {
469 if (p_xstream->p_main_sched) {
472 if (ABTI_sched_get_effective_size(ABTI_xstream_get_local(
474 p_xstream->p_main_sched) > 0) {
483 ABTI_CHECK_ERROR(abt_errno);
485 p_sched = ABTI_sched_get_ptr(sched);
486 ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
491 ABTI_CHECK_ERROR(abt_errno);
516 ABTI_xstream *p_local_xstream;
517 ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
519 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
520 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
523 abt_errno = ABTI_sched_create_basic(predef, num_pools, pools,
525 ABTI_CHECK_ERROR(abt_errno);
528 ABTI_CHECK_ERROR(abt_errno);
546 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
547 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
549 *sched = ABTI_sched_get_handle(p_xstream->p_main_sched);
569 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
570 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
572 ABTI_sched *p_sched = p_xstream->p_main_sched;
573 max_pools = p_sched->num_pools > max_pools ? max_pools : p_sched->num_pools;
574 memcpy(pools, p_sched->pools,
sizeof(
ABT_pool) * max_pools);
589 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
590 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
614 ABTI_xstream *p_xstream1 = ABTI_xstream_get_ptr(xstream1);
615 ABTI_xstream *p_xstream2 = ABTI_xstream_get_ptr(xstream2);
636 ABTI_SETUP_WITH_INIT_CHECK();
658 ABTI_xstream *p_xstream;
660 p_xstream = ABTI_xstream_get_ptr(xstream);
661 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
688 ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
690 ABTI_xstream *p_local_xstream;
691 ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
693 ABTI_xstream_run_unit(&p_local_xstream, unit, p_pool);
711 ABTI_xstream *p_local_xstream;
712 ABTI_SETUP_LOCAL_XSTREAM_WITH_INIT_CHECK(&p_local_xstream);
714 ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
715 ABTI_CHECK_NULL_SCHED_PTR(p_sched);
717 ABTI_xstream_check_events(p_local_xstream, p_sched);
736 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
737 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
739 ABTD_affinity_cpuset cpuset;
740 cpuset.num_cpuids = 1;
741 cpuset.cpuids = &cpuid;
742 int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &cpuset);
744 ABTI_CHECK_ERROR(abt_errno);
764 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
765 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
767 ABTD_affinity_cpuset cpuset;
768 cpuset.num_cpuids = 0;
769 cpuset.cpuids = NULL;
770 abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, &cpuset);
771 ABTI_CHECK_ERROR(abt_errno);
773 if (cpuset.num_cpuids != 0) {
774 *cpuid = cpuset.cpuids[0];
778 ABTD_affinity_cpuset_destroy(&cpuset);
779 ABTI_CHECK_ERROR(abt_errno);
799 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
800 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
802 ABTD_affinity_cpuset affinity;
803 affinity.num_cpuids = cpuset_size;
804 affinity.cpuids = cpuset;
805 int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &affinity);
807 ABTI_CHECK_ERROR(abt_errno);
834 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
835 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
837 ABTD_affinity_cpuset affinity;
838 int abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, &affinity);
839 ABTI_CHECK_ERROR(abt_errno);
842 n = affinity.num_cpuids > cpuset_size ? cpuset_size : affinity.num_cpuids;
844 for (i = 0; i < n; i++) {
845 cpuset[i] = affinity.cpuids[i];
847 ABTD_affinity_cpuset_destroy(&affinity);
/* Create the primary execution stream (ES) and return it via pp_xstream.
 *
 * Steps visible in this listing:
 *   - xstream_create() is called with p_sched, the type
 *     ABTI_XSTREAM_TYPE_PRIMARY and a rank argument of -1;
 *   - the resulting ES pointer is stored into *pp_xstream.
 * Two ABTI_CHECK_ERROR(abt_errno) checks guard the steps.
 *
 * NOTE(review): this listing has gaps.  The declaration and creation of
 * p_sched, the assignment of abt_errno, and the return statement are not
 * shown here; confirm them against the full source.  The rank value -1
 * presumably asks for an automatically chosen rank -- TODO confirm.
 */
855 ABTU_ret_err int ABTI_xstream_create_primary(ABTI_xstream **pp_xstream)
858 ABTI_xstream *p_newxstream;
/* Error check for a step that is not shown in this listing (presumably the
 * creation of p_sched -- confirm). */
864 ABTI_CHECK_ERROR(abt_errno);
/* Build the ES itself; the type argument marks it as the primary ES. */
867 xstream_create(p_sched, ABTI_XSTREAM_TYPE_PRIMARY, -1, &p_newxstream);
868 ABTI_CHECK_ERROR(abt_errno);
/* Hand the new ES back to the caller. */
870 *pp_xstream = p_newxstream;
/* Start the primary execution stream (ES) on behalf of the ULT p_ythread.
 *
 * Parameters (as used by the visible lines):
 *   pp_local_xstream  in/out pointer to the caller's local ES; its p_thread
 *                     field is written at the end of the function.
 *   p_xstream         the ES being started.
 *   p_ythread         the ULT that performs the start; it is asserted to
 *                     carry the ABTI_THREAD_TYPE_MAIN flag.
 *
 * NOTE(review): some lines of this function are absent from this listing;
 * only the statements shown below are described.
 */
875 void ABTI_xstream_start_primary(ABTI_xstream **pp_local_xstream,
876 ABTI_xstream *p_xstream,
877 ABTI_ythread *p_ythread)
/* Precondition: p_ythread must have the MAIN type bit set. */
880 ABTI_ASSERT(p_ythread->thread.type & ABTI_THREAD_TYPE_MAIN);
/* Precondition on the ES state (relaxed load); the state value it is compared
 * with is on a line that is not part of this listing. */
882 ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
885 LOG_DEBUG(
"[E%d] start\n", p_xstream->rank);
/* Set up the ES context via ABTD_xstream_context_set_self(); presumably this
 * adopts the calling OS thread as the ES context -- TODO confirm. */
887 ABTD_xstream_context_set_self(&p_xstream->ctx);
/* Apply the default CPU-set affinity, keyed by the ES rank. */
891 ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
/* Record this ES as the last ES of its root ULT, then switch from
 * p_ythread's context to the root ULT's context. */
895 p_xstream->p_root_ythread->thread.p_last_xstream = p_xstream;
896 ABTD_ythread_context_switch(&p_ythread->ctx,
897 &p_xstream->p_root_ythread->ctx);
/* Runs after the context switch above returns: make p_ythread the current
 * thread of the (possibly updated) local ES. */
900 (*pp_local_xstream)->p_thread = &p_ythread->thread;
/* Run the work unit `unit`, taken from pool p_pool, on the local ES.
 *
 * Two conversions are visible in this listing:
 *   - ULT path:     unit -> ABT_thread handle (p_pool->u_get_thread)
 *                        -> ABTI_ythread pointer (ABTI_ythread_get_ptr)
 *   - tasklet path: unit -> ABT_task handle (p_pool->u_get_task)
 *                        -> ABTI_thread pointer (ABTI_thread_get_ptr)
 *
 * NOTE(review): the end of the parameter list (which declares p_pool), the
 * test that selects between the two paths, and the calls that actually
 * schedule p_ythread / p_task are not present in this listing; confirm them
 * against the full source.
 */
903 void ABTI_xstream_run_unit(ABTI_xstream **pp_local_xstream,
ABT_unit unit,
/* ULT path: resolve the unit to its ULT object through the pool's callback. */
909 ABT_thread thread = p_pool->u_get_thread(unit);
910 ABTI_ythread *p_ythread = ABTI_ythread_get_ptr(thread);
/* Tasklet path: resolve the unit to its task object through the pool's
 * callback. */
915 ABT_task task = p_pool->u_get_task(unit);
916 ABTI_thread *p_task = ABTI_thread_get_ptr(task);
/* Poll the requests posted on the ES's main scheduler and react to them.
 *
 * The request word is read with an acquire load from the thread that backs
 * p_xstream->p_main_sched, while the resulting actions are applied to the
 * p_sched argument:
 *   - ABTI_THREAD_REQ_JOIN                      -> ABTI_sched_finish(p_sched)
 *   - ABTI_THREAD_REQ_TERMINATE or _REQ_CANCEL  -> ABTI_sched_exit(p_sched)
 * The two tests are independent, so both actions can run in one call.
 *
 * NOTE(review): the braces of this function are not present in this listing;
 * the pairing of each condition with the call that follows it is read off
 * the line order -- confirm against the full source.
 */
922 void ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
/* Called first on every invocation; presumably services a pending request to
 * print all thread stacks -- TODO confirm. */
924 ABTI_info_check_print_all_thread_stacks();
/* Snapshot the request bits once; both tests below use this same value. */
926 uint32_t request = ABTD_atomic_acquire_load_uint32(
927 &p_xstream->p_main_sched->p_ythread->thread.request);
928 if (request & ABTI_THREAD_REQ_JOIN) {
929 ABTI_sched_finish(p_sched);
932 if ((request & ABTI_THREAD_REQ_TERMINATE) ||
933 (request & ABTI_THREAD_REQ_CANCEL)) {
934 ABTI_sched_exit(p_sched);
/* Release an execution stream (ES) and the resources it owns.
 *
 * Order of operations visible in this listing:
 *   1. ABTI_mem_finalize_local() on the ES;
 *   2. if the ES has a main scheduler: emit a thread-join tool event for the
 *      scheduler's thread, then ABTI_sched_discard_and_free() it, forwarding
 *      force_free;
 *   3. free the root ULT and then the root pool;
 *   4. for ABTI_XSTREAM_TYPE_SECONDARY only, free the ES context.
 *
 * NOTE(review): the rest of the parameter list (force_free is used below but
 * its declaration is not shown), the last argument of the tool-event call,
 * and any statements after the context is freed are not present in this
 * listing; confirm against the full source.
 */
938 void ABTI_xstream_free(ABTI_local *p_local, ABTI_xstream *p_xstream,
941 LOG_DEBUG(
"[E%d] freed\n", p_xstream->rank);
/* Tear down the ES-local memory state before anything else is released. */
944 ABTI_mem_finalize_local(p_xstream);
/* The main scheduler may be absent, hence the NULL test. */
950 ABTI_sched *p_cursched = p_xstream->p_main_sched;
951 if (p_cursched != NULL) {
/* Report the join of the scheduler's thread to the tool interface.  The
 * caller argument depends on whether p_local currently has an ES; the
 * alternative used when it has none is not shown in this listing. */
953 ABTI_tool_event_thread_join(p_local, &p_cursched->p_ythread->thread,
954 ABTI_local_get_xstream_or_null(p_local)
955 ? ABTI_local_get_xstream(p_local)
958 ABTI_sched_discard_and_free(p_local, p_cursched, force_free);
/* Root ULT first, then the root pool. */
963 ABTI_ythread_free_root(p_local, p_xstream->p_root_ythread);
964 ABTI_pool_free(p_xstream->p_root_pool);
/* Only a secondary ES has its context freed here; this function does not call
 * ABTD_xstream_context_free() for the primary ES. */
967 if (p_xstream->type == ABTI_XSTREAM_TYPE_SECONDARY) {
968 ABTD_xstream_context_free(&p_xstream->ctx);
/* Print a human-readable description of an ES to the stream p_os.
 *
 * Every printed line is prefixed with `indent` columns of padding, produced
 * by the "%*s" conversion with an empty string argument.
 *   - p_xstream == NULL: prints "== NULL ES ==".
 *   - otherwise: prints the ES address, rank, type string, state string and
 *     the address of its main scheduler, then calls ABTI_sched_print() on the
 *     main scheduler.
 *
 * NOTE(review): the end of the parameter list, most arms of the two switch
 * statements, parts of the format string, and the condition guarding the
 * ABTI_sched_print() call are not present in this listing; confirm against
 * the full source.
 */
974 void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os,
int indent,
/* Null handle: print a placeholder line instead of dereferencing. */
977 if (p_xstream == NULL) {
978 fprintf(p_os,
"%*s== NULL ES ==\n", indent,
"");
/* Map the ES type to a display string (the strings themselves are not shown
 * in this listing). */
980 const char *type, *state;
981 switch (p_xstream->type) {
982 case ABTI_XSTREAM_TYPE_PRIMARY:
985 case ABTI_XSTREAM_TYPE_SECONDARY:
/* Map the ES state, read with an acquire load, to a display string; only the
 * "TERMINATED" assignment is visible here. */
992 switch (ABTD_atomic_acquire_load_int(&p_xstream->state)) {
997 state =
"TERMINATED";
/* Format string and arguments of the main report; each field is paired with
 * an (indent, "") padding argument. */
1005 "%*s== ES (%p) ==\n"
1009 "%*smain_sched: %p\n",
1010 indent,
"", (
void *)p_xstream, indent,
"", p_xstream->rank,
1011 indent,
"", type, indent,
"", state, indent,
"",
1012 (
void *)p_xstream->p_main_sched);
/* Descend into the main scheduler; the remaining arguments are not shown. */
1015 ABTI_sched_print(p_xstream->p_main_sched, p_os,
1024 ABTI_xstream *p_local_xstream = (ABTI_xstream *)p_xstream;
1027 ABTI_local_set_xstream(p_local_xstream);
1029 LOG_DEBUG(
"[E%d] start\n", p_local_xstream->rank);
1032 ABTI_ythread *p_root_ythread = p_local_xstream->p_root_ythread;
1033 p_local_xstream->p_thread = &p_local_xstream->p_root_ythread->thread;
1034 p_root_ythread->thread.f_thread(p_root_ythread->thread.p_arg);
1036 LOG_DEBUG(
"[E%d] end\n", p_local_xstream->rank);
1039 ABTI_local_set_xstream(NULL);
1048 ABTI_xstream_type xstream_type,
int rank,
1049 ABTI_xstream **pp_xstream)
1052 ABTI_xstream *p_newxstream;
1054 abt_errno =
ABTU_malloc(
sizeof(ABTI_xstream), (
void **)&p_newxstream);
1055 ABTI_CHECK_ERROR(abt_errno);
1057 p_newxstream->p_prev = NULL;
1058 p_newxstream->p_next = NULL;
1065 p_newxstream->type = xstream_type;
1066 ABTD_atomic_relaxed_store_int(&p_newxstream->state,
1068 p_newxstream->p_main_sched = NULL;
1069 p_newxstream->p_thread = NULL;
1070 ABTI_mem_init_local(p_newxstream);
1077 ABTI_ythread_create_root(ABTI_xstream_get_local(p_newxstream),
1078 p_newxstream, &p_newxstream->p_root_ythread);
1079 ABTI_CHECK_ERROR(abt_errno);
1084 ABTI_CHECK_ERROR(abt_errno);
1088 ABTI_ythread_create_main_sched(ABTI_xstream_get_local(p_newxstream),
1090 p_newxstream->p_main_sched);
1091 ABTI_CHECK_ERROR(abt_errno);
1093 LOG_DEBUG(
"[E%d] created\n", p_newxstream->rank);
1096 *pp_xstream = p_newxstream;
1103 ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
1105 ABTI_ASSERT(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY);
1109 (
void *)p_xstream, &p_xstream->ctx);
1110 ABTI_CHECK_ERROR(abt_errno);
1114 ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
1120 ABTI_xstream *p_xstream)
1123 ABTI_CHECK_TRUE(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
1126 ABTI_CHECK_TRUE(!ABTI_local_get_xstream_or_null(*pp_local) ||
1127 &p_xstream->p_main_sched->p_ythread->thread !=
1128 ABTI_local_get_xstream(*pp_local)->p_thread,
1132 ABTI_sched_finish(p_xstream->p_main_sched);
1133 ABTI_thread_join(pp_local, &p_xstream->p_main_sched->p_ythread->thread);
1136 ABTD_xstream_context_join(&p_xstream->ctx);
1138 ABTI_ASSERT(ABTD_atomic_acquire_load_int(&p_xstream->state) ==
1144 ABTI_ythread *p_ythread)
1146 ABTI_xstream *p_local_xstream = *pp_local_xstream;
1148 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1149 if (ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request) &
1150 ABTI_THREAD_REQ_CANCEL) {
1151 LOG_DEBUG(
"[U%" PRIu64
":E%d] canceled\n",
1152 ABTI_thread_get_id(&p_ythread->thread),
1153 p_local_xstream->rank);
1154 ABTD_ythread_cancel(p_local_xstream, p_ythread);
1155 ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1156 &p_ythread->thread);
1161 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1162 if (ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request) &
1163 ABTI_THREAD_REQ_MIGRATE) {
1166 &p_ythread->thread);
1167 if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno ==
ABT_SUCCESS) {
1175 p_ythread->thread.p_last_xstream = p_local_xstream;
1178 ABTD_atomic_release_store_int(&p_ythread->thread.state,
1182 LOG_DEBUG(
"[U%" PRIu64
":E%d] start running\n",
1183 ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1187 ABTI_ythread *p_self = ABTI_thread_get_ythread(p_local_xstream->p_thread);
1188 p_ythread = ABTI_ythread_context_switch_to_child(pp_local_xstream, p_self,
1193 p_local_xstream = *pp_local_xstream;
1195 LOG_DEBUG(
"[U%" PRIu64
":E%d] stopped\n",
1196 ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1203 ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request);
1204 if (request & ABTI_THREAD_REQ_TERMINATE) {
1206 LOG_DEBUG(
"[U%" PRIu64
":E%d] finished\n",
1207 ABTI_thread_get_id(&p_ythread->thread),
1208 p_local_xstream->rank);
1209 ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1210 &p_ythread->thread);
1211 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1212 }
else if (request & ABTI_THREAD_REQ_CANCEL) {
1213 LOG_DEBUG(
"[U%" PRIu64
":E%d] canceled\n",
1214 ABTI_thread_get_id(&p_ythread->thread),
1215 p_local_xstream->rank);
1216 ABTD_ythread_cancel(p_local_xstream, p_ythread);
1217 ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1218 &p_ythread->thread);
1220 }
else if (!(request & ABTI_THREAD_REQ_NON_YIELD)) {
1224 ABTI_pool_add_thread(&p_ythread->thread);
1225 }
else if (request & ABTI_THREAD_REQ_BLOCK) {
1226 LOG_DEBUG(
"[U%" PRIu64
":E%d] check blocked\n",
1227 ABTI_thread_get_id(&p_ythread->thread),
1228 p_local_xstream->rank);
1229 ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_BLOCK);
1230 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1231 }
else if (request & ABTI_THREAD_REQ_MIGRATE) {
1235 &p_ythread->thread);
1239 }
else if (request & ABTI_THREAD_REQ_ORPHAN) {
1242 LOG_DEBUG(
"[U%" PRIu64
":E%d] orphaned\n",
1243 ABTI_thread_get_id(&p_ythread->thread),
1244 p_local_xstream->rank);
1245 ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_ORPHAN);
1246 p_ythread->thread.p_pool->u_free(&p_ythread->thread.unit);
1247 p_ythread->thread.p_pool = NULL;
1248 }
else if (request & ABTI_THREAD_REQ_NOPUSH) {
1250 LOG_DEBUG(
"[U%" PRIu64
":E%d] not pushed\n",
1251 ABTI_thread_get_id(&p_ythread->thread),
1252 p_local_xstream->rank);
1253 ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_NOPUSH);
1261 ABTI_thread *p_task)
1263 #ifndef ABT_CONFIG_DISABLE_TASK_CANCEL
1264 if (ABTD_atomic_acquire_load_uint32(&p_task->request) &
1265 ABTI_THREAD_REQ_CANCEL) {
1266 ABTI_tool_event_thread_cancel(p_local_xstream, p_task);
1267 ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1277 p_task->p_last_xstream = p_local_xstream;
1280 LOG_DEBUG(
"[T%" PRIu64
":E%d] running\n", ABTI_thread_get_id(p_task),
1281 p_local_xstream->rank);
1283 ABTI_thread *p_sched_thread = p_local_xstream->p_thread;
1284 p_local_xstream->p_thread = p_task;
1285 p_task->p_parent = p_sched_thread;
1288 ABTI_tool_event_thread_run(p_local_xstream, p_task, p_sched_thread,
1290 LOG_DEBUG(
"[T%" PRIu64
":E%d] running\n", ABTI_thread_get_id(p_task),
1291 p_local_xstream->rank);
1292 p_task->f_thread(p_task->p_arg);
1293 ABTI_tool_event_thread_finish(p_local_xstream, p_task, p_sched_thread);
1294 LOG_DEBUG(
"[T%" PRIu64
":E%d] stopped\n", ABTI_thread_get_id(p_task),
1295 p_local_xstream->rank);
1298 p_local_xstream->p_thread = p_sched_thread;
1301 ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1305 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1307 ABTI_thread *p_thread)
1312 ABTI_thread_mig_data *p_mig_data;
1313 abt_errno = ABTI_thread_get_mig_data(p_local, p_thread, &p_mig_data);
1314 ABTI_CHECK_ERROR(abt_errno);
1317 if (p_mig_data->f_migration_cb) {
1318 ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
1320 ABT_thread thread = ABTI_ythread_get_handle(p_ythread);
1321 p_mig_data->f_migration_cb(thread, p_mig_data->p_migration_cb_arg);
1326 ABTI_ASSERT(ABTD_atomic_acquire_load_uint32(&p_thread->request) &
1327 ABTI_THREAD_REQ_MIGRATE);
1330 p_pool = ABTD_atomic_relaxed_load_ptr(&p_mig_data->p_migration_pool);
1331 ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_MIGRATE);
1334 p_thread->p_pool = p_pool;
1337 ABTI_pool_push(p_pool, p_thread->unit);
1339 ABTI_pool_dec_num_migrations(p_pool);
1346 ABTI_sched *p_sched)
1348 ABTI_ASSERT(p_xstream->p_main_sched == NULL);
1352 p_sched->used = ABTI_SCHED_MAIN;
1354 p_xstream->p_main_sched = p_sched;
1359 ABTI_xstream *p_xstream, ABTI_sched *p_sched)
1361 ABTI_ythread *p_ythread = NULL;
1362 ABTI_sched *p_main_sched;
1363 ABTI_pool *p_tar_pool = NULL;
1370 p_sched->used = ABTI_SCHED_MAIN;
1372 p_main_sched = p_xstream->p_main_sched;
1373 if (p_main_sched == NULL) {
1375 p_xstream->p_main_sched = p_sched;
1380 ABTI_CHECK_YIELDABLE((*pp_local_xstream)->p_thread, &p_ythread,
1382 p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
1386 for (p = 0; p < p_main_sched->num_pools; p++) {
1387 if (p_ythread->thread.p_pool ==
1388 ABTI_pool_get_ptr(p_main_sched->pools[p])) {
1390 p_ythread->thread.p_pool->u_free(&p_ythread->thread.unit);
1391 ABT_thread h_thread = ABTI_ythread_get_handle(p_ythread);
1392 p_ythread->thread.unit = p_tar_pool->u_create_from_thread(h_thread);
1393 p_ythread->thread.p_pool = p_tar_pool;
1397 if (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) {
1398 ABTI_CHECK_TRUE(p_ythread->thread.type & ABTI_THREAD_TYPE_MAIN,
1408 ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_FINISH);
1412 p_sched->p_ythread = p_main_sched->p_ythread;
1417 ABTI_pool_push(p_tar_pool, p_ythread->thread.unit);
1420 p_xstream->p_main_sched = p_sched;
1423 ABTI_thread_set_request(&p_ythread->thread, ABTI_THREAD_REQ_NOPUSH);
1424 ABTI_ythread_context_switch_to_parent(pp_local_xstream, p_ythread,
1429 p_main_sched->p_ythread = NULL;
1430 ABTI_sched_discard_and_free(ABTI_xstream_get_local(*pp_local_xstream),
1440 ABTI_spinlock_acquire(&p_global->xstream_list_lock);
1442 ABTI_xstream *p_prev_xstream = p_global->p_xstream_head;
1443 ABTI_xstream *p_xstream = p_prev_xstream;
1448 if (p_xstream->rank == rank) {
1454 p_prev_xstream = p_xstream;
1455 p_xstream = p_xstream->p_next;
1460 if (p_xstream->rank == rank) {
1461 ABTI_spinlock_release(&p_global->xstream_list_lock);
1463 }
else if (p_xstream->rank > rank) {
1467 p_prev_xstream = p_xstream;
1468 p_xstream = p_xstream->p_next;
1473 if (p_prev_xstream) {
1474 p_prev_xstream->p_next = p_newxstream;
1475 p_newxstream->p_prev = p_prev_xstream;
1476 p_newxstream->p_next = NULL;
1478 ABTI_ASSERT(p_global->p_xstream_head == NULL);
1479 p_newxstream->p_prev = NULL;
1480 p_newxstream->p_next = NULL;
1481 p_global->p_xstream_head = p_newxstream;
1486 if (p_xstream->p_prev) {
1487 p_xstream->p_prev->p_next = p_newxstream;
1488 p_newxstream->p_prev = p_xstream->p_prev;
1491 ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
1492 p_global->p_xstream_head = p_newxstream;
1494 p_xstream->p_prev = p_newxstream;
1495 p_newxstream->p_next = p_xstream;
1497 p_global->num_xstreams++;
1498 if (rank >= p_global->max_xstreams) {
1499 static int max_xstreams_warning_once = 0;
1500 if (max_xstreams_warning_once == 0) {
1508 char *warning_message;
1510 ABTU_malloc(
sizeof(
char) * 1024, (
void **)&warning_message);
1511 if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno ==
ABT_SUCCESS) {
1512 snprintf(warning_message, 1024,
1513 "Warning: the number of execution streams exceeds "
1514 "ABT_MAX_NUM_XSTREAMS (=%d). This may cause an error.",
1515 p_global->max_xstreams);
1518 max_xstreams_warning_once = 1;
1522 p_global->max_xstreams = rank + 1;
1525 ABTI_spinlock_release(&p_global->xstream_list_lock);
1528 p_newxstream->rank = rank;
1536 ABTI_spinlock_acquire(&p_global->xstream_list_lock);
1537 if (!p_xstream->p_prev) {
1538 ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
1539 p_global->p_xstream_head = p_xstream->p_next;
1541 p_xstream->p_prev->p_next = p_xstream->p_next;
1543 if (p_xstream->p_next) {
1544 p_xstream->p_next->p_prev = p_xstream->p_prev;
1546 p_global->num_xstreams--;
1547 ABTI_spinlock_release(&p_global->xstream_list_lock);
struct ABT_unit_opaque * ABT_unit
static void xstream_return_rank(ABTI_xstream *p_xstream)
int ABT_xstream_self(ABT_xstream *xstream) ABT_API_PUBLIC
Return the ES handle associated with the caller work unit.
struct ABT_xstream_opaque * ABT_xstream
struct ABT_sched_opaque * ABT_sched
#define ABTU_unreachable()
static void xstream_init_main_sched(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
int ABT_xstream_get_num(int *num_xstreams) ABT_API_PUBLIC
Return the number of currently existing ESs.
static void xstream_schedule_task(ABTI_xstream *p_local_xstream, ABTI_thread *p_task)
int ABT_xstream_set_rank(ABT_xstream xstream, int rank) ABT_API_PUBLIC
Set the rank for the target ES.
struct ABT_thread_opaque * ABT_task
#define ABT_ERR_INV_THREAD
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES and return its handle through newxstream.
int ABT_xstream_self_rank(int *rank) ABT_API_PUBLIC
Return the rank of the ES associated with the caller work unit.
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank) ABT_API_PUBLIC
Return the rank of the target ES.
static ABT_bool xstream_set_new_rank(ABTI_xstream *p_newxstream, int rank)
int ABT_xstream_cancel(ABT_xstream xstream) ABT_API_PUBLIC
Request the cancellation of the target ES.
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched) ABT_API_PUBLIC
Get the main scheduler of the target ES.
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *flag) ABT_API_PUBLIC
Check if the target ES is the primary ES.
struct ABT_pool_opaque * ABT_pool
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES with a specific rank.
#define HANDLE_WARNING(msg)
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools) ABT_API_PUBLIC
Set the main scheduler for xstream with a predefined scheduler.
struct ABT_thread_opaque * ABT_thread
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools) ABT_API_PUBLIC
Get the pools of the main scheduler of the target ES.
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES with a predefined scheduler and return its handle through newxstream.
int ABT_xstream_join(ABT_xstream xstream) ABT_API_PUBLIC
Wait for xstream to terminate.
int ABT_xstream_check_events(ABT_sched sched) ABT_API_PUBLIC
Check the events and process them.
int ABT_xstream_set_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset) ABT_API_PUBLIC
Set the CPU affinity of the target ES.
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid) ABT_API_PUBLIC
Bind the target ES to a target CPU.
ABTI_global * gp_ABTI_global
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid) ABT_API_PUBLIC
Get the CPU binding for the target ES.
int ABT_xstream_get_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset, int *num_cpus) ABT_API_PUBLIC
Get the CPU affinity for the target ES.
static ABTU_ret_err int xstream_start(ABTI_xstream *p_xstream)
int ABT_xstream_exit(void) ABT_API_PUBLIC
Terminate the ES associated with the calling ULT.
static ABTU_ret_err int xstream_migrate_thread(ABTI_local *p_local, ABTI_thread *p_thread)
static void xstream_schedule_ythread(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread)
int ABT_xstream_free(ABT_xstream *xstream) ABT_API_PUBLIC
Release the ES object associated with the ES handle.
#define ABT_ERR_XSTREAM_STATE
static ABTU_ret_err int xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
#define ABT_ERR_FEATURE_NA
static void * xstream_launch_root_ythread(void *p_xstream)
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state) ABT_API_PUBLIC
Return the state of xstream.
#define ABT_ERR_INV_XSTREAM_RANK
#define LOG_DEBUG(fmt,...)
struct ABT_sched_config_opaque * ABT_sched_config
int ABT_xstream_revive(ABT_xstream xstream) ABT_API_PUBLIC
Restart an ES that has been joined by ABT_xstream_join().
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched) ABT_API_PUBLIC
Set the main scheduler of the target ES.
static ABTU_ret_err int xstream_create(ABTI_sched *p_sched, ABTI_xstream_type xstream_type, int rank, ABTI_xstream **pp_xstream)
#define ABT_ERR_INV_SCHED
#define ABT_SCHED_CONFIG_NULL
#define ABT_ERR_INV_XSTREAM
static void ABTU_free(void *ptr)
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool) ABT_API_PUBLIC
Execute a unit on the local ES.
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result) ABT_API_PUBLIC
Compare two ES handles for equality.
static ABTU_ret_err int xstream_update_main_sched(ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_sched *p_sched)