ARGOBOTS  66b1c39742507d8df30e8d28c54839b961a14814
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 ABTU_ret_err static int xstream_create(ABTI_sched *p_sched,
9  ABTI_xstream_type xstream_type, int rank,
10  ABTI_xstream **pp_xstream);
11 ABTU_ret_err static int xstream_start(ABTI_xstream *p_xstream);
12 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
13  ABTI_xstream *p_xstream);
14 static ABT_bool xstream_set_new_rank(ABTI_xstream *p_newxstream, int rank);
15 static void xstream_return_rank(ABTI_xstream *p_xstream);
16 static inline void xstream_schedule_ythread(ABTI_xstream **pp_local_xstream,
17  ABTI_ythread *p_ythread);
18 static inline void xstream_schedule_task(ABTI_xstream *p_local_xstream,
19  ABTI_thread *p_task);
20 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
21  ABTI_sched *p_sched);
22 ABTU_ret_err static int
23 xstream_update_main_sched(ABTI_xstream **pp_local_xstream,
24  ABTI_xstream *p_xstream, ABTI_sched *p_sched);
25 static void *xstream_launch_root_ythread(void *p_xstream);
26 #ifndef ABT_CONFIG_DISABLE_MIGRATION
28  ABTI_thread *p_thread);
29 #endif
30 
46 int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
47 {
48  int abt_errno;
49  ABTI_sched *p_sched;
50  ABTI_xstream *p_newxstream;
51 
52  if (sched == ABT_SCHED_NULL) {
53  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
54  ABT_SCHED_CONFIG_NULL, &p_sched);
55  ABTI_CHECK_ERROR(abt_errno);
56  } else {
57  p_sched = ABTI_sched_get_ptr(sched);
60  }
61 
62  abt_errno =
63  xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, -1, &p_newxstream);
64  ABTI_CHECK_ERROR(abt_errno);
65 
66  /* Start this ES */
67  abt_errno = xstream_start(p_newxstream);
68  ABTI_CHECK_ERROR(abt_errno);
69 
70  /* Return value */
71  *newxstream = ABTI_xstream_get_handle(p_newxstream);
72  return ABT_SUCCESS;
73 }
74 
91 int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools,
92  ABT_pool *pools, ABT_sched_config config,
93  ABT_xstream *newxstream)
94 {
95  int abt_errno;
96  ABTI_xstream *p_newxstream;
97 
98  ABTI_sched *p_sched;
99  abt_errno =
100  ABTI_sched_create_basic(predef, num_pools, pools, config, &p_sched);
101  ABTI_CHECK_ERROR(abt_errno);
102 
103  abt_errno =
104  xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, -1, &p_newxstream);
105  ABTI_CHECK_ERROR(abt_errno);
106 
107  /* Start this ES */
108  abt_errno = xstream_start(p_newxstream);
109  ABTI_CHECK_ERROR(abt_errno);
110 
111  *newxstream = ABTI_xstream_get_handle(p_newxstream);
112  return ABT_SUCCESS;
113 }
114 
129  ABT_xstream *newxstream)
130 {
131  int abt_errno;
132  ABTI_sched *p_sched;
133  ABTI_xstream *p_newxstream;
134 
136 
137  if (sched == ABT_SCHED_NULL) {
138  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
139  ABT_SCHED_CONFIG_NULL, &p_sched);
140  ABTI_CHECK_ERROR(abt_errno);
141  } else {
142  p_sched = ABTI_sched_get_ptr(sched);
145  }
146 
147  abt_errno = xstream_create(p_sched, ABTI_XSTREAM_TYPE_SECONDARY, rank,
148  &p_newxstream);
149  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
150  if (sched == ABT_SCHED_NULL)
152  ABT_FALSE);
153  ABTI_HANDLE_ERROR(abt_errno);
154  }
155 
156  /* Start this ES */
157  abt_errno = xstream_start(p_newxstream);
158  ABTI_CHECK_ERROR(abt_errno);
159 
160  /* Return value */
161  *newxstream = ABTI_xstream_get_handle(p_newxstream);
162  return ABT_SUCCESS;
163 }
164 
174 {
175  ABTI_local *p_local = ABTI_local_get_local();
176  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
177  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
178 
179  /* Revives the main scheduler thread. */
180  ABTI_sched *p_main_sched = p_xstream->p_main_sched;
181  ABTI_ythread *p_main_sched_ythread = p_main_sched->p_ythread;
183  &p_main_sched_ythread->thread.state) ==
186 
187  ABTD_atomic_relaxed_store_uint32(&p_main_sched->request, 0);
188  ABTI_tool_event_thread_join(p_local, &p_main_sched_ythread->thread,
190  ? ABTI_local_get_xstream(p_local)->p_thread
191  : NULL);
192 
193  ABTI_thread_revive(p_local, p_xstream->p_root_pool,
194  p_main_sched_ythread->thread.f_thread,
195  p_main_sched_ythread->thread.p_arg,
196  &p_main_sched_ythread->thread);
197 
199  ABTD_xstream_context_revive(&p_xstream->ctx);
200  return ABT_SUCCESS;
201 }
202 
218 {
219  ABTI_local *p_local = ABTI_local_get_local();
220  ABT_xstream h_xstream = *xstream;
221 
222  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(h_xstream);
223  if (p_xstream == NULL)
224  return ABT_SUCCESS;
225 
226  /* We first need to check whether p_local_xstream is NULL because this
227  * routine might be called by external threads. */
230  "The current xstream cannot be freed.");
231 
234  "The primary xstream cannot be freed explicitly.");
235 
236  /* Wait until xstream terminates */
237  int abt_errno = xstream_join(&p_local, p_xstream);
238  ABTI_CHECK_ERROR(abt_errno);
239 
240  /* Free the xstream object */
241  ABTI_xstream_free(p_local, p_xstream, ABT_FALSE);
242 
243  /* Return value */
244  *xstream = ABT_XSTREAM_NULL;
245  return ABT_SUCCESS;
246 }
247 
262 {
263  ABTI_local *p_local = ABTI_local_get_local();
264  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
265  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
266 
267  int abt_errno = xstream_join(&p_local, p_xstream);
268  ABTI_CHECK_ERROR(abt_errno);
269  return ABT_SUCCESS;
270 }
271 
286 {
287  ABTI_xstream *p_local_xstream;
288  ABTI_ythread *p_ythread;
289  ABTI_SETUP_LOCAL_YTHREAD_WITH_INIT_CHECK(&p_local_xstream, &p_ythread);
290 
291  /* Terminate the main scheduler. */
293  ->thread.request,
295  /* Terminate this ULT */
296  ABTI_ythread_exit(p_local_xstream, p_ythread);
298  return ABT_SUCCESS;
299 }
300 
310 {
311  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
312 
315  "The primary xstream cannot be canceled.");
316 
317  /* Terminate the main scheduler of the target xstream. */
319  .request,
321  return ABT_SUCCESS;
322 }
323 
342 {
343  *xstream = ABT_XSTREAM_NULL;
344 
345  ABTI_xstream *p_local_xstream;
347 
348  /* Return value */
349  *xstream = ABTI_xstream_get_handle(p_local_xstream);
350  return ABT_SUCCESS;
351 }
352 
363 int ABT_xstream_self_rank(int *rank)
364 {
365  ABTI_xstream *p_local_xstream;
367 
368  /* Return value */
369  *rank = (int)p_local_xstream->rank;
370  return ABT_SUCCESS;
371 }
372 
382 int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
383 {
384  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
385  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
386 
387  p_xstream->rank = rank;
388 
389  /* Set the CPU affinity for the ES */
391  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
392  }
393  return ABT_SUCCESS;
394 }
395 
405 int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
406 {
407  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
408  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
409 
410  *rank = (int)p_xstream->rank;
411  return ABT_SUCCESS;
412 }
413 
445 {
446  int abt_errno;
447 
448  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
449  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
450 
451  ABTI_xstream *p_local_xstream;
452  ABTI_ythread *p_self;
453  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_self);
454 
455  /* For now, if the target ES is running, we allow to change the main
456  * scheduler of the ES only when the caller is running on the same ES. */
457  /* TODO: a new state representing that the scheduler is changed is needed
458  * to avoid running xstreams while the scheduler is changed in this
459  * function. */
460  if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
462  if (p_self->thread.p_last_xstream != p_xstream) {
464  }
465  }
466 
467  /* TODO: permit to change the scheduler even when having work units in pools
468  */
469  if (p_xstream->p_main_sched) {
470  /* We only allow to change the main scheduler when the current main
471  * scheduler of p_xstream has no work unit in its associated pools. */
473  p_local_xstream),
474  p_xstream->p_main_sched) > 0) {
476  }
477  }
478 
479  ABTI_sched *p_sched;
480  if (sched == ABT_SCHED_NULL) {
481  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
482  ABT_SCHED_CONFIG_NULL, &p_sched);
483  ABTI_CHECK_ERROR(abt_errno);
484  } else {
485  p_sched = ABTI_sched_get_ptr(sched);
488  }
489 
490  abt_errno = xstream_update_main_sched(&p_local_xstream, p_xstream, p_sched);
491  ABTI_CHECK_ERROR(abt_errno);
492  return ABT_SUCCESS;
493 }
494 
511  ABT_sched_predef predef, int num_pools,
512  ABT_pool *pools)
513 {
514  int abt_errno;
515 
516  ABTI_xstream *p_local_xstream;
517  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
518 
519  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
520  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
521 
522  ABTI_sched *p_sched;
523  abt_errno = ABTI_sched_create_basic(predef, num_pools, pools,
524  ABT_SCHED_CONFIG_NULL, &p_sched);
525  ABTI_CHECK_ERROR(abt_errno);
526 
527  abt_errno = xstream_update_main_sched(&p_local_xstream, p_xstream, p_sched);
528  ABTI_CHECK_ERROR(abt_errno);
529  return ABT_SUCCESS;
530 }
531 
545 {
546  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
547  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
548 
549  *sched = ABTI_sched_get_handle(p_xstream->p_main_sched);
550  return ABT_SUCCESS;
551 }
552 
566 int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools,
567  ABT_pool *pools)
568 {
569  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
570  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
571 
572  ABTI_sched *p_sched = p_xstream->p_main_sched;
573  max_pools = p_sched->num_pools > max_pools ? max_pools : p_sched->num_pools;
574  memcpy(pools, p_sched->pools, sizeof(ABT_pool) * max_pools);
575  return ABT_SUCCESS;
576 }
577 
588 {
589  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
590  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
591 
593  return ABT_SUCCESS;
594 }
595 
612  ABT_bool *result)
613 {
614  ABTI_xstream *p_xstream1 = ABTI_xstream_get_ptr(xstream1);
615  ABTI_xstream *p_xstream2 = ABTI_xstream_get_ptr(xstream2);
616  *result = (p_xstream1 == p_xstream2) ? ABT_TRUE : ABT_FALSE;
617  return ABT_SUCCESS;
618 }
619 
632 int ABT_xstream_get_num(int *num_xstreams)
633 {
634  /* In case that Argobots has not been initialized, return an error code
635  * instead of making the call fail. */
637 
638  *num_xstreams = gp_ABTI_global->num_xstreams;
639  return ABT_SUCCESS;
640 }
641 
657 {
658  ABTI_xstream *p_xstream;
659 
660  p_xstream = ABTI_xstream_get_ptr(xstream);
661  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
662 
663  /* Return value */
664  *flag =
665  (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) ? ABT_TRUE : ABT_FALSE;
666  return ABT_SUCCESS;
667 }
668 
687 {
688  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
689 
690  ABTI_xstream *p_local_xstream;
691  ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, NULL);
692 
693  ABTI_xstream_run_unit(&p_local_xstream, unit, p_pool);
694  return ABT_SUCCESS;
695 }
696 
710 {
711  ABTI_xstream *p_local_xstream;
713 
714  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
715  ABTI_CHECK_NULL_SCHED_PTR(p_sched);
716 
717  ABTI_xstream_check_events(p_local_xstream, p_sched);
718  return ABT_SUCCESS;
719 }
720 
734 int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
735 {
736  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
737  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
738 
739  ABTD_affinity_cpuset cpuset;
740  cpuset.num_cpuids = 1;
741  cpuset.cpuids = &cpuid;
742  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &cpuset);
743  /* Do not free cpuset since cpuids points to a user pointer. */
744  ABTI_CHECK_ERROR(abt_errno);
745  return ABT_SUCCESS;
746 }
747 
761 int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
762 {
763  int abt_errno;
764  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
765  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
766 
767  ABTD_affinity_cpuset cpuset;
768  cpuset.num_cpuids = 0;
769  cpuset.cpuids = NULL;
770  abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, &cpuset);
771  ABTI_CHECK_ERROR(abt_errno);
772 
773  if (cpuset.num_cpuids != 0) {
774  *cpuid = cpuset.cpuids[0];
775  } else {
776  abt_errno = ABT_ERR_FEATURE_NA;
777  }
779  ABTI_CHECK_ERROR(abt_errno);
780  return ABT_SUCCESS;
781 }
782 
797 int ABT_xstream_set_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset)
798 {
799  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
800  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
801 
802  ABTD_affinity_cpuset affinity;
803  affinity.num_cpuids = cpuset_size;
804  affinity.cpuids = cpuset;
805  int abt_errno = ABTD_affinity_cpuset_apply(&p_xstream->ctx, &affinity);
806  /* Do not free affinity since cpuids may not be freed. */
807  ABTI_CHECK_ERROR(abt_errno);
808  return ABT_SUCCESS;
809 }
810 
831 int ABT_xstream_get_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset,
832  int *num_cpus)
833 {
834  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
835  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
836 
837  ABTD_affinity_cpuset affinity;
838  int abt_errno = ABTD_affinity_cpuset_read(&p_xstream->ctx, &affinity);
839  ABTI_CHECK_ERROR(abt_errno);
840 
841  int i, n;
842  n = affinity.num_cpuids > cpuset_size ? cpuset_size : affinity.num_cpuids;
843  *num_cpus = n;
844  for (i = 0; i < n; i++) {
845  cpuset[i] = affinity.cpuids[i];
846  }
847  ABTD_affinity_cpuset_destroy(&affinity);
848  return abt_errno;
849 }
850 
851 /*****************************************************************************/
852 /* Private APIs */
853 /*****************************************************************************/
854 
856 {
857  int abt_errno;
858  ABTI_xstream *p_newxstream;
859  ABTI_sched *p_sched;
860 
861  /* For the primary ES, a default scheduler is created. */
862  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
863  ABT_SCHED_CONFIG_NULL, &p_sched);
864  ABTI_CHECK_ERROR(abt_errno);
865 
866  abt_errno =
867  xstream_create(p_sched, ABTI_XSTREAM_TYPE_PRIMARY, -1, &p_newxstream);
868  ABTI_CHECK_ERROR(abt_errno);
869 
870  *pp_xstream = p_newxstream;
871  return ABT_SUCCESS;
872 }
873 
874 /* This routine starts the primary ES. It should be called in ABT_init. */
875 void ABTI_xstream_start_primary(ABTI_xstream **pp_local_xstream,
876  ABTI_xstream *p_xstream,
877  ABTI_ythread *p_ythread)
878 {
879  /* p_ythread must be the main thread. */
881  /* The ES's state must be running here. */
884 
885  LOG_DEBUG("[E%d] start\n", p_xstream->rank);
886 
887  ABTD_xstream_context_set_self(&p_xstream->ctx);
888 
889  /* Set the CPU affinity for the ES */
891  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
892  }
893 
894  /* Context switch to the root thread. */
895  p_xstream->p_root_ythread->thread.p_last_xstream = p_xstream;
896  ABTD_ythread_context_switch(&p_ythread->ctx,
897  &p_xstream->p_root_ythread->ctx);
898  /* Come back to the main thread. Now this thread is executed on top of the
899  * main scheduler, which is running on the root thread. */
900  (*pp_local_xstream)->p_thread = &p_ythread->thread;
901 }
902 
903 void ABTI_xstream_run_unit(ABTI_xstream **pp_local_xstream, ABT_unit unit,
904  ABTI_pool *p_pool)
905 {
906  ABT_unit_type type = p_pool->u_get_type(unit);
907 
908  if (type == ABT_UNIT_TYPE_THREAD) {
909  ABT_thread thread = p_pool->u_get_thread(unit);
910  ABTI_ythread *p_ythread = ABTI_ythread_get_ptr(thread);
911  /* Switch the context */
912  xstream_schedule_ythread(pp_local_xstream, p_ythread);
913  } else {
915  ABT_task task = p_pool->u_get_task(unit);
916  ABTI_thread *p_task = ABTI_thread_get_ptr(task);
917  /* Execute the task */
918  xstream_schedule_task(*pp_local_xstream, p_task);
919  }
920 }
921 
923 {
925 
926  uint32_t request = ABTD_atomic_acquire_load_uint32(
927  &p_xstream->p_main_sched->p_ythread->thread.request);
928  if (request & ABTI_THREAD_REQ_JOIN) {
929  ABTI_sched_finish(p_sched);
930  }
931 
932  if ((request & ABTI_THREAD_REQ_TERMINATE) ||
933  (request & ABTI_THREAD_REQ_CANCEL)) {
934  ABTI_sched_exit(p_sched);
935  }
936 }
937 
938 void ABTI_xstream_free(ABTI_local *p_local, ABTI_xstream *p_xstream,
939  ABT_bool force_free)
940 {
941  LOG_DEBUG("[E%d] freed\n", p_xstream->rank);
942 
943  /* Clean up memory pool. */
944  ABTI_mem_finalize_local(p_xstream);
945  /* Return rank for reuse. rank must be returned prior to other free
946  * functions so that other xstreams cannot refer to this xstream. */
947  xstream_return_rank(p_xstream);
948 
949  /* Free the scheduler */
950  ABTI_sched *p_cursched = p_xstream->p_main_sched;
951  if (p_cursched != NULL) {
952  /* Join a scheduler thread. */
953  ABTI_tool_event_thread_join(p_local, &p_cursched->p_ythread->thread,
955  ? ABTI_local_get_xstream(p_local)
956  ->p_thread
957  : NULL);
958  ABTI_sched_discard_and_free(p_local, p_cursched, force_free);
959  /* The main scheduler thread is also freed. */
960  }
961 
962  /* Free the root thread and pool. */
963  ABTI_ythread_free_root(p_local, p_xstream->p_root_ythread);
964  ABTI_pool_free(p_xstream->p_root_pool);
965 
966  /* Free the context if a given xstream is secondary. */
967  if (p_xstream->type == ABTI_XSTREAM_TYPE_SECONDARY) {
968  ABTD_xstream_context_free(&p_xstream->ctx);
969  }
970 
971  ABTU_free(p_xstream);
972 }
973 
974 void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent,
975  ABT_bool print_sub)
976 {
977  if (p_xstream == NULL) {
978  fprintf(p_os, "%*s== NULL ES ==\n", indent, "");
979  } else {
980  char *type, *state;
981  switch (p_xstream->type) {
983  type = "PRIMARY";
984  break;
986  type = "SECONDARY";
987  break;
988  default:
989  type = "UNKNOWN";
990  break;
991  }
992  switch (ABTD_atomic_acquire_load_int(&p_xstream->state)) {
994  state = "RUNNING";
995  break;
997  state = "TERMINATED";
998  break;
999  default:
1000  state = "UNKNOWN";
1001  break;
1002  }
1003 
1004  fprintf(p_os,
1005  "%*s== ES (%p) ==\n"
1006  "%*srank : %d\n"
1007  "%*stype : %s\n"
1008  "%*sstate : %s\n"
1009  "%*smain_sched: %p\n",
1010  indent, "", (void *)p_xstream, indent, "", p_xstream->rank,
1011  indent, "", type, indent, "", state, indent, "",
1012  (void *)p_xstream->p_main_sched);
1013 
1014  if (print_sub == ABT_TRUE) {
1015  ABTI_sched_print(p_xstream->p_main_sched, p_os,
1016  indent + ABTI_INDENT, ABT_TRUE);
1017  }
1018  }
1019  fflush(p_os);
1020 }
1021 
1022 static void *xstream_launch_root_ythread(void *p_xstream)
1023 {
1024  ABTI_xstream *p_local_xstream = (ABTI_xstream *)p_xstream;
1025 
1026  /* Initialization of the local variables */
1027  ABTI_local_set_xstream(p_local_xstream);
1028 
1029  LOG_DEBUG("[E%d] start\n", p_local_xstream->rank);
1030 
1031  /* Set the root thread as the current thread */
1032  ABTI_ythread *p_root_ythread = p_local_xstream->p_root_ythread;
1033  p_local_xstream->p_thread = &p_local_xstream->p_root_ythread->thread;
1034  p_root_ythread->thread.f_thread(p_root_ythread->thread.p_arg);
1035 
1036  LOG_DEBUG("[E%d] end\n", p_local_xstream->rank);
1037 
1038  /* Reset the current ES and its local info. */
1039  ABTI_local_set_xstream(NULL);
1040  return NULL;
1041 }
1042 
1043 /*****************************************************************************/
1044 /* Internal static functions */
1045 /*****************************************************************************/
1046 
1048  ABTI_xstream_type xstream_type, int rank,
1049  ABTI_xstream **pp_xstream)
1050 {
1051  int abt_errno;
1052  ABTI_xstream *p_newxstream;
1053 
1054  abt_errno = ABTU_malloc(sizeof(ABTI_xstream), (void **)&p_newxstream);
1055  ABTI_CHECK_ERROR(abt_errno);
1056 
1057  p_newxstream->p_prev = NULL;
1058  p_newxstream->p_next = NULL;
1059 
1060  if (xstream_set_new_rank(p_newxstream, rank) == ABT_FALSE) {
1061  ABTU_free(p_newxstream);
1062  return ABT_ERR_INV_XSTREAM_RANK;
1063  }
1064 
1065  p_newxstream->type = xstream_type;
1066  ABTD_atomic_relaxed_store_int(&p_newxstream->state,
1068  p_newxstream->p_main_sched = NULL;
1069  p_newxstream->p_thread = NULL;
1070  ABTI_mem_init_local(p_newxstream);
1071 
1072  /* Set the main scheduler */
1073  xstream_init_main_sched(p_newxstream, p_sched);
1074 
1075  /* Create the root thread. */
1076  abt_errno =
1078  p_newxstream, &p_newxstream->p_root_ythread);
1079  ABTI_CHECK_ERROR(abt_errno);
1080 
1081  /* Create the root pool. */
1083  ABT_FALSE, &p_newxstream->p_root_pool);
1084  ABTI_CHECK_ERROR(abt_errno);
1085 
1086  /* Create the main scheduler thread. */
1087  abt_errno =
1089  p_newxstream,
1090  p_newxstream->p_main_sched);
1091  ABTI_CHECK_ERROR(abt_errno);
1092 
1093  LOG_DEBUG("[E%d] created\n", p_newxstream->rank);
1094 
1095  /* Return value */
1096  *pp_xstream = p_newxstream;
1097  return ABT_SUCCESS;
1098 }
1099 
1101 {
1102  /* The ES's state must be RUNNING */
1106  /* Start the main scheduler on a different ES */
1107  int abt_errno =
1109  (void *)p_xstream, &p_xstream->ctx);
1110  ABTI_CHECK_ERROR(abt_errno);
1111 
1112  /* Set the CPU affinity for the ES */
1114  ABTD_affinity_cpuset_apply_default(&p_xstream->ctx, p_xstream->rank);
1115  }
1116  return ABT_SUCCESS;
1117 }
1118 
1119 ABTU_ret_err static int xstream_join(ABTI_local **pp_local,
1120  ABTI_xstream *p_xstream)
1121 {
1122  /* The primary ES cannot be joined. */
1125  /* The main scheduler cannot join itself. */
1127  &p_xstream->p_main_sched->p_ythread->thread !=
1128  ABTI_local_get_xstream(*pp_local)->p_thread,
1130 
1131  /* Wait until the target ES terminates */
1132  ABTI_sched_finish(p_xstream->p_main_sched);
1133  ABTI_thread_join(pp_local, &p_xstream->p_main_sched->p_ythread->thread);
1134 
1135  /* Normal join request */
1136  ABTD_xstream_context_join(&p_xstream->ctx);
1137 
1140  return ABT_SUCCESS;
1141 }
1142 
1143 static inline void xstream_schedule_ythread(ABTI_xstream **pp_local_xstream,
1144  ABTI_ythread *p_ythread)
1145 {
1146  ABTI_xstream *p_local_xstream = *pp_local_xstream;
1147 
1148 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1151  LOG_DEBUG("[U%" PRIu64 ":E%d] canceled\n",
1152  ABTI_thread_get_id(&p_ythread->thread),
1153  p_local_xstream->rank);
1154  ABTD_ythread_cancel(p_local_xstream, p_ythread);
1156  &p_ythread->thread);
1157  return;
1158  }
1159 #endif
1160 
1161 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1164  int abt_errno =
1166  &p_ythread->thread);
1167  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
1168  /* Migration succeeded, so we do not need to schedule p_ythread. */
1169  return;
1170  }
1171  }
1172 #endif
1173 
1174  /* Change the last ES */
1175  p_ythread->thread.p_last_xstream = p_local_xstream;
1176 
1177  /* Change the ULT state */
1180 
1181  /* Switch the context */
1182  LOG_DEBUG("[U%" PRIu64 ":E%d] start running\n",
1183  ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1184 
1185  /* Since the argument is pp_local_xstream, p_local_xstream->p_thread must be
1186  * yieldable. */
1187  ABTI_ythread *p_self = ABTI_thread_get_ythread(p_local_xstream->p_thread);
1188  p_ythread = ABTI_ythread_context_switch_to_child(pp_local_xstream, p_self,
1189  p_ythread);
1190  /* The previous ULT (p_ythread) may not be the same as one to which the
1191  * context has been switched. */
1192  /* The scheduler continues from here. */
1193  p_local_xstream = *pp_local_xstream;
1194 
1195  LOG_DEBUG("[U%" PRIu64 ":E%d] stopped\n",
1196  ABTI_thread_get_id(&p_ythread->thread), p_local_xstream->rank);
1197 
1198  /* Request handling. */
1199  /* We do not need to acquire-load request since all critical requests
1200  * (BLOCK, ORPHAN, STOP, and NOPUSH) are written by p_ythread. CANCEL might
1201  * be delayed. */
1202  uint32_t request =
1204  if (request & ABTI_THREAD_REQ_TERMINATE) {
1205  /* The ULT has completed its execution or it called the exit request. */
1206  LOG_DEBUG("[U%" PRIu64 ":E%d] finished\n",
1207  ABTI_thread_get_id(&p_ythread->thread),
1208  p_local_xstream->rank);
1210  &p_ythread->thread);
1211 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1212  } else if (request & ABTI_THREAD_REQ_CANCEL) {
1213  LOG_DEBUG("[U%" PRIu64 ":E%d] canceled\n",
1214  ABTI_thread_get_id(&p_ythread->thread),
1215  p_local_xstream->rank);
1216  ABTD_ythread_cancel(p_local_xstream, p_ythread);
1218  &p_ythread->thread);
1219 #endif
1220  } else if (!(request & ABTI_THREAD_REQ_NON_YIELD)) {
1221  /* The ULT did not finish its execution.
1222  * Change the state of current running ULT and
1223  * add it to the pool again. */
1224  ABTI_pool_add_thread(&p_ythread->thread);
1225  } else if (request & ABTI_THREAD_REQ_BLOCK) {
1226  LOG_DEBUG("[U%" PRIu64 ":E%d] check blocked\n",
1227  ABTI_thread_get_id(&p_ythread->thread),
1228  p_local_xstream->rank);
1229  ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_BLOCK);
1230 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1231  } else if (request & ABTI_THREAD_REQ_MIGRATE) {
1232  /* This is the case when the ULT requests migration of itself. */
1233  int abt_errno =
1235  &p_ythread->thread);
1236  /* Migration is optional, so it is okay if it fails. */
1237  (void)abt_errno;
1238 #endif
1239  } else if (request & ABTI_THREAD_REQ_ORPHAN) {
1240  /* The ULT is not pushed back to the pool and is disconnected from any
1241  * pool. */
1242  LOG_DEBUG("[U%" PRIu64 ":E%d] orphaned\n",
1243  ABTI_thread_get_id(&p_ythread->thread),
1244  p_local_xstream->rank);
1245  ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_ORPHAN);
1246  p_ythread->thread.p_pool->u_free(&p_ythread->thread.unit);
1247  p_ythread->thread.p_pool = NULL;
1248  } else if (request & ABTI_THREAD_REQ_NOPUSH) {
1249  /* The ULT is not pushed back to the pool */
1250  LOG_DEBUG("[U%" PRIu64 ":E%d] not pushed\n",
1251  ABTI_thread_get_id(&p_ythread->thread),
1252  p_local_xstream->rank);
1253  ABTI_thread_unset_request(&p_ythread->thread, ABTI_THREAD_REQ_NOPUSH);
1254  } else {
1255  ABTI_ASSERT(0);
1256  ABTU_unreachable();
1257  }
1258 }
1259 
1260 static inline void xstream_schedule_task(ABTI_xstream *p_local_xstream,
1261  ABTI_thread *p_task)
1262 {
1263 #ifndef ABT_CONFIG_DISABLE_TASK_CANCEL
1266  ABTI_tool_event_thread_cancel(p_local_xstream, p_task);
1268  p_task);
1269  return;
1270  }
1271 #endif
1272 
1273  /* Change the task state */
1275 
1276  /* Set the associated ES */
1277  p_task->p_last_xstream = p_local_xstream;
1278 
1279  /* Execute the task function */
1280  LOG_DEBUG("[T%" PRIu64 ":E%d] running\n", ABTI_thread_get_id(p_task),
1281  p_local_xstream->rank);
1282 
1283  ABTI_thread *p_sched_thread = p_local_xstream->p_thread;
1284  p_local_xstream->p_thread = p_task;
1285  p_task->p_parent = p_sched_thread;
1286 
1287  /* Execute the task function */
1288  ABTI_tool_event_thread_run(p_local_xstream, p_task, p_sched_thread,
1289  p_sched_thread);
1290  LOG_DEBUG("[T%" PRIu64 ":E%d] running\n", ABTI_thread_get_id(p_task),
1291  p_local_xstream->rank);
1292  p_task->f_thread(p_task->p_arg);
1293  ABTI_tool_event_thread_finish(p_local_xstream, p_task, p_sched_thread);
1294  LOG_DEBUG("[T%" PRIu64 ":E%d] stopped\n", ABTI_thread_get_id(p_task),
1295  p_local_xstream->rank);
1296 
1297  /* Set the current running scheduler's thread */
1298  p_local_xstream->p_thread = p_sched_thread;
1299 
1300  /* Terminate the tasklet */
1302  p_task);
1303 }
1304 
1305 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1307  ABTI_thread *p_thread)
1308 {
1309  int abt_errno;
1310  ABTI_pool *p_pool;
1311 
1312  ABTI_thread_mig_data *p_mig_data;
1313  abt_errno = ABTI_thread_get_mig_data(p_local, p_thread, &p_mig_data);
1314  ABTI_CHECK_ERROR(abt_errno);
1315 
1316  /* callback function */
1317  if (p_mig_data->f_migration_cb) {
1318  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
1319  if (p_ythread) {
1320  ABT_thread thread = ABTI_ythread_get_handle(p_ythread);
1321  p_mig_data->f_migration_cb(thread, p_mig_data->p_migration_cb_arg);
1322  }
1323  }
1324 
1325  /* If request is set, p_migration_pool has a valid pool pointer. */
1328 
1329  /* Extracting argument in migration request. */
1330  p_pool = ABTD_atomic_relaxed_load_ptr(&p_mig_data->p_migration_pool);
1332 
1333  /* Change the associated pool */
1334  p_thread->p_pool = p_pool;
1335 
1336  /* Add the unit to the scheduler's pool */
1337  ABTI_pool_push(p_pool, p_thread->unit);
1338 
1340 
1341  return ABT_SUCCESS;
1342 }
1343 #endif
1344 
1345 static void xstream_init_main_sched(ABTI_xstream *p_xstream,
1346  ABTI_sched *p_sched)
1347 {
1348  ABTI_ASSERT(p_xstream->p_main_sched == NULL);
1349  /* The main scheduler will to be a ULT, not a tasklet */
1350  p_sched->type = ABT_SCHED_TYPE_ULT;
1351  /* Set the scheduler as a main scheduler */
1352  p_sched->used = ABTI_SCHED_MAIN;
1353  /* Set the scheduler */
1354  p_xstream->p_main_sched = p_sched;
1355 }
1356 
1357 ABTU_ret_err static int
1359  ABTI_xstream *p_xstream, ABTI_sched *p_sched)
1360 {
1361  ABTI_ythread *p_ythread = NULL;
1362  ABTI_sched *p_main_sched;
1363  ABTI_pool *p_tar_pool = NULL;
1364  int p;
1365 
1366  /* The main scheduler will be a ULT, not a tasklet */
1367  p_sched->type = ABT_SCHED_TYPE_ULT;
1368 
1369  /* Set the scheduler as a main scheduler */
1370  p_sched->used = ABTI_SCHED_MAIN;
1371 
1372  p_main_sched = p_xstream->p_main_sched;
1373  if (p_main_sched == NULL) {
1374  /* Set the scheduler */
1375  p_xstream->p_main_sched = p_sched;
1376  return ABT_SUCCESS;
1377  }
1378 
1379  /* If the ES has a main scheduler, we have to free it */
1380  ABTI_CHECK_YIELDABLE((*pp_local_xstream)->p_thread, &p_ythread,
1382  p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
1383 
1384  /* If the caller ULT is associated with a pool of the current main
1385  * scheduler, it needs to be associated to a pool of new scheduler. */
1386  for (p = 0; p < p_main_sched->num_pools; p++) {
1387  if (p_ythread->thread.p_pool ==
1388  ABTI_pool_get_ptr(p_main_sched->pools[p])) {
1389  /* Associate the work unit to the first pool of new scheduler */
1390  p_ythread->thread.p_pool->u_free(&p_ythread->thread.unit);
1391  ABT_thread h_thread = ABTI_ythread_get_handle(p_ythread);
1392  p_ythread->thread.unit = p_tar_pool->u_create_from_thread(h_thread);
1393  p_ythread->thread.p_pool = p_tar_pool;
1394  break;
1395  }
1396  }
1397  if (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) {
1399  ABT_ERR_THREAD);
1400 
1401  /* Since the primary ES does not finish its execution until ABT_finalize
1402  * is called, its main scheduler needs to be automatically freed when
1403  * it is freed in ABT_finalize. */
1404  p_sched->automatic = ABT_TRUE;
1405  }
1406 
1407  /* Finish the current main scheduler */
1409 
1410  /* If the ES is secondary, we should take the associated ULT of the
1411  * current main scheduler and keep it in the new scheduler. */
1412  p_sched->p_ythread = p_main_sched->p_ythread;
1413  /* The current ULT is pushed to the new scheduler's pool so that when
1414  * the new scheduler starts (see below), it can be scheduled by the new
1415  * scheduler. When the current ULT resumes its execution, it will free
1416  * the current main scheduler (see below). */
1417  ABTI_pool_push(p_tar_pool, p_ythread->thread.unit);
1418 
1419  /* Set the scheduler */
1420  p_xstream->p_main_sched = p_sched;
1421 
1422  /* Switch to the current main scheduler */
1424  ABTI_ythread_context_switch_to_parent(pp_local_xstream, p_ythread,
1426 
1427  /* Now, we free the current main scheduler. p_main_sched->p_ythread must
1428  * be NULL to avoid freeing it in ABTI_sched_discard_and_free(). */
1429  p_main_sched->p_ythread = NULL;
1431  p_main_sched, ABT_FALSE);
1432  return ABT_SUCCESS;
1433 }
1434 
1435 /* Set a new rank to ES */
1436 static ABT_bool xstream_set_new_rank(ABTI_xstream *p_newxstream, int rank)
1437 {
1438  ABTI_global *p_global = gp_ABTI_global;
1439 
1441 
1442  ABTI_xstream *p_prev_xstream = p_global->p_xstream_head;
1443  ABTI_xstream *p_xstream = p_prev_xstream;
1444  if (rank == -1) {
1445  /* Find an unused rank from 0. */
1446  rank = 0;
1447  while (p_xstream) {
1448  if (p_xstream->rank == rank) {
1449  rank++;
1450  } else {
1451  /* Use this rank. */
1452  break;
1453  }
1454  p_prev_xstream = p_xstream;
1455  p_xstream = p_xstream->p_next;
1456  }
1457  } else {
1458  /* Check if a certain rank is available */
1459  while (p_xstream) {
1460  if (p_xstream->rank == rank) {
1462  return ABT_FALSE;
1463  } else if (p_xstream->rank > rank) {
1464  /* Use this p_xstream. */
1465  break;
1466  }
1467  p_prev_xstream = p_xstream;
1468  p_xstream = p_xstream->p_next;
1469  }
1470  }
1471  if (!p_xstream) {
1472  /* p_newxstream is appended to p_prev_xstream */
1473  if (p_prev_xstream) {
1474  p_prev_xstream->p_next = p_newxstream;
1475  p_newxstream->p_prev = p_prev_xstream;
1476  p_newxstream->p_next = NULL;
1477  } else {
1478  ABTI_ASSERT(p_global->p_xstream_head == NULL);
1479  p_newxstream->p_prev = NULL;
1480  p_newxstream->p_next = NULL;
1481  p_global->p_xstream_head = p_newxstream;
1482  }
1483  } else {
1484  /* p_newxstream is inserted in the middle.
1485  * (p_xstream->p_prev) -> p_new_xstream -> p_xstream */
1486  if (p_xstream->p_prev) {
1487  p_xstream->p_prev->p_next = p_newxstream;
1488  p_newxstream->p_prev = p_xstream->p_prev;
1489  } else {
1490  /* This p_xstream is the first element */
1491  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
1492  p_global->p_xstream_head = p_newxstream;
1493  }
1494  p_xstream->p_prev = p_newxstream;
1495  p_newxstream->p_next = p_xstream;
1496  }
1497  p_global->num_xstreams++;
1498  if (rank >= p_global->max_xstreams) {
1499  static int max_xstreams_warning_once = 0;
1500  if (max_xstreams_warning_once == 0) {
1501  /* Because some Argobots functionalities depend on the runtime value
1502  * ABT_MAX_NUM_XSTREAMS (or gp_ABTI_global->max_xstreams), changing
1503  * this value at run-time can cause an error. For example, using
1504  * ABT_mutex created before updating max_xstreams causes an error
1505  * since ABTI_thread_htable's array size depends on
1506  * ABT_MAX_NUM_XSTREAMS. To fix this issue, please set a larger
1507  * number to ABT_MAX_NUM_XSTREAMS in advance. */
1508  char *warning_message;
1509  int abt_errno =
1510  ABTU_malloc(sizeof(char) * 1024, (void **)&warning_message);
1511  if (!ABTI_IS_ERROR_CHECK_ENABLED || abt_errno == ABT_SUCCESS) {
1512  snprintf(warning_message, 1024,
1513  "Warning: the number of execution streams exceeds "
1514  "ABT_MAX_NUM_XSTREAMS (=%d). This may cause an error.",
1515  p_global->max_xstreams);
1516  HANDLE_WARNING(warning_message);
1517  ABTU_free(warning_message);
1518  max_xstreams_warning_once = 1;
1519  }
1520  }
1521  /* Anyway, let's increase max_xstreams. */
1522  p_global->max_xstreams = rank + 1;
1523  }
1524 
1526 
1527  /* Set the rank */
1528  p_newxstream->rank = rank;
1529  return ABT_TRUE;
1530 }
1531 
1532 static void xstream_return_rank(ABTI_xstream *p_xstream)
1533 {
1534  ABTI_global *p_global = gp_ABTI_global;
1535  /* Remove this xstream from the global ES list */
1537  if (!p_xstream->p_prev) {
1538  ABTI_ASSERT(p_global->p_xstream_head == p_xstream);
1539  p_global->p_xstream_head = p_xstream->p_next;
1540  } else {
1541  p_xstream->p_prev->p_next = p_xstream->p_next;
1542  }
1543  if (p_xstream->p_next) {
1544  p_xstream->p_next->p_prev = p_xstream->p_prev;
1545  }
1546  p_global->num_xstreams--;
1548 }
static void ABTI_sched_discard_and_free(ABTI_local *p_local, ABTI_sched *p_sched, ABT_bool force_free)
Definition: abti_sched.h:43
static ABTI_ythread * ABTI_ythread_get_ptr(ABT_thread thread)
Definition: abti_ythread.h:11
ABTI_pool * p_pool
Definition: abti.h:324
static ABTI_ythread * ABTI_thread_get_ythread_or_null(ABTI_thread *p_thread)
Definition: abti_thread.h:59
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:337
ABTD_atomic_uint32 request
Definition: abti.h:323
#define ABTI_CHECK_NULL_SCHED_PTR(p)
Definition: abti_error.h:184
void(* f_migration_cb)(ABT_thread, void *)
Definition: abti.h:341
static void xstream_return_rank(ABTI_xstream *p_xstream)
Definition: stream.c:1532
int ABT_xstream_self(ABT_xstream *xstream) ABT_API_PUBLIC
Return the ES handle associated with the caller work unit.
Definition: stream.c:341
static ABT_xstream ABTI_xstream_get_handle(ABTI_xstream *p_xstream)
Definition: abti_stream.h:26
ABT_unit_get_type_fn u_get_type
Definition: abti.h:292
ABT_sched_predef
Definition: abt.h:143
ABT_unit_create_from_thread_fn u_create_from_thread
Definition: abti.h:296
ABTI_ythread * p_ythread
Definition: abti.h:267
ABTD_atomic_int state
Definition: abti.h:241
struct ABT_xstream_opaque * ABT_xstream
Definition: abt.h:313
void ABTD_xstream_context_join(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:81
struct ABT_sched_opaque * ABT_sched
Definition: abt.h:319
ABT_pool * pools
Definition: abti.h:265
void ABTI_pool_free(ABTI_pool *p_pool)
Definition: pool.c:408
#define ABT_XSTREAM_NULL
Definition: abt.h:409
void ABTD_xstream_context_revive(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:97
static ABT_sched ABTI_sched_get_handle(ABTI_sched *p_sched)
Definition: abti_sched.h:26
void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent, ABT_bool print_sub)
Definition: stream.c:974
#define ABTU_unreachable()
Definition: abtu.h:25
struct ABTI_local ABTI_local
Definition: abti.h:101
ABTU_ret_err int ABTI_ythread_create_root(ABTI_local *p_local, ABTI_xstream *p_xstream, ABTI_ythread **pp_root_ythread)
Definition: thread.c:1406
void * p_migration_cb_arg
Definition: abti.h:342
int max_xstreams
Definition: abti.h:170
#define ABTI_SETUP_WITH_INIT_CHECK()
Definition: abti_error.h:59
static void ABTD_atomic_release_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:924
void(* f_thread)(void *)
Definition: abti.h:320
static void xstream_init_main_sched(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:1345
ABTI_thread_type type
Definition: abti.h:316
static void ABTI_thread_unset_request(ABTI_thread *p_thread, uint32_t req)
Definition: abti_thread.h:73
ABTU_ret_err int ABTD_affinity_cpuset_read(ABTD_xstream_context *p_ctx, ABTD_affinity_cpuset *p_cpuset)
int ABTD_affinity_cpuset_apply_default(ABTD_xstream_context *p_ctx, int rank)
int ABT_xstream_get_num(int *num_xstreams) ABT_API_PUBLIC
Return the number of current existing ESs.
Definition: stream.c:632
static void xstream_schedule_task(ABTI_xstream *p_local_xstream, ABTI_thread *p_task)
Definition: stream.c:1260
static ABTI_thread * ABTI_thread_get_ptr(ABT_thread thread)
Definition: abti_thread.h:9
int ABT_xstream_set_rank(ABT_xstream xstream, int rank) ABT_API_PUBLIC
Set the rank for target ES.
Definition: stream.c:382
struct ABT_thread_opaque * ABT_task
Definition: abt.h:353
#define ABT_ERR_INV_THREAD
Definition: abt.h:80
ABT_xstream_state
Definition: abt.h:120
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES and return its handle through newxstream.
Definition: stream.c:46
ABTI_thread * p_thread
Definition: abti.h:251
size_t ABTI_sched_get_effective_size(ABTI_local *p_local, ABTI_sched *p_sched)
Definition: sched.c:604
void ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:922
#define ABTI_SETUP_LOCAL_YTHREAD_WITH_INIT_CHECK(pp_local_xstream, pp_ythread)
Definition: abti_error.h:115
static void ABTI_thread_set_request(ABTI_thread *p_thread, uint32_t req)
Definition: abti_thread.h:68
int ABT_xstream_self_rank(int *rank) ABT_API_PUBLIC
Return the rank of ES associated with the caller work unit.
Definition: stream.c:363
static void * ABTD_atomic_relaxed_load_ptr(const ABTD_atomic_ptr *ptr)
Definition: abtd_atomic.h:731
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank) ABT_API_PUBLIC
Return the rank of ES.
Definition: stream.c:405
#define ABTI_THREAD_REQ_CANCEL
Definition: abti.h:43
int ABT_bool
Definition: abt.h:373
static ABT_bool xstream_set_new_rank(ABTI_xstream *p_newxstream, int rank)
Definition: stream.c:1436
void ABTI_sched_exit(ABTI_sched *p_sched)
Definition: sched.c:349
ABTI_xstream_type type
Definition: abti.h:240
int ABT_xstream_cancel(ABT_xstream xstream) ABT_API_PUBLIC
Request the cancellation of the target ES.
Definition: stream.c:309
void ABTI_info_check_print_all_thread_stacks(void)
Definition: info.c:535
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched) ABT_API_PUBLIC
Get the main scheduler of the target ES.
Definition: stream.c:544
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *flag) ABT_API_PUBLIC
Check if the target ES is the primary ES.
Definition: stream.c:656
ABTI_ythread * p_root_ythread
Definition: abti.h:247
#define ABTI_CHECK_YIELDABLE(p_thread, pp_ythread, abt_errno)
Definition: abti_error.h:146
ABT_unit_get_task_fn u_get_task
Definition: abti.h:294
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:329
ABT_unit_get_thread_fn u_get_thread
Definition: abti.h:293
#define ABTI_THREAD_REQ_BLOCK
Definition: abti.h:45
#define ABTI_THREAD_REQ_JOIN
Definition: abti.h:41
#define ABT_ERR_THREAD
Definition: abt.h:101
static uint32_t ABTD_atomic_fetch_or_uint32(ABTD_atomic_uint32 *ptr, uint32_t v)
Definition: abtd_atomic.h:538
ABTI_sched * p_main_sched
Definition: abti.h:242
void ABTI_xstream_start_primary(ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_ythread *p_ythread)
Definition: stream.c:875
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES with a specific rank.
Definition: stream.c:128
ABT_unit_id ABTI_thread_get_id(ABTI_thread *p_thread)
Definition: thread.c:1600
#define HANDLE_WARNING(msg)
Definition: abti_error.h:30
ABTI_pool * p_root_pool
Definition: abti.h:248
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:142
#define ABTI_CHECK_TRUE_MSG(cond, abt_errno, msg)
Definition: abti_error.h:158
static ABTI_xstream * ABTI_xstream_get_ptr(ABT_xstream xstream)
Definition: abti_stream.h:11
ABT_bool set_affinity
Definition: abti.h:178
void ABTI_sched_print(ABTI_sched *p_sched, FILE *p_os, int indent, ABT_bool print_sub)
Definition: sched.c:636
#define ABT_FALSE
Definition: abt.h:285
ABTI_xstream * p_next
Definition: abti.h:237
ABTI_spinlock xstream_list_lock
Definition: abti.h:174
void ABTD_xstream_context_free(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:66
ABTD_atomic_ptr p_migration_pool
Definition: abti.h:344
static void ABTI_xstream_terminate_thread(ABTI_local *p_local, ABTI_thread *p_thread)
Definition: abti_stream.h:48
#define ABTI_THREAD_TYPE_MAIN
Definition: abti.h:75
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools) ABT_API_PUBLIC
Set the main scheduler for xstream with a predefined scheduler.
Definition: stream.c:510
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:343
ABTI_xstream * p_last_xstream
Definition: abti.h:318
void ABTI_thread_join(ABTI_local **pp_local, ABTI_thread *p_thread)
Definition: thread.c:1470
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools) ABT_API_PUBLIC
Get the pools of the main scheduler of the target ES.
Definition: stream.c:566
#define ABTI_CHECK_TRUE(cond, abt_errno)
Definition: abti_error.h:137
int rank
Definition: abti.h:239
static uint32_t ABTD_atomic_acquire_load_uint32(const ABTD_atomic_uint32 *ptr)
Definition: abtd_atomic.h:797
int num_pools
Definition: abti.h:266
#define ABTI_SCHED_REQ_FINISH
Definition: abti.h:38
ABTD_atomic_int state
Definition: abti.h:322
#define ABTI_INDENT
Definition: abti.h:56
static void ABTI_pool_dec_num_migrations(ABTI_pool *p_pool)
Definition: abti_pool.h:60
void ABTI_sched_free(ABTI_local *p_local, ABTI_sched *p_sched, ABT_bool force_free)
Definition: sched.c:497
#define ABTI_tool_event_thread_join(p_local, p_thread, p_caller)
Definition: abti_tool.h:317
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream) ABT_API_PUBLIC
Create a new ES with a predefined scheduler and return its handle through newxstream.
Definition: stream.c:91
ABTD_atomic_uint32 request
Definition: abti.h:264
static ABTI_pool * ABTI_pool_get_ptr(ABT_pool pool)
Definition: abti_pool.h:11
ABT_bool automatic
Definition: abti.h:261
static void ABTI_pool_add_thread(ABTI_thread *p_thread)
Definition: abti_pool.h:71
ABT_unit_free_fn u_free
Definition: abti.h:298
int ABT_xstream_join(ABT_xstream xstream) ABT_API_PUBLIC
Wait for xstream to terminate.
Definition: stream.c:261
ABT_sched_type type
Definition: abti.h:263
static ABTI_ythread * ABTI_ythread_context_switch_to_parent(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_old, ABT_sync_event_type sync_event_type, void *p_sync)
Definition: abti_ythread.h:312
int ABT_xstream_check_events(ABT_sched sched) ABT_API_PUBLIC
Check the events and process them.
Definition: stream.c:709
int ABT_xstream_set_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset) ABT_API_PUBLIC
Set the CPU affinity of the target ES.
Definition: stream.c:797
void * p_arg
Definition: abti.h:321
static void ABTI_spinlock_release(ABTI_spinlock *p_lock)
Definition: abti_spinlock.h:31
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid) ABT_API_PUBLIC
Bind the target ES to a target CPU.
Definition: stream.c:734
ABTI_global * gp_ABTI_global
Definition: global.c:18
ABTI_thread thread
Definition: abti.h:348
#define ABT_SUCCESS
Definition: abt.h:64
void ABTI_xstream_run_unit(ABTI_xstream **pp_local_xstream, ABT_unit unit, ABTI_pool *p_pool)
Definition: stream.c:903
ABTU_ret_err int ABTI_sched_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABTI_sched **pp_newsched)
Definition: sched.c:354
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid) ABT_API_PUBLIC
Get the CPU binding for the target ES.
Definition: stream.c:761
int ABT_xstream_get_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset, int *num_cpus) ABT_API_PUBLIC
Get the CPU affinity for the target ES.
Definition: stream.c:831
ABTI_xstream * p_xstream_head
Definition: abti.h:172
void ABTI_mem_finalize_local(ABTI_xstream *p_local_xstream)
Definition: malloc.c:172
ABTU_ret_err int ABTD_xstream_context_create(void *(*f_xstream)(void *), void *p_arg, ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:48
static ABTU_ret_err int xstream_start(ABTI_xstream *p_xstream)
Definition: stream.c:1100
#define ABT_TRUE
Definition: abt.h:284
#define ABT_SCHED_NULL
Definition: abt.h:411
int ABT_xstream_exit(void) ABT_API_PUBLIC
Terminate the ES associated with the calling ULT.
Definition: stream.c:285
#define ABTI_THREAD_REQ_TERMINATE
Definition: abti.h:42
ABT_unit_type
Definition: abt.h:169
#define ABTI_SETUP_LOCAL_YTHREAD(pp_local_xstream, pp_ythread)
Definition: abti_error.h:83
void ABTI_mem_init_local(ABTI_xstream *p_local_xstream)
Definition: malloc.c:164
static ABTU_ret_err int xstream_migrate_thread(ABTI_local *p_local, ABTI_thread *p_thread)
Definition: stream.c:1306
ABTU_ret_err int ABTI_thread_get_mig_data(ABTI_local *p_local, ABTI_thread *p_thread, ABTI_thread_mig_data **pp_mig_data)
Definition: thread.c:1508
static void xstream_schedule_ythread(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread)
Definition: stream.c:1143
int ABT_xstream_free(ABT_xstream *xstream) ABT_API_PUBLIC
Release the ES object associated with ES handle.
Definition: stream.c:217
static void ABTD_atomic_relaxed_store_uint32(ABTD_atomic_uint32 *ptr, uint32_t val)
Definition: abtd_atomic.h:884
void ABTD_ythread_cancel(ABTI_xstream *p_local_xstream, ABTI_ythread *p_ythread)
Definition: abtd_ythread.c:113
#define ABTI_CHECK_NULL_XSTREAM_PTR(p)
Definition: abti_error.h:166
static void ABTI_spinlock_acquire(ABTI_spinlock *p_lock)
Definition: abti_spinlock.h:23
#define ABT_ERR_XSTREAM_STATE
Definition: abt.h:95
ABTU_ret_err int ABTI_xstream_create_primary(ABTI_xstream **pp_xstream)
Definition: stream.c:855
ABTD_xstream_context ctx
Definition: abti.h:244
static ABTU_ret_err int xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
Definition: stream.c:1119
static void ABTI_pool_push(ABTI_pool *p_pool, ABT_unit unit)
Definition: abti_pool.h:65
ABTU_ret_err int ABTI_ythread_create_main_sched(ABTI_local *p_local, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: thread.c:1432
ABTU_ret_err int ABTI_pool_create_basic(ABT_pool_kind kind, ABT_pool_access access, ABT_bool automatic, ABTI_pool **pp_newpool)
Definition: pool.c:382
static ABT_thread ABTI_ythread_get_handle(ABTI_ythread *p_thread)
#define ABTI_THREAD_REQ_NOPUSH
Definition: abti.h:47
#define ABT_ERR_FEATURE_NA
Definition: abt.h:116
ABTI_xstream_type
Definition: abti.h:61
ABTU_ret_err int ABTD_affinity_cpuset_apply(ABTD_xstream_context *p_ctx, const ABTD_affinity_cpuset *p_cpuset)
static int ABTD_atomic_relaxed_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:661
size_t num_cpuids
Definition: abtd.h:36
void ABTI_ythread_free_root(ABTI_local *p_local, ABTI_ythread *p_ythread)
Definition: thread.c:1492
ABTD_ythread_context ctx
Definition: abti.h:349
static void * xstream_launch_root_ythread(void *p_xstream)
Definition: stream.c:1022
static void ABTI_local_set_xstream(ABTI_xstream *p_local_xstream)
Definition: abti_local.h:60
static ABTI_sched * ABTI_sched_get_ptr(ABT_sched sched)
Definition: abti_sched.h:11
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state) ABT_API_PUBLIC
Return the state of xstream.
Definition: stream.c:587
#define ABTI_ASSERT(cond)
Definition: abti_error.h:12
#define ABT_ERR_INV_XSTREAM_RANK
Definition: abt.h:69
#define LOG_DEBUG(fmt,...)
Definition: abti_log.h:26
static void ABTI_sched_set_request(ABTI_sched *p_sched, uint32_t req)
Definition: abti_sched.h:57
static ABTI_local * ABTI_local_get_local_uninlined(void)
Definition: abti_local.h:51
static ABTI_ythread * ABTI_ythread_context_switch_to_child(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_old, ABTI_ythread *p_new)
Definition: abti_ythread.h:323
ABTI_xstream * p_prev
Definition: abti.h:236
struct ABT_sched_config_opaque * ABT_sched_config
Definition: abt.h:321
#define ABTI_CHECK_ERROR(abt_errno)
Definition: abti_error.h:127
#define ABTI_tool_event_thread_cancel(p_local_xstream, p_thread)
Definition: abti_tool.h:356
#define ABT_ERR_XSTREAM
Definition: abt.h:94
int ABT_xstream_revive(ABT_xstream xstream) ABT_API_PUBLIC
Restart an ES that has been joined by ABT_xstream_join().
Definition: stream.c:173
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:865
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched) ABT_API_PUBLIC
Set the main scheduler of the target ES.
Definition: stream.c:444
static ABTU_ret_err int xstream_create(ABTI_sched *p_sched, ABTI_xstream_type xstream_type, int rank, ABTI_xstream **pp_xstream)
Definition: stream.c:1047
#define ABT_ERR_INV_SCHED
Definition: abt.h:71
void ABTI_xstream_free(ABTI_local *p_local, ABTI_xstream *p_xstream, ABT_bool force_free)
Definition: stream.c:938
#define ABTI_tool_event_thread_finish(p_local_xstream, p_thread, p_parent)
Definition: abti_tool.h:348
static ABTI_xstream * ABTI_local_get_xstream(ABTI_local *p_local)
Definition: abti_local.h:86
void ABTI_sched_finish(ABTI_sched *p_sched)
Definition: sched.c:344
static ABTI_ythread * ABTI_thread_get_ythread(ABTI_thread *p_thread)
Definition: abti_thread.h:52
static void ABTD_ythread_context_switch(ABTD_ythread_context *p_old, ABTD_ythread_context *p_new)
Definition: abtd_ythread.h:73
ABTI_sched_used used
Definition: abti.h:260
ABT_unit unit
Definition: abti.h:317
static int ABTD_atomic_acquire_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:763
#define ABTI_THREAD_REQ_NON_YIELD
Definition: abti.h:48
void ABTI_thread_revive(ABTI_local *p_local, ABTI_pool *p_pool, void(*thread_func)(void *), void *arg, ABTI_thread *p_thread)
Definition: thread.c:1325
#define ABT_SCHED_CONFIG_NULL
Definition: abt.h:412
#define ABTI_SETUP_LOCAL_XSTREAM_WITH_INIT_CHECK(pp_local_xstream)
Definition: abti_error.h:109
#define ABT_ERR_INV_XSTREAM
Definition: abt.h:68
#define ABTI_tool_event_thread_run(p_local_xstream, p_thread, p_prev,p_parent)
Definition: abti_tool.h:339
static ABTI_local * ABTI_xstream_get_local(ABTI_xstream *p_xstream)
Definition: abti_stream.h:67
static ABTI_local * ABTI_local_get_local(void)
Definition: abti_local.h:41
#define ABTI_IS_ERROR_CHECK_ENABLED
Definition: abti.h:20
static void ABTU_free(void *ptr)
Definition: abtu.h:135
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool) ABT_API_PUBLIC
Execute a unit on the local ES.
Definition: stream.c:686
#define ABTI_THREAD_REQ_ORPHAN
Definition: abti.h:46
int num_xstreams
Definition: abti.h:171
void ABTD_xstream_context_set_self(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:107
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result) ABT_API_PUBLIC
Compare two ES handles for equality.
Definition: stream.c:611
static ABTU_ret_err int xstream_update_main_sched(ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
Definition: stream.c:1358
ABTI_thread * p_parent
Definition: abti.h:319
#define ABTU_ret_err
Definition: abtu.h:49
static ABTI_xstream * ABTI_local_get_xstream_or_null(ABTI_local *p_local)
Definition: abti_local.h:77
#define ABTI_THREAD_REQ_MIGRATE
Definition: abti.h:44
void ABTD_affinity_cpuset_destroy(ABTD_affinity_cpuset *p_cpuset)
#define ABTI_HANDLE_ERROR(n)
Definition: abti_error.h:121
ABTU_noreturn void ABTI_ythread_exit(ABTI_xstream *p_local_xstream, ABTI_ythread *p_ythread)
Definition: thread.c:1497