超轻量 pthread 集结点实现
/*
 * rend.h - ultra-lightweight pthread rendezvous point.
 *
 * Threads that share one rendezvous point block in rend_wait() until the
 * configured number of threads has arrived; they are then all released
 * together. Implemented with nothing but a pthread mutex and a condition
 * variable. This header is the implementation; rendezvous.c is the test
 * application.
 *
 * Created on: 2009-11-14
 * Author: liuzy (lzy.dev@gmail.com)
 */
#ifndef REND_H_
#define REND_H_

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h> /* EXIT_SUCCESS */

/*
 * Rendezvous point state. Every field is protected by count_lock, which is
 * why no field needs to be volatile.
 */
struct rend_t {
    int count;                  /* arrivals still needed in this round, caller included */
    pthread_mutex_t count_lock; /* guards all fields of this struct */
    pthread_cond_t ready;       /* broadcast when a round completes */
    int total;                  /* party size; count is reset to it after each round */
    unsigned long round;        /* bumped each time a round completes */
};

/* Statically define and initialise a rendezvous point for `nthreads` threads. */
#define DECLARE_REND(name, nthreads) \
    struct rend_t name = { (nthreads), PTHREAD_MUTEX_INITIALIZER, \
                           PTHREAD_COND_INITIALIZER, (nthreads), 0UL }

/*
 * The functions live in a header, so they are static inline: including
 * rend.h from several translation units must not produce duplicate symbols.
 */

/*
 * Initialise a dynamically created rendezvous point.
 *
 * prend: object to initialise (must not be NULL).
 * count: number of threads that must call rend_wait() before any is released.
 *
 * Returns 0 on success, EINVAL when count < 1, otherwise the error number
 * reported by pthread_mutex_init() / pthread_cond_init().
 */
static inline int rend_init(struct rend_t *prend, int count)
{
    int ret;

    assert(prend);
    if (count < 1)
        return EINVAL; /* a party of zero could never be completed */

    prend->count = count;
    prend->total = count;
    prend->round = 0UL;

    ret = pthread_mutex_init(&prend->count_lock, NULL);
    if (ret != 0)
        return ret;

    ret = pthread_cond_init(&prend->ready, NULL);
    if (ret != 0) {
        /* do not leak the mutex that was already created */
        pthread_mutex_destroy(&prend->count_lock);
        return ret;
    }
    return EXIT_SUCCESS;
}

/*
 * Block until `total` threads have called rend_wait() on the same object,
 * then let all of them continue. The object can be reused for further rounds.
 *
 * Returns 0 on success, otherwise a pthread error number. The mutex is
 * released on every path.
 */
static inline int rend_wait(struct rend_t *prend)
{
    int ret;
    int unlock_ret;

    assert(prend);
    ret = pthread_mutex_lock(&prend->count_lock);
    if (ret != 0)
        return ret;

    if (prend->count <= 1) {
        /*
         * Last arrival: open the next round before waking anybody, so a
         * released thread that immediately calls rend_wait() again starts
         * from a full count instead of slipping through.
         */
        prend->round++;
        prend->count = prend->total;
        ret = pthread_cond_broadcast(&prend->ready);
    } else {
        const unsigned long my_round = prend->round;

        prend->count--;
        /*
         * Condition waits may return spuriously; only a change of round
         * means the rendezvous really completed.
         */
        do {
            ret = pthread_cond_wait(&prend->ready, &prend->count_lock);
        } while (ret == 0 && prend->round == my_round);

        /* the wait failed before release: withdraw this arrival */
        if (ret != 0 && prend->round == my_round)
            prend->count++;
    }

    unlock_ret = pthread_mutex_unlock(&prend->count_lock);
    return ret != 0 ? ret : unlock_ret;
}

/*
 * Destroy a rendezvous point. No thread may be blocked in rend_wait().
 *
 * Returns 0 on success, otherwise the error number of the first failing
 * destroy call; the condition variable is left untouched when destroying
 * the mutex fails.
 */
