1
/* Copyright (c) 2013-2018, Linaro Limited
2
 * Copyright (c) 2019, Nokia
3
 * All rights reserved.
4
 *
5
 * SPDX-License-Identifier:     BSD-3-Clause
6
 */
7

8
#ifndef _GNU_SOURCE
9
#define _GNU_SOURCE
10
#endif
11
#include <sched.h>
12
#include <stdlib.h>
13
#include <string.h>
14
#include <unistd.h>
15
#include <sys/wait.h>
16
#include <sys/prctl.h>
17
#include <sys/syscall.h>
18

19
#include <odp_api.h>
20
#include <odp/helper/threads.h>
21
#include <odp/helper/odph_debug.h>
22

23
/* Marker written to a thread table entry's cpu field when creating that
 * thread or process failed. odph_odpthreads_join() skips such entries. */
#define FAILED_CPU -1
24

25
/*
 * Thread status codes, kept in the start_args.status atomic of each thread:
 *   NOT_STARTED - initial value when no start synchronization is requested,
 *                 and the value restored by odph_thread_join() after a join
 *   SYNC_INIT   - initial value when the creator requested synchronization
 *   INIT_DONE   - stored by the new thread after its ODP local init passed
 *   STARTED     - stored by odph_thread_create() once creation succeeded
 */
26
#define NOT_STARTED 0
27
#define SYNC_INIT   1
28
#define INIT_DONE   2
29
#define STARTED     3
30

31
/* Helper-wide options: written by odph_parse_options(), read by the thread
 * creation functions and by odph_options(). */
static odph_helper_options_t helper_options;
32

33
/*
34
 * Run a thread, either as Linux pthread or process.
35
 * In process mode, if start_routine returns NULL, the process return FAILURE.
36
 */
37 1
static void *run_thread(void *arg)
38
{
39
	int status;
40
	int ret;
41
	odp_instance_t instance;
42
	odph_odpthread_params_t *thr_params;
43

44 1
	odph_thread_start_args_t *start_args = arg;
45

46 1
	thr_params = &start_args->thr_params;
47 1
	instance   = start_args->instance;
48

49
	/* ODP thread local init */
50 1
	if (odp_init_local(instance, thr_params->thr_type)) {
51 0
		ODPH_ERR("Local init failed\n");
52 0
		if (start_args->mem_model == ODP_MEM_MODEL_PROCESS)
53 0
			_exit(EXIT_FAILURE);
54 0
		return (void *)-1;
55
	}
56

57 1
	ODPH_DBG("helper: ODP %s thread started as linux %s. (pid=%d)\n",
58
		 thr_params->thr_type == ODP_THREAD_WORKER ?
59
		 "worker" : "control",
60
		 (start_args->mem_model == ODP_MEM_MODEL_THREAD) ?
61
		 "pthread" : "process",
62
		 (int)getpid());
63

64 1
	if (odp_atomic_load_u32(&start_args->status) == SYNC_INIT)
65 1
		odp_atomic_store_rel_u32(&start_args->status, INIT_DONE);
66

67 1
	status = thr_params->start(thr_params->arg);
68 1
	ret = odp_term_local();
69

70 1
	if (ret < 0)
71 1
		ODPH_ERR("Local term failed\n");
72

73
	/* for process implementation of odp threads, just return status... */
74 1
	if (start_args->mem_model == ODP_MEM_MODEL_PROCESS)
75 0
		_exit(status);
76

77
	/* threads implementation return void* pointers: cast status to that. */
78 1
	return (void *)(intptr_t)status;
79
}
80

81
/*
82
 * Create a single linux process
83
 */
