ARGOBOTS
abti_stream.h
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #ifndef ABTI_XSTREAM_H_INCLUDED
7 #define ABTI_XSTREAM_H_INCLUDED
8 
9 /* Inlined functions for Execution Stream (ES) */
10 
11 static inline ABTI_xstream *ABTI_xstream_get_ptr(ABT_xstream xstream)
12 {
13 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
14  ABTI_xstream *p_xstream;
15  if (xstream == ABT_XSTREAM_NULL) {
16  p_xstream = NULL;
17  } else {
18  p_xstream = (ABTI_xstream *)xstream;
19  }
20  return p_xstream;
21 #else
22  return (ABTI_xstream *)xstream;
23 #endif
24 }
25 
26 static inline ABT_xstream ABTI_xstream_get_handle(ABTI_xstream *p_xstream)
27 {
28 #ifndef ABT_CONFIG_DISABLE_ERROR_CHECK
29  ABT_xstream h_xstream;
30  if (p_xstream == NULL) {
31  h_xstream = ABT_XSTREAM_NULL;
32  } else {
33  h_xstream = (ABT_xstream)p_xstream;
34  }
35  return h_xstream;
36 #else
37  return (ABT_xstream)p_xstream;
38 #endif
39 }
40 
41 static inline void ABTI_xstream_set_request(ABTI_xstream *p_xstream,
42  uint32_t req)
43 {
44  ABTD_atomic_fetch_or_uint32(&p_xstream->request, req);
45 }
46 
47 static inline void ABTI_xstream_unset_request(ABTI_xstream *p_xstream,
48  uint32_t req)
49 {
50  ABTD_atomic_fetch_and_uint32(&p_xstream->request, ~req);
51 }
52 
53 /* Get the top scheduler from the sched stack (field scheds) */
54 static inline ABTI_sched *ABTI_xstream_get_top_sched(ABTI_xstream *p_xstream)
55 {
56  return p_xstream->scheds[p_xstream->num_scheds - 1];
57 }
58 
59 /* Get the parent scheduler of the current scheduler */
60 static inline ABTI_sched *ABTI_xstream_get_parent_sched(ABTI_xstream *p_xstream)
61 {
62  ABTI_ASSERT(p_xstream->num_scheds >= 2);
63  return p_xstream->scheds[p_xstream->num_scheds - 2];
64 }
65 
66 /* Get the scheduling context */
67 static inline ABTD_thread_context *
68 ABTI_xstream_get_sched_ctx(ABTI_xstream *p_xstream)
69 {
70  ABTI_sched *p_sched = ABTI_xstream_get_top_sched(p_xstream);
71  return p_sched->p_ctx;
72 }
73 
74 /* Remove the top scheduler from the sched stack (field scheds) */
75 static inline void ABTI_xstream_pop_sched(ABTI_xstream *p_xstream)
76 {
77  p_xstream->num_scheds--;
78  ABTI_ASSERT(p_xstream->num_scheds >= 0);
79 }
80 
81 /* Replace the top scheduler of the sched stack (field scheds) with the target
82  * scheduler */
83 static inline void ABTI_xstream_replace_top_sched(ABTI_xstream *p_xstream,
84  ABTI_sched *p_sched)
85 {
86  p_xstream->scheds[p_xstream->num_scheds - 1] = p_sched;
87 }
88 
89 /* Add the specified scheduler to the sched stack (field scheds) */
90 static inline void ABTI_xstream_push_sched(ABTI_xstream *p_xstream,
91  ABTI_sched *p_sched)
92 {
93  if (p_xstream->num_scheds == p_xstream->max_scheds) {
94  int cur_size = p_xstream->max_scheds;
95  int new_size = cur_size + 10;
96  void *temp;
97  temp = ABTU_realloc(p_xstream->scheds, cur_size * sizeof(ABTI_sched *),
98  new_size * sizeof(ABTI_sched *));
99  p_xstream->scheds = (ABTI_sched **)temp;
100  p_xstream->max_scheds = new_size;
101  }
102 
103  p_xstream->scheds[p_xstream->num_scheds++] = p_sched;
104 }
105 
106 /* Get the first pool of the main scheduler. */
107 static inline ABTI_pool *ABTI_xstream_get_main_pool(ABTI_xstream *p_xstream)
108 {
109  ABT_pool pool = p_xstream->p_main_sched->pools[0];
110  return ABTI_pool_get_ptr(pool);
111 }
112 
113 static inline void ABTI_xstream_terminate_thread(ABTI_local *p_local,
114  ABTI_thread *p_thread)
115 {
116  LOG_EVENT("[U%" PRIu64 ":E%d] terminated\n", ABTI_thread_get_id(p_thread),
117  p_thread->p_last_xstream->rank);
118  if (p_thread->refcount == 0) {
119  ABTD_atomic_release_store_int(&p_thread->state,
121  ABTI_thread_free(p_local, p_thread);
122 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
123  } else if (p_thread->is_sched) {
124  /* NOTE: p_thread itself will be freed in ABTI_sched_free. */
125  ABTD_atomic_release_store_int(&p_thread->state,
127  ABTI_sched_discard_and_free(p_local, p_thread->is_sched);
128 #endif
129  } else {
130  /* NOTE: We set the ULT's state as TERMINATED after checking refcount
131  * because the ULT can be freed on a different ES. In other words, we
132  * must not access any field of p_thead after changing the state to
133  * TERMINATED. */
134  ABTD_atomic_release_store_int(&p_thread->state,
136  }
137 }
138 
139 static inline void ABTI_xstream_terminate_task(ABTI_local *p_local,
140  ABTI_task *p_task)
141 {
142  LOG_EVENT("[T%" PRIu64 ":E%d] terminated\n", ABTI_task_get_id(p_task),
143  p_task->p_xstream->rank);
144  if (p_task->refcount == 0) {
145  ABTD_atomic_release_store_int(&p_task->state,
147  ABTI_task_free(p_local, p_task);
148 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED
149  } else if (p_task->is_sched) {
150  /* NOTE: p_task itself will be freed in ABTI_sched_free. */
151  ABTD_atomic_release_store_int(&p_task->state,
153  ABTI_sched_discard_and_free(p_local, p_task->is_sched);
154 #endif
155  } else {
156  /* NOTE: We set the task's state as TERMINATED after checking refcount
157  * because the task can be freed on a different ES. In other words, we
158  * must not access any field of p_task after changing the state to
159  * TERMINATED. */
160  ABTD_atomic_release_store_int(&p_task->state,
162  }
163 }
164 
165 /* Get the native thread id associated with the target xstream. */
166 static inline ABTI_native_thread_id
167 ABTI_xstream_get_native_thread_id(ABTI_xstream *p_xstream)
168 {
169  return (ABTI_native_thread_id)p_xstream;
170 }
171 
172 #endif /* ABTI_XSTREAM_H_INCLUDED */
struct ABT_xstream_opaque * ABT_xstream
Definition: abt.h:251
#define ABT_XSTREAM_NULL
Definition: abt.h:337
struct ABT_pool_opaque * ABT_pool
Definition: abt.h:267
static void * ABTU_realloc(void *ptr, size_t old_size, size_t new_size)
Definition: abtu.h:56
#define LOG_EVENT(fmt,...)
Definition: abti_log.h:60