ARGOBOTS  66b1c39742507d8df30e8d28c54839b961a14814
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
cond.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 #include <sys/time.h>
8 
27 {
    /* Body of ABT_cond_create(ABT_cond *newcond): allocate a condition
     * variable object, initialize it, and hand its handle back through
     * *newcond. */
28  ABTI_cond *p_newcond;
29  int abt_errno = ABTU_malloc(sizeof(ABTI_cond), (void **)&p_newcond);
    /* NOTE(review): ABTI_CHECK_ERROR is defined elsewhere; presumably it
     * leaves the function with abt_errno when the allocation failed. */
30  ABTI_CHECK_ERROR(abt_errno);
31 
32  ABTI_cond_init(p_newcond);
33  /* Return value: the handle that corresponds to the new object */
34  *newcond = ABTI_cond_get_handle(p_newcond);
35  return ABT_SUCCESS;
36 }
37 
51 {
    /* Body of ABT_cond_free(ABT_cond *cond): finalize and release the
     * condition variable referenced by *cond, then reset the caller's handle
     * to ABT_COND_NULL. */
52  ABT_cond h_cond = *cond;
53  ABTI_cond *p_cond = ABTI_cond_get_ptr(h_cond);
54  ABTI_CHECK_NULL_COND_PTR(p_cond);
    /* The condition variable must have no waiters; the check is tied to
     * ABT_ERR_COND.  NOTE(review): ABTI_CHECK_TRUE is defined elsewhere;
     * presumably it returns that code when the condition is false. */
55  ABTI_CHECK_TRUE(p_cond->num_waiters == 0, ABT_ERR_COND);
56 
57  ABTI_cond_fini(p_cond);
58  ABTU_free(p_cond);
59  /* Return value: the caller's handle no longer refers to a freed object */
60  *cond = ABT_COND_NULL;
61  return ABT_SUCCESS;
62 }
63 
82 {
    /* Body of ABT_cond_wait(ABT_cond cond, ABT_mutex mutex): translate both
     * handles to internal pointers, validate them, and delegate the actual
     * wait to ABTI_cond_wait(). */
83  ABTI_local *p_local = ABTI_local_get_local();
84  ABTI_cond *p_cond = ABTI_cond_get_ptr(cond);
85  ABTI_CHECK_NULL_COND_PTR(p_cond);
86  ABTI_mutex *p_mutex = ABTI_mutex_get_ptr(mutex);
87  ABTI_CHECK_NULL_MUTEX_PTR(p_mutex);
88 
    /* p_local is passed by address, so ABTI_cond_wait() is able to update
     * it. */
89  int abt_errno = ABTI_cond_wait(&p_local, p_cond, p_mutex);
90  ABTI_CHECK_ERROR(abt_errno);
91  return ABT_SUCCESS;
92 }
93 
/* Express the time held in *p_ts as a single floating-point number of
 * seconds (whole seconds plus the nanosecond field scaled by 1e-9). */
