ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
thread_queue.h
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #ifndef THREAD_QUEUE_H_INCLUDED
7 #define THREAD_QUEUE_H_INCLUDED
8 
9 #include "abti.h"
10 
11 /* Generic queue implementation for work units. */
12 typedef struct {
13  size_t num_threads;
14  ABTI_thread *p_head;
15  ABTI_thread *p_tail;
16  /* If the pool is empty, pop() accesses only is_empty so that pop() does not
17  * slow down a push operation. */
18  ABTD_atomic_int is_empty; /* Whether the pool is empty or not. */
20 
21 static inline void thread_queue_init(thread_queue_t *p_queue)
22 {
23  p_queue->num_threads = 0;
24  p_queue->p_head = NULL;
25  p_queue->p_tail = NULL;
26  ABTD_atomic_relaxed_store_int(&p_queue->is_empty, 1);
27 }
28 
29 static inline void thread_queue_free(thread_queue_t *p_queue)
30 {
31  ; /* Do nothing. */
32 }
33 
34 ABTU_ret_err static inline int
36  ABTD_spinlock *p_lock)
37 {
38  if (ABTD_atomic_acquire_load_int(&p_queue->is_empty)) {
39  /* The pool is empty. Lock is not taken. */
40  return 1;
41  }
42  while (ABTD_spinlock_try_acquire(p_lock)) {
43  /* Lock acquisition failed. Check the size. */
44  while (1) {
45  if (ABTD_atomic_acquire_load_int(&p_queue->is_empty)) {
46  /* The pool becomes empty. Lock is not taken. */
47  return 1;
48  } else if (!ABTD_spinlock_is_locked(p_lock)) {
49  /* Lock seems released. Let's try to take a lock again. */
50  break;
51  }
52  }
53  }
54  /* Lock is acquired. */
55  return 0;
56 }
57 
58 static inline ABT_bool thread_queue_is_empty(const thread_queue_t *p_queue)
59 {
60  return ABTD_atomic_acquire_load_int(&p_queue->is_empty) ? ABT_TRUE
61  : ABT_FALSE;
62 }
63 
64 static inline size_t thread_queue_get_size(const thread_queue_t *p_queue)
65 {
66  return p_queue->num_threads;
67 }
68 
69 static inline void thread_queue_push_head(thread_queue_t *p_queue,
70  ABTI_thread *p_thread)
71 {
72  if (p_queue->num_threads == 0) {
73  p_thread->p_prev = p_thread;
74  p_thread->p_next = p_thread;
75  p_queue->p_head = p_thread;
76  p_queue->p_tail = p_thread;
77  p_queue->num_threads = 1;
78  ABTD_atomic_release_store_int(&p_queue->is_empty, 0);
79  } else {
80  ABTI_thread *p_head = p_queue->p_head;
81  ABTI_thread *p_tail = p_queue->p_tail;
82  p_tail->p_next = p_thread;
83  p_head->p_prev = p_thread;
84  p_thread->p_prev = p_tail;
85  p_thread->p_next = p_head;
86  p_queue->p_head = p_thread;
87  p_queue->num_threads++;
88  }
89  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
90 }
91 
92 static inline void thread_queue_push_tail(thread_queue_t *p_queue,
93  ABTI_thread *p_thread)
94 {
95  if (p_queue->num_threads == 0) {
96  p_thread->p_prev = p_thread;
97  p_thread->p_next = p_thread;
98  p_queue->p_head = p_thread;
99  p_queue->p_tail = p_thread;
100  p_queue->num_threads = 1;
101  ABTD_atomic_release_store_int(&p_queue->is_empty, 0);
102  } else {
103  ABTI_thread *p_head = p_queue->p_head;
104  ABTI_thread *p_tail = p_queue->p_tail;
105  p_tail->p_next = p_thread;
106  p_head->p_prev = p_thread;
107  p_thread->p_prev = p_tail;
108  p_thread->p_next = p_head;
109  p_queue->p_tail = p_thread;
110  p_queue->num_threads++;
111  }
112  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
113 }
114 
115 static inline ABTI_thread *thread_queue_pop_head(thread_queue_t *p_queue)
116 {
117  if (p_queue->num_threads > 0) {
118  ABTI_thread *p_thread = p_queue->p_head;
119  if (p_queue->num_threads == 1) {
120  p_queue->p_head = NULL;
121  p_queue->p_tail = NULL;
122  p_queue->num_threads = 0;
123  ABTD_atomic_release_store_int(&p_queue->is_empty, 1);
124  } else {
125  p_thread->p_prev->p_next = p_thread->p_next;
126  p_thread->p_next->p_prev = p_thread->p_prev;
127  p_queue->p_head = p_thread->p_next;
128  p_queue->num_threads--;
129  }
130 
131  p_thread->p_prev = NULL;
132  p_thread->p_next = NULL;
133  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
134  return p_thread;
135  } else {
136  return NULL;
137  }
138 }
139 
140 static inline ABTI_thread *thread_queue_pop_tail(thread_queue_t *p_queue)
141 {
142  if (p_queue->num_threads > 0) {
143  ABTI_thread *p_thread = p_queue->p_tail;
144  if (p_queue->num_threads == 1) {
145  p_queue->p_head = NULL;
146  p_queue->p_tail = NULL;
147  p_queue->num_threads = 0;
148  ABTD_atomic_release_store_int(&p_queue->is_empty, 1);
149  } else {
150  p_thread->p_prev->p_next = p_thread->p_next;
151  p_thread->p_next->p_prev = p_thread->p_prev;
152  p_queue->p_tail = p_thread->p_prev;
153  p_queue->num_threads--;
154  }
155 
156  p_thread->p_prev = NULL;
157  p_thread->p_next = NULL;
158  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
159  return p_thread;
160  } else {
161  return NULL;
162  }
163 }
164 
166  ABTI_thread *p_thread)
167 {
168  ABTI_CHECK_TRUE(p_queue->num_threads != 0, ABT_ERR_POOL);
169  ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) == 1,
170  ABT_ERR_POOL);
171 
172  if (p_queue->num_threads == 1) {
173  p_queue->p_head = NULL;
174  p_queue->p_tail = NULL;
175  p_queue->num_threads = 0;
176  ABTD_atomic_release_store_int(&p_queue->is_empty, 1);
177  } else {
178  p_thread->p_prev->p_next = p_thread->p_next;
179  p_thread->p_next->p_prev = p_thread->p_prev;
180  if (p_thread == p_queue->p_head) {
181  p_queue->p_head = p_thread->p_next;
182  } else if (p_thread == p_queue->p_tail) {
183  p_queue->p_tail = p_thread->p_prev;
184  }
185  p_queue->num_threads--;
186  }
187  ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
188  p_thread->p_prev = NULL;
189  p_thread->p_next = NULL;
190  return ABT_SUCCESS;
191 }
192 
193 static inline void thread_queue_print_all(const thread_queue_t *p_queue,
194  void *arg,
195  void (*print_fn)(void *, ABT_thread))
196 {
197  size_t num_threads = p_queue->num_threads;
198  ABTI_thread *p_thread = p_queue->p_head;
199  while (num_threads--) {
200  ABTI_ASSERT(p_thread);
201  ABT_thread thread = ABTI_thread_get_handle(p_thread);
202  print_fn(arg, thread);
203  p_thread = p_thread->p_next;
204  }
205 }
206 
207 #endif /* THREAD_QUEUE_H_INCLUDED */
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
thread_queue_free
static void thread_queue_free(thread_queue_t *p_queue)
Definition: thread_queue.h:29
ABT_thread
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
Definition: abt.h:932
ABT_ERR_POOL
#define ABT_ERR_POOL
Error code: error related to a pool.
Definition: abt.h:292
thread_queue_acquire_spinlock_if_not_empty
static ABTU_ret_err int thread_queue_acquire_spinlock_if_not_empty(thread_queue_t *p_queue, ABTD_spinlock *p_lock)
Definition: thread_queue.h:35
thread_queue_t::is_empty
ABTD_atomic_int is_empty
Definition: thread_queue.h:18
abti.h
thread_queue_t::p_tail
ABTI_thread * p_tail
Definition: thread_queue.h:15
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
thread_queue_is_empty
static ABT_bool thread_queue_is_empty(const thread_queue_t *p_queue)
Definition: thread_queue.h:58
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
thread_queue_pop_tail
static ABTI_thread * thread_queue_pop_tail(thread_queue_t *p_queue)
Definition: thread_queue.h:140
thread_queue_remove
static ABTU_ret_err int thread_queue_remove(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:165
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:784
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:786
thread_queue_t
Definition: thread_queue.h:12
thread_queue_push_tail
static void thread_queue_push_tail(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:92
thread_queue_pop_head
static ABTI_thread * thread_queue_pop_head(thread_queue_t *p_queue)
Definition: thread_queue.h:115
thread_queue_t::num_threads
size_t num_threads
Definition: thread_queue.h:13
thread_queue_print_all
static void thread_queue_print_all(const thread_queue_t *p_queue, void *arg, void(*print_fn)(void *, ABT_thread))
Definition: thread_queue.h:193
thread_queue_push_head
static void thread_queue_push_head(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:69
thread_queue_t::p_head
ABTI_thread * p_head
Definition: thread_queue.h:14
thread_queue_init
static void thread_queue_init(thread_queue_t *p_queue)
Definition: thread_queue.h:21
thread_queue_get_size
static size_t thread_queue_get_size(const thread_queue_t *p_queue)
Definition: thread_queue.h:64