ARGOBOTS
stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
/* File-local helpers that manage execution stream (ES) ranks. Their
 * definitions are not part of this section of the listing. */
/* Invoked by ABTI_xstream_create() on a freshly allocated ES. */
8 static void ABTI_xstream_set_new_rank(ABTI_xstream *);
/* Returns an ABT_bool; the create-with-rank path treats ABT_FALSE as
 * "the requested rank could not be taken". */
9 static ABT_bool ABTI_xstream_take_rank(ABTI_xstream *, int);
/* Invoked first in ABTI_xstream_free() so that the rank can be reused. */
10 static void ABTI_xstream_return_rank(ABTI_xstream *);
11 
/**
 * Create a new execution stream (ES) and start it.
 *
 * When sched is ABT_SCHED_NULL a default basic scheduler is created for the
 * new ES; otherwise the given scheduler is checked for not being in use.
 *
 * @param[in]  sched       scheduler handle, or ABT_SCHED_NULL
 * @param[out] newxstream  receives the handle of the new ES; set to
 *                         ABT_XSTREAM_NULL on the failure path
 * @return ABT_SUCCESS, or the error code recorded by the failing step
 */
27 int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
28 {
29  int abt_errno = ABT_SUCCESS;
30  ABTI_local *p_local = ABTI_local_get_local();
31  ABTI_sched *p_sched;
32  ABTI_xstream *p_newxstream;
33 
34  if (sched == ABT_SCHED_NULL) {
35  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
36  ABT_SCHED_CONFIG_NULL, &p_sched);
37  ABTI_CHECK_ERROR(abt_errno);
38  } else {
39  p_sched = ABTI_sched_get_ptr(sched);
40  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
/* NOTE(review): listing line 41, which carries the error-code argument and
 * closes the ABTI_CHECK_TRUE call above, is missing from this view. */
42  }
43 
/* NOTE(review): a scheduler created above for ABT_SCHED_NULL does not appear
 * to be released if one of the following steps fails; confirm. */
44  abt_errno = ABTI_xstream_create(&p_local, p_sched, &p_newxstream);
45  ABTI_CHECK_ERROR(abt_errno);
46 
47  /* Start this ES */
48  abt_errno = ABTI_xstream_start(p_local, p_newxstream);
49  ABTI_CHECK_ERROR(abt_errno);
50 
51  /* Return value */
52  *newxstream = ABTI_xstream_get_handle(p_newxstream);
53 
54 fn_exit:
55  return abt_errno;
56 
/* Failure path: the ABTI_CHECK_* macros are expected to branch here. */
57 fn_fail:
58  *newxstream = ABT_XSTREAM_NULL;
59  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
60  goto fn_exit;
61 }
62 
/* Allocate and initialize a secondary ES object and attach its main scheduler.
 *
 * [in,out] pp_local    forwarded to ABTI_xstream_set_main_sched()
 * [in]     p_sched     scheduler that becomes the main scheduler of the ES
 * [out]    pp_xstream  receives the new ES; written only on success
 * Returns ABT_SUCCESS, or the error code from ABTI_xstream_set_main_sched().
 *
 * The ES is not started here; callers in this file invoke ABTI_xstream_start()
 * afterwards.
 */
63 int ABTI_xstream_create(ABTI_local **pp_local, ABTI_sched *p_sched,
64  ABTI_xstream **pp_xstream)
65 {
66  int abt_errno = ABT_SUCCESS;
67  ABTI_xstream *p_newxstream;
68 
/* NOTE(review): the allocation result is used without a NULL check here;
 * presumably ABTU_malloc handles allocation failure itself. Confirm. */
69  p_newxstream = (ABTI_xstream *)ABTU_malloc(sizeof(ABTI_xstream));
70 
71  ABTI_xstream_set_new_rank(p_newxstream);
72 
73  p_newxstream->type = ABTI_XSTREAM_TYPE_SECONDARY;
74  ABTD_atomic_relaxed_store_int(&p_newxstream->state,
/* NOTE(review): listing line 75, holding the stored state value and the
 * closing parenthesis of the call above, is missing from this view. */
76  p_newxstream->scheds = NULL;
77  p_newxstream->num_scheds = 0;
78  p_newxstream->max_scheds = 0;
79  ABTD_atomic_relaxed_store_uint32(&p_newxstream->request, 0);
80  p_newxstream->p_req_arg = NULL;
81  p_newxstream->p_main_sched = NULL;
82 
83  /* Initialize the spinlock */
84  ABTI_spinlock_clear(&p_newxstream->sched_lock);
85 
86  /* Set the main scheduler */
87  abt_errno = ABTI_xstream_set_main_sched(pp_local, p_newxstream, p_sched);
88  ABTI_CHECK_ERROR(abt_errno);
89 
90  LOG_EVENT("[E%d] created\n", p_newxstream->rank);
91 
92  /* Return value */
93  *pp_xstream = p_newxstream;
94 
95 fn_exit:
96  return abt_errno;
97 
/* NOTE(review): on this path p_newxstream is neither freed nor is its rank
 * returned; looks like a leak when setting the main scheduler fails. Confirm. */
98 fn_fail:
99  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
100  goto fn_exit;
101 }
102 
103 int ABTI_xstream_create_primary(ABTI_local **pp_local,
104  ABTI_xstream **pp_xstream)
105 {
106  int abt_errno = ABT_SUCCESS;
107  ABTI_xstream *p_newxstream;
108  ABTI_sched *p_sched;
109 
110  /* For the primary ES, a default scheduler is created. */
111  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
112  ABT_SCHED_CONFIG_NULL, &p_sched);
113  ABTI_CHECK_ERROR(abt_errno);
114 
115  abt_errno = ABTI_xstream_create(pp_local, p_sched, &p_newxstream);
116  ABTI_CHECK_ERROR(abt_errno);
117 
118  p_newxstream->type = ABTI_XSTREAM_TYPE_PRIMARY;
119 
120  *pp_xstream = p_newxstream;
121 
122 fn_exit:
123  return abt_errno;
124 
125 fn_fail:
126  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
127  goto fn_exit;
128 }
129 
146 int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools,
147  ABT_pool *pools, ABT_sched_config config,
148  ABT_xstream *newxstream)
149 {
150  int abt_errno = ABT_SUCCESS;
151  ABTI_local *p_local = ABTI_local_get_local();
152  ABTI_xstream *p_newxstream;
153 
154  ABTI_sched *p_sched;
155  abt_errno =
156  ABTI_sched_create_basic(predef, num_pools, pools, config, &p_sched);
157  ABTI_CHECK_ERROR(abt_errno);
158 
159  abt_errno = ABTI_xstream_create(&p_local, p_sched, &p_newxstream);
160  ABTI_CHECK_ERROR(abt_errno);
161 
162  /* Start this ES */
163  abt_errno = ABTI_xstream_start(p_local, p_newxstream);
164  ABTI_CHECK_ERROR(abt_errno);
165 
166  *newxstream = ABTI_xstream_get_handle(p_newxstream);
167 
168 fn_exit:
169  return abt_errno;
170 
171 fn_fail:
172  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
173  goto fn_exit;
174 }
175 
190  ABT_xstream *newxstream)
191 {
/* Create and start an ES that uses a caller-chosen rank.
 *
 * NOTE(review): the first signature line (listing line 189) is missing from
 * this view; the body uses parameters named sched, rank and newxstream.
 *
 * The rank must be non-negative and must be accepted by
 * ABTI_xstream_take_rank(); otherwise ABT_ERR_INV_XSTREAM_RANK is returned.
 * When sched is ABT_SCHED_NULL a default basic scheduler is created.
 * On every visible failure path *newxstream is set to ABT_XSTREAM_NULL.
 */
192  int abt_errno = ABT_SUCCESS;
193  ABTI_local *p_local = ABTI_local_get_local();
194  ABTI_xstream *p_newxstream;
195  ABTI_sched *p_sched;
196 
197  ABTI_CHECK_TRUE(rank >= 0, ABT_ERR_INV_XSTREAM_RANK);
198 
199  p_newxstream = (ABTI_xstream *)ABTU_malloc(sizeof(ABTI_xstream));
200 
/* Rank rejected: release the object and return directly, bypassing fn_fail. */
201  if (ABTI_xstream_take_rank(p_newxstream, rank) == ABT_FALSE) {
202  ABTU_free(p_newxstream);
203  abt_errno = ABT_ERR_INV_XSTREAM_RANK;
204  *newxstream = ABT_XSTREAM_NULL;
205  return abt_errno;
206  }
207 
/* NOTE(review): from here on, failures branch to fn_fail, which does not free
 * p_newxstream or give the taken rank back; looks like a leak. Confirm. */
208  if (sched == ABT_SCHED_NULL) {
209  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
210  ABT_SCHED_CONFIG_NULL, &p_sched);
211  ABTI_CHECK_ERROR(abt_errno);
212  } else {
213  p_sched = ABTI_sched_get_ptr(sched);
214  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
/* NOTE(review): listing line 215 (error-code argument closing the call above)
 * is missing from this view. */
216  }
217 
/* The initialization below duplicates the field setup in ABTI_xstream_create(). */
218  p_newxstream->type = ABTI_XSTREAM_TYPE_SECONDARY;
219  ABTD_atomic_relaxed_store_int(&p_newxstream->state,
/* NOTE(review): listing line 220 (stored state value) is missing from this view. */
221  p_newxstream->scheds = NULL;
222  p_newxstream->num_scheds = 0;
223  p_newxstream->max_scheds = 0;
224  ABTD_atomic_relaxed_store_uint32(&p_newxstream->request, 0);
225  p_newxstream->p_req_arg = NULL;
226  p_newxstream->p_main_sched = NULL;
227 
228  /* Initialize the spinlock */
229  ABTI_spinlock_clear(&p_newxstream->sched_lock);
230 
231  /* Set the main scheduler */
232  abt_errno = ABTI_xstream_set_main_sched(&p_local, p_newxstream, p_sched);
233  ABTI_CHECK_ERROR(abt_errno);
234 
235  LOG_EVENT("[E%d] created\n", p_newxstream->rank);
236 
237  /* Start this ES */
238  abt_errno = ABTI_xstream_start(p_local, p_newxstream);
239  ABTI_CHECK_ERROR(abt_errno);
240 
241  /* Return value */
242  *newxstream = ABTI_xstream_get_handle(p_newxstream);
243 
244 fn_exit:
245  return abt_errno;
246 
247 fn_fail:
248  *newxstream = ABT_XSTREAM_NULL;
249  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
250  goto fn_exit;
251 }
252 
/* Start an ES that has already been created.
 *
 * The main scheduler is pushed onto the ES's scheduler stack. For the primary
 * ES the calling context is registered via ABTD_xstream_context_set_self() and
 * the main scheduler ULT is created; for any other ES a new context running
 * ABTI_xstream_launch_main_sched is created. CPU affinity is applied
 * afterwards when gp_ABTI_global->set_affinity is ABT_TRUE.
 *
 * [in] p_local    local data of the caller
 * [in] p_xstream  ES to start
 * Returns ABT_SUCCESS, or the error code of the failing step.
 */
