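Problem description
I have a general problem I'm trying to solve, in which binary data are sent from standard input or a regular file stream to an application, which in turn converts that binary data into text. Using threads, I want to process the text before piping it on to the next application, which modifies that text further, and so on.
As a simple test case, I want to extract compressed data via gunzip. Specifically, I'm looking at using gunzip -c - to extract chunks of binary data sent to it through its (reassigned) stdin file descriptor, and then pulling chunks of text out of its (reassigned) stdout file descriptor. I can then print these chunks of text to the real stdout or stderr (or do other things with them later).
(I realize that I can do gzip-based compression and extraction on the command line. My goal is to use this test case to learn how to correctly pass generic chunks of binary and text data between threads that either run that data through binaries or process it further.)
In the case of my test program, I have set up three pthread_t threads: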
- produce_gzip_chunk_thread
- consume_gzip_chunk_thread
- consume_gunzip_chunk_thread
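I pass each of these threads a shared data instance called thread_data, which contains a thread lock, two conditions, and some buffers and counter variables. I also include a set of file descriptors for a gunzip process opened with popen3():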
typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;

struct pthread_data {
    pthread_mutex_t in_lock;
    pthread_cond_t in_cond;
    pthread_cond_t out_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes;
    size_t n_in_bytes_written_to_gunzip;
    size_t n_out_bytes_read_from_gunzip;
    FILE *in_file_ptr;
    boolean in_eof;
    char in_line[LINE_LENGTH_VALUE];
    popen3_desc_t *gunzip_ptr;
};

struct popen3_desc {
    int in;
    int out;
    int err;
};
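The produce_gzip_chunk_thread reads gzip-compressed bytes in 1024-byte chunks from a regular file called foo.gz.
These bytes are written to an unsigned char buffer called in_buf, which is part of the shared data structure I pass to each thread: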
void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    d->in_eof = kFalse;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
        if (n_in_bytes > 0) {
            while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
                pthread_cond_wait(&d->in_cond, &d->in_lock);
            memcpy(d->in_buf, in_buf, n_in_bytes);
            d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
            fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
            pthread_cond_signal(&d->in_cond);
        }
        else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
            break;
    }
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    pthread_cond_signal(&d->in_cond);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> produce_gzip_chunk()\n");
#endif
    return NULL;
}
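Once a positive number of bytes is stored in n_in_bytes (that is, we have pulled data from the input gzip archive that needs to be processed with gunzip), this triggers the condition that allows the second thread, consume_gzip_chunk_thread, to operate: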
void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes == 0 && !d->in_eof)
            pthread_cond_wait(&d->in_cond, &d->in_lock);
        if (d->n_in_bytes) {
#ifdef DEBUG
            fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
            if (!d->gunzip_ptr) {
#ifdef DEBUG
                fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
                d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
                if (!d->gunzip_ptr) {
                    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
                    exit(EXIT_FAILURE);
                }
                popen3("gunzip -c -", &(d->gunzip_ptr->in), &(d->gunzip_ptr->out), &(d->gunzip_ptr->err), kTrue, kTrue);
                memset(d->in_line, 0, LINE_LENGTH_VALUE);
            }
            n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
            fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
            if (n_in_bytes_written_to_gunzip > 0)
                d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;
            d->n_in_bytes = 0;
            pthread_cond_signal(&d->out_cond);
        }
        if (d->in_eof)
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> consume_gzip_chunk()\n");
#endif
    return NULL;
}
When consuming a gzip data chunk, we use the write function to send the n_in_bytes bytes of in_buf to the gunzip process's input file descriptor. Finally, we send another thread signal, this time to out_cond, to help reawaken consume_gunzip_chunk_thread, which reads from gunzip's output to do more work:
void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes_written_to_gunzip == 0) {
            pthread_cond_wait(&d->out_cond, &d->in_lock);
        }
        if (d->n_in_bytes_written_to_gunzip) {
            sleep(1);
            n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
            fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
            fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
            memset(d->in_line, 0, strlen(d->in_line));
            if (n_out_bytes_read_from_gunzip > 0)
                d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
            d->n_in_bytes_written_to_gunzip = 0;
            pthread_cond_signal(&d->in_cond);
        }
        if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> consume_gunzip_chunk()\n");
#endif
    return NULL;
}
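This attempts to read any available bytes from the gunzip process's output file descriptor. For debugging purposes, I just want to print them to stderr.
The problem I am facing is that I need to add a sleep(1) statement in consume_gunzip_chunk, before doing the read, to get things working properly.
Without this sleep(1) statement, my test program will usually output nothing, except roughly once every 8 to 10 attempts, when the compressed data are extracted correctly.
Question: What am I doing wrong in my arrangement of conditions, such that the sleep(1) call is needed to make the gzip extraction work properly? In a production scenario, working with much larger input files, forcibly waiting one second for every 1 KB seems like a bad idea.
For reproducibility with the full source code, here are the two relevant files. Here is the header: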
/*
 * convert.h
 */

#ifndef CONVERT_H
#define CONVERT_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <getopt.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>

#define CB_VERSION "1.0"
#define LINE_LENGTH_VALUE 65536
#define BUF_LENGTH_VALUE 1024
#define POPEN3_READ 0
#define POPEN3_WRITE 1

typedef int boolean;
extern const boolean kTrue;
extern const boolean kFalse;
const boolean kTrue = 1;
const boolean kFalse = 0;

typedef enum {
    kGzip,
    kUnknown
} format_t;

typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;

struct pthread_data {
    pthread_mutex_t in_lock;
    pthread_cond_t in_cond;
    pthread_cond_t out_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes;
    size_t n_in_bytes_written_to_gunzip;
    size_t n_out_bytes_read_from_gunzip;
    boolean in_eof;
    FILE *in_file_ptr;
    popen3_desc_t *gunzip_ptr;
    char in_line[LINE_LENGTH_VALUE];
};

struct popen3_desc {
    int in;
    int out;
    int err;
};

static const char *name = "convert";
static const char *version = CB_VERSION;
static const char *authors = "Alex Reynolds";
static const char *usage = "\n" \
    "Usage: convert --input-format=str <input-file>\n" \
    " Process Flags:\n\n" \
    " --input-format=str | -f str Input format (str = [ gzip ]; required)\n" \
    " --help | -h Show this usage message\n";

static struct convert_globals_t {
    char *input_format_str;
    format_t input_format;
    char **filenames;
    int num_filenames;
} convert_globals;

static struct option convert_client_long_options[] = {
    { "input-format", required_argument, NULL, 'f' },
    { "help", no_argument, NULL, 'h' },
    { NULL, no_argument, NULL, 0 }
};

static const char *convert_client_opt_string = "f:h?";

void * consume_gunzip_chunk (void *t_data);
void * consume_gzip_chunk (void *t_data);
void * produce_gzip_chunk (void *t_data);
FILE * new_file_ptr (const char *in_fn);
void delete_file_ptr (FILE **file_ptr);
pid_t popen3 (const char *command, int *in_desc, int *out_desc, int *err_desc, boolean nonblock_in, boolean nonblock_outerr);
off_t fsize (const char *fn);
void initialize_globals ();
void parse_command_line_options (int argc, char **argv);
void print_usage (FILE *stream);

#endif
Here is the implementation:
/*
 * convert.c
 */

#include "convert.h"

int main(int argc, char **argv)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> main()\n");
#endif

    pthread_t produce_gzip_chunk_thread = NULL;
    pthread_t consume_gzip_chunk_thread = NULL;
    pthread_t consume_gunzip_chunk_thread = NULL;
    pthread_data_t *thread_data = NULL;

    parse_command_line_options(argc, argv);

    /* initialize thread data */
    thread_data = malloc(sizeof(pthread_data_t));
    thread_data->n_in_bytes = 0;
    thread_data->n_in_bytes_written_to_gunzip = 0;
    thread_data->n_out_bytes_read_from_gunzip = 0;
    thread_data->in_eof = kFalse;
    thread_data->in_file_ptr = new_file_ptr(convert_globals.filenames[0]);
    pthread_mutex_init(&(thread_data->in_lock), NULL);
    pthread_cond_init(&(thread_data->in_cond), NULL);
    pthread_cond_init(&(thread_data->out_cond), NULL);

    /* parse input */
    if (convert_globals.input_format == kGzip) {
        if (pthread_create(&produce_gzip_chunk_thread, NULL, produce_gzip_chunk, (void *) thread_data) != 0) {
            fprintf(stderr, "Error: Could not create gzip chunk production thread\n");
            return EXIT_FAILURE;
        }
        if (pthread_create(&consume_gzip_chunk_thread, NULL, consume_gzip_chunk, (void *) thread_data) != 0) {
            fprintf(stderr, "Error: Could not create gzip chunk consumption thread\n");
            return EXIT_FAILURE;
        }
        if (pthread_create(&consume_gunzip_chunk_thread, NULL, consume_gunzip_chunk, (void *) thread_data) != 0) {
            fprintf(stderr, "Error: Could not create gunzip chunk consumption thread\n");
            return EXIT_FAILURE;
        }
        if (pthread_join(produce_gzip_chunk_thread, NULL) != 0) {
            fprintf(stderr, "Error: Could not join gzip chunk production thread\n");
            return EXIT_FAILURE;
        }
        if (pthread_join(consume_gzip_chunk_thread, NULL) != 0) {
            fprintf(stderr, "Error: Could not join gzip chunk consumption thread\n");
            return EXIT_FAILURE;
        }
        if (pthread_join(consume_gunzip_chunk_thread, NULL) != 0) {
            fprintf(stderr, "Error: Could not join gunzip chunk consumption thread\n");
            return EXIT_FAILURE;
        }
    }
    else {
        /* handle text formats */
    }

    /* cleanup */
    delete_file_ptr(&thread_data->in_file_ptr);
    pthread_mutex_destroy(&(thread_data->in_lock));
    pthread_cond_destroy(&(thread_data->in_cond));
    pthread_cond_destroy(&(thread_data->out_cond));
    free(thread_data);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> main()\n");
#endif
    return EXIT_SUCCESS;
}

void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes_written_to_gunzip == 0) {
            pthread_cond_wait(&d->out_cond, &d->in_lock);
        }
        if (d->n_in_bytes_written_to_gunzip) {
            sleep(1);
            n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
            fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
            fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
            memset(d->in_line, 0, strlen(d->in_line));
            if (n_out_bytes_read_from_gunzip > 0)
                d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
            d->n_in_bytes_written_to_gunzip = 0;
            pthread_cond_signal(&d->in_cond);
        }
        if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> consume_gunzip_chunk()\n");
#endif
    return NULL;
}

void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes == 0 && !d->in_eof)
            pthread_cond_wait(&d->in_cond, &d->in_lock);
        if (d->n_in_bytes) {
#ifdef DEBUG
            fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
            if (!d->gunzip_ptr) {
#ifdef DEBUG
                fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
                d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
                if (!d->gunzip_ptr) {
                    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
                    exit(EXIT_FAILURE);
                }
                popen3("gunzip -c -", &(d->gunzip_ptr->in), &(d->gunzip_ptr->out), &(d->gunzip_ptr->err), kTrue, kTrue);
                memset(d->in_line, 0, LINE_LENGTH_VALUE);
            }
            n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
            fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
            if (n_in_bytes_written_to_gunzip > 0)
                d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;
            d->n_in_bytes = 0;
            /* pthread_cond_signal(&d->in_cond); */
            pthread_cond_signal(&d->out_cond);
        }
        if (d->in_eof)
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> consume_gzip_chunk()\n");
#endif
    return NULL;
}

void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    d->in_eof = kFalse;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
        if (n_in_bytes > 0) {
            while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
                pthread_cond_wait(&d->in_cond, &d->in_lock);
            memcpy(d->in_buf, in_buf, n_in_bytes);
            d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
            fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
            pthread_cond_signal(&d->in_cond);
        }
        else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
            break;
    }
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    pthread_cond_signal(&d->in_cond);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> produce_gzip_chunk()\n");
#endif
    return NULL;
}

FILE * new_file_ptr(const char *in_fn)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> new_file_ptr()\n");
#endif

    FILE *file_ptr = NULL;
    boolean not_stdin = kTrue;

    not_stdin = strcmp(in_fn, "-");
    file_ptr = (not_stdin) ? fopen(in_fn, "r") : stdin;

    if (!file_ptr) {
        fprintf(stderr, "Error: Could not open input stream\n");
        exit(EXIT_FAILURE);
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> new_file_ptr()\n");
#endif
    return file_ptr;
}

void delete_file_ptr(FILE **file_ptr)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> delete_file_ptr()\n");
#endif

    fclose(*file_ptr);
    *file_ptr = NULL;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> delete_file_ptr()\n");
#endif
}

pid_t popen3(const char *command, int *in_desc, int *out_desc, int *err_desc, boolean nonblock_in, boolean nonblock_outerr)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> popen3()\n");
#endif

    int p_stdin[2], p_stdout[2], p_stderr[2];
    pid_t pid;

    if (pipe(p_stdin) != 0 || pipe(p_stdout) != 0 || pipe(p_stderr) != 0)
        return -1;

    if (nonblock_in) {
        fcntl(p_stdin[POPEN3_WRITE], F_SETFL, fcntl(p_stdin[POPEN3_WRITE], F_GETFL) | O_NONBLOCK);
    }

    if (nonblock_outerr) {
        fcntl(p_stdout[POPEN3_READ], F_SETFL, fcntl(p_stdout[POPEN3_READ], F_GETFL) | O_NONBLOCK);
        fcntl(p_stderr[POPEN3_READ], F_SETFL, fcntl(p_stderr[POPEN3_READ], F_GETFL) | O_NONBLOCK);
    }

    pid = fork();
    if (pid < 0)
        return pid; /* error */

    if (pid == 0) {
        close(p_stdin[POPEN3_WRITE]);
        close(p_stdout[POPEN3_READ]);
        close(p_stderr[POPEN3_READ]);

        dup2(p_stdin[POPEN3_READ], fileno(stdin));
        dup2(p_stdout[POPEN3_WRITE], fileno(stderr));
        dup2(p_stdout[POPEN3_WRITE], fileno(stdout));

        execl("/bin/sh", "sh", "-c", command, NULL);
        fprintf(stderr, "Error: Could not execl [%s]\n", command);
        exit(EXIT_FAILURE);
    }

    if (in_desc == NULL)
        close(p_stdin[POPEN3_WRITE]);
    else
        *in_desc = p_stdin[POPEN3_WRITE];

    if (out_desc == NULL)
        close(p_stdout[POPEN3_READ]);
    else
        *out_desc = p_stdout[POPEN3_READ];

    if (err_desc == NULL)
        close(p_stderr[POPEN3_READ]);
    else
        *err_desc = p_stderr[POPEN3_READ];

#ifdef DEBUG
    fprintf(stderr, "Debug: New *in_desc = %d\n", *in_desc);
    fprintf(stderr, "Debug: New *out_desc = %d\n", *out_desc);
    fprintf(stderr, "Debug: New *err_desc = %d\n", *err_desc);
#endif

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> popen3()\n");
#endif
    return pid;
}

off_t fsize(const char *fn)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> fsize()\n");
#endif

    struct stat st;

    if (stat(fn, &st) == 0)
        return st.st_size;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> fsize()\n");
#endif
    return EXIT_FAILURE;
}

void initialize_globals()
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> initialize_globals()\n");
#endif

    convert_globals.input_format = kUnknown;
    convert_globals.filenames = NULL;
    convert_globals.num_filenames = 0;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> initialize_globals()\n");
#endif
}

void parse_command_line_options(int argc, char **argv)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> parse_command_line_options()\n");
#endif

    int client_long_index;
    int client_opt = getopt_long(argc, argv, convert_client_opt_string, convert_client_long_options, &client_long_index);
    char *in_format_str = NULL;

    opterr = 0; /* disable error reporting by GNU getopt */
    initialize_globals();

    while (client_opt != -1) {
        switch (client_opt) {
        case 'f':
            in_format_str = optarg;
            break;
        case 'h':
            print_usage(stdout);
            exit(EXIT_SUCCESS);
        case '?':
            print_usage(stdout);
            exit(EXIT_SUCCESS);
        default:
            break;
        }
        client_opt = getopt_long(argc, argv, convert_client_opt_string, convert_client_long_options, &client_long_index);
    }

    convert_globals.filenames = argv + optind;
    convert_globals.num_filenames = argc - optind;

    if (!in_format_str) {
        fprintf(stderr, "Error: Specified input format was omitted; please specify one of required input formats\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }
    else if (convert_globals.num_filenames != 1) {
        fprintf(stderr, "Error: Please specify an input file (either a regular file or '-' for stdin\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }

    /* map format string to setting */
    if (strcmp(in_format_str, "gzip") == 0)
        convert_globals.input_format = kGzip;
    else {
        fprintf(stderr, "Error: Specified input format is unknown; please specify one of required input formats\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> parse_command_line_options()\n");
#endif
}

void print_usage(FILE *stream)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> print_usage()\n");
#endif

    fprintf(stream,
            "%s\n" \
            " version: %s\n" \
            " author: %s\n" \
            "%s\n",
            name,
            version,
            authors,
            usage);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> print_usage()\n");
#endif
}
Here is the build process:
$ mkdir -p objects
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline -c convert.c -o objects/convert.o -iquote${PWD}
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline objects/convert.o -o convert -lpthread
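I am able to build this test code on OS X and Linux hosts with reasonably modern compile environments.
Thanks in advance for any useful advice!
Recommended answer
I'll start by saying that I don't think pthreads condition variables and mutexes are really necessary here, nor are they the best response to the problem you describe.
In my view, both the problem you describe and the mutex-less version are symptoms of forgetting to close() the ends of your pipes diligently: a single stray copy of the write end of the pipe feeding a child process's stdin, leaked (into that child or into any other), keeps that pipe alive.
Then, because a write end corresponding to the read end on stdin still exists, the system does not deliver EOF; it blocks indefinitely instead.
In your case, you did prevent the pipe-end file descriptors from leaking into the spawned child (with correct close() calls on the child side of your fork()), although you forgot to close() the child's ends of the pipes on the parent side. However, you did not prevent the leaks into all the other children! If you call popen3() twice, you prevent the three descriptors from leaking into the first child, but because the parent still holds them, when the next call to popen3() happens, fork() now has 6 file descriptors to close (the three from the old set, plus the three you just created).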
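A minimal sketch of that effect, assuming a POSIX system: the parent closes its own copy of a pipe's write end, but a forked child that inherited another copy keeps the pipe alive, so a non-blocking read() reports EAGAIN instead of EOF until that child is gone. The names `probe_read_end` and `demo_leaked_write_end` are hypothetical, chosen only for this illustration.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L   /* for kill() and pause() under strict -std=c99 */
#endif
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: 0 = EOF, 1 = would block (EAGAIN), 2 = got data, -1 = error. */
static int probe_read_end(int rfd)
{
    char c;
    ssize_t n = read(rfd, &c, 1);
    if (n == 0) return 0;
    if (n > 0) return 2;
    return (errno == EAGAIN || errno == EWOULDBLOCK) ? 1 : -1;
}

/* Hypothetical demo: a child inherits the write end and never closes it. */
static int demo_leaked_write_end(int *while_leaked, int *after_exit)
{
    int fd[2];
    if (pipe(fd) != 0) return -1;
    fcntl(fd[0], F_SETFL, fcntl(fd[0], F_GETFL) | O_NONBLOCK);

    pid_t pid = fork();
    if (pid < 0) return -1;
    if (pid == 0) {              /* child: keeps its inherited copy of fd[1] open */
        close(fd[0]);
        pause();                 /* sleep until a signal terminates it */
        _exit(0);
    }

    close(fd[1]);                            /* parent drops ITS write end...       */
    *while_leaked = probe_read_end(fd[0]);   /* ...but the child's copy prevents EOF */

    kill(pid, SIGTERM);
    waitpid(pid, NULL, 0);                   /* child exit closes the last write end */
    *after_exit = probe_read_end(fd[0]);

    close(fd[0]);
    return 0;
}
```

With only the parent's close(), the reader still sees EAGAIN (a blocking read() would hang forever); read() returns 0 only once the stray copy disappears with the child.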
In your case, you should therefore set the close-on-exec flag on those pipe ends, like so:
fcntl(fdIn [PIPEWR], F_SETFD, fcntl(fdIn [PIPEWR], F_GETFD) | FD_CLOEXEC);
fcntl(fdOut[PIPERD], F_SETFD, fcntl(fdOut[PIPERD], F_GETFD) | FD_CLOEXEC);
fcntl(fdErr[PIPERD], F_SETFD, fcntl(fdErr[PIPERD], F_GETFD) | FD_CLOEXEC);
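The same idea with error checking, as a minimal sketch; `set_cloexec` and `pipe_cloexec` are hypothetical helper names. The test also exercises a POSIX guarantee that makes this scheme work: dup2() clears FD_CLOEXEC on the new descriptor, so a child can dup2() a close-on-exec pipe end onto 0, 1 or 2 and keep it across exec(), while every other copy disappears.

```c
#include <fcntl.h>
#include <unistd.h>

/* Set FD_CLOEXEC on fd while preserving any other descriptor flags. */
static int set_cloexec(int fd)
{
    int flags = fcntl(fd, F_GETFD);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFD, flags | FD_CLOEXEC) == -1 ? -1 : 0;
}

/* Create a pipe whose two ends are both close-on-exec. Returns 0 or -1. */
static int pipe_cloexec(int fd[2])
{
    if (pipe(fd) != 0)
        return -1;
    if (set_cloexec(fd[0]) != 0 || set_cloexec(fd[1]) != 0) {
        close(fd[0]);
        close(fd[1]);
        return -1;
    }
    return 0;
}
```

One caveat: when the source and target descriptors of dup2() are equal, nothing is changed, which is why a child should explicitly clear the flag first (as unsetCloExecFlag() does in the code below). Also, pipe() followed by fcntl() is not atomic, so in a multithreaded program a fork() in another thread can still slip in between the two calls; on systems that provide pipe2() (e.g. Linux), pipe2(fd, O_CLOEXEC) closes that window.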
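Here is code that spawns 6 threads and 3 processes, and passes its input unmodified to the output, after internally compressing and then decompressing it. It effectively implements gzip -c - | XOR 0x55 | XOR 0x55 | gunzip -c - | cat, where:
- Standard input is fed to gzip by thread srcThrd.
- gzip's output is read by thread a2xor0Thrd and fed to thread xor0Thrd.
- Thread xor0Thrd XORs its input with 0x55 before passing it on to thread xor1Thrd.
- Thread xor1Thrd XORs its input with 0x55 before passing it on to thread xor22BThrd.
- Thread xor22BThrd feeds its input to process gunzip.
- Process gunzip feeds its output directly (not through a thread) to cat.
- Process cat's output is read by thread dstThrd and printed to standard output.
Compression is done through inter-process pipe communication, while XORing is done through intra-process pipe communication. No mutexes or condition variables are used. main() is extremely easy to understand. This code should be easy to extend to your situation.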
/* Includes */
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>

/* Defines */
#define PIPERD 0
#define PIPEWR 1

/* Data structures */
typedef struct PIPESET{
    int Ain[2];
    int Aout[2];
    int Aerr[2];
    int xor0[2];
    int xor1[2];
    int xor2[2];
    int Bin[2];
    int BoutCin[2];
    int Berr[2];
    int Cout[2];
    int Cerr[2];
} PIPESET;

/* Function Implementations */

/**
 * Source thread main method.
 *
 * Slurps from standard input and feeds process A.
 */
void* srcThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;
    char c;

    while(read(0, &c, 1) > 0){
        write(pipeset->Ain[PIPEWR], &c, 1);
    }

    close(pipeset->Ain[PIPEWR]);
    pthread_exit(NULL);
}

/**
 * A to XOR0 thread main method.
 *
 * Manually pipes from standard output of process A to input of thread XOR0.
 */
void* a2xor0ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;
    char buf[65536];
    ssize_t bytesRead;

    while((bytesRead = read(pipeset->Aout[PIPERD], buf, 65536)) > 0){
        write(pipeset->xor0[PIPEWR], buf, bytesRead);
    }

    close(pipeset->xor0[PIPEWR]);
    pthread_exit(NULL);
}

/**
 * XOR0 thread main method.
 *
 * XORs input with 0x55 and outputs to input of XOR1.
 */
void* xor0ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;
    char c;

    while(read(pipeset->xor0[PIPERD], &c, 1) > 0){
        c ^= 0x55;
        write(pipeset->xor1[PIPEWR], &c, 1);
    }

    close(pipeset->xor1[PIPEWR]);
    pthread_exit(NULL);
}

/**
 * XOR1 thread main method.
 *
 * XORs input with 0x55 and outputs to input of process B.
 */
void* xor1ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;
    char c;

    while(read(pipeset->xor1[PIPERD], &c, 1) > 0){
        c ^= 0x55;
        write(pipeset->xor2[PIPEWR], &c, 1);
    }

    close(pipeset->xor2[PIPEWR]);
    pthread_exit(NULL);
}

/**
 * XOR2 to B thread main method.
 *
 * Manually pipes from input (output of XOR1) to input of process B.
 */
void* xor22BThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;
    char buf[65536];
    ssize_t bytesRead;

    while((bytesRead = read(pipeset->xor2[PIPERD], buf, 65536)) > 0){
        write(pipeset->Bin[PIPEWR], buf, bytesRead);
    }

    close(pipeset->Bin[PIPEWR]);
    pthread_exit(NULL);
}

/**
 * Destination thread main method.
 *
 * Manually copies the standard output of process C to the standard output.
 */
void* dstThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;
    char c;

    while(read(pipeset->Cout[PIPERD], &c, 1) > 0){
        write(1, &c, 1);
    }

    pthread_exit(NULL);
}

/**
 * Set close on exec flag on given descriptor.
 */
void setCloExecFlag(int fd){
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
}

/**
 * Unset close on exec flag on given descriptor.
 */
void unsetCloExecFlag(int fd){
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) & ~FD_CLOEXEC);
}

/**
 * Pipe4.
 *
 * Create a pipe with some ends possibly marked close-on-exec.
 */
#define PIPE4_FLAG_NONE       (0U)
#define PIPE4_FLAG_RD_CLOEXEC (1U << 0)
#define PIPE4_FLAG_WR_CLOEXEC (1U << 1)

int pipe4(int fd[2], int flags){
    int ret = pipe(fd);

    if(flags&PIPE4_FLAG_RD_CLOEXEC){setCloExecFlag(fd[PIPERD]);}
    if(flags&PIPE4_FLAG_WR_CLOEXEC){setCloExecFlag(fd[PIPEWR]);}

    return ret;
}

/**
 * Pipe4 explicit derivatives.
 */
#define pipe4_cloexec(fd) pipe4((fd), PIPE4_FLAG_RD_CLOEXEC|PIPE4_FLAG_WR_CLOEXEC)

/**
 * Popen4.
 *
 * General-case for spawning a process and tethering it with cloexec pipes on stdin,
 * stdout and stderr.
 *
 * @param [in]     cmd    The command to execute.
 * @param [in/out] pin    The pointer to the cloexec pipe for stdin.
 * @param [in/out] pout   The pointer to the cloexec pipe for stdout.
 * @param [in/out] perr   The pointer to the cloexec pipe for stderr.
 * @param [in]     flags  A bitwise OR of flags to this function. Available
 *                        flags are:
 *
 *     POPEN4_FLAG_NONE:
 *         Explicitly specify no flags.
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDIN,
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDOUT,
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDERR:
 *         Don't close pin[PIPERD], pout[PIPEWR] and perr[PIPEWR] in the parent,
 *         respectively.
 *     POPEN4_FLAG_CLOSE_CHILD_STDIN,
 *     POPEN4_FLAG_CLOSE_CHILD_STDOUT,
 *     POPEN4_FLAG_CLOSE_CHILD_STDERR:
 *         Close the respective streams in the child. Ignores pin, pout and perr
 *         entirely. Overrides a NOCLOSE_PARENT flag for the same stream.
 */
#define POPEN4_FLAG_NONE                  (0U)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDIN  (1U << 0)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDOUT (1U << 1)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDERR (1U << 2)
#define POPEN4_FLAG_CLOSE_CHILD_STDIN     (1U << 3)
#define POPEN4_FLAG_CLOSE_CHILD_STDOUT    (1U << 4)
#define POPEN4_FLAG_CLOSE_CHILD_STDERR    (1U << 5)

pid_t popen4(const char* cmd, int pin[2], int pout[2], int perr[2], int flags){
    /********************
     **  FORK PROCESS  **
     ********************/
    pid_t ret = fork();

    if(ret < 0){
        /**
         * Error in fork(), still in parent.
         */
        fprintf(stderr, "fork() failed!\n");
        return ret;
    }else if(ret == 0){
        /**
         * Child-side of fork
         */
        if(flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
            close(0);
        }else{
            unsetCloExecFlag(pin [PIPERD]);
            dup2(pin [PIPERD], 0);
        }
        if(flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
            close(1);
        }else{
            unsetCloExecFlag(pout[PIPEWR]);
            dup2(pout[PIPEWR], 1);
        }
        if(flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
            close(2);
        }else{
            unsetCloExecFlag(perr[PIPEWR]);
            dup2(perr[PIPEWR], 2);
        }
        execl("/bin/sh", "sh", "-c", cmd, NULL);
        fprintf(stderr, "exec() failed!\n");
        exit(-1);
    }else{
        /**
         * Parent-side of fork
         */
        if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDIN &&
           ~flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
            close(pin [PIPERD]);
        }
        if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDOUT &&
           ~flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
            close(pout[PIPEWR]);
        }
        if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDERR &&
           ~flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
            close(perr[PIPEWR]);
        }
        return ret;
    }

    /* Unreachable */
    return ret;
}

/**
 * Main Function.
 *
 * Sets up the whole piping scheme.
 */
int main(int argc, char* argv[]){
    pthread_t srcThrd, a2xor0Thrd, xor0Thrd, xor1Thrd, xor22BThrd, dstThrd;
    pid_t gzip, gunzip, cat;
    PIPESET pipeset;

    pipe4_cloexec(pipeset.Ain);
    pipe4_cloexec(pipeset.Aout);
    pipe4_cloexec(pipeset.Aerr);
    pipe4_cloexec(pipeset.Bin);
    pipe4_cloexec(pipeset.BoutCin);
    pipe4_cloexec(pipeset.Berr);
    pipe4_cloexec(pipeset.Cout);
    pipe4_cloexec(pipeset.Cerr);
    pipe4_cloexec(pipeset.xor0);
    pipe4_cloexec(pipeset.xor1);
    pipe4_cloexec(pipeset.xor2);

    /* Spawn processes */
    gzip   = popen4("gzip -c -",   pipeset.Ain,     pipeset.Aout,    pipeset.Aerr, POPEN4_FLAG_NONE);
    gunzip = popen4("gunzip -c -", pipeset.Bin,     pipeset.BoutCin, pipeset.Berr, POPEN4_FLAG_NONE);
    cat    = popen4("cat",         pipeset.BoutCin, pipeset.Cout,    pipeset.Cerr, POPEN4_FLAG_NONE);

    /* Spawn threads */
    pthread_create(&srcThrd,    NULL, srcThrdMain,    &pipeset);
    pthread_create(&a2xor0Thrd, NULL, a2xor0ThrdMain, &pipeset);
    pthread_create(&xor0Thrd,   NULL, xor0ThrdMain,   &pipeset);
    pthread_create(&xor1Thrd,   NULL, xor1ThrdMain,   &pipeset);
    pthread_create(&xor22BThrd, NULL, xor22BThrdMain, &pipeset);
    pthread_create(&dstThrd,    NULL, dstThrdMain,    &pipeset);

    pthread_join(srcThrd,    (void**)NULL);
    pthread_join(a2xor0Thrd, (void**)NULL);
    pthread_join(xor0Thrd,   (void**)NULL);
    pthread_join(xor1Thrd,   (void**)NULL);
    pthread_join(xor22BThrd, (void**)NULL);
    pthread_join(dstThrd,    (void**)NULL);

    return 0;
}
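Applied to the original question, the same rules remove the need for sleep(1). The sketch below (`run_filter` and `feed_main` are hypothetical names) runs one external command through /bin/sh, feeds its stdin from a separate thread and then closes it, and reads the command's stdout in the calling thread until read() returns 0. It assumes descriptors 0 to 2 are already open in the parent, and it leaves the child's stderr attached to the parent's.

```c
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

struct feed { int fd; const char *data; size_t len; };

/* Writer thread: push all input into the child's stdin, then close it so the
 * child sees EOF, flushes whatever it has buffered, and exits. */
static void *feed_main(void *arg)
{
    struct feed *f = arg;
    size_t off = 0;
    while (off < f->len) {
        ssize_t n = write(f->fd, f->data + off, f->len - off);
        if (n < 0) {
            if (errno == EINTR) continue;
            break;
        }
        off += (size_t)n;
    }
    close(f->fd);
    return NULL;
}

/* Run cmd, feed it in[0..in_len), collect its stdout into out (NUL-terminated,
 * truncated to cap - 1 bytes). Returns the number of bytes stored, or -1. */
static ssize_t run_filter(const char *cmd, const char *in, size_t in_len,
                          char *out, size_t cap)
{
    int to_child[2], from_child[2];
    if (cap == 0 || pipe(to_child) != 0) return -1;
    if (pipe(from_child) != 0) { close(to_child[0]); close(to_child[1]); return -1; }
    /* Parent-side ends must not leak into this child or any later one. */
    fcntl(to_child[1], F_SETFD, fcntl(to_child[1], F_GETFD) | FD_CLOEXEC);
    fcntl(from_child[0], F_SETFD, fcntl(from_child[0], F_GETFD) | FD_CLOEXEC);

    pid_t pid = fork();
    if (pid < 0) {
        close(to_child[0]); close(to_child[1]);
        close(from_child[0]); close(from_child[1]);
        return -1;
    }
    if (pid == 0) {                        /* child */
        dup2(to_child[0], 0);
        dup2(from_child[1], 1);
        close(to_child[0]);
        close(from_child[1]);
        execl("/bin/sh", "sh", "-c", cmd, (char *)NULL);
        _exit(127);
    }
    close(to_child[0]);      /* only the child reads from this end */
    close(from_child[1]);    /* only the child writes here; otherwise no EOF */

    struct feed f = { to_child[1], in, in_len };
    pthread_t writer;
    int have_writer = (pthread_create(&writer, NULL, feed_main, &f) == 0);
    if (!have_writer) close(to_child[1]);   /* child then sees EOF at once */

    size_t used = 0;
    char buf[4096];
    for (;;) {
        ssize_t n = read(from_child[0], buf, sizeof buf);
        if (n < 0 && errno == EINTR) continue;
        if (n <= 0) break;                  /* 0 means EOF: every write end is closed */
        size_t room = cap - 1 - used;
        size_t k = (size_t)n < room ? (size_t)n : room;
        memcpy(out + used, buf, k);         /* bytes beyond cap are drained and dropped */
        used += k;
    }
    out[used] = '\0';
    close(from_child[0]);
    if (have_writer) pthread_join(writer, NULL);
    waitpid(pid, NULL, 0);
    return have_writer ? (ssize_t)used : -1;
}
```

Usage would look like run_filter("gunzip -c -", gz_bytes, gz_len, text, sizeof text). No condition variables are needed, because the kernel's pipe buffers do the hand-off and EOF is simply the result of closing descriptors. A real program should also ignore SIGPIPE, since a child that exits without reading all of its input would otherwise kill the writer.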
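Comments on your own code
Your code has many problems, most of which have nothing to do with threading.
- You never close() the file descriptor d->gunzip_ptr->in. This means that gunzip never learns that there is no more input on its stdin, so it never exits.
- Since gunzip never exits, it never close()s its stdout, so a blocking read() at the other end never unblocks. A non-blocking read() will instead always return -1 with errno == EAGAIN.
- Your popen3() does not close() p_stdin[POPEN3_READ], p_stdout[POPEN3_WRITE] or p_stderr[POPEN3_WRITE] on the parent side of the fork(). Only the child should have those descriptors. Failing to close them means that when the parent itself tries to read from the child's stdout and stderr, it will never see EOF, for the same reason as above: it still holds a write end of the pipe itself, which it could write into to make new data appear for reading.
- Your code implicitly relies on gunzip writing out at least one byte for every 1024-byte chunk you write in. There is no guarantee that this will be the case, since gunzip may buffer internally at its leisure.
- This is because your code reads, then copies chunks of at most BUF_LENGTH_VALUE bytes into d->in_buf. You then assign the number of bytes read via fread() to d->n_in_bytes. That same d->n_in_bytes is used in your write() call to write to gunzip's stdin. You then signal consume_gunzip_chunk() to wake up, then pthread_cond_wait() for the next gzip-compressed chunk. But that gzip-compressed chunk may never arrive, since there is no guarantee that gunzip can unpack any useful output from just the first 1024 bytes of input, nor even a guarantee that it will write() that output instead of buffering it until it has 4096 bytes (a full buffer) of output. Therefore, the read() call in consume_gunzip_chunk() may never succeed (or even return, if read() is blocking). And if read() never returns, consume_gunzip_chunk() never signals d->in_cond, so all three threads get stuck. Even if read() is non-blocking, the last block of output from gunzip may never come, because gunzip's input is never closed, so it never flushes that output by write()ing it; read() at the other end will therefore never get useful data, and no amount of pleading will change that without a close().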
- A possible (probable?) cause of your bug: d->n_out_bytes_read_from_gunzip, once it becomes non-0, never becomes 0 again. This means that the extremely confusing
while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
    pthread_cond_wait(&d->in_cond, &d->in_lock);
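in produce_gzip_chunk() will, once entered with d->n_out_bytes_read_from_gunzip != 0, stay stuck forever. By calling sleep(1) in consume_gunzip_chunk(), which is the function that sets d->n_out_bytes_read_from_gunzip, you may have defused the problem by letting all of the input be read before consume_gunzip_chunk() gets to set it.
- There are two threads that call pthread_cond_wait(&d->in_cond, &d->in_lock);, namely produce_gzip_chunk() and consume_gzip_chunk(). There is absolutely no guarantee that when consume_gunzip_chunk() calls pthread_cond_signal(&d->in_cond);, the "correct" thread (whichever one it is in your design) will receive the signal. To ensure that all of them do, you could use pthread_cond_broadcast(), but then you expose yourself to the thundering herd problem. Needing pthread_cond_broadcast() in this situation is, in my opinion, a symptom of poor design.
- Relatedly, you call pthread_cond_signal(&d->in_cond) in a thread (indeed, in a function) in which you also call pthread_cond_wait(&d->in_cond, &d->in_lock). What purpose does that serve?
- You use d->in_lock for too many different purposes, exposing yourself to possible deadlock, or to low performance due to over-protection. In particular, you use it as the protection for both d->in_cond and d->out_cond. This is too strong: the output of gunzip into d->in_line should be able to happen concurrently with the input to gunzip being written into and out of d->in_buf.
- In consume_gunzip_chunk(), you have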
while (d->n_in_bytes_written_to_gunzip == 0) {
    pthread_cond_wait(&d->out_cond, &d->in_lock);
}
if (d->n_in_bytes_written_to_gunzip) {
    ...
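This if can never fail! Is there a case you might have in mind?
- Consider making the whole struct pthread_data volatile (or at least those integer members used by multiple threads), since the compiler might decide to optimize away loads and stores that should remain.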
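For contrast, here is a minimal sketch of a single-slot hand-off between exactly one producer thread and one consumer thread that avoids several of the problems listed above: each condition variable has only one kind of waiter (so pthread_cond_signal() always reaches the right thread), the size counter is always reset by the side that consumes it, and end-of-input is a separate flag rather than an overloaded counter. All names (`struct slot`, `slot_put`, `slot_get`, `slot_close`, `demo_sum`) are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <string.h>

#define SLOT_CAP 1024

struct slot {
    pthread_mutex_t lock;
    pthread_cond_t  not_full;   /* waited on only by the producer */
    pthread_cond_t  not_empty;  /* waited on only by the consumer */
    unsigned char   buf[SLOT_CAP];
    size_t          len;        /* 0 means the slot is empty */
    bool            closed;     /* producer will send nothing more */
};

static void slot_init(struct slot *s)
{
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->not_full, NULL);
    pthread_cond_init(&s->not_empty, NULL);
    s->len = 0;
    s->closed = false;
}

/* Producer: wait until the slot is empty, then fill it with n (1..SLOT_CAP) bytes. */
static void slot_put(struct slot *s, const void *data, size_t n)
{
    assert(n > 0 && n <= SLOT_CAP);
    pthread_mutex_lock(&s->lock);
    while (s->len != 0)
        pthread_cond_wait(&s->not_full, &s->lock);
    memcpy(s->buf, data, n);
    s->len = n;
    pthread_cond_signal(&s->not_empty);
    pthread_mutex_unlock(&s->lock);
}

/* Producer: announce that no more data will follow. */
static void slot_close(struct slot *s)
{
    pthread_mutex_lock(&s->lock);
    s->closed = true;
    pthread_cond_signal(&s->not_empty);
    pthread_mutex_unlock(&s->lock);
}

/* Consumer: copy the next chunk into out; return its size, or 0 once closed and drained. */
static size_t slot_get(struct slot *s, void *out)
{
    pthread_mutex_lock(&s->lock);
    while (s->len == 0 && !s->closed)
        pthread_cond_wait(&s->not_empty, &s->lock);
    size_t n = s->len;
    if (n > 0) {
        memcpy(out, s->buf, n);
        s->len = 0;                          /* always reset by the consumer */
        pthread_cond_signal(&s->not_full);
    }
    pthread_mutex_unlock(&s->lock);
    return n;
}

struct demo { struct slot s; int count; };

static void *demo_producer(void *arg)
{
    struct demo *d = arg;
    for (int i = 1; i <= d->count; i++)
        slot_put(&d->s, &i, sizeof i);
    slot_close(&d->s);
    return NULL;
}

/* Send 1..count through the slot; return the sum seen by the consumer. */
static long demo_sum(int count)
{
    struct demo d;
    pthread_t t;
    long sum = 0;
    int v;

    slot_init(&d.s);
    d.count = count;
    if (pthread_create(&t, NULL, demo_producer, &d) != 0)
        return -1;
    while (slot_get(&d.s, &v) == sizeof v)
        sum += v;
    pthread_join(t, NULL);
    pthread_cond_destroy(&d.s.not_empty);
    pthread_cond_destroy(&d.s.not_full);
    pthread_mutex_destroy(&d.s.lock);
    return sum;
}
```

With more than one producer or consumer per condition, pthread_cond_signal() would no longer be sufficient, which is exactly the situation the in_cond shared by two waiting threads creates in the question's code.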
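Praise
So as not to sound too negative, I would like to say that your problems are, on the whole, not due to misuse of the pthreads API, but to erroneous consumer-producer logic and a lack of close()s. In addition, you appear to understand that pthread_cond_wait() may wake up spuriously, so you have wrapped it in a loop that checks the invariant.
For the future:
I would use pipes, even between threads. This frees you from having to implement your own consumer-producer scheme; the kernel has already solved it for you, and gives you the pipe(), read() and write() primitives, which are all you need to take advantage of this ready-made solution. It also makes the code cleaner and free of mutexes and condition variables. You simply have to be diligent about closing the ends, and you have to be extremely careful with pipes in the presence of fork(). The rules are simple:
- If any write end of a pipe still exists, a read() on an open read end will not give EOF; it will block or fail with EAGAIN.
- If all write ends of a pipe have been closed, a read() on an open read end will give EOF.
- If all read ends of a pipe have been closed, a write() to any of its write ends will raise SIGPIPE.
- fork() duplicates the entire process, including all descriptors (modulo perhaps some crazy stuff in pthread_atfork())!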
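The third rule can be checked directly. In this minimal sketch (the name `write_without_reader` is hypothetical), SIGPIPE is ignored temporarily so that, instead of the process being killed, the write() fails with EPIPE.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L   /* for sigaction() under strict -std=c99 */
#endif
#include <errno.h>
#include <signal.h>
#include <unistd.h>

/* Write to a pipe after closing its only read end; return the resulting errno. */
static int write_without_reader(void)
{
    int fd[2];
    struct sigaction ign, old;
    ssize_t n;
    int err;

    if (pipe(fd) != 0)
        return -1;
    close(fd[0]);                        /* no read end exists any more */

    ign.sa_handler = SIG_IGN;            /* the default action would kill us */
    sigemptyset(&ign.sa_mask);
    ign.sa_flags = 0;
    sigaction(SIGPIPE, &ign, &old);

    n = write(fd[1], "x", 1);
    err = (n < 0) ? errno : 0;

    sigaction(SIGPIPE, &old, NULL);      /* restore the previous disposition */
    close(fd[1]);
    return err;
}
```

Programs that push data into child processes often ignore SIGPIPE for their whole lifetime and treat EPIPE as "the consumer has gone away".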
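Other recommended answer
Ah. So I think I misunderstood the question... sorry.
I thought you wanted to run gunzip followed by one other internal filter, and wanted to do that "n" times.
It seems that what you really want to do is run many filter stages, one after another... some using external commands, and some (perhaps?) internal to this program. Hence the desire to manage some inter-stage buffering.
So... I've had another go. The aim is to run any number of stages, starting with an input stage, then external-command or internal-function "filter" stages, and finally an output stage. Each external command stage has three pthreads: one each for stdin, stdout and stderr. Internal function stages use one pthread, and the initial input and final output stages use one pthread each. Between the stages is a small pipe structure (called a "straw") that "double buffers" the data and decouples the stages... I hope this is closer to what you had in mind.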
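The shape of an internal-function stage can be sketched with an ordinary pipe() standing in for a straw, as in the first answer; note that this swaps the author's straw mechanism for a plain pipe. One thread reads a block from the upstream pipe, applies a transform function, writes the result downstream, and closes its output on EOF so that the end of data propagates to the next stage. `struct stage`, `stage_main`, `write_all` and `xor55` are hypothetical names; xor55 mirrors the XOR 0x55 threads in the first answer, but works on blocks rather than single bytes.

```c
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <unistd.h>

typedef void (*transform_fn)(unsigned char *buf, size_t n);

struct stage {            /* hypothetical stage descriptor */
    int in_fd;            /* read end of the upstream pipe */
    int out_fd;           /* write end of the downstream pipe */
    transform_fn fn;      /* in-place transform applied to each block */
};

/* Write all n bytes, retrying on short writes and EINTR. */
static int write_all(int fd, const unsigned char *p, size_t n)
{
    while (n > 0) {
        ssize_t w = write(fd, p, n);
        if (w < 0) {
            if (errno == EINTR) continue;
            return -1;
        }
        p += w;
        n -= (size_t)w;
    }
    return 0;
}

/* Thread body: copy in_fd to out_fd through fn until EOF or an error. */
static void *stage_main(void *arg)
{
    struct stage *st = arg;
    unsigned char buf[4096];
    for (;;) {
        ssize_t n = read(st->in_fd, buf, sizeof buf);
        if (n < 0 && errno == EINTR) continue;
        if (n <= 0) break;                 /* EOF (or error) upstream */
        st->fn(buf, (size_t)n);
        if (write_all(st->out_fd, buf, (size_t)n) != 0) break;
    }
    close(st->out_fd);                     /* propagate EOF downstream */
    close(st->in_fd);
    return NULL;
}

static void xor55(unsigned char *buf, size_t n)
{
    for (size_t i = 0; i < n; i++)
        buf[i] ^= 0x55;
}
```

Chaining stages is then a matter of creating one pipe per link and one thread per stage; as long as every stage closes its output when its input ends, EOF flows through the whole chain.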
The "straw" is the essence of the thing:
struct straw
{
  pthread_mutex_t* mutex ;

  struct lump*     free ;
  pthread_cond_t*  free_cond ;
  bool             free_waiting ;

  struct lump*     ready ;
  pthread_cond_t*  ready_cond ;
  bool             ready_waiting ;

  struct lump*     lumps[2] ;
} ;
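where a struct lump contains a buffer and so on. The "straw" has two such "lumps", and at any moment one pthread may be filling one lump while another pthread drains the other. Alternatively, both lumps may be free (on the free list), or both may be ready (full) and waiting on the ready list.
Then, to acquire an empty lump to fill (for example, when reading from a pipe):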
static struct lump*
lump_acquire(struct straw* strw)
{
  struct lump* lmp ;

  pthread_mutex_lock(strw->mutex) ;

  while (strw->free == NULL)
    {
      strw->free_waiting = true ;
      pthread_cond_wait(strw->free_cond, strw->mutex) ;
      strw->free_waiting = false ;
    } ;

  lmp = strw->free ;
  strw->free = lmp->next ;

  pthread_mutex_unlock(strw->mutex) ;

  lmp->next = NULL ;                    /* tidy   */

  lmp->ptr  = lmp->end = lmp->buff ;    /* empty  */
  lmp->done = false ;

  return lmp ;
} ;
Then, to blow a full lump into (one end of) the straw:
static void
lump_blow(struct lump* lmp)
{
  struct straw* strw ;

  strw = lmp->strw ;

  qassert((lmp == strw->lumps[0]) || (lmp == strw->lumps[1])) ;
  qassert( (lmp->buff <= lmp->ptr)
        && (lmp->ptr  <= lmp->end)
        && (lmp->end  <= lmp->limit) ) ;

  lmp->ptr = lmp->buff ;

  pthread_mutex_lock(strw->mutex) ;

  if (strw->ready == NULL)
    strw->ready = lmp ;
  else
    strw->ready->next = lmp ;
  lmp->next = NULL ;

  if (strw->ready_waiting)
    pthread_cond_signal(strw->ready_cond) ;

  pthread_mutex_unlock(strw->mutex) ;
} ;
To suck a lump out of the other end of the straw:
static struct lump*
lump_suck(struct straw* strw)
{
  struct lump* lmp ;

  pthread_mutex_lock(strw->mutex) ;

  while (strw->ready == NULL)
    {
      strw->ready_waiting = true ;
      pthread_cond_wait(strw->ready_cond, strw->mutex) ;
      strw->ready_waiting = false ;
    } ;

  lmp = strw->ready ;
  strw->ready = lmp->next ;

  pthread_mutex_unlock(strw->mutex) ;

  qassert( (lmp->buff <= lmp->ptr)
        && (lmp->ptr  <= lmp->end)
        && (lmp->end  <= lmp->limit) ) ;

  lmp->ptr  = lmp->buff ;               /* lmp->ptr..lmp->end */
  lmp->next = NULL ;                    /* tidy               */

  return lmp ;
} ;
And the final piece, to free a lump once it has been drained:
static void
lump_free(struct lump* lmp)
{
  struct straw* strw ;

  strw = lmp->strw ;

  qassert((lmp == strw->lumps[0]) || (lmp == strw->lumps[1])) ;
  qassert( (lmp->buff <= lmp->ptr)
        && (lmp->ptr  <= lmp->end)
        && (lmp->end  <= lmp->limit) ) ;

  pthread_mutex_lock(strw->mutex) ;

  if (strw->free == NULL)
    strw->free = lmp ;
  else
    strw->free->next = lmp ;

  lmp->next = NULL ;                    /* end of list of free */

  lmp->ptr  = lmp->end = lmp->buff ;    /* empty               */
  lmp->done = false ;

  if (strw->free_waiting)
    pthread_cond_signal(strw->free_cond) ;

  pthread_mutex_unlock(strw->mutex) ;
} ;
The whole program is too big to fit in an answer; see pipework.c, which starts:
/*==============================================================================
 * pipework.c
 *
 * Copyright (c) Chris Hall (GMCH) 2014, All rights reserved.
 *
 * Though you may do what you like with this, provided you recognise that
 * it is offered "as is", gratis, and may or may not be fit for any purpose
 * whatsoever -- you are on your own.
 *
 *------------------------------------------------------------------------------
 *
 * This will read from stdin, pass the data through an arbitrary number of
 * "filter" stages and finally write the result to stdout.
 *
 * A filter stage may be an external command taking a piped stdin and
 * outputting to a piped stdout. Anything it says to stderr is collected
 * and output to the program's stderr.
 *
 * A filter stage may also be an internal function.
 *
 * The input, filter and output stages are implemented as a number of pthreads,
 * with internal, miniature pipes (called "straws") between them. All I/O is
 * blocking. This is an experiment in the use of pthreads to simplify things.
 *
 * ============================
 * This is v0.08 of 4-Jul-2014
 * ============================
 *
 * The 'main' below runs eight stages: input, 4 commands, 2 internal filters
 * and the output. The output should be an exact copy of the input.
 *
 * In order to test the stderr handling, the following small perl script is
 * used as two of the command filters:
 *
 *   chatter.pl
 *   --------------------------------------------------------
 *     use strict ;
 *     use warnings ;
 *
 *     my $line = 0 ;
 *     while (<STDIN>)
 *       {
 *         my $len = length($_) ;
 *         my $r   = rand ;
 *
 *         $line += 1 ;
 *         print STDERR "|$line:$len:$r|" ;
 *
 *         if (int($r * 100) == 0)
 *           {
 *             print STDERR "\n" ;
 *           } ;
 *
 *         print $_ ;
 *       } ;
 *   --------------------------------------------------------
 *
 *------------------------------------------------------------------------------
 * Limitations
 *
 *   * this will crash if it gets some error its not expecting or not
 *     designed to overcome. Clearly, to be useful this needs to be more
 *     robust and more informative.
 *
 *   * some (possible/theoretical) errors are simply ignored.
 *
 *   * no attempt is made to collect up the child processes or to discover
 *     their return codes. If the child process reports errors or anything
 *     else on stderr, then that will be visible. But otherwise, if it just
 *     crashes then the pipeline will run to completion, but the result may
 *     be nonsense.
 *
 *   * if one of the child processes stalls, the whole thing stalls.
 *
 *   * an I/O error in a stage will send 'end' downstream, but the program
 *     will continue until everything upstream has completed.
 *
 *   * generally... not intended for production use !!
 */
and the perl script: chatter.pl
hth