ARGOBOTS  4dc37e16e1b227a480715ab67dae1dcfb4d2d4e0
task.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
/* Forward declaration of the file-local tasklet constructor.  It is defined
 * at the end of this file and is called by ABT_task_create() and
 * ABT_task_create_on_xstream(). */
8 ABTU_ret_err static int task_create(ABTI_global *p_global, ABTI_local *p_local,
9  ABTI_pool *p_pool,
10  void (*task_func)(void *), void *arg,
11  ABTI_sched *p_sched, int refcount,
12  ABTI_thread **pp_newtask);
13 
56 int ABT_task_create(ABT_pool pool, void (*task_func)(void *), void *arg,
57  ABT_task *newtask)
58 {
60  ABTI_UB_ASSERT(task_func);
61 
62 #ifndef ABT_CONFIG_ENABLE_VER_20_API
63  /* Argobots 1.x sets newtask to NULL on error. */
64  if (newtask)
65  *newtask = ABT_TASK_NULL;
66 #endif
67  ABTI_global *p_global = ABTI_global_get_global();
68  ABTI_local *p_local = ABTI_local_get_local();
69  ABTI_thread *p_newtask;
70  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
72 
73  int refcount = (newtask != NULL) ? 1 : 0;
74  int abt_errno = task_create(p_global, p_local, p_pool, task_func, arg, NULL,
75  refcount, &p_newtask);
76  ABTI_CHECK_ERROR(abt_errno);
77 
78  /* Return value */
79  if (newtask)
80  *newtask = ABTI_thread_get_handle(p_newtask);
81  return ABT_SUCCESS;
82 }
83 
123 int ABT_task_create_on_xstream(ABT_xstream xstream, void (*task_func)(void *),
124  void *arg, ABT_task *newtask)
125 {
127  ABTI_UB_ASSERT(task_func);
128 
129 #ifndef ABT_CONFIG_ENABLE_VER_20_API
130  /* Argobots 1.x sets newtask to NULL on error. */
131  if (newtask)
132  *newtask = ABT_TASK_NULL;
133 #endif
134  ABTI_global *p_global = ABTI_global_get_global();
135  ABTI_local *p_local = ABTI_local_get_local();
136  ABTI_thread *p_newtask;
137 
138  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
139  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
140 
141  /* TODO: need to consider the access type of target pool */
142  ABTI_pool *p_pool = ABTI_xstream_get_main_pool(p_xstream);
143  int refcount = (newtask != NULL) ? 1 : 0;
144  int abt_errno = task_create(p_global, p_local, p_pool, task_func, arg, NULL,
145  refcount, &p_newtask);
146  ABTI_CHECK_ERROR(abt_errno);
147 
148  /* Return value */
149  if (newtask)
150  *newtask = ABTI_thread_get_handle(p_newtask);
151  return ABT_SUCCESS;
152 }
153 
165 int ABT_task_revive(ABT_pool pool, void (*task_func)(void *), void *arg,
166  ABT_task *task)
167 {
168  int abt_errno = ABT_thread_revive(pool, task_func, arg, task);
170  return abt_errno;
171 }
172 
186 int ABT_task_free(ABT_task *task)
187 {
188  int abt_errno = ABT_thread_free(task);
190  if (!(ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS)) {
191  *task = ABT_TASK_NULL;
192  }
193  return abt_errno;
194 }
195 
207 int ABT_task_join(ABT_task task)
208 {
209  int abt_errno = ABT_thread_join(task);
211  return abt_errno;
212 }
213 
225 int ABT_task_cancel(ABT_task task)
226 {
227  int abt_errno = ABT_thread_cancel(task);
229  return abt_errno;
230 }
231 
267 int ABT_task_self(ABT_task *task)
268 {
270 
271  ABTI_xstream *p_local_xstream;
272 #ifndef ABT_CONFIG_ENABLE_VER_20_API
273  *task = ABT_TASK_NULL;
274  ABTI_SETUP_GLOBAL(NULL);
275  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
276  ABTI_CHECK_TRUE(!(p_local_xstream->p_thread->type &
279 #else
281  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
282 #endif
283  *task = ABTI_thread_get_handle(p_local_xstream->p_thread);
284  return ABT_SUCCESS;
285 }
286 
320 {
322 
323  ABTI_xstream *p_local_xstream;
324 #ifndef ABT_CONFIG_ENABLE_VER_20_API
325  ABTI_SETUP_GLOBAL(NULL);
326  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
327  ABTI_CHECK_TRUE(!(p_local_xstream->p_thread->type &
330 #else
332  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
333 #endif
334  *id = ABTI_thread_get_id(p_local_xstream->p_thread);
335  return ABT_SUCCESS;
336 }
337 
350 int ABT_task_get_xstream(ABT_task task, ABT_xstream *xstream)
351 {
352  int abt_errno = ABT_thread_get_last_xstream(task, xstream);
354  return abt_errno;
355 }
356 
386 {
388  ABTI_UB_ASSERT(state);
389 
390  ABT_thread_state thread_state;
391  int abt_errno = ABT_thread_get_state(task, &thread_state);
393  if (thread_state == ABT_THREAD_STATE_READY) {
394  *state = ABT_TASK_STATE_READY;
395  } else if (thread_state == ABT_THREAD_STATE_TERMINATED) {
396  *state = ABT_TASK_STATE_TERMINATED;
397  } else {
398  *state = ABT_TASK_STATE_RUNNING;
399  }
400  return abt_errno;
401 }
402 
416 {
417  int abt_errno = ABT_thread_get_last_pool(task, pool);
419  return abt_errno;
420 }
421 
434 int ABT_task_get_last_pool_id(ABT_task task, int *id)
435 {
436  int abt_errno = ABT_thread_get_last_pool_id(task, id);
438  return abt_errno;
439 }
440 
454 {
455  int abt_errno = ABT_thread_set_migratable(task, flag);
457  return abt_errno;
458 }
459 
473 {
474  int abt_errno = ABT_thread_is_migratable(task, flag);
476  return abt_errno;
477 }
478 
/* Check if a work unit is unnamed.
 * This prototype is visible only when ABT_CONFIG_USE_DOXYGEN is defined; no
 * definition of the function appears in this file.
 * NOTE(review): presumably the public name is supplied elsewhere (e.g. in
 * abt.h) -- confirm. */
479 #ifdef ABT_CONFIG_USE_DOXYGEN
480 
489 int ABT_task_is_unnamed(ABT_task task, ABT_bool *flag);
490 #endif
491 
498 int ABT_task_equal(ABT_task task1, ABT_task task2, ABT_bool *result)
499 {
500  return ABT_thread_equal(task1, task2, result);
501 }
502 
514 int ABT_task_get_id(ABT_task task, ABT_unit_id *task_id)
515 {
516  int abt_errno = ABT_thread_get_id(task, task_id);
518  return abt_errno;
519 }
520 
533 int ABT_task_get_arg(ABT_task task, void **arg)
534 {
535  int abt_errno = ABT_thread_get_arg(task, arg);
537  return abt_errno;
538 }
539 
/* Set a value with a work-unit-specific key in a work unit.
 * This prototype is visible only when ABT_CONFIG_USE_DOXYGEN is defined; no
 * definition of the function appears in this file.
 * NOTE(review): presumably the public name is supplied elsewhere (e.g. in
 * abt.h) -- confirm. */
540 #ifdef ABT_CONFIG_USE_DOXYGEN
541 
551 int ABT_task_set_specific(ABT_task task, ABT_key key, void *value);
552 #endif
553 
/* Get a value associated with a work-unit-specific key in a work unit.
 * This prototype is visible only when ABT_CONFIG_USE_DOXYGEN is defined; no
 * definition of the function appears in this file.
 * NOTE(review): presumably the public name is supplied elsewhere (e.g. in
 * abt.h) -- confirm. */
554 #ifdef ABT_CONFIG_USE_DOXYGEN
555 
565 int ABT_task_get_specific(ABT_task task, ABT_key key, void **value);
566 #endif
567 
568 /*****************************************************************************/
569 /* Internal static functions */
570 /*****************************************************************************/
571 
572 ABTU_ret_err static int task_create(ABTI_global *p_global, ABTI_local *p_local,
573  ABTI_pool *p_pool,
574  void (*task_func)(void *), void *arg,
575  ABTI_sched *p_sched, int refcount,
576  ABTI_thread **pp_newtask)
577 {
578  ABTI_thread *p_newtask;
579 
580  /* Allocate a task object */
581  int abt_errno = ABTI_mem_alloc_nythread(p_local, &p_newtask);
582  ABTI_CHECK_ERROR(abt_errno);
583  abt_errno = ABTI_thread_init_pool(p_global, p_newtask, p_pool);
584  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
585  ABTI_mem_free_thread(p_global, p_local, p_newtask);
586  ABTI_HANDLE_ERROR(abt_errno);
587  }
588 
589  p_newtask->p_last_xstream = NULL;
590  p_newtask->p_parent = NULL;
593  p_newtask->f_thread = task_func;
594  p_newtask->p_arg = arg;
595  ABTD_atomic_relaxed_store_ptr(&p_newtask->p_keytable, NULL);
596  p_newtask->id = ABTI_TASK_INIT_ID;
597 
598  /* Create a wrapper work unit */
599  ABTI_thread_type thread_type =
602 #ifndef ABT_CONFIG_DISABLE_MIGRATION
603  thread_type |= ABTI_THREAD_TYPE_MIGRATABLE;
604 #endif
605  p_newtask->type |= thread_type;
606 
607  ABTI_event_thread_create(p_local, p_newtask,
609  ? ABTI_local_get_xstream(p_local)->p_thread
610  : NULL,
611  p_pool);
612 
613  /* Add this task to the scheduler's pool */
615 
616  /* Return value */
617  *pp_newtask = p_newtask;
618 
619  return ABT_SUCCESS;
620 }
ABT_task_state
ABT_task_state
State of a tasklet.
Definition: abt.h:444
ABT_THREAD_STATE_TERMINATED
@ ABT_THREAD_STATE_TERMINATED
Definition: abt.h:433
ABT_key
struct ABT_key_opaque * ABT_key
Work-unit-specific data key handle type.
Definition: abt.h:980
ABT_ERR_INV_THREAD
#define ABT_ERR_INV_THREAD
Error code: invalid work unit.
Definition: abt.h:186
ABT_thread_get_arg
int ABT_thread_get_arg(ABT_thread thread, void **arg) ABT_API_PUBLIC
Retrieve an argument for a work-unit function of a work unit.
Definition: thread.c:2284
ABTI_mem_free_thread
static void ABTI_mem_free_thread(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
Definition: abti_mem.h:382
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
ABT_task_set_specific
int ABT_task_set_specific(ABT_task task, ABT_key key, void *value)
Set a value with a work-unit-specific key in a work unit.
ABT_task_cancel
int ABT_task_cancel(ABT_task task)
Send a termination request to a work unit.
Definition: task.c:227
ABT_POOL_CONTEXT_OP_THREAD_CREATE
#define ABT_POOL_CONTEXT_OP_THREAD_CREATE
A flag that hints a push operation in a thread creation routine without a yield operation.
Definition: abt.h:1673
ABTI_SETUP_LOCAL_XSTREAM
#define ABTI_SETUP_LOCAL_XSTREAM(pp_local_xstream)
Definition: abti_error.h:89
ABT_THREAD_STATE_READY
@ ABT_THREAD_STATE_READY
Definition: abt.h:427
ABT_task_self_id
int ABT_task_self_id(ABT_unit_id *id)
Get ID of the calling work unit.
Definition: task.c:321
ABT_task_self
int ABT_task_self(ABT_task *task)
Get the calling work unit.
Definition: task.c:269
ABT_task
struct ABT_thread_opaque * ABT_task
Work unit handle type.
Definition: abt.h:973
ABT_task_get_last_pool_id
int ABT_task_get_last_pool_id(ABT_task task, int *id)
Get the last pool's ID of a work unit.
Definition: task.c:436
ABTI_SETUP_GLOBAL
#define ABTI_SETUP_GLOBAL(pp_global)
Definition: abti_error.h:75
ABTI_thread::type
ABTI_thread_type type
Definition: abti.h:426
ABT_TASK_STATE_TERMINATED
@ ABT_TASK_STATE_TERMINATED
Definition: abt.h:450
ABTI_global_get_global
static ABTI_global * ABTI_global_get_global(void)
Definition: abti_global.h:9
ABT_thread_set_migratable
int ABT_thread_set_migratable(ABT_thread thread, ABT_bool migratable) ABT_API_PUBLIC
Set the migratability in a work unit.
Definition: thread.c:1933
ABTI_CHECK_ERROR
#define ABTI_CHECK_ERROR(abt_errno)
Definition: abti_error.h:136
ABTI_mem_alloc_nythread
static ABTU_ret_err int ABTI_mem_alloc_nythread(ABTI_local *p_local, ABTI_thread **pp_thread)
Definition: abti_mem.h:130
ABT_thread_get_id
int ABT_thread_get_id(ABT_thread thread, ABT_unit_id *thread_id) ABT_API_PUBLIC
Get ID of a work unit.
Definition: thread.c:2209
ABTI_thread::p_arg
void * p_arg
Definition: abti.h:431
ABTI_thread::unit
ABT_unit unit
Definition: abti.h:427
ABTI_xstream::type
ABTI_xstream_type type
Definition: abti.h:300
ABTI_THREAD_TYPE_YIELDABLE
#define ABTI_THREAD_TYPE_YIELDABLE
Definition: abti.h:87
ABTI_thread::request
ABTD_atomic_uint32 request
Definition: abti.h:433
ABTI_thread_get_id
ABT_unit_id ABTI_thread_get_id(ABTI_thread *p_thread)
Definition: thread.c:2803
ABTI_thread_get_handle
static ABT_thread ABTI_thread_get_handle(ABTI_thread *p_thread)
Definition: abti_thread.h:24
ABTI_thread
Definition: abti.h:422
ABTI_IS_ERROR_CHECK_ENABLED
#define ABTI_IS_ERROR_CHECK_ENABLED
Definition: abti.h:20
ABT_thread_get_state
int ABT_thread_get_state(ABT_thread thread, ABT_thread_state *state) ABT_API_PUBLIC
Get a state of a work unit.
Definition: thread.c:1068
ABTI_xstream
Definition: abti.h:294
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:878
task_create
static ABTU_ret_err int task_create(ABTI_global *p_global, ABTI_local *p_local, ABTI_pool *p_pool, void(*task_func)(void *), void *arg, ABTI_sched *p_sched, int refcount, ABTI_thread **pp_newtask)
Definition: task.c:574
ABT_thread_state
ABT_thread_state
State of a work unit.
Definition: abt.h:425
ABT_thread_free
int ABT_thread_free(ABT_thread *thread) ABT_API_PUBLIC
Free a work unit.
Definition: thread.c:612
ABT_task_get_arg
int ABT_task_get_arg(ABT_task task, void **arg)
Retrieve an argument for a work-unit function of a work unit.
Definition: task.c:535
ABTI_pool
Definition: abti.h:389
ABT_task_create_on_xstream
int ABT_task_create_on_xstream(ABT_xstream xstream, void(*task_func)(void *), void *arg, ABT_task *newtask)
Create a new tasklet associated with an execution stream.
Definition: task.c:125
abti.h
ABTD_atomic_relaxed_store_uint32
static void ABTD_atomic_relaxed_store_uint32(ABTD_atomic_uint32 *ptr, uint32_t val)
Definition: abtd_atomic.h:1025
ABT_unit_id
uint64_t ABT_unit_id
Work unit ID type.
Definition: abt.h:921
ABT_xstream
struct ABT_xstream_opaque * ABT_xstream
Execution stream handle type.
Definition: abt.h:826
ABTI_thread::state
ABTD_atomic_int state
Definition: abti.h:432
ABT_task_is_unnamed
int ABT_task_is_unnamed(ABT_task task, ABT_bool *flag)
Check if a work unit is unnamed.
ABT_thread_get_last_pool
int ABT_thread_get_last_pool(ABT_thread thread, ABT_pool *pool) ABT_API_PUBLIC
Get the last pool of a work unit.
Definition: thread.c:1111
ABTI_HANDLE_ERROR
#define ABTI_HANDLE_ERROR(n)
Definition: abti_error.h:130
ABT_TASK_STATE_RUNNING
@ ABT_TASK_STATE_RUNNING
Definition: abt.h:448
ABTI_THREAD_TYPE_MIGRATABLE
#define ABTI_THREAD_TYPE_MIGRATABLE
Definition: abti.h:89
ABTD_atomic_relaxed_store_int
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:996
ABT_thread_join
int ABT_thread_join(ABT_thread thread) ABT_API_PUBLIC
Wait for a work unit to terminate.
Definition: thread.c:733
ABTI_THREAD_TYPE_THREAD
#define ABTI_THREAD_TYPE_THREAD
Definition: abti.h:83
ABT_task_revive
int ABT_task_revive(ABT_pool pool, void(*task_func)(void *), void *arg, ABT_task *task)
Revive a terminated work unit.
Definition: task.c:167
ABT_task_free
int ABT_task_free(ABT_task *task)
Free a work unit.
Definition: task.c:188
ABTI_initialized
ABT_bool ABTI_initialized(void)
Definition: global.c:187
ABTI_local_get_local
static ABTI_local * ABTI_local_get_local(void)
Definition: abti_local.h:41
ABT_TASK_STATE_READY
@ ABT_TASK_STATE_READY
Definition: abt.h:446
ABT_ERR_INV_TASK
#define ABT_ERR_INV_TASK
Error code: invalid work unit.
Definition: abt.h:200
ABTI_xstream_get_main_pool
static ABTI_pool * ABTI_xstream_get_main_pool(ABTI_xstream *p_xstream)
Definition: abti_stream.h:42
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
ABT_TASK_NULL
#define ABT_TASK_NULL
Definition: abt.h:1107
ABTI_event_thread_create
#define ABTI_event_thread_create(p_local, p_thread, p_caller, p_pool)
Definition: abti_event.h:162
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
ABT_thread_get_last_xstream
int ABT_thread_get_last_xstream(ABT_thread thread, ABT_xstream *xstream) ABT_API_PUBLIC
Get an execution stream associated with a work unit.
Definition: thread.c:1024
ABTI_local_get_xstream_or_null
static ABTI_xstream * ABTI_local_get_xstream_or_null(ABTI_local *p_local)
Definition: abti_local.h:77
ABTI_thread_type
uint32_t ABTI_thread_type
Definition: abti.h:152
ABTI_pool_get_ptr
static ABTI_pool * ABTI_pool_get_ptr(ABT_pool pool)
Definition: abti_pool.h:11
ABTI_sched
Definition: abti.h:319
ABT_task_get_id
int ABT_task_get_id(ABT_task task, ABT_unit_id *task_id)
Get ID of a work unit.
Definition: task.c:516
ABT_thread_is_migratable
int ABT_thread_is_migratable(ABT_thread thread, ABT_bool *is_migratable) ABT_API_PUBLIC
Get the migratability of a work unit.
Definition: thread.c:1993
ABT_task_get_last_pool
int ABT_task_get_last_pool(ABT_task task, ABT_pool *pool)
Get the last pool of a work unit.
Definition: task.c:417
ABTI_UB_ASSERT
#define ABTI_UB_ASSERT(cond)
Definition: abti_error.h:19
ABTI_CHECK_NULL_POOL_PTR
#define ABTI_CHECK_NULL_POOL_PTR(p)
Definition: abti_error.h:184
ABTI_CHECK_NULL_XSTREAM_PTR
#define ABTI_CHECK_NULL_XSTREAM_PTR(p)
Definition: abti_error.h:175
ABTI_thread::p_parent
ABTI_thread * p_parent
Definition: abti.h:429
ABTI_THREAD_TYPE_NAMED
#define ABTI_THREAD_TYPE_NAMED
Definition: abti.h:88
ABT_task_create
int ABT_task_create(ABT_pool pool, void(*task_func)(void *), void *arg, ABT_task *newtask)
Create a new tasklet.
Definition: task.c:57
ABTI_xstream_get_ptr
static ABTI_xstream * ABTI_xstream_get_ptr(ABT_xstream xstream)
Definition: abti_stream.h:11
ABTI_thread::p_last_xstream
ABTI_xstream * p_last_xstream
Definition: abti.h:428
ABTI_pool_push
static void ABTI_pool_push(ABTI_pool *p_pool, ABT_unit unit, ABT_pool_context context)
Definition: abti_pool.h:53
ABT_task_equal
int ABT_task_equal(ABT_task task1, ABT_task task2, ABT_bool *result)
Compare two work-unit handles for equality.
Definition: task.c:500
ABTI_thread::p_keytable
ABTD_atomic_ptr p_keytable
Definition: abti.h:435
ABTI_local
struct ABTI_local ABTI_local
Definition: abti.h:132
ABT_task_get_state
int ABT_task_get_state(ABT_task task, ABT_task_state *state)
Get a state of a tasklet.
Definition: task.c:387
ABT_task_set_migratable
int ABT_task_set_migratable(ABT_task task, ABT_bool flag)
Set the migratability in a work unit.
Definition: task.c:455
ABT_task_get_specific
int ABT_task_get_specific(ABT_task task, ABT_key key, void **value)
Get a value associated with a work-unit-specific key in a work unit.
ABT_task_get_xstream
int ABT_task_get_xstream(ABT_task task, ABT_xstream *xstream)
Get an execution stream associated with a work unit.
Definition: task.c:352
ABT_thread_get_last_pool_id
int ABT_thread_get_last_pool_id(ABT_thread thread, int *id) ABT_API_PUBLIC
Get the last pool's ID of a work unit.
Definition: thread.c:1154
ABTI_CHECK_TRUE
#define ABTI_CHECK_TRUE(cond, abt_errno)
Definition: abti_error.h:146
ABT_thread_revive
int ABT_thread_revive(ABT_pool pool, void(*thread_func)(void *), void *arg, ABT_thread *thread) ABT_API_PUBLIC
Revive a terminated work unit.
Definition: thread.c:458
ABTI_global
Definition: abti.h:223
ABT_thread_equal
int ABT_thread_equal(ABT_thread thread1, ABT_thread thread2, ABT_bool *result) ABT_API_PUBLIC
Compare two work unit handles for equality.
Definition: thread.c:2129
ABTI_thread::f_thread
void(* f_thread)(void *)
Definition: abti.h:430
ABTD_atomic_relaxed_store_ptr
static void ABTD_atomic_relaxed_store_ptr(ABTD_atomic_ptr *ptr, void *val)
Definition: abtd_atomic.h:1055
ABTI_local_get_xstream
static ABTI_xstream * ABTI_local_get_xstream(ABTI_local *p_local)
Definition: abti_local.h:86
ABT_task_is_migratable
int ABT_task_is_migratable(ABT_task task, ABT_bool *flag)
Get the migratability of a work unit.
Definition: task.c:474
ABTI_thread_init_pool
static ABTU_ret_err int ABTI_thread_init_pool(ABTI_global *p_global, ABTI_thread *p_thread, ABTI_pool *p_pool)
Definition: abti_unit.h:128
ABT_thread_cancel
int ABT_thread_cancel(ABT_thread thread) ABT_API_PUBLIC
Send a cancellation request to a work unit.
Definition: thread.c:873
ABTI_thread::id
ABT_unit_id id
Definition: abti.h:436
ABT_task_join
int ABT_task_join(ABT_task task)
Wait for a work unit to terminate.
Definition: task.c:209
ABTI_TASK_INIT_ID
#define ABTI_TASK_INIT_ID
Definition: abti.h:53