static inline int rend_free(struct rend_t *prend)
{
    int ret;

    assert(prend);
    prend->count = 0;
    prend->total = 0;

    ret = pthread_mutex_destroy(&prend->count_lock);
    if (ret != 0)
        return ret;

    ret = pthread_cond_destroy(&prend->ready);
    if (ret != 0)
        return ret;

    return EXIT_SUCCESS;
}

#endif /* REND_H_ */
rend 使用更简单:
- 定义/初始化 rend_t 集结点对象。DECLARE_REND 宏用于静态定义,rend_init 函数可以对动态创建的集结点结构初始化;
- pthread 线程通过调用 rend_wait 函数 P/V 集结状态。集结关系的线程要 P/V 在同一个 rend_t 集结对象上;
- 释放集结对象,rend_free 函数。
以上函数都是成功返回 0,出错返回 errno 值(非 0)。
/* ============================== Name : rendezvous.c Author : liuzy (lzy.dev@gmail.com) Version : 0.1============================== */#include <stdio.h>#include <stdlib.h>#include <stdarg.h>/* va_list */#include <unistd.h>#include <string.h>#include <errno.h>/* errno */#include <syslog.h>/* for syslog(2) and level */#include <pthread.h>#include "rend.h"static int daemon_proc = 0;/* for syslog in err_doit */#defineMAXLINE 4096/* max text line length */void err_doit(int errnoflag, int level, const char* fmt, va_list ap) {char buf = { 0 };int errno_save = errno, n = 0;#ifdef HAVE_VSNPRINTFvsnprintf(buf, MAXLINE, fmt, ap);#elsevsprintf(buf, fmt, ap);#endif/* HAVE_VSNPRINTF */n = strlen(buf);if (errnoflag)snprintf(buf + n, MAXLINE - n, ": %s", strerror(errno_save));strcat(buf, "\n");if (daemon_proc) {syslog(level, "%s", buf);} else {fflush(stdout);fputs(buf, stderr);fflush(stderr);}return;}void err_msg(const char* fmt, ...) {va_list ap;va_start(ap, fmt);err_doit(0, LOG_INFO, fmt, ap);va_end(ap);return;}void err_sys(const char* fmt, ...) 
{va_list ap;va_start(ap, fmt);err_doit(1, LOG_ERR, fmt, ap);va_end(ap);exit(EXIT_FAILURE);}#define THREAD_COUNT 100/* rendezvous test thread workers */struct worker_arg {int worker_id;struct rend_t* prend;};static void* pthread_worker(void* arg) {struct worker_arg* parg = (struct worker_arg*) arg;err_msg("worker #%d running.", (int) parg->worker_id);srand(parg->worker_id * 2);sleep(rand() % 5);rend_wait(parg->prend);/* workers rendezvous */err_msg("worker #%d exiting.", (int) parg->worker_id);return EXIT_SUCCESS;}int main(void) {int idx = 0;void* exitcode = NULL;pthread_t thds;struct worker_arg arg;DECLARE_REND(rend, THREAD_COUNT);err_msg("workers creating.");for (idx = 0; idx < THREAD_COUNT; idx++) {arg.prend = &rend;arg.worker_id = idx;if (pthread_create(thds + idx, NULL, pthread_worker, (void*) &arg))err_sys("worker #%d create error.", idx);}puts("workers exiting.");for (idx = 0; idx < THREAD_COUNT; idx++)if (pthread_join(thds, &exitcode) || (exitcode != EXIT_SUCCESS))err_msg("worker #%d exit error.", idx);err_msg("all done. exit 0.");rend_free(&rend);return EXIT_SUCCESS;}
看了下 semaphore os syscall 及其 infrastructure,也许以后还需要进程间(非 pthread)集结时用得上。kernel 实现的超强啊,呵呵~
// 2009.11.17 14:34 添加 ////
<div class="quote_title">快速用户空间互斥锁(Futex)</div>
页:
[1]