ARGOBOTS
thread_htable.c
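Source listing for thread_htable.c, the hash table of ULT queues used inside Argobots. Each row (an ABTI_thread_queue) holds two intrusive FIFO lists of ULTs, a default one and a "low" one, each behind its own row lock, and the table keeps an atomic count of all queued elements. The row lock implementation is chosen at build time: an lh_lock, a CLH lock, a pthread mutex, or the built-in ABTI_spinlock.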
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 * See COPYRIGHT in top-level directory.
 */

#include "abti.h"
#include "abti_thread_htable.h"

ABTI_thread_htable *ABTI_thread_htable_create(uint32_t num_rows)
{
    ABTI_STATIC_ASSERT(sizeof(ABTI_thread_queue) == 192);

    ABTI_thread_htable *p_htable;
    size_t q_size = num_rows * sizeof(ABTI_thread_queue);

    p_htable = (ABTI_thread_htable *)ABTU_malloc(sizeof(ABTI_thread_htable));
#if defined(HAVE_LH_LOCK_H)
    lh_lock_init(&p_htable->mutex);
#elif defined(HAVE_CLH_H)
    clh_init(&p_htable->mutex);
#elif defined(USE_PTHREAD_MUTEX)
    int ret = pthread_mutex_init(&p_htable->mutex, NULL);
    assert(!ret);
#else
    ABTI_spinlock_clear(&p_htable->mutex);
#endif
    ABTD_atomic_relaxed_store_uint32(&p_htable->num_elems, 0);
    p_htable->num_rows = num_rows;
    p_htable->queue = (ABTI_thread_queue *)ABTU_memalign(64, q_size);
    memset(p_htable->queue, 0, q_size);
    p_htable->h_list = NULL;
    p_htable->l_list = NULL;

    return p_htable;
}

void ABTI_thread_htable_free(ABTI_thread_htable *p_htable)
{
    ABTI_ASSERT(ABTD_atomic_relaxed_load_uint32(&p_htable->num_elems) == 0);

#if defined(HAVE_LH_LOCK_H)
    lh_lock_destroy(&p_htable->mutex);
#elif defined(HAVE_CLH_H)
    clh_destroy(&p_htable->mutex);
#elif defined(USE_PTHREAD_MUTEX)
    int ret = pthread_mutex_destroy(&p_htable->mutex);
    assert(!ret);
#else
    /* ABTI_spinlock needs no finalization. */
#endif
    ABTU_free(p_htable->queue);
    ABTU_free(p_htable);
}
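
Note the layout discipline in ABTI_thread_htable_create: the static assert pins sizeof(ABTI_thread_queue) to 192 bytes, exactly three 64-byte cache lines, and the row array is allocated 64-byte aligned, so no two rows ever share a cache line and the per-row locks cannot false-share. A minimal standalone sketch of the same idea, assuming ABTU_memalign behaves like posix_memalign (row_t and make_rows are illustrative names, not Argobots API):

#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for ABTI_thread_queue: padded to a whole number of
 * 64-byte cache lines so adjacent rows never share a line. */
typedef struct {
    void *head, *tail;
    size_t num;
    char pad[192 - 2 * sizeof(void *) - sizeof(size_t)];
} row_t;

static row_t *make_rows(size_t num_rows)
{
    void *p = NULL;
    /* 64-byte alignment puts row 0 on a cache-line boundary;
     * the 192-byte row size keeps every later row there too. */
    if (posix_memalign(&p, 64, num_rows * sizeof(row_t)) != 0)
        return NULL;
    memset(p, 0, num_rows * sizeof(row_t));
    return (row_t *)p;
}

int main(void)
{
    row_t *rows = make_rows(8);
    printf("sizeof(row_t) = %zu, rows at %p\n", sizeof(row_t), (void *)rows);
    free(rows);
    return 0;
}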

void ABTI_thread_htable_push(ABTI_thread_htable *p_htable, int idx,
                             ABTI_thread *p_thread)
{
    ABTI_thread_queue *p_queue;

    if (idx >= p_htable->num_rows) {
        ABTI_ASSERT(0);
        /* Increase the hash table */
        uint32_t cur_size, new_size;
        cur_size = p_htable->num_rows;
        new_size = (idx / cur_size + 1) * cur_size;
        p_htable->queue = (ABTI_thread_queue *)
            ABTU_realloc(p_htable->queue, cur_size * sizeof(ABTI_thread_queue),
                         new_size * sizeof(ABTI_thread_queue));
        memset(&p_htable->queue[cur_size], 0,
               (new_size - cur_size) * sizeof(ABTI_thread_queue));
        p_htable->num_rows = new_size;
    }

    /* Add p_thread to the end of the idx-th row */
    p_queue = &p_htable->queue[idx];
    ABTI_thread_queue_acquire_mutex(p_queue);
    if (p_queue->head == NULL) {
        p_queue->head = p_thread;
        p_queue->tail = p_thread;
    } else {
        p_queue->tail->unit_def.p_next = &p_thread->unit_def;
        p_queue->tail = p_thread;
    }
    p_queue->num_threads++;
    ABTI_thread_queue_release_mutex(p_queue);
    ABTD_atomic_fetch_add_uint32(&p_htable->num_elems, 1);
}

/* Unlike ABTI_thread_htable_push, this function pushes p_thread to the queue
 * only when the queue is not empty. */
ABT_bool ABTI_thread_htable_add(ABTI_thread_htable *p_htable, int idx,
                                ABTI_thread *p_thread)
{
    ABTI_thread_queue *p_queue;

    p_queue = &p_htable->queue[idx];

    ABTI_thread_queue_acquire_mutex(p_queue);
    if (p_queue->head == NULL) {
        ABTI_ASSERT(p_queue->num_threads == 0);
        ABTI_thread_queue_release_mutex(p_queue);
        return ABT_FALSE;
    } else {
        /* Change the ULT's state to BLOCKED */
        ABTI_thread_set_blocked(p_thread);

        p_queue->tail->unit_def.p_next = &p_thread->unit_def;
        p_queue->tail = p_thread;
    }
    p_queue->num_threads++;
    ABTI_thread_queue_release_mutex(p_queue);
    ABTD_atomic_fetch_add_uint32(&p_htable->num_elems, 1);
    return ABT_TRUE;
}
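
The push/add split matters to callers: push unconditionally appends, while add enqueues (and marks the ULT BLOCKED) only when someone is already waiting, returning ABT_FALSE otherwise so the caller can take a different path for the first waiter. A self-contained sketch of the two disciplines, with the locking and the real thread types stripped out; node, queue_push, and queue_add are illustrative names, not Argobots API:

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical element standing in for ABTI_thread. */
typedef struct node {
    int id;
    struct node *next;
} node;

typedef struct {
    node *head;
    node *tail;
    size_t num;
} queue;

/* Unconditional append, as in ABTI_thread_htable_push. */
static void queue_push(queue *q, node *n)
{
    n->next = NULL;
    if (q->head == NULL) {
        q->head = q->tail = n;
    } else {
        q->tail->next = n;
        q->tail = n;
    }
    q->num++;
}

/* Conditional append, as in ABTI_thread_htable_add: succeed only
 * if the queue already has a waiter; the caller handles the empty
 * case (for example, by taking a different wakeup path). */
static bool queue_add(queue *q, node *n)
{
    if (q->head == NULL)
        return false;
    n->next = NULL;
    q->tail->next = n;
    q->tail = n;
    q->num++;
    return true;
}

int main(void)
{
    queue q = {0};
    node a = {1, NULL}, b = {2, NULL};
    printf("add on empty queue: %d\n", queue_add(&q, &a)); /* 0: rejected */
    queue_push(&q, &a);                                    /* always works */
    printf("add on non-empty queue: %d\n", queue_add(&q, &b)); /* 1 */
    return 0;
}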

void ABTI_thread_htable_push_low(ABTI_thread_htable *p_htable, int idx,
                                 ABTI_thread *p_thread)
{
    ABTI_thread_queue *p_queue;

    if (idx >= p_htable->num_rows) {
        ABTI_ASSERT(0);
        /* Increase the hash table */
        uint32_t cur_size, new_size;
        cur_size = p_htable->num_rows;
        new_size = (idx / cur_size + 1) * cur_size;
        p_htable->queue = (ABTI_thread_queue *)
            ABTU_realloc(p_htable->queue, cur_size * sizeof(ABTI_thread_queue),
                         new_size * sizeof(ABTI_thread_queue));
        memset(&p_htable->queue[cur_size], 0,
               (new_size - cur_size) * sizeof(ABTI_thread_queue));
        p_htable->num_rows = new_size;
    }

    /* Add p_thread to the end of the idx-th row */
    p_queue = &p_htable->queue[idx];
    ABTI_thread_queue_acquire_low_mutex(p_queue);
    if (p_queue->low_head == NULL) {
        p_queue->low_head = p_thread;
        p_queue->low_tail = p_thread;
    } else {
        p_queue->low_tail->unit_def.p_next = &p_thread->unit_def;
        p_queue->low_tail = p_thread;
    }
    p_queue->low_num_threads++;
    ABTI_thread_queue_release_low_mutex(p_queue);
    ABTD_atomic_fetch_add_uint32(&p_htable->num_elems, 1);
}

/* Unlike ABTI_thread_htable_push_low, this function pushes p_thread to the
 * queue only when the queue is not empty. */
ABT_bool ABTI_thread_htable_add_low(ABTI_thread_htable *p_htable, int idx,
                                    ABTI_thread *p_thread)
{
    ABTI_thread_queue *p_queue;

    p_queue = &p_htable->queue[idx];

    ABTI_thread_queue_acquire_low_mutex(p_queue);
    if (p_queue->low_head == NULL) {
        ABTI_ASSERT(p_queue->low_num_threads == 0);
        ABTI_thread_queue_release_low_mutex(p_queue);
        return ABT_FALSE;
    } else {
        /* Change the ULT's state to BLOCKED */
        ABTI_thread_set_blocked(p_thread);

        p_queue->low_tail->unit_def.p_next = &p_thread->unit_def;
        p_queue->low_tail = p_thread;
    }
    p_queue->low_num_threads++;
    ABTI_thread_queue_release_low_mutex(p_queue);
    ABTD_atomic_fetch_add_uint32(&p_htable->num_elems, 1);
    return ABT_TRUE;
}
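
ABTI_thread_htable_push_low and ABTI_thread_htable_add_low are line-for-line mirrors of ABTI_thread_htable_push and ABTI_thread_htable_add that operate on each row's second list (low_head, low_tail, low_num_threads), just as ABTI_thread_htable_pop_low below mirrors ABTI_thread_htable_pop. The naming suggests a lower-priority class of blocked ULTs, kept on a separate list so consumers can prefer the default list; both lists feed the same table-wide num_elems counter.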

ABTI_thread *ABTI_thread_htable_pop(ABTI_thread_htable *p_htable,
                                    ABTI_thread_queue *p_queue)
{
    ABTI_thread *p_thread = NULL;

    ABTI_thread_queue_acquire_mutex(p_queue);
    if (p_queue->head) {
        ABTD_atomic_fetch_sub_uint32(&p_htable->num_elems, 1);
        p_thread = p_queue->head;
        if (p_queue->head == p_queue->tail) {
            p_queue->head = NULL;
            p_queue->tail = NULL;
        } else {
            ABT_thread next = p_thread->unit_def.p_next->handle.thread;
            p_queue->head = ABTI_thread_get_ptr(next);
        }

        p_queue->num_threads--;
    }
    ABTI_thread_queue_release_mutex(p_queue);

    return p_thread;
}
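
Note the double indirection when the head advances: the links live in each thread's embedded unit_def, so the pop follows p_next to the next unit, reads the ABT_thread handle stored there, and converts it back to an ABTI_thread with ABTI_thread_get_ptr. A toy version of this intrusive pattern, assuming the handle is just a back-pointer (elem and unit here are illustrative types, not the Argobots definitions):

#include <stdio.h>

struct elem;

/* A unit descriptor embedded in each element. List links point at
 * units, and each unit carries a handle back to its owner, mirroring
 * how unit_def.p_next->handle.thread is mapped back to an ABTI_thread. */
typedef struct unit {
    struct unit *p_next;
    struct elem *owner;
} unit;

typedef struct elem {
    int id;
    unit u;
} elem;

int main(void)
{
    elem a = {1, {NULL, &a}};
    elem b = {2, {NULL, &b}};
    a.u.p_next = &b.u;

    /* Advance the head: follow the unit link, then recover the owner. */
    elem *next = a.u.p_next->owner;
    printf("next element: %d\n", next->id);
    return 0;
}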

ABTI_thread *ABTI_thread_htable_pop_low(ABTI_thread_htable *p_htable,
                                        ABTI_thread_queue *p_queue)
{
    ABTI_thread *p_thread = NULL;

    ABTI_thread_queue_acquire_low_mutex(p_queue);
    if (p_queue->low_head) {
        ABTD_atomic_fetch_sub_uint32(&p_htable->num_elems, 1);
        p_thread = p_queue->low_head;
        if (p_queue->low_head == p_queue->low_tail) {
            p_queue->low_head = NULL;
            p_queue->low_tail = NULL;
        } else {
            ABT_thread next = p_thread->unit_def.p_next->handle.thread;
            p_queue->low_head = ABTI_thread_get_ptr(next);
        }

        p_queue->low_num_threads--;
    }
    ABTI_thread_queue_release_low_mutex(p_queue);

    return p_thread;
}

ABT_bool ABTI_thread_htable_switch_low(ABTI_local **pp_local,
                                       ABTI_thread_queue *p_queue,
                                       ABTI_thread *p_thread,
                                       ABTI_thread_htable *p_htable)
{
    ABTI_thread *p_target = NULL;

    ABTI_thread_queue_acquire_low_mutex(p_queue);
    if (p_queue->low_head) {
        p_target = p_queue->low_head;

        /* Push p_thread to the queue */
        ABTD_atomic_release_store_int(&p_thread->state,
                                      ABT_THREAD_STATE_BLOCKED);
        if (p_queue->low_head == p_queue->low_tail) {
            p_queue->low_head = p_thread;
            p_queue->low_tail = p_thread;
        } else {
            ABT_thread next = p_target->unit_def.p_next->handle.thread;
            p_queue->low_head = ABTI_thread_get_ptr(next);
            p_queue->low_tail->unit_def.p_next = &p_thread->unit_def;
            p_queue->low_tail = p_thread;
        }
    }
    ABTI_thread_queue_release_low_mutex(p_queue);

    if (p_target) {
        LOG_EVENT("switch -> U%" PRIu64 "\n", ABTI_thread_get_id(p_target));

        /* Context-switch to p_target */
        ABTD_atomic_release_store_int(&p_target->state,
                                      ABT_THREAD_STATE_RUNNING);
        ABTI_thread_context_switch_thread_to_thread(pp_local, p_thread,
                                                    p_target);
        return ABT_TRUE;
    } else {
        return ABT_FALSE;
    }
}
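
ABTI_thread_htable_switch_low performs a pop and a push under a single lock hold: it takes the current low head as the target, marks the caller BLOCKED and enqueues it, then marks the target RUNNING and switches directly from the caller's context to the target's, with no round trip through a scheduler. The direct-handoff idea can be illustrated with POSIX ucontext; this is a sketch only, since Argobots uses its own ABTD context layer rather than ucontext:

#include <stdio.h>
#include <ucontext.h>

static ucontext_t uctx_caller, uctx_target;

static void target_func(void)
{
    printf("target runs\n");
    /* Returning ends this context; uc_link resumes the caller. */
}

int main(void)
{
    static char stack[64 * 1024];

    getcontext(&uctx_target);
    uctx_target.uc_stack.ss_sp = stack;
    uctx_target.uc_stack.ss_size = sizeof(stack);
    uctx_target.uc_link = &uctx_caller;
    makecontext(&uctx_target, target_func, 0);

    /* Direct handoff: suspend the caller and resume the target in one
     * step, analogous to context_switch_thread_to_thread above. */
    swapcontext(&uctx_caller, &uctx_target);
    printf("caller resumes\n");
    return 0;
}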