static inline double convert_timespec_to_sec(const struct timespec *p_ts)
{
    const double whole_sec = (double)p_ts->tv_sec;
    const double frac_sec = 1.0e-9 * ((double)p_ts->tv_nsec);
    return whole_sec + frac_sec;
}
100 
101 static inline void remove_thread(ABTI_cond *p_cond, ABTI_thread *p_thread)
102 {
103  if (p_thread->p_next == NULL)
104  return;
105 
106  ABTI_spinlock_acquire(&p_cond->lock);
107 
108  if (p_thread->p_next == NULL) {
109  ABTI_spinlock_release(&p_cond->lock);
110  return;
111  }
112 
113  /* If p_thread is still in the queue, we have to remove it. */
114  p_cond->num_waiters--;
115  if (p_cond->num_waiters == 0) {
116  p_cond->p_waiter_mutex = NULL;
117  p_cond->p_head = NULL;
118  p_cond->p_tail = NULL;
119  } else {
120  p_thread->p_prev->p_next = p_thread->p_next;
121  p_thread->p_next->p_prev = p_thread->p_prev;
122  if (p_thread == p_cond->p_head) {
123  p_cond->p_head = p_thread->p_next;
124  } else if (p_thread == p_cond->p_tail) {
125  p_cond->p_tail = p_thread->p_prev;
126  }
127  }
128 
129  ABTI_spinlock_release(&p_cond->lock);
130 
131  p_thread->p_prev = NULL;
132  p_thread->p_next = NULL;
133 }
134 
158  const struct timespec *abstime)
159 {
    /* Body of ABT_cond_timedwait(cond, mutex, abstime): enqueue the caller on
     * the condition variable, release the mutex, and poll until either the
     * caller's state is changed by a signaler or the current time reaches
     * abstime.  The mutex is re-acquired before returning in both cases.
     * Returns ABT_SUCCESS on wake-up and ABT_ERR_COND_TIMEDOUT on timeout.
     * NOTE(review): the first line of the signature is not present in this
     * listing. */
160  ABTI_local *p_local = ABTI_local_get_local();
161  ABTI_cond *p_cond = ABTI_cond_get_ptr(cond);
162  ABTI_CHECK_NULL_COND_PTR(p_cond);
163  ABTI_mutex *p_mutex = ABTI_mutex_get_ptr(mutex);
164  ABTI_CHECK_NULL_MUTEX_PTR(p_mutex);
165 
    /* Deadline in seconds; compared against ABTI_get_wtime() below.
     * NOTE(review): abstime is dereferenced without a NULL check. */
166  double tar_time = convert_timespec_to_sec(abstime);
167 
    /* A stack-allocated queue entry stands in for the caller while it waits;
     * it starts in the BLOCKED state. */
168  ABTI_thread thread;
169  thread.type = ABTI_THREAD_TYPE_EXT;
170  ABTD_atomic_relaxed_store_int(&thread.state, ABT_THREAD_STATE_BLOCKED);
171 
172  ABTI_spinlock_acquire(&p_cond->lock);
173 
    /* All waiters of one condition variable must use the same mutex: the
     * first waiter records it, later waiters are checked against it. */
174  if (p_cond->p_waiter_mutex == NULL) {
175  p_cond->p_waiter_mutex = p_mutex;
176  } else {
177  if (p_cond->p_waiter_mutex != p_mutex) {
178  ABTI_spinlock_release(&p_cond->lock);
    /* NOTE(review): ABTI_HANDLE_ERROR is defined elsewhere; presumably it
     * returns ABT_ERR_INV_MUTEX from this function. */
179  ABTI_HANDLE_ERROR(ABT_ERR_INV_MUTEX);
180  }
181  }
182 
    /* Append the entry at the tail of the circular doubly-linked waiter
     * list. */
183  if (p_cond->num_waiters == 0) {
184  thread.p_prev = &thread;
185  thread.p_next = &thread;
186  p_cond->p_head = &thread;
187  p_cond->p_tail = &thread;
188  } else {
189  p_cond->p_tail->p_next = &thread;
190  p_cond->p_head->p_prev = &thread;
191  thread.p_prev = p_cond->p_tail;
192  thread.p_next = p_cond->p_head;
193  p_cond->p_tail = &thread;
194  }
195 
196  p_cond->num_waiters++;
197 
198  ABTI_spinlock_release(&p_cond->lock);
199 
200  /* Unlock the mutex that the calling ULT is holding */
201  ABTI_mutex_unlock(p_local, p_mutex);
202 
    /* p_ythread stays NULL when no yieldable thread is found; the loop below
     * then spins with ABTD_atomic_pause() instead of yielding.
     * NOTE(review): when ABTI_IS_EXT_THREAD_ENABLED is false, p_local_xstream
     * is dereferenced without a NULL check; presumably it cannot be NULL in
     * that configuration -- confirm. */
203  ABTI_xstream *p_local_xstream = ABTI_local_get_xstream_or_null(p_local);
204  ABTI_ythread *p_ythread = NULL;
205  if (!ABTI_IS_EXT_THREAD_ENABLED || p_local_xstream) {
206  p_ythread = ABTI_thread_get_ythread_or_null(p_local_xstream->p_thread);
207  }
    /* NOTE(review): the line that completes this while condition (listing
     * line 209) is missing from this listing.  ABT_cond_signal stores
     * ABT_THREAD_STATE_READY into the waiter's state, so the loop presumably
     * runs until that value is observed -- confirm against the original
     * file. */
208  while (ABTD_atomic_acquire_load_int(&thread.state) !=
210  double cur_time = ABTI_get_wtime();
211  if (cur_time >= tar_time) {
    /* Deadline reached: take the entry out of the queue (if it is still
     * there) before the stack variable goes out of scope. */
212  remove_thread(p_cond, &thread);
213  /* Lock the mutex again */
214  ABTI_mutex_lock(&p_local, p_mutex);
215  return ABT_ERR_COND_TIMEDOUT;
216  }
217  if (p_ythread) {
218  ABTI_ythread_yield(&p_local_xstream, p_ythread,
219  ABT_SYNC_EVENT_TYPE_COND, (void *)p_cond);
    /* p_local_xstream was passed by address to the yield, so p_local is
     * refreshed from it afterwards. */
220  p_local = ABTI_xstream_get_local(p_local_xstream);
221  } else {
222  ABTD_atomic_pause();
223  }
224  }
225  /* Lock the mutex again */
226  ABTI_mutex_lock(&p_local, p_mutex);
227  return ABT_SUCCESS;
228 }
229 
245 {
    /* Body of ABT_cond_signal(ABT_cond cond): wake up the waiter at the head
     * of the condition variable's queue, if there is one.  Returns
     * ABT_SUCCESS whether or not a waiter was present. */
246  ABTI_local *p_local = ABTI_local_get_local();
247  ABTI_cond *p_cond = ABTI_cond_get_ptr(cond);
248  ABTI_CHECK_NULL_COND_PTR(p_cond);
249 
250  ABTI_spinlock_acquire(&p_cond->lock);
251 
    /* Nothing to do when nobody is waiting. */
252  if (p_cond->num_waiters == 0) {
253  ABTI_spinlock_release(&p_cond->lock);
254  return ABT_SUCCESS;
255  }
256 
257  /* Wake up the first waiting ULT */
258  ABTI_thread *p_thread = p_cond->p_head;
259 
    /* Dequeue the head.  When it was the last waiter, the queue and the
     * recorded waiter mutex are reset; otherwise the head is unlinked from
     * the circular list and its successor becomes the new head. */
260  p_cond->num_waiters--;
261  if (p_cond->num_waiters == 0) {
262  p_cond->p_waiter_mutex = NULL;
263  p_cond->p_head = NULL;
264  p_cond->p_tail = NULL;
265  } else {
266  p_thread->p_prev->p_next = p_thread->p_next;
267  p_thread->p_next->p_prev = p_thread->p_prev;
268  p_cond->p_head = p_thread->p_next;
269  }
    /* NULL links mark the thread as no longer queued (remove_thread() checks
     * p_next for this). */
270  p_thread->p_prev = NULL;
271  p_thread->p_next = NULL;
272 
273  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
274  if (p_ythread) {
275  ABTI_ythread_set_ready(p_local, p_ythread);
276  } else {
277  /* When the head is an external thread: it polls its own state, so
     * publishing READY with a release store is what wakes it up */
278  ABTD_atomic_release_store_int(&p_thread->state, ABT_THREAD_STATE_READY);
279  }
280 
    /* The lock is held until after the waiter has been marked ready. */
281  ABTI_spinlock_release(&p_cond->lock);
282  return ABT_SUCCESS;
283 }
284 
299 {
    /* Body of ABT_cond_broadcast(ABT_cond cond): validate the handle and
     * delegate to ABTI_cond_broadcast(), which is defined elsewhere. */
300  ABTI_local *p_local = ABTI_local_get_local();
301  ABTI_cond *p_cond = ABTI_cond_get_ptr(cond);
302  ABTI_CHECK_NULL_COND_PTR(p_cond);
303 
304  ABTI_cond_broadcast(p_local, p_cond);
305  return ABT_SUCCESS;
306 }
#define ABT_ERR_INV_MUTEX
Definition: abt.h:84
int ABT_cond_create(ABT_cond *newcond) ABT_API_PUBLIC
Create a new condition variable.
Definition: cond.c:26
int ABT_cond_free(ABT_cond *cond) ABT_API_PUBLIC
Free the condition variable.
Definition: cond.c:50
#define ABT_ERR_COND_TIMEDOUT
Definition: abt.h:107
int ABT_cond_timedwait(ABT_cond cond, ABT_mutex mutex, const struct timespec *abstime) ABT_API_PUBLIC
Wait on the condition.
Definition: cond.c:157
int ABT_cond_wait(ABT_cond cond, ABT_mutex mutex) ABT_API_PUBLIC
Wait on the condition.
Definition: cond.c:81
struct ABT_mutex_opaque * ABT_mutex
Definition: abt.h:357
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:142
#define ABT_SUCCESS
Definition: abt.h:64
static void remove_thread(ABTI_cond *p_cond, ABTI_thread *p_thread)
Definition: cond.c:101
#define ABT_ERR_COND
Definition: abt.h:106
#define ABT_COND_NULL
Definition: abt.h:421
int ABT_cond_broadcast(ABT_cond cond) ABT_API_PUBLIC
Broadcast a condition.
Definition: cond.c:298
static double convert_timespec_to_sec(const struct timespec *p_ts)
Definition: cond.c:94
static void ABTU_free(void *ptr)
Definition: abtu.h:135
int ABT_cond_signal(ABT_cond cond) ABT_API_PUBLIC
Signal a condition.
Definition: cond.c:244
struct ABT_cond_opaque * ABT_cond
Definition: abt.h:361