253 int ABTI_xstream_start(ABTI_local *p_local, ABTI_xstream *p_xstream)
254 {
255  int abt_errno = ABT_SUCCESS;
256 
257  /* The ES's state must be RUNNING */
258  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
/* NOTE(review): listing line 259 (right-hand side of the comparison and the
 * closing parenthesis) is missing from this view. */
260 
261  /* Add the main scheduler to the stack of schedulers */
262  ABTI_xstream_push_sched(p_xstream, p_xstream->p_main_sched);
263 
264  if (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) {
265  LOG_EVENT("[E%d] start\n", p_xstream->rank);
266 
267  abt_errno = ABTD_xstream_context_set_self(&p_xstream->ctx);
268  ABTI_CHECK_ERROR_MSG(abt_errno, "ABTD_xstream_context_set_self");
269 
270  /* Create the main sched ULT */
271  ABTI_sched *p_sched = p_xstream->p_main_sched;
272  abt_errno = ABTI_thread_create_main_sched(p_local, p_xstream, p_sched);
273  ABTI_CHECK_ERROR(abt_errno);
274  p_sched->p_thread->p_last_xstream = p_xstream;
275 
276  } else {
277  /* Start the main scheduler on a different ES */
278  abt_errno =
279  ABTD_xstream_context_create(ABTI_xstream_launch_main_sched,
280  (void *)p_xstream, &p_xstream->ctx);
281  ABTI_CHECK_ERROR_MSG(abt_errno, "ABTD_xstream_context_create");
282  }
283 
284  /* Set the CPU affinity for the ES */
/* The ES rank is passed as the second argument of ABTD_affinity_set(). */
285  if (gp_ABTI_global->set_affinity == ABT_TRUE) {
286  ABTD_affinity_set(&p_xstream->ctx, p_xstream->rank);
287  }
288 
289 fn_exit:
290  return abt_errno;
291 
292 fn_fail:
293  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
294  goto fn_exit;
295 }
296 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_revive(ABT_xstream xstream); TODO confirm).
 *
 * Puts the ES back into ABT_XSTREAM_STATE_RUNNING, clears its pending request
 * bits and request argument, and asks the ABTD layer to revive its context.
 * Returns ABT_SUCCESS, or the error code of the failing check or call.
 */
306 {
307  int abt_errno = ABT_SUCCESS;
308  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
309  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
310 
/* NOTE(review): the current state is not inspected before it is overwritten;
 * presumably the caller guarantees that the ES has terminated. Confirm. */
311  ABTD_atomic_relaxed_store_int(&p_xstream->state, ABT_XSTREAM_STATE_RUNNING);
312  ABTD_atomic_relaxed_store_uint32(&p_xstream->request, 0);
313  p_xstream->p_req_arg = NULL;
314  abt_errno = ABTD_xstream_context_revive(&p_xstream->ctx);
315  ABTI_CHECK_ERROR(abt_errno);
316 
317 fn_exit:
318  return abt_errno;
319 
320 fn_fail:
321  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
322  goto fn_exit;
323 }
324 
325 /* This routine starts the primary ES. It should be called in ABT_init.
326  * [in] p_xstream the primary ES
327  * [in] p_thread the main ULT
328  */
/* [in,out] pp_local  local data of the caller; passed by reference to the
 *                    context switch below.
 * Returns ABT_SUCCESS, or the error code of the failing step.
 * Control returns to the caller only after the main ULT has been switched
 * back to ("Back to the main ULT" below). */
329 int ABTI_xstream_start_primary(ABTI_local **pp_local, ABTI_xstream *p_xstream,
330  ABTI_thread *p_thread)
331 {
332  int abt_errno = ABT_SUCCESS;
333 
334  /* Add the main scheduler to the stack of schedulers */
335  ABTI_xstream_push_sched(p_xstream, p_xstream->p_main_sched);
336 
337  /* The ES's state must be running here. */
338  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
/* NOTE(review): listing line 339 (right-hand side of the comparison) is
 * missing from this view. */
340 
341  LOG_EVENT("[E%d] start\n", p_xstream->rank);
342 
343  abt_errno = ABTD_xstream_context_set_self(&p_xstream->ctx);
344  ABTI_CHECK_ERROR_MSG(abt_errno, "ABTD_xstream_context_set_self");
345 
346  /* Set the CPU affinity for the ES */
347  if (gp_ABTI_global->set_affinity == ABT_TRUE) {
348  ABTD_affinity_set(&p_xstream->ctx, p_xstream->rank);
349  }
350 
351  /* Create the main sched ULT */
352  ABTI_sched *p_sched = p_xstream->p_main_sched;
353  abt_errno = ABTI_thread_create_main_sched(*pp_local, p_xstream, p_sched);
354  ABTI_CHECK_ERROR(abt_errno);
355  p_sched->p_thread->p_last_xstream = p_xstream;
356 
357  /* Start the scheduler by context switching to it */
358  LOG_EVENT("[U%" PRIu64 ":E%d] yield\n", ABTI_thread_get_id(p_thread),
359  p_thread->p_last_xstream->rank);
360  ABTI_thread_context_switch_thread_to_sched(pp_local, p_thread, p_sched);
361 
362  /* Back to the main ULT */
363  LOG_EVENT("[U%" PRIu64 ":E%d] resume\n", ABTI_thread_get_id(p_thread),
364  p_thread->p_last_xstream->rank);
365 
366 fn_exit:
367  return abt_errno;
368 
369 fn_fail:
370  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
371  goto fn_exit;
372 }
373 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_free(ABT_xstream *xstream); TODO confirm).
 *
 * Joins the ES referenced by *xstream if needed, frees the ES object and
 * resets *xstream to ABT_XSTREAM_NULL. A handle that resolves to NULL is
 * accepted silently (ABT_SUCCESS, nothing done). Both the ES the caller is
 * running on and the primary ES are rejected by the checks below.
 */
389 {
390  int abt_errno = ABT_SUCCESS;
391  ABTI_local *p_local = ABTI_local_get_local();
392  ABT_xstream h_xstream = *xstream;
393 
394  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(h_xstream);
395  if (p_xstream == NULL)
396  goto fn_exit;
397 
398  /* We first need to check whether p_local is NULL because this
399  * routine might be called by external threads. */
400  ABTI_CHECK_TRUE_MSG(p_local == NULL || p_xstream != p_local->p_xstream,
/* NOTE(review): listing line 401 (the error-code argument) is missing from
 * this view. */
402  "The current xstream cannot be freed.");
403 
404  ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
/* NOTE(review): listing line 405 (the error-code argument) is missing from
 * this view. */
406  "The primary xstream cannot be freed explicitly.");
407 
408  /* Wait until xstream terminates */
409  if (ABTD_atomic_acquire_load_int(&p_xstream->state) !=
/* NOTE(review): listing line 410 (the compared state and the opening brace of
 * this if) is missing from this view. */
411  abt_errno = ABTI_xstream_join(&p_local, p_xstream);
412  ABTI_CHECK_ERROR(abt_errno);
413  }
414 
415  /* Free the xstream object */
416  abt_errno = ABTI_xstream_free(p_local, p_xstream);
417  ABTI_CHECK_ERROR(abt_errno);
418 
419  /* Return value */
420  *xstream = ABT_XSTREAM_NULL;
421 
422 fn_exit:
423  return abt_errno;
424 
425 fn_fail:
426  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
427  goto fn_exit;
428 }
429 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_join(ABT_xstream xstream); TODO confirm).
 *
 * Resolves the handle, rejects a NULL ES pointer, and delegates to
 * ABTI_xstream_join() with the caller's local data.
 * Returns ABT_SUCCESS, or the error code of the failing check or join.
 */
444 {
445  int abt_errno = ABT_SUCCESS;
446  ABTI_local *p_local = ABTI_local_get_local();
447  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
448  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
449 
450  abt_errno = ABTI_xstream_join(&p_local, p_xstream);
451  ABTI_CHECK_ERROR(abt_errno);
452 
453 fn_exit:
454  return abt_errno;
455 
456 fn_fail:
457  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
458  goto fn_exit;
459 }
460 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_exit(void); TODO confirm).
 *
 * Posts ABTI_XSTREAM_REQ_EXIT on the ES the caller is running on and then
 * waits, yielding or pausing, while polling the ES state.
 * Returns ABT_ERR_UNINITIALIZED when gp_ABTI_global is NULL and
 * ABT_ERR_INV_XSTREAM when the caller has no local data.
 */
474 {
475  int abt_errno = ABT_SUCCESS;
476  ABTI_local *p_local = ABTI_local_get_local();
477 
478  /* In case that Argobots has not been initialized or this routine is called
479  * by an external thread, e.g., pthread, return an error code instead of
480  * making the call fail. */
481  if (gp_ABTI_global == NULL) {
482  abt_errno = ABT_ERR_UNINITIALIZED;
483  goto fn_exit;
484  }
485  if (p_local == NULL) {
486  abt_errno = ABT_ERR_INV_XSTREAM;
487  goto fn_exit;
488  }
489 
490  ABTI_xstream *p_xstream = p_local->p_xstream;
491  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
492 
493  /* Set the exit request */
494  ABTI_xstream_set_request(p_xstream, ABTI_XSTREAM_REQ_EXIT);
495 
496  /* Wait until the ES terminates */
497  do {
498 #ifndef ABT_CONFIG_DISABLE_EXT_THREAD
/* A caller that is not a ULT cannot yield, so it spins with a pause instead. */
499  if (ABTI_self_get_type(p_local) != ABT_UNIT_TYPE_THREAD) {
500  ABTD_atomic_pause();
501  continue;
502  }
503 #endif
504  ABTI_thread_yield(&p_local, p_local->p_thread);
505  } while (ABTD_atomic_acquire_load_int(&p_xstream->state) !=
/* NOTE(review): listing line 506 (the state compared against, closing the
 * loop condition) is missing from this view. */
507 
508 fn_exit:
509  return abt_errno;
510 
511 fn_fail:
512  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
513  goto fn_exit;
514 }
515 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_cancel(ABT_xstream xstream); TODO confirm).
 *
 * Posts ABTI_XSTREAM_REQ_CANCEL on the target ES; the primary ES is rejected.
 * The routine only sets the request and does not wait for the ES.
 */
