ARGOBOTS  7496202f85916e93d6d143320764c2aba5026d93
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 ABTU_ret_err static int xstream_create(ABTI_sched *p_sched,
9  ABTI_xstream_type xstream_type, int rank,
10  ABTI_xstream **pp_xstream);
11 ABTU_ret_err static int xstream_start(ABTI_xstream *p_xstream);
12 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
13  ABTI_xstream *p_xstream);
14 static ABT_bool xstream_set_new_rank(ABTI_xstream *p_newxstream, int rank);
15 static void xstream_return_rank(ABTI_xstream *p_xstream);
16 static inline void xstream_schedule_ythread(ABTI_xstream **pp_local_xstream,
17  ABTI_ythread *p_ythread);
18 static inline void xstream_schedule_task(ABTI_xstream *p_local_xstream,
19  ABTI_thread *p_task);
20 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
21  ABTI_sched *p_sched);
22 ABTU_ret_err static int
23 xstream_update_main_sched(ABTI_xstream **pp_local_xstream,
24  ABTI_xstream *p_xstream, ABTI_sched *p_sched);
25 static void *xstream_launch_root_ythread(void *p_xstream);
26 #ifndef ABT_CONFIG_DISABLE_MIGRATION
27 ABTU_ret_err static int xstream_migrate_thread(ABTI_local *p_local,
28  ABTI_thread *p_thread);
29 #endif
30 
46 int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
47 {
48  int abt_errno;
49  ABTI_sched *p_sched;
50  ABTI_xstream *p_newxstream;
51 
52  if (sched == ABT_SCHED_NULL) {
53  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
54  ABT_SCHED_CONFIG_NULL, &p_sched);
55  ABTI_CHECK_ERROR(abt_errno);
56  } else {
57  p_sched = ABTI_sched_get_ptr(sched);
58  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
60  }
61 
62  abt_errno =
63  xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, -1, &p_newxstream);
64  ABTI_CHECK_ERROR(abt_errno);
65 
66  /* Start this ES */
67  abt_errno = xstream_start(p_newxstream);
68  ABTI_CHECK_ERROR(abt_errno);
69 
70  /* Return value */
71  *newxstream = ABTI_xstream_get_handle(p_newxstream);
72  return ABT_SUCCESS;
73 }
74 
91 int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools,
92  ABT_pool *pools, ABT_sched_config config,
93  ABT_xstream *newxstream)
94 {
95  int abt_errno;
96  ABTI_xstream *p_newxstream;
97 
98  ABTI_sched *p_sched;
99  abt_errno =
100  ABTI_sched_create_basic(predef, num_pools, pools, config, &p_sched);
101  ABTI_CHECK_ERROR(abt_errno);
102 
103  abt_errno =
104  xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, -1, &p_newxstream);
105  ABTI_CHECK_ERROR(abt_errno);
106 
107  /* Start this ES */
108  abt_errno = xstream_start(p_newxstream);
109  ABTI_CHECK_ERROR(abt_errno);
110 
111  *newxstream = ABTI_xstream_get_handle(p_newxstream);
112  return ABT_SUCCESS;
113 }
114 
129  ABT_xstream *newxstream)
130 {
131  int abt_errno;
132  ABTI_sched *p_sched;
133  ABTI_xstream *p_newxstream;
134 
135  ABTI_CHECK_TRUE(rank >= 0, ABT_ERR_INV_XSTREAM_RANK);
136 
137  if (sched == ABT_SCHED_NULL) {
138  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
139  ABT_SCHED_CONFIG_NULL, &p_sched);
140  ABTI_CHECK_ERROR(abt_errno);
141  } else {
142  p_sched = ABTI_sched_get_ptr(sched);
143  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
145  }
146 
147  abt_errno = xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, rank,
148  &p_newxstream);
149  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
150  if (sched == ABT_SCHED_NULL)
151  ABTI_sched_free(ABTI_local_get_local_uninlined(), p_sched,
152  ABT_FALSE);
153  ABTI_HANDLE_ERROR(abt_errno);
154  }
155 
156  /* Start this ES */
157  abt_errno = xstream_start(p_newxstream);
158  ABTI_CHECK_ERROR(abt_errno);
159 
160  /* Return value */
161  *newxstream = ABTI_xstream_get_handle(p_newxstream);
162  return ABT_SUCCESS;
163 }
164 
174 {
175  ABTI_local *p_local = ABTI_local_get_local();
176  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
177  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
178 
179  /* Revives the main scheduler thread. */
180  ABTI_sched *p_main_sched = p_xstream->p_main_sched;
181  ABTI_ythread *p_main_sched_ythread = p_main_sched->p_ythread;
182  ABTI_CHECK_TRUE(ABTD_atomic_relaxed_load_int(
183  &p_main_sched_ythread->thread.state) ==
186 
187  ABTD_atomic_relaxed_store_uint32(&p_main_sched->request, 0);
188  ABTI_tool_event_thread_join(p_local, &p_main_sched_ythread->thread,
189  ABTI_local_get_xstream_or_null(p_local)
190  ? ABTI_local_get_xstream(p_local)->p_thread
191  : NULL);
192 
193  ABTI_thread_revive(p_local, p_xstream->p_root_pool,
194  p_main_sched_ythread->thread.f_thread,
195  p_main_sched_ythread->thread.p_arg,
196  &p_main_sched_ythread->thread);
197 
198  ABTD_atomic_relaxed_store_int(&p_xstream->state, ABT_XSTREAM_STATE_RUNNING);
199  ABTD_xstream_context_revive(&p_xstream->ctx);
200  return ABT_SUCCESS;
201 }
202 
218 {
219  ABTI_local *p_local = ABTI_local_get_local();
220  ABT_xstream h_xstream = *xstream;
221 
222  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(h_xstream);
223  if (p_xstream == NULL)
224  return ABT_SUCCESS;
225 
226  /* We first need to check whether p_local_xstream is NULL because this
227  * routine might be called by external threads. */
228  ABTI_CHECK_TRUE_MSG(p_xstream != ABTI_local_get_xstream_or_null(p_local),
230  "The current xstream cannot be freed.");
231 
232  ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
234  "The primary xstream cannot be freed explicitly.");
235 
236  /* Wait until xstream terminates */
237  int abt_errno = xstream_join(&p_local, p_xstream);
238  ABTI_CHECK_ERROR(abt_errno);
239 
240  /* Free the xstream object */
241  ABTI_xstream_free(p_local, p_xstream, ABT_FALSE);
242 
243  /* Return value */
244  *xstream = ABT_XSTREAM_NULL;
245  return ABT_SUCCESS;
246 }
247 
262 {
263  ABTI_local *p_local = ABTI_local_get_local();
264  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
265  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
266 
267  int abt_errno = xstream_join(&p_local, p_xstream);
268  ABTI_CHECK_ERROR(abt_errno);
269  return ABT_SUCCESS;
270 }
271 
286 {
287  ABTI_xstream *p_local_xstream;
288  ABTI_ythread *p_ythread;
289  ABTI_SETUP_LOCAL_YTHREAD_WITH_INIT_CHECK(&p_local_xstream, &p_ythread);
290 
291  /* Terminate the main scheduler. */
292  ABTD_atomic_fetch_or_uint32(&p_local_xstream->p_main_sched->p_ythread
293  ->thread.request,
294  ABTI_THREAD_REQ_TERMINATE);
295  /* Terminate this ULT */
296  ABTI_ythread_exit(p_local_xstream, p_ythread);
298  return ABT_SUCCESS;
299 }
300 
310 {
311  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
312 
313  ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
315  "The primary xstream cannot be canceled.");
316 
317  /* Terminate the main scheduler of the target xstream. */
318  ABTD_atomic_fetch_or_uint32(&p_xstream->p_main_sched->p_ythread->thread
319  .request,
320  ABTI_THREAD_REQ_TERMINATE);
321  return ABT_SUCCESS;
322 }
323 
342 {
343  *xstream = ABT_XSTREAM_NULL;
344 
345  ABTI_xstream *p_local_xstream;
346  ABTI_SETUP_LOCAL_XSTREAM_WITH_INIT_CHECK(&p_local_xstream);
347 
348  /* Return value */
349  *xstream = ABTI_xstream_get_handle(p_local_xstream);
350  return ABT_SUCCESS;
351 }
352 
363 int ABT_xstream_self_rank(int *rank)
364 {
365  ABTI_xstream *p_local_xstream;
366  ABTI_SETUP_LOCAL_XSTREAM_WITH_INIT_CHECK(&p_local_xstream);
367 
368  /* Return value */
369  *rank = (int)p_local_xstream->rank;
370  return ABT_SUCCESS;
371 }
372 
382 int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
383 {
384  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
385  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
386 
387  p_xstream->rank = rank;
388 
389  /* Set the CPU affinity for the ES */
390  if (gp_ABTI_global->set_affinity == ABT_TRUE) {
391  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
392  }
393  return ABT_SUCCESS;
394 }
395 
405 int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
406 {
407  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
408  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
409 
410  *rank = (int)p_xstream->rank;
411  return ABT_SUCCESS;
412 }
413 
445 {
446  int abt_errno;
447 
448  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
449  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
450 
451  ABTI_xstream *p_local_xstream;
452  ABTI_ythread *p_self;
453  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_self);
454 
455  /* For now, if the target ES is running, we allow to change the main
456  * scheduler of the ES only when the caller is running on the same ES. */
457  /* TODO: a new state representing that the scheduler is changed is needed
458  * to avoid running xstreams while the scheduler is changed in this
459  * function. */
460  if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
462  if (p_self->thread.p_last_xstream != p_xstream) {
463  ABTI_HANDLE_ERROR(ABT_ERR_XSTREAM_STATE);
464  }
465  }
466 
467  /* TODO: permit to change the scheduler even when having work units in pools
468  */
469  if (p_xstream->p_main_sched) {
470  /* We only allow to change the main scheduler when the current main
471  * scheduler of p_xstream has no work unit in its associated pools. */
472  if (ABTI_sched_get_effective_size(ABTI_xstream_get_local(
473  p_local_xstream),
474  p_xstream->p_main_sched) > 0) {
475  ABTI_HANDLE_ERROR(ABT_ERR_XSTREAM);
476  }
477  }
478 
479  ABTI_sched *p_sched;
480  if (sched == ABT_SCHED_NULL) {
481  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
482  ABT_SCHED_CONFIG_NULL, &p_sched);
483  ABTI_CHECK_ERROR(abt_errno);
484  } else {
485  p_sched = ABTI_sched_get_ptr(sched);
486  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
488  }
489 
490  abt_errno = xstream_update_main_sched(&p_local_xstream, p_xstream, p_sched);
491  ABTI_CHECK_ERROR(abt_errno);
492  return ABT_SUCCESS;
493 }
494 
511  ABT_sched_predef predef, int num_pools,
512  ABT_pool *pools)
513 {
514  int abt_errno;
515 
516  ABTI_xstream *p_local_xstream;
517  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
518 
519  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
520  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
521 
522  ABTI_sched *p_sched;
523  abt_errno = ABTI_sched_create_basic(predef, num_pools, pools,
524  ABT_SCHED_CONFIG_NULL, &p_sched);
525  ABTI_CHECK_ERROR(abt_errno);
526 
527  abt_errno = xstream_update_main_sched(&p_local_xstream, p_xstream, p_sched);
528  ABTI_CHECK_ERROR(abt_errno);
529  return ABT_SUCCESS;
530 }
531 
545 {
546  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
547  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
548 
549  *sched = ABTI_sched_get_handle(p_xstream->p_main_sched);
550  return ABT_SUCCESS;
551 }
552 
566 int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools,
567  ABT_pool *pools)
568 {
569  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
570  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
571 
572  ABTI_sched *p_sched = p_xstream->p_main_sched;
573  max_pools = p_sched->num_pools > max_pools ? max_pools : p_sched->num_pools;
574  memcpy(pools, p_sched->pools, sizeof(ABT_pool) * max_pools);
575  return ABT_SUCCESS;
576 }
577 
588 {
589  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
590  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
591 
592  *state = (ABT_xstream_state)ABTD_atomic_acquire_load_int(&p_xstream->state);
593  return ABT_SUCCESS;
594 }
595 
612  ABT_bool *result)
613 {
614  ABTI_xstream *p_xstream1 = ABTI_xstream_get_ptr(xstream1);
615  ABTI_xstream *p_xstream2 = ABTI_xstream_get_ptr(xstream2);
616  *result = (p_xstream1 == p_xstream2) ? ABT_TRUE : ABT_FALSE;
617  return ABT_SUCCESS;
618 }
619 
632 int ABT_xstream_get_num(int *num_xstreams)
633 {
634  /* In case that Argobots has not been initialized, return an error code
635  * instead of making the call fail. */
636  ABTI_SETUP_WITH_INIT_CHECK();
637 
638  *num_xstreams = gp_ABTI_global->num_xstreams;
639  return ABT_SUCCESS;
640 }
641 
657 {
658  ABTI_xstream *p_xstream;
659 
660  p_xstream = ABTI_xstream_get_ptr(xstream);
661  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
662 
663  /* Return value */
664  *flag =
665  (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) ? ABT_TRUE : ABT_FALSE;
666  return ABT_SUCCESS;
667 }
668 
687 {
688  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
689 
690  ABTI_xstream *p_local_xstream;
691  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
692 
693  ABTI_xstream_run_unit(&p_local_xstream, unit, p_pool);
694  return ABT_SUCCESS;
695 }
696 
710 {
711  ABTI_xstream *p_local_xstream;
712  ABTI_SETUP_LOCAL_XSTREAM_WITH_INIT_CHECK(&p_local_xstream);
713 
714  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
715  ABTI_CHECK_NULL_SCHED_PTR(p_sched);
716 
717  ABTI_xstream_check_events(p_local_xstream, p_sched);
718  return ABT_SUCCESS;
719 }
720 
734 int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
735 {
736  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
737  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
738 
739  ABTD_affinity_cpuset cpuset;
740  cpuset.num_cpuids = 1;
741  cpuset.cpuids = &cpuid;
742  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &cpuset);
743  /* Do not free cpuset since cpuids points to a user pointer. */
744  ABTI_CHECK_ERROR(abt_errno);
745  return ABT_SUCCESS;
746 }
747 
761 int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
762 {
763  int abt_errno;
764  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
765  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
766 
767  ABTD_affinity_cpuset cpuset;
768  cpuset.num_cpuids = 0;
769  cpuset.cpuids = NULL;
770  abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, &cpuset);
771  ABTI_CHECK_ERROR(abt_errno);
772 
773  if (cpuset.num_cpuids != 0) {
774  *cpuid = cpuset.cpuids[0];
775  } else {
776  abt_errno = ABT_ERR_FEATURE_NA;
777  }
778  ABTD_affinity_cpuset_destroy(&cpuset);
779  ABTI_CHECK_ERROR(abt_errno);
780  return ABT_SUCCESS;
781 }
782 
797 int ABT_xstream_set_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset)
798 {
799  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
800  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
801 
802  ABTD_affinity_cpuset affinity;
803  affinity.num_cpuids = cpuset_size;
804  affinity.cpuids = cpuset;
805  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &affinity);
806  /* Do not free affinity since cpuids may not be freed. */
807  ABTI_CHECK_ERROR(abt_errno);
808  return ABT_SUCCESS;
809 }
810 
831 int ABT_xstream_get_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset,
832  int *num_cpus)
833 {
834  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
835  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
836 
837  ABTD_affinity_cpuset affinity;
838  int abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, &affinity);
839  ABTI_CHECK_ERROR(abt_errno);
840 
841  int i, n;
842  n = affinity.num_cpuids > cpuset_size ? cpuset_size : affinity.num_cpuids;
843  *num_cpus = n;
844  for (i = 0; i < n; i++) {
845  cpuset[i] = affinity.cpuids[i];
846  }
847  ABTD_affinity_cpuset_destroy(&affinity);
848  return abt_errno;
849 }
850 
851 /*****************************************************************************/
852 /* Private APIs */
853 /*****************************************************************************/
854 
855 ABTU_ret_err int ABTI_xstream_create_primary(ABTI_xstream **pp_xstream)
856 {
857  int abt_errno;
858  ABTI_xstream *p_newxstream;
859  ABTI_sched *p_sched;
860 
861  /* For the primary ES, a default scheduler is created. */
862  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
863  ABT_SCHED_CONFIG_NULL, &p_sched);
864  ABTI_CHECK_ERROR(abt_errno);
865 
866  abt_errno =
867  xstream_create(p_sched, ABTI_XSTREAM_TYPE_PRIMARY, -1, &p_newxstream);
868  ABTI_CHECK_ERROR(abt_errno);
869 
870  *pp_xstream = p_newxstream;
871  return ABT_SUCCESS;
872 }
873 
874 /* This routine starts the primary ES. It should be called in ABT_init. */
875 void ABTI_xstream_start_primary(ABTI_xstream **pp_local_xstream,
876  ABTI_xstream *p_xstream,
877  ABTI_ythread *p_ythread)
878 {
879  /* p_ythread must be the main thread. */
880  ABTI_ASSERT(p_ythread->thread.type & ABTI_THREAD_TYPE_MAIN);
881  /* The ES's state must be running here. */
882  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
884 
885  LOG_DEBUG("[E%d] start\n", p_xstream->rank);
886 
887  ABTD_xstream_context_set_self(&p_xstream->ctx);
888 
889  /* Set the CPU affinity for the ES */
890  if (gp_ABTI_global->set_affinity == ABT_TRUE) {
891  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
892  }
893 
894  /* Context switch to the root thread. */
895  p_xstream->p_root_ythread->thread.p_last_xstream = p_xstream;
896  ABTD_ythread_context_switch(&p_ythread->ctx,
897  &p_xstream->p_root_ythread->ctx);
898  /* Come back to the main thread. Now this thread is executed on top of the
899  * main scheduler, which is running on the root thread. */
900  (*pp_local_xstream)->p_thread = &p_ythread->thread;
901 }
902 
903 void ABTI_xstream_run_unit(ABTI_xstream **pp_local_xstream, ABT_unit unit,
904  ABTI_pool *p_pool)
905 {
906  ABT_unit_type type = p_pool->u_get_type(unit);
907 
908  if (type == ABT_UNIT_TYPE_THREAD) {
909  ABT_thread thread = p_pool->u_get_thread(unit);
910  ABTI_ythread *p_ythread = ABTI_ythread_get_ptr(thread);
911  /* Switch the context */
912  xstream_schedule_ythread(pp_local_xstream, p_ythread);
913  } else {
914  ABTI_ASSERT(type == ABT_UNIT_TYPE_TASK);
915  ABT_task task = p_pool->u_get_task(unit);
916  ABTI_thread *p_task = ABTI_thread_get_ptr(task);
917  /* Execute the task */
918  xstream_schedule_task(*pp_local_xstream, p_task);
919  }
920 }
921 
922 void ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
923 {
924  ABTI_info_check_print_all_thread_stacks();
925 
926  uint32_t request = ABTD_atomic_acquire_load_uint32(
927  &p_xstream->p_main_sched->p_ythread->thread.request);
928  if (request & ABTI_THREAD_REQ_JOIN) {
929  ABTI_sched_finish(p_sched);
930  }
931 
932  if ((request & ABTI_THREAD_REQ_TERMINATE) ||
933  (request & ABTI_THREAD_REQ_CANCEL)) {
934  ABTI_sched_exit(p_sched);
935  }
936 }
937 
938 void ABTI_xstream_free(ABTI_local *p_local, ABTI_xstream *p_xstream,
939  ABT_bool force_free)
940 {
941  LOG_DEBUG("[E%d] freed\n", p_xstream->rank);
942 
943  /* Clean up memory pool. */
944  ABTI_mem_finalize_local(p_xstream);
945  /* Return rank for reuse. rank must be returned prior to other free
946  * functions so that other xstreams cannot refer to this xstream. */
947  xstream_return_rank(p_xstream);
948 
949  /* Free the scheduler */
950  ABTI_sched *p_cursched = p_xstream->p_main_sched;
951  if (p_cursched != NULL) {
952  /* Join a scheduler thread. */
953  ABTI_tool_event_thread_join(p_local, &p_cursched->p_ythread->thread,
954  ABTI_local_get_xstream_or_null(p_local)
955  ? ABTI_local_get_xstream(p_local)
956  ->p_thread
957  : NULL);
958  ABTI_sched_discard_and_free(p_local, p_cursched, force_free);
959  /* The main scheduler thread is also freed. */
960  }
961 
962  /* Free the root thread and pool. */
963  ABTI_ythread_free_root(p_local, p_xstream->p_root_ythread);
964  ABTI_pool_free(p_xstream->p_root_pool);
965 
966  /* Free the context if a given xstream is secondary. */
967  if (p_xstream->type == ABTI_XSTREAM_TYPE_SECONDARY) {
968  ABTD_xstream_context_free(&p_xstream->ctx);
969  }
970 
971  ABTU_free(p_xstream);
972 }
973 
974 void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent,
975  ABT_bool print_sub)
976 {
977  if (p_xstream == NULL) {
978  fprintf(p_os, "%*s== NULL ES ==\n", indent, "");
979  } else {
980  char *type, *state;
981  switch (p_xstream->type) {
982  case ABTI_XSTREAM_TYPE_PRIMARY:
983  type = "PRIMARY";
984  break;
985  case ABTI_XSTREAM_TYPE_SECONDARY:
986  type = "SECONDARY";
987  break;
988  default:
989  type = "UNKNOWN";
990  break;
991  }
992  switch (ABTD_atomic_acquire_load_int(&p_xstream->state)) {
994  state = "RUNNING";
995  break;
997  state = "TERMINATED";
998  break;
999  default:
1000  state = "UNKNOWN";
1001  break;
1002  }
1003 
1004  fprintf(p_os,
1005  "%*s== ES (%p) ==\n"
1006  "%*srank : %d\n"
1007  "%*stype : %s\n"
1008  "%*sstate : %s\n"
1009  "%*smain_sched: %p\n",
1010  indent, "", (void *)p_xstream, indent, "", p_xstream->rank,
1011  indent, "", type, indent, "", state, indent, "",
1012  (void *)p_xstream->p_main_sched);
1013 
1014  if (print_sub == ABT_TRUE) {
1015  ABTI_sched_print(p_xstream->p_main_sched, p_os,
1016  indent + ABTI_INDENT, ABT_TRUE);
1017  }
1018  }
1019  fflush(p_os);
1020 }
1021 
1022 static void *xstream_launch_root_ythread(void *p_xstream)
1023 {
1024  ABTI_xstream *p_local_xstream = (ABTI_xstream *)p_xstream;
1025 
1026  /* Initialization of the local variables */
1027  ABTI_local_set_xstream(p_local_xstream);
1028 
1029  LOG_DEBUG("[E%d] start\n", p_local_xstream->rank);
1030 
1031  /* Set the root thread as the current thread */
1032  ABTI_ythread *p_root_ythread = p_local_xstream->p_root_ythread;
1033  p_local_xstream->p_thread = &p_local_xstream->p_root_ythread->thread;
1034  p_root_ythread->thread.f_thread(p_root_ythread->thread.p_arg);
1035 
1036  LOG_DEBUG("[E%d] end\n", p_local_xstream->rank);
1037 
1038  /* Reset the current ES and its local info. */
1039  ABTI_local_set_xstream(NULL);
1040  return NULL;
1041 }
1042 
1043 /*****************************************************************************/
1044 /* Internal static functions */
1045 /*****************************************************************************/
1046 
1047 ABTU_ret_err static int xstream_create(ABTI_sched *p_sched,
1048  ABTI_xstream_type xstream_type, int rank,
1049  ABTI_xstream **pp_xstream)
1050 {
1051  int abt_errno;
1052  ABTI_xstream *p_newxstream;
1053 
1054  abt_errno = ABTU_malloc(sizeof(ABTI_xstream), (void **)&p_newxstream);
1055  ABTI_CHECK_ERROR(abt_errno);
1056 
1057  p_newxstream->p_prev = NULL;
1058  p_newxstream->p_next = NULL;
1059 
1060  if (xstream_set_new_rank(p_newxstream, rank) == ABT_FALSE) {
1061  ABTU_free(p_newxstream);
1062  return ABT_ERR_INV_XSTREAM_RANK;
1063  }
1064 
1065  p_newxstream->type = xstream_type;
1066  ABTD_atomic_relaxed_store_int(&p_newxstream->state,
1068  p_newxstream->p_main_sched = NULL;
1069  p_newxstream->p_thread = NULL;
1070  ABTI_mem_init_local(p_newxstream);
1071 
1072  /* Set the main scheduler */
1073  xstream_init_main_sched(p_newxstream, p_sched);
1074 
1075  /* Create the root thread. */
1076  abt_errno =
1077  ABTI_ythread_create_root(ABTI_xstream_get_local(p_newxstream),
1078  p_newxstream, &p_newxstream->p_root_ythread);
1079  ABTI_CHECK_ERROR(abt_errno);
1080 
1081  /* Create the root pool. */
1082  abt_errno = ABTI_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPSC,
1083  ABT_FALSE, &p_newxstream->p_root_pool);
1084  ABTI_CHECK_ERROR(abt_errno);
1085 
1086  /* Create the main scheduler thread. */
1087  abt_errno =
1088  ABTI_ythread_create_main_sched(ABTI_xstream_get_local(p_newxstream),
1089  p_newxstream,
1090  p_newxstream->p_main_sched);
1091  ABTI_CHECK_ERROR(abt_errno);
1092 
1093  LOG_DEBUG("[E%d] created\n", p_newxstream->rank);
1094 
1095  /* Return value */
1096  *pp_xstream = p_newxstream;
1097  return ABT_SUCCESS;
1098 }
1099 
1100 ABTU_ret_err static int xstream_start(ABTI_xstream *p_xstream)
1101 {
1102  /* The ES's state must be RUNNING */
1103  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
1105  ABTI_ASSERT(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY);
1106  /* Start the main scheduler on a different ES */
1107  int abt_errno =
1108  ABTD_xstream_context_create(xstream_launch_root_ythread,
1109  (void *)p_xstream, &p_xstream->ctx);
1110  ABTI_CHECK_ERROR(abt_errno);
1111 
1112  /* Set the CPU affinity for the ES */
1113  if (gp_ABTI_global->set_affinity == ABT_TRUE) {
1114  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
1115  }
1116  return ABT_SUCCESS;
1117 }
1118 
1119 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
1120  ABTI_xstream *p_xstream)
1121 {
1122  /* The primary ES cannot be joined. */
1123  ABTI_CHECK_TRUE(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
1125  /* The main scheduler cannot join itself. */
1126  ABTI_CHECK_TRUE(!ABTI_local_get_xstream_or_null(*pp_local) ||
1127  &p_xstream->p_main_sched->p_ythread->thread !=
1128  ABTI_local_get_xstream(*pp_local)->p_thread,
1130 
1131  /* Wait until the target ES terminates */
1132  ABTI_sched_finish(p_xstream->p_main_sched);
1133  ABTI_thread_join(pp_local, &p_xstream->p_main_sched->p_ythread->thread);
1134 
1135  /* Normal join request */
1136  ABTD_xstream_context_join(&p_xstream->ctx);
1137 
1138  ABTI_ASSERT(ABTD_atomic_acquire_load_int(&p_xstream->state) ==
1140  return ABT_SUCCESS;
1141 }
1142 
1143 static inline void xstream_schedule_ythread(ABTI_xstream **pp_local_xstream,
1144  ABTI_ythread *p_ythread)
1145 {
1146  ABTI_xstream *p_local_xstream = *pp_local_xstream;
1147 
1148 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1149  if (ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request) &
1150  ABTI_THREAD_REQ_CANCEL) {
1151  LOG_DEBUG("[U%" PRIu64 ":E%d] canceled\n",
1152  ABTI_thread_get_id(&p_ythread->thread),
1153  p_local_xstream->rank);
1154  ABTD_ythread_cancel(p_local_xstream, p_ythread);
1155  ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1156  &p_ythread->thread);
1157  return;
1158  }
1159 #endif
1160 
1161 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1162  if (ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request) &
1163  ABTI_THREAD_REQ_MIGRATE) {
1164  int abt_errno =
1165  xstream_migrate_thread(ABTI_xstream_get_local(p_local_xstream),
1166  &p_ythread->thread);
1167  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
1168  /* Migration succeeded, so we do not need to schedule p_ythread. */
1169  return;
1170  }
1171  }
1172 #endif
1173 
1174  /* Change the last ES */
1175  p_ythread->thread.p_last_xstream = p_local_xstream;
1176 
1177  /* Change the ULT state */
1178  ABTD_atomic_release_store_int(&p_ythread->thread.state,
1180 
1181  /* Switch the context */
1182  LOG_DEBUG("[U%" PRIu64 ":E%d] start running\n",
1183  ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1184 
1185  /* Since the argument is pp_local_xstream, p_local_xstream->p_thread must be
1186  * yieldable. */
1187  ABTI_ythread *p_self = ABTI_thread_get_ythread(p_local_xstream->p_thread);
1188  p_ythread = ABTI_ythread_context_switch_to_child(pp_local_xstream, p_self,
1189  p_ythread);
1190  /* The previous ULT (p_ythread) may not be the same as one to which the
1191  * context has been switched. */
1192  /* The scheduler continues from here. */
1193  p_local_xstream = *pp_local_xstream;
1194 
1195  LOG_DEBUG("[U%" PRIu64 ":E%d] stopped\n",
1196  ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1197 
1198  /* Request handling. */
1199  /* We do not need to acquire-load request since all critical requests
1200  * (BLOCK, ORPHAN, STOP, and NOPUSH) are written by p_ythread. CANCEL might
1201  * be delayed. */
1202  uint32_t request =
1203  ABTD_atomic_acquire_load_uint32(&p_ythread->thread.request);
1204  if (request & ABTI_THREAD_REQ_TERMINATE) {
1205  /* The ULT has completed its execution or it called the exit request. */
1206  LOG_DEBUG("[U%" PRIu64 ":E%d] finished\n",
1207  ABTI_thread_get_id(&p_ythread->thread),
1208  p_local_xstream->rank);
1209  ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1210  &p_ythread->thread);
1211 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1212  } else if (request & ABTI_THREAD_REQ_CANCEL) {
1213  LOG_DEBUG("[U%" PRIu64 ":E%d] canceled\n",
1214  ABTI_thread_get_id(&p_ythread->thread),
1215  p_local_xstream->rank);
1216  ABTD_ythread_cancel(p_local_xstream, p_ythread);
1217  ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1218  &p_ythread->thread);
1219 #endif
1220  } else if (!(request & ABTI_THREAD_REQ_NON_YIELD)) {
1221  /* The ULT did not finish its execution.
1222  * Change the state of current running ULT and
1223  * add it to the pool again. */
1224  ABTI_pool_add_thread(&p_ythread->thread);
1225  } else if (request & ABTI_THREAD_REQ_BLOCK) {
1226  LOG_DEBUG("[U%" PRIu64 ":E%d] check blocked\n",
1227  ABTI_thread_get_id(&p_ythread->thread),
1228  p_local_xstream->rank);
1229  ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_BLOCK);
1230 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1231  } else if (request & ABTI_THREAD_REQ_MIGRATE) {
1232  /* This is the case when the ULT requests migration of itself. */
1233  int abt_errno =
1234  xstream_migrate_thread(ABTI_xstream_get_local(p_local_xstream),
1235  &p_ythread->thread);
1236  /* Migration is optional, so it is okay if it fails. */
1237  (void)abt_errno;
1238 #endif
1239  } else if (request & ABTI_THREAD_REQ_ORPHAN) {
1240  /* The ULT is not pushed back to the pool and is disconnected from any
1241  * pool. */
1242  LOG_DEBUG("[U%" PRIu64 ":E%d] orphaned\n",
1243  ABTI_thread_get_id(&p_ythread->thread),
1244  p_local_xstream->rank);
1245  ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_ORPHAN);
1246  p_ythread->thread.p_pool->u_free(&p_ythread->thread.unit);
1247  p_ythread->thread.p_pool = NULL;
1248  } else if (request & ABTI_THREAD_REQ_NOPUSH) {
1249  /* The ULT is not pushed back to the pool */
1250  LOG_DEBUG("[U%" PRIu64 ":E%d] not pushed\n",
1251  ABTI_thread_get_id(&p_ythread->thread),
1252  p_local_xstream->rank);
1253  ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_NOPUSH);
1254  } else {
1255  ABTI_ASSERT(0);
1256  ABTU_unreachable();
1257  }
1258 }
1259 
1260 static inline void xstream_schedule_task(ABTI_xstream *p_local_xstream,
1261  ABTI_thread *p_task)
1262 {
1263 #ifndef ABT_CONFIG_DISABLE_TASK_CANCEL
1264  if (ABTD_atomic_acquire_load_uint32(&p_task->request) &
1265  ABTI_THREAD_REQ_CANCEL) {
1266  ABTI_tool_event_thread_cancel(p_local_xstream, p_task);
1267  ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1268  p_task);
1269  return;
1270  }
1271 #endif
1272 
1273  /* Change the task state */
1274  ABTD_atomic_release_store_int(&p_task->state, ABT_THREAD_STATE_RUNNING);
1275 
1276  /* Set the associated ES */
1277  p_task->p_last_xstream = p_local_xstream;
1278 
1279  /* Execute the task function */
1280  LOG_DEBUG("[T%" PRIu64 ":E%d] running\n", ABTI_thread_get_id(p_task),
1281  p_local_xstream->rank);
1282 
1283  ABTI_thread *p_sched_thread = p_local_xstream->p_thread;
1284  p_local_xstream->p_thread = p_task;
1285  p_task->p_parent = p_sched_thread;
1286 
1287  /* Execute the task function */
1288  ABTI_tool_event_thread_run(p_local_xstream, p_task, p_sched_thread,
1289  p_sched_thread);
1290  LOG_DEBUG("[T%" PRIu64 ":E%d] running\n", ABTI_thread_get_id(p_task),
1291  p_local_xstream->rank);
1292  p_task->f_thread(p_task->p_arg);
1293  ABTI_tool_event_thread_finish(p_local_xstream, p_task, p_sched_thread);
1294  LOG_DEBUG("[T%" PRIu64 ":E%d] stopped\n", ABTI_thread_get_id(p_task),
1295  p_local_xstream->rank);
1296 
1297  /* Set the current running scheduler's thread */
1298  p_local_xstream->p_thread = p_sched_thread;
1299 
1300  /* Terminate the tasklet */
1301  ABTI_xstream_terminate_thread(ABTI_xstream_get_local(p_local_xstream),
1302  p_task);
1303 }
1304 
1305 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1306 ABTU_ret_err static int xstream_migrate_thread(ABTI_local *p_local,
1307  ABTI_thread *p_thread)
1308 {
1309  int abt_errno;
1310  ABTI_pool *p_pool;
1311 
1312  ABTI_thread_mig_data *p_mig_data;
1313  abt_errno = ABTI_thread_get_mig_data(p_local, p_thread, &p_mig_data);
1314  ABTI_CHECK_ERROR(abt_errno);
1315 
1316  /* callback function */
1317  if (p_mig_data->f_migration_cb) {
1318  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
1319  if (p_ythread) {
1320  ABT_thread thread = ABTI_ythread_get_handle(p_ythread);
1321  p_mig_data->f_migration_cb(thread, p_mig_data->p_migration_cb_arg);
1322  }
1323  }
1324 
1325  /* If request is set, p_migration_pool has a valid pool pointer. */
1326  ABTI_ASSERT(ABTD_atomic_acquire_load_uint32(&p_thread->request) &
1327  ABTI_THREAD_REQ_MIGRATE);
1328 
1329  /* Extracting argument in migration request. */
1330  p_pool = ABTD_atomic_relaxed_load_ptr(&p_mig_data->p_migration_pool);
1331  ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_MIGRATE);
1332 
1333  /* Change the associated pool */
1334  p_thread->p_pool = p_pool;
1335 
1336  /* Add the unit to the scheduler's pool */
1337  ABTI_pool_push(p_pool, p_thread->unit);
1338 
1339  ABTI_pool_dec_num_migrations(p_pool);
1340 
1341  return ABT_SUCCESS;
1342 }
1343 #endif
1344 
1345 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
1346  ABTI_sched *p_sched)
1347 {
1348  ABTI_ASSERT(p_xstream->p_main_sched == NULL);
1349  /* The main scheduler will to be a ULT, not a tasklet */
1350  p_sched->type = ABT_SCHED_TYPE_ULT;
1351  /* Set the scheduler as a main scheduler */
1352  p_sched->used = ABTI_SCHED_MAIN;
1353  /* Set the scheduler */
1354  p_xstream->p_main_sched = p_sched;
1355 }
1356 
/*
 * Make p_sched the main scheduler of p_xstream.
 *
 * If the ES has no main scheduler yet, p_sched is simply installed.
 * Otherwise the current main scheduler is replaced: the calling ULT is pushed
 * to the first pool of p_sched, the old scheduler is asked to finish, the
 * caller switches to its parent, and once the caller resumes it frees the old
 * scheduler.
 *
 * pp_local_xstream : in/out pointer to the caller's local ES.  It is handed to
 *                    the context switch and dereferenced again afterwards.
 * p_xstream        : ES whose main scheduler is being set.
 * p_sched          : new main scheduler; its first pool receives the caller.
 *
 * Returns ABT_SUCCESS on completion.  The ABTI_CHECK_* macros below presumably
 * return an error code on failure (ABT_ERR_THREAD for the primary-ES check) --
 * confirm against their definitions.
 *
 * NOTE(review): p_sched->type and p_sched->used are written before those
 * checks run, so a failed check would leave p_sched flagged as a main
 * scheduler without being installed -- confirm callers tolerate this.
 */
