ARGOBOTS  7496202f85916e93d6d143320764c2aba5026d93
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
futures.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
56 int ABT_future_create(uint32_t compartments, void (*cb_func)(void **arg),
57  ABT_future *newfuture)
58 {
59  int abt_errno;
60  ABTI_future *p_future;
61 
62  abt_errno = ABTU_malloc(sizeof(ABTI_future), (void **)&p_future);
63  ABTI_CHECK_ERROR(abt_errno);
64  ABTI_spinlock_clear(&p_future->lock);
65  ABTD_atomic_relaxed_store_uint32(&p_future->counter, 0);
66  p_future->compartments = compartments;
67  abt_errno =
68  ABTU_malloc(compartments * sizeof(void *), (void **)&p_future->array);
69  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
70  ABTU_free(p_future);
71  ABTI_HANDLE_ERROR(abt_errno);
72  }
73  p_future->p_callback = cb_func;
74  p_future->p_head = NULL;
75  p_future->p_tail = NULL;
76 
77  *newfuture = ABTI_future_get_handle(p_future);
78  return ABT_SUCCESS;
79 }
80 
94 {
95  ABTI_future *p_future = ABTI_future_get_ptr(*future);
96  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
97 
98  /* The lock needs to be acquired to safely free the future structure.
99  * However, we do not have to unlock it because the entire structure is
100  * freed here. */
101  ABTI_spinlock_acquire(&p_future->lock);
102 
103  ABTU_free(p_future->array);
104  ABTU_free(p_future);
105 
106  *future = ABT_FUTURE_NULL;
107  return ABT_SUCCESS;
108 }
109 
126 {
127  ABTI_local *p_local = ABTI_local_get_local();
128  ABTI_future *p_future = ABTI_future_get_ptr(future);
129  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
130 
131  ABTI_spinlock_acquire(&p_future->lock);
132  if (ABTD_atomic_relaxed_load_uint32(&p_future->counter) <
133  p_future->compartments) {
134  ABTI_ythread *p_ythread = NULL;
135  ABTI_thread *p_thread;
136 
137  ABTI_xstream *p_local_xstream = ABTI_local_get_xstream_or_null(p_local);
138  if (!ABTI_IS_EXT_THREAD_ENABLED || p_local_xstream) {
139  p_thread = p_local_xstream->p_thread;
140  p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
141  }
142  if (!p_ythread) {
143  /* external thread */
144  int abt_errno =
145  ABTU_calloc(1, sizeof(ABTI_thread), (void **)&p_thread);
146  if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
147  ABTI_spinlock_release(&p_future->lock);
148  ABTI_HANDLE_ERROR(abt_errno);
149  }
150  p_thread->type = ABTI_THREAD_TYPE_EXT;
151  /* use state for synchronization */
152  ABTD_atomic_relaxed_store_int(&p_thread->state,
154  }
155 
156  p_thread->p_next = NULL;
157  if (p_future->p_head == NULL) {
158  p_future->p_head = p_thread;
159  p_future->p_tail = p_thread;
160  } else {
161  p_future->p_tail->p_next = p_thread;
162  p_future->p_tail = p_thread;
163  }
164 
165  if (p_ythread) {
166  ABTI_ythread_set_blocked(p_ythread);
167 
168  ABTI_spinlock_release(&p_future->lock);
169 
170  /* Suspend the current ULT */
171  ABTI_ythread_suspend(&p_local_xstream, p_ythread,
172  ABT_SYNC_EVENT_TYPE_FUTURE, (void *)p_future);
173 
174  } else {
175  ABTI_spinlock_release(&p_future->lock);
176 
177  /* External thread is waiting here. */
178  while (ABTD_atomic_acquire_load_int(&p_thread->state) !=
180  ;
181  ABTU_free(p_thread);
182  }
183  } else {
184  ABTI_spinlock_release(&p_future->lock);
185  }
186  return ABT_SUCCESS;
187 }
188 
202 {
203  ABTI_future *p_future = ABTI_future_get_ptr(future);
204  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
205 
206  uint32_t counter = ABTD_atomic_acquire_load_uint32(&p_future->counter);
207  *flag = (counter == p_future->compartments) ? ABT_TRUE : ABT_FALSE;
208  return ABT_SUCCESS;
209 }
210 
228 int ABT_future_set(ABT_future future, void *value)
229 {
230  ABTI_local *p_local = ABTI_local_get_local();
231  ABTI_future *p_future = ABTI_future_get_ptr(future);
232  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
233 
234  ABTI_spinlock_acquire(&p_future->lock);
235 
236  int counter = ABTD_atomic_relaxed_load_uint32(&p_future->counter);
237 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
238  if (counter >= p_future->compartments) {
239  ABTI_spinlock_release(&p_future->lock);
240  ABTI_HANDLE_ERROR(ABT_ERR_FUTURE);
241  }
242 #endif
243  p_future->array[counter] = value;
244  counter++;
245  ABTD_atomic_release_store_uint32(&p_future->counter, counter);
246 
247  if (counter == p_future->compartments) {
248  if (p_future->p_callback != NULL)
249  (*p_future->p_callback)(p_future->array);
250 
251  if (p_future->p_head == NULL) {
252  ABTI_spinlock_release(&p_future->lock);
253  return ABT_SUCCESS;
254  }
255 
256  /* Wake up all waiting ULTs */
257  ABTI_thread *p_head = p_future->p_head;
258  ABTI_thread *p_thread = p_head;
259  while (1) {
260  ABTI_thread *p_next = p_thread->p_next;
261  p_thread->p_next = NULL;
262 
263  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
264  if (p_ythread) {
265  ABTI_ythread_set_ready(p_local, p_ythread);
266  } else {
267  /* When the head is an external thread */
268  ABTD_atomic_release_store_int(&p_thread->state,
270  }
271 
272  /* Next ULT */
273  if (p_next != NULL) {
274  p_thread = p_next;
275  } else {
276  break;
277  }
278  }
279  p_future->p_head = NULL;
280  p_future->p_tail = NULL;
281  }
282 
283  ABTI_spinlock_release(&p_future->lock);
284  return ABT_SUCCESS;
285 }
286 
300 {
301  ABTI_future *p_future = ABTI_future_get_ptr(future);
302  ABTI_CHECK_NULL_FUTURE_PTR(p_future);
303 
304  ABTI_spinlock_acquire(&p_future->lock);
305  ABTD_atomic_release_store_uint32(&p_future->counter, 0);
306  ABTI_spinlock_release(&p_future->lock);
307  return ABT_SUCCESS;
308 }
int ABT_future_set(ABT_future future, void *value) ABT_API_PUBLIC
Signal the future.
Definition: futures.c:228
int ABT_future_test(ABT_future future, ABT_bool *flag) ABT_API_PUBLIC
Test whether the future is ready.
Definition: futures.c:201
int ABT_bool
Definition: abt.h:373
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:142
#define ABT_FUTURE_NULL
Definition: abt.h:424
#define ABT_FALSE
Definition: abt.h:285
int ABT_future_free(ABT_future *future) ABT_API_PUBLIC
Free the future object.
Definition: futures.c:93
int ABT_future_wait(ABT_future future) ABT_API_PUBLIC
Wait on the future.
Definition: futures.c:125
int ABT_future_create(uint32_t compartments, void(*cb_func)(void **arg), ABT_future *newfuture) ABT_API_PUBLIC
Create a future.
Definition: futures.c:56
#define ABT_SUCCESS
Definition: abt.h:64
#define ABT_TRUE
Definition: abt.h:284
struct ABT_future_opaque * ABT_future
Definition: abt.h:367
#define ABT_ERR_FUTURE
Definition: abt.h:110
int ABT_future_reset(ABT_future future) ABT_API_PUBLIC
Reset the readiness of the target future.
Definition: futures.c:299
static ABTU_ret_err int ABTU_calloc(size_t num, size_t size, void **p_ptr)
Definition: abtu.h:152
static void ABTU_free(void *ptr)
Definition: abtu.h:135