84 1
static int create_process(odph_thread_t *thread, int cpu)
85
{
86
	cpu_set_t cpu_set;
87
	pid_t pid;
88

89 1
	CPU_ZERO(&cpu_set);
90 1
	CPU_SET(cpu, &cpu_set);
91

92 1
	thread->start_args.mem_model = ODP_MEM_MODEL_PROCESS;
93 1
	thread->cpu = cpu;
94

95 1
	pid = fork();
96 1
	if (pid < 0) {
97 0
		ODPH_ERR("fork() failed\n");
98 0
		thread->cpu = FAILED_CPU;
99 0
		return -1;
100
	}
101

102
	/* Parent continues to fork */
103 1
	if (pid > 0) {
104 1
		thread->proc.pid  = pid;
105 1
		return 0;
106
	}
107

108
	/* Child process */
109

110
	/* Request SIGTERM if parent dies */
111 0
	prctl(PR_SET_PDEATHSIG, SIGTERM);
112
	/* Parent died already? */
113 0
	if (getppid() == 1)
114 0
		kill(getpid(), SIGTERM);
115

116 0
	if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set)) {
117 0
		ODPH_ERR("sched_setaffinity() failed\n");
118 0
		return -2;
119
	}
120

121 0
	run_thread(&thread->start_args);
122

123 0
	return 0; /* never reached */
124
}
125

126
/*
127
 * Wait single process to exit
128
 */
129 0
static int wait_process(odph_thread_t *thread)
130
{
131
	pid_t pid;
132 0
	int status = 0;
133

134 0
	pid = waitpid(thread->proc.pid, &status, 0);
135

136 0
	if (pid < 0) {
137 0
		ODPH_ERR("waitpid() failed\n");
138 0
		return -1;
139
	}
140

141
	/* Examine the child process' termination status */
142 0
	if (WIFEXITED(status) &&
143 0
	    WEXITSTATUS(status) != EXIT_SUCCESS) {
144 0
		ODPH_ERR("Child exit status:%d (pid:%d)\n",
145
			 WEXITSTATUS(status), (int)pid);
146 0
		return -1;
147
	}
148

149 0
	if (WIFSIGNALED(status)) {
150 0
		int signo = WTERMSIG(status);
151

152 0
		ODPH_ERR("Child term signo:%d - %s (pid:%d)\n",
153
			 signo, strsignal(signo), (int)pid);
154 0
		return -1;
155
	}
156

157 0
	return 0;
158
}
159

160
/*
161
 * Create a single linux pthread
162
 */
163 1
static int create_pthread(odph_thread_t *thread, int cpu)
164
{
165
	int ret;
166
	cpu_set_t cpu_set;
167

168 1
	CPU_ZERO(&cpu_set);
169 1
	CPU_SET(cpu, &cpu_set);
170

171 1
	pthread_attr_init(&thread->thread.attr);
172

173 1
	thread->cpu = cpu;
174

175 1
	pthread_attr_setaffinity_np(&thread->thread.attr,
176
				    sizeof(cpu_set_t), &cpu_set);
177

178 1
	thread->start_args.mem_model = ODP_MEM_MODEL_THREAD;
179

180 1
	ret = pthread_create(&thread->thread.thread_id,
181 1
			     &thread->thread.attr,
182
			     run_thread,
183 1
			     &thread->start_args);
184 1
	if (ret != 0) {
185 0
		ODPH_ERR("Failed to start thread on CPU #%d: %d\n", cpu, ret);
186 0
		thread->cpu = FAILED_CPU;
187 0
		return ret;
188
	}
189

190 1
	return 0;
191
}
192

193
/*
194
 * Wait single pthread to exit
195
 */
196 1
static int wait_pthread(odph_thread_t *thread)
197
{
198
	int ret;
199 1
	void *thread_ret = NULL;
200

201
	/* Wait thread to exit */
202 1
	ret = pthread_join(thread->thread.thread_id, &thread_ret);
203

204 1
	if (ret) {
205 0
		ODPH_ERR("pthread_join failed (%i) from cpu #%i\n",
206
			 ret, thread->cpu);
207 0
		return -1;
208
	}
209

210 1
	if (thread_ret) {
211 0
		ODPH_ERR("Bad exit status cpu #%i %p\n",
212
			 thread->cpu, thread_ret);
213 0
		return -1;
214
	}
215

216 1
	ret = pthread_attr_destroy(&thread->thread.attr);
217

218 1
	if (ret) {
219 0
		ODPH_ERR("pthread_attr_destroy failed (%i) from cpu #%i\n",
220
			 ret, thread->cpu);
221 0
		return -1;
222
	}
223

224 1
	return 0;
225
}
226

