ARGOBOTS  c6511494322293e01714f56f341b8d2b22c1e3c1
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
fifo.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 #include <time.h>
8 
9 /* FIFO pool implementation */
10 
11 static int pool_init(ABT_pool pool, ABT_pool_config config);
12 static int pool_free(ABT_pool pool);
13 static size_t pool_get_size(ABT_pool pool);
14 static void pool_push_shared(ABT_pool pool, ABT_unit unit);
15 static void pool_push_private(ABT_pool pool, ABT_unit unit);
16 static ABT_unit pool_pop_shared(ABT_pool pool);
18 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs);
19 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
20 static int pool_remove_shared(ABT_pool pool, ABT_unit unit);
21 static int pool_remove_private(ABT_pool pool, ABT_unit unit);
22 static int pool_print_all(ABT_pool pool, void *arg,
23  void (*print_fn)(void *, ABT_unit));
24 
27 static ABT_task unit_get_task(ABT_unit unit);
28 static ABT_bool unit_is_in_pool(ABT_unit unit);
31 static void unit_free(ABT_unit *unit);
32 
33 struct data {
34  ABTI_spinlock mutex;
35  size_t num_threads;
36  ABTI_thread *p_head;
37  ABTI_thread *p_tail;
38 };
39 typedef struct data data_t;
40 
41 static inline data_t *pool_get_data_ptr(void *p_data)
42 {
43  return (data_t *)p_data;
44 }
45 
46 /* Obtain the FIFO pool definition according to the access type */
47 ABTU_ret_err int ABTI_pool_get_fifo_def(ABT_pool_access access,
48  ABT_pool_def *p_def)
49 {
50  /* Definitions according to the access type */
51  /* FIXME: need better implementation, e.g., lock-free one */
52  switch (access) {
54  p_def->p_push = pool_push_private;
55  p_def->p_pop = pool_pop_private;
57  break;
58 
63  p_def->p_push = pool_push_shared;
64  p_def->p_pop = pool_pop_shared;
66  break;
67 
68  default:
69  ABTI_HANDLE_ERROR(ABT_ERR_INV_POOL_ACCESS);
70  }
71 
72  /* Common definitions regardless of the access type */
73  p_def->access = access;
74  p_def->p_init = pool_init;
75  p_def->p_free = pool_free;
76  p_def->p_get_size = pool_get_size;
77  p_def->p_pop_wait = pool_pop_wait;
79  p_def->p_print_all = pool_print_all;
80  p_def->u_get_type = unit_get_type;
82  p_def->u_get_task = unit_get_task;
86  p_def->u_free = unit_free;
87 
88  return ABT_SUCCESS;
89 }
90 
91 /* Pool functions */
92 
93 static int pool_init(ABT_pool pool, ABT_pool_config config)
94 {
95  ABTI_UNUSED(config);
96  int abt_errno = ABT_SUCCESS;
97  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
98  ABT_pool_access access;
99 
100  data_t *p_data;
101  abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data);
102  ABTI_CHECK_ERROR(abt_errno);
103 
104  access = p_pool->access;
105 
106  if (access != ABT_POOL_ACCESS_PRIV) {
107  /* Initialize the mutex */
108  ABTI_spinlock_clear(&p_data->mutex);
109  }
110 
111  p_data->num_threads = 0;
112  p_data->p_head = NULL;
113  p_data->p_tail = NULL;
114 
115  p_pool->data = p_data;
116 
117  return abt_errno;
118 }
119 
120 static int pool_free(ABT_pool pool)
121 {
122  int abt_errno = ABT_SUCCESS;
123  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
124  data_t *p_data = pool_get_data_ptr(p_pool->data);
125 
126  ABTU_free(p_data);
127 
128  return abt_errno;
129 }
130 
131 static size_t pool_get_size(ABT_pool pool)
132 {
133  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
134  data_t *p_data = pool_get_data_ptr(p_pool->data);
135  return p_data->num_threads;
136 }
137 
138 static void pool_push_shared(ABT_pool pool, ABT_unit unit)
139 {
140  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
141  data_t *p_data = pool_get_data_ptr(p_pool->data);
142  ABTI_thread *p_thread = (ABTI_thread *)unit;
143 
144  ABTI_spinlock_acquire(&p_data->mutex);
145  if (p_data->num_threads == 0) {
146  p_thread->p_prev = p_thread;
147  p_thread->p_next = p_thread;
148  p_data->p_head = p_thread;
149  p_data->p_tail = p_thread;
150  } else {
151  ABTI_thread *p_head = p_data->p_head;
152  ABTI_thread *p_tail = p_data->p_tail;
153  p_tail->p_next = p_thread;
154  p_head->p_prev = p_thread;
155  p_thread->p_prev = p_tail;
156  p_thread->p_next = p_head;
157  p_data->p_tail = p_thread;
158  }
159  p_data->num_threads++;
160 
161  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
162  ABTI_spinlock_release(&p_data->mutex);
163 }
164 
165 static void pool_push_private(ABT_pool pool, ABT_unit unit)
166 {
167  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
168  data_t *p_data = pool_get_data_ptr(p_pool->data);
169  ABTI_thread *p_thread = (ABTI_thread *)unit;
170 
171  if (p_data->num_threads == 0) {
172  p_thread->p_prev = p_thread;
173  p_thread->p_next = p_thread;
174  p_data->p_head = p_thread;
175  p_data->p_tail = p_thread;
176  } else {
177  ABTI_thread *p_head = p_data->p_head;
178  ABTI_thread *p_tail = p_data->p_tail;
179  p_tail->p_next = p_thread;
180  p_head->p_prev = p_thread;
181  p_thread->p_prev = p_tail;
182  p_thread->p_next = p_head;
183  p_data->p_tail = p_thread;
184  }
185  p_data->num_threads++;
186 
187  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
188 }
189 
190 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
191 {
192  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
193  data_t *p_data = pool_get_data_ptr(p_pool->data);
194  ABTI_thread *p_thread = NULL;
195  ABT_unit h_unit = ABT_UNIT_NULL;
196 
197  double time_start = 0.0;
198 
199  do {
200  ABTI_spinlock_acquire(&p_data->mutex);
201  if (p_data->num_threads > 0) {
202  p_thread = p_data->p_head;
203  if (p_data->num_threads == 1) {
204  p_data->p_head = NULL;
205  p_data->p_tail = NULL;
206  } else {
207  p_thread->p_prev->p_next = p_thread->p_next;
208  p_thread->p_next->p_prev = p_thread->p_prev;
209  p_data->p_head = p_thread->p_next;
210  }
211  p_data->num_threads--;
212 
213  p_thread->p_prev = NULL;
214  p_thread->p_next = NULL;
215  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
216 
217  h_unit = (ABT_unit)p_thread;
218  ABTI_spinlock_release(&p_data->mutex);
219  } else {
220  ABTI_spinlock_release(&p_data->mutex);
221  if (time_start == 0.0) {
222  time_start = ABTI_get_wtime();
223  } else {
224  double elapsed = ABTI_get_wtime() - time_start;
225  if (elapsed > time_secs)
226  break;
227  }
228  /* Sleep. */
229  const int sleep_nsecs = 100;
230  struct timespec ts = { 0, sleep_nsecs };
231  nanosleep(&ts, NULL);
232  }
233  } while (h_unit == ABT_UNIT_NULL);
234 
235  return h_unit;
236 }
237 
238 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
239 {
240  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
241  data_t *p_data = pool_get_data_ptr(p_pool->data);
242  ABTI_thread *p_thread = NULL;
243  ABT_unit h_unit = ABT_UNIT_NULL;
244 
245  do {
246  ABTI_spinlock_acquire(&p_data->mutex);
247  if (p_data->num_threads > 0) {
248  p_thread = p_data->p_head;
249  if (p_data->num_threads == 1) {
250  p_data->p_head = NULL;
251  p_data->p_tail = NULL;
252  } else {
253  p_thread->p_prev->p_next = p_thread->p_next;
254  p_thread->p_next->p_prev = p_thread->p_prev;
255  p_data->p_head = p_thread->p_next;
256  }
257  p_data->num_threads--;
258 
259  p_thread->p_prev = NULL;
260  p_thread->p_next = NULL;
261  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
262 
263  h_unit = (ABT_unit)p_thread;
264  ABTI_spinlock_release(&p_data->mutex);
265  } else {
266  ABTI_spinlock_release(&p_data->mutex);
267  /* Sleep. */
268  const int sleep_nsecs = 100;
269  struct timespec ts = { 0, sleep_nsecs };
270  nanosleep(&ts, NULL);
271 
272  if (ABTI_get_wtime() > abstime_secs)
273  break;
274  }
275  } while (h_unit == ABT_UNIT_NULL);
276 
277  return h_unit;
278 }
279 
281 {
282  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
283  data_t *p_data = pool_get_data_ptr(p_pool->data);
284  ABTI_thread *p_thread = NULL;
285  ABT_unit h_unit = ABT_UNIT_NULL;
286 
287  ABTI_spinlock_acquire(&p_data->mutex);
288  if (p_data->num_threads > 0) {
289  p_thread = p_data->p_head;
290  if (p_data->num_threads == 1) {
291  p_data->p_head = NULL;
292  p_data->p_tail = NULL;
293  } else {
294  p_thread->p_prev->p_next = p_thread->p_next;
295  p_thread->p_next->p_prev = p_thread->p_prev;
296  p_data->p_head = p_thread->p_next;
297  }
298  p_data->num_threads--;
299 
300  p_thread->p_prev = NULL;
301  p_thread->p_next = NULL;
302  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
303 
304  h_unit = (ABT_unit)p_thread;
305  }
306  ABTI_spinlock_release(&p_data->mutex);
307 
308  return h_unit;
309 }
310 
312 {
313  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
314  data_t *p_data = pool_get_data_ptr(p_pool->data);
315  ABTI_thread *p_thread = NULL;
316  ABT_unit h_unit = ABT_UNIT_NULL;
317 
318  if (p_data->num_threads > 0) {
319  p_thread = p_data->p_head;
320  if (p_data->num_threads == 1) {
321  p_data->p_head = NULL;
322  p_data->p_tail = NULL;
323  } else {
324  p_thread->p_prev->p_next = p_thread->p_next;
325  p_thread->p_next->p_prev = p_thread->p_prev;
326  p_data->p_head = p_thread->p_next;
327  }
328  p_data->num_threads--;
329 
330  p_thread->p_prev = NULL;
331  p_thread->p_next = NULL;
332  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
333 
334  h_unit = (ABT_unit)p_thread;
335  }
336 
337  return h_unit;
338 }
339 
340 static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
341 {
342  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
343  data_t *p_data = pool_get_data_ptr(p_pool->data);
344  ABTI_thread *p_thread = (ABTI_thread *)unit;
345 
346  ABTI_CHECK_TRUE(p_data->num_threads != 0, ABT_ERR_POOL);
347  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) == 1,
348  ABT_ERR_POOL);
349 
350  ABTI_spinlock_acquire(&p_data->mutex);
351  if (p_data->num_threads == 1) {
352  p_data->p_head = NULL;
353  p_data->p_tail = NULL;
354  } else {
355  p_thread->p_prev->p_next = p_thread->p_next;
356  p_thread->p_next->p_prev = p_thread->p_prev;
357  if (p_thread == p_data->p_head) {
358  p_data->p_head = p_thread->p_next;
359  } else if (p_thread == p_data->p_tail) {
360  p_data->p_tail = p_thread->p_prev;
361  }
362  }
363  p_data->num_threads--;
364 
365  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
366  ABTI_spinlock_release(&p_data->mutex);
367 
368  p_thread->p_prev = NULL;
369  p_thread->p_next = NULL;
370 
371  return ABT_SUCCESS;
372 }
373 
374 static int pool_remove_private(ABT_pool pool, ABT_unit unit)
375 {
376  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
377  data_t *p_data = pool_get_data_ptr(p_pool->data);
378  ABTI_thread *p_thread = (ABTI_thread *)unit;
379 
380  ABTI_CHECK_TRUE(p_data->num_threads != 0, ABT_ERR_POOL);
381  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) == 1,
382  ABT_ERR_POOL);
383 
384  if (p_data->num_threads == 1) {
385  p_data->p_head = NULL;
386  p_data->p_tail = NULL;
387  } else {
388  p_thread->p_prev->p_next = p_thread->p_next;
389  p_thread->p_next->p_prev = p_thread->p_prev;
390  if (p_thread == p_data->p_head) {
391  p_data->p_head = p_thread->p_next;
392  } else if (p_thread == p_data->p_tail) {
393  p_data->p_tail = p_thread->p_prev;
394  }
395  }
396  p_data->num_threads--;
397 
398  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
399  p_thread->p_prev = NULL;
400  p_thread->p_next = NULL;
401 
402  return ABT_SUCCESS;
403 }
404 
405 static int pool_print_all(ABT_pool pool, void *arg,
406  void (*print_fn)(void *, ABT_unit))
407 {
408  ABT_pool_access access;
409  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
410  data_t *p_data = pool_get_data_ptr(p_pool->data);
411 
412  access = p_pool->access;
413  if (access != ABT_POOL_ACCESS_PRIV) {
414  ABTI_spinlock_acquire(&p_data->mutex);
415  }
416 
417  size_t num_threads = p_data->num_threads;
418  ABTI_thread *p_thread = p_data->p_head;
419  while (num_threads--) {
420  ABTI_ASSERT(p_thread);
421  ABT_unit unit = (ABT_unit)p_thread;
422  print_fn(arg, unit);
423  p_thread = p_thread->p_next;
424  }
425 
426  if (access != ABT_POOL_ACCESS_PRIV) {
427  ABTI_spinlock_release(&p_data->mutex);
428  }
429 
430  return ABT_SUCCESS;
431 }
432 
433 /* Unit functions */
434 
436 {
437  ABTI_thread *p_thread = (ABTI_thread *)unit;
438  return ABTI_thread_type_get_type(p_thread->type);
439 }
440 
442 {
443  ABT_thread h_thread;
444  ABTI_ythread *p_ythread =
445  ABTI_thread_get_ythread_or_null((ABTI_thread *)unit);
446  if (p_ythread) {
447  h_thread = ABTI_ythread_get_handle(p_ythread);
448  } else {
449  h_thread = ABT_THREAD_NULL;
450  }
451  return h_thread;
452 }
453 
455 {
456  ABT_task h_task;
457  ABTI_thread *p_thread = (ABTI_thread *)unit;
458  if (!(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE)) {
459  h_task = ABTI_thread_get_handle(p_thread);
460  } else {
461  h_task = ABT_TASK_NULL;
462  }
463  return h_task;
464 }
465 
467 {
468  ABTI_thread *p_thread = (ABTI_thread *)unit;
469  return ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ? ABT_TRUE
470  : ABT_FALSE;
471 }
472 
474 {
475  ABTI_ythread *p_ythread = ABTI_ythread_get_ptr(thread);
476  ABTI_thread *p_thread = &p_ythread->thread;
477  p_thread->p_prev = NULL;
478  p_thread->p_next = NULL;
479  ABTD_atomic_relaxed_store_int(&p_thread->is_in_pool, 0);
480  ABTI_ASSERT(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE);
481 
482  return (ABT_unit)p_thread;
483 }
484 
486 {
487  ABTI_thread *p_thread = ABTI_thread_get_ptr(task);
488  p_thread->p_prev = NULL;
489  p_thread->p_next = NULL;
490  ABTD_atomic_relaxed_store_int(&p_thread->is_in_pool, 0);
491  ABTI_ASSERT(!(p_thread->type & ABTI_THREAD_TYPE_YIELDABLE));
492 
493  return (ABT_unit)p_thread;
494 }
495 
496 static void unit_free(ABT_unit *unit)
497 {
498  *unit = ABT_UNIT_NULL;
499 }
static int pool_remove_private(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:374
static ABT_thread unit_get_thread(ABT_unit unit)
Definition: fifo.c:441
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:337
static ABT_unit pool_pop_shared(ABT_pool pool)
Definition: fifo.c:280
ABT_unit_get_task_fn u_get_task
Definition: abt.h:494
#define ABT_ERR_INV_POOL_ACCESS
Definition: abt.h:78
static void pool_push_shared(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:138
ABT_pool_init_fn p_init
Definition: abt.h:501
#define ABT_UNIT_NULL
Definition: abt.h:415
struct ABT_thread_opaque * ABT_task
Definition: abt.h:353
int ABT_bool
Definition: abt.h:373
static size_t pool_get_size(ABT_pool pool)
Definition: fifo.c:131
ABT_unit_get_thread_fn u_get_thread
Definition: abt.h:493
ABT_pool_pop_fn p_pop
Definition: abt.h:504
static ABT_bool unit_is_in_pool(ABT_unit unit)
Definition: fifo.c:466
ABT_pool_access access
Definition: abt.h:489
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:329
ABT_unit_is_in_pool_fn u_is_in_pool
Definition: abt.h:495
ABT_pool_push_fn p_push
Definition: abt.h:503
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:129
#define ABT_FALSE
Definition: abt.h:285
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:343
static ABT_unit unit_create_from_thread(ABT_thread thread)
Definition: fifo.c:473
struct data data_t
Definition: fifo.c:39
static void pool_push_private(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:165
static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
Definition: fifo.c:190
ABT_pool_free_fn p_free
Definition: abt.h:510
static int pool_free(ABT_pool pool)
Definition: fifo.c:120
#define ABT_SUCCESS
Definition: abt.h:64
ABT_unit_free_fn u_free
Definition: abt.h:498
ABT_pool_access
Definition: abt.h:161
#define ABT_TRUE
Definition: abt.h:284
ABT_unit_type
Definition: abt.h:169
static ABT_unit unit_create_from_task(ABT_task task)
Definition: fifo.c:485
static ABT_unit pool_pop_private(ABT_pool pool)
Definition: fifo.c:311
ABT_unit_create_from_task_fn u_create_from_task
Definition: abt.h:497
static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:340
#define ABT_THREAD_NULL
Definition: abt.h:416
static void unit_free(ABT_unit *unit)
Definition: fifo.c:496
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo.c:93
ABT_unit_create_from_thread_fn u_create_from_thread
Definition: abt.h:496
static ABT_unit_type unit_get_type(ABT_unit unit)
Definition: fifo.c:435
static ABT_task unit_get_task(ABT_unit unit)
Definition: fifo.c:454
struct ABT_pool_config_opaque * ABT_pool_config
Definition: abt.h:331
ABT_unit_get_type_fn u_get_type
Definition: abt.h:492
ABT_pool_get_size_fn p_get_size
Definition: abt.h:502
static int pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_unit))
Definition: fifo.c:405
ABT_pool_pop_wait_fn p_pop_wait
Definition: abt.h:505
#define ABT_TASK_NULL
Definition: abt.h:429
static void ABTU_free(void *ptr)
Definition: abtu.h:122
ABT_pool_remove_fn p_remove
Definition: abt.h:509
ABT_pool_print_all_fn p_print_all
Definition: abt.h:511
#define ABT_ERR_POOL
Definition: abt.h:99
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo.c:238
ABT_pool_pop_timedwait_fn p_pop_timedwait
Definition: abt.h:506
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo.c:41
#define ABTU_ret_err
Definition: abtu.h:49