ARGOBOTS  1.1b1
fifo_wait.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 /* FIFO_WAIT pool implementation */
9 
10 static int pool_init(ABT_pool pool, ABT_pool_config config);
11 static int pool_free(ABT_pool pool);
12 static size_t pool_get_size(ABT_pool pool);
13 static void pool_push(ABT_pool pool, ABT_unit unit);
14 static ABT_unit pool_pop(ABT_pool pool);
15 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs);
16 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
17 static int pool_remove(ABT_pool pool, ABT_unit unit);
18 static int pool_print_all(ABT_pool pool, void *arg,
19  void (*print_fn)(void *, ABT_unit));
21 static ABT_bool unit_is_in_pool(ABT_unit unit);
23 static void unit_free(ABT_unit *unit);
24 
25 struct data {
26  pthread_mutex_t mutex;
27  pthread_cond_t cond;
28  size_t num_threads;
31 };
32 typedef struct data data_t;
33 
34 static inline data_t *pool_get_data_ptr(void *p_data)
35 {
36  return (data_t *)p_data;
37 }
38 
40  ABTI_pool_def *p_def)
41 {
42  p_def->access = access;
43  p_def->p_init = pool_init;
44  p_def->p_free = pool_free;
45  p_def->p_get_size = pool_get_size;
46  p_def->p_push = pool_push;
47  p_def->p_pop = pool_pop;
48  p_def->p_pop_wait = pool_pop_wait;
50  p_def->p_remove = pool_remove;
51  p_def->p_print_all = pool_print_all;
55  p_def->u_free = unit_free;
56 
57  return ABT_SUCCESS;
58 }
59 
60 /* Pool functions */
61 
62 static int pool_init(ABT_pool pool, ABT_pool_config config)
63 {
64  ABTI_UNUSED(config);
65  int abt_errno = ABT_SUCCESS;
66  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
67 
68  data_t *p_data;
69  abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data);
70  ABTI_CHECK_ERROR(abt_errno);
71 
72  pthread_mutex_init(&p_data->mutex, NULL);
73  pthread_cond_init(&p_data->cond, NULL);
74 
75  p_data->num_threads = 0;
76  p_data->p_head = NULL;
77  p_data->p_tail = NULL;
78 
79  p_pool->data = p_data;
80 
81  return abt_errno;
82 }
83 
84 static int pool_free(ABT_pool pool)
85 {
86  int abt_errno = ABT_SUCCESS;
87  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
88  data_t *p_data = pool_get_data_ptr(p_pool->data);
89 
90  pthread_mutex_destroy(&p_data->mutex);
91  pthread_cond_destroy(&p_data->cond);
92  ABTU_free(p_data);
93 
94  return abt_errno;
95 }
96 
97 static size_t pool_get_size(ABT_pool pool)
98 {
99  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
100  data_t *p_data = pool_get_data_ptr(p_pool->data);
101  return p_data->num_threads;
102 }
103 
104 static void pool_push(ABT_pool pool, ABT_unit unit)
105 {
106  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
107  data_t *p_data = pool_get_data_ptr(p_pool->data);
108  ABTI_thread *p_thread = (ABTI_thread *)unit;
109 
110  pthread_mutex_lock(&p_data->mutex);
111  if (p_data->num_threads == 0) {
112  p_thread->p_prev = p_thread;
113  p_thread->p_next = p_thread;
114  p_data->p_head = p_thread;
115  p_data->p_tail = p_thread;
116  } else {
117  ABTI_thread *p_head = p_data->p_head;
118  ABTI_thread *p_tail = p_data->p_tail;
119  p_tail->p_next = p_thread;
120  p_head->p_prev = p_thread;
121  p_thread->p_prev = p_tail;
122  p_thread->p_next = p_head;
123  p_data->p_tail = p_thread;
124  }
125  p_data->num_threads++;
126 
128  pthread_cond_signal(&p_data->cond);
129  pthread_mutex_unlock(&p_data->mutex);
130 }
131 
132 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
133 {
134  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
135  data_t *p_data = pool_get_data_ptr(p_pool->data);
136  ABTI_thread *p_thread = NULL;
137  ABT_unit h_unit = ABT_UNIT_NULL;
138 
139  pthread_mutex_lock(&p_data->mutex);
140 
141  if (!p_data->num_threads) {
142 #if defined(ABT_CONFIG_USE_CLOCK_GETTIME)
143  struct timespec ts;
144  clock_gettime(CLOCK_REALTIME, &ts);
145  ts.tv_sec += (time_t)time_secs;
146  ts.tv_nsec += (long)((time_secs - (time_t)time_secs) * 1e9);
147  if (ts.tv_nsec > 1e9) {
148  ts.tv_sec += 1;
149  ts.tv_nsec -= 1e9;
150  }
151  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
152 #else
153  /* We cannot use pthread_cond_timedwait(). Let's use nanosleep()
154  * instead */
155  double start_time = ABTI_get_wtime();
156  while (ABTI_get_wtime() - start_time < time_secs) {
157  pthread_mutex_unlock(&p_data->mutex);
158  const int sleep_nsecs = 100;
159  struct timespec ts = { 0, sleep_nsecs };
160  nanosleep(&ts, NULL);
161  pthread_mutex_lock(&p_data->mutex);
162  if (p_data->num_threads > 0)
163  break;
164  }
165 #endif
166  }
167 
168  if (p_data->num_threads > 0) {
169  p_thread = p_data->p_head;
170  if (p_data->num_threads == 1) {
171  p_data->p_head = NULL;
172  p_data->p_tail = NULL;
173  } else {
174  p_thread->p_prev->p_next = p_thread->p_next;
175  p_thread->p_next->p_prev = p_thread->p_prev;
176  p_data->p_head = p_thread->p_next;
177  }
178  p_data->num_threads--;
179 
180  p_thread->p_prev = NULL;
181  p_thread->p_next = NULL;
183 
184  h_unit = (ABT_unit)p_thread;
185  }
186  pthread_mutex_unlock(&p_data->mutex);
187 
188  return h_unit;
189 }
190 
/*
 * Split a time given in (fractional) seconds into a struct timespec.
 *
 * ts_out:  receives the result.
 * seconds: time in seconds; the integral part (truncated toward zero) goes
 *          to tv_sec and the remaining fraction, scaled to nanoseconds and
 *          truncated, goes to tv_nsec.
 */
