ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
abtd_futex.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 #ifndef ABT_CONFIG_ACTIVE_WAIT_POLICY
9 
10 #ifdef ABT_CONFIG_USE_LINUX_FUTEX
11 
12 /* Use Linux futex. */
13 #include <unistd.h>
14 #include <linux/futex.h>
15 #include <syscall.h>
16 
18  ABTD_spinlock *p_lock)
19 {
20  const int original_val = ABTD_atomic_relaxed_load_int(&p_futex->val);
21  ABTD_spinlock_release(p_lock);
22  do {
23  syscall(SYS_futex, &p_futex->val.val, FUTEX_WAIT_PRIVATE, original_val,
24  NULL, NULL, 0);
25  } while (ABTD_atomic_relaxed_load_int(&p_futex->val) == original_val);
26 }
27 
29  ABTD_spinlock *p_lock,
30  double wait_time_sec)
31 {
32  const int original_val = ABTD_atomic_relaxed_load_int(&p_futex->val);
33  ABTD_spinlock_release(p_lock);
34  struct timespec wait_time; /* This wait_time must be **relative**. */
35  wait_time.tv_sec = (time_t)wait_time_sec;
36  wait_time.tv_nsec =
37  (long)((wait_time_sec - (double)(time_t)wait_time_sec) * 1.0e9);
38  syscall(SYS_futex, &p_futex->val.val, FUTEX_WAIT_PRIVATE, original_val,
39  &wait_time, NULL, 0);
40 }
41 
43 {
44  int current_val = ABTD_atomic_relaxed_load_int(&p_futex->val);
45  ABTD_atomic_relaxed_store_int(&p_futex->val, current_val + 1);
46  syscall(SYS_futex, &p_futex->val.val, FUTEX_WAKE_PRIVATE, INT_MAX, NULL,
47  NULL, 0);
48 }
49 
51 {
52  /* Wake-up signal is 1. */
53  while (ABTD_atomic_acquire_load_int(&p_futex->val) == 0) {
54  syscall(SYS_futex, &p_futex->val.val, FUTEX_WAIT_PRIVATE, 0, NULL, NULL,
55  0);
56  }
57  /* Resumed by ABTD_futex_resume() */
58 }
59 
61 {
62  ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_futex->val) == 0);
63  /* Write 1 and wake the waiter up. */
64  ABTD_atomic_release_store_int(&p_futex->val, 1);
65  syscall(SYS_futex, &p_futex->val.val, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
66 }
67 
68 #else /* ABT_CONFIG_USE_LINUX_FUTEX */
69 
70 /* Use Pthreads. */
71 #include <pthread.h>
72 
73 typedef struct pthread_sync {
74  pthread_mutex_t mutex;
75  pthread_cond_t cond;
79 } pthread_sync;
80 
81 #define PTHREAD_SYNC_STATIC_INITIALIZER \
82  { \
83  PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL, \
84  ABTD_ATOMIC_INT_STATIC_INITIALIZER(0), \
85  }
86 
88  ABTD_spinlock *p_lock)
89 {
91  pthread_mutex_lock(&sync_obj.mutex);
92  /* This p_next updates must be done "after" taking mutex but "before"
93  * releasing p_lock. */
94  pthread_sync *p_next = (pthread_sync *)p_futex->p_next;
95  if (p_next)
96  p_next->p_prev = &sync_obj;
97  sync_obj.p_next = p_next;
98  p_futex->p_next = (void *)&sync_obj;
99  ABTD_spinlock_release(p_lock);
100  while (ABTD_atomic_relaxed_load_int(&sync_obj.val) == 0) {
101  pthread_cond_wait(&sync_obj.cond, &sync_obj.mutex);
102  }
103  /* I cannot find whether a statically initialized mutex must be unlocked
104  * before it gets out of scope or not, but let's choose a safer way. */
105  pthread_mutex_unlock(&sync_obj.mutex);
106 
107  /* Since now val is 1, there's no possibility that the signaler is still
108  * touching this sync_obj. sync_obj can be safely released by exiting this
109  * function. Note that it seems that Linux does not need such for
110  * statically allocated pthread_mutex_t and pthread_cond_t, FreeBSD
111  * dynamically allocates memory and thus leaks memory if we do not destroy
112  * those objects. Let's choose a safer option. */
113  pthread_cond_destroy(&sync_obj.cond);
114  pthread_mutex_destroy(&sync_obj.mutex);
115 }
116 
118  ABTD_spinlock *p_lock,
119  double wait_time_sec)
120 {
122 
123  struct timespec wait_time; /* This time must be **relative**. */
124  clock_gettime(CLOCK_REALTIME, &wait_time);
125  wait_time.tv_sec += (time_t)wait_time_sec;
126  wait_time.tv_nsec +=
127  (long)((wait_time_sec - (double)(time_t)wait_time_sec) * 1.0e9);
128  if (wait_time.tv_nsec >= 1e9) {
129  wait_time.tv_sec += 1;
130  wait_time.tv_nsec -= 1e9;
131  }
132  pthread_mutex_lock(&sync_obj.mutex);
133  /* This p_next updates must be done "after" taking mutex but "before"
134  * releasing p_lock. */
135  pthread_sync *p_next = (pthread_sync *)p_futex->p_next;
136  if (p_next)
137  p_next->p_prev = &sync_obj;
138  sync_obj.p_next = p_next;
139  p_futex->p_next = (void *)&sync_obj;
140  ABTD_spinlock_release(p_lock);
141  pthread_cond_timedwait(&sync_obj.cond, &sync_obj.mutex, &wait_time);
142 
143  /* I cannot find whether a statically initialized mutex must be unlocked
144  * before it gets out of scope or not, but let's choose a safer way. */
145  pthread_mutex_unlock(&sync_obj.mutex);
146 
147  if (ABTD_atomic_acquire_load_int(&sync_obj.val) != 0) {
148  /* Since now val is 1, there's no possibility that the signaler is still
149  * touching sync_obj. sync_obj can be safely released by exiting this
150  * function. */
151  } else {
152  /* Maybe this sync_obj is being touched by the signaler. Take a lock
153  * and remove it from the list. */
154  ABTD_spinlock_acquire(p_lock);
155  /* Double check the value in a lock. */
156  if (ABTD_atomic_acquire_load_int(&sync_obj.val) == 0) {
157  /* timedout or spurious wakeup happens. Remove this sync_obj from
158  * p_futex carefully. */
159  if (p_futex->p_next == (void *)&sync_obj) {
160  p_futex->p_next = (void *)sync_obj.p_next;
161  } else {
162  ABTI_ASSERT(sync_obj.p_prev);
163  sync_obj.p_prev->p_next = sync_obj.p_next;
164  sync_obj.p_next->p_prev = sync_obj.p_prev;
165  }
166  }
167  ABTD_spinlock_release(p_lock);
168  }
169  /* Free sync_obj. */
170  pthread_cond_destroy(&sync_obj.cond);
171  pthread_mutex_destroy(&sync_obj.mutex);
172 }
173 
175 {
176  /* The caller must be holding a lock (p_lock above). */
177  pthread_sync *p_cur = (pthread_sync *)p_futex->p_next;
178  while (p_cur) {
179  pthread_sync *p_next = p_cur->p_next;
180  pthread_mutex_lock(&p_cur->mutex);
182  pthread_cond_broadcast(&p_cur->cond);
183  pthread_mutex_unlock(&p_cur->mutex);
184  /* After "val" is updated and the mutex is unlocked, that pthread_sync
185  * may not be touched. */
186  p_cur = p_next;
187  }
188  p_futex->p_next = NULL;
189 }
190 
192 {
193  if (ABTD_atomic_acquire_load_ptr(&p_futex->p_sync_obj) != NULL) {
194  /* Always resumed (invalid_ptr has be written). No need to wait. */
195  return;
196  }
198  pthread_mutex_lock(&sync_obj.mutex);
199  /* Use strong since either suspend or resume must succeed if those two are
200  * executed concurrently. */
201  if (ABTD_atomic_bool_cas_strong_ptr(&p_futex->p_sync_obj, NULL,
202  (void *)&sync_obj)) {
203  /* This thread needs to wait on this. The outer loop is needed to avoid
204  * spurious wakeup. */
205  while (ABTD_atomic_relaxed_load_int(&sync_obj.val) == 0)
206  pthread_cond_wait(&sync_obj.cond, &sync_obj.mutex);
207  } else {
208  /* It seems that this futex has already been resumed. */
209  }
210  pthread_mutex_unlock(&sync_obj.mutex);
211  /* Resumed by ABTD_futex_resume(). Free sync_obj. */
212  pthread_cond_destroy(&sync_obj.cond);
213  pthread_mutex_destroy(&sync_obj.mutex);
214 }
215 
217 {
218  pthread_sync *p_sync_obj =
220  if (!p_sync_obj) {
221  /* Try to use CAS to notify a waiter that this is "resumed" */
222  void *invalid_ptr = (void *)((intptr_t)1);
223  void *ret_val = ABTD_atomic_val_cas_strong_ptr(&p_futex->p_sync_obj,
224  NULL, invalid_ptr);
225  if (ret_val == NULL) {
226  /* CAS succeeded. Resumed. This thread should not touch this
227  * sync_obj. */
228  return;
229  }
230  /* p_next has been updated by the waiter. Let's wake him up. */
231  p_sync_obj = (pthread_sync *)ret_val;
232  }
233  pthread_mutex_lock(&p_sync_obj->mutex);
234  /* After setting value 1 and unlock the mutex, sync_obj will be freed
235  * immediately. */
236  ABTD_atomic_relaxed_store_int(&p_sync_obj->val, 1);
237  pthread_cond_signal(&p_sync_obj->cond);
238  pthread_mutex_unlock(&p_sync_obj->mutex);
239 }
240 
241 #endif /* !ABT_CONFIG_USE_LINUX_FUTEX */
242 
243 #endif /* !ABT_CONFIG_ACTIVE_WAIT_POLICY */
ABTD_atomic_int
Definition: abtd_atomic.h:15
ABTD_atomic_val_cas_strong_ptr
static void * ABTD_atomic_val_cas_strong_ptr(ABTD_atomic_ptr *ptr, void *oldv, void *newv)
Definition: abtd_atomic.h:337
pthread_sync::p_prev
struct pthread_sync * p_prev
Definition: abtd_futex.c:77
ABTD_futex_resume
void ABTD_futex_resume(ABTD_futex_single *p_futex)
Definition: abtd_futex.c:216
ABTD_futex_suspend
void ABTD_futex_suspend(ABTD_futex_single *p_futex)
Definition: abtd_futex.c:191
ABTD_spinlock
Definition: abtd_spinlock.h:9
abti.h
ABTD_atomic_bool_cas_strong_ptr
static int ABTD_atomic_bool_cas_strong_ptr(ABTD_atomic_ptr *ptr, void *oldv, void *newv)
Definition: abtd_atomic.h:423
ABTD_atomic_relaxed_load_int
static int ABTD_atomic_relaxed_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:763
ABTD_spinlock_acquire
static void ABTD_spinlock_acquire(ABTD_spinlock *p_lock)
Definition: abtd_spinlock.h:28
ABTD_atomic_relaxed_store_int
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:996
pthread_sync::p_next
struct pthread_sync * p_next
Definition: abtd_futex.c:76
ABTD_futex_multiple
Definition: abtd_futex.h:75
pthread_sync::val
ABTD_atomic_int val
Definition: abtd_futex.c:78
ABTI_ASSERT
#define ABTI_ASSERT(cond)
Definition: abti_error.h:12
ABTD_futex_single
Definition: abtd_futex.h:84
pthread_sync::cond
pthread_cond_t cond
Definition: abtd_futex.c:75
ABTD_atomic_acquire_load_int
static int ABTD_atomic_acquire_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:878
ABTD_futex_wait_and_unlock
void ABTD_futex_wait_and_unlock(ABTD_futex_multiple *p_futex, ABTD_spinlock *p_lock)
Definition: abtd_futex.c:87
ABTD_spinlock_release
static void ABTD_spinlock_release(ABTD_spinlock *p_lock)
Definition: abtd_spinlock.h:42
ABTD_futex_single::p_sync_obj
ABTD_atomic_ptr p_sync_obj
Definition: abtd_futex.h:85
pthread_sync::mutex
pthread_mutex_t mutex
Definition: abtd_futex.c:74
ABTD_futex_multiple::p_next
void * p_next
Definition: abtd_futex.h:76
pthread_sync
struct pthread_sync pthread_sync
ABTD_atomic_acquire_load_ptr
static void * ABTD_atomic_acquire_load_ptr(const ABTD_atomic_ptr *ptr)
Definition: abtd_atomic.h:979
ABTD_futex_timedwait_and_unlock
void ABTD_futex_timedwait_and_unlock(ABTD_futex_multiple *p_futex, ABTD_spinlock *p_lock, double wait_time_sec)
Definition: abtd_futex.c:117
pthread_sync
Definition: abtd_futex.c:73
ABTD_atomic_release_store_int
static void ABTD_atomic_release_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:1065
ABTD_futex_broadcast
void ABTD_futex_broadcast(ABTD_futex_multiple *p_futex)
Definition: abtd_futex.c:174
PTHREAD_SYNC_STATIC_INITIALIZER
#define PTHREAD_SYNC_STATIC_INITIALIZER
Definition: abtd_futex.c:81