ARGOBOTS
abti_cond.h
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #ifndef ABTI_COND_H_INCLUDED
7 #define ABTI_COND_H_INCLUDED
8 
9 #include "abti_mutex.h"
10 
11 /* Inlined functions for Condition Variable */
12 
13 static inline void ABTI_cond_init(ABTI_cond *p_cond)
14 {
15  ABTI_spinlock_clear(&p_cond->lock);
16  p_cond->p_waiter_mutex = NULL;
17  p_cond->num_waiters = 0;
18  p_cond->p_head = NULL;
19  p_cond->p_tail = NULL;
20 }
21 
22 static inline void ABTI_cond_fini(ABTI_cond *p_cond)
23 {
24  /* The lock needs to be acquired to safely free the condition structure.
25  * However, we do not have to unlock it because the entire structure is
26  * freed here. */
27  ABTI_spinlock_acquire(&p_cond->lock);
28 }
29 
30 static inline ABTI_cond *ABTI_cond_get_ptr(ABT_cond cond)
31 {
32 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
33  ABTI_cond *p_cond;
34  if (cond == ABT_COND_NULL) {
35  p_cond = NULL;
36  } else {
37  p_cond = (ABTI_cond *)cond;
38  }
39  return p_cond;
40 #else
41  return (ABTI_cond *)cond;
42 #endif
43 }
44 
45 static inline ABT_cond ABTI_cond_get_handle(ABTI_cond *p_cond)
46 {
47 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
48  ABT_cond h_cond;
49  if (p_cond == NULL) {
50  h_cond = ABT_COND_NULL;
51  } else {
52  h_cond = (ABT_cond)p_cond;
53  }
54  return h_cond;
55 #else
56  return (ABT_cond)p_cond;
57 #endif
58 }
59 
60 static inline int ABTI_cond_wait(ABTI_local **pp_local, ABTI_cond *p_cond,
61  ABTI_mutex *p_mutex)
62 {
63  int abt_errno = ABT_SUCCESS;
64 
65  ABTI_local *p_local = *pp_local;
66  ABTI_thread *p_thread;
67  ABTI_unit *p_unit;
68  ABT_unit_type type;
69  ABTD_atomic_int32 ext_signal = ABTD_ATOMIC_INT32_STATIC_INITIALIZER(0);
70 
71  if (p_local != NULL) {
72  p_thread = p_local->p_thread;
73  ABTI_CHECK_TRUE(p_thread != NULL, ABT_ERR_COND);
74 
75  type = ABT_UNIT_TYPE_THREAD;
76  p_unit = &p_thread->unit_def;
77  p_unit->handle.thread = ABTI_thread_get_handle(p_thread);
78  p_unit->type = type;
79  } else {
80  /* external thread */
81  type = ABT_UNIT_TYPE_EXT;
82  p_unit = (ABTI_unit *)ABTU_calloc(1, sizeof(ABTI_unit));
83  /* Check size if ext_signal can be stored in p_unit->handle.thread. */
84  ABTI_STATIC_ASSERT(sizeof(ext_signal) <= sizeof(p_unit->handle.thread));
85  p_unit->handle.thread = (ABT_thread)&ext_signal;
86  p_unit->type = type;
87  }
88 
89  ABTI_spinlock_acquire(&p_cond->lock);
90 
91  if (p_cond->p_waiter_mutex == NULL) {
92  p_cond->p_waiter_mutex = p_mutex;
93  } else {
94  ABT_bool result = ABTI_mutex_equal(p_cond->p_waiter_mutex, p_mutex);
95  if (result == ABT_FALSE) {
96  ABTI_spinlock_release(&p_cond->lock);
97  if (type == ABT_UNIT_TYPE_EXT)
98  ABTU_free(p_unit);
99  abt_errno = ABT_ERR_INV_MUTEX;
100  goto fn_fail;
101  }
102  }
103 
104  if (p_cond->num_waiters == 0) {
105  p_unit->p_prev = p_unit;
106  p_unit->p_next = p_unit;
107  p_cond->p_head = p_unit;
108  p_cond->p_tail = p_unit;
109  } else {
110  p_cond->p_tail->p_next = p_unit;
111  p_cond->p_head->p_prev = p_unit;
112  p_unit->p_prev = p_cond->p_tail;
113  p_unit->p_next = p_cond->p_head;
114  p_cond->p_tail = p_unit;
115  }
116 
117  p_cond->num_waiters++;
118 
119  if (type == ABT_UNIT_TYPE_THREAD) {
120  /* Change the ULT's state to BLOCKED */
121  ABTI_thread_set_blocked(p_thread);
122 
123  ABTI_spinlock_release(&p_cond->lock);
124 
125  /* Unlock the mutex that the calling ULT is holding */
126  /* FIXME: should check if mutex was locked by the calling ULT */
127  ABTI_mutex_unlock(p_local, p_mutex);
128 
129  /* Suspend the current ULT */
130  ABTI_thread_suspend(pp_local, p_thread);
131 
132  } else { /* TYPE == ABT_UNIT_TYPE_EXT */
133  ABTI_spinlock_release(&p_cond->lock);
134  ABTI_mutex_unlock(p_local, p_mutex);
135 
136  /* External thread is waiting here polling ext_signal. */
137  /* FIXME: need a better implementation */
138  while (!ABTD_atomic_acquire_load_int32(&ext_signal))
139  ;
140  ABTU_free(p_unit);
141  }
142 
143  /* Lock the mutex again */
144  ABTI_mutex_lock(pp_local, p_mutex);
145 
146 fn_exit:
147  return abt_errno;
148 
149 fn_fail:
150  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
151  goto fn_exit;
152 }
153 
154 static inline void ABTI_cond_broadcast(ABTI_local *p_local, ABTI_cond *p_cond)
155 {
156  ABTI_spinlock_acquire(&p_cond->lock);
157 
158  if (p_cond->num_waiters == 0) {
159  ABTI_spinlock_release(&p_cond->lock);
160  return;
161  }
162 
163  /* Wake up all waiting ULTs */
164  ABTI_unit *p_head = p_cond->p_head;
165  ABTI_unit *p_unit = p_head;
166  while (1) {
167  ABTI_unit *p_next = p_unit->p_next;
168 
169  p_unit->p_prev = NULL;
170  p_unit->p_next = NULL;
171 
172  if (p_unit->type == ABT_UNIT_TYPE_THREAD) {
173  ABTI_thread *p_thread = ABTI_thread_get_ptr(p_unit->handle.thread);
174  ABTI_thread_set_ready(p_local, p_thread);
175  } else {
176  /* When the head is an external thread */
177  ABTD_atomic_int32 *p_ext_signal =
178  (ABTD_atomic_int32 *)p_unit->handle.thread;
179  ABTD_atomic_release_store_int32(p_ext_signal, 1);
180  }
181 
182  /* Next ULT */
183  if (p_next != p_head) {
184  p_unit = p_next;
185  } else {
186  break;
187  }
188  }
189 
190  p_cond->p_waiter_mutex = NULL;
191  p_cond->num_waiters = 0;
192  p_cond->p_head = NULL;
193  p_cond->p_tail = NULL;
194 
195  ABTI_spinlock_release(&p_cond->lock);
196 }
197 
198 #endif /* ABTI_COND_H_INCLUDED */
#define ABT_ERR_INV_MUTEX
Definition: abt.h:84
int ABT_bool
Definition: abt.h:309
#define ABT_FALSE
Definition: abt.h:224
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:279
#define HANDLE_ERROR_FUNC_WITH_CODE(n)
Definition: abti_error.h:241
#define ABT_SUCCESS
Definition: abt.h:64
ABT_unit_type
Definition: abt.h:170
#define ABT_ERR_COND
Definition: abt.h:105
#define ABT_COND_NULL
Definition: abt.h:350
static void ABTU_free(void *ptr)
Definition: abtu.h:32
static void * ABTU_calloc(size_t num, size_t size)
Definition: abtu.h:49
struct ABT_cond_opaque * ABT_cond
Definition: abt.h:297