1357 ABTU_ret_err static int
1358 xstream_update_main_sched(ABTI_xstream **pp_local_xstream,
1359  ABTI_xstream *p_xstream, ABTI_sched *p_sched)
1360 {
1361  ABTI_ythread *p_ythread = NULL;
1362  ABTI_sched *p_main_sched;
1363  ABTI_pool *p_tar_pool = NULL;
1364  int p;
1365 
1366  /* The main scheduler will be a ULT, not a tasklet */
1367  p_sched->type = ABT_SCHED_TYPE_ULT;
1368 
1369  /* Set the scheduler as a main scheduler */
1370  p_sched->used = ABTI_SCHED_MAIN;
1371 
1372  p_main_sched = p_xstream->p_main_sched;
1373  if (p_main_sched == NULL) {
1374  /* Set the scheduler */
1375  p_xstream->p_main_sched = p_sched;
1376  return ABT_SUCCESS;
1377  }
1378 
1379  /* If the ES has a main scheduler, we have to free it */
    /* NOTE(review): the argument list of the next call is cut off in this
     * listing (its continuation line is missing); restore it from the
     * upstream source before compiling.  The call stores the caller's ythread
     * in p_ythread. */
1380  ABTI_CHECK_YIELDABLE((*pp_local_xstream)->p_thread, &p_ythread,
1382  p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
1383 
1384  /* If the caller ULT is associated with a pool of the current main
1385  * scheduler, it needs to be associated to a pool of new scheduler. */
1386  for (p = 0; p < p_main_sched->num_pools; p++) {
1387  if (p_ythread->thread.p_pool ==
1388  ABTI_pool_get_ptr(p_main_sched->pools[p])) {
1389  /* Associate the work unit to the first pool of new scheduler */
    /* The unit is released through the old pool's u_free and a new one is
     * created through the target pool's u_create_from_thread. */
1390  p_ythread->thread.p_pool->u_free(&p_ythread->thread.unit);
1391  ABT_thread h_thread = ABTI_ythread_get_handle(p_ythread);
1392  p_ythread->thread.unit = p_tar_pool->u_create_from_thread(h_thread);
1393  p_ythread->thread.p_pool = p_tar_pool;
1394  break;
1395  }
1396  }
1397  if (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) {
    /* On the primary ES, the caller must carry ABTI_THREAD_TYPE_MAIN. */
1398  ABTI_CHECK_TRUE(p_ythread->thread.type & ABTI_THREAD_TYPE_MAIN,
1399  ABT_ERR_THREAD);
1400 
1401  /* Since the primary ES does not finish its execution until ABT_finalize
1402  * is called, its main scheduler needs to be automatically freed when
1403  * it is freed in ABT_finalize. */
1404  p_sched->automatic = ABT_TRUE;
1405  }
1406 
1407  /* Finish the current main scheduler */
1408  ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_FINISH);
1409 
1410  /* If the ES is secondary, we should take the associated ULT of the
1411  * current main scheduler and keep it in the new scheduler. */
    /* NOTE(review): the assignment below is unconditional, i.e. it is also
     * executed for the primary ES -- confirm the comment above is still
     * accurate. */
1412  p_sched->p_ythread = p_main_sched->p_ythread;
1413  /* The current ULT is pushed to the new scheduler's pool so that when
1414  * the new scheduler starts (see below), it can be scheduled by the new
1415  * scheduler. When the current ULT resumes its execution, it will free
1416  * the current main scheduler (see below). */
1417  ABTI_pool_push(p_tar_pool, p_ythread->thread.unit);
1418 
1419  /* Set the scheduler */
1420  p_xstream->p_main_sched = p_sched;
1421 
1422  /* Switch to the current main scheduler */
    /* The caller was already pushed to p_tar_pool above; ABTI_THREAD_REQ_NOPUSH
     * is presumably there to keep it from being pushed a second time when it
     * yields -- confirm.
     * NOTE(review): the argument list of the context-switch call is cut off in
     * this listing (its continuation line is missing); restore it from the
     * upstream source before compiling. */
1423  ABTI_thread_set_request(&p_ythread->thread, ABTI_THREAD_REQ_NOPUSH);
1424  ABTI_ythread_context_switch_to_parent(pp_local_xstream, p_ythread,
1426 
1427  /* Now, we free the current main scheduler. p_main_sched->p_ythread must
1428  * be NULL to avoid freeing it in ABTI_sched_discard_and_free(). */
1429  p_main_sched->p_ythread = NULL;
1430  ABTI_sched_discard_and_free(ABTI_xstream_get_local(*pp_local_xstream),
1431  p_main_sched, ABT_FALSE);
1432  return ABT_SUCCESS;
1433 }
1434 
1435 /* Set a new rank to ES */
1436 static ABT_bool xstream_set_new_rank(ABTI_xstream *p_newxstream, int rank)
1437 {
1438  ABTI_global *p_global = gp_ABTI_global;
1439 
1440  ABTI_spinlock_acquire(&p_global->xstream_list_lock);
1441 
1442  ABTI_xstream *p_prev_xstream = p_global->p_xstream_head;
1443  ABTI_xstream *p_xstream = p_prev_xstream;
1444  if (rank == -1) {
1445  /* Find an unused rank from 0. */
1446  rank = 0;
1447  while (p_xstream) {
1448  if (p_xstream->rank == rank) {
1449  rank++;
1450  } else {
1451  /* Use this rank. */
1452  break;
1453  }
1454  p_prev_xstream = p_xstream;
1455  p_xstream = p_xstream->p_next;
1456  }
1457  } else {
1458  /* Check if a certain rank is available */
1459  while (p_xstream) {
1460  if (p_xstream->rank == rank) {
1461  ABTI_spinlock_release(&p_global->xstream_list_lock);
1462  return ABT_FALSE;
1463  } else if (p_xstream->rank > rank) {
1464  /* Use this p_xstream. */
1465  break;
1466  }
1467  p_prev_xstream = p_xstream;
1468  p_xstream = p_xstream->p_next;
1469  }
1470  }
1471  if (!p_xstream) {
1472  /* p_newxstream is appended to p_prev_xstream */
1473  if (p_prev_xstream) {
1474  p_prev_xstream->p_next = p_newxstream;
1475  p_newxstream->p_prev = p_prev_xstream;
1476  p_newxstream->p_next = NULL;
1477  } else {
1478  ABTI_ASSERT(p_global->p_xstream_head == NULL);
1479  p_newxstream->p_prev = NULL;
1480  p_newxstream->p_next = NULL;
1481  p_global->p_xstream_head = p_newxstream;
1482  }
1483  } else {
1484  /* p_newxstream is inserted in the middle.
1485  * (p_xstream->p_prev) -> p_new_xstream -> p_xstream */
1486  if (p_xstream->p_prev) {
1487  p_xstream->p_prev->p_next = p_newxstream;
1488  p_newxstream->p_prev = p_xstream->p_prev;
1489  } else {
1490  /* This p_xstream is the first element */
1491  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
1492  p_global->p_xstream_head = p_newxstream;
1493  }
1494  p_xstream->p_prev = p_newxstream;
1495  p_newxstream->p_next = p_xstream;
1496  }
1497  p_global->num_xstreams++;
1498  if (rank >= p_global->max_xstreams) {
1499  static int max_xstreams_warning_once = 0;
1500  if (max_xstreams_warning_once == 0) {
1501  /* Because some Argobots functionalities depend on the runtime value
1502  * ABT_MAX_NUM_XSTREAMS (or gp_ABTI_global->max_xstreams), changing
1503  * this value at run-time can cause an error. For example, using
1504  * ABT_mutex created before updating max_xstreams causes an error
1505  * since ABTI_thread_htable's array size depends on
1506  * ABT_MAX_NUM_XSTREAMS. To fix this issue, please set a larger
1507  * number to ABT_MAX_NUM_XSTREAMS in advance. */
1508  char *warning_message;
1509  int abt_errno =
1510  ABTU_malloc(sizeof(char) * 1024, (void **)&warning_message);
1511  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
1512  snprintf(warning_message, 1024,
1513  "Warning: the number of execution streams exceeds "
1514  "ABT_MAX_NUM_XSTREAMS (=%d). This may cause an error.",
1515  p_global->max_xstreams);
1516  HANDLE_WARNING(warning_message);
1517  ABTU_free(warning_message);
1518  max_xstreams_warning_once = 1;
1519  }
1520  }
1521  /* Anyway. let's increase max_xstreams. */
1522  p_global->max_xstreams = rank + 1;
1523  }
1524 
1525  ABTI_spinlock_release(&p_global->xstream_list_lock);
1526 
1527  /* Set the rank */
1528  p_newxstream->rank = rank;
1529  return ABT_TRUE;
1530 }
1531 
1532 static void xstream_return_rank(ABTI_xstream *p_xstream)
1533 {
1534  ABTI_global *p_global = gp_ABTI_global;
1535  /* Remove this xstream from the global ES list */
1536  ABTI_spinlock_acquire(&p_global->xstream_list_lock);
1537  if (!p_xstream->p_prev) {
1538  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
1539  p_global->p_xstream_head = p_xstream->p_next;
1540  } else {
1541  p_xstream->p_prev->p_next = p_xstream->p_next;
1542  }
1543  if (p_xstream->p_next) {
1544  p_xstream->p_next->p_prev = p_xstream->p_prev;
1545  }
1546  p_global->num_xstreams--;
1547  ABTI_spinlock_release(&p_global->xstream_list_lock);
1548 }
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:337
static void xstream_return_rank(ABTI_xstream *p_xstream)
Definition: stream.c:1532
int ABT_xstream_self(ABT_xstream *xstream) ABT_API_PUBLIC
Return the ES handle associated with the caller work unit.
Definition: stream.c:341
ABT_sched_predef
Definition: abt.h:143
struct ABT_xstream_opaque * ABT_xstream
Definition: abt.h:313
struct ABT_sched_opaque * ABT_sched
Definition: abt.h:319
#define ABT_XSTREAM_NULL
Definition: abt.h:409
#define ABTU_unreachable()
Definition: abtu.h:25
static void xstream_init_main_sched(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:1345
int ABT_xstream_get_num(int *num_xstreams) ABT_API_PUBLIC
Return the number of current existing ESs.
Definition: stream.c:632
static void xstream_schedule_task(ABTI_xstream *p_local_xstream, ABTI_thread *p_task)
Definition: stream.c:1260
int ABT_xstream_set_rank(ABT_xstream xstream, int rank) ABT_API_PUBLIC
Set the rank for target ES.
Definition: stream.c:382
struct ABT_thread_opaque * ABT_task
Definition: abt.h:353
#define ABT_ERR_INV_THREAD
Definition: abt.h:80
ABT_xstream_state
Definition: abt.h:120
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES and return its handle through newxstream.
Definition: stream.c:46
int ABT_xstream_self_rank(int *rank) ABT_API_PUBLIC
Return the rank of ES associated with the caller work unit.
Definition: stream.c:363
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank) ABT_API_PUBLIC
Return the rank of ES.
Definition: stream.c:405
int ABT_bool
Definition: abt.h:373
static ABT_bool xstream_set_new_rank(ABTI_xstream *p_newxstream, int rank)
Definition: stream.c:1436
int ABT_xstream_cancel(ABT_xstream xstream) ABT_API_PUBLIC
Request the cancellation of the target ES.
Definition: stream.c:309
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched) ABT_API_PUBLIC
Get the main scheduler of the target ES.
Definition: stream.c:544
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *flag) ABT_API_PUBLIC
Check if the target ES is the primary ES.
Definition: stream.c:656
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:329
#define ABT_ERR_THREAD
Definition: abt.h:101
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES with a specific rank.
Definition: stream.c:128
#define HANDLE_WARNING(msg)
Definition: abti_error.h:30
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:142
#define ABT_FALSE
Definition: abt.h:285
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools) ABT_API_PUBLIC
Set the main scheduler for xstream with a predefined scheduler.
Definition: stream.c:510
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:343
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools) ABT_API_PUBLIC
Get the pools of the main scheduler of the target ES.
Definition: stream.c:566
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES with a predefined scheduler and return its handle through newxstream.
Definition: stream.c:91
int ABT_xstream_join(ABT_xstream xstream) ABT_API_PUBLIC
Wait for xstream to terminate.
Definition: stream.c:261
int ABT_xstream_check_events(ABT_sched sched) ABT_API_PUBLIC
Check the events and process them.
Definition: stream.c:709
int ABT_xstream_set_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset) ABT_API_PUBLIC
Set the CPU affinity of the target ES.
Definition: stream.c:797
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid) ABT_API_PUBLIC
Bind the target ES to a target CPU.
Definition: stream.c:734
ABTI_global * gp_ABTI_global
Definition: global.c:18
#define ABT_SUCCESS
Definition: abt.h:64
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid) ABT_API_PUBLIC
Get the CPU binding for the target ES.
Definition: stream.c:761
int ABT_xstream_get_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset, int *num_cpus) ABT_API_PUBLIC
Get the CPU affinity for the target ES.
Definition: stream.c:831
static ABTU_ret_err int xstream_start(ABTI_xstream *p_xstream)
Definition: stream.c:1100
#define ABT_TRUE
Definition: abt.h:284
#define ABT_SCHED_NULL
Definition: abt.h:411
int ABT_xstream_exit(void) ABT_API_PUBLIC
Terminate the ES associated with the calling ULT.
Definition: stream.c:285
ABT_unit_type
Definition: abt.h:169
static ABTU_ret_err int xstream_migrate_thread(ABTI_local *p_local, ABTI_thread *p_thread)
Definition: stream.c:1306
static void xstream_schedule_ythread(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread)
Definition: stream.c:1143
int ABT_xstream_free(ABT_xstream *xstream) ABT_API_PUBLIC
Release the ES object associated with ES handle.
Definition: stream.c:217
#define ABT_ERR_XSTREAM_STATE
Definition: abt.h:95
static ABTU_ret_err int xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
Definition: stream.c:1119
#define ABT_ERR_FEATURE_NA
Definition: abt.h:116
static void * xstream_launch_root_ythread(void *p_xstream)
Definition: stream.c:1022
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state) ABT_API_PUBLIC
Return the state of xstream.
Definition: stream.c:587
#define ABT_ERR_INV_XSTREAM_RANK
Definition: abt.h:69
#define LOG_DEBUG(fmt,...)
Definition: abti_log.h:26
struct ABT_sched_config_opaque * ABT_sched_config
Definition: abt.h:321
#define ABT_ERR_XSTREAM
Definition: abt.h:94
int ABT_xstream_revive(ABT_xstream xstream) ABT_API_PUBLIC
Restart an ES that has been joined by ABT_xstream_join().
Definition: stream.c:173
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched) ABT_API_PUBLIC
Set the main scheduler of the target ES.
Definition: stream.c:444
static ABTU_ret_err int xstream_create(ABTI_sched *p_sched, ABTI_xstream_type xstream_type, int rank, ABTI_xstream **pp_xstream)
Definition: stream.c:1047
#define ABT_ERR_INV_SCHED
Definition: abt.h:71
#define ABT_SCHED_CONFIG_NULL
Definition: abt.h:412
#define ABT_ERR_INV_XSTREAM
Definition: abt.h:68
static void ABTU_free(void *ptr)
Definition: abtu.h:135
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool) ABT_API_PUBLIC
Execute a unit on the local ES.
Definition: stream.c:686
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result) ABT_API_PUBLIC
Compare two ES handles for equality.
Definition: stream.c:611
static ABTU_ret_err int xstream_update_main_sched(ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:1358
#define ABTU_ret_err
Definition: abtu.h:49