ARGOBOTS  1.1
abtd_stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 static void *xstream_context_thread_func(void *arg)
9 {
11  void *(*thread_f)(void *) = p_ctx->thread_f;
12  void *p_arg = p_ctx->p_arg;
14  while (1) {
15  /* Execute a main execution stream function. */
16  thread_f(p_arg);
17  /* This thread has finished. */
18  ABT_bool restart;
19  pthread_mutex_lock(&p_ctx->state_lock);
20  /* If another execution stream is waiting for this thread completion,
21  * let's wake it up. */
23  pthread_cond_signal(&p_ctx->state_cond);
24  }
26  /* Wait for a request from ABTD_xstream_context_free() or
27  * ABTD_xstream_context_restart().
28  * The following loop is to deal with spurious wakeup. */
29  do {
30  pthread_cond_wait(&p_ctx->state_cond, &p_ctx->state_lock);
31  } while (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
33  /* ABTD_xstream_context_free() terminates this thread. */
34  restart = ABT_FALSE;
35  } else {
36  /* ABTD_xstream_context_restart() restarts this thread */
39  restart = ABT_TRUE;
40  }
41  pthread_mutex_unlock(&p_ctx->state_lock);
42  if (!restart)
43  break;
44  }
45  return NULL;
46 }
47 
48 ABTU_ret_err int ABTD_xstream_context_create(void *(*f_xstream)(void *),
49  void *p_arg,
50  ABTD_xstream_context *p_ctx)
51 {
52  p_ctx->thread_f = f_xstream;
53  p_ctx->p_arg = p_arg;
54  /* Coverity thinks p_ctx->state must be updated with a lock since it is
55  * updated with a lock in the other places. This assumption is wrong. The
56  * following suppresses a false positive. */
57  /* coverity[missing_lock] */
59  int ret, init_stage = 0;
60  ret = pthread_mutex_init(&p_ctx->state_lock, NULL);
61  if (ret != 0)
62  goto FAILED;
63  init_stage = 1;
64 
65  ret = pthread_cond_init(&p_ctx->state_cond, NULL);
66  if (ret != 0)
67  goto FAILED;
68  init_stage = 2;
69 
70  ret = pthread_create(&p_ctx->native_thread, NULL,
72  if (ret != 0)
73  goto FAILED;
74  init_stage = 3;
75 
76  return ABT_SUCCESS;
77 FAILED:
78  if (init_stage >= 2) {
79  ret = pthread_cond_destroy(&p_ctx->state_cond);
80  ABTI_ASSERT(ret == 0);
81  }
82  if (init_stage >= 1) {
83  ret = pthread_mutex_destroy(&p_ctx->state_lock);
84  ABTI_ASSERT(ret == 0);
85  }
88 }
89 
91 {
92  /* Request termination */
94  /* Do nothing. */
95  } else {
96  pthread_mutex_lock(&p_ctx->state_lock);
99  pthread_cond_signal(&p_ctx->state_cond);
100  pthread_mutex_unlock(&p_ctx->state_lock);
101  /* Join the target thread. */
102  int ret;
103  ret = pthread_join(p_ctx->native_thread, NULL);
104  ABTI_ASSERT(ret == 0);
105  ret = pthread_cond_destroy(&p_ctx->state_cond);
106  ABTI_ASSERT(ret == 0);
107  ret = pthread_mutex_destroy(&p_ctx->state_lock);
108  ABTI_ASSERT(ret == 0);
109  }
110 }
111 
113 {
114  /* If not finished, sleep this thread. */
115  pthread_mutex_lock(&p_ctx->state_lock);
119  /* The following loop is to deal with spurious wakeup. */
120  do {
121  pthread_cond_wait(&p_ctx->state_cond, &p_ctx->state_lock);
122  } while (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN);
123  }
125  pthread_mutex_unlock(&p_ctx->state_lock);
126 }
127 
129 {
130  /* Request restart */
131  pthread_mutex_lock(&p_ctx->state_lock);
134  pthread_cond_signal(&p_ctx->state_cond);
135  pthread_mutex_unlock(&p_ctx->state_lock);
136 }
137 
139 {
140  p_ctx->native_thread = pthread_self();
141 }
142 
144  int indent)
145 {
146  if (p_ctx == NULL) {
147  fprintf(p_os, "%*s== NULL XSTREAM CONTEXT ==\n", indent, "");
148  } else {
149  const char *state;
151  state = "RUNNING";
152  } else if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING) {
153  state = "WAITING";
154  } else if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN) {
155  state = "REQ_JOIN";
156  } else if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_TERMINATE) {
157  state = "REQ_TERMINATE";
158  } else if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_UNINIT) {
159  state = "UNINIT";
160  } else {
161  state = "UNKNOWN";
162  }
163  fprintf(p_os,
164  "%*s== XSTREAM CONTEXT (%p) ==\n"
165  "%*sstate : %s\n",
166  indent, "", (void *)p_ctx, indent, "", state);
167  }
168  fflush(p_os);
169 }
ABTD_xstream_context::native_thread
pthread_t native_thread
Definition: abtd.h:25
ABT_ERR_SYS
#define ABT_ERR_SYS
Error code: error related to system calls and standard libraries.
Definition: abt.h:393
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1001
ABTD_XSTREAM_CONTEXT_STATE_WAITING
@ ABTD_XSTREAM_CONTEXT_STATE_WAITING
Definition: abtd.h:19
xstream_context_thread_func
static void * xstream_context_thread_func(void *arg)
Definition: abtd_stream.c:8
ABTD_xstream_context_revive
void ABTD_xstream_context_revive(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:128
ABTD_XSTREAM_CONTEXT_STATE_RUNNING
@ ABTD_XSTREAM_CONTEXT_STATE_RUNNING
Definition: abtd.h:18
ABTD_xstream_context
Definition: abtd.h:24
ABTD_xstream_context_free
void ABTD_xstream_context_free(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:90
ABTD_xstream_context_create
ABTU_ret_err int ABTD_xstream_context_create(void *(*f_xstream)(void *), void *p_arg, ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:48
ABTD_xstream_context_join
void ABTD_xstream_context_join(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:112
ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN
@ ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN
Definition: abtd.h:20
ABTD_xstream_context::state_lock
pthread_mutex_t state_lock
Definition: abtd.h:29
abti.h
ABTD_XSTREAM_CONTEXT_STATE_REQ_TERMINATE
@ ABTD_XSTREAM_CONTEXT_STATE_REQ_TERMINATE
Definition: abtd.h:21
ABTI_HANDLE_ERROR
#define ABTI_HANDLE_ERROR(n)
Definition: abti_error.h:114
ABTD_xstream_context::state_cond
pthread_cond_t state_cond
Definition: abtd.h:30
ABTI_ASSERT
#define ABTI_ASSERT(cond)
Definition: abti_error.h:12
ABTD_xstream_context::p_arg
void * p_arg
Definition: abtd.h:27
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:146
ABTD_XSTREAM_CONTEXT_STATE_UNINIT
@ ABTD_XSTREAM_CONTEXT_STATE_UNINIT
Definition: abtd.h:22
ABTD_xstream_context::state
ABTD_xstream_context_state state
Definition: abtd.h:28
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:748
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:750
ABTD_xstream_context::thread_f
void *(* thread_f)(void *)
Definition: abtd.h:26
ABTD_xstream_context_print
void ABTD_xstream_context_print(ABTD_xstream_context *p_ctx, FILE *p_os, int indent)
Definition: abtd_stream.c:143
ABTD_xstream_context_set_self
void ABTD_xstream_context_set_self(ABTD_xstream_context *p_ctx)
Definition: abtd_stream.c:138