227 1
int odph_thread_create(odph_thread_t thread[],
228
		       const odph_thread_common_param_t *param,
229
		       const odph_thread_param_t thr_param[],
230
		       int num)
231
{
232
	int i, num_cpu, cpu;
233 1
	const odp_cpumask_t *cpumask = param->cpumask;
234 1
	int use_pthread = 1;
235

236 1
	if (param->thread_model == 1)
237 0
		use_pthread = 0;
238

239 1
	if (helper_options.mem_model == ODP_MEM_MODEL_PROCESS)
240 0
		use_pthread = 0;
241

242 1
	if (num < 1) {
243 0
		ODPH_ERR("Bad number of threads (%i)\n", num);
244 0
		return -1;
245
	}
246

247 1
	num_cpu = odp_cpumask_count(cpumask);
248

249 1
	if (num_cpu != num) {
250 0
		ODPH_ERR("Number of threads (%i) and CPUs (%i) does not match"
251
			 "\n", num, num_cpu);
252 0
		return -1;
253
	}
254

255 1
	memset(thread, 0, num * sizeof(odph_thread_t));
256

257 1
	cpu = odp_cpumask_first(cpumask);
258 1
	for (i = 0; i < num; i++) {
259 1
		odph_thread_start_args_t *start_args = &thread[i].start_args;
260

261
		/* Copy thread parameters */
262 1
		if (param->share_param)
263 1
			start_args->thr_params = thr_param[0];
264
		else
265 1
			start_args->thr_params = thr_param[i];
266

267 1
		start_args->instance   = param->instance;
268

269 1
		if (param->sync)
270 1
			odp_atomic_init_u32(&start_args->status, SYNC_INIT);
271
		else
272 1
			odp_atomic_init_u32(&start_args->status, NOT_STARTED);
273

274 1
		if (use_pthread) {
275 1
			if (create_pthread(&thread[i], cpu))
276 0
				break;
277
		} else {
278 0
			if (create_process(&thread[i], cpu))
279 0
				break;
280
		}
281

282
		/* Wait newly created thread to update status */
283 1
		if (param->sync) {
284
			odp_time_t t1, t2;
285
			uint64_t diff_ns;
286
			uint32_t status;
287 1
			int timeout = 0;
288 1
			odp_atomic_u32_t *atomic = &start_args->status;
289

290 1
			t1 = odp_time_local();
291

292
			do {
293 1
				odp_cpu_pause();
294 1
				t2 = odp_time_local();
295 1
				diff_ns = odp_time_diff_ns(t2, t1);
296 1
				timeout = diff_ns > ODP_TIME_SEC_IN_NS;
297 1
				status = odp_atomic_load_acq_u32(atomic);
298

299 1
			} while (status != INIT_DONE && timeout == 0);
300

301 1
			if (timeout) {
302 0
				ODPH_ERR("Thread (i:%i) start up timeout\n", i);
303 0
				break;
304
			}
305
		}
306

307 1
		odp_atomic_store_u32(&start_args->status, STARTED);
308

309 1
		cpu = odp_cpumask_next(cpumask, cpu);
310
	}
311

312 1
	return i;
313
}
314

315 1
/*
 * Join up to 'num' threads of the table, in table order.
 *
 * Stops at the first entry whose status is not STARTED, or at the first
 * entry whose wait (pthread join or process wait) reports a failure. Each
 * successfully joined entry gets its status set back to NOT_STARTED.
 *
 * Returns the number of threads joined before stopping.
 */
int odph_thread_join(odph_thread_t thread[], int num)
{
	int joined = 0;

	while (joined < num) {
		odph_thread_t *thr = &thread[joined];
		odp_atomic_u32_t *status = &thr->start_args.status;
		int failed;

		if (odp_atomic_load_u32(status) != STARTED) {
			ODPH_DBG("Thread (i:%i) not started.\n", joined);
			break;
		}

		/* The memory model recorded at creation selects the wait */
		if (thr->start_args.mem_model == ODP_MEM_MODEL_THREAD)
			failed = wait_pthread(thr);
		else
			failed = wait_process(thr);

		if (failed)
			break;

		odp_atomic_store_u32(status, NOT_STARTED);
		joined++;
	}

	return joined;
}
341

