ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
abti_waitlist.h
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #ifndef ABTI_WAITLIST_H_INCLUDED
7 #define ABTI_WAITLIST_H_INCLUDED
8 
9 #include "abt_config.h"
10 
11 static inline void ABTI_waitlist_init(ABTI_waitlist *p_waitlist)
12 {
13 #ifndef ABT_CONFIG_ACTIVE_WAIT_POLICY
14  ABTD_futex_multiple_init(&p_waitlist->futex);
15 #endif
16  p_waitlist->p_head = NULL;
17  p_waitlist->p_tail = NULL;
18 }
19 
20 static inline void
21 ABTI_waitlist_wait_and_unlock(ABTI_local **pp_local, ABTI_waitlist *p_waitlist,
22  ABTD_spinlock *p_lock,
23  ABT_sync_event_type sync_event_type, void *p_sync)
24 {
25  ABTI_ASSERT(ABTD_spinlock_is_locked(p_lock) == ABT_TRUE);
26  ABTI_ythread *p_ythread = NULL;
27  ABTI_xstream *p_local_xstream = ABTI_local_get_xstream_or_null(*pp_local);
28  if (!ABTI_IS_EXT_THREAD_ENABLED || p_local_xstream) {
29  p_ythread = ABTI_thread_get_ythread_or_null(p_local_xstream->p_thread);
30  }
31  if (!p_ythread) {
32  /* External thread or non-yieldable thread. */
33  ABTI_thread thread;
34  thread.type = ABTI_THREAD_TYPE_EXT;
35  /* use state for synchronization */
36  ABTD_atomic_relaxed_store_int(&thread.state, ABT_THREAD_STATE_BLOCKED);
37  /* Add thread to the list. */
38  thread.p_next = NULL;
39  if (p_waitlist->p_head == NULL) {
40  p_waitlist->p_head = &thread;
41  } else {
42  p_waitlist->p_tail->p_next = &thread;
43  }
44  p_waitlist->p_tail = &thread;
45 
46  /* Non-yieldable thread is waiting here. */
47 #ifdef ABT_CONFIG_ACTIVE_WAIT_POLICY
48  ABTD_spinlock_release(p_lock);
49  while (ABTD_atomic_acquire_load_int(&thread.state) !=
51  ;
52 #else
53  while (1) {
54  /* While taking a lock check if this thread is not ready. This is
55  * necessary to sleep while ready; otherwise deadlock. */
56  if (ABTD_atomic_relaxed_load_int(&thread.state) ==
58  ABTD_spinlock_release(p_lock);
59  break;
60  }
61  ABTD_futex_wait_and_unlock(&p_waitlist->futex, p_lock);
62 
63  /* Quick check. */
64  if (ABTD_atomic_acquire_load_int(&thread.state) ==
66  break;
67 
68  /* Take a lock again. */
69  ABTD_spinlock_acquire(p_lock);
70  }
71 #endif
72  } else {
73  /* Add p_thread to the list. */
74  p_ythread->thread.p_next = NULL;
75  if (p_waitlist->p_head == NULL) {
76  p_waitlist->p_head = &p_ythread->thread;
77  } else {
78  p_waitlist->p_tail->p_next = &p_ythread->thread;
79  }
80  p_waitlist->p_tail = &p_ythread->thread;
81 
82  /* Suspend the current ULT */
83  ABTI_ythread_suspend_unlock(&p_local_xstream, p_ythread, p_lock,
84  sync_event_type, p_sync);
85  /* Resumed. */
86  *pp_local = ABTI_xstream_get_local(p_local_xstream);
87  }
88 }
89 
90 /* Return ABT_TRUE if timed out. */
91 static inline ABT_bool ABTI_waitlist_wait_timedout_and_unlock(
92  ABTI_local **pp_local, ABTI_waitlist *p_waitlist, ABTD_spinlock *p_lock,
93  double target_time, ABT_sync_event_type sync_event_type, void *p_sync)
94 {
95  ABTI_ASSERT(ABTD_spinlock_is_locked(p_lock) == ABT_TRUE);
96  ABTI_ythread *p_ythread = NULL;
97  ABTI_xstream *p_local_xstream = ABTI_local_get_xstream_or_null(*pp_local);
98  if (!ABTI_IS_EXT_THREAD_ENABLED || p_local_xstream)
99  p_ythread = ABTI_thread_get_ythread_or_null(p_local_xstream->p_thread);
100 
101  /* Always use a dummy thread. */
102  ABTI_thread thread;
103  thread.type = ABTI_THREAD_TYPE_EXT;
104  /* use state for synchronization */
105  ABTD_atomic_relaxed_store_int(&thread.state, ABT_THREAD_STATE_BLOCKED);
106 
107  /* Add p_thread to the list. This implementation is tricky since this
108  * updates p_prev as well for removal on timeout while the other functions
109  * (e.g., wait, broadcast, signal) do not update it. */
110  thread.p_next = NULL;
111  if (p_waitlist->p_head == NULL) {
112  p_waitlist->p_head = &thread;
113  thread.p_prev = NULL;
114  } else {
115  p_waitlist->p_tail->p_next = &thread;
116  thread.p_prev = p_waitlist->p_tail;
117  }
118  p_waitlist->p_tail = &thread;
119 
120  /* Waiting here. */
121  if (p_ythread) {
122  /* When an underlying entity is yieldable. */
123  ABTD_spinlock_release(p_lock);
124  while (ABTD_atomic_acquire_load_int(&thread.state) !=
126  double cur_time = ABTI_get_wtime();
127  if (cur_time >= target_time) {
128  ABTD_spinlock_acquire(p_lock);
129  goto timeout;
130  }
131  ABTI_ythread_yield(&p_local_xstream, p_ythread,
132  ABTI_YTHREAD_YIELD_KIND_YIELD_LOOP,
133  sync_event_type, p_sync);
134  *pp_local = ABTI_xstream_get_local(p_local_xstream);
135  }
136  } else {
137  /* When an underlying entity is non-yieldable. */
138 #ifdef ABT_CONFIG_ACTIVE_WAIT_POLICY
139  ABTD_spinlock_release(p_lock);
140  while (ABTD_atomic_acquire_load_int(&thread.state) !=
142  double cur_time = ABTI_get_wtime();
143  if (cur_time >= target_time) {
144  ABTD_spinlock_acquire(p_lock);
145  goto timeout;
146  }
147  }
148 #else
149  while (1) {
150  double cur_time = ABTI_get_wtime();
151  if (cur_time >= target_time) {
152  goto timeout;
153  }
154  /* While taking a lock check if this thread is not ready. This is
155  * necessary to sleep while ready; otherwise deadlock. */
156  if (ABTD_atomic_relaxed_load_int(&thread.state) ==
158  ABTD_spinlock_release(p_lock);
159  break;
160  }
161  ABTD_futex_timedwait_and_unlock(&p_waitlist->futex, p_lock,
162  target_time - cur_time);
163  /* Quick check. */
164  if (ABTD_atomic_acquire_load_int(&thread.state) ==
166  break;
167  /* Take a lock again. */
168  ABTD_spinlock_acquire(p_lock);
169  }
170 #endif
171  }
172  /* Singled */
173  return ABT_FALSE;
174 timeout:
175  /* Timeout. Remove this thread if not signaled even after taking a lock. */
176  ABTI_ASSERT(ABTD_spinlock_is_locked(p_lock) == ABT_TRUE);
177  ABT_bool is_timedout =
178  (ABTD_atomic_relaxed_load_int(&thread.state) != ABT_THREAD_STATE_READY)
179  ? ABT_TRUE
180  : ABT_FALSE;
181  if (is_timedout) {
182  /* This thread is still in the list. */
183  if (p_waitlist->p_head == &thread) {
184  /* thread is a head. */
185  /* Note that thread->p_prev cannot be used to check whether
186  * thread is a head or not because signal and broadcast do
187  * not modify thread->p_prev. */
188  p_waitlist->p_head = thread.p_next;
189  if (!thread.p_next) {
190  /* This thread is p_tail */
191  ABTI_ASSERT(p_waitlist->p_tail == &thread);
192  p_waitlist->p_tail = NULL;
193  }
194  } else {
195  /* thread is not a head and thus p_prev exists. */
196  ABTI_ASSERT(thread.p_prev);
197  thread.p_prev->p_next = thread.p_next;
198  if (thread.p_next && thread.type == ABTI_THREAD_TYPE_EXT) {
199  /* Only a dummy external thread created by this function
200  * checks p_prev. Note that a real external thread is
201  * also dummy, so updating p_prev is allowed. */
202  thread.p_next->p_prev = thread.p_prev;
203  } else {
204  /* This thread is p_tail */
205  ABTI_ASSERT(p_waitlist->p_tail == &thread);
206  p_waitlist->p_tail = thread.p_prev;
207  }
208  }
209  /* We do not need to modify thread->p_prev and p_next since this
210  * dummy thread is no longer used. */
211  }
212  ABTD_spinlock_release(p_lock);
213  return is_timedout;
214 }
215 
216 static inline void ABTI_waitlist_signal(ABTI_local *p_local,
217  ABTI_waitlist *p_waitlist)
218 {
219  ABTI_thread *p_thread = p_waitlist->p_head;
220  if (p_thread) {
221  ABTI_thread *p_next = p_thread->p_next;
222  p_thread->p_next = NULL;
223 
224  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
225  if (p_ythread) {
226  ABTI_ythread_resume_and_push(p_local, p_ythread);
227  } else {
228  /* When p_thread is an external thread or a tasklet */
229  ABTD_atomic_release_store_int(&p_thread->state,
231 #ifndef ABT_CONFIG_ACTIVE_WAIT_POLICY
232  /* There's no way to selectively wake up threads. Let's just
233  * wake up all the threads. They will sleep again since their
234  * states are not marked as READY. */
235  ABTD_futex_broadcast(&p_waitlist->futex);
236 #endif
237  }
238  /* After updating p_thread->state, p_thread can be updated and
239  * freed. */
240  p_waitlist->p_head = p_next;
241  if (!p_next)
242  p_waitlist->p_tail = NULL;
243  }
244 }
245 
246 static inline void ABTI_waitlist_broadcast(ABTI_local *p_local,
247  ABTI_waitlist *p_waitlist)
248 {
249  ABTI_thread *p_thread = p_waitlist->p_head;
250  if (p_thread) {
251  ABT_bool wakeup_nonyieldable = ABT_FALSE;
252  do {
253  ABTI_thread *p_next = p_thread->p_next;
254  p_thread->p_next = NULL;
255 
256  ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
257  if (p_ythread) {
258  ABTI_ythread_resume_and_push(p_local, p_ythread);
259  } else {
260  /* When p_thread is an external thread or a tasklet */
261  wakeup_nonyieldable = ABT_TRUE;
262  ABTD_atomic_release_store_int(&p_thread->state,
264  }
265  /* After updating p_thread->state, p_thread can be updated and
266  * freed. */
267  p_thread = p_next;
268  } while (p_thread);
269  p_waitlist->p_head = NULL;
270  p_waitlist->p_tail = NULL;
271 #ifndef ABT_CONFIG_ACTIVE_WAIT_POLICY
272  if (wakeup_nonyieldable) {
273  ABTD_futex_broadcast(&p_waitlist->futex);
274  }
275 #else
276  /* Do nothing. */
277  (void)wakeup_nonyieldable;
278 #endif
279  }
280 }
281 
282 static inline ABT_bool ABTI_waitlist_is_empty(ABTI_waitlist *p_waitlist)
283 {
284  return p_waitlist->p_head ? ABT_FALSE : ABT_TRUE;
285 }
286 
287 #endif /* ABTI_WAITLIST_H_INCLUDED */
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
ABT_THREAD_STATE_READY
@ ABT_THREAD_STATE_READY
Definition: abt.h:427
ABT_sync_event_type
ABT_sync_event_type
Type of synchronization event.
Definition: abt.h:696
ABT_THREAD_STATE_BLOCKED
@ ABT_THREAD_STATE_BLOCKED
Definition: abt.h:431
abt_config.h
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:784
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:786