ARGOBOTS
thread.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 static inline int ABTI_thread_create_internal(
9  ABTI_local *p_local, ABTI_pool *p_pool, void (*thread_func)(void *),
10  void *arg, ABTI_thread_attr *p_attr, ABTI_thread_type thread_type,
11  ABTI_sched *p_sched, int refcount, ABTI_xstream *p_parent_xstream,
12  ABT_bool push_pool, ABTI_thread **pp_newthread);
13 static int ABTI_thread_revive(ABTI_local *p_local, ABTI_pool *p_pool,
14  void (*thread_func)(void *), void *arg,
15  ABTI_thread *p_thread);
16 static inline int ABTI_thread_join(ABTI_local **pp_local,
17  ABTI_thread *p_thread);
18 #ifndef ABT_CONFIG_DISABLE_MIGRATION
19 static int ABTI_thread_migrate_to_xstream(ABTI_local **pp_local,
20  ABTI_thread *p_thread,
21  ABTI_xstream *p_xstream);
22 #endif
23 static inline ABT_bool ABTI_thread_is_ready(ABTI_thread *p_thread);
24 static inline void ABTI_thread_free_internal(ABTI_thread *p_thread);
25 static inline ABT_thread_id ABTI_thread_get_new_id(void);
26 
53 int ABT_thread_create(ABT_pool pool, void (*thread_func)(void *), void *arg,
54  ABT_thread_attr attr, ABT_thread *newthread)
55 {
56  int abt_errno = ABT_SUCCESS;
57  ABTI_local *p_local = ABTI_local_get_local();
58  ABTI_thread *p_newthread;
59 
60  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
61  ABTI_CHECK_NULL_POOL_PTR(p_pool);
62 
63  int refcount = (newthread != NULL) ? 1 : 0;
64  abt_errno =
65  ABTI_thread_create_internal(p_local, p_pool, thread_func, arg,
66  ABTI_thread_attr_get_ptr(attr),
67  ABTI_THREAD_TYPE_USER, NULL, refcount, NULL,
68  ABT_TRUE, &p_newthread);
69 
70  /* Return value */
71  if (newthread)
72  *newthread = ABTI_thread_get_handle(p_newthread);
73 
74 fn_exit:
75  return abt_errno;
76 
77 fn_fail:
78  if (newthread)
79  *newthread = ABT_THREAD_NULL;
80  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
81  goto fn_exit;
82 }
83 
125  void (*thread_func)(void *), void *arg,
126  ABT_thread_attr attr, ABT_thread *newthread)
127 {
128  int abt_errno = ABT_SUCCESS;
129  ABTI_local *p_local = ABTI_local_get_local();
130  ABTI_thread *p_newthread;
131 
132  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
133  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
134 
135  /* TODO: need to consider the access type of target pool */
136  ABTI_pool *p_pool = ABTI_xstream_get_main_pool(p_xstream);
137  int refcount = (newthread != NULL) ? 1 : 0;
138  abt_errno =
139  ABTI_thread_create_internal(p_local, p_pool, thread_func, arg,
140  ABTI_thread_attr_get_ptr(attr),
141  ABTI_THREAD_TYPE_USER, NULL, refcount, NULL,
142  ABT_TRUE, &p_newthread);
143  ABTI_CHECK_ERROR(abt_errno);
144 
145  /* Return value */
146  if (newthread)
147  *newthread = ABTI_thread_get_handle(p_newthread);
148 
149 fn_exit:
150  return abt_errno;
151 
152 fn_fail:
153  if (newthread)
154  *newthread = ABT_THREAD_NULL;
155  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
156  goto fn_exit;
157 }
158 
182 int ABT_thread_create_many(int num, ABT_pool *pool_list,
183  void (**thread_func_list)(void *), void **arg_list,
184  ABT_thread_attr attr, ABT_thread *newthread_list)
185 {
186  int abt_errno = ABT_SUCCESS;
187  ABTI_local *p_local = ABTI_local_get_local();
188  int i;
189 
190  if (attr != ABT_THREAD_ATTR_NULL) {
191  if (ABTI_thread_attr_get_ptr(attr)->stacktype == ABTI_STACK_TYPE_USER) {
192  abt_errno = ABT_ERR_INV_THREAD_ATTR;
193  goto fn_fail;
194  }
195  }
196 
197  if (newthread_list == NULL) {
198  for (i = 0; i < num; i++) {
199  ABT_pool pool = pool_list[i];
200  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
201  ABTI_CHECK_NULL_POOL_PTR(p_pool);
202 
203  void (*thread_f)(void *) = thread_func_list[i];
204  void *arg = arg_list ? arg_list[i] : NULL;
205  abt_errno =
206  ABTI_thread_create_internal(p_local, p_pool, thread_f, arg,
207  ABTI_thread_attr_get_ptr(attr),
208  ABTI_THREAD_TYPE_USER, NULL, 0,
209  NULL, ABT_TRUE, NULL);
210  ABTI_CHECK_ERROR(abt_errno);
211  }
212  } else {
213  for (i = 0; i < num; i++) {
214  ABTI_thread *p_newthread;
215  ABT_pool pool = pool_list[i];
216  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
217  ABTI_CHECK_NULL_POOL_PTR(p_pool);
218 
219  void (*thread_f)(void *) = thread_func_list[i];
220  void *arg = arg_list ? arg_list[i] : NULL;
221  abt_errno =
222  ABTI_thread_create_internal(p_local, p_pool, thread_f, arg,
223  ABTI_thread_attr_get_ptr(attr),
224  ABTI_THREAD_TYPE_USER, NULL, 1,
225  NULL, ABT_TRUE, &p_newthread);
226  newthread_list[i] = ABTI_thread_get_handle(p_newthread);
227  /* TODO: Release threads that have been already created. */
228  ABTI_CHECK_ERROR(abt_errno);
229  }
230  }
231 
232 fn_exit:
233  return abt_errno;
234 
235 fn_fail:
236  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
237  goto fn_exit;
238 }
239 
259 int ABT_thread_revive(ABT_pool pool, void (*thread_func)(void *), void *arg,
260  ABT_thread *thread)
261 {
262  int abt_errno = ABT_SUCCESS;
263  ABTI_local *p_local = ABTI_local_get_local();
264 
265  ABTI_thread *p_thread = ABTI_thread_get_ptr(*thread);
266  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
267 
268  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
269  ABTI_CHECK_NULL_POOL_PTR(p_pool);
270 
271  abt_errno = ABTI_thread_revive(p_local, p_pool, thread_func, arg, p_thread);
272  ABTI_CHECK_ERROR(abt_errno);
273 
274 fn_exit:
275  return abt_errno;
276 
277 fn_fail:
278  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
279  goto fn_exit;
280 }
281 
296 {
297  int abt_errno = ABT_SUCCESS;
298  ABTI_local *p_local = ABTI_local_get_local();
299  ABT_thread h_thread = *thread;
300 
301  ABTI_thread *p_thread = ABTI_thread_get_ptr(h_thread);
302  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
303 
304  /* We first need to check whether p_local is NULL because external
305  * threads might call this routine. */
306  ABTI_CHECK_TRUE_MSG(p_local == NULL || p_thread != p_local->p_thread,
308  "The current thread cannot be freed.");
309 
310  ABTI_CHECK_TRUE_MSG(p_thread->type != ABTI_THREAD_TYPE_MAIN &&
311  p_thread->type != ABTI_THREAD_TYPE_MAIN_SCHED,
313  "The main thread cannot be freed explicitly.");
314 
315  /* Wait until the thread terminates */
316  if (ABTD_atomic_acquire_load_int(&p_thread->state) !=
318  ABTI_thread_join(&p_local, p_thread);
319  }
320 
321  /* Free the ABTI_thread structure */
322  ABTI_thread_free(p_local, p_thread);
323 
324  /* Return value */
325  *thread = ABT_THREAD_NULL;
326 
327 fn_exit:
328  return abt_errno;
329 
330 fn_fail:
331  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
332  goto fn_exit;
333 }
334 
348 int ABT_thread_free_many(int num, ABT_thread *thread_list)
349 {
350  ABTI_local *p_local = ABTI_local_get_local();
351  int i;
352 
353  for (i = 0; i < num; i++) {
354  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread_list[i]);
355  ABTI_thread_join(&p_local, p_thread);
356  ABTI_thread_free(p_local, p_thread);
357  }
358  return ABT_SUCCESS;
359 }
360 
372 {
373  int abt_errno = ABT_SUCCESS;
374  ABTI_local *p_local = ABTI_local_get_local();
375  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
376  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
377  abt_errno = ABTI_thread_join(&p_local, p_thread);
378  ABTI_CHECK_ERROR(abt_errno);
379 
380 fn_exit:
381  return abt_errno;
382 
383 fn_fail:
384  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
385  goto fn_exit;
386 }
387 
400 int ABT_thread_join_many(int num_threads, ABT_thread *thread_list)
401 {
402  int abt_errno = ABT_SUCCESS;
403  ABTI_local *p_local = ABTI_local_get_local();
404  int i;
405  for (i = 0; i < num_threads; i++) {
406  abt_errno =
407  ABTI_thread_join(&p_local, ABTI_thread_get_ptr(thread_list[i]));
408  ABTI_CHECK_ERROR(abt_errno);
409  }
410 
411 fn_exit:
412  return abt_errno;
413 
414 fn_fail:
415  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
416  goto fn_exit;
417 }
418 
431 {
432  int abt_errno = ABT_SUCCESS;
433  ABTI_local *p_local = ABTI_local_get_local();
434 
435  /* In case that Argobots has not been initialized or this routine is called
436  * by an external thread, e.g., pthread, return an error code instead of
437  * making the call fail. */
438  if (gp_ABTI_global == NULL) {
439  abt_errno = ABT_ERR_UNINITIALIZED;
440  goto fn_exit;
441  }
442  if (p_local == NULL) {
443  abt_errno = ABT_ERR_INV_XSTREAM;
444  goto fn_exit;
445  }
446 
447  ABTI_thread *p_thread = p_local->p_thread;
448  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
449 
450  /* Set the exit request */
451  ABTI_thread_set_request(p_thread, ABTI_THREAD_REQ_EXIT);
452 
453  /* Terminate this ULT */
454  ABTD_thread_exit(p_local, p_thread);
455 
456 fn_exit:
457  return abt_errno;
458 
459 fn_fail:
460  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
461  goto fn_exit;
462 }
463 
473 {
474 #ifdef ABT_CONFIG_DISABLE_THREAD_CANCEL
475  return ABT_ERR_FEATURE_NA;
476 #else
477  int abt_errno = ABT_SUCCESS;
478  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
479  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
480 
481  ABTI_CHECK_TRUE_MSG(p_thread->type != ABTI_THREAD_TYPE_MAIN &&
482  p_thread->type != ABTI_THREAD_TYPE_MAIN_SCHED,
484  "The main thread cannot be canceled.");
485 
486  /* Set the cancel request */
487  ABTI_thread_set_request(p_thread, ABTI_THREAD_REQ_CANCEL);
488 
489 fn_exit:
490  return abt_errno;
491 
492 fn_fail:
493  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
494  goto fn_exit;
495 #endif
496 }
497 
515 {
516  int abt_errno = ABT_SUCCESS;
517  ABTI_local *p_local = ABTI_local_get_local();
518 
519 #ifndef ABT_CONFIG_DISABLE_EXT_THREAD
520  /* In case that Argobots has not been initialized or this routine is called
521  * by an external thread, e.g., pthread, return an error code instead of
522  * making the call fail. */
523  if (gp_ABTI_global == NULL) {
524  abt_errno = ABT_ERR_UNINITIALIZED;
525  *thread = ABT_THREAD_NULL;
526  return abt_errno;
527  }
528  if (p_local == NULL) {
529  abt_errno = ABT_ERR_INV_XSTREAM;
530  *thread = ABT_THREAD_NULL;
531  return abt_errno;
532  }
533 #endif
534 
535  ABTI_thread *p_thread = p_local->p_thread;
536  if (p_thread != NULL) {
537  *thread = ABTI_thread_get_handle(p_thread);
538  } else {
539  abt_errno = ABT_ERR_INV_THREAD;
540  *thread = ABT_THREAD_NULL;
541  }
542 
543  return abt_errno;
544 }
545 
560 {
561  int abt_errno = ABT_SUCCESS;
562  ABTI_local *p_local = ABTI_local_get_local();
563 
564 #ifndef ABT_CONFIG_DISABLE_EXT_THREAD
565  /* In case that Argobots has not been initialized or this routine is called
566  * by an external thread, e.g., pthread, return an error code instead of
567  * making the call fail. */
568  if (gp_ABTI_global == NULL) {
569  abt_errno = ABT_ERR_UNINITIALIZED;
570  return abt_errno;
571  }
572  if (p_local == NULL) {
573  abt_errno = ABT_ERR_INV_XSTREAM;
574  return abt_errno;
575  }
576 #endif
577 
578  ABTI_thread *p_thread = p_local->p_thread;
579  if (p_thread != NULL) {
580  *id = ABTI_thread_get_id(p_thread);
581  } else {
582  abt_errno = ABT_ERR_INV_THREAD;
583  }
584 
585  return abt_errno;
586 }
587 
598 {
599  int abt_errno = ABT_SUCCESS;
600 
601  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
602  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
603 
604  /* Return value */
605  *state = (ABT_thread_state)ABTD_atomic_acquire_load_int(&p_thread->state);
606 
607 fn_exit:
608  return abt_errno;
609 
610 fn_fail:
611  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
612  goto fn_exit;
613 }
614 
628 {
629  int abt_errno = ABT_SUCCESS;
630 
631  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
632  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
633 
634  /* Return value */
635  *pool = ABTI_pool_get_handle(p_thread->p_pool);
636 
637 fn_exit:
638  return abt_errno;
639 
640 fn_fail:
641  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
642  goto fn_exit;
643 }
644 
660 {
661  int abt_errno = ABT_SUCCESS;
662 
663  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
664  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
665 
666  ABTI_ASSERT(p_thread->p_pool);
667  *id = (int)(p_thread->p_pool->id);
668 
669 fn_exit:
670  return abt_errno;
671 
672 fn_fail:
673  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
674  goto fn_exit;
675 }
676 
695 {
696  int abt_errno = ABT_SUCCESS;
697 
698  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
699  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
700  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
701  ABTI_CHECK_NULL_POOL_PTR(p_pool);
702 
703  p_thread->p_pool = p_pool;
704 
705 fn_exit:
706  return abt_errno;
707 
708 fn_fail:
709  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
710  goto fn_exit;
711 }
712 
726 {
727  int abt_errno = ABT_SUCCESS;
728  ABTI_local *p_local = ABTI_local_get_local();
729  ABTI_thread *p_cur_thread = NULL;
730 
731 #ifdef ABT_CONFIG_DISABLE_EXT_THREAD
732  p_cur_thread = p_local->p_thread;
733 #else
734  /* If this routine is called by non-ULT, just return. */
735  if (p_local != NULL) {
736  p_cur_thread = p_local->p_thread;
737  }
738  if (p_cur_thread == NULL)
739  goto fn_exit;
740 #endif
741 
742  ABTI_xstream *p_xstream = p_local->p_xstream;
743  ABTI_thread *p_tar_thread = ABTI_thread_get_ptr(thread);
744  ABTI_CHECK_NULL_THREAD_PTR(p_tar_thread);
745  LOG_EVENT("[U%" PRIu64 ":E%d] yield_to -> U%" PRIu64 "\n",
746  ABTI_thread_get_id(p_cur_thread),
747  p_cur_thread->p_last_xstream->rank,
748  ABTI_thread_get_id(p_tar_thread));
749 
750  /* The target ULT must be different from the caller ULT. */
751  ABTI_CHECK_TRUE_MSG(p_cur_thread != p_tar_thread, ABT_ERR_INV_THREAD,
752  "The caller and target ULTs are the same.");
753 
754  ABTI_CHECK_TRUE_MSG(ABTD_atomic_relaxed_load_int(&p_tar_thread->state) !=
757  "Cannot yield to the terminated thread");
758 
759  /* Both threads must be associated with the same pool. */
760  /* FIXME: instead of same pool, runnable by the same ES */
761  ABTI_CHECK_TRUE_MSG(p_cur_thread->p_pool == p_tar_thread->p_pool,
763  "The target thread's pool is not the same as mine.");
764 
765  /* If the target thread is not in READY, we don't yield. */
766  if (ABTI_thread_is_ready(p_tar_thread) == ABT_FALSE) {
767  goto fn_exit;
768  }
769 
770  ABTD_atomic_release_store_int(&p_cur_thread->state, ABT_THREAD_STATE_READY);
771 
772  /* Add the current thread to the pool again */
773  ABTI_POOL_PUSH(p_cur_thread->p_pool, p_cur_thread->unit,
774  ABTI_self_get_native_thread_id(p_local));
775 
776 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
777  /* Delete the last context if the ULT is a scheduler */
778  if (p_cur_thread->is_sched != NULL) {
779  ABTI_xstream_pop_sched(p_xstream);
780  p_cur_thread->is_sched->state = ABT_SCHED_STATE_STOPPED;
781  }
782 #endif
783 
784  /* Remove the target ULT from the pool */
785  ABTI_POOL_REMOVE(p_tar_thread->p_pool, p_tar_thread->unit,
786  ABTI_self_get_native_thread_id(p_local));
787 
788 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
789  /* Add a new scheduler if the ULT is a scheduler */
790  if (p_tar_thread->is_sched != NULL) {
791  p_tar_thread->is_sched->p_ctx = ABTI_xstream_get_sched_ctx(p_xstream);
792  ABTI_xstream_push_sched(p_xstream, p_tar_thread->is_sched);
793  p_tar_thread->is_sched->state = ABT_SCHED_STATE_RUNNING;
794  }
795 #endif
796 
797  /* We set the last ES */
798  p_tar_thread->p_last_xstream = p_xstream;
799 
800  /* Switch the context */
801  ABTD_atomic_release_store_int(&p_tar_thread->state,
803  ABTI_thread_context_switch_thread_to_thread(&p_local, p_cur_thread,
804  p_tar_thread);
805 
806 fn_exit:
807  return abt_errno;
808 
809 fn_fail:
810  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
811  goto fn_exit;
812 }
813 
826 {
827  int abt_errno = ABT_SUCCESS;
828  ABTI_local *p_local = ABTI_local_get_local();
829  ABTI_thread *p_thread = NULL;
830 
831 #ifdef ABT_CONFIG_DISABLE_EXT_THREAD
832  p_thread = p_local->p_thread;
833 #else
834  /* If this routine is called by non-ULT, just return. */
835  if (p_local != NULL) {
836  p_thread = p_local->p_thread;
837  }
838  if (p_thread == NULL)
839  goto fn_exit;
840 #endif
841 
842  ABTI_CHECK_TRUE(p_thread->p_last_xstream == p_local->p_xstream,
844 
845  ABTI_thread_yield(&p_local, p_thread);
846 
847 fn_exit:
848  return abt_errno;
849 
850 fn_fail:
851  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
852  goto fn_exit;
853 }
854 
872 {
873  int abt_errno = ABT_SUCCESS;
874  ABTI_local *p_local = ABTI_local_get_local();
875  ABTI_thread *p_thread;
876 
877  p_thread = ABTI_thread_get_ptr(thread);
878  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
879 
880  abt_errno = ABTI_thread_set_ready(p_local, p_thread);
881  ABTI_CHECK_ERROR(abt_errno);
882 
883 fn_exit:
884  return abt_errno;
885 
886 fn_fail:
887  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
888  goto fn_exit;
889 }
890 
910 {
911 #ifndef ABT_CONFIG_DISABLE_MIGRATION
912  int abt_errno = ABT_SUCCESS;
913  ABTI_local *p_local = ABTI_local_get_local();
914  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
915  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
916  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
917  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
918 
919  abt_errno = ABTI_thread_migrate_to_xstream(&p_local, p_thread, p_xstream);
920  ABTI_CHECK_ERROR(abt_errno);
921 
922 fn_exit:
923  return abt_errno;
924 
925 fn_fail:
926  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
927  goto fn_exit;
928 #else
929  return ABT_ERR_MIGRATION_NA;
930 #endif
931 }
932 
952 {
953 #ifndef ABT_CONFIG_DISABLE_MIGRATION
954  int abt_errno = ABT_SUCCESS;
955  ABTI_local *p_local = ABTI_local_get_local();
956  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
957  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
958  ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
959  ABTI_CHECK_NULL_SCHED_PTR(p_sched);
960 
961  /* checking for cases when migration is not allowed */
962  ABTI_CHECK_TRUE(p_sched->state == ABT_SCHED_STATE_RUNNING,
964  ABTI_CHECK_TRUE(p_thread->type != ABTI_THREAD_TYPE_MAIN &&
965  p_thread->type != ABTI_THREAD_TYPE_MAIN_SCHED,
967  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->state) !=
970 
971  /* Find a pool */
972  ABTI_pool *p_pool;
973  ABTI_sched_get_migration_pool(p_sched, p_thread->p_pool, &p_pool);
974  ABTI_CHECK_NULL_POOL_PTR(p_pool);
975 
976  abt_errno = ABTI_thread_migrate_to_pool(&p_local, p_thread, p_pool);
977  ABTI_CHECK_ERROR(abt_errno);
978 
979  ABTI_pool_inc_num_migrations(p_pool);
980 
981 fn_exit:
982  return abt_errno;
983 
984 fn_fail:
985  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
986  goto fn_exit;
987 #else
988  return ABT_ERR_MIGRATION_NA;
989 #endif
990 }
991 
1008 {
1009 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1010  int abt_errno;
1011  ABTI_local *p_local = ABTI_local_get_local();
1012  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1013  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1014  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
1015  ABTI_CHECK_NULL_POOL_PTR(p_pool);
1016 
1017  abt_errno = ABTI_thread_migrate_to_pool(&p_local, p_thread, p_pool);
1018  ABTI_CHECK_ERROR(abt_errno);
1019 
1020  ABTI_pool_inc_num_migrations(p_pool);
1021 
1022 fn_exit:
1023  return abt_errno;
1024 
1025 fn_fail:
1026  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1027  goto fn_exit;
1028 #else
1029  return ABT_ERR_MIGRATION_NA;
1030 #endif
1031 }
1032 
1050 {
1051 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1052  /* TODO: fix the bug(s) */
1053  int abt_errno = ABT_SUCCESS;
1054  ABTI_local *p_local = ABTI_local_get_local();
1055  ABTI_xstream *p_xstream;
1056 
1057  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1058  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1059 
1060  ABTI_xstream **p_xstreams = gp_ABTI_global->p_xstreams;
1061 
1062  /* Choose the destination xstream */
1063  /* FIXME: Currently, the target xstream is randomly chosen. We need a
1064  * better selection strategy. */
1065  /* TODO: handle better when no pool accepts migration */
1066  /* TODO: choose a pool also when (p_thread->p_pool->consumer == NULL) */
1067  while (1) {
1068  /* Only one ES */
1069  if (gp_ABTI_global->num_xstreams == 1) {
1070  abt_errno = ABT_ERR_MIGRATION_NA;
1071  break;
1072  }
1073 
1074  p_xstream = p_xstreams[rand() % gp_ABTI_global->num_xstreams];
1075  if (p_xstream && p_xstream != p_thread->p_last_xstream) {
1076  if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
1078  abt_errno = ABTI_thread_migrate_to_xstream(&p_local, p_thread,
1079  p_xstream);
1080  if (abt_errno != ABT_ERR_INV_XSTREAM &&
1081  abt_errno != ABT_ERR_MIGRATION_TARGET) {
1082  ABTI_CHECK_ERROR(abt_errno);
1083  break;
1084  }
1085  }
1086  }
1087  }
1088 
1089 fn_exit:
1090  return abt_errno;
1091 
1092 fn_fail:
1093  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1094  goto fn_exit;
1095 #else
1096  return ABT_ERR_MIGRATION_NA;
1097 #endif
1098 }
1099 
1114  void (*cb_func)(ABT_thread thread, void *cb_arg),
1115  void *cb_arg)
1116 {
1117 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1118  int abt_errno = ABT_SUCCESS;
1119  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1120  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1121 
1122  p_thread->attr.f_cb = cb_func;
1123  p_thread->attr.p_cb_arg = cb_arg;
1124 
1125 fn_exit:
1126  return abt_errno;
1127 
1128 fn_fail:
1129  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1130  goto fn_exit;
1131 #else
1132  return ABT_ERR_FEATURE_NA;
1133 #endif
1134 }
1135 
1152 {
1153 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1154  int abt_errno = ABT_SUCCESS;
1155  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1156  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1157 
1158  if (p_thread->type == ABTI_THREAD_TYPE_USER) {
1159  p_thread->attr.migratable = flag;
1160  }
1161 
1162 fn_exit:
1163  return abt_errno;
1164 
1165 fn_fail:
1166  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1167  goto fn_exit;
1168 #else
1169  return ABT_ERR_FEATURE_NA;
1170 #endif
1171 }
1172 
1188 {
1189 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1190  int abt_errno = ABT_SUCCESS;
1191  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1192  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1193 
1194  *flag = p_thread->attr.migratable;
1195 
1196 fn_exit:
1197  return abt_errno;
1198 
1199 fn_fail:
1200  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1201  goto fn_exit;
1202 #else
1203  return ABT_ERR_FEATURE_NA;
1204 #endif
1205 }
1206 
1224 {
1225  int abt_errno = ABT_SUCCESS;
1226  ABTI_thread *p_thread;
1227 
1228  p_thread = ABTI_thread_get_ptr(thread);
1229  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1230 
1231  *flag = (p_thread->type == ABTI_THREAD_TYPE_MAIN) ? ABT_TRUE : ABT_FALSE;
1232 
1233 fn_exit:
1234  return abt_errno;
1235 
1236 fn_fail:
1237  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1238  goto fn_exit;
1239 }
1240 
1256 int ABT_thread_equal(ABT_thread thread1, ABT_thread thread2, ABT_bool *result)
1257 {
1258  ABTI_thread *p_thread1 = ABTI_thread_get_ptr(thread1);
1259  ABTI_thread *p_thread2 = ABTI_thread_get_ptr(thread2);
1260  *result = (p_thread1 == p_thread2) ? ABT_TRUE : ABT_FALSE;
1261  return ABT_SUCCESS;
1262 }
1263 
1275 int ABT_thread_get_stacksize(ABT_thread thread, size_t *stacksize)
1276 {
1277  int abt_errno = ABT_SUCCESS;
1278 
1279  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1280  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1281 
1282  /* Return value */
1283  *stacksize = p_thread->attr.stacksize;
1284 
1285 fn_exit:
1286  return abt_errno;
1287 
1288 fn_fail:
1289  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1290  goto fn_exit;
1291 }
1292 
1305 {
1306  int abt_errno = ABT_SUCCESS;
1307 
1308  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1309  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1310 
1311  *thread_id = ABTI_thread_get_id(p_thread);
1312 
1313 fn_exit:
1314  return abt_errno;
1315 
1316 fn_fail:
1317  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1318  goto fn_exit;
1319 }
1320 
1332 int ABT_thread_set_arg(ABT_thread thread, void *arg)
1333 {
1334  int abt_errno = ABT_SUCCESS;
1335 
1336  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1337  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1338 
1339  ABTD_thread_context_set_arg(&p_thread->ctx, arg);
1340 
1341 fn_exit:
1342  return abt_errno;
1343 
1344 fn_fail:
1345  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1346  goto fn_exit;
1347 }
1348 
1362 int ABT_thread_get_arg(ABT_thread thread, void **arg)
1363 {
1364  int abt_errno = ABT_SUCCESS;
1365 
1366  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1367  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1368 
1369  *arg = ABTD_thread_context_get_arg(&p_thread->ctx);
1370 
1371 fn_exit:
1372  return abt_errno;
1373 
1374 fn_fail:
1375  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1376  goto fn_exit;
1377 }
1378 
1395 {
1396  int abt_errno = ABT_SUCCESS;
1397 
1398  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1399  ABTI_CHECK_NULL_THREAD_PTR(p_thread);
1400 
1401  ABTI_thread_attr *p_attr;
1402  p_attr = ABTI_thread_attr_dup(&p_thread->attr);
1403 
1404  *attr = ABTI_thread_attr_get_handle(p_attr);
1405 
1406 fn_exit:
1407  return abt_errno;
1408 
1409 fn_fail:
1410  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1411  goto fn_exit;
1412 }
1413 
1414 /*****************************************************************************/
1415 /* Private APIs */
1416 /*****************************************************************************/
1417 
1418 static inline int ABTI_thread_create_internal(
1419  ABTI_local *p_local, ABTI_pool *p_pool, void (*thread_func)(void *),
1420  void *arg, ABTI_thread_attr *p_attr, ABTI_thread_type thread_type,
1421  ABTI_sched *p_sched, int refcount, ABTI_xstream *p_parent_xstream,
1422  ABT_bool push_pool, ABTI_thread **pp_newthread)
1423 {
1424  int abt_errno = ABT_SUCCESS;
1425  ABTI_thread *p_newthread;
1426  ABT_thread h_newthread;
1427 
1428  /* Allocate a ULT object and its stack, then create a thread context. */
1429  p_newthread = ABTI_mem_alloc_thread(p_local, p_attr);
1430  if ((thread_type == ABTI_THREAD_TYPE_MAIN ||
1431  thread_type == ABTI_THREAD_TYPE_MAIN_SCHED) &&
1432  p_newthread->attr.p_stack == NULL) {
1433  /* We don't need to initialize the context of 1. the main thread, and
1434  * 2. the main scheduler thread which runs on OS-level threads
1435  * (p_stack == NULL). Invalidate the context here. */
1436  abt_errno = ABTD_thread_context_invalidate(&p_newthread->ctx);
1437  } else if (p_sched == NULL) {
1438 #if ABT_CONFIG_THREAD_TYPE != ABT_THREAD_TYPE_DYNAMIC_PROMOTION
1439  size_t stack_size = p_newthread->attr.stacksize;
1440  void *p_stack = p_newthread->attr.p_stack;
1441  abt_errno = ABTD_thread_context_create_thread(NULL, thread_func, arg,
1442  stack_size, p_stack,
1443  &p_newthread->ctx);
1444 #else
1445  /* The context is not fully created now. */
1446  abt_errno =
1447  ABTD_thread_context_init(NULL, thread_func, arg, &p_newthread->ctx);
1448 #endif
1449  } else {
1450  size_t stack_size = p_newthread->attr.stacksize;
1451  void *p_stack = p_newthread->attr.p_stack;
1452  abt_errno =
1453  ABTD_thread_context_create_sched(NULL, thread_func, arg, stack_size,
1454  p_stack, &p_newthread->ctx);
1455  }
1456  ABTI_CHECK_ERROR(abt_errno);
1457 
1458  ABTD_atomic_release_store_int(&p_newthread->state, ABT_THREAD_STATE_READY);
1459  ABTD_atomic_release_store_uint32(&p_newthread->request, 0);
1460  p_newthread->p_last_xstream = NULL;
1461 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
1462  p_newthread->is_sched = p_sched;
1463 #endif
1464  p_newthread->p_pool = p_pool;
1465  p_newthread->refcount = refcount;
1466  p_newthread->type = thread_type;
1467 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1468  p_newthread->p_req_arg = NULL;
1469 #endif
1470  p_newthread->p_keytable = NULL;
1471  p_newthread->id = ABTI_THREAD_INIT_ID;
1472 
1473 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1474  /* Initialize a spinlock */
1475  ABTI_spinlock_clear(&p_newthread->lock);
1476 #endif
1477 
1478 #ifdef ABT_CONFIG_USE_DEBUG_LOG
1479  ABT_thread_id thread_id = ABTI_thread_get_id(p_newthread);
1480  if (thread_type == ABTI_THREAD_TYPE_MAIN) {
1481  LOG_EVENT("[U%" PRIu64 ":E%d] main ULT created\n", thread_id,
1482  p_parent_xstream ? p_parent_xstream->rank : 0);
1483  } else if (thread_type == ABTI_THREAD_TYPE_MAIN_SCHED) {
1484  LOG_EVENT("[U%" PRIu64 ":E%d] main sched ULT created\n", thread_id,
1485  p_parent_xstream ? p_parent_xstream->rank : 0);
1486  } else {
1487  LOG_EVENT("[U%" PRIu64 "] created\n", thread_id);
1488  }
1489 #endif
1490 
1491  /* Create a wrapper unit */
1492  h_newthread = ABTI_thread_get_handle(p_newthread);
1493  if (push_pool) {
1494  p_newthread->unit = p_pool->u_create_from_thread(h_newthread);
1495  /* Add this thread to the pool */
1496 #ifdef ABT_CONFIG_DISABLE_POOL_PRODUCER_CHECK
1497  ABTI_pool_push(p_pool, p_newthread->unit);
1498 #else
1499  abt_errno = ABTI_pool_push(p_pool, p_newthread->unit,
1500  ABTI_self_get_native_thread_id(p_local));
1501  if (abt_errno != ABT_SUCCESS) {
1502  if (thread_type == ABTI_THREAD_TYPE_MAIN) {
1503  ABTI_thread_free_main(p_local, p_newthread);
1504  } else if (thread_type == ABTI_THREAD_TYPE_MAIN_SCHED) {
1505  ABTI_thread_free_main_sched(p_local, p_newthread);
1506  } else {
1507  ABTI_thread_free(p_local, p_newthread);
1508  }
1509  goto fn_fail;
1510  }
1511 #endif
1512  } else {
1513  p_newthread->unit = ABT_UNIT_NULL;
1514  }
1515 
1516  /* Return value */
1517  *pp_newthread = p_newthread;
1518 
1519 fn_exit:
1520  return abt_errno;
1521 
1522 fn_fail:
1523  *pp_newthread = NULL;
1524  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1525  goto fn_exit;
1526 }
1527 
1528 int ABTI_thread_create(ABTI_local *p_local, ABTI_pool *p_pool,
1529  void (*thread_func)(void *), void *arg,
1530  ABTI_thread_attr *p_attr, ABTI_thread **pp_newthread)
1531 {
1532  int abt_errno = ABT_SUCCESS;
1533  int refcount = (pp_newthread != NULL) ? 1 : 0;
1534  abt_errno =
1535  ABTI_thread_create_internal(p_local, p_pool, thread_func, arg, p_attr,
1536  ABTI_THREAD_TYPE_USER, NULL, refcount, NULL,
1537  ABT_TRUE, pp_newthread);
1538  return abt_errno;
1539 }
1540 
1541 int ABTI_thread_migrate_to_pool(ABTI_local **pp_local, ABTI_thread *p_thread,
1542  ABTI_pool *p_pool)
1543 {
1544 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1545  int abt_errno = ABT_SUCCESS;
1546  ABTI_local *p_local = *pp_local;
1547 
1548  /* checking for cases when migration is not allowed */
1549  ABTI_CHECK_TRUE(ABTI_pool_accept_migration(p_pool, p_thread->p_pool) ==
1550  ABT_TRUE,
1552  ABTI_CHECK_TRUE(p_thread->type != ABTI_THREAD_TYPE_MAIN &&
1553  p_thread->type != ABTI_THREAD_TYPE_MAIN_SCHED,
1555  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->state) !=
1558 
1559  /* checking for migration to the same pool */
1560  ABTI_CHECK_TRUE(p_thread->p_pool != p_pool, ABT_ERR_MIGRATION_TARGET);
1561 
1562  /* adding request to the thread */
1563  ABTI_spinlock_acquire(&p_thread->lock);
1564  ABTI_thread_add_req_arg(p_thread, ABTI_THREAD_REQ_MIGRATE, p_pool);
1565  ABTI_spinlock_release(&p_thread->lock);
1566  ABTI_thread_set_request(p_thread, ABTI_THREAD_REQ_MIGRATE);
1567 
1568  /* yielding if it is the same thread */
1569  if (p_local != NULL && p_thread == p_local->p_thread) {
1570  ABTI_thread_yield(pp_local, p_thread);
1571  }
1572  goto fn_exit;
1573 
1574 fn_exit:
1575  return abt_errno;
1576 
1577 fn_fail:
1578  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1579  goto fn_exit;
1580 #else
1581  return ABT_ERR_MIGRATION_NA;
1582 #endif
1583 }
1584 
1585 int ABTI_thread_create_main(ABTI_local *p_local, ABTI_xstream *p_xstream,
1586  ABTI_thread **p_thread)
1587 {
1588  int abt_errno = ABT_SUCCESS;
1589  ABTI_thread_attr attr;
1590  ABTI_thread *p_newthread;
1591  ABTI_pool *p_pool;
1592 
1593  /* Get the first pool of ES */
1594  p_pool = ABTI_pool_get_ptr(p_xstream->p_main_sched->pools[0]);
1595 
1596  /* Allocate a ULT object */
1597 
1598  /* TODO: Need to set the actual stack address and size for the main ULT */
1599  ABTI_thread_attr_init(&attr, NULL, 0, ABTI_STACK_TYPE_MAIN, ABT_FALSE);
1600 
1601  /* Although this main ULT is running now, we add this main ULT to the pool
1602  * so that the scheduler can schedule the main ULT when the main ULT is
1603  * context switched to the scheduler for the first time. */
1604  ABT_bool push_pool = ABT_TRUE;
1605  abt_errno = ABTI_thread_create_internal(p_local, p_pool, NULL, NULL, &attr,
1606  ABTI_THREAD_TYPE_MAIN, NULL, 0,
1607  p_xstream, push_pool, &p_newthread);
1608  ABTI_CHECK_ERROR(abt_errno);
1609 
1610  /* Return value */
1611  *p_thread = p_newthread;
1612 
1613 fn_exit:
1614  return abt_errno;
1615 
1616 fn_fail:
1617  *p_thread = NULL;
1618  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1619  goto fn_exit;
1620 }
1621 
1622 /* This routine is to create a ULT for the main scheduler of ES. */
1623 int ABTI_thread_create_main_sched(ABTI_local *p_local, ABTI_xstream *p_xstream,
1624  ABTI_sched *p_sched)
1625 {
1626  int abt_errno = ABT_SUCCESS;
1627  ABTI_thread *p_newthread;
1628 
1629  /* Create a ULT context */
1630  if (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) {
1631  /* Create a ULT object and its stack */
1632  ABTI_thread_attr attr;
1633  ABTI_thread_attr_init(&attr, NULL, ABTI_global_get_sched_stacksize(),
1634  ABTI_STACK_TYPE_MALLOC, ABT_FALSE);
1635  ABTI_thread *p_main_thread = ABTI_global_get_main();
1636  abt_errno =
1637  ABTI_thread_create_internal(p_local, NULL, ABTI_xstream_schedule,
1638  (void *)p_xstream, &attr,
1639  ABTI_THREAD_TYPE_MAIN_SCHED, p_sched, 0,
1640  p_xstream, ABT_FALSE, &p_newthread);
1641  ABTI_CHECK_ERROR(abt_errno);
1642  /* When the main scheduler is terminated, the control will jump to the
1643  * primary ULT. */
1644  ABTD_atomic_relaxed_store_thread_context_ptr(&p_newthread->ctx.p_link,
1645  &p_main_thread->ctx);
1646  } else {
1647  /* For secondary ESs, the stack of OS thread is used for the main
1648  * scheduler's ULT. */
1649  ABTI_thread_attr attr;
1650  ABTI_thread_attr_init(&attr, NULL, 0, ABTI_STACK_TYPE_MAIN, ABT_FALSE);
1651  abt_errno =
1652  ABTI_thread_create_internal(p_local, NULL, ABTI_xstream_schedule,
1653  (void *)p_xstream, &attr,
1654  ABTI_THREAD_TYPE_MAIN_SCHED, p_sched, 0,
1655  p_xstream, ABT_FALSE, &p_newthread);
1656  ABTI_CHECK_ERROR(abt_errno);
1657  }
1658 
1659  /* Return value */
1660  p_sched->p_thread = p_newthread;
1661  p_sched->p_ctx = &p_newthread->ctx;
1662 
1663 fn_exit:
1664  return abt_errno;
1665 
1666 fn_fail:
1667  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1668  goto fn_exit;
1669 }
1670 
1671 /* This routine is to create a ULT for the scheduler. */
1672 int ABTI_thread_create_sched(ABTI_local *p_local, ABTI_pool *p_pool,
1673  ABTI_sched *p_sched)
1674 {
1675  int abt_errno = ABT_SUCCESS;
1676  ABTI_thread *p_newthread;
1677  ABTI_thread_attr attr;
1678 
1679  /* If p_sched is reused, ABTI_thread_revive() can be used. */
1680  if (p_sched->p_thread) {
1681  ABT_sched h_sched = ABTI_sched_get_handle(p_sched);
1682  abt_errno =
1683  ABTI_thread_revive(p_local, p_pool, (void (*)(void *))p_sched->run,
1684  (void *)h_sched, p_sched->p_thread);
1685  ABTI_CHECK_ERROR(abt_errno);
1686  goto fn_exit;
1687  }
1688 
1689  /* Allocate a ULT object and its stack */
1690  ABTI_thread_attr_init(&attr, NULL, ABTI_global_get_sched_stacksize(),
1691  ABTI_STACK_TYPE_MALLOC, ABT_FALSE);
1692  abt_errno =
1693  ABTI_thread_create_internal(p_local, p_pool,
1694  (void (*)(void *))p_sched->run,
1695  (void *)ABTI_sched_get_handle(p_sched),
1696  &attr, ABTI_THREAD_TYPE_USER, p_sched, 1,
1697  NULL, ABT_TRUE, &p_newthread);
1698  ABTI_CHECK_ERROR(abt_errno);
1699 
1700 fn_exit:
1701  return abt_errno;
1702 
1703 fn_fail:
1704  p_sched->p_thread = NULL;
1705  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1706  goto fn_exit;
1707 }
1708 
1709 static inline void ABTI_thread_free_internal(ABTI_thread *p_thread)
1710 {
1711  /* Free the unit */
1712  p_thread->p_pool->u_free(&p_thread->unit);
1713 
1714  /* Free the context */
1715  ABTD_thread_context_free(&p_thread->ctx);
1716 
1717  /* Free the key-value table */
1718  if (p_thread->p_keytable) {
1719  ABTI_ktable_free(p_thread->p_keytable);
1720  }
1721 }
1722 
1723 void ABTI_thread_free(ABTI_local *p_local, ABTI_thread *p_thread)
1724 {
1725 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1726  /* p_thread's lock may have been acquired somewhere. We free p_thread when
1727  the lock can be acquired here. */
1728  ABTI_spinlock_acquire(&p_thread->lock);
1729 #endif
1730 
1731  LOG_EVENT("[U%" PRIu64 ":E%d] freed\n", ABTI_thread_get_id(p_thread),
1732  p_thread->p_last_xstream->rank);
1733 
1734  ABTI_thread_free_internal(p_thread);
1735 
1736  /* Free ABTI_thread (stack will also be freed) */
1737  ABTI_mem_free_thread(p_local, p_thread);
1738 }
1739 
1740 void ABTI_thread_free_main(ABTI_local *p_local, ABTI_thread *p_thread)
1741 {
1742  LOG_EVENT("[U%" PRIu64 ":E%d] main ULT freed\n",
1743  ABTI_thread_get_id(p_thread), p_thread->p_last_xstream->rank);
1744 
1745  /* Free the key-value table */
1746  if (p_thread->p_keytable) {
1747  ABTI_ktable_free(p_thread->p_keytable);
1748  }
1749 
1750  ABTI_mem_free_thread(p_local, p_thread);
1751 }
1752 
1753 void ABTI_thread_free_main_sched(ABTI_local *p_local, ABTI_thread *p_thread)
1754 {
1755  LOG_EVENT("[U%" PRIu64 ":E%d] main sched ULT freed\n",
1756  ABTI_thread_get_id(p_thread), p_thread->p_last_xstream->rank);
1757 
1758  /* Free the context */
1759  ABTD_thread_context_free(&p_thread->ctx);
1760 
1761  /* Free the key-value table */
1762  if (p_thread->p_keytable) {
1763  ABTI_ktable_free(p_thread->p_keytable);
1764  }
1765 
1766  ABTI_mem_free_thread(p_local, p_thread);
1767 }
1768 
1769 int ABTI_thread_set_blocked(ABTI_thread *p_thread)
1770 {
1771  int abt_errno = ABT_SUCCESS;
1772 
1773  /* The main sched cannot be blocked */
1774  ABTI_CHECK_TRUE(p_thread->type != ABTI_THREAD_TYPE_MAIN_SCHED,
1775  ABT_ERR_THREAD);
1776 
1777  /* To prevent the scheduler from adding the ULT to the pool */
1778  ABTI_thread_set_request(p_thread, ABTI_THREAD_REQ_BLOCK);
1779 
1780  /* Change the ULT's state to BLOCKED */
1781  ABTD_atomic_release_store_int(&p_thread->state, ABT_THREAD_STATE_BLOCKED);
1782 
1783  /* Increase the number of blocked ULTs */
1784  ABTI_pool *p_pool = p_thread->p_pool;
1785  ABTI_pool_inc_num_blocked(p_pool);
1786 
1787  LOG_EVENT("[U%" PRIu64 ":E%d] blocked\n", ABTI_thread_get_id(p_thread),
1788  p_thread->p_last_xstream->rank);
1789 
1790 fn_exit:
1791  return abt_errno;
1792 
1793 fn_fail:
1794  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1795  goto fn_exit;
1796 }
1797 
1798 /* NOTE: This routine should be called after ABTI_thread_set_blocked. */
1799 void ABTI_thread_suspend(ABTI_local **pp_local, ABTI_thread *p_thread)
1800 {
1801  ABTI_local *p_local = *pp_local;
1802  ABTI_ASSERT(p_thread == p_local->p_thread);
1803  ABTI_ASSERT(p_thread->p_last_xstream == p_local->p_xstream);
1804 
1805  /* Switch to the scheduler, i.e., suspend p_thread */
1806  ABTI_xstream *p_xstream = p_local->p_xstream;
1807  ABTI_sched *p_sched = ABTI_xstream_get_top_sched(p_xstream);
1808  LOG_EVENT("[U%" PRIu64 ":E%d] suspended\n", ABTI_thread_get_id(p_thread),
1809  p_xstream->rank);
1810  ABTI_thread_context_switch_thread_to_sched(pp_local, p_thread, p_sched);
1811 
1812  /* The suspended ULT resumes its execution from here. */
1813  LOG_EVENT("[U%" PRIu64 ":E%d] resumed\n", ABTI_thread_get_id(p_thread),
1814  p_thread->p_last_xstream->rank);
1815 }
1816 
1817 int ABTI_thread_set_ready(ABTI_local *p_local, ABTI_thread *p_thread)
1818 {
1819  int abt_errno = ABT_SUCCESS;
1820 
1821  /* The ULT should be in BLOCKED state. */
1822  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->state) ==
1824  ABT_ERR_THREAD);
1825 
1826  /* We should wait until the scheduler of the blocked ULT resets the BLOCK
1827  * request. Otherwise, the ULT can be pushed to a pool here and be
1828  * scheduled by another scheduler if it is pushed to a shared pool. */
1829  while (ABTD_atomic_acquire_load_uint32(&p_thread->request) &
1830  ABTI_THREAD_REQ_BLOCK)
1831  ABTD_atomic_pause();
1832 
1833  LOG_EVENT("[U%" PRIu64 ":E%d] set ready\n", ABTI_thread_get_id(p_thread),
1834  p_thread->p_last_xstream->rank);
1835 
1836  /* p_thread->p_pool is loaded before ABTI_POOL_ADD_THREAD to keep
1837  * num_blocked consistent. Otherwise, other threads might pop p_thread
1838  * that has been pushed in ABTI_POOL_ADD_THREAD and change p_thread->p_pool
1839  * by ABT_unit_set_associated_pool. */
1840  ABTI_pool *p_pool = p_thread->p_pool;
1841 
1842  /* Add the ULT to its associated pool */
1843  ABTI_POOL_ADD_THREAD(p_thread, ABTI_self_get_native_thread_id(p_local));
1844 
1845  /* Decrease the number of blocked threads */
1846  ABTI_pool_dec_num_blocked(p_pool);
1847 
1848 fn_exit:
1849  return abt_errno;
1850 
1851 fn_fail:
1852  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
1853  goto fn_exit;
1854 }
1855 
1856 static inline ABT_bool ABTI_thread_is_ready(ABTI_thread *p_thread)
1857 {
1858  /* ULT can be regarded as 'ready' only if its state is READY and it has been
1859  * pushed into a pool. Since we set ULT's state to READY and then push it
1860  * into a pool, we check them in the reverse order, i.e., check if the ULT
1861  * is inside a pool and the its state. */
1862  ABTI_pool *p_pool = p_thread->p_pool;
1863  if (p_pool->u_is_in_pool(p_thread->unit) == ABT_TRUE &&
1864  ABTD_atomic_acquire_load_int(&p_thread->state) ==
1866  return ABT_TRUE;
1867  }
1868 
1869  return ABT_FALSE;
1870 }
1871 
1872 void ABTI_thread_print(ABTI_thread *p_thread, FILE *p_os, int indent)
1873 {
1874  char *prefix = ABTU_get_indent_str(indent);
1875 
1876  if (p_thread == NULL) {
1877  fprintf(p_os, "%s== NULL ULT ==\n", prefix);
1878  goto fn_exit;
1879  }
1880 
1881  ABTI_xstream *p_xstream = p_thread->p_last_xstream;
1882  int xstream_rank = p_xstream ? p_xstream->rank : 0;
1883  char *type, *state;
1884  char attr[100];
1885 
1886  switch (p_thread->type) {
1887  case ABTI_THREAD_TYPE_MAIN:
1888  type = "MAIN";
1889  break;
1890  case ABTI_THREAD_TYPE_MAIN_SCHED:
1891  type = "MAIN_SCHED";
1892  break;
1893  case ABTI_THREAD_TYPE_USER:
1894  type = "USER";
1895  break;
1896  default:
1897  type = "UNKNOWN";
1898  break;
1899  }
1900  switch (ABTD_atomic_acquire_load_int(&p_thread->state)) {
1902  state = "READY";
1903  break;
1905  state = "RUNNING";
1906  break;
1908  state = "BLOCKED";
1909  break;
1911  state = "TERMINATED";
1912  break;
1913  default:
1914  state = "UNKNOWN";
1915  break;
1916  }
1917  ABTI_thread_attr_get_str(&p_thread->attr, attr);
1918 
1919  fprintf(p_os,
1920  "%s== ULT (%p) ==\n"
1921  "%sid : %" PRIu64 "\n"
1922  "%stype : %s\n"
1923  "%sstate : %s\n"
1924  "%slast_ES : %p (%d)\n"
1925 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
1926  "%sis_sched: %p\n"
1927 #endif
1928  "%spool : %p\n"
1929  "%srefcount: %u\n"
1930  "%srequest : 0x%x\n"
1931 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1932  "%sreq_arg : %p\n"
1933 #endif
1934  "%skeytable: %p\n"
1935  "%sattr : %s\n",
1936  prefix, (void *)p_thread, prefix, ABTI_thread_get_id(p_thread),
1937  prefix, type, prefix, state, prefix, (void *)p_xstream,
1938  xstream_rank,
1939 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
1940  prefix, (void *)p_thread->is_sched,
1941 #endif
1942  prefix, (void *)p_thread->p_pool, prefix, p_thread->refcount,
1943  prefix, ABTD_atomic_acquire_load_uint32(&p_thread->request),
1944 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1945  prefix, (void *)p_thread->p_req_arg,
1946 #endif
1947  prefix, (void *)p_thread->p_keytable, prefix, attr);
1948 
1949 fn_exit:
1950  fflush(p_os);
1951  ABTU_free(prefix);
1952 }
1953 
1954 int ABTI_thread_print_stack(ABTI_thread *p_thread, FILE *p_os)
1955 {
1956  void *p_stack = p_thread->attr.p_stack;
1957  size_t i, j, stacksize = p_thread->attr.stacksize;
1958  if (stacksize == 0 || p_stack == NULL) {
1959  /* Some threads do not have p_stack (e.g., the main thread) */
1960  return ABT_ERR_THREAD;
1961  }
1962 
1963  const size_t value_width = 8;
1964  const int num_bytes = 32;
1965  char *buffer = (char *)alloca(num_bytes);
1966  for (i = 0; i < stacksize; i += num_bytes) {
1967  if (stacksize >= i + num_bytes) {
1968  memcpy(buffer, &((uint8_t *)p_stack)[i], num_bytes);
1969  } else {
1970  memset(buffer, 0, num_bytes);
1971  memcpy(buffer, &((uint8_t *)p_stack)[i], stacksize - i);
1972  }
1973  /* Print the stack address */
1974 #if SIZEOF_VOID_P == 8
1975  fprintf(p_os, "%016" PRIxPTR ":",
1976  (uintptr_t)(&((uint8_t *)p_stack)[i]));
1977 #elif SIZEOF_VOID_P == 4
1978  fprintf(p_os, "%08" PRIxPTR ":", (uintptr_t)(&((uint8_t *)p_stack)[i]));
1979 #else
1980 #error "unknown pointer size"
1981 #endif
1982  /* Print the raw stack data */
1983  for (j = 0; j < num_bytes / value_width; j++) {
1984  if (value_width == 8) {
1985  uint64_t val = ((uint64_t *)buffer)[j];
1986  fprintf(p_os, " %016" PRIx64, val);
1987  } else if (value_width == 4) {
1988  uint32_t val = ((uint32_t *)buffer)[j];
1989  fprintf(p_os, " %08" PRIx32, val);
1990  } else if (value_width == 2) {
1991  uint16_t val = ((uint16_t *)buffer)[j];
1992  fprintf(p_os, " %04" PRIx16, val);
1993  } else {
1994  uint8_t val = ((uint8_t *)buffer)[j];
1995  fprintf(p_os, " %02" PRIx8, val);
1996  }
1997  if (j == (num_bytes / value_width) - 1)
1998  fprintf(p_os, "\n");
1999  }
2000  }
2001  return ABT_SUCCESS;
2002 }
2003 
2004 #ifndef ABT_CONFIG_DISABLE_MIGRATION
2005 void ABTI_thread_add_req_arg(ABTI_thread *p_thread, uint32_t req, void *arg)
2006 {
2007  ABTI_thread_req_arg *new;
2008  ABTI_thread_req_arg *p_head = p_thread->p_req_arg;
2009 
2010  /* Overwrite the previous same request if exists */
2011  while (p_head != NULL) {
2012  if (p_head->request == req) {
2013  p_head->p_arg = arg;
2014  return;
2015  }
2016  }
2017 
2018  new = (ABTI_thread_req_arg *)ABTU_malloc(sizeof(ABTI_thread_req_arg));
2019 
2020  /* filling the new argument data structure */
2021  new->request = req;
2022  new->p_arg = arg;
2023  new->next = NULL;
2024 
2025  if (p_head == NULL) {
2026  p_thread->p_req_arg = new;
2027  } else {
2028  while (p_head->next != NULL)
2029  p_head = p_head->next;
2030  p_head->next = new;
2031  }
2032 }
2033 
2034 void *ABTI_thread_extract_req_arg(ABTI_thread *p_thread, uint32_t req)
2035 {
2036  void *result = NULL;
2037  ABTI_thread_req_arg *p_last = NULL, *p_head = p_thread->p_req_arg;
2038 
2039  while (p_head != NULL) {
2040  if (p_head->request == req) {
2041  result = p_head->p_arg;
2042  if (p_last == NULL)
2043  p_thread->p_req_arg = p_head->next;
2044  else
2045  p_last->next = p_head->next;
2046  ABTU_free(p_head);
2047  break;
2048  }
2049  p_last = p_head;
2050  p_head = p_head->next;
2051  }
2052 
2053  return result;
2054 }
2055 
2056 void ABTI_thread_put_req_arg(ABTI_thread *p_thread,
2057  ABTI_thread_req_arg *p_req_arg)
2058 {
2059  ABTI_spinlock_acquire(&p_thread->lock);
2060  ABTI_thread_req_arg *p_head = p_thread->p_req_arg;
2061 
2062  if (p_head == NULL) {
2063  p_thread->p_req_arg = p_req_arg;
2064  } else {
2065  while (p_head->next != NULL) {
2066  p_head = p_head->next;
2067  }
2068  p_head->next = p_req_arg;
2069  }
2070  ABTI_spinlock_release(&p_thread->lock);
2071 }
2072 
2073 ABTI_thread_req_arg *ABTI_thread_get_req_arg(ABTI_thread *p_thread,
2074  uint32_t req)
2075 {
2076  ABTI_thread_req_arg *p_result = NULL;
2077  ABTI_thread_req_arg *p_last = NULL;
2078 
2079  ABTI_spinlock_acquire(&p_thread->lock);
2080  ABTI_thread_req_arg *p_head = p_thread->p_req_arg;
2081  while (p_head != NULL) {
2082  if (p_head->request == req) {
2083  p_result = p_head;
2084  if (p_last == NULL)
2085  p_thread->p_req_arg = p_head->next;
2086  else
2087  p_last->next = p_head->next;
2088  break;
2089  }
2090  p_last = p_head;
2091  p_head = p_head->next;
2092  }
2093  ABTI_spinlock_release(&p_thread->lock);
2094 
2095  return p_result;
2096 }
2097 #endif /* ABT_CONFIG_DISABLE_MIGRATION */
2098 
/* Global counter used to hand out unique ULT IDs (see
 * ABTI_thread_get_new_id). */