525 {
526  int abt_errno = ABT_SUCCESS;
527  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
528 
/* NOTE(review): unlike the sibling routines, p_xstream is dereferenced here
 * without a preceding ABTI_CHECK_NULL_XSTREAM_PTR; confirm whether a NULL
 * handle can reach this point. */
529  ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
/* NOTE(review): listing line 530 (the error-code argument) is missing from
 * this view. */
531  "The primary xstream cannot be canceled.");
532 
533  /* Set the cancel request */
534  ABTI_xstream_set_request(p_xstream, ABTI_XSTREAM_REQ_CANCEL);
535 
536 fn_exit:
537  return abt_errno;
538 
/* This failure path does not call HANDLE_ERROR_FUNC_WITH_CODE, unlike most
 * other routines in this file. */
539 fn_fail:
540  goto fn_exit;
541 }
542 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_self(ABT_xstream *xstream); TODO confirm).
 *
 * Stores the handle of the ES the caller is running on into *xstream.
 * When gp_ABTI_global is NULL the result is ABT_ERR_UNINITIALIZED, and when
 * the caller has no local data it is ABT_ERR_INV_XSTREAM; in both cases
 * *xstream is set to ABT_XSTREAM_NULL.
 */
558 {
559  int abt_errno = ABT_SUCCESS;
560  ABTI_local *p_local = ABTI_local_get_local();
561 
562  /* In case that Argobots has not been initialized or this routine is called
563  * by an external thread, e.g., pthread, return an error code instead of
564  * making the call fail. */
565  if (gp_ABTI_global == NULL) {
566  abt_errno = ABT_ERR_UNINITIALIZED;
567  *xstream = ABT_XSTREAM_NULL;
568  goto fn_exit;
569  }
570  if (p_local == NULL) {
571  abt_errno = ABT_ERR_INV_XSTREAM;
572  *xstream = ABT_XSTREAM_NULL;
573  goto fn_exit;
574  }
575 
576  ABTI_xstream *p_xstream = p_local->p_xstream;
577  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
578 
579  /* Return value */
580  *xstream = ABTI_xstream_get_handle(p_xstream);
581 
582 fn_exit:
583  return abt_errno;
584 
/* Note: this path leaves *xstream untouched. */
585 fn_fail:
586  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
587  goto fn_exit;
588 }
589 
600 int ABT_xstream_self_rank(int *rank)
601 {
602  int abt_errno = ABT_SUCCESS;
603  ABTI_local *p_local = ABTI_local_get_local();
604 
605  /* In case that Argobots has not been initialized or this routine is called
606  * by an external thread, e.g., pthread, return an error code instead of
607  * making the call fail. */
608  if (gp_ABTI_global == NULL) {
609  abt_errno = ABT_ERR_UNINITIALIZED;
610  goto fn_exit;
611  }
612  if (p_local == NULL) {
613  abt_errno = ABT_ERR_INV_XSTREAM;
614  goto fn_exit;
615  }
616 
617  ABTI_xstream *p_xstream = p_local->p_xstream;
618  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
619 
620  /* Return value */
621  *rank = (int)p_xstream->rank;
622 
623 fn_exit:
624  return abt_errno;
625 
626 fn_fail:
627  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
628  goto fn_exit;
629 }
630 
640 int ABT_xstream_set_rank(ABT_xstream xstream, const int rank)
641 {
642  int abt_errno = ABT_SUCCESS;
643  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
644  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
645 
646  p_xstream->rank = rank;
647 
648  /* Set the CPU affinity for the ES */
649  if (gp_ABTI_global->set_affinity == ABT_TRUE) {
650  ABTD_affinity_set(&p_xstream->ctx, p_xstream->rank);
651  }
652 
653 fn_exit:
654  return abt_errno;
655 
656 fn_fail:
657  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
658  goto fn_exit;
659 }
660 
670 int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
671 {
672  int abt_errno = ABT_SUCCESS;
673  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
674  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
675  *rank = (int)p_xstream->rank;
676 
677 fn_exit:
678  return abt_errno;
679 
680 fn_fail:
681  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
682  goto fn_exit;
683 }
684 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_set_main_sched(ABT_xstream xstream,
 * ABT_sched sched); TODO confirm).
 *
 * Replaces the main scheduler of an ES. The caller must have local data and a
 * current ULT (otherwise ABT_ERR_INV_THREAD). The change is refused with
 * ABT_ERR_XSTREAM when the current main scheduler still has work
 * (ABTI_sched_get_effective_size() > 0). When sched is ABT_SCHED_NULL a
 * default basic scheduler is created.
 */
715 {
716  int abt_errno = ABT_SUCCESS;
717  ABTI_local *p_local = ABTI_local_get_local();
718  ABTI_sched *p_sched;
719 
720  ABTI_CHECK_TRUE(p_local != NULL, ABT_ERR_INV_THREAD);
721 
722  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
723  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
724 
725  ABTI_thread *p_thread = p_local->p_thread;
726  ABTI_CHECK_TRUE(p_thread != NULL, ABT_ERR_INV_THREAD);
727 
728  /* For now, if the target ES is running, we allow to change the main
729  * scheduler of the ES only when the caller is running on the same ES. */
730  /* TODO: a new state representing that the scheduler is changed is needed
731  * to avoid running xstreams while the scheduler is changed in this
732  * function. */
733  if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
/* NOTE(review): listing line 734 (the compared state and the opening brace of
 * this if) is missing from this view. */
735  if (p_thread->p_last_xstream != p_xstream) {
736  abt_errno = ABT_ERR_XSTREAM_STATE;
737  goto fn_fail;
738  }
739  }
740 
741  /* TODO: permit to change the scheduler even when having work units in pools
742  */
743  if (p_xstream->p_main_sched) {
744  /* We only allow to change the main scheduler when the current main
745  * scheduler of p_xstream has no work unit in its associated pools. */
746  if (ABTI_sched_get_effective_size(p_local, p_xstream->p_main_sched) >
747  0) {
748  abt_errno = ABT_ERR_XSTREAM;
749  goto fn_fail;
750  }
751  }
752 
753  if (sched == ABT_SCHED_NULL) {
754  abt_errno = ABTI_sched_create_basic(ABT_SCHED_DEFAULT, 0, NULL,
755  ABT_SCHED_CONFIG_NULL, &p_sched);
756  ABTI_CHECK_ERROR(abt_errno);
757  } else {
758  p_sched = ABTI_sched_get_ptr(sched);
759  ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
/* NOTE(review): listing line 760 (error-code argument closing the call above)
 * is missing from this view. */
761  }
762 
763  abt_errno = ABTI_xstream_set_main_sched(&p_local, p_xstream, p_sched);
764  ABTI_CHECK_ERROR(abt_errno);
765 
766 fn_exit:
767  return abt_errno;
768 
769 fn_fail:
770  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
771  goto fn_exit;
772 }
773 
788  ABT_sched_predef predef, int num_pools,
789  ABT_pool *pools)
790 {
/* Replace the main scheduler of an ES with a newly created basic scheduler.
 *
 * NOTE(review): the first signature line (listing line 787) is missing from
 * this view; the body uses a parameter named xstream in addition to the
 * parameters shown above.
 *
 * The scheduler is built from predef, num_pools and pools with
 * ABT_SCHED_CONFIG_NULL and then installed via ABTI_xstream_set_main_sched().
 * None of the state or pending-work checks visible in the non-basic
 * set-main-sched routine of this file are performed here.
 */
791  ABTI_local *p_local = ABTI_local_get_local();
792  int abt_errno = ABT_SUCCESS;
793 
794  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
795  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
796 
797  ABTI_sched *p_sched;
798  abt_errno = ABTI_sched_create_basic(predef, num_pools, pools,
799  ABT_SCHED_CONFIG_NULL, &p_sched);
800  ABTI_CHECK_ERROR(abt_errno);
801 
802  abt_errno = ABTI_xstream_set_main_sched(&p_local, p_xstream, p_sched);
803  ABTI_CHECK_ERROR(abt_errno);
804 
805 fn_exit:
806  return abt_errno;
807 
808 fn_fail:
809  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
810  goto fn_exit;
811 }
812 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_get_main_sched(ABT_xstream xstream,
 * ABT_sched *sched); TODO confirm).
 *
 * Stores the handle of the ES's main scheduler into *sched.
 * Returns ABT_SUCCESS, or the error recorded by the NULL-ES check.
 */
826 {
827  int abt_errno = ABT_SUCCESS;
828 
829  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
830  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
831 
832  *sched = ABTI_sched_get_handle(p_xstream->p_main_sched);
833 
834 fn_exit:
835  return abt_errno;
836 
837 fn_fail:
838  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
839  goto fn_exit;
840 }
841 
855 int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools,
856  ABT_pool *pools)
857 {
858  int abt_errno = ABT_SUCCESS;
859 
860  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
861  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
862  ABTI_sched *p_sched = p_xstream->p_main_sched;
863  max_pools = p_sched->num_pools > max_pools ? max_pools : p_sched->num_pools;
864  memcpy(pools, p_sched->pools, sizeof(ABT_pool) * max_pools);
865 
866 fn_exit:
867  return abt_errno;
868 
869 fn_fail:
870  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
871  goto fn_exit;
872 }
873 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_get_state(ABT_xstream xstream,
 * ABT_xstream_state *state); TODO confirm).
 *
 * Stores the current state of the ES into *state, read with an acquire load.
 * Returns ABT_SUCCESS, or the error recorded by the NULL-ES check.
 */
