ARGOBOTS
futures.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
56 int ABT_future_create(uint32_t compartments, void (*cb_func)(void **arg),
57  ABT_future *newfuture)
58 {
59  int abt_errno = ABT_SUCCESS;
60  ABTI_future *p_future;
61 
62  p_future = (ABTI_future *)ABTU_malloc(sizeof(ABTI_future));
63  ABTI_spinlock_clear(&p_future->lock);
64  ABTD_atomic_relaxed_store_uint32(&p_future->counter, 0);
65  p_future->compartments = compartments;
66  p_future->array = ABTU_malloc(compartments * sizeof(void *));
67  p_future->p_callback = cb_func;
68  p_future->p_head = NULL;
69  p_future->p_tail = NULL;
70 
71  *newfuture = ABTI_future_get_handle(p_future);
72 
73  return abt_errno;
74 }
75 
89 {
90  int abt_errno = ABT_SUCCESS;
91  ABTI_future *p_future = ABTI_future_get_ptr(*future);
92  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
93 
94  /* The lock needs to be acquired to safely free the future structure.
95  * However, we do not have to unlock it because the entire structure is
96  * freed here. */
97  ABTI_spinlock_acquire(&p_future->lock);
98 
99  ABTU_free(p_future->array);
100  ABTU_free(p_future);
101 
102  *future = ABT_FUTURE_NULL;
103 
104 fn_exit:
105  return abt_errno;
106 
107 fn_fail:
108  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
109  goto fn_exit;
110 }
111 
128 {
129  int abt_errno = ABT_SUCCESS;
130  ABTI_local *p_local = ABTI_local_get_local();
131  ABTI_future *p_future = ABTI_future_get_ptr(future);
132  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
133 
134  ABTI_spinlock_acquire(&p_future->lock);
135  if (ABTD_atomic_relaxed_load_uint32(&p_future->counter) <
136  p_future->compartments) {
137  ABTI_thread *p_current;
138  ABTI_unit *p_unit;
139  ABT_unit_type type;
140  ABTD_atomic_int32 ext_signal = ABTD_ATOMIC_INT32_STATIC_INITIALIZER(0);
141 
142  if (p_local != NULL) {
143  p_current = p_local->p_thread;
144 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
145  if (p_current == NULL) {
146  abt_errno = ABT_ERR_FUTURE;
147  ABTI_spinlock_release(&p_future->lock);
148  goto fn_fail;
149  }
150 #endif
151  type = ABT_UNIT_TYPE_THREAD;
152  p_unit = &p_current->unit_def;
153  p_unit->handle.thread = ABTI_thread_get_handle(p_current);
154  p_unit->type = type;
155  } else {
156  /* external thread */
157  type = ABT_UNIT_TYPE_EXT;
158  p_unit = (ABTI_unit *)ABTU_calloc(1, sizeof(ABTI_unit));
159  /* Check size if ext_signal can be stored in p_unit->handle.thread.
160  */
161  ABTI_STATIC_ASSERT(sizeof(ext_signal) <=
162  sizeof(p_unit->handle.thread));
163  p_unit->handle.thread = (ABT_thread)&ext_signal;
164  p_unit->type = type;
165  }
166 
167  p_unit->p_next = NULL;
168  if (p_future->p_head == NULL) {
169  p_future->p_head = p_unit;
170  p_future->p_tail = p_unit;
171  } else {
172  p_future->p_tail->p_next = p_unit;
173  p_future->p_tail = p_unit;
174  }
175 
176  if (type == ABT_UNIT_TYPE_THREAD) {
177  ABTI_thread_set_blocked(p_current);
178 
179  ABTI_spinlock_release(&p_future->lock);
180 
181  /* Suspend the current ULT */
182  ABTI_thread_suspend(&p_local, p_current);
183 
184  } else {
185  ABTI_spinlock_release(&p_future->lock);
186 
187  /* External thread is waiting here polling ext_signal. */
188  /* FIXME: need a better implementation */
189  while (!ABTD_atomic_acquire_load_int32(&ext_signal))
190  ;
191  ABTU_free(p_unit);
192  }
193  } else {
194  ABTI_spinlock_release(&p_future->lock);
195  }
196 
197 fn_exit:
198  return abt_errno;
199 
200 fn_fail:
201  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
202  goto fn_exit;
203 }
204 
218 {
219  int abt_errno = ABT_SUCCESS;
220  ABTI_future *p_future = ABTI_future_get_ptr(future);
221  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
222 
223  uint32_t counter = ABTD_atomic_acquire_load_uint32(&p_future->counter);
224  *flag = (counter == p_future->compartments) ? ABT_TRUE : ABT_FALSE;
225 
226 fn_exit:
227  return abt_errno;
228 
229 fn_fail:
230  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
231  goto fn_exit;
232 }
233 
251 int ABT_future_set(ABT_future future, void *value)
252 {
253  int abt_errno = ABT_SUCCESS;
254  ABTI_local *p_local = ABTI_local_get_local();
255  ABTI_future *p_future = ABTI_future_get_ptr(future);
256  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
257 
258  ABTI_spinlock_acquire(&p_future->lock);
259 
260  int counter = ABTD_atomic_relaxed_load_uint32(&p_future->counter);
261 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
262  if (counter >= p_future->compartments) {
263  abt_errno = ABT_ERR_FUTURE;
264  ABTI_spinlock_release(&p_future->lock);
265  goto fn_fail;
266  }
267 #endif
268  p_future->array[counter] = value;
269  counter++;
270  ABTD_atomic_release_store_uint32(&p_future->counter, counter);
271 
272  if (counter == p_future->compartments) {
273  if (p_future->p_callback != NULL)
274  (*p_future->p_callback)(p_future->array);
275 
276  if (p_future->p_head == NULL) {
277  ABTI_spinlock_release(&p_future->lock);
278  goto fn_exit;
279  }
280 
281  /* Wake up all waiting ULTs */
282  ABTI_unit *p_head = p_future->p_head;
283  ABTI_unit *p_unit = p_head;
284  while (1) {
285  ABTI_unit *p_next = p_unit->p_next;
286  ABT_unit_type type = p_unit->type;
287 
288  p_unit->p_next = NULL;
289 
290  if (type == ABT_UNIT_TYPE_THREAD) {
291  ABTI_thread *p_thread =
292  ABTI_thread_get_ptr(p_unit->handle.thread);
293  ABTI_thread_set_ready(p_local, p_thread);
294  } else {
295  /* When the head is an external thread */
296  ABTD_atomic_int32 *p_ext_signal =
297  (ABTD_atomic_int32 *)p_unit->handle.thread;
298  ABTD_atomic_release_store_int32(p_ext_signal, 1);
299  }
300 
301  /* Next ULT */
302  if (p_next != NULL) {
303  p_unit = p_next;
304  } else {
305  break;
306  }
307  }
308  p_future->p_head = NULL;
309  p_future->p_tail = NULL;
310  }
311 
312  ABTI_spinlock_release(&p_future->lock);
313 
314 fn_exit:
315  return abt_errno;
316 
317 fn_fail:
318  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
319  goto fn_exit;
320 }
321 
335 {
336  int abt_errno = ABT_SUCCESS;
337  ABTI_future *p_future = ABTI_future_get_ptr(future);
338  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
339 
340  ABTI_spinlock_acquire(&p_future->lock);
341  ABTD_atomic_release_store_uint32(&p_future->counter, 0);
342  ABTI_spinlock_release(&p_future->lock);
343 
344 fn_exit:
345  return abt_errno;
346 
347 fn_fail:
348  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
349  goto fn_exit;
350 }
int ABT_future_set(ABT_future future, void *value)
Signal the future.
Definition: futures.c:251
int ABT_future_test(ABT_future future, ABT_bool *flag)
Test whether the future is ready.
Definition: futures.c:217
static void * ABTU_malloc(size_t size)
Definition: abtu.h:39
int ABT_bool
Definition: abt.h:309
#define ABT_FUTURE_NULL
Definition: abt.h:353
#define ABT_FALSE
Definition: abt.h:224
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:279
int ABT_future_free(ABT_future *future)
Free the future object.
Definition: futures.c:88
int ABT_future_wait(ABT_future future)
Wait on the future.
Definition: futures.c:127
int ABT_future_create(uint32_t compartments, void(*cb_func)(void **arg), ABT_future *newfuture)
Create a future.
Definition: futures.c:56
#define HANDLE_ERROR_FUNC_WITH_CODE(n)
Definition: abti_error.h:241
#define ABT_SUCCESS
Definition: abt.h:64
#define ABT_TRUE
Definition: abt.h:223
ABT_unit_type
Definition: abt.h:170
struct ABT_future_opaque * ABT_future
Definition: abt.h:303
#define ABT_ERR_FUTURE
Definition: abt.h:109
int ABT_future_reset(ABT_future future)
Reset the readiness of the target future.
Definition: futures.c:334
static void ABTU_free(void *ptr)
Definition: abtu.h:32
static void * ABTU_calloc(size_t num, size_t size)
Definition: abtu.h:49