342
/*
343
 * create an odpthread set (as linux processes or linux threads or both)
344
 */
345 1
int odph_odpthreads_create(odph_odpthread_t *thread_tbl,
346
			   const odp_cpumask_t *mask,
347
			   const odph_odpthread_params_t *thr_params)
348
{
349
	int i;
350
	int num;
351
	int cpu_count;
352
	int cpu;
353

354 1
	num = odp_cpumask_count(mask);
355

356 1
	memset(thread_tbl, 0, num * sizeof(odph_odpthread_t));
357

358 1
	cpu_count = odp_cpu_count();
359

360 1
	if (num < 1 || num > cpu_count) {
361 0
		ODPH_ERR("Invalid number of odpthreads:%d"
362
			 " (%d cores available)\n",
363
			 num, cpu_count);
364 0
		return -1;
365
	}
366

367 1
	cpu = odp_cpumask_first(mask);
368 1
	for (i = 0; i < num; i++) {
369
		odph_thread_start_args_t *start_args;
370

371 1
		start_args = &thread_tbl[i].start_args;
372

373
		/* Copy thread parameters */
374 1
		start_args->thr_params = *thr_params;
375 1
		start_args->instance   = thr_params->instance;
376

377 1
		if (helper_options.mem_model == ODP_MEM_MODEL_THREAD) {
378 1
			if (create_pthread(&thread_tbl[i], cpu))
379 0
				break;
380
		 } else {
381 1
			if (create_process(&thread_tbl[i], cpu))
382 0
				break;
383
		}
384

385 1
		cpu = odp_cpumask_next(mask, cpu);
386
	}
387 1
	thread_tbl[num - 1].last = 1;
388

389 1
	return i;
390
}
391

392
/*
393
 * wait for the odpthreads termination (linux processes and threads)
394
 */
395 1
int odph_odpthreads_join(odph_odpthread_t *thread_tbl)
396
{
397
	pid_t pid;
398 1
	int i = 0;
399 1
	int terminated = 0;
400
	/* child process return code (!=0 is error) */
401 1
	int status = 0;
402
	/* "child" thread return code (!NULL is error) */
403 1
	void *thread_ret = NULL;
404
	int ret;
405 1
	int retval = 0;
406

407
	/* joins linux threads or wait for processes */
408
	do {
409 1
		if (thread_tbl[i].cpu == FAILED_CPU) {
410 0
			ODPH_DBG("ODP thread %d not started.\n", i);
411 0
			continue;
412
		}
413
		/* pthreads: */
414 1
		if (thread_tbl[i].start_args.mem_model ==
415
				ODP_MEM_MODEL_THREAD) {
416
			/* Wait thread to exit */
417 1
			ret = pthread_join(thread_tbl[i].thread.thread_id,
418
					   &thread_ret);
419 1
			if (ret != 0) {
420 0
				ODPH_ERR("Failed to join thread from cpu #%d\n",
421
					 thread_tbl[i].cpu);
422 0
				retval = -1;
423
			} else {
424 1
				terminated++;
425 1
				if (thread_ret != NULL) {
426 0
					ODPH_ERR("Bad exit status cpu #%d %p\n",
427
						 thread_tbl[i].cpu, thread_ret);
428 0
					retval = -1;
429
				}
430
			}
431 1
			pthread_attr_destroy(&thread_tbl[i].thread.attr);
432
		} else {
433
			/* processes: */
434 1
			pid = waitpid(thread_tbl[i].proc.pid, &status, 0);
435

436 1
			if (pid < 0) {
437 0
				ODPH_ERR("waitpid() failed\n");
438 0
				retval = -1;
439 0
				break;
440
			}
441

442 1
			terminated++;
443

444
			/* Examine the child process' termination status */
445 1
			if (WIFEXITED(status) &&
446 1
			    WEXITSTATUS(status) != EXIT_SUCCESS) {
447 0
				ODPH_ERR("Child exit status:%d (pid:%d)\n",
448
					 WEXITSTATUS(status), (int)pid);
449 0
				retval = -1;
450
			}
451 1
			if (WIFSIGNALED(status)) {
452 0
				int signo = WTERMSIG(status);
453

454 0
				ODPH_ERR("Child term signo:%d - %s (pid:%d)\n",
455
					 signo, strsignal(signo), (int)pid);
456 0
				retval = -1;
457
			}
458
		}
459 1
	} while (!thread_tbl[i++].last);
460

461 1
	return (retval < 0) ? retval : terminated;
462
}
463

