ARGOBOTS
fifo_wait.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 /* FIFO_WAIT pool implementation */
9 
10 static int pool_init(ABT_pool pool, ABT_pool_config config);
11 static int pool_free(ABT_pool pool);
12 static size_t pool_get_size(ABT_pool pool);
13 static void pool_push(ABT_pool pool, ABT_unit unit);
14 static ABT_unit pool_pop(ABT_pool pool);
15 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
16 static int pool_remove(ABT_pool pool, ABT_unit unit);
17 static int pool_print_all(ABT_pool pool, void *arg,
18  void (*print_fn)(void *, ABT_unit));
19 
20 typedef ABTI_unit unit_t;
23 static ABT_task unit_get_task(ABT_unit unit);
24 static ABT_bool unit_is_in_pool(ABT_unit unit);
27 static void unit_free(ABT_unit *unit);
28 
29 struct data {
30  pthread_mutex_t mutex;
31  pthread_cond_t cond;
32  size_t num_units;
33  unit_t *p_head;
34  unit_t *p_tail;
35 };
36 typedef struct data data_t;
37 
38 static inline data_t *pool_get_data_ptr(void *p_data)
39 {
40  return (data_t *)p_data;
41 }
42 
43 int ABTI_pool_get_fifo_wait_def(ABT_pool_access access, ABT_pool_def *p_def)
44 {
45  p_def->access = access;
46  p_def->p_init = pool_init;
47  p_def->p_free = pool_free;
48  p_def->p_get_size = pool_get_size;
49  p_def->p_push = pool_push;
50  p_def->p_pop = pool_pop;
52  p_def->p_remove = pool_remove;
53  p_def->p_print_all = pool_print_all;
54  p_def->u_get_type = unit_get_type;
56  p_def->u_get_task = unit_get_task;
60  p_def->u_free = unit_free;
61 
62  return ABT_SUCCESS;
63 }
64 
65 /* Pool functions */
66 
68 {
69  ABTI_UNUSED(config);
70  int abt_errno = ABT_SUCCESS;
71  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
72 
73  data_t *p_data = (data_t *)ABTU_malloc(sizeof(data_t));
74 
75  pthread_mutex_init(&p_data->mutex, NULL);
76  pthread_cond_init(&p_data->cond, NULL);
77 
78  p_data->num_units = 0;
79  p_data->p_head = NULL;
80  p_data->p_tail = NULL;
81 
82  p_pool->data = p_data;
83 
84  return abt_errno;
85 }
86 
87 static int pool_free(ABT_pool pool)
88 {
89  int abt_errno = ABT_SUCCESS;
90  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
91  data_t *p_data = pool_get_data_ptr(p_pool->data);
92 
93  pthread_mutex_destroy(&p_data->mutex);
94  pthread_cond_destroy(&p_data->cond);
95  ABTU_free(p_data);
96 
97  return abt_errno;
98 }
99 
100 static size_t pool_get_size(ABT_pool pool)
101 {
102  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
103  data_t *p_data = pool_get_data_ptr(p_pool->data);
104  return p_data->num_units;
105 }
106 
107 static void pool_push(ABT_pool pool, ABT_unit unit)
108 {
109  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
110  data_t *p_data = pool_get_data_ptr(p_pool->data);
111  unit_t *p_unit = (unit_t *)unit;
112 
113  pthread_mutex_lock(&p_data->mutex);
114  if (p_data->num_units == 0) {
115  p_unit->p_prev = p_unit;
116  p_unit->p_next = p_unit;
117  p_data->p_head = p_unit;
118  p_data->p_tail = p_unit;
119  } else {
120  unit_t *p_head = p_data->p_head;
121  unit_t *p_tail = p_data->p_tail;
122  p_tail->p_next = p_unit;
123  p_head->p_prev = p_unit;
124  p_unit->p_prev = p_tail;
125  p_unit->p_next = p_head;
126  p_data->p_tail = p_unit;
127  }
128  p_data->num_units++;
129 
130  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 1);
131  pthread_cond_signal(&p_data->cond);
132  pthread_mutex_unlock(&p_data->mutex);
133 }
134 
/*
 * Split a time given in (fractional) seconds into a struct timespec.
 *
 * ts_out  receives the whole seconds in tv_sec and the remainder in tv_nsec.
 * seconds time value to convert.
 *
 * The cast to time_t truncates toward zero, so a negative fractional input
 * would leave a negative remainder; it is folded back so that tv_nsec always
 * ends up in [0, 1000000000), as a valid timespec requires.
 */