884 {
885  int abt_errno = ABT_SUCCESS;
886 
887  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
888  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
889 
890  /* Return value */
891  *state = (ABT_xstream_state)ABTD_atomic_acquire_load_int(&p_xstream->state);
892 
893 fn_exit:
894  return abt_errno;
895 
896 fn_fail:
897  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
898  goto fn_exit;
899 }
900 
917  ABT_bool *result)
918 {
/* Compare two ES handles.
 *
 * NOTE(review): the first signature line (listing line 916) is missing from
 * this view; the body uses parameters named xstream1 and xstream2.
 *
 * *result is ABT_TRUE when both handles resolve to the same internal pointer
 * and ABT_FALSE otherwise. Always returns ABT_SUCCESS.
 */
919  ABTI_xstream *p_xstream1 = ABTI_xstream_get_ptr(xstream1);
920  ABTI_xstream *p_xstream2 = ABTI_xstream_get_ptr(xstream2);
921  *result = (p_xstream1 == p_xstream2) ? ABT_TRUE : ABT_FALSE;
922  return ABT_SUCCESS;
923 }
924 
937 int ABT_xstream_get_num(int *num_xstreams)
938 {
939  int abt_errno = ABT_SUCCESS;
940 
941  /* In case that Argobots has not been initialized, return an error code
942  * instead of making the call fail. */
943  if (gp_ABTI_global == NULL) {
944  abt_errno = ABT_ERR_UNINITIALIZED;
945  *num_xstreams = 0;
946  goto fn_exit;
947  }
948 
949  *num_xstreams = gp_ABTI_global->num_xstreams;
950 
951 fn_exit:
952  return abt_errno;
953 }
954 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *flag);
 * TODO confirm).
 *
 * *flag is ABT_TRUE when the ES type is ABTI_XSTREAM_TYPE_PRIMARY and
 * ABT_FALSE otherwise.
 * Returns ABT_SUCCESS, or the error recorded by the NULL-ES check.
 */
970 {
971  int abt_errno = ABT_SUCCESS;
972  ABTI_xstream *p_xstream;
973 
974  p_xstream = ABTI_xstream_get_ptr(xstream);
975  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
976 
977  /* Return value */
978  *flag =
979  (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) ? ABT_TRUE : ABT_FALSE;
980 
981 fn_exit:
982  return abt_errno;
983 
984 fn_fail:
985  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
986  goto fn_exit;
987 }
988 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool);
 * TODO confirm).
 *
 * Runs the given work unit on the ES the caller is executing on by delegating
 * to ABTI_xstream_run_unit().
 */
1002 {
1003  int abt_errno;
1004  ABTI_local *p_local = ABTI_local_get_local();
/* NOTE(review): p_local is dereferenced without a NULL check, while other
 * routines in this file treat a NULL p_local as "called from an external
 * thread"; confirm that this routine is only reachable from inside an ES. */
1005  ABTI_xstream *p_xstream = p_local->p_xstream;
1006  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
1007 
1008  abt_errno = ABTI_xstream_run_unit(&p_local, p_xstream, unit, p_pool);
1009  ABTI_CHECK_ERROR(abt_errno);
1010 
1011 fn_exit:
1012  return abt_errno;
1013 
1014 fn_fail:
1015  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1016  goto fn_exit;
1017 }
1018 
1019 int ABTI_xstream_run_unit(ABTI_local **pp_local, ABTI_xstream *p_xstream,
1020  ABT_unit unit, ABTI_pool *p_pool)
1021 {
1022  int abt_errno = ABT_SUCCESS;
1023 
1024  ABT_unit_type type = p_pool->u_get_type(unit);
1025 
1026  if (type == ABT_UNIT_TYPE_THREAD) {
1027  ABT_thread thread = p_pool->u_get_thread(unit);
1028  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1029  /* Switch the context */
1030  abt_errno = ABTI_xstream_schedule_thread(pp_local, p_xstream, p_thread);
1031  ABTI_CHECK_ERROR(abt_errno);
1032 
1033  } else if (type == ABT_UNIT_TYPE_TASK) {
1034  ABT_task task = p_pool->u_get_task(unit);
1035  ABTI_task *p_task = ABTI_task_get_ptr(task);
1036  /* Execute the task */
1037  ABTI_xstream_schedule_task(*pp_local, p_xstream, p_task);
1038 
1039  } else {
1040  HANDLE_ERROR("Not supported type!");
1041  ABTI_CHECK_TRUE(0, ABT_ERR_INV_UNIT);
1042  }
1043 
1044 fn_exit:
1045  return abt_errno;
1046 
1047 fn_fail:
1048  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1049  goto fn_exit;
1050 }
1051 
/* Body of a public routine whose signature line is missing from this listing
 * (presumably ABT_xstream_check_events(ABT_sched sched); TODO confirm).
 *
 * Processes pending requests of the ES the caller is running on by delegating
 * to ABTI_xstream_check_events().
 * Returns ABT_ERR_UNINITIALIZED when gp_ABTI_global is NULL and
 * ABT_ERR_INV_XSTREAM when the caller has no local data.
 */
1064 {
1065  int abt_errno = ABT_SUCCESS;
1066  ABTI_local *p_local = ABTI_local_get_local();
1067 
1068  /* In case that Argobots has not been initialized or this routine is called
1069  * by an external thread, e.g., pthread, return an error code instead of
1070  * making the call fail. */
1071  if (gp_ABTI_global == NULL) {
1072  abt_errno = ABT_ERR_UNINITIALIZED;
1073  goto fn_exit;
1074  }
1075  if (p_local == NULL) {
1076  abt_errno = ABT_ERR_INV_XSTREAM;
1077  goto fn_exit;
1078  }
1079 
1080  ABTI_xstream *p_xstream = p_local->p_xstream;
1081 
1082  abt_errno = ABTI_xstream_check_events(p_xstream, sched);
1083  ABTI_CHECK_ERROR(abt_errno);
1084 
1085 fn_exit:
1086  return abt_errno;
1087 
1088 fn_fail:
1089  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1090  goto fn_exit;
1091 }
1092 
1093 int ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABT_sched sched)
1094 {
1095  int abt_errno = ABT_SUCCESS;
1096  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
1097  ABTI_CHECK_NULL_SCHED_PTR(p_sched);
1098 
1099  ABTI_info_check_print_all_thread_stacks();
1100 
1101  uint32_t request = ABTD_atomic_acquire_load_uint32(&p_xstream->request);
1102  if (request & ABTI_XSTREAM_REQ_JOIN) {
1103  ABTI_sched_finish(p_sched);
1104  }
1105 
1106  if ((request & ABTI_XSTREAM_REQ_EXIT) ||
1107  (request & ABTI_XSTREAM_REQ_CANCEL)) {
1108  ABTI_sched_exit(p_sched);
1109  }
1110 
1111 fn_exit:
1112  return abt_errno;
1113 
1114 fn_fail:
1115  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1116  goto fn_exit;
1117 }
1118 
1132 int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
1133 {
1134  int abt_errno = ABT_SUCCESS;
1135  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1136  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1137 
1138  abt_errno = ABTD_affinity_set_cpuset(&p_xstream->ctx, 1, &cpuid);
1139  ABTI_CHECK_ERROR(abt_errno);
1140 
1141 fn_exit:
1142  return abt_errno;
1143 
1144 fn_fail:
1145  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1146  goto fn_exit;
1147 }
1148 
1162 int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
1163 {
1164  int abt_errno = ABT_SUCCESS;
1165  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1166  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1167 
1168  abt_errno = ABTD_affinity_get_cpuset(&p_xstream->ctx, 1, cpuid, NULL);
1169  ABTI_CHECK_ERROR(abt_errno);
1170 
1171 fn_exit:
1172  return abt_errno;
1173 
1174 fn_fail:
1175  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1176  goto fn_exit;
1177 }
1178 
1193 int ABT_xstream_set_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset)
1194 {
1195  int abt_errno = ABT_SUCCESS;
1196  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1197  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1198 
1199  abt_errno = ABTD_affinity_set_cpuset(&p_xstream->ctx, cpuset_size, cpuset);
1200  ABTI_CHECK_ERROR(abt_errno);
1201 
1202 fn_exit:
1203  return abt_errno;
1204 
1205 fn_fail:
1206  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1207  goto fn_exit;
1208 }
1209 
1230 int ABT_xstream_get_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset,
1231  int *num_cpus)
1232 {
1233  int abt_errno = ABT_SUCCESS;
1234  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1235  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1236 
1237  if (cpuset == NULL && num_cpus == NULL) {
1238  goto fn_exit;
1239  }
1240 
1241  abt_errno = ABTD_affinity_get_cpuset(&p_xstream->ctx, cpuset_size, cpuset,
1242  num_cpus);
1243  ABTI_CHECK_ERROR(abt_errno);
1244 
1245 fn_exit:
1246  return abt_errno;
1247 
1248 fn_fail:
1249  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1250  goto fn_exit;
1251 }
1252 
1253 /*****************************************************************************/
1254 /* Private APIs */
1255 /*****************************************************************************/
1256 
/* Wait for a secondary ES to terminate and join its underlying context.
 *
 * [in,out] pp_local   local data of the caller; may point to NULL for callers
 *                     without local data. It is re-read after each yield.
 * [in]     p_xstream  ES to join; the primary ES is rejected
 * Returns ABT_SUCCESS, or the error code of a failed check or of
 * ABTD_xstream_context_join().
 *
 * Two waiting strategies are used: a caller ULT whose pool has MPSC or MPMC
 * access is suspended until the target ES wakes it up; every other caller
 * polls the ES state, yielding or pausing in between.
 */
