ARGOBOTS  1.1
task.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 ABTU_ret_err static int task_create(ABTI_global *p_global, ABTI_local *p_local,
9  ABTI_pool *p_pool,
10  void (*task_func)(void *), void *arg,
11  ABTI_sched *p_sched, int refcount,
12  ABTI_thread **pp_newtask);
13 
56 int ABT_task_create(ABT_pool pool, void (*task_func)(void *), void *arg,
57  ABT_task *newtask)
58 {
59 #ifndef ABT_CONFIG_ENABLE_VER_20_API
60  /* Argobots 1.x sets newtask to NULL on error. */
61  if (newtask)
62  *newtask = ABT_TASK_NULL;
63 #endif
64  ABTI_global *p_global = ABTI_global_get_global();
65  ABTI_local *p_local = ABTI_local_get_local();
66  ABTI_thread *p_newtask;
67  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
69 
70  int refcount = (newtask != NULL) ? 1 : 0;
71  int abt_errno = task_create(p_global, p_local, p_pool, task_func, arg, NULL,
72  refcount, &p_newtask);
73  ABTI_CHECK_ERROR(abt_errno);
74 
75  /* Return value */
76  if (newtask)
77  *newtask = ABTI_thread_get_handle(p_newtask);
78  return ABT_SUCCESS;
79 }
80 
120 int ABT_task_create_on_xstream(ABT_xstream xstream, void (*task_func)(void *),
121  void *arg, ABT_task *newtask)
122 {
123 #ifndef ABT_CONFIG_ENABLE_VER_20_API
124  /* Argobots 1.x sets newtask to NULL on error. */
125  if (newtask)
126  *newtask = ABT_TASK_NULL;
127 #endif
128  ABTI_global *p_global = ABTI_global_get_global();
129  ABTI_local *p_local = ABTI_local_get_local();
130  ABTI_thread *p_newtask;
131 
132  ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
133  ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
134 
135  /* TODO: need to consider the access type of target pool */
136  ABTI_pool *p_pool = ABTI_xstream_get_main_pool(p_xstream);
137  int refcount = (newtask != NULL) ? 1 : 0;
138  int abt_errno = task_create(p_global, p_local, p_pool, task_func, arg, NULL,
139  refcount, &p_newtask);
140  ABTI_CHECK_ERROR(abt_errno);
141 
142  /* Return value */
143  if (newtask)
144  *newtask = ABTI_thread_get_handle(p_newtask);
145  return ABT_SUCCESS;
146 }
147 
159 int ABT_task_revive(ABT_pool pool, void (*task_func)(void *), void *arg,
160  ABT_task *task)
161 {
162  int abt_errno = ABT_thread_revive(pool, task_func, arg, task);
164  return abt_errno;
165 }
166 
180 int ABT_task_free(ABT_task *task)
181 {
182  int abt_errno = ABT_thread_free(task);
184  if (!(ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS)) {
185  *task = ABT_TASK_NULL;
186  }
187  return abt_errno;
188 }
189 
201 int ABT_task_join(ABT_task task)
202 {
203  int abt_errno = ABT_thread_join(task);
205  return abt_errno;
206 }
207 
219 int ABT_task_cancel(ABT_task task)
220 {
221  int abt_errno = ABT_thread_cancel(task);
223  return abt_errno;
224 }
225 
261 int ABT_task_self(ABT_task *task)
262 {
263  ABTI_xstream *p_local_xstream;
264 #ifndef ABT_CONFIG_ENABLE_VER_20_API
265  *task = ABT_TASK_NULL;
266  ABTI_SETUP_GLOBAL(NULL);
267  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
268  ABTI_CHECK_TRUE(!(p_local_xstream->p_thread->type &
271 #else
272  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
273 #endif
274  *task = ABTI_thread_get_handle(p_local_xstream->p_thread);
275  return ABT_SUCCESS;
276 }
277 
311 {
312  ABTI_xstream *p_local_xstream;
313 #ifndef ABT_CONFIG_ENABLE_VER_20_API
314  ABTI_SETUP_GLOBAL(NULL);
315  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
316  ABTI_CHECK_TRUE(!(p_local_xstream->p_thread->type &
319 #else
320  ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
321 #endif
322  *id = ABTI_thread_get_id(p_local_xstream->p_thread);
323  return ABT_SUCCESS;
324 }
325 
338 int ABT_task_get_xstream(ABT_task task, ABT_xstream *xstream)
339 {
340  int abt_errno = ABT_thread_get_last_xstream(task, xstream);
342  return abt_errno;
343 }
344 
374 {
375  ABT_thread_state thread_state;
376  int abt_errno = ABT_thread_get_state(task, &thread_state);
378  if (thread_state == ABT_THREAD_STATE_READY) {
379  *state = ABT_TASK_STATE_READY;
380  } else if (thread_state == ABT_THREAD_STATE_TERMINATED) {
381  *state = ABT_TASK_STATE_TERMINATED;
382  } else {
383  *state = ABT_TASK_STATE_RUNNING;
384  }
385  return abt_errno;
386 }
387 
401 {
402  int abt_errno = ABT_thread_get_last_pool(task, pool);
404  return abt_errno;
405 }
406 
419 int ABT_task_get_last_pool_id(ABT_task task, int *id)
420 {
421  int abt_errno = ABT_thread_get_last_pool_id(task, id);
423  return abt_errno;
424 }
425 
439 {
440  int abt_errno = ABT_thread_set_migratable(task, flag);
442  return abt_errno;
443 }
444 
458 {
459  int abt_errno = ABT_thread_is_migratable(task, flag);
461  return abt_errno;
462 }
463 
#ifdef ABT_CONFIG_USE_DOXYGEN
/* Check if a work unit is unnamed.  This prototype is compiled only when
 * ABT_CONFIG_USE_DOXYGEN is defined; no definition is visible in this
 * listing. */
int ABT_task_is_unnamed(ABT_task task, ABT_bool *flag);
#endif /* ABT_CONFIG_USE_DOXYGEN */
476 
483 int ABT_task_equal(ABT_task task1, ABT_task task2, ABT_bool *result)
484 {
485  return ABT_thread_equal(task1, task2, result);
486 }
487 
499 int ABT_task_get_id(ABT_task task, ABT_unit_id *task_id)
500 {
501  int abt_errno = ABT_thread_get_id(task, task_id);
503  return abt_errno;
504 }
505 
518 int ABT_task_get_arg(ABT_task task, void **arg)
519 {
520  int abt_errno = ABT_thread_get_arg(task, arg);
522  return abt_errno;
523 }
524 
#ifdef ABT_CONFIG_USE_DOXYGEN
/* These prototypes are compiled only when ABT_CONFIG_USE_DOXYGEN is defined;
 * no definitions are visible in this listing. */

/* Set a value with a work-unit-specific key in a work unit. */
int ABT_task_set_specific(ABT_task task, ABT_key key, void *value);

/* Get a value associated with a work-unit-specific key in a work unit. */
int ABT_task_get_specific(ABT_task task, ABT_key key, void **value);
#endif /* ABT_CONFIG_USE_DOXYGEN */
552 
553 /*****************************************************************************/
554 /* Internal static functions */
555 /*****************************************************************************/
556 
557 ABTU_ret_err static int task_create(ABTI_global *p_global, ABTI_local *p_local,
558  ABTI_pool *p_pool,
559  void (*task_func)(void *), void *arg,
560  ABTI_sched *p_sched, int refcount,
561  ABTI_thread **pp_newtask)
562 {
563  ABTI_thread *p_newtask;
564 
565  /* Allocate a task object */
566  int abt_errno = ABTI_mem_alloc_nythread(p_local, &p_newtask);
567  ABTI_CHECK_ERROR(abt_errno);
568  abt_errno = ABTI_thread_init_pool(p_global, p_newtask, p_pool);
569  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
570  ABTI_mem_free_nythread(p_global, p_local, p_newtask);
571  ABTI_HANDLE_ERROR(abt_errno);
572  }
573 
574  p_newtask->p_last_xstream = NULL;
575  p_newtask->p_parent = NULL;
578  p_newtask->f_thread = task_func;
579  p_newtask->p_arg = arg;
580  ABTD_atomic_relaxed_store_ptr(&p_newtask->p_keytable, NULL);
581  p_newtask->id = ABTI_TASK_INIT_ID;
582 
583  /* Create a wrapper work unit */
584  ABTI_thread_type thread_type =
587 #ifndef ABT_CONFIG_DISABLE_MIGRATION
588  thread_type |= ABTI_THREAD_TYPE_MIGRATABLE;
589 #endif
590  p_newtask->type |= thread_type;
591 
592  ABTI_tool_event_thread_create(p_local, p_newtask,
594  ? ABTI_local_get_xstream(p_local)
595  ->p_thread
596  : NULL,
597  p_pool);
598  LOG_DEBUG("[T%" PRIu64 "] created\n", ABTI_thread_get_id(p_newtask));
599 
600  /* Add this task to the scheduler's pool */
601  ABTI_pool_push(p_pool, p_newtask->unit);
602 
603  /* Return value */
604  *pp_newtask = p_newtask;
605 
606  return ABT_SUCCESS;
607 }
ABT_task_state
ABT_task_state
State of a tasklet.
Definition: abt.h:434
ABT_THREAD_STATE_TERMINATED
@ ABT_THREAD_STATE_TERMINATED
Definition: abt.h:423
ABT_key
struct ABT_key_opaque * ABT_key
Work-unit-specific data key handle type.
Definition: abt.h:938
ABT_ERR_INV_THREAD
#define ABT_ERR_INV_THREAD
Error code: invalid work unit.
Definition: abt.h:176
ABT_thread_get_arg
int ABT_thread_get_arg(ABT_thread thread, void **arg) ABT_API_PUBLIC
Retrieve an argument for a work-unit function of a work unit.
Definition: thread.c:2053
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1001
ABT_task_set_specific
int ABT_task_set_specific(ABT_task task, ABT_key key, void *value)
Set a value with a work-unit-specific key in a work unit.
ABT_task_cancel
int ABT_task_cancel(ABT_task task)
Send a termination request to a work unit.
Definition: task.c:221
ABTI_SETUP_LOCAL_XSTREAM
#define ABTI_SETUP_LOCAL_XSTREAM(pp_local_xstream)
Definition: abti_error.h:73
ABT_THREAD_STATE_READY
@ ABT_THREAD_STATE_READY
Definition: abt.h:417
ABT_task_self_id
int ABT_task_self_id(ABT_unit_id *id)
Get ID of the calling work unit.
Definition: task.c:312
ABT_task_self
int ABT_task_self(ABT_task *task)
Get the calling work unit.
Definition: task.c:263
ABT_task
struct ABT_thread_opaque * ABT_task
Work unit handle type.
Definition: abt.h:931
ABT_task_get_last_pool_id
int ABT_task_get_last_pool_id(ABT_task task, int *id)
Get the last pool's ID of a work unit.
Definition: task.c:421
ABTI_SETUP_GLOBAL
#define ABTI_SETUP_GLOBAL(pp_global)
Definition: abti_error.h:59
ABTI_thread::type
ABTI_thread_type type
Definition: abti.h:375
ABT_TASK_STATE_TERMINATED
@ ABT_TASK_STATE_TERMINATED
Definition: abt.h:440
ABTI_global_get_global
static ABTI_global * ABTI_global_get_global(void)
Definition: abti_global.h:9
ABTI_tool_event_thread_create
#define ABTI_tool_event_thread_create(p_local, p_thread, p_caller, p_pool)
Definition: abti_tool.h:258
ABT_thread_set_migratable
int ABT_thread_set_migratable(ABT_thread thread, ABT_bool migratable) ABT_API_PUBLIC
Set the migratability in a work unit.
Definition: thread.c:1724
ABTI_CHECK_ERROR
#define ABTI_CHECK_ERROR(abt_errno)
Definition: abti_error.h:120
ABTI_mem_alloc_nythread
static ABTU_ret_err int ABTI_mem_alloc_nythread(ABTI_local *p_local, ABTI_thread **pp_thread)
Definition: abti_mem.h:98
ABT_thread_get_id
int ABT_thread_get_id(ABT_thread thread, ABT_unit_id *thread_id) ABT_API_PUBLIC
Get ID of a work unit.
Definition: thread.c:1983
ABTI_thread::p_arg
void * p_arg
Definition: abti.h:380
ABTI_thread::unit
ABT_unit unit
Definition: abti.h:376
ABTI_xstream::type
ABTI_xstream_type type
Definition: abti.h:270
ABTI_THREAD_TYPE_YIELDABLE
#define ABTI_THREAD_TYPE_YIELDABLE
Definition: abti.h:86
ABTI_thread::request
ABTD_atomic_uint32 request
Definition: abti.h:382
ABTI_thread_get_id
ABT_unit_id ABTI_thread_get_id(ABTI_thread *p_thread)
Definition: thread.c:2561
ABTI_pool_push
static void ABTI_pool_push(ABTI_pool *p_pool, ABT_unit unit)
Definition: abti_pool.h:53
ABTI_thread_get_handle
static ABT_thread ABTI_thread_get_handle(ABTI_thread *p_thread)
Definition: abti_thread.h:24
ABTI_thread
Definition: abti.h:371
ABTI_IS_ERROR_CHECK_ENABLED
#define ABTI_IS_ERROR_CHECK_ENABLED
Definition: abti.h:20
ABT_thread_get_state
int ABT_thread_get_state(ABT_thread thread, ABT_thread_state *state) ABT_API_PUBLIC
Get a state of a work unit.
Definition: thread.c:858
ABTI_xstream
Definition: abti.h:264
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:841
task_create
static ABTU_ret_err int task_create(ABTI_global *p_global, ABTI_local *p_local, ABTI_pool *p_pool, void(*task_func)(void *), void *arg, ABTI_sched *p_sched, int refcount, ABTI_thread **pp_newtask)
Definition: task.c:559
ABT_thread_state
ABT_thread_state
State of a work unit.
Definition: abt.h:415
ABT_thread_free
int ABT_thread_free(ABT_thread *thread) ABT_API_PUBLIC
Free a work unit.
Definition: thread.c:420
ABTI_mem_free_nythread
static void ABTI_mem_free_nythread(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
Definition: abti_mem.h:108
ABT_task_get_arg
int ABT_task_get_arg(ABT_task task, void **arg)
Retrieve an argument for a work-unit function of a work unit.
Definition: task.c:520
ABTI_pool
Definition: abti.h:327
ABT_task_create_on_xstream
int ABT_task_create_on_xstream(ABT_xstream xstream, void(*task_func)(void *), void *arg, ABT_task *newtask)
Create a new tasklet associated with an execution stream.
Definition: task.c:122
abti.h
ABTD_atomic_relaxed_store_uint32
static void ABTD_atomic_relaxed_store_uint32(ABTD_atomic_uint32 *ptr, uint32_t val)
Definition: abtd_atomic.h:1025
ABT_unit_id
uint64_t ABT_unit_id
Work unit ID type.
Definition: abt.h:879
ABT_xstream
struct ABT_xstream_opaque * ABT_xstream
Execution stream handle type.
Definition: abt.h:789
ABTI_thread::state
ABTD_atomic_int state
Definition: abti.h:381
ABT_task_is_unnamed
int ABT_task_is_unnamed(ABT_task task, ABT_bool *flag)
Check if a work unit is unnamed.
ABT_thread_get_last_pool
int ABT_thread_get_last_pool(ABT_thread thread, ABT_pool *pool) ABT_API_PUBLIC
Get the last pool of a work unit.
Definition: thread.c:898
ABTI_HANDLE_ERROR
#define ABTI_HANDLE_ERROR(n)
Definition: abti_error.h:114
ABT_TASK_STATE_RUNNING
@ ABT_TASK_STATE_RUNNING
Definition: abt.h:438
LOG_DEBUG
#define LOG_DEBUG(fmt,...)
Definition: abti_log.h:26
ABTI_THREAD_TYPE_MIGRATABLE
#define ABTI_THREAD_TYPE_MIGRATABLE
Definition: abti.h:88
ABTD_atomic_relaxed_store_int
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:996
ABT_thread_join
int ABT_thread_join(ABT_thread thread) ABT_API_PUBLIC
Wait for a work unit to terminate.
Definition: thread.c:538
ABTI_THREAD_TYPE_THREAD
#define ABTI_THREAD_TYPE_THREAD
Definition: abti.h:82
ABT_task_revive
int ABT_task_revive(ABT_pool pool, void(*task_func)(void *), void *arg, ABT_task *task)
Revive a terminated work unit.
Definition: task.c:161
ABT_task_free
int ABT_task_free(ABT_task *task)
Free a work unit.
Definition: task.c:182
ABTI_local_get_local
static ABTI_local * ABTI_local_get_local(void)
Definition: abti_local.h:41
ABT_TASK_STATE_READY
@ ABT_TASK_STATE_READY
Definition: abt.h:436
ABT_ERR_INV_TASK
#define ABT_ERR_INV_TASK
Error code: invalid work unit.
Definition: abt.h:190
ABTI_xstream_get_main_pool
static ABTI_pool * ABTI_xstream_get_main_pool(ABTI_xstream *p_xstream)
Definition: abti_stream.h:42
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
ABT_TASK_NULL
#define ABT_TASK_NULL
Definition: abt.h:1064
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:146
ABT_thread_get_last_xstream
int ABT_thread_get_last_xstream(ABT_thread thread, ABT_xstream *xstream) ABT_API_PUBLIC
Get an execution stream associated with a work unit.
Definition: thread.c:817
ABTI_local_get_xstream_or_null
static ABTI_xstream * ABTI_local_get_xstream_or_null(ABTI_local *p_local)
Definition: abti_local.h:77
ABTI_thread_type
uint32_t ABTI_thread_type
Definition: abti.h:126
ABTI_pool_get_ptr
static ABTI_pool * ABTI_pool_get_ptr(ABT_pool pool)
Definition: abti_pool.h:11
ABTI_sched
Definition: abti.h:289
ABT_task_get_id
int ABT_task_get_id(ABT_task task, ABT_unit_id *task_id)
Get ID of a work unit.
Definition: task.c:501
ABT_thread_is_migratable
int ABT_thread_is_migratable(ABT_thread thread, ABT_bool *is_migratable) ABT_API_PUBLIC
Get the migratability of a work unit.
Definition: thread.c:1781
ABT_task_get_last_pool
int ABT_task_get_last_pool(ABT_task task, ABT_pool *pool)
Get the last pool of a work unit.
Definition: task.c:402
ABTI_CHECK_NULL_POOL_PTR
#define ABTI_CHECK_NULL_POOL_PTR(p)
Definition: abti_error.h:168
ABTI_CHECK_NULL_XSTREAM_PTR
#define ABTI_CHECK_NULL_XSTREAM_PTR(p)
Definition: abti_error.h:159
ABTI_thread::p_parent
ABTI_thread * p_parent
Definition: abti.h:378
ABTI_THREAD_TYPE_NAMED
#define ABTI_THREAD_TYPE_NAMED
Definition: abti.h:87
ABT_task_create
int ABT_task_create(ABT_pool pool, void(*task_func)(void *), void *arg, ABT_task *newtask)
Create a new tasklet.
Definition: task.c:57
ABTI_xstream_get_ptr
static ABTI_xstream * ABTI_xstream_get_ptr(ABT_xstream xstream)
Definition: abti_stream.h:11
ABTI_thread::p_last_xstream
ABTI_xstream * p_last_xstream
Definition: abti.h:377
ABT_task_equal
int ABT_task_equal(ABT_task task1, ABT_task task2, ABT_bool *result)
Compare two work-unit handles for equality.
Definition: task.c:485
ABTI_thread::p_keytable
ABTD_atomic_ptr p_keytable
Definition: abti.h:384
ABTI_local
struct ABTI_local ABTI_local
Definition: abti.h:110
ABT_task_get_state
int ABT_task_get_state(ABT_task task, ABT_task_state *state)
Get a state of a tasklet.
Definition: task.c:375
ABT_task_set_migratable
int ABT_task_set_migratable(ABT_task task, ABT_bool flag)
Set the migratability in a work unit.
Definition: task.c:440
ABT_task_get_specific
int ABT_task_get_specific(ABT_task task, ABT_key key, void **value)
Get a value associated with a work-unit-specific key in a work unit.
ABT_task_get_xstream
int ABT_task_get_xstream(ABT_task task, ABT_xstream *xstream)
Get an execution stream associated with a work unit.
Definition: task.c:340
ABT_thread_get_last_pool_id
int ABT_thread_get_last_pool_id(ABT_thread thread, int *id) ABT_API_PUBLIC
Get the last pool's ID of a work unit.
Definition: thread.c:938
ABTI_CHECK_TRUE
#define ABTI_CHECK_TRUE(cond, abt_errno)
Definition: abti_error.h:130
ABT_thread_revive
int ABT_thread_revive(ABT_pool pool, void(*thread_func)(void *), void *arg, ABT_thread *thread) ABT_API_PUBLIC
Revive a terminated work unit.
Definition: thread.c:358
ABTI_global
Definition: abti.h:196
ABT_thread_equal
int ABT_thread_equal(ABT_thread thread1, ABT_thread thread2, ABT_bool *result) ABT_API_PUBLIC
Compare two work unit handles for equality.
Definition: thread.c:1908
ABTI_thread::f_thread
void(* f_thread)(void *)
Definition: abti.h:379
ABTD_atomic_relaxed_store_ptr
static void ABTD_atomic_relaxed_store_ptr(ABTD_atomic_ptr *ptr, void *val)
Definition: abtd_atomic.h:1055
ABTI_local_get_xstream
static ABTI_xstream * ABTI_local_get_xstream(ABTI_local *p_local)
Definition: abti_local.h:86
ABT_task_is_migratable
int ABT_task_is_migratable(ABT_task task, ABT_bool *flag)
Get the migratability of a work unit.
Definition: task.c:459
ABTI_thread_init_pool
static ABTU_ret_err int ABTI_thread_init_pool(ABTI_global *p_global, ABTI_thread *p_thread, ABTI_pool *p_pool)
Definition: abti_unit.h:122
ABT_thread_cancel
int ABT_thread_cancel(ABT_thread thread) ABT_API_PUBLIC
Send a cancellation request to a work unit.
Definition: thread.c:674
ABTI_thread::id
ABT_unit_id id
Definition: abti.h:385
ABT_task_join
int ABT_task_join(ABT_task task)
Wait for a work unit to terminate.
Definition: task.c:203
ABTI_TASK_INIT_ID
#define ABTI_TASK_INIT_ID
Definition: abti.h:54