static inline void convert_double_sec_to_timespec(struct timespec *ts_out,
                                                  double seconds)
{
    const time_t whole_secs = (time_t)seconds;
    const double frac_secs = seconds - whole_secs;

    ts_out->tv_sec = whole_secs;
    ts_out->tv_nsec = (long)(frac_secs * 1000000000.0);
}
197 
198 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
199 {
200  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
201  data_t *p_data = pool_get_data_ptr(p_pool->data);
202  ABTI_thread *p_thread = NULL;
203  ABT_unit h_unit = ABT_UNIT_NULL;
204 
205  pthread_mutex_lock(&p_data->mutex);
206 
207  if (!p_data->num_threads) {
208  struct timespec ts;
209  convert_double_sec_to_timespec(&ts, abstime_secs);
210  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
211  }
212 
213  if (p_data->num_threads > 0) {
214  p_thread = p_data->p_head;
215  if (p_data->num_threads == 1) {
216  p_data->p_head = NULL;
217  p_data->p_tail = NULL;
218  } else {
219  p_thread->p_prev->p_next = p_thread->p_next;
220  p_thread->p_next->p_prev = p_thread->p_prev;
221  p_data->p_head = p_thread->p_next;
222  }
223  p_data->num_threads--;
224 
225  p_thread->p_prev = NULL;
226  p_thread->p_next = NULL;
228 
229  h_unit = (ABT_unit)p_thread;
230  }
231  pthread_mutex_unlock(&p_data->mutex);
232 
233  return h_unit;
234 }
235 
237 {
238  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
239  data_t *p_data = pool_get_data_ptr(p_pool->data);
240  ABTI_thread *p_thread = NULL;
241  ABT_unit h_unit = ABT_UNIT_NULL;
242 
243  pthread_mutex_lock(&p_data->mutex);
244  if (p_data->num_threads > 0) {
245  p_thread = p_data->p_head;
246  if (p_data->num_threads == 1) {
247  p_data->p_head = NULL;
248  p_data->p_tail = NULL;
249  } else {
250  p_thread->p_prev->p_next = p_thread->p_next;
251  p_thread->p_next->p_prev = p_thread->p_prev;
252  p_data->p_head = p_thread->p_next;
253  }
254  p_data->num_threads--;
255 
256  p_thread->p_prev = NULL;
257  p_thread->p_next = NULL;
259 
260  h_unit = (ABT_unit)p_thread;
261  }
262  pthread_mutex_unlock(&p_data->mutex);
263 
264  return h_unit;
265 }
266 
267 static int pool_remove(ABT_pool pool, ABT_unit unit)
268 {
269  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
270  data_t *p_data = pool_get_data_ptr(p_pool->data);
271  ABTI_thread *p_thread = (ABTI_thread *)unit;
272 
275  ABT_ERR_POOL);
276 
277  pthread_mutex_lock(&p_data->mutex);
278  if (p_data->num_threads == 1) {
279  p_data->p_head = NULL;
280  p_data->p_tail = NULL;
281  } else {
282  p_thread->p_prev->p_next = p_thread->p_next;
283  p_thread->p_next->p_prev = p_thread->p_prev;
284  if (p_thread == p_data->p_head) {
285  p_data->p_head = p_thread->p_next;
286  } else if (p_thread == p_data->p_tail) {
287  p_data->p_tail = p_thread->p_prev;
288  }
289  }
290  p_data->num_threads--;
291 
293  pthread_mutex_unlock(&p_data->mutex);
294 
295  p_thread->p_prev = NULL;
296  p_thread->p_next = NULL;
297 
298  return ABT_SUCCESS;
299 }
300 
301 static int pool_print_all(ABT_pool pool, void *arg,
302  void (*print_fn)(void *, ABT_unit))
303 {
304  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
305  data_t *p_data = pool_get_data_ptr(p_pool->data);
306 
307  pthread_mutex_lock(&p_data->mutex);
308 
309  size_t num_threads = p_data->num_threads;
310  ABTI_thread *p_thread = p_data->p_head;
311  while (num_threads--) {
312  ABTI_ASSERT(p_thread);
313  ABT_unit unit = (ABT_unit)p_thread;
314  print_fn(arg, unit);
315  p_thread = p_thread->p_next;
316  }
317 
318  pthread_mutex_unlock(&p_data->mutex);
319 
320  return ABT_SUCCESS;
321 }
322 
323 /* Unit functions */
324 
326 {
327  ABTI_thread *p_thread = (ABTI_thread *)unit;
328  return ABTI_thread_get_handle(p_thread);
329 }
330 
332 {
333  ABTI_thread *p_thread = (ABTI_thread *)unit;
335  : ABT_FALSE;
336 }
337 
339 {
340  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
341  p_thread->p_prev = NULL;
342  p_thread->p_next = NULL;
344 
345  return (ABT_unit)p_thread;
346 }
347 
348 static void unit_free(ABT_unit *unit)
349 {
350  *unit = ABT_UNIT_NULL;
351 }
ABTI_pool_def::p_init
ABT_pool_init_fn p_init
Definition: abti.h:338
ABTI_pool_def::p_get_size
ABT_pool_get_size_fn p_get_size
Definition: abti.h:339
pool_get_data_ptr
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo_wait.c:34
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:999
ABTI_pool_def::p_remove
ABT_pool_remove_fn p_remove
Definition: abti.h:344
ABT_thread
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
Definition: abt.h:888
ABTI_pool_def::p_print_all
ABT_pool_print_all_fn p_print_all
Definition: abti.h:346
ABTI_pool_def
Definition: abti.h:332
pool_free
static int pool_free(ABT_pool pool)
Definition: fifo_wait.c:84
ABT_ERR_POOL
#define ABT_ERR_POOL
Error code: error related to a pool.
Definition: abt.h:282
ABTI_CHECK_ERROR
#define ABTI_CHECK_ERROR(abt_errno)
Definition: abti_error.h:120
data
Definition: fifo.c:30
ABTI_thread::p_next
ABTI_thread * p_next
Definition: abti.h:351
unit_is_in_pool
static ABT_bool unit_is_in_pool(ABT_unit unit)
Definition: fifo_wait.c:331
ABTI_thread_get_handle
static ABT_thread ABTI_thread_get_handle(ABTI_thread *p_thread)
Definition: abti_thread.h:24
ABTI_UNUSED
#define ABTI_UNUSED(a)
Definition: abti.h:103
ABTI_thread
Definition: abti.h:349
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:839
ABTI_pool_def::u_free
ABT_unit_free_fn u_free
Definition: abti.h:337
ABTI_pool_get_fifo_wait_def
ABTU_ret_err int ABTI_pool_get_fifo_wait_def(ABT_pool_access access, ABTI_pool_def *p_def)
Definition: fifo_wait.c:39
pool_pop_timedwait
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo_wait.c:198
ABTI_pool
Definition: abti.h:305
convert_double_sec_to_timespec
static void convert_double_sec_to_timespec(struct timespec *ts_out, double seconds)
Definition: fifo_wait.c:191
ABTI_pool_def::u_is_in_pool
ABT_unit_is_in_pool_fn u_is_in_pool
Definition: abti.h:335
abti.h
ABTI_pool::data
void * data
Definition: abti.h:311
ABT_pool_config
struct ABT_pool_config_opaque * ABT_pool_config
Pool configuration handle type.
Definition: abt.h:846
ABTU_malloc
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:218
data::mutex
ABTI_spinlock mutex
Definition: fifo.c:31
ABTD_atomic_relaxed_store_int
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:996
pool_print_all
static int pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_unit))
Definition: fifo_wait.c:301
ABT_unit
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
Definition: abt.h:867
ABTI_pool_def::u_create_from_thread
ABT_unit_create_from_thread_fn u_create_from_thread
Definition: abti.h:336
ABTI_ASSERT
#define ABTI_ASSERT(cond)
Definition: abti_error.h:12
unit_create_from_thread
static ABT_unit unit_create_from_thread(ABT_thread thread)
Definition: fifo_wait.c:338
ABTI_pool_def::p_push
ABT_pool_push_fn p_push
Definition: abti.h:340
ABTI_pool_def::p_pop_timedwait
ABT_pool_pop_timedwait_fn p_pop_timedwait
Definition: abti.h:343
ABTI_thread::is_in_pool
ABTD_atomic_int is_in_pool
Definition: abti.h:352
pool_push
static void pool_push(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:104
unit_free
static void unit_free(ABT_unit *unit)
Definition: fifo_wait.c:348
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
data::mutex
pthread_mutex_t mutex
Definition: fifo_wait.c:26
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:138
ABTI_pool_def::p_free
ABT_pool_free_fn p_free
Definition: abti.h:345
ABTI_pool_def::p_pop_wait
ABT_pool_pop_wait_fn p_pop_wait
Definition: abti.h:342
ABTD_atomic_acquire_load_int
static int ABTD_atomic_acquire_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:878
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:746
ABTI_pool_get_ptr
static ABTI_pool * ABTI_pool_get_ptr(ABT_pool pool)
Definition: abti_pool.h:11
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:748
ABTI_pool_def::p_pop
ABT_pool_pop_fn p_pop
Definition: abti.h:341
pool_get_size
static size_t pool_get_size(ABT_pool pool)
Definition: fifo_wait.c:97
ABTU_free
static void ABTU_free(void *ptr)
Definition: abtu.h:211
data::num_threads
size_t num_threads
Definition: fifo.c:32
pool_pop_wait
static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
Definition: fifo_wait.c:132
ABTI_thread::p_prev
ABTI_thread * p_prev
Definition: abti.h:350
pool_pop
static ABT_unit pool_pop(ABT_pool pool)
Definition: fifo_wait.c:236
unit_get_thread
static ABT_thread unit_get_thread(ABT_unit unit)
Definition: fifo_wait.c:325
ABTI_CHECK_TRUE
#define ABTI_CHECK_TRUE(cond, abt_errno)
Definition: abti_error.h:130
ABTI_pool_def::u_get_thread
ABT_unit_get_thread_fn u_get_thread
Definition: abti.h:334
data::cond
pthread_cond_t cond
Definition: fifo_wait.c:27
ABTI_pool_def::access
ABT_pool_access access
Definition: abti.h:333
ABT_UNIT_NULL
#define ABT_UNIT_NULL
Definition: abt.h:1059
ABTD_atomic_release_store_int
static void ABTD_atomic_release_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:1065
data::p_tail
ABTI_thread * p_tail
Definition: fifo.c:34
ABTI_thread_get_ptr
static ABTI_thread * ABTI_thread_get_ptr(ABT_thread thread)
Definition: abti_thread.h:9
ABTI_get_wtime
static double ABTI_get_wtime(void)
Definition: abti_timer.h:11
data::p_head
ABTI_thread * p_head
Definition: fifo.c:33
pool_remove
static int pool_remove(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:267
ABT_pool_access
ABT_pool_access
Pool access type.
Definition: abt.h:522
pool_init
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo_wait.c:62