1257 int ABTI_xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
1258 {
1259  int abt_errno = ABT_SUCCESS;
1260  ABTI_local *p_local;
1261  ABTI_thread *p_thread;
1262  ABT_bool is_blockable = ABT_FALSE;
1263 
1264  ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
/* NOTE(review): listing line 1265 (the error-code argument) is missing from
 * this view. */
1266  "The primary ES cannot be joined.");
1267 
1268  /* When the associated pool of the caller ULT has multiple-writer access
1269  * mode, the ULT can be blocked. Otherwise, the access mode, if it is a
1270  * single-writer access mode, may be violated because another ES has to set
1271  * the blocked ULT ready. */
1272  p_local = *pp_local;
1273  p_thread = p_local ? p_local->p_thread : NULL;
1274  if (p_thread) {
1275  ABT_pool_access access = p_thread->p_pool->access;
1276  if (access == ABT_POOL_ACCESS_MPSC || access == ABT_POOL_ACCESS_MPMC) {
1277  is_blockable = ABT_TRUE;
1278  }
1279 
1280  /* The target ES must not be the same as the caller ULT's ES if the
1281  * access mode of the associated pool is not MPMC. */
1282  if (access != ABT_POOL_ACCESS_MPMC) {
1283  ABTI_CHECK_TRUE_MSG(p_xstream != p_local->p_xstream,
/* NOTE(review): listing line 1284 (the error-code argument) is missing from
 * this view. */
1285  "The target ES should be different.");
1286  }
1287  }
1288 
1289  if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
/* NOTE(review): listing line 1290 (the compared state and the opening brace of
 * this if) is missing from this view. */
1291  goto fn_join;
1292  }
1293 
1294  /* Wait until the target ES terminates */
1295  if (is_blockable == ABT_TRUE) {
1296  ABTI_POOL_SET_CONSUMER(p_thread->p_pool,
1297  ABTI_self_get_native_thread_id(p_local));
1298 
1299  /* Save the caller ULT to set it ready when the ES is terminated */
1300  p_xstream->p_req_arg = (void *)p_thread;
1301  ABTI_thread_set_blocked(p_thread);
1302 
1303  /* Set the join request */
1304  ABTI_xstream_set_request(p_xstream, ABTI_XSTREAM_REQ_JOIN);
1305 
1306  /* If the caller is a ULT, it is blocked here */
1307  ABTI_thread_suspend(pp_local, p_thread);
1308  } else {
1309  /* Set the join request */
1310  ABTI_xstream_set_request(p_xstream, ABTI_XSTREAM_REQ_JOIN);
1311 
1312  while (ABTD_atomic_acquire_load_int(&p_xstream->state) !=
/* NOTE(review): listing line 1313 (the compared state and the opening brace of
 * this loop) is missing from this view. */
1314 #ifndef ABT_CONFIG_DISABLE_EXT_THREAD
1315  if (ABTI_self_get_type(p_local) != ABT_UNIT_TYPE_THREAD) {
1316  ABTD_atomic_pause();
1317  continue;
1318  }
1319 #endif
1320  ABTI_thread_yield(pp_local, p_local->p_thread);
/* *pp_local is re-read after the yield; presumably it can change across the
 * context switch. Confirm. */
1321  p_local = *pp_local;
1322  }
1323  }
1324 
1325 fn_join:
1326  /* Normal join request */
1327  abt_errno = ABTD_xstream_context_join(&p_xstream->ctx);
1328  ABTI_CHECK_ERROR_MSG(abt_errno, "ABTD_xstream_context_join");
1329 
1330 fn_exit:
1331  return abt_errno;
1332 
/* This failure path does not call HANDLE_ERROR_FUNC_WITH_CODE, unlike most
 * other routines in this file. */
1333 fn_fail:
1334  goto fn_exit;
1335 }
1336 
1337 int ABTI_xstream_free(ABTI_local *p_local, ABTI_xstream *p_xstream)
1338 {
1339  int abt_errno = ABT_SUCCESS;
1340 
1341  LOG_EVENT("[E%d] freed\n", p_xstream->rank);
1342 
1343  /* Return rank for reuse. rank must be returned prior to other free
1344  * functions so that other xstreams cannot refer to this xstream via
1345  * global->p_xstreams. */
1346  ABTI_xstream_return_rank(p_xstream);
1347 
1348  /* Free the scheduler */
1349  ABTI_sched *p_cursched = p_xstream->p_main_sched;
1350  if (p_cursched != NULL) {
1351  abt_errno = ABTI_sched_discard_and_free(p_local, p_cursched);
1352  ABTI_CHECK_ERROR(abt_errno);
1353  }
1354 
1355  /* Free the array of sched contexts */
1356  ABTU_free(p_xstream->scheds);
1357 
1358  /* Free the context if a given xstream is secondary. */
1359  if (p_xstream->type == ABTI_XSTREAM_TYPE_SECONDARY) {
1360  abt_errno = ABTD_xstream_context_free(&p_xstream->ctx);
1361  ABTI_CHECK_ERROR(abt_errno);
1362  }
1363 
1364  ABTU_free(p_xstream);
1365 
1366 fn_exit:
1367  return abt_errno;
1368 
1369 fn_fail:
1370  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1371  goto fn_exit;
1372 }
1373 
1374 /* The main scheduler of each ES executes this routine. */
/* [in] p_arg  the ES (ABTI_xstream *) whose main scheduler is run.
 *
 * The main scheduler's run function is invoked repeatedly. After each run the
 * ES request word is checked: an exit or cancel request ends the loop
 * immediately, and a join request ends it once the main scheduler has no
 * remaining work (effective size 0), waking the ULT stored in p_req_arg
 * if there is one. After the loop the ES state is updated with a release
 * store. */
