ARGOBOTS  23067fa015f4b179569e2d52278c1072e674eb1e
thread_htable.c
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 * See COPYRIGHT in top-level directory.
 */

#include "abti.h"
#include "abti_thread_htable.h"

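/* Create a hash table with num_rows rows (one ABTI_thread_queue per row).
 * Rows are zero-initialized and 64-byte aligned; the static assertion below
 * pins each row to 192 bytes, i.e., three cache lines on machines with the
 * typical 64-byte cache line. */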
ABTI_thread_htable *ABTI_thread_htable_create(uint32_t num_rows)
{
    ABTI_STATIC_ASSERT(sizeof(ABTI_thread_queue) == 192);

    ABTI_thread_htable *p_htable;
    size_t q_size = num_rows * sizeof(ABTI_thread_queue);

    p_htable = (ABTI_thread_htable *)ABTU_malloc(sizeof(ABTI_thread_htable));
#if defined(HAVE_LH_LOCK_H)
    lh_lock_init(&p_htable->mutex);
#elif defined(HAVE_CLH_H)
    clh_init(&p_htable->mutex);
#elif defined(USE_PTHREAD_MUTEX)
    int ret = pthread_mutex_init(&p_htable->mutex, NULL);
    assert(!ret);
#else
    ABTI_spinlock_clear(&p_htable->mutex);
#endif
    ABTD_atomic_relaxed_store_uint32(&p_htable->num_elems, 0);
    p_htable->num_rows = num_rows;
    p_htable->queue = (ABTI_thread_queue *)ABTU_memalign(64, q_size);
    memset(p_htable->queue, 0, q_size);
    p_htable->h_list = NULL;
    p_htable->l_list = NULL;

    return p_htable;
}

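/* Destroy the hash table.  The table must be empty (num_elems == 0) when
 * this is called. */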
void ABTI_thread_htable_free(ABTI_thread_htable *p_htable)
{
    ABTI_ASSERT(ABTD_atomic_relaxed_load_uint32(&p_htable->num_elems) == 0);

#if defined(HAVE_LH_LOCK_H)
    lh_lock_destroy(&p_htable->mutex);
#elif defined(HAVE_CLH_H)
    clh_destroy(&p_htable->mutex);
#elif defined(USE_PTHREAD_MUTEX)
    int ret = pthread_mutex_destroy(&p_htable->mutex);
    assert(!ret);
#else
    /* ABTI_spinlock needs no finalization. */
#endif
    ABTU_free(p_htable->queue);
    ABTU_free(p_htable);
}

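/* Unconditionally append p_thread to the tail of the idx-th queue.  idx must
 * be smaller than num_rows; the disabled block below sketches growing the
 * table on demand instead of asserting. */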
void ABTI_thread_htable_push(ABTI_thread_htable *p_htable, int idx,
                             ABTI_thread *p_thread)
{
    ABTI_thread_queue *p_queue;

    if (idx >= p_htable->num_rows) {
        ABTI_ASSERT(0);
        ABTU_unreachable();
#if 0
        /* Increase the hash table */
        uint32_t cur_size, new_size;
        cur_size = p_htable->num_rows;
        new_size = (idx / cur_size + 1) * cur_size;
        p_htable->queue = (ABTI_thread_queue *)
            ABTU_realloc(p_htable->queue, cur_size * sizeof(ABTI_thread_queue),
                         new_size * sizeof(ABTI_thread_queue));
        memset(&p_htable->queue[cur_size], 0,
               (new_size - cur_size) * sizeof(ABTI_thread_queue));
        p_htable->num_rows = new_size;
#endif
    }

    /* Add p_thread to the end of the idx-th row */
    p_queue = &p_htable->queue[idx];
    ABTI_thread_queue_acquire_mutex(p_queue);
    if (p_queue->head == NULL) {
        p_queue->head = p_thread;
        p_queue->tail = p_thread;
    } else {
        p_queue->tail->unit_def.p_next = &p_thread->unit_def;
        p_queue->tail = p_thread;
    }
    p_queue->num_threads++;
    ABTI_thread_queue_release_mutex(p_queue);
    ABTD_atomic_fetch_add_uint32(&p_htable->num_elems, 1);
}

/* Unlike ABTI_thread_htable_push, this function pushes p_thread to the queue
 * only when the queue is not empty. */
ABT_bool ABTI_thread_htable_add(ABTI_thread_htable *p_htable, int idx,
                                ABTI_thread *p_thread)
{
    ABTI_thread_queue *p_queue;

    p_queue = &p_htable->queue[idx];

    ABTI_thread_queue_acquire_mutex(p_queue);
    if (p_queue->head == NULL) {
        ABTI_ASSERT(p_queue->num_threads == 0);
        ABTI_thread_queue_release_mutex(p_queue);
        return ABT_FALSE;
    } else {
        /* Change the ULT's state to BLOCKED */
        ABTI_thread_set_blocked(p_thread);

        p_queue->tail->unit_def.p_next = &p_thread->unit_def;
        p_queue->tail = p_thread;
    }
    p_queue->num_threads++;
    ABTI_thread_queue_release_mutex(p_queue);
    ABTD_atomic_fetch_add_uint32(&p_htable->num_elems, 1);
    return ABT_TRUE;
}

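/* Low-priority counterpart of ABTI_thread_htable_push: appends p_thread to
 * the tail of the idx-th queue's low-priority list. */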
void ABTI_thread_htable_push_low(ABTI_thread_htable *p_htable, int idx,
                                 ABTI_thread *p_thread)
{
    ABTI_thread_queue *p_queue;

    if (idx >= p_htable->num_rows) {
        ABTI_ASSERT(0);
        ABTU_unreachable();
#if 0
        /* Increase the hash table */
        uint32_t cur_size, new_size;
        cur_size = p_htable->num_rows;
        new_size = (idx / cur_size + 1) * cur_size;
        p_htable->queue = (ABTI_thread_queue *)
            ABTU_realloc(p_htable->queue, cur_size * sizeof(ABTI_thread_queue),
                         new_size * sizeof(ABTI_thread_queue));
        memset(&p_htable->queue[cur_size], 0,
               (new_size - cur_size) * sizeof(ABTI_thread_queue));
        p_htable->num_rows = new_size;
#endif
    }

    /* Add p_thread to the end of the idx-th row */
    p_queue = &p_htable->queue[idx];
    ABTI_thread_queue_acquire_low_mutex(p_queue);
    if (p_queue->low_head == NULL) {
        p_queue->low_head = p_thread;
        p_queue->low_tail = p_thread;
    } else {
        p_queue->low_tail->unit_def.p_next = &p_thread->unit_def;
        p_queue->low_tail = p_thread;
    }
    p_queue->low_num_threads++;
    ABTI_thread_queue_release_low_mutex(p_queue);
    ABTD_atomic_fetch_add_uint32(&p_htable->num_elems, 1);
}

/* Unlike ABTI_thread_htable_push_low, this function pushes p_thread to the
 * queue only when the queue is not empty. */
ABT_bool ABTI_thread_htable_add_low(ABTI_thread_htable *p_htable, int idx,
                                    ABTI_thread *p_thread)
{
    ABTI_thread_queue *p_queue;

    p_queue = &p_htable->queue[idx];

    ABTI_thread_queue_acquire_low_mutex(p_queue);
    if (p_queue->low_head == NULL) {
        ABTI_ASSERT(p_queue->low_num_threads == 0);
        ABTI_thread_queue_release_low_mutex(p_queue);
        return ABT_FALSE;
    } else {
        /* Change the ULT's state to BLOCKED */
        ABTI_thread_set_blocked(p_thread);

        p_queue->low_tail->unit_def.p_next = &p_thread->unit_def;
        p_queue->low_tail = p_thread;
    }
    p_queue->low_num_threads++;
    ABTI_thread_queue_release_low_mutex(p_queue);
    ABTD_atomic_fetch_add_uint32(&p_htable->num_elems, 1);
    return ABT_TRUE;
}

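/* Pop one ULT from the head of p_queue.  Returns NULL if the queue is
 * empty. */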
ABTI_thread *ABTI_thread_htable_pop(ABTI_thread_htable *p_htable,
                                    ABTI_thread_queue *p_queue)
{
    ABTI_thread *p_thread = NULL;

    ABTI_thread_queue_acquire_mutex(p_queue);
    if (p_queue->head) {
        ABTD_atomic_fetch_sub_uint32(&p_htable->num_elems, 1);
        p_thread = p_queue->head;
        if (p_queue->head == p_queue->tail) {
            p_queue->head = NULL;
            p_queue->tail = NULL;
        } else {
            p_queue->head = ABTI_unit_get_thread(p_thread->unit_def.p_next);
        }

        p_queue->num_threads--;
    }
    ABTI_thread_queue_release_mutex(p_queue);

    return p_thread;
}

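/* Low-priority counterpart of ABTI_thread_htable_pop. */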
ABTI_thread *ABTI_thread_htable_pop_low(ABTI_thread_htable *p_htable,
                                        ABTI_thread_queue *p_queue)
{
    ABTI_thread *p_thread = NULL;

    ABTI_thread_queue_acquire_low_mutex(p_queue);
    if (p_queue->low_head) {
        ABTD_atomic_fetch_sub_uint32(&p_htable->num_elems, 1);
        p_thread = p_queue->low_head;
        if (p_queue->low_head == p_queue->low_tail) {
            p_queue->low_head = NULL;
            p_queue->low_tail = NULL;
        } else {
            p_queue->low_head = ABTI_unit_get_thread(p_thread->unit_def.p_next);
        }

        p_queue->low_num_threads--;
    }
    ABTI_thread_queue_release_low_mutex(p_queue);

    return p_thread;
}

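/* If the low-priority list is non-empty, block p_thread, push it onto the
 * list, pop the current head, and context-switch to the popped ULT.  Returns
 * ABT_TRUE if a switch happened, ABT_FALSE if the list was empty.  num_elems
 * is unchanged: one ULT enters the list and one leaves it. */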
ABT_bool ABTI_thread_htable_switch_low(ABTI_xstream **pp_local_xstream,
                                       ABTI_thread_queue *p_queue,
                                       ABTI_thread *p_thread,
                                       ABTI_thread_htable *p_htable,
                                       ABT_sync_event_type sync_event_type,
                                       void *p_sync)
{
    ABTI_thread *p_target = NULL;
    ABTI_xstream *p_local_xstream = *pp_local_xstream;

    ABTI_thread_queue_acquire_low_mutex(p_queue);
    if (p_queue->low_head) {
        p_target = p_queue->low_head;

        /* Push p_thread to the queue */
        ABTD_atomic_release_store_int(&p_thread->unit_def.state,
                                      ABTI_UNIT_STATE_BLOCKED);
        ABTI_tool_event_thread_suspend(p_local_xstream, p_thread,
                                       p_thread->unit_def.p_parent,
                                       sync_event_type, p_sync);
        if (p_queue->low_head == p_queue->low_tail) {
            p_queue->low_head = p_thread;
            p_queue->low_tail = p_thread;
        } else {
            p_queue->low_head = ABTI_unit_get_thread(p_target->unit_def.p_next);
            p_queue->low_tail->unit_def.p_next = &p_thread->unit_def;
            p_queue->low_tail = p_thread;
        }
    }
    ABTI_thread_queue_release_low_mutex(p_queue);

    if (p_target) {
        LOG_DEBUG("switch -> U%" PRIu64 "\n", ABTI_thread_get_id(p_target));

        /* Context-switch to p_target */
        ABTD_atomic_release_store_int(&p_target->unit_def.state,
                                      ABTI_UNIT_STATE_RUNNING);
        ABTI_tool_event_thread_resume(p_local_xstream, p_target,
                                      p_local_xstream ? p_local_xstream->p_unit
                                                      : NULL);
        ABTI_thread *p_prev =
            ABTI_thread_context_switch_to_sibling(pp_local_xstream, p_thread,
                                                  p_target);
        ABTI_tool_event_thread_run(*pp_local_xstream, p_thread,
                                   &p_prev->unit_def,
                                   p_thread->unit_def.p_parent);
        return ABT_TRUE;
    } else {
        return ABT_FALSE;
    }
}
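
/* Usage sketch (illustrative only; not part of the original file).  Callers
 * of this table (e.g., the mutex implementation) pair the push and pop sides
 * roughly as below.  The names p_htable, idx, and p_self are hypothetical.
 *
 *     // Waiter: mark itself blocked, enqueue itself, then suspend until
 *     // another ULT wakes it up.
 *     ABTI_thread_set_blocked(p_self);
 *     ABTI_thread_htable_push(p_htable, idx, p_self);
 *     // ... suspend ...
 *
 *     // Waker: dequeue one waiter from the same row.
 *     ABTI_thread *p_waiter =
 *         ABTI_thread_htable_pop(p_htable, &p_htable->queue[idx]);
 *     if (p_waiter) {
 *         // make p_waiter runnable again, e.g., via ABTI_thread_set_ready()
 *     }
 */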