ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
task.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 ABTU_ret_err static int task_create(ABTI_global *p_global, ABTI_local *p_local,
9  ABTI_pool *p_pool,
10  void (*task_func)(void *), void *arg,
11  ABTI_sched *p_sched, int refcount,
12  ABTI_thread **pp_newtask);
13 
56 int ABT_task_create(ABT_pool pool, void (*task_func)(void *), void *arg,
57  ABT_task *newtask)
58 {
59  ABTI_UB_ASSERT(ABTI_initialized());
60  ABTI_UB_ASSERT(task_func);
61 
62 #ifndef ABT_CONFIG_ENABLE_VER_20_API
63  /* Argobots 1.x sets newtask to NULL on error. */
64  if (newtask)
65  *newtask = ABT_TASK_NULL;
66 #endif
67  ABTI_global *p_global = ABTI_global_get_global();
68  ABTI_local *p_local = ABTI_local_get_local();
69  ABTI_thread *p_newtask;
70  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
71  ABTI_CHECK_NULL_POOL_PTR(p_pool);
72 
73  int refcount = (newtask != NULL) ? 1 : 0;
74  int abt_errno = task_create(p_global, p_local, p_pool, task_func, arg, NULL,
75  refcount, &p_newtask);
76  ABTI_CHECK_ERROR(abt_errno);
77 
78  /* Return value */
79  if (newtask)
80  *newtask = ABTI_thread_get_handle(p_newtask);
81  return ABT_SUCCESS;
82 }
83 
123 int ABT_task_create_on_xstream(ABT_xstream xstream, void (*task_func)(void *),
124  void *arg, ABT_task *newtask)
125 {
126  ABTI_UB_ASSERT(ABTI_initialized());
127  ABTI_UB_ASSERT(task_func);
128 
129 #ifndef ABT_CONFIG_ENABLE_VER_20_API
130  /* Argobots 1.x sets newtask to NULL on error. */
131  if (newtask)
132  *newtask = ABT_TASK_NULL;
133 #endif
134  ABTI_global *p_global = ABTI_global_get_global();
135  ABTI_local *p_local = ABTI_local_get_local();
136  ABTI_thread *p_newtask;
137 
138  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
139  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
140 
141  /* TODO: need to consider the access type of target pool */
142  ABTI_pool *p_pool = ABTI_xstream_get_main_pool(p_xstream);
143  int refcount = (newtask != NULL) ? 1 : 0;
144  int abt_errno = task_create(p_global, p_local, p_pool, task_func, arg, NULL,
145  refcount, &p_newtask);
146  ABTI_CHECK_ERROR(abt_errno);
147 
148  /* Return value */
149  if (newtask)
150  *newtask = ABTI_thread_get_handle(p_newtask);
151  return ABT_SUCCESS;
152 }
153 
165 int ABT_task_revive(ABT_pool pool, void (*task_func)(void *), void *arg,
166  ABT_task *task)
167 {
168  int abt_errno = ABT_thread_revive(pool, task_func, arg, task);
169  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
170  return abt_errno;
171 }
172 
186 int ABT_task_free(ABT_task *task)
187 {
188  int abt_errno = ABT_thread_free(task);
189  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
190  if (!(ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS)) {
191  *task = ABT_TASK_NULL;
192  }
193  return abt_errno;
194 }
195 
207 int ABT_task_join(ABT_task task)
208 {
209  int abt_errno = ABT_thread_join(task);
210  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
211  return abt_errno;
212 }
213 
225 int ABT_task_cancel(ABT_task task)
226 {
227  int abt_errno = ABT_thread_cancel(task);
228  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
229  return abt_errno;
230 }
231 
267 int ABT_task_self(ABT_task *task)
268 {
269  ABTI_UB_ASSERT(task);
270 
271  ABTI_xstream *p_local_xstream;
272 #ifndef ABT_CONFIG_ENABLE_VER_20_API
273  *task = ABT_TASK_NULL;
274  ABTI_SETUP_GLOBAL(NULL);
275  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
276  ABTI_CHECK_TRUE(!(p_local_xstream->p_thread->type &
277  ABTI_THREAD_TYPE_YIELDABLE),
279 #else
280  ABTI_UB_ASSERT(ABTI_initialized());
281  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
282 #endif
283  *task = ABTI_thread_get_handle(p_local_xstream->p_thread);
284  return ABT_SUCCESS;
285 }
286 
320 {
321  ABTI_UB_ASSERT(id);
322 
323  ABTI_xstream *p_local_xstream;
324 #ifndef ABT_CONFIG_ENABLE_VER_20_API
325  ABTI_SETUP_GLOBAL(NULL);
326  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
327  ABTI_CHECK_TRUE(!(p_local_xstream->p_thread->type &
328  ABTI_THREAD_TYPE_YIELDABLE),
330 #else
331  ABTI_UB_ASSERT(ABTI_initialized());
332  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
333 #endif
334  *id = ABTI_thread_get_id(p_local_xstream->p_thread);
335  return ABT_SUCCESS;
336 }
337 
350 int ABT_task_get_xstream(ABT_task task, ABT_xstream *xstream)
351 {
352  int abt_errno = ABT_thread_get_last_xstream(task, xstream);
353  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
354  return abt_errno;
355 }
356 
386 {
387  ABTI_UB_ASSERT(ABTI_initialized());
388  ABTI_UB_ASSERT(state);
389 
390  ABT_thread_state thread_state;
391  int abt_errno = ABT_thread_get_state(task, &thread_state);
392  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
393  if (thread_state == ABT_THREAD_STATE_READY) {
394  *state = ABT_TASK_STATE_READY;
395  } else if (thread_state == ABT_THREAD_STATE_TERMINATED) {
396  *state = ABT_TASK_STATE_TERMINATED;
397  } else {
398  *state = ABT_TASK_STATE_RUNNING;
399  }
400  return abt_errno;
401 }
402 
416 {
417  int abt_errno = ABT_thread_get_last_pool(task, pool);
418  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
419  return abt_errno;
420 }
421 
434 int ABT_task_get_last_pool_id(ABT_task task, int *id)
435 {
436  int abt_errno = ABT_thread_get_last_pool_id(task, id);
437  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
438  return abt_errno;
439 }
440 
454 {
455  int abt_errno = ABT_thread_set_migratable(task, flag);
456  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
457  return abt_errno;
458 }
459 
473 {
474  int abt_errno = ABT_thread_is_migratable(task, flag);
475  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
476  return abt_errno;
477 }
478 
479 #ifdef ABT_CONFIG_USE_DOXYGEN
480 
489 int ABT_task_is_unnamed(ABT_task task, ABT_bool *flag);
490 #endif
491 
498 int ABT_task_equal(ABT_task task1, ABT_task task2, ABT_bool *result)
499 {
500  return ABT_thread_equal(task1, task2, result);
501 }
502 
514 int ABT_task_get_id(ABT_task task, ABT_unit_id *task_id)
515 {
516  int abt_errno = ABT_thread_get_id(task, task_id);
517  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
518  return abt_errno;
519 }
520 
533 int ABT_task_get_arg(ABT_task task, void **arg)
534 {
535  int abt_errno = ABT_thread_get_arg(task, arg);
536  ABTI_CHECK_TRUE(abt_errno != ABT_ERR_INV_THREAD, ABT_ERR_INV_TASK);
537  return abt_errno;
538 }
539 
540 #ifdef ABT_CONFIG_USE_DOXYGEN
541 
551 int ABT_task_set_specific(ABT_task task, ABT_key key, void *value);
552 #endif
553 
554 #ifdef ABT_CONFIG_USE_DOXYGEN
555 
565 int ABT_task_get_specific(ABT_task task, ABT_key key, void **value);
566 #endif
567 
568 /*****************************************************************************/
569 /* Internal static functions */
570 /*****************************************************************************/
571 
572 ABTU_ret_err static int task_create(ABTI_global *p_global, ABTI_local *p_local,
573  ABTI_pool *p_pool,
574  void (*task_func)(void *), void *arg,
575  ABTI_sched *p_sched, int refcount,
576  ABTI_thread **pp_newtask)
577 {
578  ABTI_thread *p_newtask;
579 
580  /* Allocate a task object */
581  int abt_errno = ABTI_mem_alloc_nythread(p_local, &p_newtask);
582  ABTI_CHECK_ERROR(abt_errno);
583  abt_errno = ABTI_thread_init_pool(p_global, p_newtask, p_pool);
584  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
585  ABTI_mem_free_thread(p_global, p_local, p_newtask);
586  ABTI_HANDLE_ERROR(abt_errno);
587  }
588 
589  p_newtask->p_last_xstream = NULL;
590  p_newtask->p_parent = NULL;
591  ABTD_atomic_relaxed_store_int(&p_newtask->state, ABT_THREAD_STATE_READY);
592  ABTD_atomic_relaxed_store_uint32(&p_newtask->request, 0);
593  p_newtask->f_thread = task_func;
594  p_newtask->p_arg = arg;
595  ABTD_atomic_relaxed_store_ptr(&p_newtask->p_keytable, NULL);
596  p_newtask->id = ABTI_TASK_INIT_ID;
597 
598  /* Create a wrapper work unit */
599  ABTI_thread_type thread_type =
600  refcount ? (ABTI_THREAD_TYPE_THREAD | ABTI_THREAD_TYPE_NAMED)
601  : ABTI_THREAD_TYPE_THREAD;
602 #ifndef ABT_CONFIG_DISABLE_MIGRATION
603  thread_type |= ABTI_THREAD_TYPE_MIGRATABLE;
604 #endif
605  p_newtask->type |= thread_type;
606 
607  ABTI_event_thread_create(p_local, p_newtask,
608  ABTI_local_get_xstream_or_null(p_local)
609  ? ABTI_local_get_xstream(p_local)->p_thread
610  : NULL,
611  p_pool);
612 
613  /* Add this task to the scheduler's pool */
614  ABTI_pool_push(p_pool, p_newtask->unit, ABT_POOL_CONTEXT_OP_THREAD_CREATE);
615 
616  /* Return value */
617  *pp_newtask = p_newtask;
618 
619  return ABT_SUCCESS;
620 }
ABT_task_state
ABT_task_state
State of a tasklet.
Definition: abt.h:444
ABT_THREAD_STATE_TERMINATED
@ ABT_THREAD_STATE_TERMINATED
Definition: abt.h:433
ABT_key
struct ABT_key_opaque * ABT_key
Work-unit-specific data key handle type.
Definition: abt.h:980
ABT_ERR_INV_THREAD
#define ABT_ERR_INV_THREAD
Error code: invalid work unit.
Definition: abt.h:186
ABT_thread_get_arg
int ABT_thread_get_arg(ABT_thread thread, void **arg) ABT_API_PUBLIC
Retrieve an argument for a work-unit function of a work unit.
Definition: thread.c:2284
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
ABT_task_set_specific
int ABT_task_set_specific(ABT_task task, ABT_key key, void *value)
Set a value with a work-unit-specific key in a work unit.
ABT_task_cancel
int ABT_task_cancel(ABT_task task)
Send a termination request to a work unit.
Definition: task.c:227
ABT_POOL_CONTEXT_OP_THREAD_CREATE
#define ABT_POOL_CONTEXT_OP_THREAD_CREATE
A flag that hints a push operation in a thread creation routine without a yield operation.
Definition: abt.h:1673
ABT_THREAD_STATE_READY
@ ABT_THREAD_STATE_READY
Definition: abt.h:427
ABT_task_self_id
int ABT_task_self_id(ABT_unit_id *id)
Get ID of the calling work unit.
Definition: task.c:321
ABT_task_self
int ABT_task_self(ABT_task *task)
Get the calling work unit.
Definition: task.c:269
ABT_task
struct ABT_thread_opaque * ABT_task
Work unit handle type.
Definition: abt.h:973
ABT_task_get_last_pool_id
int ABT_task_get_last_pool_id(ABT_task task, int *id)
Get the last pool's ID of a work unit.
Definition: task.c:436
ABT_TASK_STATE_TERMINATED
@ ABT_TASK_STATE_TERMINATED
Definition: abt.h:450
ABT_thread_set_migratable
int ABT_thread_set_migratable(ABT_thread thread, ABT_bool migratable) ABT_API_PUBLIC
Set the migratability in a work unit.
Definition: thread.c:1933
ABT_thread_get_id
int ABT_thread_get_id(ABT_thread thread, ABT_unit_id *thread_id) ABT_API_PUBLIC
Get ID of a work unit.
Definition: thread.c:2209
ABT_thread_get_state
int ABT_thread_get_state(ABT_thread thread, ABT_thread_state *state) ABT_API_PUBLIC
Get a state of a work unit.
Definition: thread.c:1068
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:878
task_create
static ABTU_ret_err int task_create(ABTI_global *p_global, ABTI_local *p_local, ABTI_pool *p_pool, void(*task_func)(void *), void *arg, ABTI_sched *p_sched, int refcount, ABTI_thread **pp_newtask)
Definition: task.c:574
ABT_thread_state
ABT_thread_state
State of a work unit.
Definition: abt.h:425
ABT_thread_free
int ABT_thread_free(ABT_thread *thread) ABT_API_PUBLIC
Free a work unit.
Definition: thread.c:612
ABT_task_get_arg
int ABT_task_get_arg(ABT_task task, void **arg)
Retrieve an argument for a work-unit function of a work unit.
Definition: task.c:535
ABT_task_create_on_xstream
int ABT_task_create_on_xstream(ABT_xstream xstream, void(*task_func)(void *), void *arg, ABT_task *newtask)
Create a new tasklet associated with an execution stream.
Definition: task.c:125
abti.h
ABT_unit_id
uint64_t ABT_unit_id
Work unit ID type.
Definition: abt.h:921
ABT_xstream
struct ABT_xstream_opaque * ABT_xstream
Execution stream handle type.
Definition: abt.h:826
ABT_task_is_unnamed
int ABT_task_is_unnamed(ABT_task task, ABT_bool *flag)
Check if a work unit is unnamed.
ABT_thread_get_last_pool
int ABT_thread_get_last_pool(ABT_thread thread, ABT_pool *pool) ABT_API_PUBLIC
Get the last pool of a work unit.
Definition: thread.c:1111
ABT_TASK_STATE_RUNNING
@ ABT_TASK_STATE_RUNNING
Definition: abt.h:448
ABT_thread_join
int ABT_thread_join(ABT_thread thread) ABT_API_PUBLIC
Wait for a work unit to terminate.
Definition: thread.c:733
ABT_task_revive
int ABT_task_revive(ABT_pool pool, void(*task_func)(void *), void *arg, ABT_task *task)
Revive a terminated work unit.
Definition: task.c:167
ABT_task_free
int ABT_task_free(ABT_task *task)
Free a work unit.
Definition: task.c:188
ABT_TASK_STATE_READY
@ ABT_TASK_STATE_READY
Definition: abt.h:446
ABT_ERR_INV_TASK
#define ABT_ERR_INV_TASK
Error code: invalid work unit.
Definition: abt.h:200
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
ABT_TASK_NULL
#define ABT_TASK_NULL
Definition: abt.h:1107
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
ABT_thread_get_last_xstream
int ABT_thread_get_last_xstream(ABT_thread thread, ABT_xstream *xstream) ABT_API_PUBLIC
Get an execution stream associated with a work unit.
Definition: thread.c:1024
ABT_task_get_id
int ABT_task_get_id(ABT_task task, ABT_unit_id *task_id)
Get ID of a work unit.
Definition: task.c:516
ABT_thread_is_migratable
int ABT_thread_is_migratable(ABT_thread thread, ABT_bool *is_migratable) ABT_API_PUBLIC
Get the migratability of a work unit.
Definition: thread.c:1993
ABT_task_get_last_pool
int ABT_task_get_last_pool(ABT_task task, ABT_pool *pool)
Get the last pool of a work unit.
Definition: task.c:417
ABT_task_create
int ABT_task_create(ABT_pool pool, void(*task_func)(void *), void *arg, ABT_task *newtask)
Create a new tasklet.
Definition: task.c:57
ABT_task_equal
int ABT_task_equal(ABT_task task1, ABT_task task2, ABT_bool *result)
Compare two work-unit handles for equality.
Definition: task.c:500
ABT_task_get_state
int ABT_task_get_state(ABT_task task, ABT_task_state *state)
Get a state of a tasklet.
Definition: task.c:387
ABT_task_set_migratable
int ABT_task_set_migratable(ABT_task task, ABT_bool flag)
Set the migratability in a work unit.
Definition: task.c:455
ABT_task_get_specific
int ABT_task_get_specific(ABT_task task, ABT_key key, void **value)
Get a value associated with a work-unit-specific key in a work unit.
ABT_task_get_xstream
int ABT_task_get_xstream(ABT_task task, ABT_xstream *xstream)
Get an execution stream associated with a work unit.
Definition: task.c:352
ABT_thread_get_last_pool_id
int ABT_thread_get_last_pool_id(ABT_thread thread, int *id) ABT_API_PUBLIC
Get the last pool's ID of a work unit.
Definition: thread.c:1154
ABT_thread_revive
int ABT_thread_revive(ABT_pool pool, void(*thread_func)(void *), void *arg, ABT_thread *thread) ABT_API_PUBLIC
Revive a terminated work unit.
Definition: thread.c:458
ABT_thread_equal
int ABT_thread_equal(ABT_thread thread1, ABT_thread thread2, ABT_bool *result) ABT_API_PUBLIC
Compare two work unit handles for equality.
Definition: thread.c:2129
ABT_task_is_migratable
int ABT_task_is_migratable(ABT_task task, ABT_bool *flag)
Get the migratability of a work unit.
Definition: task.c:474
ABT_thread_cancel
int ABT_thread_cancel(ABT_thread thread) ABT_API_PUBLIC
Send a cancellation request to a work unit.
Definition: thread.c:873
ABT_task_join
int ABT_task_join(ABT_task task)
Wait for a work unit to terminate.
Definition: task.c:209