1375 void ABTI_xstream_schedule(void *p_arg)
1376 {
1377  ABTI_local *p_local = ABTI_local_get_local();
1378  ABTI_xstream *p_xstream = (ABTI_xstream *)p_arg;
1379 
1380  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
/* NOTE(review): listing line 1381 (right-hand side of the comparison) is
 * missing from this view. */
1382  while (1) {
1383  uint32_t request;
1384 
1385  /* Execute the run function of scheduler */
/* The main scheduler is re-read on every iteration. */
1386  ABTI_sched *p_sched = p_xstream->p_main_sched;
1387  /* This function can be invoked without user-level context switches
1388  * (e.g., directly called on top of Pthreads), so ABTI_LOG_SET_SCHED
1389  * must be called manually here. */
1390  ABTI_LOG_SET_SCHED(p_sched);
1391  p_sched->state = ABT_SCHED_STATE_RUNNING;
1392  LOG_EVENT("[S%" PRIu64 "] start\n", p_sched->id);
1393  p_sched->run(ABTI_sched_get_handle(p_sched));
1394  LOG_EVENT("[S%" PRIu64 "] end\n", p_sched->id);
1395  p_sched->state = ABT_SCHED_STATE_TERMINATED;
1396 
/* NOTE(review): the matching acquisition of sched_lock is not visible in this
 * function; presumably it is taken elsewhere. Confirm. */
1397  ABTI_spinlock_release(&p_xstream->sched_lock);
1398 
1399  request = ABTD_atomic_acquire_load_uint32(&p_xstream->request);
1400 
1401  /* If there is an exit or a cancel request, the ES terminates
1402  * regardless of remaining work units. */
1403  if ((request & ABTI_XSTREAM_REQ_EXIT) ||
1404  (request & ABTI_XSTREAM_REQ_CANCEL))
1405  break;
1406 
1407  /* When join is requested, the ES terminates after finishing
1408  * execution of all work units. */
1409  if (request & ABTI_XSTREAM_REQ_JOIN) {
1410  if (ABTI_sched_get_effective_size(p_local,
1411  p_xstream->p_main_sched) == 0) {
1412  /* If a ULT has been blocked on the join call, we make it ready
1413  */
1414  if (p_xstream->p_req_arg) {
1415  ABTI_thread_set_ready(p_local,
1416  (ABTI_thread *)p_xstream->p_req_arg);
1417  p_xstream->p_req_arg = NULL;
1418  }
1419  break;
1420  }
1421  }
1422  }
1423 
1424  /* Set the ES's state as TERMINATED */
1425  ABTD_atomic_release_store_int(&p_xstream->state,
/* NOTE(review): listing line 1426 (the stored state value) is missing from
 * this view. */
1427  LOG_EVENT("[E%d] terminated\n", p_xstream->rank);
1428 }
1429 
/*
 * Run the ULT p_thread on the execution stream p_xstream and, once control
 * comes back to the scheduler, act on the requests recorded on the ULT.
 *
 * Before switching:
 *  - with cancellation enabled, a ULT carrying ABTI_THREAD_REQ_CANCEL is
 *    cancelled and terminated without being run;
 *  - with migration enabled, a ULT carrying ABTI_THREAD_REQ_MIGRATE is handed
 *    to ABTI_xstream_migrate_thread() and is not run here;
 *  - with stackable schedulers enabled, a ULT that is itself a scheduler
 *    (is_sched != NULL) is pushed onto the ES's scheduler stack.
 *
 * After the switch returns, exactly one branch of the request chain runs:
 * STOP -> terminate; CANCEL -> cancel and terminate; no NON_YIELD bit ->
 * the ULT is added back to its pool; BLOCK / ORPHAN / NOPUSH -> the flag is
 * cleared (ORPHAN also frees the unit and detaches the pool); MIGRATE ->
 * migrate.  Any other combination yields ABT_ERR_THREAD.
 *
 * pp_local is in/out: *pp_local is re-read after every context switch.
 * Returns ABT_SUCCESS or the error code that caused the jump to fn_fail.
 */
1430 int ABTI_xstream_schedule_thread(ABTI_local **pp_local, ABTI_xstream *p_xstream,
1431  ABTI_thread *p_thread)
1432 {
1433  int abt_errno = ABT_SUCCESS;
1434  ABTI_local *p_local = *pp_local;
1435 
1436 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
/* Cancelled before it ever ran: tear it down without a context switch. */
1437  if (ABTD_atomic_acquire_load_uint32(&p_thread->request) &
1438  ABTI_THREAD_REQ_CANCEL) {
1439  LOG_EVENT("[U%" PRIu64 ":E%d] canceled\n", ABTI_thread_get_id(p_thread),
1440  p_xstream->rank);
1441  ABTD_thread_cancel(p_local, p_thread);
1442  ABTI_xstream_terminate_thread(p_local, p_thread);
1443  goto fn_exit;
1444  }
1445 #endif
1446 
1447 #ifndef ABT_CONFIG_DISABLE_MIGRATION
/* A pending migration request is served instead of running the ULT. */
1448  if (ABTD_atomic_acquire_load_uint32(&p_thread->request) &
1449  ABTI_THREAD_REQ_MIGRATE) {
1450  abt_errno = ABTI_xstream_migrate_thread(p_local, p_thread);
1451  ABTI_CHECK_ERROR(abt_errno);
1452  goto fn_exit;
1453  }
1454 #endif
1455 
1456 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
1457  /* Add the new scheduler if the ULT is a scheduler */
1458  if (p_thread->is_sched != NULL) {
1459  p_thread->is_sched->p_ctx = &p_thread->ctx;
1460  ABTI_xstream_push_sched(p_xstream, p_thread->is_sched);
1461  p_thread->is_sched->state = ABT_SCHED_STATE_RUNNING;
1462  }
1463 #endif
1464 
1465  /* Change the last ES */
1466  p_thread->p_last_xstream = p_xstream;
1467 
1468  /* Change the ULT state */
1469  ABTD_atomic_release_store_int(&p_thread->state, ABT_THREAD_STATE_RUNNING);
1470 
1471  /* Switch the context */
1472  LOG_EVENT("[U%" PRIu64 ":E%d] start running\n",
1473  ABTI_thread_get_id(p_thread), p_xstream->rank);
1474  ABTI_sched *p_sched = ABTI_xstream_get_top_sched(p_xstream);
1475 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
1476  if (p_thread->is_sched != NULL) {
1477  ABTI_thread_context_switch_sched_to_sched(pp_local, p_sched,
1478  p_thread->is_sched);
1479  /* The scheduler continues from here. */
1480  p_local = *pp_local;
1481  /* Because of the stackable scheduler concept, the previous ULT must
1482  * be the same as one to which the context has been switched. */
1483  } else {
1484 #endif
1485  ABTI_thread_context_switch_sched_to_thread(pp_local, p_sched, p_thread);
1486  /* The scheduler continues from here. */
1487  p_local = *pp_local;
1488  /* The previous ULT may not be the same as one to which the
1489  * context has been switched. */
1490  p_thread = p_local->p_thread;
1491 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
1492  }
1493 #endif
1494 
/* From here on p_xstream is the ES recorded on the ULT that just stopped,
 * which is not necessarily the p_xstream argument. */
1495  p_xstream = p_thread->p_last_xstream;
1496  LOG_EVENT("[U%" PRIu64 ":E%d] stopped\n", ABTI_thread_get_id(p_thread),
1497  p_xstream->rank);
1498 
1499 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
1500  /* Delete the last scheduler if the ULT was a scheduler */
1501  if (p_thread->is_sched != NULL) {
1502  ABTI_xstream_pop_sched(p_xstream);
1503  /* If a migration is trying to read the state of the scheduler, we need
1504  * to let it finish before freeing the scheduler */
1505  p_thread->is_sched->state = ABT_SCHED_STATE_STOPPED;
/* NOTE(review): the matching acquire of sched_lock is not visible in this
 * function; presumably it is taken elsewhere before control returns here --
 * confirm. */
1506  ABTI_spinlock_release(&p_xstream->sched_lock);
1507  }
1508 #endif
1509 
1510  /* Request handling. */
1511  /* We do not need to acquire-load request since all critical requests
1512  * (BLOCK, ORPHAN, STOP, and NOPUSH) are written by p_thread. CANCEL might
1513  * be delayed. */
/* NOTE(review): despite the remark above, the load below IS an acquire-load;
 * either the comment or the memory order looks stale -- confirm which. */
1514  uint32_t request = ABTD_atomic_acquire_load_uint32(&p_thread->request);
1515  if (request & ABTI_THREAD_REQ_STOP) {
1516  /* The ULT has completed its execution or it called the exit request. */
1517  LOG_EVENT("[U%" PRIu64 ":E%d] %s\n", ABTI_thread_get_id(p_thread),
1518  p_xstream->rank,
1519  (request & ABTI_THREAD_REQ_TERMINATE
1520  ? "finished"
1521  : ((request & ABTI_THREAD_REQ_EXIT) ? "exit called"
1522  : "UNKNOWN")));
1523  ABTI_xstream_terminate_thread(p_local, p_thread);
1524 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1525  } else if (request & ABTI_THREAD_REQ_CANCEL) {
1526  LOG_EVENT("[U%" PRIu64 ":E%d] canceled\n", ABTI_thread_get_id(p_thread),
1527  p_xstream->rank);
1528  ABTD_thread_cancel(p_local, p_thread);
1529  ABTI_xstream_terminate_thread(p_local, p_thread);
1530 #endif
1531  } else if (!(request & ABTI_THREAD_REQ_NON_YIELD)) {
1532  /* The ULT did not finish its execution.
1533  * Change the state of current running ULT and
1534  * add it to the pool again. */
1535  ABTI_POOL_ADD_THREAD(p_thread, ABTI_self_get_native_thread_id(p_local));
1536  } else if (request & ABTI_THREAD_REQ_BLOCK) {
1537  LOG_EVENT("[U%" PRIu64 ":E%d] check blocked\n",
1538  ABTI_thread_get_id(p_thread), p_xstream->rank);
1539  ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_BLOCK);
1540 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1541  } else if (request & ABTI_THREAD_REQ_MIGRATE) {
1542  /* This is the case when the ULT requests migration of itself. */
1543  abt_errno = ABTI_xstream_migrate_thread(p_local, p_thread);
1544  ABTI_CHECK_ERROR(abt_errno);
1545 #endif
1546  } else if (request & ABTI_THREAD_REQ_ORPHAN) {
1547  /* The ULT is not pushed back to the pool and is disconnected from any
1548  * pool. */
1549  LOG_EVENT("[U%" PRIu64 ":E%d] orphaned\n", ABTI_thread_get_id(p_thread),
1550  p_xstream->rank);
1551  ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_ORPHAN);
1552  p_thread->p_pool->u_free(&p_thread->unit);
1553  p_thread->p_pool = NULL;
1554  } else if (request & ABTI_THREAD_REQ_NOPUSH) {
1555  /* The ULT is not pushed back to the pool */
1556  LOG_EVENT("[U%" PRIu64 ":E%d] not pushed\n",
1557  ABTI_thread_get_id(p_thread), p_xstream->rank);
1558  ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_NOPUSH);
1559  } else {
/* A NON_YIELD bit is set but none of the branches above recognised it. */
1560  abt_errno = ABT_ERR_THREAD;
1561  goto fn_fail;
1562  }
1563 
1564 fn_exit:
1565  return abt_errno;
1566 
1567 fn_fail:
1568  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1569  goto fn_exit;
1570 }
1571 
1572 void ABTI_xstream_schedule_task(ABTI_local *p_local, ABTI_xstream *p_xstream,
1573  ABTI_task *p_task)
1574 {
1575 #ifndef ABT_CONFIG_DISABLE_TASK_CANCEL
1576  if (ABTD_atomic_acquire_load_uint32(&p_task->request) &
1577  ABTI_TASK_REQ_CANCEL) {
1578  ABTI_xstream_terminate_task(p_local, p_task);
1579  return;
1580  }
1581 #endif
1582 
1583  /* Set the current running tasklet */
1584  p_local->p_task = p_task;
1585  p_local->p_thread = NULL;
1586 
1587  /* Change the task state */
1588  ABTD_atomic_release_store_int(&p_task->state, ABT_TASK_STATE_RUNNING);
1589 
1590  /* Set the associated ES */
1591  p_task->p_xstream = p_xstream;
1592 
1593 #ifdef ABT_CONFIG_DISABLE_STACKABLE_SCHED
1594  /* Execute the task function */
1595  LOG_EVENT("[T%" PRIu64 ":E%d] running\n", ABTI_task_get_id(p_task),
1596  p_xstream->rank);
1597  ABTI_LOG_SET_SCHED(NULL);
1598  p_task->f_task(p_task->p_arg);
1599 #else
1600  /* Add a new scheduler if the task is a scheduler */
1601  if (p_task->is_sched != NULL) {
1602  ABTI_sched *current_sched = ABTI_xstream_get_top_sched(p_xstream);
1603  ABTI_thread *p_last_thread = current_sched->p_thread;
1604 
1605  p_task->is_sched->p_ctx = current_sched->p_ctx;
1606  ABTI_xstream_push_sched(p_xstream, p_task->is_sched);
1607  p_task->is_sched->state = ABT_SCHED_STATE_RUNNING;
1608  p_task->is_sched->p_thread = p_last_thread;
1609  LOG_EVENT("[S%" PRIu64 ":E%d] stacked sched start\n",
1610  p_task->is_sched->id, p_xstream->rank);
1611  }
1612 
1613  /* Execute the task function */
1614  LOG_EVENT("[T%" PRIu64 ":E%d] running\n", ABTI_task_get_id(p_task),
1615  p_xstream->rank);
1616  ABTI_LOG_SET_SCHED(p_task->is_sched ? p_task->is_sched : NULL);
1617 
1618  p_task->f_task(p_task->p_arg);
1619 
1620  /* Delete the last scheduler if the tasklet was a scheduler */
1621  if (p_task->is_sched != NULL) {
1622  ABTI_xstream_pop_sched(p_xstream);
1623  /* If a migration is trying to read the state of the scheduler, we need
1624  * to let it finish before freeing the scheduler */
1625  ABTI_spinlock_release(&p_xstream->sched_lock);
1626  ABTI_LOG_SET_SCHED(ABTI_xstream_get_top_sched(p_xstream));
1627  LOG_EVENT("[S%" PRIu64 ":E%d] stacked sched end\n",
1628  p_task->is_sched->id, p_xstream->rank);
1629  }
1630 #endif
1631 
1632  ABTI_LOG_SET_SCHED(ABTI_xstream_get_top_sched(p_xstream));
1633  LOG_EVENT("[T%" PRIu64 ":E%d] stopped\n", ABTI_task_get_id(p_task),
1634  p_xstream->rank);
1635 
1636  /* Terminate the tasklet */
1637  ABTI_xstream_terminate_task(p_local, p_task);
1638 }
1639 
1640 int ABTI_xstream_migrate_thread(ABTI_local *p_local, ABTI_thread *p_thread)
1641 {
1642 #ifdef ABT_CONFIG_DISABLE_MIGRATION
1643  return ABT_ERR_MIGRATION_NA;
1644 #else
1645  int abt_errno = ABT_SUCCESS;
1646  ABTI_pool *p_pool;
1647 
1648  /* callback function */
1649  if (p_thread->attr.f_cb) {
1650  ABT_thread thread = ABTI_thread_get_handle(p_thread);
1651  p_thread->attr.f_cb(thread, p_thread->attr.p_cb_arg);
1652  }
1653 
1654  ABTI_spinlock_acquire(&p_thread->lock); // TODO: mutex useful?
1655  {
1656  /* extracting argument in migration request */
1657  p_pool =
1658  (ABTI_pool *)ABTI_thread_extract_req_arg(p_thread,
1659  ABTI_THREAD_REQ_MIGRATE);
1660  ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_MIGRATE);
1661 
1662  LOG_EVENT("[U%" PRIu64 "] migration: E%d -> NT %p\n",
1663  ABTI_thread_get_id(p_thread), p_thread->p_last_xstream->rank,
1664  (void *)p_pool->consumer_id);
1665 
1666  /* Change the associated pool */
1667  p_thread->p_pool = p_pool;
1668 
1669  /* Add the unit to the scheduler's pool */
1670  ABTI_POOL_PUSH(p_pool, p_thread->unit,
1671  ABTI_self_get_native_thread_id(p_local));
1672  }
1673  ABTI_spinlock_release(&p_thread->lock);
1674 
1675  ABTI_pool_dec_num_migrations(p_pool);
1676 
1677  /* Check the push */
1678  ABTI_CHECK_ERROR(abt_errno);
1679 
1680 fn_exit:
1681  return abt_errno;
1682 
1683 fn_fail:
1684  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1685  goto fn_exit;
1686 #endif
1687 }
1688 
/*
 * Install p_sched as the main scheduler of p_xstream.
 *
 * Unless the pool consumer check is compiled out, every pool of p_sched is
 * first bound to the native thread of p_xstream; a failure there aborts the
 * call.  p_sched is then marked as a ULT-type main scheduler.
 *
 * If the ES has no main scheduler yet, p_sched is simply stored.  Otherwise
 * the caller must be a ULT (asserted); if that ULT lives in a pool of the old
 * main scheduler, its unit is re-created in the first pool of p_sched.  Then:
 *  - primary ES: the caller must be the main ULT (else ABT_ERR_THREAD); the
 *    old scheduler is freed, p_sched is flagged automatic, and the primary ES
 *    is restarted through ABTI_xstream_start_primary();
 *  - secondary ES: the old scheduler is asked to finish, its ULT and context
 *    are handed to p_sched, the caller switches to the old scheduler, and the
 *    old scheduler is freed once the caller resumes.
 *
 * pp_local is in/out: *pp_local is re-read after calls that may switch
 * context.  Returns ABT_SUCCESS or the error code that led to fn_fail.
 */