464
/*
 * Return the kernel thread ID of the calling thread.
 *
 * The gettid() man page notes that glibc does not provide a wrapper for
 * this system call, hence the raw syscall() invocation.
 */
static inline pid_t __gettid(void)
{
	long tid = syscall(SYS_gettid);

	return (pid_t)tid;
}
471

472 1
/*
 * Pin the calling ODP thread to the single CPU 'cpu'.
 *
 * The caller is taken to be the main thread of a process when its thread ID
 * equals the process ID; then the process-level call is used. Otherwise the
 * caller is a pthread and the pthread-level call is used.
 *
 * Returns the result of sched_setaffinity() in the first case, and 0 on
 * success or -1 on failure in the second.
 */
int odph_odpthread_setaffinity(const int cpu)
{
	cpu_set_t cpuset;
	int err;

	CPU_ZERO(&cpuset);
	CPU_SET(cpu, &cpuset);

	/* pid zero means calling process */
	if (__gettid() == getpid())
		return sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);

	/* pthread calls return a nonzero error number on failure */
	err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t),
				     &cpuset);

	return err ? -1 : 0;
}
492

493 1
/*
 * Return the CPU the calling ODP thread is pinned to.
 *
 * The affinity mask is read with the process-level call when the caller's
 * thread ID equals the process ID, and with the pthread-level call
 * otherwise.
 *
 * Returns the CPU number when the mask holds exactly one CPU. Returns -1
 * when reading the mask failed or when it holds any other number of CPUs.
 */
int odph_odpthread_getaffinity(void)
{
	cpu_set_t cpuset;
	int err;
	int cpu = 0;

	CPU_ZERO(&cpuset);

	if (__gettid() == getpid())
		err = sched_getaffinity(0, sizeof(cpu_set_t), &cpuset);
	else
		err = pthread_getaffinity_np(pthread_self(),
					     sizeof(cpu_set_t), &cpuset);

	/* ODP thread mean to run on single CPU core */
	if (err != 0 || CPU_COUNT(&cpuset) != 1)
		return -1;

	/* Find the one CPU that is set */
	while (cpu < CPU_SETSIZE) {
		if (CPU_ISSET(cpu, &cpuset))
			return cpu;
		cpu++;
	}

	return -1;
}
516

517 1
int odph_parse_options(int argc, char *argv[])
518
{
519
	char *env;
520
	int i, j;
521

522 1
	helper_options.mem_model = ODP_MEM_MODEL_THREAD;
523

524
	/* Enable process mode using environment variable. Setting environment
525
	 * variable is easier for CI testing compared to command line
526
	 * argument. */
527 1
	env = getenv("ODPH_PROC_MODE");
528 1
	if (env && atoi(env))
529 0
		helper_options.mem_model = ODP_MEM_MODEL_PROCESS;
530

531
	/* Find and remove option */
532 1
	for (i = 0; i < argc;) {
533 1
		if (strcmp(argv[i], "--odph_proc") == 0) {
534 1
			helper_options.mem_model = ODP_MEM_MODEL_PROCESS;
535

536 1
			for (j = i; j < argc - 1; j++)
537 0
				argv[j] = argv[j + 1];
538

539 1
			argc--;
540 1
			continue;
541
		}
542

543 1
		i++;
544
	}
545

546 1
	return argc;
547
}
548

549 1
/*
 * Copy the current helper options into 'options'.
 *
 * The output structure is zeroed first and then the memory model selected
 * by odph_parse_options() is filled in. Always returns 0.
 */
int odph_options(odph_helper_options_t *options)
{
	/* Start from an all-zero structure */
	memset(options, 0, sizeof(*options));
	options->mem_model = helper_options.mem_model;

	return 0;
}

Read our documentation on viewing source code .

Loading