ARGOBOTS  66b1c39742507d8df30e8d28c54839b961a14814
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
futures.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
56 int ABT_future_create(uint32_t compartments, void (*cb_func)(void **arg),
57  ABT_future *newfuture)
58 {
59  int abt_errno;
60  ABTI_future *p_future;
61 
62  abt_errno = ABTU_malloc(sizeof(ABTI_future), (void **)&p_future);
63  ABTI_CHECK_ERROR(abt_errno);
64  ABTI_spinlock_clear(&p_future->lock);
66  p_future->compartments = compartments;
67  abt_errno =
68  ABTU_malloc(compartments * sizeof(void *), (void **)&p_future->array);
69  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
70  ABTU_free(p_future);
71  ABTI_HANDLE_ERROR(abt_errno);
72  }
73  p_future->p_callback = cb_func;
74  p_future->p_head = NULL;
75  p_future->p_tail = NULL;
76 
77  *newfuture = ABTI_future_get_handle(p_future);
78  return ABT_SUCCESS;
79 }
80 
94 {
95  ABTI_future *p_future = ABTI_future_get_ptr(*future);
97 
98  /* The lock needs to be acquired to safely free the future structure.
99  * However, we do not have to unlock it because the entire structure is
100  * freed here. */
101  ABTI_spinlock_acquire(&p_future->lock);
102 
103  ABTU_free(p_future->array);
104  ABTU_free(p_future);
105 
106  *future = ABT_FUTURE_NULL;
107  return ABT_SUCCESS;
108 }
109 
126 {
127  ABTI_local *p_local = ABTI_local_get_local();
128  ABTI_future *p_future = ABTI_future_get_ptr(future);
129  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
130 
131  ABTI_spinlock_acquire(&p_future->lock);
132  if (ABTD_atomic_relaxed_load_uint32(&p_future->counter) <
133  p_future->compartments) {
134  ABTI_ythread *p_ythread = NULL;
135  ABTI_thread *p_thread;
136 
137  ABTI_xstream *p_local_xstream = ABTI_local_get_xstream_or_null(p_local);
138  if (!ABTI_IS_EXT_THREAD_ENABLED || p_local_xstream) {
139  p_thread = p_local_xstream->p_thread;
140  p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
141  }
142  if (!p_ythread) {
143  /* external thread */
144  int abt_errno =
145  ABTU_calloc(1, sizeof(ABTI_thread), (void **)&p_thread);
146  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
147  ABTI_spinlock_release(&p_future->lock);
148  ABTI_HANDLE_ERROR(abt_errno);
149  }
150  p_thread->type = ABTI_THREAD_TYPE_EXT;
151  /* use state for synchronization */
154  }
155 
156  p_thread->p_next = NULL;
157  if (p_future->p_head == NULL) {
158  p_future->p_head = p_thread;
159  p_future->p_tail = p_thread;
160  } else {
161  p_future->p_tail->p_next = p_thread;
162  p_future->p_tail = p_thread;
163  }
164 
165  if (p_ythread) {
166  ABTI_ythread_set_blocked(p_ythread);
167 
168  ABTI_spinlock_release(&p_future->lock);
169 
170  /* Suspend the current ULT */
171  ABTI_ythread_suspend(&p_local_xstream, p_ythread,
172  ABT_SYNC_EVENT_TYPE_FUTURE, (void *)p_future);
173 
174  } else {
175  ABTI_spinlock_release(&p_future->lock);
176 
177  /* External thread is waiting here. */
178  while (ABTD_atomic_acquire_load_int(&p_thread->state) !=
180  ;
181  ABTU_free(p_thread);
182  }
183  } else {
184  ABTI_spinlock_release(&p_future->lock);
185  }
186  return ABT_SUCCESS;
187 }
188 
202 {
203  ABTI_future *p_future = ABTI_future_get_ptr(future);
204  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
205 
206  uint32_t counter = ABTD_atomic_acquire_load_uint32(&p_future->counter);
207  *flag = (counter == p_future->compartments) ? ABT_TRUE : ABT_FALSE;
208  return ABT_SUCCESS;
209 }
210 
228 int ABT_future_set(ABT_future future, void *value)
229 {
230  ABTI_local *p_local = ABTI_local_get_local();
231  ABTI_future *p_future = ABTI_future_get_ptr(future);
232  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
233 
234  ABTI_spinlock_acquire(&p_future->lock);
235 
236  int counter = ABTD_atomic_relaxed_load_uint32(&p_future->counter);
237 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
238  if (counter >= p_future->compartments) {
239  ABTI_spinlock_release(&p_future->lock);
241  }
242 #endif
243  p_future->array[counter] = value;
244  counter++;
245  ABTD_atomic_release_store_uint32(&p_future->counter, counter);
246 
247  if (counter == p_future->compartments) {
248  if (p_future->p_callback != NULL)
249  (*p_future->p_callback)(p_future->array);
250 
251  if (p_future->p_head == NULL) {
252  ABTI_spinlock_release(&p_future->lock);
253  return ABT_SUCCESS;
254  }
255 
256  /* Wake up all waiting ULTs */
257  ABTI_thread *p_head = p_future->p_head;
258  ABTI_thread *p_thread = p_head;
259  while (1) {
260  ABTI_thread *p_next = p_thread->p_next;
261  p_thread->p_next = NULL;
262 
263  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
264  if (p_ythread) {
265  ABTI_ythread_set_ready(p_local, p_ythread);
266  } else {
267  /* When the head is an external thread */
270  }
271 
272  /* Next ULT */
273  if (p_next != NULL) {
274  p_thread = p_next;
275  } else {
276  break;
277  }
278  }
279  p_future->p_head = NULL;
280  p_future->p_tail = NULL;
281  }
282 
283  ABTI_spinlock_release(&p_future->lock);
284  return ABT_SUCCESS;
285 }
286 
300 {
301  ABTI_future *p_future = ABTI_future_get_ptr(future);
302  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
303 
304  ABTI_spinlock_acquire(&p_future->lock);
306  ABTI_spinlock_release(&p_future->lock);
307  return ABT_SUCCESS;
308 }
int ABT_future_set(ABT_future future, void *value) ABT_API_PUBLIC
Signal the future.
Definition: futures.c:228
int ABT_future_test(ABT_future future, ABT_bool *flag) ABT_API_PUBLIC
Test whether the future is ready.
Definition: futures.c:201
void ** array
Definition: abti.h:404
static ABTI_ythread * ABTI_thread_get_ythread_or_null(ABTI_thread *p_thread)
Definition: abti_thread.h:59
ABTI_thread * p_tail
Definition: abti.h:407
struct ABTI_local ABTI_local
Definition: abti.h:101
static void ABTD_atomic_release_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:924
ABTI_thread_type type
Definition: abti.h:316
ABTI_thread * p_thread
Definition: abti.h:251
static void ABTD_atomic_release_store_uint32(ABTD_atomic_uint32 *ptr, uint32_t val)
Definition: abtd_atomic.h:947
int ABT_bool
Definition: abt.h:373
uint32_t compartments
Definition: abti.h:403
static void ABTI_spinlock_clear(ABTI_spinlock *p_lock)
Definition: abti_spinlock.h:18
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:142
#define ABT_FUTURE_NULL
Definition: abt.h:424
#define ABT_FALSE
Definition: abt.h:285
int ABT_future_free(ABT_future *future) ABT_API_PUBLIC
Free the future object.
Definition: futures.c:93
static uint32_t ABTD_atomic_acquire_load_uint32(const ABTD_atomic_uint32 *ptr)
Definition: abtd_atomic.h:797
int ABT_future_wait(ABT_future future) ABT_API_PUBLIC
Wait on the future.
Definition: futures.c:125
ABTD_atomic_int state
Definition: abti.h:322
int ABT_future_create(uint32_t compartments, void(*cb_func)(void **arg), ABT_future *newfuture) ABT_API_PUBLIC
Create a future.
Definition: futures.c:56
#define ABTI_IS_EXT_THREAD_ENABLED
Definition: abti.h:28
ABTI_thread * p_head
Definition: abti.h:406
static ABT_future ABTI_future_get_handle(ABTI_future *p_future)
Definition: abti_future.h:26
static void ABTI_spinlock_release(ABTI_spinlock *p_lock)
Definition: abti_spinlock.h:31
#define ABT_SUCCESS
Definition: abt.h:64
ABTD_atomic_uint32 counter
Definition: abti.h:402
#define ABT_TRUE
Definition: abt.h:284
#define ABTI_CHECK_NULL_FUTURE_PTR(p)
Definition: abti_error.h:274
void(* p_callback)(void **arg)
Definition: abti.h:405
struct ABT_future_opaque * ABT_future
Definition: abt.h:367
static void ABTD_atomic_relaxed_store_uint32(ABTD_atomic_uint32 *ptr, uint32_t val)
Definition: abtd_atomic.h:884
static void ABTI_spinlock_acquire(ABTI_spinlock *p_lock)
Definition: abti_spinlock.h:23
static ABTI_future * ABTI_future_get_ptr(ABT_future future)
Definition: abti_future.h:11
static uint32_t ABTD_atomic_relaxed_load_uint32(const ABTD_atomic_uint32 *ptr)
Definition: abtd_atomic.h:689
#define ABTI_CHECK_ERROR(abt_errno)
Definition: abti_error.h:127
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:865
#define ABT_ERR_FUTURE
Definition: abt.h:110
ABTI_thread * p_next
Definition: abti.h:314
static int ABTD_atomic_acquire_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:763
ABTI_spinlock lock
Definition: abti.h:401
void ABTI_ythread_set_ready(ABTI_local *p_local, ABTI_ythread *p_ythread)
Definition: ythread.c:50
int ABT_future_reset(ABT_future future) ABT_API_PUBLIC
Reset the readiness of the target future.
Definition: futures.c:299
void ABTI_ythread_set_blocked(ABTI_ythread *p_ythread)
Definition: ythread.c:8
static ABTI_local * ABTI_local_get_local(void)
Definition: abti_local.h:41
#define ABTI_IS_ERROR_CHECK_ENABLED
Definition: abti.h:20
static ABTU_ret_err int ABTU_calloc(size_t num, size_t size, void **p_ptr)
Definition: abtu.h:152
static void ABTU_free(void *ptr)
Definition: abtu.h:135
#define ABTI_THREAD_TYPE_EXT
Definition: abti.h:72
void ABTI_ythread_suspend(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread, ABT_sync_event_type sync_event_type, void *p_sync)
Definition: ythread.c:30
static ABTI_xstream * ABTI_local_get_xstream_or_null(ABTI_local *p_local)
Definition: abti_local.h:77
#define ABTI_HANDLE_ERROR(n)
Definition: abti_error.h:121