1689 int ABTI_xstream_set_main_sched(ABTI_local **pp_local, ABTI_xstream *p_xstream,
1690  ABTI_sched *p_sched)
1691 {
1692  int abt_errno = ABT_SUCCESS;
1693  ABTI_thread *p_thread = NULL;
1694  ABTI_sched *p_main_sched;
1695  ABTI_pool *p_tar_pool = NULL;
1696  int p;
1697 
1698 #ifndef ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK
1699  /* We check that from the pool set of the scheduler we do not find a pool
1700  * with another associated pool, and set the right value if it is okay */
1701  ABTI_native_thread_id consumer_id =
1702  ABTI_xstream_get_native_thread_id(p_xstream);
1703  for (p = 0; p < p_sched->num_pools; p++) {
1704  ABTI_pool *p_pool = ABTI_pool_get_ptr(p_sched->pools[p]);
1705  abt_errno = ABTI_pool_set_consumer(p_pool, consumer_id);
1706  ABTI_CHECK_ERROR(abt_errno);
1707  }
1708 #endif
1709 
1710  /* The main scheduler will be a ULT, not a tasklet */
1711  p_sched->type = ABT_SCHED_TYPE_ULT;
1712 
1713  /* Set the scheduler as a main scheduler */
1714  p_sched->used = ABTI_SCHED_MAIN;
1715 
1716  p_main_sched = p_xstream->p_main_sched;
1717  if (p_main_sched == NULL) {
1718  /* Set the scheduler */
1719  p_xstream->p_main_sched = p_sched;
1720 
1721  goto fn_exit;
1722  }
1723 
1724  /* If the ES has a main scheduler, we have to free it */
1725  p_thread = (*pp_local)->p_thread;
1726  ABTI_ASSERT(p_thread != NULL);
1727 
/* The first pool of the new scheduler receives the calling ULT. */
1728  p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
1729 
1730  /* If the caller ULT is associated with a pool of the current main
1731  * scheduler, it needs to be associated to a pool of new scheduler. */
1732  for (p = 0; p < p_main_sched->num_pools; p++) {
1733  if (p_thread->p_pool == ABTI_pool_get_ptr(p_main_sched->pools[p])) {
1734  /* Associate the work unit to the first pool of new scheduler */
1735  p_thread->p_pool->u_free(&p_thread->unit);
1736  ABT_thread h_thread = ABTI_thread_get_handle(p_thread);
1737  p_thread->unit = p_tar_pool->u_create_from_thread(h_thread);
1738  p_thread->p_pool = p_tar_pool;
1739  break;
1740  }
1741  }
1742 
1743  if (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) {
1744  ABTI_CHECK_TRUE(p_thread->type == ABTI_THREAD_TYPE_MAIN,
1745  ABT_ERR_THREAD);
1746 
1747  /* Free the current main scheduler */
1748  abt_errno = ABTI_sched_discard_and_free(*pp_local, p_main_sched);
1749  ABTI_CHECK_ERROR(abt_errno);
1750 
1751  /* Since the primary ES does not finish its execution until ABT_finalize
1752  * is called, its main scheduler needs to be automatically freed when
1753  * it is freed in ABT_finalize. */
1754  p_sched->automatic = ABT_TRUE;
1755 
1756  ABTI_POOL_PUSH(p_tar_pool, p_thread->unit,
1757  ABTI_self_get_native_thread_id(*pp_local));
1758 
1759  /* Pop the top scheduler */
1760  ABTI_xstream_pop_sched(p_xstream);
1761 
1762  /* Set the scheduler */
1763  p_xstream->p_main_sched = p_sched;
1764 
1765  /* Start the primary ES again because we have to create a sched ULT for
1766  * the new scheduler */
1767  abt_errno = ABTI_xstream_start_primary(pp_local, p_xstream, p_thread);
1768  ABTI_CHECK_ERROR_MSG(abt_errno, "ABTI_xstream_start");
1769  } else {
1770  /* Finish the current main scheduler */
1771  ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_FINISH);
1772 
1773  /* If the ES is secondary, we should take the associated ULT of the
1774  * current main scheduler and keep it in the new scheduler. */
1775  p_sched->p_thread = p_main_sched->p_thread;
1776  p_sched->p_ctx = p_main_sched->p_ctx;
1777  p_main_sched->p_thread = NULL;
1778 
1779  /* The current ULT is pushed to the new scheduler's pool so that when
1780  * the new scheduler starts (see below), it can be scheduled by the new
1781  * scheduler. When the current ULT resumes its execution, it will free
1782  * the current main scheduler (see below). */
1783  ABTI_POOL_PUSH(p_tar_pool, p_thread->unit,
1784  ABTI_self_get_native_thread_id(*pp_local));
1785 
1786  /* Set the scheduler */
1787  p_xstream->p_main_sched = p_sched;
1788 
1789  /* Replace the top scheduler with the new scheduler */
1790  ABTI_xstream_replace_top_sched(p_xstream, p_sched);
1791 
1792  /* Switch to the current main scheduler */
/* NOPUSH is set because the ULT was already pushed to p_tar_pool above. */
1793  ABTI_thread_set_request(p_thread, ABTI_THREAD_REQ_NOPUSH);
1794  ABTI_thread_context_switch_thread_to_sched(pp_local, p_thread,
1795  p_main_sched);
1796 
1797  /* Now, we free the current main scheduler */
1798  abt_errno = ABTI_sched_discard_and_free(*pp_local, p_main_sched);
1799  ABTI_CHECK_ERROR(abt_errno);
1800  }
1801 
1802 fn_exit:
1803  return abt_errno;
1804 
1805 fn_fail:
1806  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1807  goto fn_exit;
1808 }
1809 
1810 void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent,
1811  ABT_bool print_sub)
1812 {
1813  char *prefix = ABTU_get_indent_str(indent);
1814 
1815  if (p_xstream == NULL) {
1816  fprintf(p_os, "%s== NULL ES ==\n", prefix);
1817  goto fn_exit;
1818  }
1819 
1820  char *type, *state;
1821  char *scheds_str;
1822  int i;
1823  size_t size, pos;
1824 
1825  switch (p_xstream->type) {
1826  case ABTI_XSTREAM_TYPE_PRIMARY:
1827  type = "PRIMARY";
1828  break;
1829  case ABTI_XSTREAM_TYPE_SECONDARY:
1830  type = "SECONDARY";
1831  break;
1832  default:
1833  type = "UNKNOWN";
1834  break;
1835  }
1836  switch (ABTD_atomic_acquire_load_int(&p_xstream->state)) {
1838  state = "RUNNING";
1839  break;
1841  state = "TERMINATED";
1842  break;
1843  default:
1844  state = "UNKNOWN";
1845  break;
1846  }
1847 
1848  size = sizeof(char) * (p_xstream->num_scheds * 20 + 4);
1849  scheds_str = (char *)ABTU_calloc(size, 1);
1850  scheds_str[0] = '[';
1851  scheds_str[1] = ' ';
1852  pos = 2;
1853  for (i = 0; i < p_xstream->num_scheds; i++) {
1854  sprintf(&scheds_str[pos], "%p ", (void *)p_xstream->scheds[i]);
1855  pos = strlen(scheds_str);
1856  }
1857  scheds_str[pos] = ']';
1858 
1859  fprintf(p_os,
1860  "%s== ES (%p) ==\n"
1861  "%srank : %d\n"
1862  "%stype : %s\n"
1863  "%sstate : %s\n"
1864  "%srequest : 0x%x\n"
1865  "%smax_scheds: %d\n"
1866  "%snum_scheds: %d\n"
1867  "%sscheds : %s\n"
1868  "%smain_sched: %p\n",
1869  prefix, (void *)p_xstream, prefix, p_xstream->rank, prefix, type,
1870  prefix, state, prefix,
1871  ABTD_atomic_acquire_load_uint32(&p_xstream->request), prefix,
1872  p_xstream->max_scheds, prefix, p_xstream->num_scheds, prefix,
1873  scheds_str, prefix, (void *)p_xstream->p_main_sched);
1874  ABTU_free(scheds_str);
1875 
1876  if (print_sub == ABT_TRUE) {
1877  ABTI_sched_print(p_xstream->p_main_sched, p_os, indent + ABTI_INDENT,
1878  ABT_TRUE);
1879  }
1880 
1881 fn_exit:
1882  fflush(p_os);
1883  ABTU_free(prefix);
1884 }
1885 
1886 void *ABTI_xstream_launch_main_sched(void *p_arg)
1887 {
1888  int abt_errno = ABT_SUCCESS;
1889  ABTI_xstream *p_xstream = (ABTI_xstream *)p_arg;
1890 
1891  /* Initialization of the local variables */
1892  ABTI_local *p_local = NULL;
1893  abt_errno = ABTI_local_init(&p_local);
1894  ABTI_local_set_local(p_local);
1895  ABTI_CHECK_ERROR(abt_errno);
1896  p_local->p_xstream = p_xstream;
1897 
1898  /* Create the main sched ULT if not created yet */
1899  ABTI_sched *p_sched = p_xstream->p_main_sched;
1900  if (!p_sched->p_thread) {
1901  abt_errno = ABTI_thread_create_main_sched(p_local, p_xstream, p_sched);
1902  ABTI_CHECK_ERROR(abt_errno);
1903  p_sched->p_thread->p_last_xstream = p_xstream;
1904  }
1905 
1906  /* Set the sched ULT as the current ULT */
1907  p_local->p_thread = p_sched->p_thread;
1908 
1909  /* Execute the main scheduler of this ES */
1910  LOG_EVENT("[E%d] start\n", p_xstream->rank);
1911  ABTI_xstream_schedule(p_arg);
1912  LOG_EVENT("[E%d] end\n", p_xstream->rank);
1913 
1914  /* Reset the current ES and its local info. */
1915  ABTI_local_finalize(&p_local);
1916 
1917 fn_exit:
1918  return NULL;
1919 
1920 fn_fail:
1921  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1922  goto fn_exit;
1923 }
1924 
1925 /*****************************************************************************/
1926 /* Internal static functions */
1927 /*****************************************************************************/
1928 
1929 /* Set a new rank to ES */
1930 static void ABTI_xstream_set_new_rank(ABTI_xstream *p_xstream)
1931 {
1932  int i, rank;
1933  ABT_bool found = ABT_FALSE;
1934 
1935  while (found == ABT_FALSE) {
1936  if (gp_ABTI_global->num_xstreams >= gp_ABTI_global->max_xstreams) {
1937  ABTI_global_update_max_xstreams(0);
1938  }
1939 
1940  ABTI_spinlock_acquire(&gp_ABTI_global->xstreams_lock);
1941  for (i = 0; i < gp_ABTI_global->max_xstreams; i++) {
1942  if (gp_ABTI_global->p_xstreams[i] == NULL) {
1943  /* Add this ES to the global ES array */
1944  gp_ABTI_global->p_xstreams[i] = p_xstream;
1945  gp_ABTI_global->num_xstreams++;
1946  rank = i;
1947  found = ABT_TRUE;
1948  break;
1949  }
1950  }
1951  ABTI_spinlock_release(&gp_ABTI_global->xstreams_lock);
1952  }
1953 
1954  /* Set the rank */
1955  p_xstream->rank = rank;
1956 }
1957 
1958 static ABT_bool ABTI_xstream_take_rank(ABTI_xstream *p_xstream, int rank)
1959 {
1960  ABT_bool ret;
1961 
1962  if (rank >= gp_ABTI_global->max_xstreams) {
1963  ABTI_global_update_max_xstreams(rank + 1);
1964  }
1965 
1966  ABTI_spinlock_acquire(&gp_ABTI_global->xstreams_lock);
1967  if (gp_ABTI_global->p_xstreams[rank] == NULL) {
1968  /* Add this ES to the global ES array */
1969  gp_ABTI_global->p_xstreams[rank] = p_xstream;
1970  gp_ABTI_global->num_xstreams++;
1971  ret = ABT_TRUE;
1972  } else {
1973  ret = ABT_FALSE;
1974  }
1975  ABTI_spinlock_release(&gp_ABTI_global->xstreams_lock);
1976 
1977  if (ret == ABT_TRUE) {
1978 
1979  /* Set the rank */
1980  p_xstream->rank = rank;
1981  }
1982 
1983  return ret;
1984 }
1985 
1986 static void ABTI_xstream_return_rank(ABTI_xstream *p_xstream)
1987 {
1988  /* Remove this xstream from the global ES array */
1989  ABTI_spinlock_acquire(&gp_ABTI_global->xstreams_lock);
1990  gp_ABTI_global->p_xstreams[p_xstream->rank] = NULL;
1991  gp_ABTI_global->num_xstreams--;
1992  ABTI_spinlock_release(&gp_ABTI_global->xstreams_lock);
1993 }
#define HANDLE_ERROR(msg)
Definition: abti_error.h:227
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:275
int ABT_xstream_self(ABT_xstream *xstream)
Return the ES handle associated with the caller work unit.
Definition: stream.c:557
ABT_sched_predef
Definition: abt.h:144
struct ABT_xstream_opaque * ABT_xstream
Definition: abt.h:251
struct ABT_sched_opaque * ABT_sched
Definition: abt.h:257
char * ABTU_get_indent_str(int indent)
Definition: util.c:12
#define ABT_XSTREAM_NULL
Definition: abt.h:337
#define ABT_ERR_INV_UNIT
Definition: abt.h:79
int ABT_xstream_get_num(int *num_xstreams)
Return the number of current existing ESs.
Definition: stream.c:937
#define ABT_ERR_INV_THREAD
Definition: abt.h:80
ABT_xstream_state
Definition: abt.h:119
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
Create a new ES and return its handle through newxstream.
Definition: stream.c:27
struct ABT_task_opaque * ABT_task
Definition: abt.h:289
int ABT_xstream_self_rank(int *rank)
Return the rank of ES associated with the caller work unit.
Definition: stream.c:600
static void * ABTU_malloc(size_t size)
Definition: abtu.h:39
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
Return the rank of ES.
Definition: stream.c:670
int ABT_bool
Definition: abt.h:309
int ABT_xstream_cancel(ABT_xstream xstream)
Request the cancellation of the target ES.
Definition: stream.c:524
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched)
Get the main scheduler of the target ES.
Definition: stream.c:825
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *flag)
Check if the target ES is the primary ES.
Definition: stream.c:969
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:267
#define ABT_ERR_THREAD
Definition: abt.h:100
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream)
Create a new ES with a specific rank.
Definition: stream.c:189
#define ABT_FALSE
Definition: abt.h:224
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools)
Set the main scheduler for xstream with a predefined scheduler.
Definition: stream.c:787
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:279
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools)
Get the pools of the main scheduler of the target ES.
Definition: stream.c:855
#define HANDLE_ERROR_FUNC_WITH_CODE(n)
Definition: abti_error.h:241
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream)
Create a new ES with a predefined scheduler and return its handle through newxstream.
Definition: stream.c:146
int ABT_xstream_join(ABT_xstream xstream)
Wait for xstream to terminate.
Definition: stream.c:443
int ABT_xstream_check_events(ABT_sched sched)
Check the events and process them.
Definition: stream.c:1063
int ABT_xstream_set_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset)
Set the CPU affinity of the target ES.
Definition: stream.c:1193
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
Bind the target ES to a target CPU.
Definition: stream.c:1132
ABTI_global * gp_ABTI_global
Definition: global.c:14
#define ABT_SUCCESS
Definition: abt.h:64
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
Get the CPU binding for the target ES.
Definition: stream.c:1162
ABT_pool_access
Definition: abt.h:162
int ABT_xstream_get_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset, int *num_cpus)
Get the CPU affinity for the target ES.
Definition: stream.c:1230
#define LOG_EVENT(fmt,...)
Definition: abti_log.h:60
#define ABT_TRUE
Definition: abt.h:223
#define ABT_SCHED_NULL
Definition: abt.h:339
int ABT_xstream_exit(void)
Terminate the ES associated with the calling ULT.
Definition: stream.c:473
ABT_unit_type
Definition: abt.h:170
#define ABT_ERR_UNINITIALIZED
Definition: abt.h:65
int ABT_xstream_free(ABT_xstream *xstream)
Release the ES object associated with ES handle.
Definition: stream.c:388
#define ABT_ERR_XSTREAM_STATE
Definition: abt.h:94
#define ABT_ERR_MIGRATION_NA
Definition: abt.h:113
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state)
Return the state of xstream.
Definition: stream.c:883
#define ABT_ERR_INV_XSTREAM_RANK
Definition: abt.h:69
struct ABT_sched_config_opaque * ABT_sched_config
Definition: abt.h:259
#define ABT_ERR_XSTREAM
Definition: abt.h:93
int ABT_xstream_revive(ABT_xstream xstream)
Restart an ES that has been joined by ABT_xstream_join().
Definition: stream.c:305
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched)
Set the main scheduler of the target ES.
Definition: stream.c:714
#define ABT_ERR_INV_SCHED
Definition: abt.h:71
int ABT_xstream_set_rank(ABT_xstream xstream, const int rank)
Set the rank for target ES.
Definition: stream.c:640
#define ABT_SCHED_CONFIG_NULL
Definition: abt.h:340
#define ABT_ERR_INV_XSTREAM
Definition: abt.h:68
static void ABTU_free(void *ptr)
Definition: abtu.h:32
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
Execute a unit on the local ES.
Definition: stream.c:1001
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result)
Compare two ES handles for equality.
Definition: stream.c:916
static void * ABTU_calloc(size_t num, size_t size)
Definition: abtu.h:49