ARGOBOTS
abtd_stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 static void *ABTDI_xstream_context_thread_func(void *arg)
9 {
10  ABTD_xstream_context *p_ctx = (ABTD_xstream_context *)arg;
11  void *(*thread_f)(void *) = p_ctx->thread_f;
12  void *p_arg = p_ctx->p_arg;
13  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_RUNNING);
14  while (1) {
15  /* Execute a main execution stream function. */
16  thread_f(p_arg);
17  /* This thread has finished. */
18  ABT_bool restart;
19  pthread_mutex_lock(&p_ctx->state_lock);
20  /* If another execution stream is waiting for this thread completion,
21  * let's wake it up. */
22  if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN) {
23  pthread_cond_signal(&p_ctx->state_cond);
24  }
25  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_WAITING;
26  /* Wait for a request from ABTD_xstream_context_free() or
27  * ABTD_xstream_context_restart().
28  * The following loop is to deal with spurious wakeup. */
29  do {
30  pthread_cond_wait(&p_ctx->state_cond, &p_ctx->state_lock);
31  } while (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
32  if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_TERMINATE) {
33  /* ABTD_xstream_context_free() terminates this thread. */
34  restart = ABT_FALSE;
35  } else {
36  /* ABTD_xstream_context_restart() restarts this thread */
37  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_RUNNING ||
38  p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN);
39  restart = ABT_TRUE;
40  }
41  pthread_mutex_unlock(&p_ctx->state_lock);
42  if (!restart)
43  break;
44  }
45  return NULL;
46 }
47 
48 int ABTD_xstream_context_create(void *(*f_xstream)(void *), void *p_arg,
49  ABTD_xstream_context *p_ctx)
50 {
51  int abt_errno = ABT_SUCCESS;
52  p_ctx->thread_f = f_xstream;
53  p_ctx->p_arg = p_arg;
54  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_RUNNING;
55  pthread_mutex_init(&p_ctx->state_lock, NULL);
56  pthread_cond_init(&p_ctx->state_cond, NULL);
57  int ret = pthread_create(&p_ctx->native_thread, NULL,
59  if (ret != 0) {
60  HANDLE_ERROR("pthread_create");
61  abt_errno = ABT_ERR_XSTREAM;
62  }
63  return abt_errno;
64 }
65 
66 int ABTD_xstream_context_free(ABTD_xstream_context *p_ctx)
67 {
68  int abt_errno = ABT_SUCCESS;
69  /* Request termination */
70  pthread_mutex_lock(&p_ctx->state_lock);
71  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
72  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_REQ_TERMINATE;
73  pthread_cond_signal(&p_ctx->state_cond);
74  pthread_mutex_unlock(&p_ctx->state_lock);
75  /* Join the target thread. */
76  int ret = pthread_join(p_ctx->native_thread, NULL);
77  if (ret != 0) {
78  HANDLE_ERROR("pthread_join");
79  abt_errno = ABT_ERR_XSTREAM;
80  }
81  pthread_cond_destroy(&p_ctx->state_cond);
82  pthread_mutex_destroy(&p_ctx->state_lock);
83  return abt_errno;
84 }
85 
86 int ABTD_xstream_context_join(ABTD_xstream_context *p_ctx)
87 {
88  int abt_errno = ABT_SUCCESS;
89  /* If not finished, sleep this thread. */
90  pthread_mutex_lock(&p_ctx->state_lock);
91  if (p_ctx->state != ABTD_XSTREAM_CONTEXT_STATE_WAITING) {
92  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_RUNNING);
93  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN;
94  /* The following loop is to deal with spurious wakeup. */
95  do {
96  pthread_cond_wait(&p_ctx->state_cond, &p_ctx->state_lock);
97  } while (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN);
98  }
99  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
100  pthread_mutex_unlock(&p_ctx->state_lock);
101  return abt_errno;
102 }
103 
104 int ABTD_xstream_context_revive(ABTD_xstream_context *p_ctx)
105 {
106  int abt_errno = ABT_SUCCESS;
107  /* Request restart */
108  pthread_mutex_lock(&p_ctx->state_lock);
109  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
110  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_RUNNING;
111  pthread_cond_signal(&p_ctx->state_cond);
112  pthread_mutex_unlock(&p_ctx->state_lock);
113  return abt_errno;
114 }
115 
116 int ABTD_xstream_context_set_self(ABTD_xstream_context *p_ctx)
117 {
118  int abt_errno = ABT_SUCCESS;
119  p_ctx->native_thread = pthread_self();
120  return abt_errno;
121 }
#define HANDLE_ERROR(msg)
Definition: abti_error.h:227
int ABT_bool
Definition: abt.h:309
#define ABT_FALSE
Definition: abt.h:224
#define ABT_SUCCESS
Definition: abt.h:64
#define ABT_TRUE
Definition: abt.h:223
static void * ABTDI_xstream_context_thread_func(void *arg)
Definition: abtd_stream.c:8
#define ABT_ERR_XSTREAM
Definition: abt.h:93