static ABTD_atomic_uint64 g_thread_id =
    ABTD_ATOMIC_UINT64_STATIC_INITIALIZER(0);
/* Reset the global ULT ID counter back to zero. */
void ABTI_thread_reset_id(void)
{
    ABTD_atomic_release_store_uint64(&g_thread_id, 0);
}
2105 
2106 ABT_thread_id ABTI_thread_get_id(ABTI_thread *p_thread)
2107 {
2108  if (p_thread == NULL)
2109  return ABTI_THREAD_INIT_ID;
2110 
2111  if (p_thread->id == ABTI_THREAD_INIT_ID) {
2112  p_thread->id = ABTI_thread_get_new_id();
2113  }
2114  return p_thread->id;
2115 }
2116 
2117 ABT_thread_id ABTI_thread_self_id(ABTI_local *p_local)
2118 {
2119  ABTI_thread *p_self = NULL;
2120  if (p_local)
2121  p_self = p_local->p_thread;
2122  return ABTI_thread_get_id(p_self);
2123 }
2124 
2125 int ABTI_thread_get_xstream_rank(ABTI_thread *p_thread)
2126 {
2127  if (p_thread == NULL)
2128  return -1;
2129 
2130  if (p_thread->p_last_xstream) {
2131  return p_thread->p_last_xstream->rank;
2132  } else {
2133  return -1;
2134  }
2135 }
2136 
2137 int ABTI_thread_self_xstream_rank(ABTI_local *p_local)
2138 {
2139  ABTI_thread *p_self = NULL;
2140  if (p_local)
2141  p_self = p_local->p_thread;
2142  return ABTI_thread_get_xstream_rank(p_self);
2143 }
2144 
2145 /*****************************************************************************/
2146 /* Internal static functions */
2147 /*****************************************************************************/
2148 
2149 static int ABTI_thread_revive(ABTI_local *p_local, ABTI_pool *p_pool,
2150  void (*thread_func)(void *), void *arg,
2151  ABTI_thread *p_thread)
2152 {
2153  int abt_errno = ABT_SUCCESS;
2154  size_t stacksize;
2155 
2156  ABTI_CHECK_TRUE(ABTD_atomic_relaxed_load_int(&p_thread->state) ==
2159 
2160  /* Create a ULT context */
2161  stacksize = p_thread->attr.stacksize;
2162 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
2163  if (p_thread->is_sched) {
2164  abt_errno =
2165  ABTD_thread_context_create_sched(NULL, thread_func, arg, stacksize,
2166  p_thread->attr.p_stack,
2167  &p_thread->ctx);
2168  } else {
2169 #endif
2170  abt_errno =
2171  ABTD_thread_context_create_thread(NULL, thread_func, arg, stacksize,
2172  p_thread->attr.p_stack,
2173  &p_thread->ctx);
2174 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
2175  }
2176 #endif
2177  ABTI_CHECK_ERROR(abt_errno);
2178 
2179  ABTD_atomic_relaxed_store_int(&p_thread->state, ABT_THREAD_STATE_READY);
2180  ABTD_atomic_relaxed_store_uint32(&p_thread->request, 0);
2181  p_thread->p_last_xstream = NULL;
2182  p_thread->refcount = 1;
2183  p_thread->type = ABTI_THREAD_TYPE_USER;
2184 
2185  if (p_thread->p_pool != p_pool) {
2186  /* Free the unit for the old pool */
2187  p_thread->p_pool->u_free(&p_thread->unit);
2188 
2189  /* Set the new pool */
2190  p_thread->p_pool = p_pool;
2191 
2192  /* Create a wrapper unit */
2193  ABT_thread h_thread = ABTI_thread_get_handle(p_thread);
2194  p_thread->unit = p_pool->u_create_from_thread(h_thread);
2195  }
2196 
2197  LOG_EVENT("[U%" PRIu64 "] revived\n", ABTI_thread_get_id(p_thread));
2198 
2199  /* Add this thread to the pool */
2200 #ifdef ABT_CONFIG_DISABLE_POOL_PRODUCER_CHECK
2201  ABTI_pool_push(p_pool, p_thread->unit);
2202 #else
2203  abt_errno = ABTI_pool_push(p_pool, p_thread->unit,
2204  ABTI_self_get_native_thread_id(p_local));
2205  ABTI_CHECK_ERROR(abt_errno);
2206 #endif
2207 
2208 fn_exit:
2209  return abt_errno;
2210 
2211 fn_fail:
2212  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
2213  goto fn_exit;
2214 }
2215 
2216 static inline int ABTI_thread_join(ABTI_local **pp_local, ABTI_thread *p_thread)
2217 {
2218  int abt_errno = ABT_SUCCESS;
2219 
2220  if (ABTD_atomic_acquire_load_int(&p_thread->state) ==
2222  return abt_errno;
2223 
2224  ABTI_CHECK_TRUE_MSG(p_thread->type != ABTI_THREAD_TYPE_MAIN &&
2225  p_thread->type != ABTI_THREAD_TYPE_MAIN_SCHED,
2226  ABT_ERR_INV_THREAD, "The main ULT cannot be joined.");
2227 
2228  ABTI_local *p_local = *pp_local;
2229 #ifndef ABT_CONFIG_DISABLE_EXT_THREAD
2230  ABT_unit_type type = ABTI_self_get_type(p_local);
2231  if (type != ABT_UNIT_TYPE_THREAD)
2232  goto busywait_based;
2233 #endif
2234 
2235  ABTI_CHECK_TRUE_MSG(p_thread != p_local->p_thread, ABT_ERR_INV_THREAD,
2236  "The target ULT should be different.");
2237 
2238  ABTI_thread *p_self = p_local->p_thread;
2239  ABT_pool_access access = p_self->p_pool->access;
2240 
2241  if ((p_self->p_pool == p_thread->p_pool) &&
2242  (access == ABT_POOL_ACCESS_PRIV || access == ABT_POOL_ACCESS_MPSC ||
2243  access == ABT_POOL_ACCESS_SPSC) &&
2244  (ABTD_atomic_acquire_load_int(&p_thread->state) ==
2246 
2247  ABTI_xstream *p_xstream = p_self->p_last_xstream;
2248 
2249  /* If other ES is calling ABTI_thread_set_ready(), p_thread may not
2250  * have been added to the pool yet because ABTI_thread_set_ready()
2251  * changes the state first followed by pushing p_thread to the pool.
2252  * Therefore, we have to check whether p_thread is in the pool, and if
2253  * not, we need to wait until it is added. */
2254  while (p_thread->p_pool->u_is_in_pool(p_thread->unit) != ABT_TRUE) {
2255  }
2256 
2257  /* Increase the number of blocked units. Be sure to execute
2258  * ABTI_pool_inc_num_blocked before ABTI_POOL_REMOVE in order not to
2259  * underestimate the number of units in a pool. */
2260  ABTI_pool_inc_num_blocked(p_self->p_pool);
2261  /* Remove the target ULT from the pool */
2262  ABTI_POOL_REMOVE(p_thread->p_pool, p_thread->unit,
2263  ABTI_self_get_native_thread_id(p_local));
2264 
2265  /* Set the link in the context for the target ULT. Since p_link will be
2266  * referenced by p_self, this update does not require release store. */
2267  ABTD_atomic_relaxed_store_thread_context_ptr(&p_thread->ctx.p_link,
2268  &p_self->ctx);
2269  /* Set the last ES */
2270  p_thread->p_last_xstream = p_xstream;
2271  ABTD_atomic_release_store_int(&p_thread->state,
2273 
2274  /* Make the current ULT BLOCKED */
2275  ABTD_atomic_release_store_int(&p_self->state, ABT_THREAD_STATE_BLOCKED);
2276 
2277  LOG_EVENT("[U%" PRIu64 ":E%d] blocked to join U%" PRIu64 "\n",
2278  ABTI_thread_get_id(p_self), p_self->p_last_xstream->rank,
2279  ABTI_thread_get_id(p_thread));
2280  LOG_EVENT("[U%" PRIu64 ":E%d] start running\n",
2281  ABTI_thread_get_id(p_thread), p_thread->p_last_xstream->rank);
2282 
2283  /* Switch the context */
2284  ABTI_thread_context_switch_thread_to_thread(pp_local, p_self, p_thread);
2285  p_local = *pp_local;
2286 
2287  } else if ((p_self->p_pool != p_thread->p_pool) &&
2288  (access == ABT_POOL_ACCESS_PRIV ||
2289  access == ABT_POOL_ACCESS_SPSC)) {
2290  /* FIXME: once we change the suspend/resume mechanism (i.e., asking the
2291  * scheduler to wake up the blocked ULT), we will be able to handle all
2292  * access modes. */
2293  goto yield_based;
2294 
2295  } else {
2296  /* Tell p_thread that there has been a join request. */
2297  /* If request already has ABTI_THREAD_REQ_JOIN, p_thread is terminating.
2298  * We can't block p_self in this case. */
2299  uint32_t req = ABTD_atomic_fetch_or_uint32(&p_thread->request,
2300  ABTI_THREAD_REQ_JOIN);
2301  if (req & ABTI_THREAD_REQ_JOIN)
2302  goto yield_based;
2303 
2304  ABTI_thread_set_blocked(p_self);
2305  LOG_EVENT("[U%" PRIu64 ":E%d] blocked to join U%" PRIu64 "\n",
2306  ABTI_thread_get_id(p_self), p_self->p_last_xstream->rank,
2307  ABTI_thread_get_id(p_thread));
2308 
2309  /* Set the link in the context of the target ULT. This p_link might be
2310  * read by p_thread running on another ES in parallel, so release-store
2311  * is needed here. */
2312  ABTD_atomic_release_store_thread_context_ptr(&p_thread->ctx.p_link,
2313  &p_self->ctx);
2314 
2315  /* Suspend the current ULT */
2316  ABTI_thread_suspend(pp_local, p_self);
2317  p_local = *pp_local;
2318  }
2319 
2320  /* Resume */
2321  /* If p_self's state is BLOCKED, the target ULT has terminated on the same
2322  * ES as p_self's ES and the control has come from the target ULT.
2323  * Otherwise, the target ULT had been migrated to a different ES, p_self
2324  * has been resumed by p_self's scheduler. In the latter case, we don't
2325  * need to change p_self's state. */
2326  if (ABTD_atomic_relaxed_load_int(&p_self->state) ==
2328  ABTD_atomic_release_store_int(&p_self->state, ABT_THREAD_STATE_RUNNING);
2329  ABTI_pool_dec_num_blocked(p_self->p_pool);
2330  LOG_EVENT("[U%" PRIu64 ":E%d] resume after join\n",
2331  ABTI_thread_get_id(p_self), p_self->p_last_xstream->rank);
2332  return abt_errno;
2333  }
2334 
2335 yield_based:
2336  while (ABTD_atomic_acquire_load_int(&p_thread->state) !=
2338  ABTI_thread_yield(pp_local, p_local->p_thread);
2339  p_local = *pp_local;
2340  }
2341  goto fn_exit;
2342 
2343 #ifndef ABT_CONFIG_DISABLE_EXT_THREAD
2344 busywait_based:
2345 #endif
2346  while (ABTD_atomic_acquire_load_int(&p_thread->state) !=
2348  ABTD_atomic_pause();
2349  }
2350 
2351 fn_exit:
2352  return abt_errno;
2353 
2354 fn_fail:
2355  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
2356  goto fn_exit;
2357 }
2358 
2359 #ifndef ABT_CONFIG_DISABLE_MIGRATION
2360 static int ABTI_thread_migrate_to_xstream(ABTI_local **pp_local,
2361  ABTI_thread *p_thread,
2362  ABTI_xstream *p_xstream)
2363 {
2364  int abt_errno = ABT_SUCCESS;
2365 
2366  /* checking for cases when migration is not allowed */
2367  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_xstream->state) !=
2370  ABTI_CHECK_TRUE(p_thread->type != ABTI_THREAD_TYPE_MAIN &&
2371  p_thread->type != ABTI_THREAD_TYPE_MAIN_SCHED,
2373  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->state) !=
2376 
2377  /* We need to find the target scheduler */
2378  ABTI_pool *p_pool = NULL;
2379  ABTI_sched *p_sched = NULL;
2380  do {
2381  ABTI_spinlock_acquire(&p_xstream->sched_lock);
2382 
2383  /* We check the state of the ES */
2384  if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
2386  abt_errno = ABT_ERR_INV_XSTREAM;
2387  ABTI_spinlock_release(&p_xstream->sched_lock);
2388  goto fn_fail;
2389 
2390  } else if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
2392  p_sched = ABTI_xstream_get_top_sched(p_xstream);
2393 
2394  } else {
2395  p_sched = p_xstream->p_main_sched;
2396  }
2397 
2398  /* We check the state of the sched */
2399  if (p_sched->state == ABT_SCHED_STATE_TERMINATED) {
2400  abt_errno = ABT_ERR_INV_XSTREAM;
2401  ABTI_spinlock_release(&p_xstream->sched_lock);
2402  goto fn_fail;
2403  } else {
2404  /* Find a pool */
2405  ABTI_sched_get_migration_pool(p_sched, p_thread->p_pool, &p_pool);
2406  if (p_pool == NULL) {
2407  abt_errno = ABT_ERR_INV_POOL;
2408  ABTI_spinlock_release(&p_xstream->sched_lock);
2409  goto fn_fail;
2410  }
2411  /* We set the migration counter to prevent the scheduler from
2412  * stopping */
2413  ABTI_pool_inc_num_migrations(p_pool);
2414  }
2415  ABTI_spinlock_release(&p_xstream->sched_lock);
2416  } while (p_pool == NULL);
2417 
2418  abt_errno = ABTI_thread_migrate_to_pool(pp_local, p_thread, p_pool);
2419  if (abt_errno != ABT_SUCCESS) {
2420  ABTI_pool_dec_num_migrations(p_pool);
2421  goto fn_fail;
2422  }
2423 
2424 fn_exit:
2425  return abt_errno;
2426 
2427 fn_fail:
2428  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
2429  goto fn_exit;
2430 }
2431 #endif
2432 
2433 static inline ABT_thread_id ABTI_thread_get_new_id(void)
2434 {
2435  return (ABT_thread_id)ABTD_atomic_fetch_add_uint64(&g_thread_id, 1);
2436 }
struct ABT_thread_attr_opaque * ABT_thread_attr
Definition: abt.h:281
int ABT_thread_exit(void)
The calling ULT terminates its execution.
Definition: thread.c:430
int ABT_thread_get_last_pool_id(ABT_thread thread, int *id)
Get the last pool&#39;s ID of the ULT.
Definition: thread.c:659
int ABT_thread_get_state(ABT_thread thread, ABT_thread_state *state)
Return the state of thread.
Definition: thread.c:597
struct ABT_xstream_opaque * ABT_xstream
Definition: abt.h:251
struct ABT_sched_opaque * ABT_sched
Definition: abt.h:257
int ABT_thread_join_many(int num_threads, ABT_thread *thread_list)
Wait for a number of ULTs to terminate.
Definition: thread.c:400
char * ABTU_get_indent_str(int indent)
Definition: util.c:12
int ABT_thread_cancel(ABT_thread thread)
Request the cancellation of the target thread.
Definition: thread.c:472
uint64_t ABT_thread_id
Definition: abt.h:287
static ABTD_atomic_uint64 g_thread_id
Definition: thread.c:2099
#define ABT_ERR_INV_POOL
Definition: abt.h:76
#define ABT_UNIT_NULL
Definition: abt.h:343
#define ABT_ERR_INV_THREAD
Definition: abt.h:80
int ABT_thread_free(ABT_thread *thread)
Release the thread object associated with thread handle.
Definition: thread.c:295
int ABT_thread_get_attr(ABT_thread thread, ABT_thread_attr *attr)
Get attributes of the target ULT.
Definition: thread.c:1394
int ABT_thread_resume(ABT_thread thread)
Resume the target ULT.
Definition: thread.c:871
static void * ABTU_malloc(size_t size)
Definition: abtu.h:39
int ABT_bool
Definition: abt.h:309
int ABT_thread_migrate(ABT_thread thread)
Request migration of the thread to an any available ES.
Definition: thread.c:1049
int ABT_thread_migrate_to_xstream(ABT_thread thread, ABT_xstream xstream)
Migrate a thread to a specific ES.
Definition: thread.c:909
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:267
int ABT_thread_create(ABT_pool pool, void(*thread_func)(void *), void *arg, ABT_thread_attr attr, ABT_thread *newthread)
Create a new thread and return its handle through newthread.
Definition: thread.c:53
int ABT_thread_set_callback(ABT_thread thread, void(*cb_func)(ABT_thread thread, void *cb_arg), void *cb_arg)
Set the callback function.
Definition: thread.c:1113
#define ABT_ERR_THREAD
Definition: abt.h:100
#define ABT_FALSE
Definition: abt.h:224
int ABT_thread_yield(void)
Yield the processor from the current running ULT back to the scheduler.
Definition: thread.c:825
int ABT_thread_get_last_pool(ABT_thread thread, ABT_pool *pool)
Return the last pool of ULT.
Definition: thread.c:627
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:279
int ABT_thread_self_id(ABT_thread_id *id)
Return the calling ULT&#39;s ID.
Definition: thread.c:559
int ABT_thread_free_many(int num, ABT_thread *thread_list)
Release a set of ULT objects.
Definition: thread.c:348
#define HANDLE_ERROR_FUNC_WITH_CODE(n)
Definition: abti_error.h:241
int ABT_thread_get_stacksize(ABT_thread thread, size_t *stacksize)
Get the ULT&#39;s stack size.
Definition: thread.c:1275
int ABT_thread_is_migratable(ABT_thread thread, ABT_bool *flag)
Get the ULT&#39;s migratability.
Definition: thread.c:1187
ABTI_global * gp_ABTI_global
Definition: global.c:14
#define ABT_SUCCESS
Definition: abt.h:64
ABT_pool_access
Definition: abt.h:162
#define LOG_EVENT(fmt,...)
Definition: abti_log.h:60
int ABT_thread_is_primary(ABT_thread thread, ABT_bool *flag)
Check if the target ULT is the primary ULT.
Definition: thread.c:1223
#define ABT_TRUE
Definition: abt.h:223
ABT_unit_type
Definition: abt.h:170
int ABT_thread_create_many(int num, ABT_pool *pool_list, void(**thread_func_list)(void *), void **arg_list, ABT_thread_attr attr, ABT_thread *newthread_list)
Create a set of ULTs.
Definition: thread.c:182
int ABT_thread_set_migratable(ABT_thread thread, ABT_bool flag)
Set the ULT&#39;s migratability.
Definition: thread.c:1151
#define ABT_THREAD_ATTR_NULL
Definition: abt.h:345
int ABT_thread_join(ABT_thread thread)
Wait for thread to terminate.
Definition: thread.c:371
#define ABT_ERR_UNINITIALIZED
Definition: abt.h:65
int ABT_thread_migrate_to_pool(ABT_thread thread, ABT_pool pool)
Migrate a thread to a specific pool.
Definition: thread.c:1007
int ABT_thread_set_associated_pool(ABT_thread thread, ABT_pool pool)
Set the associated pool for the target ULT.
Definition: thread.c:694
int ABT_thread_get_id(ABT_thread thread, ABT_thread_id *thread_id)
Get the ULT&#39;s id.
Definition: thread.c:1304
#define ABT_THREAD_NULL
Definition: abt.h:344
int ABT_thread_set_arg(ABT_thread thread, void *arg)
Set the argument for the ULT function.
Definition: thread.c:1332
#define ABT_ERR_FEATURE_NA
Definition: abt.h:115
#define ABT_ERR_MIGRATION_NA
Definition: abt.h:113
int ABT_thread_get_arg(ABT_thread thread, void **arg)
Retrieve the argument for the ULT function.
Definition: thread.c:1362
#define ABT_ERR_MIGRATION_TARGET
Definition: abt.h:112
int ABT_thread_create_on_xstream(ABT_xstream xstream, void(*thread_func)(void *), void *arg, ABT_thread_attr attr, ABT_thread *newthread)
Create a new ULT associated with the target ES (xstream).
Definition: thread.c:124
int ABT_thread_yield_to(ABT_thread thread)
Yield the processor from the current running thread to the specific thread.
Definition: thread.c:725
ABT_thread_state
Definition: abt.h:124
#define ABT_ERR_INV_XSTREAM
Definition: abt.h:68
int ABT_thread_revive(ABT_pool pool, void(*thread_func)(void *), void *arg, ABT_thread *thread)
Revive the ULT.
Definition: thread.c:259
#define ABT_ERR_INV_THREAD_ATTR
Definition: abt.h:81
int ABT_thread_equal(ABT_thread thread1, ABT_thread thread2, ABT_bool *result)
Compare two ULT handles for equality.
Definition: thread.c:1256
static void ABTU_free(void *ptr)
Definition: abtu.h:32
int ABT_thread_self(ABT_thread *thread)
Return the handle of the calling ULT.
Definition: thread.c:514
int ABT_thread_migrate_to_sched(ABT_thread thread, ABT_sched sched)
Migrate a thread to a specific scheduler.
Definition: thread.c:951