ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
randws.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 #include "thread_queue.h"
8 #include <time.h>
9 
10 /* RANDWS pool implementation */
11 
12 static int pool_init(ABT_pool pool, ABT_pool_config config);
13 static void pool_free(ABT_pool pool);
14 static ABT_bool pool_is_empty(ABT_pool pool);
15 static size_t pool_get_size(ABT_pool pool);
16 static void pool_push_shared(ABT_pool pool, ABT_unit unit,
17  ABT_pool_context context);
18 static void pool_push_private(ABT_pool pool, ABT_unit unit,
19  ABT_pool_context context);
22 static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs,
23  ABT_pool_context context);
24 static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units,
25  size_t num_units, ABT_pool_context context);
26 static void pool_push_many_private(ABT_pool pool, const ABT_unit *units,
27  size_t num_units, ABT_pool_context context);
28 static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads,
29  size_t max_threads, size_t *num_popped,
30  ABT_pool_context context);
31 static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads,
32  size_t max_threads, size_t *num_popped,
33  ABT_pool_context context);
34 static void pool_print_all(ABT_pool pool, void *arg,
35  void (*print_fn)(void *, ABT_thread));
36 static ABT_unit pool_create_unit(ABT_pool pool, ABT_thread thread);
37 static void pool_free_unit(ABT_pool pool, ABT_unit unit);
38 
39 /* For backward compatibility */
40 static int pool_remove_shared(ABT_pool pool, ABT_unit unit);
41 static int pool_remove_private(ABT_pool pool, ABT_unit unit);
42 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
44 
45 #define POOL_CONTEXT_PUSH_HEAD \
46  (ABT_POOL_CONTEXT_OP_THREAD_CREATE | \
47  ABT_POOL_CONTEXT_OP_THREAD_CREATE_TO | \
48  ABT_POOL_CONTEXT_OP_THREAD_REVIVE | ABT_POOL_CONTEXT_OP_THREAD_REVIVE_TO)
49 #define POOL_CONTEXT_POP_TAIL (ABT_POOL_CONTEXT_OWNER_SECONDARY)
50 
51 struct data {
52  ABTD_spinlock mutex;
53  thread_queue_t queue;
54 };
55 typedef struct data data_t;
56 
57 static inline data_t *pool_get_data_ptr(void *p_data)
58 {
59  return (data_t *)p_data;
60 }
61 
62 /* Obtain the RANDWS pool definition according to the access type */
63 ABTU_ret_err int
64 ABTI_pool_get_randws_def(ABT_pool_access access,
65  ABTI_pool_required_def *p_required_def,
66  ABTI_pool_optional_def *p_optional_def,
67  ABTI_pool_deprecated_def *p_deprecated_def)
68 {
69  /* Definitions according to the access type */
70  /* FIXME: need better implementation, e.g., lock-free one */
71  switch (access) {
73  p_required_def->p_push = pool_push_private;
74  p_required_def->p_pop = pool_pop_private;
75  p_optional_def->p_push_many = pool_push_many_private;
76  p_optional_def->p_pop_many = pool_pop_many_private;
77  p_deprecated_def->p_remove = pool_remove_private;
78  break;
79 
84  p_required_def->p_push = pool_push_shared;
85  p_required_def->p_pop = pool_pop_shared;
86  p_optional_def->p_push_many = pool_push_many_shared;
87  p_optional_def->p_pop_many = pool_pop_many_shared;
88  p_deprecated_def->p_remove = pool_remove_shared;
89  break;
90 
91  default:
92  ABTI_HANDLE_ERROR(ABT_ERR_INV_POOL_ACCESS);
93  }
94 
95  /* Common definitions regardless of the access type */
96  p_optional_def->p_init = pool_init;
97  p_optional_def->p_free = pool_free;
98  p_required_def->p_is_empty = pool_is_empty;
99  p_optional_def->p_get_size = pool_get_size;
100  p_optional_def->p_pop_wait = pool_pop_wait;
101  p_optional_def->p_print_all = pool_print_all;
102  p_required_def->p_create_unit = pool_create_unit;
103  p_required_def->p_free_unit = pool_free_unit;
104 
105  p_deprecated_def->p_pop_timedwait = pool_pop_timedwait;
106  p_deprecated_def->u_is_in_pool = pool_unit_is_in_pool;
107  return ABT_SUCCESS;
108 }
109 
110 /* Pool functions */
111 
112 static int pool_init(ABT_pool pool, ABT_pool_config config)
113 {
114  ABTI_UNUSED(config);
115  int abt_errno = ABT_SUCCESS;
116  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
117  ABT_pool_access access;
118 
119  data_t *p_data;
120  abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data);
121  ABTI_CHECK_ERROR(abt_errno);
122 
123  access = p_pool->access;
124  if (access != ABT_POOL_ACCESS_PRIV) {
125  /* Initialize the mutex */
126  ABTD_spinlock_clear(&p_data->mutex);
127  }
128  thread_queue_init(&p_data->queue);
129 
130  p_pool->data = p_data;
131  return abt_errno;
132 }
133 
134 static void pool_free(ABT_pool pool)
135 {
136  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
137  data_t *p_data = pool_get_data_ptr(p_pool->data);
138  thread_queue_free(&p_data->queue);
139  ABTU_free(p_data);
140 }
141 
143 {
144  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
145  data_t *p_data = pool_get_data_ptr(p_pool->data);
146  return thread_queue_is_empty(&p_data->queue);
147 }
148 
149 static size_t pool_get_size(ABT_pool pool)
150 {
151  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
152  data_t *p_data = pool_get_data_ptr(p_pool->data);
153  return thread_queue_get_size(&p_data->queue);
154 }
155 
156 static void pool_push_shared(ABT_pool pool, ABT_unit unit,
157  ABT_pool_context context)
158 {
159  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
160  data_t *p_data = pool_get_data_ptr(p_pool->data);
161  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
162  ABTD_spinlock_acquire(&p_data->mutex);
163  if (context & POOL_CONTEXT_PUSH_HEAD) {
164  thread_queue_push_head(&p_data->queue, p_thread);
165  } else {
166  thread_queue_push_tail(&p_data->queue, p_thread);
167  }
168  ABTD_spinlock_release(&p_data->mutex);
169 }
170 
171 static void pool_push_private(ABT_pool pool, ABT_unit unit,
172  ABT_pool_context context)
173 {
174  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
175  data_t *p_data = pool_get_data_ptr(p_pool->data);
176  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
177  if (context & POOL_CONTEXT_PUSH_HEAD) {
178  thread_queue_push_head(&p_data->queue, p_thread);
179  } else {
180  thread_queue_push_tail(&p_data->queue, p_thread);
181  }
182 }
183 
184 static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units,
185  size_t num_units, ABT_pool_context context)
186 {
187  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
188  data_t *p_data = pool_get_data_ptr(p_pool->data);
189  if (num_units > 0) {
190  ABTD_spinlock_acquire(&p_data->mutex);
191  size_t i;
192  for (i = 0; i < num_units; i++) {
193  ABTI_thread *p_thread =
194  ABTI_unit_get_thread_from_builtin_unit(units[i]);
195  if (context & POOL_CONTEXT_PUSH_HEAD) {
196  thread_queue_push_head(&p_data->queue, p_thread);
197  } else {
198  thread_queue_push_tail(&p_data->queue, p_thread);
199  }
200  }
201  ABTD_spinlock_release(&p_data->mutex);
202  }
203 }
204 
205 static void pool_push_many_private(ABT_pool pool, const ABT_unit *units,
206  size_t num_units, ABT_pool_context context)
207 {
208  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
209  data_t *p_data = pool_get_data_ptr(p_pool->data);
210  size_t i;
211  for (i = 0; i < num_units; i++) {
212  ABTI_thread *p_thread =
213  ABTI_unit_get_thread_from_builtin_unit(units[i]);
214  if (context & POOL_CONTEXT_PUSH_HEAD) {
215  thread_queue_push_head(&p_data->queue, p_thread);
216  } else {
217  thread_queue_push_tail(&p_data->queue, p_thread);
218  }
219  }
220 }
221 
222 static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs,
223  ABT_pool_context context)
224 {
225  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
226  data_t *p_data = pool_get_data_ptr(p_pool->data);
227  double time_start = 0.0;
228  while (1) {
230  &p_data->mutex) == 0) {
231  ABTI_thread *p_thread;
232  if (context & POOL_CONTEXT_POP_TAIL) {
233  p_thread = thread_queue_pop_tail(&p_data->queue);
234  } else {
235  p_thread = thread_queue_pop_head(&p_data->queue);
236  }
237  ABTD_spinlock_release(&p_data->mutex);
238  if (p_thread)
239  return ABTI_thread_get_handle(p_thread);
240  }
241  if (time_start == 0.0) {
242  time_start = ABTI_get_wtime();
243  } else {
244  double elapsed = ABTI_get_wtime() - time_start;
245  if (elapsed > time_secs)
246  return ABT_THREAD_NULL;
247  }
248  /* Sleep. */
249  const int sleep_nsecs = 100;
250  struct timespec ts = { 0, sleep_nsecs };
251  nanosleep(&ts, NULL);
252  }
253 }
254 
255 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
256 {
257  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
258  data_t *p_data = pool_get_data_ptr(p_pool->data);
259  while (1) {
261  &p_data->mutex) == 0) {
262  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
263  ABTD_spinlock_release(&p_data->mutex);
264  if (p_thread) {
265  return ABTI_unit_get_builtin_unit(p_thread);
266  }
267  }
268  const int sleep_nsecs = 100;
269  struct timespec ts = { 0, sleep_nsecs };
270  nanosleep(&ts, NULL);
271 
272  if (ABTI_get_wtime() > abstime_secs)
273  return ABT_UNIT_NULL;
274  }
275 }
276 
278 {
279  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
280  data_t *p_data = pool_get_data_ptr(p_pool->data);
282  &p_data->mutex) == 0) {
283  ABTI_thread *p_thread;
284  if (context & POOL_CONTEXT_POP_TAIL) {
285  p_thread = thread_queue_pop_tail(&p_data->queue);
286  } else {
287  p_thread = thread_queue_pop_head(&p_data->queue);
288  }
289  ABTD_spinlock_release(&p_data->mutex);
290  return ABTI_thread_get_handle(p_thread);
291  } else {
292  return ABT_THREAD_NULL;
293  }
294 }
295 
297 {
298  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
299  data_t *p_data = pool_get_data_ptr(p_pool->data);
300  ABTI_thread *p_thread;
301  if (context & POOL_CONTEXT_POP_TAIL) {
302  p_thread = thread_queue_pop_tail(&p_data->queue);
303  } else {
304  p_thread = thread_queue_pop_head(&p_data->queue);
305  }
306  return ABTI_thread_get_handle(p_thread);
307 }
308 
309 static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads,
310  size_t max_threads, size_t *num_popped,
311  ABT_pool_context context)
312 {
313  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
314  data_t *p_data = pool_get_data_ptr(p_pool->data);
315  if (max_threads != 0 &&
317  &p_data->mutex) == 0) {
318  size_t i;
319  for (i = 0; i < max_threads; i++) {
320  ABTI_thread *p_thread;
321  if (context & POOL_CONTEXT_POP_TAIL) {
322  p_thread = thread_queue_pop_tail(&p_data->queue);
323  } else {
324  p_thread = thread_queue_pop_head(&p_data->queue);
325  }
326  if (!p_thread)
327  break;
328  threads[i] = ABTI_thread_get_handle(p_thread);
329  }
330  *num_popped = i;
331  ABTD_spinlock_release(&p_data->mutex);
332  } else {
333  *num_popped = 0;
334  }
335 }
336 
337 static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads,
338  size_t max_threads, size_t *num_popped,
339  ABT_pool_context context)
340 {
341  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
342  data_t *p_data = pool_get_data_ptr(p_pool->data);
343  size_t i;
344  for (i = 0; i < max_threads; i++) {
345  ABTI_thread *p_thread;
346  if (context & POOL_CONTEXT_POP_TAIL) {
347  p_thread = thread_queue_pop_tail(&p_data->queue);
348  } else {
349  p_thread = thread_queue_pop_head(&p_data->queue);
350  }
351  if (!p_thread)
352  break;
353  threads[i] = ABTI_thread_get_handle(p_thread);
354  }
355  *num_popped = i;
356 }
357 
358 static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
359 {
360  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
361  data_t *p_data = pool_get_data_ptr(p_pool->data);
362  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
363  ABTD_spinlock_acquire(&p_data->mutex);
364  int abt_errno = thread_queue_remove(&p_data->queue, p_thread);
365  ABTD_spinlock_release(&p_data->mutex);
366  return abt_errno;
367 }
368 
369 static int pool_remove_private(ABT_pool pool, ABT_unit unit)
370 {
371  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
372  data_t *p_data = pool_get_data_ptr(p_pool->data);
373  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
374  return thread_queue_remove(&p_data->queue, p_thread);
375 }
376 
377 static void pool_print_all(ABT_pool pool, void *arg,
378  void (*print_fn)(void *, ABT_thread))
379 {
380  ABT_pool_access access;
381  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
382  data_t *p_data = pool_get_data_ptr(p_pool->data);
383 
384  access = p_pool->access;
385  if (access != ABT_POOL_ACCESS_PRIV) {
386  ABTD_spinlock_acquire(&p_data->mutex);
387  }
388  thread_queue_print_all(&p_data->queue, arg, print_fn);
389  if (access != ABT_POOL_ACCESS_PRIV) {
390  ABTD_spinlock_release(&p_data->mutex);
391  }
392 }
393 
394 /* Unit functions */
395 
397 {
398  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
399  return ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ? ABT_TRUE
400  : ABT_FALSE;
401 }
402 
404 {
405  /* Call ABTI_unit_init_builtin() instead. */
406  ABTI_ASSERT(0);
407  return ABT_UNIT_NULL;
408 }
409 
410 static void pool_free_unit(ABT_pool pool, ABT_unit unit)
411 {
412  /* A built-in unit does not need to be freed. This function may not be
413  * called. */
414  ABTI_ASSERT(0);
415 }
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
thread_queue_free
static void thread_queue_free(thread_queue_t *p_queue)
Definition: thread_queue.h:29
ABT_pool_context
uint64_t ABT_pool_context
A pool context value.
Definition: abt.h:1566
pool_push_many_private
static void pool_push_many_private(ABT_pool pool, const ABT_unit *units, size_t num_units, ABT_pool_context context)
Definition: randws.c:205
ABT_thread
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
Definition: abt.h:932
pool_pop_many_shared
static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads, size_t max_threads, size_t *num_popped, ABT_pool_context context)
Definition: randws.c:309
pool_create_unit
static ABT_unit pool_create_unit(ABT_pool pool, ABT_thread thread)
Definition: randws.c:403
thread_queue_acquire_spinlock_if_not_empty
static ABTU_ret_err int thread_queue_acquire_spinlock_if_not_empty(thread_queue_t *p_queue, ABTD_spinlock *p_lock)
Definition: thread_queue.h:35
pool_free
static void pool_free(ABT_pool pool)
Definition: randws.c:134
ABT_POOL_ACCESS_MPMC
@ ABT_POOL_ACCESS_MPMC
Definition: abt.h:575
ABT_THREAD_NULL
#define ABT_THREAD_NULL
Definition: abt.h:1105
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:878
pool_init
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: randws.c:112
ABT_POOL_ACCESS_MPSC
@ ABT_POOL_ACCESS_MPSC
Definition: abt.h:569
pool_pop_private
static ABT_thread pool_pop_private(ABT_pool pool, ABT_pool_context context)
Definition: randws.c:296
ABT_POOL_ACCESS_PRIV
@ ABT_POOL_ACCESS_PRIV
Definition: abt.h:560
pool_push_many_shared
static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units, size_t num_units, ABT_pool_context context)
Definition: randws.c:184
abti.h
pool_unit_is_in_pool
static ABT_bool pool_unit_is_in_pool(ABT_unit unit)
Definition: randws.c:396
pool_get_size
static size_t pool_get_size(ABT_pool pool)
Definition: randws.c:149
ABT_pool_config
struct ABT_pool_config_opaque * ABT_pool_config
Pool configuration handle type.
Definition: abt.h:885
pool_remove_private
static int pool_remove_private(ABT_pool pool, ABT_unit unit)
Definition: randws.c:369
ABTU_malloc
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:235
pool_pop_wait
static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs, ABT_pool_context context)
Definition: randws.c:222
ABT_ERR_INV_POOL_ACCESS
#define ABT_ERR_INV_POOL_ACCESS
Error code: invalid pool access type.
Definition: abt.h:166
pool_pop_timedwait
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: randws.c:255
ABT_unit
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
Definition: abt.h:911
pool_print_all
static void pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_thread))
Definition: randws.c:377
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
thread_queue_is_empty
static ABT_bool thread_queue_is_empty(const thread_queue_t *p_queue)
Definition: thread_queue.h:58
pool_pop_shared
static ABT_thread pool_pop_shared(ABT_pool pool, ABT_pool_context context)
Definition: randws.c:277
pool_free_unit
static void pool_free_unit(ABT_pool pool, ABT_unit unit)
Definition: randws.c:410
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
thread_queue_pop_tail
static ABTI_thread * thread_queue_pop_tail(thread_queue_t *p_queue)
Definition: thread_queue.h:140
thread_queue_remove
static ABTU_ret_err int thread_queue_remove(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:165
pool_push_private
static void pool_push_private(ABT_pool pool, ABT_unit unit, ABT_pool_context context)
Definition: randws.c:171
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:784
ABT_POOL_ACCESS_SPMC
@ ABT_POOL_ACCESS_SPMC
Definition: abt.h:573
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:786
thread_queue_t
Definition: thread_queue.h:12
pool_push_shared
static void pool_push_shared(ABT_pool pool, ABT_unit unit, ABT_pool_context context)
Definition: randws.c:156
ABT_POOL_ACCESS_SPSC
@ ABT_POOL_ACCESS_SPSC
Definition: abt.h:565
ABTU_free
static void ABTU_free(void *ptr)
Definition: abtu.h:228
data_t
struct data data_t
Definition: fifo.c:49
thread_queue_push_tail
static void thread_queue_push_tail(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:92
pool_pop_many_private
static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads, size_t max_threads, size_t *num_popped, ABT_pool_context context)
Definition: randws.c:337
thread_queue.h
thread_queue_pop_head
static ABTI_thread * thread_queue_pop_head(thread_queue_t *p_queue)
Definition: thread_queue.h:115
pool_get_data_ptr
static data_t * pool_get_data_ptr(void *p_data)
Definition: randws.c:57
ABT_UNIT_NULL
#define ABT_UNIT_NULL
Definition: abt.h:1104
thread_queue_print_all
static void thread_queue_print_all(const thread_queue_t *p_queue, void *arg, void(*print_fn)(void *, ABT_thread))
Definition: thread_queue.h:193
thread_queue_push_head
static void thread_queue_push_head(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:69
pool_remove_shared
static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
Definition: randws.c:358
thread_queue_init
static void thread_queue_init(thread_queue_t *p_queue)
Definition: thread_queue.h:21
POOL_CONTEXT_POP_TAIL
#define POOL_CONTEXT_POP_TAIL
Definition: randws.c:49
POOL_CONTEXT_PUSH_HEAD
#define POOL_CONTEXT_PUSH_HEAD
Definition: randws.c:45
pool_is_empty
static ABT_bool pool_is_empty(ABT_pool pool)
Definition: randws.c:142
thread_queue_get_size
static size_t thread_queue_get_size(const thread_queue_t *p_queue)
Definition: thread_queue.h:64
ABT_pool_access
ABT_pool_access
Pool access type.
Definition: abt.h:556