ARGOBOTS
abti_pool.h
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #ifndef ABTI_POOL_H_INCLUDED
7 #define ABTI_POOL_H_INCLUDED
8 
9 /* Inlined functions for Pool */
10 
11 static inline ABTI_pool *ABTI_pool_get_ptr(ABT_pool pool)
12 {
13 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
14  ABTI_pool *p_pool;
15  if (pool == ABT_POOL_NULL) {
16  p_pool = NULL;
17  } else {
18  p_pool = (ABTI_pool *)pool;
19  }
20  return p_pool;
21 #else
22  return (ABTI_pool *)pool;
23 #endif
24 }
25 
26 static inline ABT_pool ABTI_pool_get_handle(ABTI_pool *p_pool)
27 {
28 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
29  ABT_pool h_pool;
30  if (p_pool == NULL) {
31  h_pool = ABT_POOL_NULL;
32  } else {
33  h_pool = (ABT_pool)p_pool;
34  }
35  return h_pool;
36 #else
37  return (ABT_pool)p_pool;
38 #endif
39 }
40 
41 /* A ULT is blocked and is waiting for going back to this pool */
42 static inline void ABTI_pool_inc_num_blocked(ABTI_pool *p_pool)
43 {
44  ABTD_atomic_fetch_add_int32(&p_pool->num_blocked, 1);
45 }
46 
47 /* A blocked ULT is back in the pool */
48 static inline void ABTI_pool_dec_num_blocked(ABTI_pool *p_pool)
49 {
50  ABTD_atomic_fetch_sub_int32(&p_pool->num_blocked, 1);
51 }
52 
53 /* The pool will receive a migrated ULT */
54 static inline void ABTI_pool_inc_num_migrations(ABTI_pool *p_pool)
55 {
56  ABTD_atomic_fetch_add_int32(&p_pool->num_migrations, 1);
57 }
58 
59 /* The pool has received a migrated ULT */
60 static inline void ABTI_pool_dec_num_migrations(ABTI_pool *p_pool)
61 {
62  ABTD_atomic_fetch_sub_int32(&p_pool->num_migrations, 1);
63 }
64 
65 #ifdef ABT_CONFIG_DISABLE_POOL_PRODUCER_CHECK
66 static inline void ABTI_pool_push(ABTI_pool *p_pool, ABT_unit unit)
67 {
68  LOG_EVENT_POOL_PUSH(p_pool, unit,
69  ABTI_self_get_native_thread_id(ABTI_local_get_local()));
70 
71  /* Push unit into pool */
72  p_pool->p_push(ABTI_pool_get_handle(p_pool), unit);
73 }
74 
75 static inline void ABTI_pool_add_thread(ABTI_thread *p_thread)
76 {
77  /* Set the ULT's state as READY. The relaxed version is used since the state
78  * is synchronized by the following pool operation. */
79  ABTD_atomic_relaxed_store_int(&p_thread->state, ABT_THREAD_STATE_READY);
80 
81  /* Add the ULT to the associated pool */
82  ABTI_pool_push(p_thread->p_pool, p_thread->unit);
83 }
84 
85 #define ABTI_POOL_PUSH(p_pool, unit, p_producer) ABTI_pool_push(p_pool, unit)
86 
87 #define ABTI_POOL_ADD_THREAD(p_thread, p_producer) \
88  ABTI_pool_add_thread(p_thread)
89 
90 #else /* ABT_CONFIG_DISABLE_POOL_PRODUCER_CHECK */
91 
92 static inline int ABTI_pool_push(ABTI_pool *p_pool, ABT_unit unit,
93  ABTI_native_thread_id producer_id)
94 {
95  int abt_errno = ABT_SUCCESS;
96 
97  LOG_EVENT_POOL_PUSH(p_pool, unit, producer_id);
98 
99  /* Save the producer ES information in the pool */
100  abt_errno = ABTI_pool_set_producer(p_pool, producer_id);
101  ABTI_CHECK_ERROR(abt_errno);
102 
103  /* Push unit into pool */
104  p_pool->p_push(ABTI_pool_get_handle(p_pool), unit);
105 
106 fn_exit:
107  return abt_errno;
108 
109 fn_fail:
110  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
111  goto fn_exit;
112 }
113 
114 static inline int ABTI_pool_add_thread(ABTI_thread *p_thread,
115  ABTI_native_thread_id producer_id)
116 {
117  int abt_errno;
118 
119  /* Set the ULT's state as READY. The relaxed version is used since the state
120  * is synchronized by the following pool operation. */
121  ABTD_atomic_relaxed_store_int(&p_thread->state, ABT_THREAD_STATE_READY);
122 
123  /* Add the ULT to the associated pool */
124  abt_errno = ABTI_pool_push(p_thread->p_pool, p_thread->unit, producer_id);
125  ABTI_CHECK_ERROR(abt_errno);
126 
127 fn_exit:
128  return abt_errno;
129 
130 fn_fail:
131  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
132  goto fn_exit;
133 }
134 
135 #define ABTI_POOL_PUSH(p_pool, unit, producer_id) \
136  do { \
137  abt_errno = ABTI_pool_push(p_pool, unit, producer_id); \
138  ABTI_CHECK_ERROR_MSG(abt_errno, "ABTI_pool_push"); \
139  } while (0)
140 
141 #define ABTI_POOL_ADD_THREAD(p_thread, producer_id) \
142  do { \
143  abt_errno = ABTI_pool_add_thread(p_thread, producer_id); \
144  ABTI_CHECK_ERROR(abt_errno); \
145  } while (0)
146 
147 #endif /* ABT_CONFIG_DISABLE_POOL_PRODUCER_CHECK */
148 
149 #ifdef ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK
150 static inline int ABTI_pool_remove(ABTI_pool *p_pool, ABT_unit unit)
151 {
152  int abt_errno = ABT_SUCCESS;
153 
154  LOG_EVENT_POOL_REMOVE(p_pool, unit,
155  ABTI_self_get_native_thread_id(
156  ABTI_local_get_local()));
157 
158  abt_errno = p_pool->p_remove(ABTI_pool_get_handle(p_pool), unit);
159  ABTI_CHECK_ERROR(abt_errno);
160 
161 fn_exit:
162  return abt_errno;
163 
164 fn_fail:
165  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
166  goto fn_exit;
167 }
168 
169 #define ABTI_POOL_REMOVE(p_pool, unit, consumer_id) \
170  ABTI_pool_remove(p_pool, unit)
171 #define ABTI_POOL_SET_CONSUMER(p_pool, consumer_id)
172 
173 #else /* ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK */
174 
175 static inline int ABTI_pool_remove(ABTI_pool *p_pool, ABT_unit unit,
176  ABTI_native_thread_id consumer_id)
177 {
178  int abt_errno = ABT_SUCCESS;
179 
180  LOG_EVENT_POOL_REMOVE(p_pool, unit, consumer_id);
181 
182  abt_errno = ABTI_pool_set_consumer(p_pool, consumer_id);
183  ABTI_CHECK_ERROR(abt_errno);
184 
185  abt_errno = p_pool->p_remove(ABTI_pool_get_handle(p_pool), unit);
186  ABTI_CHECK_ERROR(abt_errno);
187 
188 fn_exit:
189  return abt_errno;
190 
191 fn_fail:
192  HANDLE_ERROR_FUNC_WITH_CODE(abt_errno);
193  goto fn_exit;
194 }
195 
196 #define ABTI_POOL_REMOVE(p_pool, unit, consumer_id) \
197  ABTI_pool_remove(p_pool, unit, consumer_id)
198 #define ABTI_POOL_SET_CONSUMER(p_pool, consumer_id) \
199  do { \
200  abt_errno = ABTI_pool_set_consumer(p_pool, consumer_id); \
201  ABTI_CHECK_ERROR(abt_errno); \
202  } while (0)
203 
204 #endif /* ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK */
205 
206 static inline ABT_unit ABTI_pool_pop_timedwait(ABTI_pool *p_pool,
207  double abstime_secs)
208 {
209  ABT_unit unit;
210 
211  unit = p_pool->p_pop_timedwait(ABTI_pool_get_handle(p_pool), abstime_secs);
212  LOG_EVENT_POOL_POP(p_pool, unit);
213 
214  return unit;
215 }
216 
217 static inline ABT_unit ABTI_pool_pop(ABTI_pool *p_pool)
218 {
219  ABT_unit unit;
220 
221  unit = p_pool->p_pop(ABTI_pool_get_handle(p_pool));
222  LOG_EVENT_POOL_POP(p_pool, unit);
223 
224  return unit;
225 }
226 
227 /* Increase num_scheds to mark the pool as having another scheduler. If the
228  * pool is not available, it returns ABT_ERR_INV_POOL_ACCESS. */
229 static inline void ABTI_pool_retain(ABTI_pool *p_pool)
230 {
231  ABTD_atomic_fetch_add_int32(&p_pool->num_scheds, 1);
232 }
233 
234 /* Decrease the num_scheds to release this pool from a scheduler. Call when
235  * the pool is removed from a scheduler or when it stops. */
236 static inline int32_t ABTI_pool_release(ABTI_pool *p_pool)
237 {
238  ABTI_ASSERT(ABTD_atomic_acquire_load_int32(&p_pool->num_scheds) > 0);
239  return ABTD_atomic_fetch_sub_int32(&p_pool->num_scheds, 1) - 1;
240 }
241 
242 static inline size_t ABTI_pool_get_size(ABTI_pool *p_pool)
243 {
244  return p_pool->p_get_size(ABTI_pool_get_handle(p_pool));
245 }
246 
247 static inline size_t ABTI_pool_get_total_size(ABTI_pool *p_pool)
248 {
249  size_t total_size;
250  total_size = ABTI_pool_get_size(p_pool);
251  total_size += ABTD_atomic_acquire_load_int32(&p_pool->num_blocked);
252  total_size += ABTD_atomic_acquire_load_int32(&p_pool->num_migrations);
253  return total_size;
254 }
255 
256 #endif /* ABTI_POOL_H_INCLUDED */
struct ABT_unit_opaque * ABT_unit
Definition: abt.h:275
#define ABT_POOL_NULL
Definition: abt.h:341
#define LOG_EVENT_POOL_PUSH(p_pool, unit, produer_id)
Definition: abti_log.h:63
#define LOG_EVENT_POOL_POP(p_pool, unit)
Definition: abti_log.h:65
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:267
#define LOG_EVENT_POOL_REMOVE(p_pool, unit, consumer_id)
Definition: abti_log.h:64
#define HANDLE_ERROR_FUNC_WITH_CODE(n)
Definition: abti_error.h:241
#define ABT_SUCCESS
Definition: abt.h:64