ARGOBOTS
fifo.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 #include <time.h>
8 
9 static inline double get_cur_time(void)
10 {
11 #if defined(HAVE_CLOCK_GETTIME)
12  struct timespec ts;
13  clock_gettime(CLOCK_REALTIME, &ts);
14  return ((double)ts.tv_sec) + 1.0e-9 * ((double)ts.tv_nsec);
15 #elif defined(HAVE_GETTIMEOFDAY)
16  struct timeval tv;
17  gettimeofday(&tv, NULL);
18  return ((double)tv.tv_sec) + 1.0e-6 * ((double)tv.tv_usec);
19 #else
20 #error "No timer function available"
21  return 0.0;
22 #endif
23 }
24 
25 /* FIFO pool implementation */
26 
27 static int pool_init(ABT_pool pool, ABT_pool_config config);
28 static int pool_free(ABT_pool pool);
29 static size_t pool_get_size(ABT_pool pool);
30 static void pool_push_shared(ABT_pool pool, ABT_unit unit);
31 static void pool_push_private(ABT_pool pool, ABT_unit unit);
32 static ABT_unit pool_pop_shared(ABT_pool pool);
34 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
35 static int pool_remove_shared(ABT_pool pool, ABT_unit unit);
36 static int pool_remove_private(ABT_pool pool, ABT_unit unit);
37 static int pool_print_all(ABT_pool pool, void *arg,
38  void (*print_fn)(void *, ABT_unit));
39 
40 typedef ABTI_unit unit_t;
43 static ABT_task unit_get_task(ABT_unit unit);
44 static ABT_bool unit_is_in_pool(ABT_unit unit);
47 static void unit_free(ABT_unit *unit);
48 
49 struct data {
50  ABTI_spinlock mutex;
51  size_t num_units;
52  unit_t *p_head;
53  unit_t *p_tail;
54 };
55 typedef struct data data_t;
56 
57 static inline data_t *pool_get_data_ptr(void *p_data)
58 {
59  return (data_t *)p_data;
60 }
61 
62 /* Obtain the FIFO pool definition according to the access type */
63 int ABTI_pool_get_fifo_def(ABT_pool_access access, ABT_pool_def *p_def)
64 {
65  int abt_errno = ABT_SUCCESS;
66 
67  /* Definitions according to the access type */
68  /* FIXME: need better implementation, e.g., lock-free one */
69  switch (access) {
71  p_def->p_push = pool_push_private;
72  p_def->p_pop = pool_pop_private;
74  break;
75 
80  p_def->p_push = pool_push_shared;
81  p_def->p_pop = pool_pop_shared;
83  break;
84 
85  default:
86  ABTI_CHECK_TRUE(0, ABT_ERR_INV_POOL_ACCESS);
87  }
88 
89  /* Common definitions regardless of the access type */
90  p_def->access = access;
91  p_def->p_init = pool_init;
92  p_def->p_free = pool_free;
93  p_def->p_get_size = pool_get_size;
95  p_def->p_print_all = pool_print_all;
96  p_def->u_get_type = unit_get_type;
98  p_def->u_get_task = unit_get_task;
102  p_def->u_free = unit_free;
103 
104 fn_exit:
105  return abt_errno;
106 
107 fn_fail:
108  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
109  goto fn_exit;
110 }
111 
112 /* Pool functions */
113 
115 {
116  ABTI_UNUSED(config);
117  int abt_errno = ABT_SUCCESS;
118  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
119  ABT_pool_access access;
120 
121  data_t *p_data = (data_t *)ABTU_malloc(sizeof(data_t));
122 
123  access = p_pool->access;
124 
125  if (access != ABT_POOL_ACCESS_PRIV) {
126  /* Initialize the mutex */
127  ABTI_spinlock_clear(&p_data->mutex);
128  }
129 
130  p_data->num_units = 0;
131  p_data->p_head = NULL;
132  p_data->p_tail = NULL;
133 
134  p_pool->data = p_data;
135 
136  return abt_errno;
137 }
138 
139 static int pool_free(ABT_pool pool)
140 {
141  int abt_errno = ABT_SUCCESS;
142  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
143  data_t *p_data = pool_get_data_ptr(p_pool->data);
144 
145  ABTU_free(p_data);
146 
147  return abt_errno;
148 }
149 
150 static size_t pool_get_size(ABT_pool pool)
151 {
152  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
153  data_t *p_data = pool_get_data_ptr(p_pool->data);
154  return p_data->num_units;
155 }
156 
157 static void pool_push_shared(ABT_pool pool, ABT_unit unit)
158 {
159  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
160  data_t *p_data = pool_get_data_ptr(p_pool->data);
161  unit_t *p_unit = (unit_t *)unit;
162 
163  ABTI_spinlock_acquire(&p_data->mutex);
164  if (p_data->num_units == 0) {
165  p_unit->p_prev = p_unit;
166  p_unit->p_next = p_unit;
167  p_data->p_head = p_unit;
168  p_data->p_tail = p_unit;
169  } else {
170  unit_t *p_head = p_data->p_head;
171  unit_t *p_tail = p_data->p_tail;
172  p_tail->p_next = p_unit;
173  p_head->p_prev = p_unit;
174  p_unit->p_prev = p_tail;
175  p_unit->p_next = p_head;
176  p_data->p_tail = p_unit;
177  }
178  p_data->num_units++;
179 
180  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 1);
181  ABTI_spinlock_release(&p_data->mutex);
182 }
183 
184 static void pool_push_private(ABT_pool pool, ABT_unit unit)
185 {
186  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
187  data_t *p_data = pool_get_data_ptr(p_pool->data);
188  unit_t *p_unit = (unit_t *)unit;
189 
190  if (p_data->num_units == 0) {
191  p_unit->p_prev = p_unit;
192  p_unit->p_next = p_unit;
193  p_data->p_head = p_unit;
194  p_data->p_tail = p_unit;
195  } else {
196  unit_t *p_head = p_data->p_head;
197  unit_t *p_tail = p_data->p_tail;
198  p_tail->p_next = p_unit;
199  p_head->p_prev = p_unit;
200  p_unit->p_prev = p_tail;
201  p_unit->p_next = p_head;
202  p_data->p_tail = p_unit;
203  }
204  p_data->num_units++;
205 
206  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 1);
207 }
208 
209 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
210 {
211  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
212  data_t *p_data = pool_get_data_ptr(p_pool->data);
213  unit_t *p_unit = NULL;
214  ABT_unit h_unit = ABT_UNIT_NULL;
215 
216  double time_start = get_cur_time();
217 
218  do {
219  ABTI_spinlock_acquire(&p_data->mutex);
220  if (p_data->num_units > 0) {
221  p_unit = p_data->p_head;
222  if (p_data->num_units == 1) {
223  p_data->p_head = NULL;
224  p_data->p_tail = NULL;
225  } else {
226  p_unit->p_prev->p_next = p_unit->p_next;
227  p_unit->p_next->p_prev = p_unit->p_prev;
228  p_data->p_head = p_unit->p_next;
229  }
230  p_data->num_units--;
231 
232  p_unit->p_prev = NULL;
233  p_unit->p_next = NULL;
234  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 0);
235 
236  h_unit = (ABT_unit)p_unit;
237  ABTI_spinlock_release(&p_data->mutex);
238  } else {
239  ABTI_spinlock_release(&p_data->mutex);
240  /* Sleep. */
241  const int sleep_nsecs = 100;
242  struct timespec ts = { 0, sleep_nsecs };
243  nanosleep(&ts, NULL);
244 
245  double elapsed = get_cur_time() - time_start;
246  if (elapsed > abstime_secs)
247  break;
248  }
249  } while (h_unit == ABT_UNIT_NULL);
250 
251  return h_unit;
252 }
253 
255 {
256  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
257  data_t *p_data = pool_get_data_ptr(p_pool->data);
258  unit_t *p_unit = NULL;
259  ABT_unit h_unit = ABT_UNIT_NULL;
260 
261  ABTI_spinlock_acquire(&p_data->mutex);
262  if (p_data->num_units > 0) {
263  p_unit = p_data->p_head;
264  if (p_data->num_units == 1) {
265  p_data->p_head = NULL;
266  p_data->p_tail = NULL;
267  } else {
268  p_unit->p_prev->p_next = p_unit->p_next;
269  p_unit->p_next->p_prev = p_unit->p_prev;
270  p_data->p_head = p_unit->p_next;
271  }
272  p_data->num_units--;
273 
274  p_unit->p_prev = NULL;
275  p_unit->p_next = NULL;
276  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 0);
277 
278  h_unit = (ABT_unit)p_unit;
279  }
280  ABTI_spinlock_release(&p_data->mutex);
281 
282  return h_unit;
283 }
284 
286 {
287  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
288  data_t *p_data = pool_get_data_ptr(p_pool->data);
289  unit_t *p_unit = NULL;
290  ABT_unit h_unit = ABT_UNIT_NULL;
291 
292  if (p_data->num_units > 0) {
293  p_unit = p_data->p_head;
294  if (p_data->num_units == 1) {
295  p_data->p_head = NULL;
296  p_data->p_tail = NULL;
297  } else {
298  p_unit->p_prev->p_next = p_unit->p_next;
299  p_unit->p_next->p_prev = p_unit->p_prev;
300  p_data->p_head = p_unit->p_next;
301  }
302  p_data->num_units--;
303 
304  p_unit->p_prev = NULL;
305  p_unit->p_next = NULL;
306  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 0);
307 
308  h_unit = (ABT_unit)p_unit;
309  }
310 
311  return h_unit;
312 }
313 
314 static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
315 {
316  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
317  data_t *p_data = pool_get_data_ptr(p_pool->data);
318  unit_t *p_unit = (unit_t *)unit;
319 
320  ABTI_CHECK_TRUE_RET(p_data->num_units != 0, ABT_ERR_POOL);
321  ABTI_CHECK_TRUE_RET(ABTD_atomic_acquire_load_int(&p_unit->is_in_pool) == 1,
322  ABT_ERR_POOL);
323 
324  ABTI_spinlock_acquire(&p_data->mutex);
325  if (p_data->num_units == 1) {
326  p_data->p_head = NULL;
327  p_data->p_tail = NULL;
328  } else {
329  p_unit->p_prev->p_next = p_unit->p_next;
330  p_unit->p_next->p_prev = p_unit->p_prev;
331  if (p_unit == p_data->p_head) {
332  p_data->p_head = p_unit->p_next;
333  } else if (p_unit == p_data->p_tail) {
334  p_data->p_tail = p_unit->p_prev;
335  }
336  }
337  p_data->num_units--;
338 
339  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 0);
340  ABTI_spinlock_release(&p_data->mutex);
341 
342  p_unit->p_prev = NULL;
343  p_unit->p_next = NULL;
344 
345  return ABT_SUCCESS;
346 }
347 
348 static int pool_remove_private(ABT_pool pool, ABT_unit unit)
349 {
350  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
351  data_t *p_data = pool_get_data_ptr(p_pool->data);
352  unit_t *p_unit = (unit_t *)unit;
353 
354  ABTI_CHECK_TRUE_RET(p_data->num_units != 0, ABT_ERR_POOL);
355  ABTI_CHECK_TRUE_RET(ABTD_atomic_acquire_load_int(&p_unit->is_in_pool) == 1,
356  ABT_ERR_POOL);
357 
358  if (p_data->num_units == 1) {
359  p_data->p_head = NULL;
360  p_data->p_tail = NULL;
361  } else {
362  p_unit->p_prev->p_next = p_unit->p_next;
363  p_unit->p_next->p_prev = p_unit->p_prev;
364  if (p_unit == p_data->p_head) {
365  p_data->p_head = p_unit->p_next;
366  } else if (p_unit == p_data->p_tail) {
367  p_data->p_tail = p_unit->p_prev;
368  }
369  }
370  p_data->num_units--;
371 
372  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 0);
373  p_unit->p_prev = NULL;
374  p_unit->p_next = NULL;
375 
376  return ABT_SUCCESS;
377 }
378 
379 static int pool_print_all(ABT_pool pool, void *arg,
380  void (*print_fn)(void *, ABT_unit))
381 {
382  ABT_pool_access access;
383  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
384  data_t *p_data = pool_get_data_ptr(p_pool->data);
385 
386  access = p_pool->access;
387  if (access != ABT_POOL_ACCESS_PRIV) {
388  ABTI_spinlock_acquire(&p_data->mutex);
389  }
390 
391  size_t num_units = p_data->num_units;
392  unit_t *p_unit = p_data->p_head;
393  while (num_units--) {
394  ABTI_ASSERT(p_unit);
395  ABT_unit unit = (ABT_unit)p_unit;
396  print_fn(arg, unit);
397  p_unit = p_unit->p_next;
398  }
399 
400  if (access != ABT_POOL_ACCESS_PRIV) {
401  ABTI_spinlock_release(&p_data->mutex);
402  }
403 
404  return ABT_SUCCESS;
405 }
406 
407 /* Unit functions */
408 
410 {
411  unit_t *p_unit = (unit_t *)unit;
412  return p_unit->type;
413 }
414 
416 {
417  ABT_thread h_thread;
418  unit_t *p_unit = (unit_t *)unit;
419  if (p_unit->type == ABT_UNIT_TYPE_THREAD) {
420  h_thread = p_unit->handle.thread;
421  } else {
422  h_thread = ABT_THREAD_NULL;
423  }
424  return h_thread;
425 }
426 
428 {
429  ABT_task h_task;
430  unit_t *p_unit = (unit_t *)unit;
431  if (p_unit->type == ABT_UNIT_TYPE_TASK) {
432  h_task = p_unit->handle.task;
433  } else {
434  h_task = ABT_TASK_NULL;
435  }
436  return h_task;
437 }
438 
440 {
441  unit_t *p_unit = (unit_t *)unit;
442  return ABTD_atomic_acquire_load_int(&p_unit->is_in_pool) ? ABT_TRUE
443  : ABT_FALSE;
444 }
445 
447 {
448  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
449  unit_t *p_unit = &p_thread->unit_def;
450  p_unit->p_prev = NULL;
451  p_unit->p_next = NULL;
452  ABTD_atomic_relaxed_store_int(&p_unit->is_in_pool, 0);
453  p_unit->handle.thread = thread;
454  p_unit->type = ABT_UNIT_TYPE_THREAD;
455 
456  return (ABT_unit)p_unit;
457 }
458 
460 {
461  ABTI_task *p_task = ABTI_task_get_ptr(task);
462  unit_t *p_unit = &p_task->unit_def;
463  p_unit->p_prev = NULL;
464  p_unit->p_next = NULL;
465  ABTD_atomic_relaxed_store_int(&p_unit->is_in_pool, 0);
466  p_unit->handle.task = task;
467  p_unit->type = ABT_UNIT_TYPE_TASK;
468 
469  return (ABT_unit)p_unit;
470 }
471 
472 static void unit_free(ABT_unit *unit)
473 {
474  *unit = ABT_UNIT_NULL;
475 }
static int pool_remove_private(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:348
static ABT_thread unit_get_thread(ABT_unit unit)
Definition: fifo.c:415
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:275
static ABT_unit pool_pop_shared(ABT_pool pool)
Definition: fifo.c:254
ABT_unit_get_task_fn u_get_task
Definition: abt.h:420
#define ABT_ERR_INV_POOL_ACCESS
Definition: abt.h:78
static void pool_push_shared(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:157
ABT_pool_init_fn p_init
Definition: abt.h:427
#define ABT_UNIT_NULL
Definition: abt.h:343
struct ABT_task_opaque * ABT_task
Definition: abt.h:289
static void * ABTU_malloc(size_t size)
Definition: abtu.h:39
int ABT_bool
Definition: abt.h:309
static size_t pool_get_size(ABT_pool pool)
Definition: fifo.c:150
ABT_unit_get_thread_fn u_get_thread
Definition: abt.h:419
ABT_pool_pop_fn p_pop
Definition: abt.h:430
static ABT_bool unit_is_in_pool(ABT_unit unit)
Definition: fifo.c:439
ABT_pool_access access
Definition: abt.h:415
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:267
ABT_unit_is_in_pool_fn u_is_in_pool
Definition: abt.h:421
ABT_pool_push_fn p_push
Definition: abt.h:429
#define ABT_FALSE
Definition: abt.h:224
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:279
static ABT_unit unit_create_from_thread(ABT_thread thread)
Definition: fifo.c:446
struct data data_t
Definition: fifo.c:55
#define HANDLE_ERROR_FUNC_WITH_CODE(n)
Definition: abti_error.h:241
static void pool_push_private(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:184
ABT_pool_free_fn p_free
Definition: abt.h:433
static int pool_free(ABT_pool pool)
Definition: fifo.c:139
#define ABT_SUCCESS
Definition: abt.h:64
ABT_unit_free_fn u_free
Definition: abt.h:424
ABT_pool_access
Definition: abt.h:162
#define ABT_TRUE
Definition: abt.h:223
ABT_unit_type
Definition: abt.h:170
static ABT_unit unit_create_from_task(ABT_task task)
Definition: fifo.c:459
static ABT_unit pool_pop_private(ABT_pool pool)
Definition: fifo.c:285
ABT_unit_create_from_task_fn u_create_from_task
Definition: abt.h:423
static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:314
#define ABT_THREAD_NULL
Definition: abt.h:344
ABTI_unit unit_t
Definition: fifo.c:40
static void unit_free(ABT_unit *unit)
Definition: fifo.c:472
ABT_unit_create_from_thread_fn u_create_from_thread
Definition: abt.h:422
static double get_cur_time(void)
Definition: fifo.c:9
static ABT_unit_type unit_get_type(ABT_unit unit)
Definition: fifo.c:409
static ABT_task unit_get_task(ABT_unit unit)
Definition: fifo.c:427
struct ABT_pool_config_opaque * ABT_pool_config
Definition: abt.h:269
ABT_unit_get_type_fn u_get_type
Definition: abt.h:418
ABT_pool_get_size_fn p_get_size
Definition: abt.h:428
static int pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_unit))
Definition: fifo.c:379
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo.c:114
#define ABT_TASK_NULL
Definition: abt.h:346
static void ABTU_free(void *ptr)
Definition: abtu.h:32
ABT_pool_remove_fn p_remove
Definition: abt.h:432
ABT_pool_print_all_fn p_print_all
Definition: abt.h:434
#define ABT_ERR_POOL
Definition: abt.h:98
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo.c:209
ABT_pool_pop_timedwait_fn p_pop_timedwait
Definition: abt.h:431
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo.c:57