ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
fifo_wait.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 #include "thread_queue.h"
8 
9 /* FIFO_WAIT pool implementation */
10 
11 static int pool_init(ABT_pool pool, ABT_pool_config config);
12 static void pool_free(ABT_pool pool);
13 static ABT_bool pool_is_empty(ABT_pool pool);
14 static size_t pool_get_size(ABT_pool pool);
15 static void pool_push(ABT_pool pool, ABT_unit unit, ABT_pool_context context);
16 static ABT_thread pool_pop(ABT_pool pool, ABT_pool_context context);
17 static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs,
18  ABT_pool_context context);
19 static void pool_push_many(ABT_pool pool, const ABT_unit *units,
20  size_t num_units, ABT_pool_context context);
21 static void pool_pop_many(ABT_pool pool, ABT_thread *threads,
22  size_t max_threads, size_t *num_popped,
23  ABT_pool_context context);
24 static void pool_print_all(ABT_pool pool, void *arg,
25  void (*print_fn)(void *, ABT_thread));
26 static ABT_unit pool_create_unit(ABT_pool pool, ABT_thread thread);
27 static void pool_free_unit(ABT_pool pool, ABT_unit unit);
28 
29 /* For backward compatibility */
30 static int pool_remove(ABT_pool pool, ABT_unit unit);
31 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
33 
34 struct data {
35  pthread_mutex_t mutex;
36  pthread_cond_t cond;
37  thread_queue_t queue;
38 };
39 typedef struct data data_t;
40 
41 static inline data_t *pool_get_data_ptr(void *p_data)
42 {
43  return (data_t *)p_data;
44 }
45 
46 ABTU_ret_err int
47 ABTI_pool_get_fifo_wait_def(ABT_pool_access access,
48  ABTI_pool_required_def *p_required_def,
49  ABTI_pool_optional_def *p_optional_def,
50  ABTI_pool_deprecated_def *p_deprecated_def)
51 {
52  p_optional_def->p_init = pool_init;
53  p_optional_def->p_free = pool_free;
54  p_required_def->p_is_empty = pool_is_empty;
55  p_optional_def->p_get_size = pool_get_size;
56  p_required_def->p_push = pool_push;
57  p_required_def->p_pop = pool_pop;
58  p_optional_def->p_pop_wait = pool_pop_wait;
59  p_optional_def->p_push_many = pool_push_many;
60  p_optional_def->p_pop_many = pool_pop_many;
61  p_optional_def->p_print_all = pool_print_all;
62  p_required_def->p_create_unit = pool_create_unit;
63  p_required_def->p_free_unit = pool_free_unit;
64 
65  p_deprecated_def->p_pop_timedwait = pool_pop_timedwait;
66  p_deprecated_def->u_is_in_pool = pool_unit_is_in_pool;
67  p_deprecated_def->p_remove = pool_remove;
68  return ABT_SUCCESS;
69 }
70 
71 /* Pool functions */
72 
73 static int pool_init(ABT_pool pool, ABT_pool_config config)
74 {
75  ABTI_UNUSED(config);
76  int ret, abt_errno = ABT_SUCCESS;
77  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
78 
79  data_t *p_data;
80  abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data);
81  ABTI_CHECK_ERROR(abt_errno);
82 
83  ret = pthread_mutex_init(&p_data->mutex, NULL);
84  if (ret != 0) {
85  ABTU_free(p_data);
86  return ABT_ERR_SYS;
87  }
88  ret = pthread_cond_init(&p_data->cond, NULL);
89  if (ret != 0) {
90  pthread_mutex_destroy(&p_data->mutex);
91  ABTU_free(p_data);
92  return ABT_ERR_SYS;
93  }
94  thread_queue_init(&p_data->queue);
95 
96  p_pool->data = p_data;
97  return abt_errno;
98 }
99 
100 static void pool_free(ABT_pool pool)
101 {
102  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
103  data_t *p_data = pool_get_data_ptr(p_pool->data);
104  thread_queue_free(&p_data->queue);
105  pthread_mutex_destroy(&p_data->mutex);
106  pthread_cond_destroy(&p_data->cond);
107  ABTU_free(p_data);
108 }
109 
111 {
112  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
113  data_t *p_data = pool_get_data_ptr(p_pool->data);
114  return thread_queue_is_empty(&p_data->queue);
115 }
116 
117 static size_t pool_get_size(ABT_pool pool)
118 {
119  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
120  data_t *p_data = pool_get_data_ptr(p_pool->data);
121  return thread_queue_get_size(&p_data->queue);
122 }
123 
124 static void pool_push(ABT_pool pool, ABT_unit unit, ABT_pool_context context)
125 {
126  (void)context;
127  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
128  data_t *p_data = pool_get_data_ptr(p_pool->data);
129  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
130 
131  pthread_mutex_lock(&p_data->mutex);
132  thread_queue_push_tail(&p_data->queue, p_thread);
133  pthread_cond_signal(&p_data->cond);
134  pthread_mutex_unlock(&p_data->mutex);
135 }
136 
137 static void pool_push_many(ABT_pool pool, const ABT_unit *units,
138  size_t num_units, ABT_pool_context context)
139 {
140  (void)context;
141  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
142  data_t *p_data = pool_get_data_ptr(p_pool->data);
143 
144  if (num_units > 0) {
145  pthread_mutex_lock(&p_data->mutex);
146  size_t i;
147  for (i = 0; i < num_units; i++) {
148  ABTI_thread *p_thread =
149  ABTI_unit_get_thread_from_builtin_unit(units[i]);
150  thread_queue_push_tail(&p_data->queue, p_thread);
151  }
152  if (num_units == 1) {
153  /* Wake up a single waiter. */
154  pthread_cond_signal(&p_data->cond);
155  } else {
156  /* Wake up all the waiters. */
157  pthread_cond_broadcast(&p_data->cond);
158  }
159  pthread_mutex_unlock(&p_data->mutex);
160  }
161 }
162 
163 static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs,
164  ABT_pool_context context)
165 {
166  (void)context;
167  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
168  data_t *p_data = pool_get_data_ptr(p_pool->data);
169  pthread_mutex_lock(&p_data->mutex);
170  if (thread_queue_is_empty(&p_data->queue)) {
171 #if defined(ABT_CONFIG_USE_CLOCK_GETTIME)
172  struct timespec ts;
173  clock_gettime(CLOCK_REALTIME, &ts);
174  ts.tv_sec += (time_t)time_secs;
175  ts.tv_nsec += (long)((time_secs - (time_t)time_secs) * 1e9);
176  if (ts.tv_nsec > 1e9) {
177  ts.tv_sec += 1;
178  ts.tv_nsec -= 1e9;
179  }
180  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
181 #else
182  /* We cannot use pthread_cond_timedwait(). Let's use nanosleep()
183  * instead */
184  double start_time = ABTI_get_wtime();
185  while (ABTI_get_wtime() - start_time < time_secs) {
186  pthread_mutex_unlock(&p_data->mutex);
187  const int sleep_nsecs = 100;
188  struct timespec ts = { 0, sleep_nsecs };
189  nanosleep(&ts, NULL);
190  pthread_mutex_lock(&p_data->mutex);
191  if (p_data->num_threads > 0)
192  break;
193  }
194 #endif
195  }
196  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
197  pthread_mutex_unlock(&p_data->mutex);
198  return ABTI_thread_get_handle(p_thread);
199 }
200 
201 static inline void convert_double_sec_to_timespec(struct timespec *ts_out,
202  double seconds)
203 {
204  ts_out->tv_sec = (time_t)seconds;
205  ts_out->tv_nsec = (long)((seconds - ts_out->tv_sec) * 1000000000.0);
206 }
207 
208 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
209 {
210  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
211  data_t *p_data = pool_get_data_ptr(p_pool->data);
212  pthread_mutex_lock(&p_data->mutex);
213  if (thread_queue_is_empty(&p_data->queue)) {
214  struct timespec ts;
215  convert_double_sec_to_timespec(&ts, abstime_secs);
216  pthread_cond_timedwait(&p_data->cond, &p_data->mutex, &ts);
217  }
218  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
219  pthread_mutex_unlock(&p_data->mutex);
220  if (p_thread) {
221  return ABTI_unit_get_builtin_unit(p_thread);
222  } else {
223  return ABT_UNIT_NULL;
224  }
225 }
226 
228 {
229  (void)context;
230  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
231  data_t *p_data = pool_get_data_ptr(p_pool->data);
232  if (!thread_queue_is_empty(&p_data->queue)) {
233  pthread_mutex_lock(&p_data->mutex);
234  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
235  pthread_mutex_unlock(&p_data->mutex);
236  return ABTI_thread_get_handle(p_thread);
237  } else {
238  return ABT_THREAD_NULL;
239  }
240 }
241 
242 static void pool_pop_many(ABT_pool pool, ABT_thread *threads,
243  size_t max_threads, size_t *num_popped,
244  ABT_pool_context context)
245 {
246  (void)context;
247  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
248  data_t *p_data = pool_get_data_ptr(p_pool->data);
249  if (max_threads != 0 && !thread_queue_is_empty(&p_data->queue)) {
250  pthread_mutex_lock(&p_data->mutex);
251  size_t i;
252  for (i = 0; i < max_threads; i++) {
253  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
254  if (!p_thread)
255  break;
256  threads[i] = ABTI_thread_get_handle(p_thread);
257  }
258  *num_popped = i;
259  pthread_mutex_unlock(&p_data->mutex);
260  } else {
261  *num_popped = 0;
262  }
263 }
264 
265 static int pool_remove(ABT_pool pool, ABT_unit unit)
266 {
267  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
268  data_t *p_data = pool_get_data_ptr(p_pool->data);
269  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
270 
271  ABTI_CHECK_TRUE(!thread_queue_is_empty(&p_data->queue), ABT_ERR_POOL);
272  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) == 1,
273  ABT_ERR_POOL);
274 
275  pthread_mutex_lock(&p_data->mutex);
276  int abt_errno = thread_queue_remove(&p_data->queue, p_thread);
277  pthread_mutex_unlock(&p_data->mutex);
278  return abt_errno;
279 }
280 
281 static void pool_print_all(ABT_pool pool, void *arg,
282  void (*print_fn)(void *, ABT_thread))
283 {
284  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
285  data_t *p_data = pool_get_data_ptr(p_pool->data);
286 
287  pthread_mutex_lock(&p_data->mutex);
288  thread_queue_print_all(&p_data->queue, arg, print_fn);
289  pthread_mutex_unlock(&p_data->mutex);
290 }
291 
292 /* Unit functions */
293 
295 {
296  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
297  return ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ? ABT_TRUE
298  : ABT_FALSE;
299 }
300 
302 {
303  /* Call ABTI_unit_init_builtin() instead. */
304  ABTI_ASSERT(0);
305  return ABT_UNIT_NULL;
306 }
307 
308 static void pool_free_unit(ABT_pool pool, ABT_unit unit)
309 {
310  /* A built-in unit does not need to be freed. This function may not be
311  * called. */
312  ABTI_ASSERT(0);
313 }
pool_get_data_ptr
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo_wait.c:41
pool_create_unit
static ABT_unit pool_create_unit(ABT_pool pool, ABT_thread thread)
Definition: fifo_wait.c:301
ABT_ERR_SYS
#define ABT_ERR_SYS
Error code: error related to system calls and standard libraries.
Definition: abt.h:403
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
thread_queue_free
static void thread_queue_free(thread_queue_t *p_queue)
Definition: thread_queue.h:29
ABT_pool_context
uint64_t ABT_pool_context
A pool context value.
Definition: abt.h:1566
ABT_thread
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
Definition: abt.h:932
ABT_ERR_POOL
#define ABT_ERR_POOL
Error code: error related to a pool.
Definition: abt.h:292
ABT_THREAD_NULL
#define ABT_THREAD_NULL
Definition: abt.h:1105
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:878
pool_pop_timedwait
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo_wait.c:208
convert_double_sec_to_timespec
static void convert_double_sec_to_timespec(struct timespec *ts_out, double seconds)
Definition: fifo_wait.c:201
abti.h
ABT_pool_config
struct ABT_pool_config_opaque * ABT_pool_config
Pool configuration handle type.
Definition: abt.h:885
ABTU_malloc
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:235
pool_push
static void pool_push(ABT_pool pool, ABT_unit unit, ABT_pool_context context)
Definition: fifo_wait.c:124
ABT_unit
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
Definition: abt.h:911
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
pool_free
static void pool_free(ABT_pool pool)
Definition: fifo_wait.c:100
thread_queue_is_empty
static ABT_bool thread_queue_is_empty(const thread_queue_t *p_queue)
Definition: thread_queue.h:58
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
pool_pop
static ABT_thread pool_pop(ABT_pool pool, ABT_pool_context context)
Definition: fifo_wait.c:227
thread_queue_remove
static ABTU_ret_err int thread_queue_remove(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:165
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:784
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:786
thread_queue_t
Definition: thread_queue.h:12
pool_get_size
static size_t pool_get_size(ABT_pool pool)
Definition: fifo_wait.c:117
ABTU_free
static void ABTU_free(void *ptr)
Definition: abtu.h:228
data_t
struct data data_t
Definition: fifo.c:49
thread_queue_push_tail
static void thread_queue_push_tail(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:92
pool_free_unit
static void pool_free_unit(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:308
pool_pop_wait
static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs, ABT_pool_context context)
Definition: fifo_wait.c:163
pool_pop_many
static void pool_pop_many(ABT_pool pool, ABT_thread *threads, size_t max_threads, size_t *num_popped, ABT_pool_context context)
Definition: fifo_wait.c:242
thread_queue.h
thread_queue_pop_head
static ABTI_thread * thread_queue_pop_head(thread_queue_t *p_queue)
Definition: thread_queue.h:115
ABT_UNIT_NULL
#define ABT_UNIT_NULL
Definition: abt.h:1104
thread_queue_print_all
static void thread_queue_print_all(const thread_queue_t *p_queue, void *arg, void(*print_fn)(void *, ABT_thread))
Definition: thread_queue.h:193
pool_push_many
static void pool_push_many(ABT_pool pool, const ABT_unit *units, size_t num_units, ABT_pool_context context)
Definition: fifo_wait.c:137
pool_unit_is_in_pool
static ABT_bool pool_unit_is_in_pool(ABT_unit unit)
Definition: fifo_wait.c:294
thread_queue_init
static void thread_queue_init(thread_queue_t *p_queue)
Definition: thread_queue.h:21
pool_print_all
static void pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_thread))
Definition: fifo_wait.c:281
thread_queue_get_size
static size_t thread_queue_get_size(const thread_queue_t *p_queue)
Definition: thread_queue.h:64
pool_remove
static int pool_remove(ABT_pool pool, ABT_unit unit)
Definition: fifo_wait.c:265
ABT_pool_access
ABT_pool_access
Pool access type.
Definition: abt.h:556
pool_is_empty
static ABT_bool pool_is_empty(ABT_pool pool)
Definition: fifo_wait.c:110
pool_init
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo_wait.c:73