ARGOBOTS  1227c643f7a7f974f1f1778a9ffebd29d7dafecf
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
fifo.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 #include <time.h>
8 
9 /* FIFO pool implementation */
10 
11 static int pool_init(ABT_pool pool, ABT_pool_config config);
12 static int pool_free(ABT_pool pool);
13 static size_t pool_get_size(ABT_pool pool);
14 static void pool_push_shared(ABT_pool pool, ABT_unit unit);
15 static void pool_push_private(ABT_pool pool, ABT_unit unit);
16 static ABT_unit pool_pop_shared(ABT_pool pool);
18 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs);
19 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
20 static int pool_remove_shared(ABT_pool pool, ABT_unit unit);
21 static int pool_remove_private(ABT_pool pool, ABT_unit unit);
22 static int pool_print_all(ABT_pool pool, void *arg,
23  void (*print_fn)(void *, ABT_unit));
24 
27 static ABT_task unit_get_task(ABT_unit unit);
28 static ABT_bool unit_is_in_pool(ABT_unit unit);
31 static void unit_free(ABT_unit *unit);
32 
33 struct data {
34  ABTI_spinlock mutex;
35  size_t num_threads;
36  ABTI_thread *p_head;
37  ABTI_thread *p_tail;
38 };
39 typedef struct data data_t;
40 
41 static inline data_t *pool_get_data_ptr(void *p_data)
42 {
43  return (data_t *)p_data;
44 }
45 
46 /* Obtain the FIFO pool definition according to the access type */
47 ABTU_ret_err int ABTI_pool_get_fifo_def(ABT_pool_access access,
48  ABT_pool_def *p_def)
49 {
50  int abt_errno = ABT_SUCCESS;
51 
52  /* Definitions according to the access type */
53  /* FIXME: need better implementation, e.g., lock-free one */
54  switch (access) {
56  p_def->p_push = pool_push_private;
57  p_def->p_pop = pool_pop_private;
59  break;
60 
65  p_def->p_push = pool_push_shared;
66  p_def->p_pop = pool_pop_shared;
68  break;
69 
70  default:
71  ABTI_CHECK_TRUE(0, ABT_ERR_INV_POOL_ACCESS);
72  }
73 
74  /* Common definitions regardless of the access type */
75  p_def->access = access;
76  p_def->p_init = pool_init;
77  p_def->p_free = pool_free;
78  p_def->p_get_size = pool_get_size;
79  p_def->p_pop_wait = pool_pop_wait;
81  p_def->p_print_all = pool_print_all;
82  p_def->u_get_type = unit_get_type;
84  p_def->u_get_task = unit_get_task;
88  p_def->u_free = unit_free;
89 
90 fn_exit:
91  return abt_errno;
92 
93 fn_fail:
94  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
95  goto fn_exit;
96 }
97 
98 /* Pool functions */
99 
100 static int pool_init(ABT_pool pool, ABT_pool_config config)
101 {
102  ABTI_UNUSED(config);
103  int abt_errno = ABT_SUCCESS;
104  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
105  ABT_pool_access access;
106 
107  data_t *p_data;
108  abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data);
109  ABTI_CHECK_ERROR_RET(abt_errno);
110 
111  access = p_pool->access;
112 
113  if (access != ABT_POOL_ACCESS_PRIV) {
114  /* Initialize the mutex */
115  ABTI_spinlock_clear(&p_data->mutex);
116  }
117 
118  p_data->num_threads = 0;
119  p_data->p_head = NULL;
120  p_data->p_tail = NULL;
121 
122  p_pool->data = p_data;
123 
124  return abt_errno;
125 }
126 
127 static int pool_free(ABT_pool pool)
128 {
129  int abt_errno = ABT_SUCCESS;
130  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
131  data_t *p_data = pool_get_data_ptr(p_pool->data);
132 
133  ABTU_free(p_data);
134 
135  return abt_errno;
136 }
137 
138 static size_t pool_get_size(ABT_pool pool)
139 {
140  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
141  data_t *p_data = pool_get_data_ptr(p_pool->data);
142  return p_data->num_threads;
143 }
144 
145 static void pool_push_shared(ABT_pool pool, ABT_unit unit)
146 {
147  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
148  data_t *p_data = pool_get_data_ptr(p_pool->data);
149  ABTI_thread *p_thread = (ABTI_thread *)unit;
150 
151  ABTI_spinlock_acquire(&p_data->mutex);
152  if (p_data->num_threads == 0) {
153  p_thread->p_prev = p_thread;
154  p_thread->p_next = p_thread;
155  p_data->p_head = p_thread;
156  p_data->p_tail = p_thread;
157  } else {
158  ABTI_thread *p_head = p_data->p_head;
159  ABTI_thread *p_tail = p_data->p_tail;
160  p_tail->p_next = p_thread;
161  p_head->p_prev = p_thread;
162  p_thread->p_prev = p_tail;
163  p_thread->p_next = p_head;
164  p_data->p_tail = p_thread;
165  }
166  p_data->num_threads++;
167 
168  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
169  ABTI_spinlock_release(&p_data->mutex);
170 }
171 
172 static void pool_push_private(ABT_pool pool, ABT_unit unit)
173 {
174  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
175  data_t *p_data = pool_get_data_ptr(p_pool->data);
176  ABTI_thread *p_thread = (ABTI_thread *)unit;
177 
178  if (p_data->num_threads == 0) {
179  p_thread->p_prev = p_thread;
180  p_thread->p_next = p_thread;
181  p_data->p_head = p_thread;
182  p_data->p_tail = p_thread;
183  } else {
184  ABTI_thread *p_head = p_data->p_head;
185  ABTI_thread *p_tail = p_data->p_tail;
186  p_tail->p_next = p_thread;
187  p_head->p_prev = p_thread;
188  p_thread->p_prev = p_tail;
189  p_thread->p_next = p_head;
190  p_data->p_tail = p_thread;
191  }
192  p_data->num_threads++;
193 
194  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
195 }
196 
197 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
198 {
199  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
200  data_t *p_data = pool_get_data_ptr(p_pool->data);
201  ABTI_thread *p_thread = NULL;
202  ABT_unit h_unit = ABT_UNIT_NULL;
203 
204  double time_start = 0.0;
205 
206  do {
207  ABTI_spinlock_acquire(&p_data->mutex);
208  if (p_data->num_threads > 0) {
209  p_thread = p_data->p_head;
210  if (p_data->num_threads == 1) {
211  p_data->p_head = NULL;
212  p_data->p_tail = NULL;
213  } else {
214  p_thread->p_prev->p_next = p_thread->p_next;
215  p_thread->p_next->p_prev = p_thread->p_prev;
216  p_data->p_head = p_thread->p_next;
217  }
218  p_data->num_threads--;
219 
220  p_thread->p_prev = NULL;
221  p_thread->p_next = NULL;
222  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
223 
224  h_unit = (ABT_unit)p_thread;
225  ABTI_spinlock_release(&p_data->mutex);
226  } else {
227  ABTI_spinlock_release(&p_data->mutex);
228  if (time_start == 0.0) {
229  time_start = ABTI_get_wtime();
230  } else {
231  double elapsed = ABTI_get_wtime() - time_start;
232  if (elapsed > time_secs)
233  break;
234  }
235  /* Sleep. */
236  const int sleep_nsecs = 100;
237  struct timespec ts = { 0, sleep_nsecs };
238  nanosleep(&ts, NULL);
239  }
240  } while (h_unit == ABT_UNIT_NULL);
241 
242  return h_unit;
243 }
244 
245 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
246 {
247  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
248  data_t *p_data = pool_get_data_ptr(p_pool->data);
249  ABTI_thread *p_thread = NULL;
250  ABT_unit h_unit = ABT_UNIT_NULL;
251 
252  do {
253  ABTI_spinlock_acquire(&p_data->mutex);
254  if (p_data->num_threads > 0) {
255  p_thread = p_data->p_head;
256  if (p_data->num_threads == 1) {
257  p_data->p_head = NULL;
258  p_data->p_tail = NULL;
259  } else {
260  p_thread->p_prev->p_next = p_thread->p_next;
261  p_thread->p_next->p_prev = p_thread->p_prev;
262  p_data->p_head = p_thread->p_next;
263  }
264  p_data->num_threads--;
265 
266  p_thread->p_prev = NULL;
267  p_thread->p_next = NULL;
268  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
269 
270  h_unit = (ABT_unit)p_thread;
271  ABTI_spinlock_release(&p_data->mutex);
272  } else {
273  ABTI_spinlock_release(&p_data->mutex);
274  /* Sleep. */
275  const int sleep_nsecs = 100;
276  struct timespec ts = { 0, sleep_nsecs };
277  nanosleep(&ts, NULL);
278 
279  if (ABTI_get_wtime() > abstime_secs)
280  break;
281  }
282  } while (h_unit == ABT_UNIT_NULL);
283 
284  return h_unit;
285 }
286 
288 {
289  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
290  data_t *p_data = pool_get_data_ptr(p_pool->data);
291  ABTI_thread *p_thread = NULL;
292  ABT_unit h_unit = ABT_UNIT_NULL;
293 
294  ABTI_spinlock_acquire(&p_data->mutex);
295  if (p_data->num_threads > 0) {
296  p_thread = p_data->p_head;
297  if (p_data->num_threads == 1) {
298  p_data->p_head = NULL;
299  p_data->p_tail = NULL;
300  } else {
301  p_thread->p_prev->p_next = p_thread->p_next;
302  p_thread->p_next->p_prev = p_thread->p_prev;
303  p_data->p_head = p_thread->p_next;
304  }
305  p_data->num_threads--;
306 
307  p_thread->p_prev = NULL;
308  p_thread->p_next = NULL;
309  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
310 
311  h_unit = (ABT_unit)p_thread;
312  }
313  ABTI_spinlock_release(&p_data->mutex);
314 
315  return h_unit;
316 }
317 
319 {
320  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
321  data_t *p_data = pool_get_data_ptr(p_pool->data);
322  ABTI_thread *p_thread = NULL;
323  ABT_unit h_unit = ABT_UNIT_NULL;
324 
325  if (p_data->num_threads > 0) {
326  p_thread = p_data->p_head;
327  if (p_data->num_threads == 1) {
328  p_data->p_head = NULL;
329  p_data->p_tail = NULL;
330  } else {
331  p_thread->p_prev->p_next = p_thread->p_next;
332  p_thread->p_next->p_prev = p_thread->p_prev;
333  p_data->p_head = p_thread->p_next;
334  }
335  p_data->num_threads--;
336 
337  p_thread->p_prev = NULL;
338  p_thread->p_next = NULL;
339  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
340 
341  h_unit = (ABT_unit)p_thread;
342  }
343 
344  return h_unit;
345 }
346 
347 static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
348 {
349  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
350  data_t *p_data = pool_get_data_ptr(p_pool->data);
351  ABTI_thread *p_thread = (ABTI_thread *)unit;
352 
353  ABTI_CHECK_TRUE_RET(p_data->num_threads != 0, ABT_ERR_POOL);
354  ABTI_CHECK_TRUE_RET(ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ==
355  1,
356  ABT_ERR_POOL);
357 
358  ABTI_spinlock_acquire(&p_data->mutex);
359  if (p_data->num_threads == 1) {
360  p_data->p_head = NULL;
361  p_data->p_tail = NULL;
362  } else {
363  p_thread->p_prev->p_next = p_thread->p_next;
364  p_thread->p_next->p_prev = p_thread->p_prev;
365  if (p_thread == p_data->p_head) {
366  p_data->p_head = p_thread->p_next;
367  } else if (p_thread == p_data->p_tail) {
368  p_data->p_tail = p_thread->p_prev;
369  }
370  }
371  p_data->num_threads--;
372 
373  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
374  ABTI_spinlock_release(&p_data->mutex);
375 
376  p_thread->p_prev = NULL;
377  p_thread->p_next = NULL;
378 
379  return ABT_SUCCESS;
380 }
381 
382 static int pool_remove_private(ABT_pool pool, ABT_unit unit)
383 {
384  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
385  data_t *p_data = pool_get_data_ptr(p_pool->data);
386  ABTI_thread *p_thread = (ABTI_thread *)unit;
387 
388  ABTI_CHECK_TRUE_RET(p_data->num_threads != 0, ABT_ERR_POOL);
389  ABTI_CHECK_TRUE_RET(ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ==
390  1,
391  ABT_ERR_POOL);
392 
393  if (p_data->num_threads == 1) {
394  p_data->p_head = NULL;
395  p_data->p_tail = NULL;
396  } else {
397  p_thread->p_prev->p_next = p_thread->p_next;
398  p_thread->p_next->p_prev = p_thread->p_prev;
399  if (p_thread == p_data->p_head) {
400  p_data->p_head = p_thread->p_next;
401  } else if (p_thread == p_data->p_tail) {
402  p_data->p_tail = p_thread->p_prev;
403  }
404  }
405  p_data->num_threads--;
406 
407  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
408  p_thread->p_prev = NULL;
409  p_thread->p_next = NULL;
410 
411  return ABT_SUCCESS;
412 }
413 
414 static int pool_print_all(ABT_pool pool, void *arg,
415  void (*print_fn)(void *, ABT_unit))
416 {
417  ABT_pool_access access;
418  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
419  data_t *p_data = pool_get_data_ptr(p_pool->data);
420 
421  access = p_pool->access;
422  if (access != ABT_POOL_ACCESS_PRIV) {
423  ABTI_spinlock_acquire(&p_data->mutex);
424  }
425 
426  size_t num_threads = p_data->num_threads;
427  ABTI_thread *p_thread = p_data->p_head;
428  while (num_threads--) {
429  ABTI_ASSERT(p_thread);
430  ABT_unit unit = (ABT_unit)p_thread;
431  print_fn(arg, unit);
432  p_thread = p_thread->p_next;
433  }
434 
435  if (access != ABT_POOL_ACCESS_PRIV) {
436  ABTI_spinlock_release(&p_data->mutex);
437  }
438 
439  return ABT_SUCCESS;
440 }
441 
442 /* Unit functions */
443 
445 {
446  ABTI_thread *p_thread = (ABTI_thread *)unit;
447  return ABTI_thread_type_get_type(p_thread->type);
448 }
449 
451 {
452  ABT_thread h_thread;
453  ABTI_ythread *p_ythread =
454  ABTI_thread_get_ythread_or_null((ABTI_thread *)unit);
455  if (p_ythread) {
456  h_thread = ABTI_ythread_get_handle(p_ythread);
457  } else {
458  h_thread = ABT_THREAD_NULL;
459  }
460  return h_thread;
461 }
462 
464 {
465  ABT_task h_task;
466  ABTI_thread *p_thread = (ABTI_thread *)unit;
467  if (!(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE)) {
468  h_task = ABTI_thread_get_handle(p_thread);
469  } else {
470  h_task = ABT_TASK_NULL;
471  }
472  return h_task;
473 }
474 
476 {
477  ABTI_thread *p_thread = (ABTI_thread *)unit;
478  return ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ? ABT_TRUE
479  : ABT_FALSE;
480 }
481 
483 {
484  ABTI_ythread *p_ythread = ABTI_ythread_get_ptr(thread);
485  ABTI_thread *p_thread = &p_ythread->thread;
486  p_thread->p_prev = NULL;
487  p_thread->p_next = NULL;
488  ABTD_atomic_relaxed_store_int(&p_thread->is_in_pool, 0);
489  ABTI_ASSERT(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE);
490 
491  return (ABT_unit)p_thread;
492 }
493 
495 {
496  ABTI_thread *p_thread = ABTI_thread_get_ptr(task);
497  p_thread->p_prev = NULL;
498  p_thread->p_next = NULL;
499  ABTD_atomic_relaxed_store_int(&p_thread->is_in_pool, 0);
500  ABTI_ASSERT(!(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE));
501 
502  return (ABT_unit)p_thread;
503 }
504 
505 static void unit_free(ABT_unit *unit)
506 {
507  *unit = ABT_UNIT_NULL;
508 }
static int pool_remove_private(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:382
static ABT_thread unit_get_thread(ABT_unit unit)
Definition: fifo.c:450
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:337
static ABT_unit pool_pop_shared(ABT_pool pool)
Definition: fifo.c:287
ABT_unit_get_task_fn u_get_task
Definition: abt.h:494
#define ABT_ERR_INV_POOL_ACCESS
Definition: abt.h:78
static void pool_push_shared(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:145
ABT_pool_init_fn p_init
Definition: abt.h:501
#define ABT_UNIT_NULL
Definition: abt.h:415
struct ABT_thread_opaque * ABT_task
Definition: abt.h:353
int ABT_bool
Definition: abt.h:373
static size_t pool_get_size(ABT_pool pool)
Definition: fifo.c:138
ABT_unit_get_thread_fn u_get_thread
Definition: abt.h:493
ABT_pool_pop_fn p_pop
Definition: abt.h:504
static ABT_bool unit_is_in_pool(ABT_unit unit)
Definition: fifo.c:475
ABT_pool_access access
Definition: abt.h:489
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:329
ABT_unit_is_in_pool_fn u_is_in_pool
Definition: abt.h:495
ABT_pool_push_fn p_push
Definition: abt.h:503
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:142
#define ABT_FALSE
Definition: abt.h:285
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:343
static ABT_unit unit_create_from_thread(ABT_thread thread)
Definition: fifo.c:482
struct data data_t
Definition: fifo.c:39
#define HANDLE_ERROR_FUNC_WITH_CODE(n)
Definition: abti_error.h:353
static void pool_push_private(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:172
static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
Definition: fifo.c:197
ABT_pool_free_fn p_free
Definition: abt.h:510
static int pool_free(ABT_pool pool)
Definition: fifo.c:127
#define ABT_SUCCESS
Definition: abt.h:64
ABT_unit_free_fn u_free
Definition: abt.h:498
ABT_pool_access
Definition: abt.h:161
#define ABT_TRUE
Definition: abt.h:284
ABT_unit_type
Definition: abt.h:169
static ABT_unit unit_create_from_task(ABT_task task)
Definition: fifo.c:494
static ABT_unit pool_pop_private(ABT_pool pool)
Definition: fifo.c:318
ABT_unit_create_from_task_fn u_create_from_task
Definition: abt.h:497
static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:347
#define ABT_THREAD_NULL
Definition: abt.h:416
static void unit_free(ABT_unit *unit)
Definition: fifo.c:505
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo.c:100
ABT_unit_create_from_thread_fn u_create_from_thread
Definition: abt.h:496
static ABT_unit_type unit_get_type(ABT_unit unit)
Definition: fifo.c:444
static ABT_task unit_get_task(ABT_unit unit)
Definition: fifo.c:463
struct ABT_pool_config_opaque * ABT_pool_config
Definition: abt.h:331
ABT_unit_get_type_fn u_get_type
Definition: abt.h:492
ABT_pool_get_size_fn p_get_size
Definition: abt.h:502
static int pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_unit))
Definition: fifo.c:414
ABT_pool_pop_wait_fn p_pop_wait
Definition: abt.h:505
#define ABT_TASK_NULL
Definition: abt.h:429
static void ABTU_free(void *ptr)
Definition: abtu.h:135
ABT_pool_remove_fn p_remove
Definition: abt.h:509
ABT_pool_print_all_fn p_print_all
Definition: abt.h:511
#define ABT_ERR_POOL
Definition: abt.h:99
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo.c:245
ABT_pool_pop_timedwait_fn p_pop_timedwait
Definition: abt.h:506
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo.c:41
#define ABTU_ret_err
Definition: abtu.h:49