ARGOBOTS  8bef54a19d7b3e50f977f85716578b627bf37d9c
fifo_wait.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 /* FIFO_WAIT pool implementation */
9 
10 static int pool_init(ABT_pool pool, ABT_pool_config config);
11 static int pool_free(ABT_pool pool);
12 static size_t pool_get_size(ABT_pool pool);
13 static void pool_push(ABT_pool pool, ABT_unit unit);
14 static ABT_unit pool_pop(ABT_pool pool);
15 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs);
16 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
17 static int pool_remove(ABT_pool pool, ABT_unit unit);
18 static int pool_print_all(ABT_pool pool, void *arg,
19  void (*print_fn)(void *, ABT_unit));
20 static ABT_bool unit_is_in_pool(ABT_unit unit);
22 static void unit_free(ABT_unit *unit);
23 
24 struct data {
25  pthread_mutex_t mutex;
26  pthread_cond_t cond;
27  size_t num_threads;
30  ABTD_atomic_int is_empty; /* Whether the pool is empty or not. */
31 };
32 typedef struct data data_t;
33 
34 static inline data_t *pool_get_data_ptr(void *p_data)
35 {
36  return (data_t *)p_data;
37 }
38 
40  ABTI_pool_def *p_def)
41 {
42  p_def->access = access;
43  p_def->p_init = pool_init;
44  p_def->p_free = pool_free;
45  p_def->p_get_size = pool_get_size;
46  p_def->p_push = pool_push;
47  p_def->p_pop = pool_pop;
48  p_def->p_pop_wait = pool_pop_wait;
50  p_def->p_remove = pool_remove;
51  p_def->p_print_all = pool_print_all;
54  p_def->u_free = unit_free;
55 
56  return ABT_SUCCESS;
57 }
58 
59 /* Pool functions */
60 
61 static int pool_init(ABT_pool pool, ABT_pool_config config)
62 {
63  ABTI_UNUSED(config);
64  int ret, abt_errno = ABT_SUCCESS;
65  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
66 
67  data_t *p_data;
68  abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data);
69  ABTI_CHECK_ERROR(abt_errno);
70 
71  ret = pthread_mutex_init(&p_data->mutex, NULL);
72  if (ret != 0) {
73  ABTU_free(p_data);
74  return ABT_ERR_SYS;
75  }
76  ret = pthread_cond_init(&p_data->cond, NULL);
77  if (ret != 0) {
78  pthread_mutex_destroy(&p_data->mutex);
79  ABTU_free(p_data);
80  return ABT_ERR_SYS;
81  }
82 
83  p_data->num_threads = 0;
84  p_data->p_head = NULL;
85  p_data->p_tail = NULL;
86  ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1);
87 
88  p_pool->data = p_data;
89 
90  return abt_errno;
91 }
92 
93 static int pool_free(ABT_pool pool)
94 {
95  int abt_errno = ABT_SUCCESS;
96  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
97  data_t *p_data = pool_get_data_ptr(p_pool->data);
98 
99  pthread_mutex_destroy(&p_data->mutex);
100  pthread_cond_destroy(&p_data->cond);
101  ABTU_free(p_data);
102 
103  return abt_errno;
104 }
105 
106 static size_t pool_get_size(ABT_pool pool)
107 {
108  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
109  data_t *p_data = pool_get_data_ptr(p_pool->data);
110  return p_data->num_threads;
111 }
112 
113 static void pool_push(ABT_pool pool, ABT_unit unit)
114 {
115  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
116  data_t *p_data = pool_get_data_ptr(p_pool->data);
118 
119  pthread_mutex_lock(&p_data->mutex);
120  if (p_data->num_threads == 0) {
121  p_thread->p_prev = p_thread;
122  p_thread->p_next = p_thread;
123  p_data->p_head = p_thread;
124  p_data->p_tail = p_thread;
125  p_data->num_threads = 1;
127  } else {
128  ABTI_thread *p_head = p_data->p_head;
129  ABTI_thread *p_tail = p_data->p_tail;
130  p_tail->p_next = p_thread;
131  p_head->p_prev = p_thread;
132  p_thread->p_prev = p_tail;
133  p_thread->p_next = p_head;
134  p_data->p_tail = p_thread;
135  p_data->num_threads++;
136  }
137 
139  pthread_cond_signal(&p_data->cond);
140  pthread_mutex_unlock(&p_data->mutex);
141 }
142 
143 static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
144 {
145  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
146  data_t *p_data = pool_get_data_ptr(p_pool->data);
147  ABTI_thread *p_thread = NULL;
148  ABT_unit h_unit = ABT_UNIT_NULL;
149 
150  pthread_mutex_lock(&p_data->mutex);
151 
152  if (!p_data->num_threads) {
153 #if defined(ABT_CONFIG_USE_CLOCK_GETTIME)
154  struct timespec ts;
155  clock_gettime(CLOCK_REALTIME, &ts);
156  ts.tv_sec += (time_t)time_secs;
157  ts.tv_nsec += (long)((time_secs - (time_t)time_secs) * 1e9);
158  if (ts.tv_nsec > 1e9) {
159  ts.tv_sec += 1;
160  ts.tv_nsec -= 1e9;
161  }
162  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
163 #else
164  /* We cannot use pthread_cond_timedwait(). Let's use nanosleep()
165  * instead */
166  double start_time = ABTI_get_wtime();
167  while (ABTI_get_wtime() - start_time < time_secs) {
168  pthread_mutex_unlock(&p_data->mutex);
169  const int sleep_nsecs = 100;
170  struct timespec ts = { 0, sleep_nsecs };
171  nanosleep(&ts, NULL);
172  pthread_mutex_lock(&p_data->mutex);
173  if (p_data->num_threads > 0)
174  break;
175  }
176 #endif
177  }
178 
179  if (p_data->num_threads > 0) {
180  p_thread = p_data->p_head;
181  if (p_data->num_threads == 1) {
182  p_data->p_head = NULL;
183  p_data->p_tail = NULL;
184  p_data->num_threads = 0;
186  } else {
187  p_thread->p_prev->p_next = p_thread->p_next;
188  p_thread->p_next->p_prev = p_thread->p_prev;
189  p_data->p_head = p_thread->p_next;
190  p_data->num_threads--;
191  }
192 
193  p_thread->p_prev = NULL;
194  p_thread->p_next = NULL;
196 
197  h_unit = ABTI_unit_get_builtin_unit(p_thread);
198  }
199  pthread_mutex_unlock(&p_data->mutex);
200 
201  return h_unit;
202 }
203 
204 static inline void convert_double_sec_to_timespec(struct timespec *ts_out,
205  double seconds)
206 {
207  ts_out->tv_sec = (time_t)seconds;
208  ts_out->tv_nsec = (long)((seconds - ts_out->tv_sec) * 1000000000.0);
209 }
210 
211 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
212 {
213  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
214  data_t *p_data = pool_get_data_ptr(p_pool->data);
215  ABTI_thread *p_thread = NULL;
216  ABT_unit h_unit = ABT_UNIT_NULL;
217 
218  pthread_mutex_lock(&p_data->mutex);
219 
220  if (!p_data->num_threads) {
221  struct timespec ts;
222  convert_double_sec_to_timespec(&ts, abstime_secs);
223  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
224  }
225 
226  if (p_data->num_threads > 0) {
227  p_thread = p_data->p_head;
228  if (p_data->num_threads == 1) {
229  p_data->p_head = NULL;
230  p_data->p_tail = NULL;
231  p_data->num_threads = 0;
233  } else {
234  p_thread->p_prev->p_next = p_thread->p_next;
235  p_thread->p_next->p_prev = p_thread->p_prev;
236  p_data->p_head = p_thread->p_next;
237  p_data->num_threads--;
238  }
239 
240  p_thread->p_prev = NULL;
241  p_thread->p_next = NULL;
243 
244  h_unit = ABTI_unit_get_builtin_unit(p_thread);
245  }
246  pthread_mutex_unlock(&p_data->mutex);
247 
248  return h_unit;
249 }
250 
252 {
253  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
254  data_t *p_data = pool_get_data_ptr(p_pool->data);
255  ABTI_thread *p_thread = NULL;
256 
257  if (ABTD_atomic_acquire_load_int(&p_data->is_empty) == 0) {
258  ABT_unit h_unit = ABT_UNIT_NULL;
259  pthread_mutex_lock(&p_data->mutex);
260  if (p_data->num_threads > 0) {
261  p_thread = p_data->p_head;
262  if (p_data->num_threads == 1) {
263  p_data->p_head = NULL;
264  p_data->p_tail = NULL;
265  p_data->num_threads = 0;
267  } else {
268  p_thread->p_prev->p_next = p_thread->p_next;
269  p_thread->p_next->p_prev = p_thread->p_prev;
270  p_data->p_head = p_thread->p_next;
271  p_data->num_threads--;
272  }
273 
274  p_thread->p_prev = NULL;
275  p_thread->p_next = NULL;
277 
278  h_unit = ABTI_unit_get_builtin_unit(p_thread);
279  }
280  pthread_mutex_unlock(&p_data->mutex);
281  return h_unit;
282  } else {
283  return ABT_UNIT_NULL;
284  }
285 }
286 
287 static int pool_remove(ABT_pool pool, ABT_unit unit)
288 {
289  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
290  data_t *p_data = pool_get_data_ptr(p_pool->data);
292 
295  ABT_ERR_POOL);
296 
297  pthread_mutex_lock(&p_data->mutex);
298  if (p_data->num_threads == 1) {
299  p_data->p_head = NULL;
300  p_data->p_tail = NULL;
301  p_data->num_threads = 0;
303  } else {
304  p_thread->p_prev->p_next = p_thread->p_next;
305  p_thread->p_next->p_prev = p_thread->p_prev;
306  if (p_thread == p_data->p_head) {
307  p_data->p_head = p_thread->p_next;
308  } else if (p_thread == p_data->p_tail) {
309  p_data->p_tail = p_thread->p_prev;
310  }
311  p_data->num_threads--;
312  }
313 
315  pthread_mutex_unlock(&p_data->mutex);
316 
317  p_thread->p_prev = NULL;
318  p_thread->p_next = NULL;
319 
320  return ABT_SUCCESS;
321 }
322 
323 static int pool_print_all(ABT_pool pool, void *arg,
324  void (*print_fn)(void *, ABT_unit))
325 {
326  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
327  data_t *p_data = pool_get_data_ptr(p_pool->data);
328 
329  pthread_mutex_lock(&p_data->mutex);
330 
331  size_t num_threads = p_data->num_threads;
332  ABTI_thread *p_thread = p_data->p_head;
333  while (num_threads--) {
334  ABTI_ASSERT(p_thread);
335  ABT_unit unit = ABTI_unit_get_builtin_unit(p_thread);
336  print_fn(arg, unit);
337  p_thread = p_thread->p_next;
338  }
339 
340  pthread_mutex_unlock(&p_data->mutex);
341 
342  return ABT_SUCCESS;
343 }
344 
345 /* Unit functions */
346 
348 {
351  : ABT_FALSE;
352 }
353 
355 {
356  /* Call ABTI_unit_init_builtin() instead. */
357  ABTI_ASSERT(0);
358  return ABT_UNIT_NULL;
359 }
360 
361 static void unit_free(ABT_unit *unit)
362 {
363  /* A built-in unit does not need to be freed. This function may not be
364  * called. */
365  ABTI_ASSERT(0);
366 }
ABTI_pool_def::p_init
ABT_pool_init_fn p_init
Definition: abti.h:378
ABTI_pool_def::p_get_size
ABT_pool_get_size_fn p_get_size
Definition: abti.h:379
pool_get_data_ptr
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo_wait.c:34
ABT_ERR_SYS
#define ABT_ERR_SYS
Error code: error related to system calls and standard libraries.
Definition: abt.h:393
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1001
ABTD_atomic_int
Definition: abtd_atomic.h:15
ABTI_pool_def::p_remove
ABT_pool_remove_fn p_remove
Definition: abti.h:384
ABT_thread
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
Definition: abt.h:890
ABTI_pool_def::p_print_all
ABT_pool_print_all_fn p_print_all
Definition: abti.h:386
ABTI_pool_def
Definition: abti.h:372
pool_free
static int pool_free(ABT_pool pool)
Definition: fifo_wait.c:93
ABTI_unit_get_builtin_unit
static ABT_unit ABTI_unit_get_builtin_unit(ABTI_thread *p_thread)
Definition: abti_unit.h:23
ABT_ERR_POOL
#define ABT_ERR_POOL
Error code: error related to a pool.
Definition: abt.h:282
ABTI_CHECK_ERROR
#define ABTI_CHECK_ERROR(abt_errno)
Definition: abti_error.h:136
data
Definition: fifo.c:29
ABTI_thread::p_next
ABTI_thread * p_next
Definition: abti.h:391
unit_is_in_pool
static ABT_bool unit_is_in_pool(ABT_unit unit)
Definition: fifo_wait.c:347
ABTI_unit_get_thread_from_builtin_unit
static ABTI_thread * ABTI_unit_get_thread_from_builtin_unit(ABT_unit unit)
Definition: abti_unit.h:37
ABTI_UNUSED
#define ABTI_UNUSED(a)
Definition: abti.h:120
ABTI_thread
Definition: abti.h:389
data::is_empty
ABTD_atomic_int is_empty
Definition: fifo.c:36
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:841
ABTI_pool_def::u_free
ABT_unit_free_fn u_free
Definition: abti.h:377
ABTI_pool_get_fifo_wait_def
ABTU_ret_err int ABTI_pool_get_fifo_wait_def(ABT_pool_access access, ABTI_pool_def *p_def)
Definition: fifo_wait.c:39
pool_pop_timedwait
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo_wait.c:211
ABTI_pool
Definition: abti.h:345
convert_double_sec_to_timespec
static void convert_double_sec_to_timespec(struct timespec *ts_out, double seconds)
Definition: fifo_wait.c:204
ABTI_pool_def::u_is_in_pool
ABT_unit_is_in_pool_fn u_is_in_pool
Definition: abti.h:375
abti.h
ABTI_pool::data
void * data
Definition: abti.h:352
ABT_pool_config
struct ABT_pool_config_opaque * ABT_pool_config
Pool configuration handle type.
Definition: abt.h:848
ABTU_malloc
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:235
ABTD_atomic_relaxed_store_int
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:996
pool_print_all
static int pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_unit))
Definition: fifo_wait.c:323
ABT_unit
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
Definition: abt.h:869
ABTI_pool_def::u_create_from_thread
ABT_unit_create_from_thread_fn u_create_from_thread
Definition: abti.h:376
ABTI_ASSERT
#define ABTI_ASSERT(cond)
Definition: abti_error.h:12
unit_create_from_thread
static ABT_unit unit_create_from_thread(ABT_thread thread)
Definition: fifo_wait.c:354
ABTI_pool_def::p_push
ABT_pool_push_fn p_push
Definition: abti.h:380
ABTI_pool_def::p_pop_timedwait
ABT_pool_pop_timedwait_fn p_pop_timedwait
Definition: abti.h:383
ABTI_thread::is_in_pool
ABTD_atomic_int is_in_pool
Definition: abti.h:392
pool_push
static void pool_push(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:113
unit_free
static void unit_free(ABT_unit *unit)
Definition: fifo_wait.c:361
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
data::mutex
pthread_mutex_t mutex
Definition: fifo_wait.c:25
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
ABTI_pool_def::p_free
ABT_pool_free_fn p_free
Definition: abti.h:385
ABTI_pool_def::p_pop_wait
ABT_pool_pop_wait_fn p_pop_wait
Definition: abti.h:382
ABTD_atomic_acquire_load_int
static int ABTD_atomic_acquire_load_int(const ABTD_atomic_int *ptr)
Definition: abtd_atomic.h:878
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:748
ABTI_pool_get_ptr
static ABTI_pool * ABTI_pool_get_ptr(ABT_pool pool)
Definition: abti_pool.h:11
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:750
ABTI_pool_def::p_pop
ABT_pool_pop_fn p_pop
Definition: abti.h:381
pool_get_size
static size_t pool_get_size(ABT_pool pool)
Definition: fifo_wait.c:106
ABTU_free
static void ABTU_free(void *ptr)
Definition: abtu.h:228
data::num_threads
size_t num_threads
Definition: fifo.c:31
pool_pop_wait
static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs)
Definition: fifo_wait.c:143
data::mutex
ABTD_spinlock mutex
Definition: fifo.c:30
ABTI_thread::p_prev
ABTI_thread * p_prev
Definition: abti.h:390
pool_pop
static ABT_unit pool_pop(ABT_pool pool)
Definition: fifo_wait.c:251
ABTI_CHECK_TRUE
#define ABTI_CHECK_TRUE(cond, abt_errno)
Definition: abti_error.h:146
data::cond
pthread_cond_t cond
Definition: fifo_wait.c:26
ABTI_pool_def::access
ABT_pool_access access
Definition: abti.h:373
ABT_UNIT_NULL
#define ABT_UNIT_NULL
Definition: abt.h:1061
ABTD_atomic_release_store_int
static void ABTD_atomic_release_store_int(ABTD_atomic_int *ptr, int val)
Definition: abtd_atomic.h:1065
data::p_tail
ABTI_thread * p_tail
Definition: fifo.c:33
ABTI_get_wtime
static double ABTI_get_wtime(void)
Definition: abti_timer.h:11
data::p_head
ABTI_thread * p_head
Definition: fifo.c:32
pool_remove
static int pool_remove(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:287
ABT_pool_access
ABT_pool_access
Pool access type.
Definition: abt.h:522
pool_init
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo_wait.c:61