ARGOBOTS  1227c643f7a7f974f1f1778a9ffebd29d7dafecf
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
fifo_wait.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 /* FIFO_WAIT pool implementation */
9 
10 static int pool_init(ABT_pool pool, ABT_pool_config config);
11 static int pool_free(ABT_pool pool);
12 static size_t pool_get_size(ABT_pool pool);
13 static void pool_push(ABT_pool pool, ABT_unit unit);
14 static ABT_unit pool_pop(ABT_pool pool);
15 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs);
16 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
17 static int pool_remove(ABT_pool pool, ABT_unit unit);
18 static int pool_print_all(ABT_pool pool, void *arg,
19  void (*print_fn)(void *, ABT_unit));
22 static ABT_task unit_get_task(ABT_unit unit);
23 static ABT_bool unit_is_in_pool(ABT_unit unit);
26 static void unit_free(ABT_unit *unit);
27 
28 struct data {
29  pthread_mutex_t mutex;
30  pthread_cond_t cond;
31  size_t num_threads;
32  ABTI_thread *p_head;
33  ABTI_thread *p_tail;
34 };
35 typedef struct data data_t;
36 
37 static inline data_t *pool_get_data_ptr(void *p_data)
38 {
39  return (data_t *)p_data;
40 }
41 
42 ABTU_ret_err int ABTI_pool_get_fifo_wait_def(ABT_pool_access access,
43  ABT_pool_def *p_def)
44 {
45  p_def->access = access;
46  p_def->p_init = pool_init;
47  p_def->p_free = pool_free;
48  p_def->p_get_size = pool_get_size;
49  p_def->p_push = pool_push;
50  p_def->p_pop = pool_pop;
51  p_def->p_pop_wait = pool_pop_wait;
53  p_def->p_remove = pool_remove;
54  p_def->p_print_all = pool_print_all;
55  p_def->u_get_type = unit_get_type;
57  p_def->u_get_task = unit_get_task;
61  p_def->u_free = unit_free;
62 
63  return ABT_SUCCESS;
64 }
65 
66 /* Pool functions */
67 
68 static int pool_init(ABT_pool pool, ABT_pool_config config)
69 {
70  ABTI_UNUSED(config);
71  int abt_errno = ABT_SUCCESS;
72  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
73 
74  data_t *p_data;
75  abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data);
76  ABTI_CHECK_ERROR_RET(abt_errno);
77 
78  pthread_mutex_init(&p_data->mutex, NULL);
79  pthread_cond_init(&p_data->cond, NULL);
80 
81  p_data->num_threads = 0;
82  p_data->p_head = NULL;
83  p_data->p_tail = NULL;
84 
85  p_pool->data = p_data;
86 
87  return abt_errno;
88 }
89 
90 static int pool_free(ABT_pool pool)
91 {
92  int abt_errno = ABT_SUCCESS;
93  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
94  data_t *p_data = pool_get_data_ptr(p_pool->data);
95 
96  pthread_mutex_destroy(&p_data->mutex);
97  pthread_cond_destroy(&p_data->cond);
98  ABTU_free(p_data);
99 
100  return abt_errno;
101 }
102 
103 static size_t pool_get_size(ABT_pool pool)
104 {
105  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
106  data_t *p_data = pool_get_data_ptr(p_pool->data);
107  return p_data->num_threads;
108 }
109 
110 static void pool_push(ABT_pool pool, ABT_unit unit)
111 {
112  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
113  data_t *p_data = pool_get_data_ptr(p_pool->data);
114  ABTI_thread *p_thread = (ABTI_thread *)unit;
115 
116  pthread_mutex_lock(&p_data->mutex);
117  if (p_data->num_threads == 0) {
118  p_thread->p_prev = p_thread;
119  p_thread->p_next = p_thread;
120  p_data->p_head = p_thread;
121  p_data->p_tail = p_thread;
122  } else {
123  ABTI_thread *p_head = p_data->p_head;
124  ABTI_thread *p_tail = p_data->p_tail;
125  p_tail->p_next = p_thread;
126  p_head->p_prev = p_thread;
127  p_thread->p_prev = p_tail;
128  p_thread->p_next = p_head;
129  p_data->p_tail = p_thread;
130  }
131  p_data->num_threads++;
132 
133  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
134  pthread_cond_signal(&p_data->cond);
135  pthread_mutex_unlock(&p_data->mutex);
136 }
137 
138 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
139 {
140  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
141  data_t *p_data = pool_get_data_ptr(p_pool->data);
142  ABTI_thread *p_thread = NULL;
143  ABT_unit h_unit = ABT_UNIT_NULL;
144 
145  pthread_mutex_lock(&p_data->mutex);
146 
147  if (!p_data->num_threads) {
148 #if defined(ABT_CONFIG_USE_CLOCK_GETTIME)
149  struct timespec ts;
150  clock_gettime(CLOCK_REALTIME, &ts);
151  ts.tv_sec += (time_t)time_secs;
152  ts.tv_nsec += (long)((time_secs - (time_t)time_secs) * 1e9);
153  if (ts.tv_nsec > 1e9) {
154  ts.tv_sec += 1;
155  ts.tv_nsec -= 1e9;
156  }
157  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
158 #else
159  /* We cannot use pthread_cond_timedwait(). Let's use nanosleep()
160  * instead */
161  double start_time = ABTI_get_wtime();
162  while (ABTI_get_wtime() - start_time < time_secs) {
163  pthread_mutex_unlock(&p_data->mutex);
164  const int sleep_nsecs = 100;
165  struct timespec ts = { 0, sleep_nsecs };
166  nanosleep(&ts, NULL);
167  pthread_mutex_lock(&p_data->mutex);
168  if (p_data->num_threads > 0)
169  break;
170  }
171 #endif
172  }
173 
174  if (p_data->num_threads > 0) {
175  p_thread = p_data->p_head;
176  if (p_data->num_threads == 1) {
177  p_data->p_head = NULL;
178  p_data->p_tail = NULL;
179  } else {
180  p_thread->p_prev->p_next = p_thread->p_next;
181  p_thread->p_next->p_prev = p_thread->p_prev;
182  p_data->p_head = p_thread->p_next;
183  }
184  p_data->num_threads--;
185 
186  p_thread->p_prev = NULL;
187  p_thread->p_next = NULL;
188  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
189 
190  h_unit = (ABT_unit)p_thread;
191  }
192  pthread_mutex_unlock(&p_data->mutex);
193 
194  return h_unit;
195 }
196 
197 static inline void convert_double_sec_to_timespec(struct timespec *ts_out,
198  double seconds)
199 {
200  ts_out->tv_sec = (time_t)seconds;
201  ts_out->tv_nsec = (long)((seconds - ts_out->tv_sec) * 1000000000.0);
202 }
203 
204 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
205 {
206  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
207  data_t *p_data = pool_get_data_ptr(p_pool->data);
208  ABTI_thread *p_thread = NULL;
209  ABT_unit h_unit = ABT_UNIT_NULL;
210 
211  pthread_mutex_lock(&p_data->mutex);
212 
213  if (!p_data->num_threads) {
214  struct timespec ts;
215  convert_double_sec_to_timespec(&ts, abstime_secs);
216  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
217  }
218 
219  if (p_data->num_threads > 0) {
220  p_thread = p_data->p_head;
221  if (p_data->num_threads == 1) {
222  p_data->p_head = NULL;
223  p_data->p_tail = NULL;
224  } else {
225  p_thread->p_prev->p_next = p_thread->p_next;
226  p_thread->p_next->p_prev = p_thread->p_prev;
227  p_data->p_head = p_thread->p_next;
228  }
229  p_data->num_threads--;
230 
231  p_thread->p_prev = NULL;
232  p_thread->p_next = NULL;
233  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
234 
235  h_unit = (ABT_unit)p_thread;
236  }
237  pthread_mutex_unlock(&p_data->mutex);
238 
239  return h_unit;
240 }
241 
243 {
244  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
245  data_t *p_data = pool_get_data_ptr(p_pool->data);
246  ABTI_thread *p_thread = NULL;
247  ABT_unit h_unit = ABT_UNIT_NULL;
248 
249  pthread_mutex_lock(&p_data->mutex);
250  if (p_data->num_threads > 0) {
251  p_thread = p_data->p_head;
252  if (p_data->num_threads == 1) {
253  p_data->p_head = NULL;
254  p_data->p_tail = NULL;
255  } else {
256  p_thread->p_prev->p_next = p_thread->p_next;
257  p_thread->p_next->p_prev = p_thread->p_prev;
258  p_data->p_head = p_thread->p_next;
259  }
260  p_data->num_threads--;
261 
262  p_thread->p_prev = NULL;
263  p_thread->p_next = NULL;
264  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
265 
266  h_unit = (ABT_unit)p_thread;
267  }
268  pthread_mutex_unlock(&p_data->mutex);
269 
270  return h_unit;
271 }
272 
273 static int pool_remove(ABT_pool pool, ABT_unit unit)
274 {
275  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
276  data_t *p_data = pool_get_data_ptr(p_pool->data);
277  ABTI_thread *p_thread = (ABTI_thread *)unit;
278 
279  ABTI_CHECK_TRUE_RET(p_data->num_threads != 0, ABT_ERR_POOL);
280  ABTI_CHECK_TRUE_RET(ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ==
281  1,
282  ABT_ERR_POOL);
283 
284  pthread_mutex_lock(&p_data->mutex);
285  if (p_data->num_threads == 1) {
286  p_data->p_head = NULL;
287  p_data->p_tail = NULL;
288  } else {
289  p_thread->p_prev->p_next = p_thread->p_next;
290  p_thread->p_next->p_prev = p_thread->p_prev;
291  if (p_thread == p_data->p_head) {
292  p_data->p_head = p_thread->p_next;
293  } else if (p_thread == p_data->p_tail) {
294  p_data->p_tail = p_thread->p_prev;
295  }
296  }
297  p_data->num_threads--;
298 
299  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
300  pthread_mutex_unlock(&p_data->mutex);
301 
302  p_thread->p_prev = NULL;
303  p_thread->p_next = NULL;
304 
305  return ABT_SUCCESS;
306 }
307 
308 static int pool_print_all(ABT_pool pool, void *arg,
309  void (*print_fn)(void *, ABT_unit))
310 {
311  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
312  data_t *p_data = pool_get_data_ptr(p_pool->data);
313 
314  pthread_mutex_lock(&p_data->mutex);
315 
316  size_t num_threads = p_data->num_threads;
317  ABTI_thread *p_thread = p_data->p_head;
318  while (num_threads--) {
319  ABTI_ASSERT(p_thread);
320  ABT_unit unit = (ABT_unit)p_thread;
321  print_fn(arg, unit);
322  p_thread = p_thread->p_next;
323  }
324 
325  pthread_mutex_unlock(&p_data->mutex);
326 
327  return ABT_SUCCESS;
328 }
329 
330 /* Unit functions */
331 
333 {
334  ABTI_thread *p_thread = (ABTI_thread *)unit;
335  return ABTI_thread_type_get_type(p_thread->type);
336 }
337 
339 {
340  ABT_thread h_thread;
341  ABTI_ythread *p_ythread =
342  ABTI_thread_get_ythread_or_null((ABTI_thread *)unit);
343  if (p_ythread) {
344  h_thread = ABTI_ythread_get_handle(p_ythread);
345  } else {
346  h_thread = ABT_THREAD_NULL;
347  }
348  return h_thread;
349 }
350 
352 {
353  ABT_task h_task;
354  ABTI_thread *p_thread = (ABTI_thread *)unit;
355  if (!(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE)) {
356  h_task = ABTI_thread_get_handle(p_thread);
357  } else {
358  h_task = ABT_TASK_NULL;
359  }
360  return h_task;
361 }
362 
364 {
365  ABTI_thread *p_thread = (ABTI_thread *)unit;
366  return ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ? ABT_TRUE
367  : ABT_FALSE;
368 }
369 
371 {
372  ABTI_ythread *p_ythread = ABTI_ythread_get_ptr(thread);
373  ABTI_thread *p_thread = &p_ythread->thread;
374  p_thread->p_prev = NULL;
375  p_thread->p_next = NULL;
376  ABTD_atomic_relaxed_store_int(&p_thread->is_in_pool, 0);
377  ABTI_ASSERT(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE);
378 
379  return (ABT_unit)p_thread;
380 }
381 
383 {
384  ABTI_thread *p_thread = ABTI_thread_get_ptr(task);
385  p_thread->p_prev = NULL;
386  p_thread->p_next = NULL;
387  ABTD_atomic_relaxed_store_int(&p_thread->is_in_pool, 0);
388  ABTI_ASSERT(!(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE));
389 
390  return (ABT_unit)p_thread;
391 }
392 
393 static void unit_free(ABT_unit *unit)
394 {
395  *unit = ABT_UNIT_NULL;
396 }
static ABT_unit unit_create_from_thread(ABT_thread thread)
Definition: fifo_wait.c:370
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:337
ABT_unit_get_task_fn u_get_task
Definition: abt.h:494
ABT_pool_init_fn p_init
Definition: abt.h:501
#define ABT_UNIT_NULL
Definition: abt.h:415
struct ABT_thread_opaque * ABT_task
Definition: abt.h:353
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo_wait.c:204
static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
Definition: fifo_wait.c:138
int ABT_bool
Definition: abt.h:373
ABT_unit_get_thread_fn u_get_thread
Definition: abt.h:493
ABT_pool_pop_fn p_pop
Definition: abt.h:504
static size_t pool_get_size(ABT_pool pool)
Definition: fifo_wait.c:103
static int pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_unit))
Definition: fifo_wait.c:308
ABT_pool_access access
Definition: abt.h:489
static ABT_unit unit_create_from_task(ABT_task task)
Definition: fifo_wait.c:382
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:329
ABT_unit_is_in_pool_fn u_is_in_pool
Definition: abt.h:495
ABT_pool_push_fn p_push
Definition: abt.h:503
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:142
static ABT_unit_type unit_get_type(ABT_unit unit)
Definition: fifo_wait.c:332
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo_wait.c:68
#define ABT_FALSE
Definition: abt.h:285
static ABT_unit pool_pop(ABT_pool pool)
Definition: fifo_wait.c:242
static int pool_remove(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:273
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:343
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo_wait.c:37
static int pool_free(ABT_pool pool)
Definition: fifo_wait.c:90
struct data data_t
Definition: fifo.c:39
ABT_pool_free_fn p_free
Definition: abt.h:510
#define ABT_SUCCESS
Definition: abt.h:64
ABT_unit_free_fn u_free
Definition: abt.h:498
ABT_pool_access
Definition: abt.h:161
#define ABT_TRUE
Definition: abt.h:284
ABT_unit_type
Definition: abt.h:169
static void unit_free(ABT_unit *unit)
Definition: fifo_wait.c:393
static void convert_double_sec_to_timespec(struct timespec *ts_out, double seconds)
Definition: fifo_wait.c:197
static ABT_task unit_get_task(ABT_unit unit)
Definition: fifo_wait.c:351
ABT_unit_create_from_task_fn u_create_from_task
Definition: abt.h:497
#define ABT_THREAD_NULL
Definition: abt.h:416
ABT_unit_create_from_thread_fn u_create_from_thread
Definition: abt.h:496
static ABT_bool unit_is_in_pool(ABT_unit unit)
Definition: fifo_wait.c:363
static ABT_thread unit_get_thread(ABT_unit unit)
Definition: fifo_wait.c:338
struct ABT_pool_config_opaque * ABT_pool_config
Definition: abt.h:331
ABT_unit_get_type_fn u_get_type
Definition: abt.h:492
ABT_pool_get_size_fn p_get_size
Definition: abt.h:502
ABT_pool_pop_wait_fn p_pop_wait
Definition: abt.h:505
#define ABT_TASK_NULL
Definition: abt.h:429
static void ABTU_free(void *ptr)
Definition: abtu.h:135
ABT_pool_remove_fn p_remove
Definition: abt.h:509
ABT_pool_print_all_fn p_print_all
Definition: abt.h:511
#define ABT_ERR_POOL
Definition: abt.h:99
static void pool_push(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:110
ABT_pool_pop_timedwait_fn p_pop_timedwait
Definition: abt.h:506
#define ABTU_ret_err
Definition: abtu.h:49