static inline void convert_double_sec_to_timespec(struct timespec *ts_out,
                                                  double seconds)
{
    time_t whole_secs = (time_t)seconds;
    long nanos = (long)((seconds - whole_secs) * 1000000000.0);

    if (nanos < 0) {
        whole_secs -= 1;
        nanos += 1000000000L;
    }

    ts_out->tv_sec = whole_secs;
    ts_out->tv_nsec = nanos;
}
141 
142 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
143 {
144  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
145  data_t *p_data = pool_get_data_ptr(p_pool->data);
146  unit_t *p_unit = NULL;
147  ABT_unit h_unit = ABT_UNIT_NULL;
148 
149  pthread_mutex_lock(&p_data->mutex);
150 
151  if (!p_data->num_units) {
152  struct timespec ts;
153  convert_double_sec_to_timespec(&ts, abstime_secs);
154  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
155  }
156 
157  if (p_data->num_units > 0) {
158  p_unit = p_data->p_head;
159  if (p_data->num_units == 1) {
160  p_data->p_head = NULL;
161  p_data->p_tail = NULL;
162  } else {
163  p_unit->p_prev->p_next = p_unit->p_next;
164  p_unit->p_next->p_prev = p_unit->p_prev;
165  p_data->p_head = p_unit->p_next;
166  }
167  p_data->num_units--;
168 
169  p_unit->p_prev = NULL;
170  p_unit->p_next = NULL;
171  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 0);
172 
173  h_unit = (ABT_unit)p_unit;
174  }
175  pthread_mutex_unlock(&p_data->mutex);
176 
177  return h_unit;
178 }
179 
181 {
182  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
183  data_t *p_data = pool_get_data_ptr(p_pool->data);
184  unit_t *p_unit = NULL;
185  ABT_unit h_unit = ABT_UNIT_NULL;
186 
187  pthread_mutex_lock(&p_data->mutex);
188  if (p_data->num_units > 0) {
189  p_unit = p_data->p_head;
190  if (p_data->num_units == 1) {
191  p_data->p_head = NULL;
192  p_data->p_tail = NULL;
193  } else {
194  p_unit->p_prev->p_next = p_unit->p_next;
195  p_unit->p_next->p_prev = p_unit->p_prev;
196  p_data->p_head = p_unit->p_next;
197  }
198  p_data->num_units--;
199 
200  p_unit->p_prev = NULL;
201  p_unit->p_next = NULL;
202  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 0);
203 
204  h_unit = (ABT_unit)p_unit;
205  }
206  pthread_mutex_unlock(&p_data->mutex);
207 
208  return h_unit;
209 }
210 
211 static int pool_remove(ABT_pool pool, ABT_unit unit)
212 {
213  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
214  data_t *p_data = pool_get_data_ptr(p_pool->data);
215  unit_t *p_unit = (unit_t *)unit;
216 
217  ABTI_CHECK_TRUE_RET(p_data->num_units != 0, ABT_ERR_POOL);
218  ABTI_CHECK_TRUE_RET(ABTD_atomic_acquire_load_int(&p_unit->is_in_pool) == 1,
219  ABT_ERR_POOL);
220 
221  pthread_mutex_lock(&p_data->mutex);
222  if (p_data->num_units == 1) {
223  p_data->p_head = NULL;
224  p_data->p_tail = NULL;
225  } else {
226  p_unit->p_prev->p_next = p_unit->p_next;
227  p_unit->p_next->p_prev = p_unit->p_prev;
228  if (p_unit == p_data->p_head) {
229  p_data->p_head = p_unit->p_next;
230  } else if (p_unit == p_data->p_tail) {
231  p_data->p_tail = p_unit->p_prev;
232  }
233  }
234  p_data->num_units--;
235 
236  ABTD_atomic_release_store_int(&p_unit->is_in_pool, 0);
237  pthread_mutex_unlock(&p_data->mutex);
238 
239  p_unit->p_prev = NULL;
240  p_unit->p_next = NULL;
241 
242  return ABT_SUCCESS;
243 }
244 
245 static int pool_print_all(ABT_pool pool, void *arg,
246  void (*print_fn)(void *, ABT_unit))
247 {
248  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
249  data_t *p_data = pool_get_data_ptr(p_pool->data);
250 
251  pthread_mutex_lock(&p_data->mutex);
252 
253  size_t num_units = p_data->num_units;
254  unit_t *p_unit = p_data->p_head;
255  while (num_units--) {
256  ABTI_ASSERT(p_unit);
257  ABT_unit unit = (ABT_unit)p_unit;
258  print_fn(arg, unit);
259  p_unit = p_unit->p_next;
260  }
261 
262  pthread_mutex_unlock(&p_data->mutex);
263 
264  return ABT_SUCCESS;
265 }
266 
267 /* Unit functions */
268 
270 {
271  unit_t *p_unit = (unit_t *)unit;
272  return p_unit->type;
273 }
274 
276 {
277  ABT_thread h_thread;
278  unit_t *p_unit = (unit_t *)unit;
279  if (p_unit->type == ABT_UNIT_TYPE_THREAD) {
280  h_thread = p_unit->handle.thread;
281  } else {
282  h_thread = ABT_THREAD_NULL;
283  }
284  return h_thread;
285 }
286 
288 {
289  ABT_task h_task;
290  unit_t *p_unit = (unit_t *)unit;
291  if (p_unit->type == ABT_UNIT_TYPE_TASK) {
292  h_task = p_unit->handle.task;
293  } else {
294  h_task = ABT_TASK_NULL;
295  }
296  return h_task;
297 }
298 
300 {
301  unit_t *p_unit = (unit_t *)unit;
302  return ABTD_atomic_acquire_load_int(&p_unit->is_in_pool) ? ABT_TRUE
303  : ABT_FALSE;
304 }
305 
307 {
308  ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
309  unit_t *p_unit = &p_thread->unit_def;
310  p_unit->p_prev = NULL;
311  p_unit->p_next = NULL;
312  ABTD_atomic_relaxed_store_int(&p_unit->is_in_pool, 0);
313  p_unit->handle.thread = thread;
314  p_unit->type = ABT_UNIT_TYPE_THREAD;
315 
316  return (ABT_unit)p_unit;
317 }
318 
320 {
321  ABTI_task *p_task = ABTI_task_get_ptr(task);
322  unit_t *p_unit = &p_task->unit_def;
323  p_unit->p_prev = NULL;
324  p_unit->p_next = NULL;
325  ABTD_atomic_relaxed_store_int(&p_unit->is_in_pool, 0);
326  p_unit->handle.task = task;
327  p_unit->type = ABT_UNIT_TYPE_TASK;
328 
329  return (ABT_unit)p_unit;
330 }
331 
332 static void unit_free(ABT_unit *unit)
333 {
334  *unit = ABT_UNIT_NULL;
335 }
static ABT_unit unit_create_from_thread(ABT_thread thread)
Definition: fifo_wait.c:306
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:275
ABT_unit_get_task_fn u_get_task
Definition: abt.h:420
ABT_pool_init_fn p_init
Definition: abt.h:427
#define ABT_UNIT_NULL
Definition: abt.h:343
struct ABT_task_opaque * ABT_task
Definition: abt.h:289
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo_wait.c:142
static void * ABTU_malloc(size_t size)
Definition: abtu.h:39
int ABT_bool
Definition: abt.h:309
ABT_unit_get_thread_fn u_get_thread
Definition: abt.h:419
ABT_pool_pop_fn p_pop
Definition: abt.h:430
static size_t pool_get_size(ABT_pool pool)
Definition: fifo_wait.c:100
static int pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_unit))
Definition: fifo_wait.c:245
ABT_pool_access access
Definition: abt.h:415
static ABT_unit unit_create_from_task(ABT_task task)
Definition: fifo_wait.c:319
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:267
ABT_unit_is_in_pool_fn u_is_in_pool
Definition: abt.h:421
ABTI_unit unit_t
Definition: fifo_wait.c:20
ABT_pool_push_fn p_push
Definition: abt.h:429
static ABT_unit_type unit_get_type(ABT_unit unit)
Definition: fifo_wait.c:269
#define ABT_FALSE
Definition: abt.h:224
static ABT_unit pool_pop(ABT_pool pool)
Definition: fifo_wait.c:180
static int pool_remove(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:211
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo_wait.c:67
struct ABT_thread_opaque * ABT_thread
Definition: abt.h:279
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo_wait.c:38
static int pool_free(ABT_pool pool)
Definition: fifo_wait.c:87
struct data data_t
Definition: fifo.c:55
ABT_pool_free_fn p_free
Definition: abt.h:433
#define ABT_SUCCESS
Definition: abt.h:64
ABT_unit_free_fn u_free
Definition: abt.h:424
ABT_pool_access
Definition: abt.h:162
#define ABT_TRUE
Definition: abt.h:223
ABT_unit_type
Definition: abt.h:170
static void unit_free(ABT_unit *unit)
Definition: fifo_wait.c:332
static void convert_double_sec_to_timespec(struct timespec *ts_out, double seconds)
Definition: fifo_wait.c:135
static ABT_task unit_get_task(ABT_unit unit)
Definition: fifo_wait.c:287
ABT_unit_create_from_task_fn u_create_from_task
Definition: abt.h:423
#define ABT_THREAD_NULL
Definition: abt.h:344
ABT_unit_create_from_thread_fn u_create_from_thread
Definition: abt.h:422
static ABT_bool unit_is_in_pool(ABT_unit unit)
Definition: fifo_wait.c:299
static ABT_thread unit_get_thread(ABT_unit unit)
Definition: fifo_wait.c:275
struct ABT_pool_config_opaque * ABT_pool_config
Definition: abt.h:269
ABT_unit_get_type_fn u_get_type
Definition: abt.h:418
ABT_pool_get_size_fn p_get_size
Definition: abt.h:428
#define ABT_TASK_NULL
Definition: abt.h:346
static void ABTU_free(void *ptr)
Definition: abtu.h:32
ABT_pool_remove_fn p_remove
Definition: abt.h:432
ABT_pool_print_all_fn p_print_all
Definition: abt.h:434
#define ABT_ERR_POOL
Definition: abt.h:98
static void pool_push(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:107
ABT_pool_pop_timedwait_fn p_pop_timedwait
Definition: abt.h:431