如何通过PTHREADS管理两个或多个消费者?[英] How to manage two or more consumers via pthreads?

本文整理了“如何通过pthreads管理两个或多个消费者?”这一问题的处理方法。如果您想了解这个问题该如何解决,可以参考本文,以便快速定位并解决问题。

问题描述

我有一个通用的问题想要解决:从标准输入或常规文件流读入的二进制数据块被送入某个应用程序,该应用程序再把这些二进制数据转换为文本.我想使用线程,在把文本输送给下一个应用程序之前先对其进行处理;下一个应用程序再对文本做进一步修改,依此类推.

作为一个简单的测试用例,我想通过gunzip提取压缩数据.具体来说,我正在考虑使用gunzip -c -通过其(重新分配)stdin文件描述符提取发送给它的二进制数据的块,然后从其(重新分配)stdout文件描述符中拉出一块文本.然后,我可以将这些文本块打印到真实的stdout或stderr(或者以后再做其他事情).

(我知道可以直接在命令行上完成基于gzip的压缩和解压.我的目标是借助这个测试用例,学习如何在线程之间正确传递通用的二进制数据块和文本数据块,这些线程要么把数据交给外部程序运行,要么对数据做进一步处理.)

在我的测试程序的情况下,我设置了三个pthread_t线程:

  • produce_gzip_chunk_thread
  • consume_gzip_chunk_thread
  • consume_gunzip_chunk_thread

我向这些线程中的每一个都传递一个名为thread_data的共享数据实例,其中包含一个线程锁、两个条件变量以及一些缓冲区和计数器变量.其中还包含一组文件描述符,对应于用popen3()打开的gunzip进程:

/* Short names for the two structs defined below. */
typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;

/*
 * State shared by produce_gzip_chunk(), consume_gzip_chunk() and
 * consume_gunzip_chunk(). Each of those functions locks in_lock before
 * entering its main loop.
 */
struct pthread_data {
    pthread_mutex_t in_lock;                /* the single mutex used by all three thread functions */
    pthread_cond_t in_cond;                 /* waited on by the producer and by consume_gzip_chunk() */
    pthread_cond_t out_cond;                /* waited on by consume_gunzip_chunk() */
    unsigned char in_buf[BUF_LENGTH_VALUE]; /* compressed chunk handed from producer to consume_gzip_chunk() */
    size_t n_in_bytes;                      /* valid bytes in in_buf; set back to 0 once the chunk was written to gunzip */
    size_t n_in_bytes_written_to_gunzip;    /* byte count of the last successful write(); reset to 0 by consume_gunzip_chunk() */
    size_t n_out_bytes_read_from_gunzip;    /* byte count of the last successful read() from gunzip */
    FILE *in_file_ptr;                      /* stream the producer reads compressed input from */
    boolean in_eof;                         /* set to kTrue by the producer once its read loop has ended */
    char in_line[LINE_LENGTH_VALUE];        /* receives the bytes read from gunzip's output descriptor */
    popen3_desc_t *gunzip_ptr;              /* descriptors of the gunzip child; allocated on first use by consume_gzip_chunk() */
};

/* The three descriptors popen3() hands back to its caller. */
struct popen3_desc {
    int in;     /* write end of the pipe connected to the child's stdin */
    int out;    /* read end of the pipe created for the child's stdout */
    int err;    /* read end of the pipe created for the child's stderr */
};

produce_gzip_chunk_thread读取gzip的1024字节块,来自一个名为foo.gz的常规文件.

这些字节写入称为in_buf的unsigned char缓冲区,这是共享数据结构的一部分,我将传递给每个线程:

/*
 * Producer thread body: reads chunks of up to BUF_LENGTH_VALUE bytes from
 * d->in_file_ptr and publishes each one through d->in_buf / d->n_in_bytes
 * for consume_gzip_chunk() to pick up.
 *
 * t_data points to the shared pthread_data_t. Always returns NULL.
 */
void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    /* private staging buffer; copied into d->in_buf once the shared slot is free */
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    /* NOTE(review): this store happens before in_lock is taken, while the
       consumer threads may already be reading in_eof -- confirm it is safe. */
    d->in_eof = kFalse;

    /* The mutex stays locked for the whole loop, including the fread();
       it is only released while blocked inside pthread_cond_wait(). */
    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
        if (n_in_bytes > 0) {
            /* Wait until the previous chunk was consumed and no gunzip output
               is recorded. NOTE(review): nothing in the code shown here resets
               n_out_bytes_read_from_gunzip to 0 after consume_gunzip_chunk()
               sets it, so this wait looks like it cannot finish once output
               has been read -- confirm. */
            while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
                pthread_cond_wait(&d->in_cond, &d->in_lock);
            memcpy(d->in_buf, in_buf, n_in_bytes);
            d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
            fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
            /* wakes one waiter on in_cond; consume_gzip_chunk() waits on it */
            pthread_cond_signal(&d->in_cond);
        }
        /* a zero-length read ends the loop on end-of-file or on a stream error */
        else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
            break;
    } 
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    /* lets a consumer blocked on in_cond observe in_eof */
    pthread_cond_signal(&d->in_cond);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> produce_gzip_chunk()\n");
#endif
    return NULL;
}

一旦n_bytes(代码中的n_in_bytes)中存入了正数个字节,也就是说,我们已经从输入的gzip存档中取得了需要交给gunzip处理的数据,就会触发一个条件,允许第二个线程consume_gzip_chunk_thread开始工作:

/*
 * First consumer thread body: takes each chunk published in d->in_buf and
 * writes it to the stdin descriptor of a gunzip child process, which is
 * spawned through popen3() when the first chunk arrives.
 *
 * t_data points to the shared pthread_data_t. Always returns NULL.
 */
void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        /* sleep until a chunk is available or the producer reported end of input */
        while (d->n_in_bytes == 0 && !d->in_eof)
            pthread_cond_wait(&d->in_cond, &d->in_lock);
        if (d->n_in_bytes) {
#ifdef DEBUG
            fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
            /* NOTE(review): relies on gunzip_ptr having been set to NULL by
               whoever created the struct -- confirm. */
            if (!d->gunzip_ptr) {
#ifdef DEBUG
                fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
                d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
                if (!d->gunzip_ptr) {
                    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
                    exit(EXIT_FAILURE);
                }

                /* both nonblock arguments are kTrue; the return value (child
                   pid or error) is not checked */
                popen3("gunzip -c -", 
                       &(d->gunzip_ptr->in), 
                       &(d->gunzip_ptr->out), 
                       &(d->gunzip_ptr->err), 
                       kTrue, 
                       kTrue);
                memset(d->in_line, 0, LINE_LENGTH_VALUE);
            }
            n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
            fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
            /* only a positive count is recorded; a failed write leaves the shared counter untouched */
            if (n_in_bytes_written_to_gunzip > 0)
                d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;

            /* the slot is marked empty regardless of how many bytes write() accepted */
            d->n_in_bytes = 0;
            /* wakes consume_gunzip_chunk(), which waits on out_cond */
            pthread_cond_signal(&d->out_cond);
        }
        if (d->in_eof) 
            break;
    } 
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gzip_chunk()\n");
#endif
    return NULL;
}

消耗gzip数据块时,我们使用write函数将in_buf中的n_bytes个字节发送到gunzip进程的输入文件描述符.最后,我们再发送一次线程信号,不过这次是发给out_cond,以便重新唤醒consume_gunzip_chunk_thread,由它从gunzip的输出中读取数据以完成后续工作:

/*
 * Second consumer thread body: after consume_gzip_chunk() has written to the
 * gunzip child, reads whatever is available on the child's output descriptor
 * into d->in_line and (in DEBUG builds) prints it to stderr.
 *
 * t_data points to the shared pthread_data_t. Always returns NULL.
 */
void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        /* NOTE(review): unlike the wait in consume_gzip_chunk(), this
           predicate does not look at in_eof -- confirm the thread cannot
           block here after the last chunk. */
        while (d->n_in_bytes_written_to_gunzip == 0) {
            pthread_cond_wait(&d->out_cond, &d->in_lock);
        }
        if (d->n_in_bytes_written_to_gunzip) {
            /* The sleep runs with in_lock held. popen3() is called with
               nonblock_outerr = kTrue, which sets O_NONBLOCK on this read
               descriptor, so the read() below presumably fails right away
               when gunzip has produced nothing yet; the delay gives the child
               time to emit output -- confirm. */
            sleep(1);
            n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
            fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
            /* NOTE(review): %s needs a terminating NUL; a read() that fills
               all LINE_LENGTH_VALUE bytes would leave none -- confirm. */
            fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
            /* clears only up to the first NUL byte currently in the buffer */
            memset(d->in_line, 0, strlen(d->in_line));
            if (n_out_bytes_read_from_gunzip > 0)
                d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
            d->n_in_bytes_written_to_gunzip = 0;
            /* in_cond is shared: the producer and consume_gzip_chunk() both wait on it */
            pthread_cond_signal(&d->in_cond);
        }
        if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gunzip_chunk()\n");
#endif
    return NULL;
}

这将尝试read gunzip流程输出文件描述符中的任何可用字节.出于调试目的,我只想将它们打印到stderr.

我面临的问题是,在执行read之前,我需要在consume_gunzip_chunk中添加sleep(1)语句,以使事情正常工作.

没有这条sleep(1)语句,我的测试程序通常什么也不输出,只是大约每尝试8到10次才有一次能正确提取压缩数据.

问题 - 我对条件变量的安排哪里做错了,以至于必须调用sleep(1)才能让gzip解压正常工作?在生产场景中,面对大得多的输入文件,每处理1kB就强制等待一秒似乎是个坏主意.


对于完整源代码的可重复性,这是两个相关文件.这是标题:

/*
 * convert.h
 */

#ifndef CONVERT_H
#define CONVERT_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <getopt.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>

/* Version string reported by print_usage(). */
#define CB_VERSION "1.0"
/* Size in bytes of the buffer that receives gunzip's output. */
#define LINE_LENGTH_VALUE 65536
/* Size in bytes of one chunk of compressed input. */
#define BUF_LENGTH_VALUE 1024
/* Indices of the read and write ends in an int[2] filled by pipe(). */
#define POPEN3_READ 0
#define POPEN3_WRITE 1

/*
 * Minimal boolean type and its two values.
 *
 * The constants have internal linkage: a header that defines objects with
 * external linkage produces duplicate-symbol link errors as soon as two
 * translation units include it.
 */
typedef int boolean;
static const boolean kTrue = 1;
static const boolean kFalse = 0;

/* Input formats selectable with --input-format. */
typedef enum {
    kGzip,      /* gzip-compressed input */
    kUnknown    /* no (or an unrecognised) format was given */
} format_t;

/* Short names for the two structs defined below. */
typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;

/*
 * State shared by produce_gzip_chunk(), consume_gzip_chunk() and
 * consume_gunzip_chunk(). Each of those functions locks in_lock before
 * entering its main loop.
 */
struct pthread_data {
    pthread_mutex_t in_lock;                /* the single mutex used by all three thread functions */
    pthread_cond_t in_cond;                 /* waited on by the producer and by consume_gzip_chunk() */
    pthread_cond_t out_cond;                /* waited on by consume_gunzip_chunk() */
    unsigned char in_buf[BUF_LENGTH_VALUE]; /* compressed chunk handed from producer to consume_gzip_chunk() */
    size_t n_in_bytes;                      /* valid bytes in in_buf; set back to 0 once the chunk was written to gunzip */
    size_t n_in_bytes_written_to_gunzip;    /* byte count of the last successful write(); reset to 0 by consume_gunzip_chunk() */
    size_t n_out_bytes_read_from_gunzip;    /* byte count of the last successful read() from gunzip */
    boolean in_eof;                         /* set to kTrue by the producer once its read loop has ended */
    FILE *in_file_ptr;                      /* stream the producer reads compressed input from */
    popen3_desc_t *gunzip_ptr;              /* descriptors of the gunzip child; allocated on first use by consume_gzip_chunk() */
    char in_line[LINE_LENGTH_VALUE];        /* receives the bytes read from gunzip's output descriptor */
};

/* The three descriptors popen3() hands back to its caller. */
struct popen3_desc {
    int in;     /* write end of the pipe connected to the child's stdin */
    int out;    /* read end of the pipe created for the child's stdout */
    int err;    /* read end of the pipe created for the child's stderr */
};

/* Strings printed by print_usage(): program name, version, author, usage text. */
static const char *name = "convert";
static const char *version = CB_VERSION;
static const char *authors = "Alex Reynolds";
static const char *usage = "\n" \
    "Usage: convert --input-format=str <input-file>\n" \
    "  Process Flags:\n\n" \
    "  --input-format=str            | -f str  Input format (str = [ gzip ]; required)\n" \
    "  --help                        | -h      Show this usage message\n";

/* Settings filled in by initialize_globals() and parse_command_line_options(). */
static struct convert_globals_t {
    char *input_format_str;   /* NOTE(review): not assigned anywhere in the code shown here */
    format_t input_format;    /* format selected with --input-format */
    char **filenames;         /* points into argv at the first non-option argument */
    int num_filenames;        /* number of non-option arguments */
} convert_globals;

/* Long options accepted by getopt_long(); terminated by the all-zero entry. */
static struct option convert_client_long_options[] = {
    { "input-format",           required_argument,  NULL,   'f' },
    { "help",               no_argument,        NULL,   'h' },
    { NULL,             no_argument,        NULL,    0  }
}; 

/* Short option string: -f takes an argument; -h and -? take none. */
static const char *convert_client_opt_string = "f:h?";

/* Thread entry points; each receives a pthread_data_t * as t_data. */
void * consume_gunzip_chunk        (void *t_data);
void * consume_gzip_chunk          (void *t_data);
void * produce_gzip_chunk          (void *t_data);
/* Opens in_fn for reading ("-" selects stdin); exits the program on failure. */
FILE * new_file_ptr                (const char *in_fn);
/* Closes *file_ptr and sets it to NULL. */
void   delete_file_ptr             (FILE **file_ptr);
/* Runs command through /bin/sh with pipes attached; returns the child pid,
   or a negative value when pipe() or fork() fails. */
pid_t  popen3                      (const char *command, 
                                    int *in_desc, 
                                    int *out_desc, 
                                    int *err_desc, 
                                    boolean nonblock_in, 
                                    boolean nonblock_outerr);
/* Returns the size of the file named fn as reported by stat(). */
off_t  fsize                       (const char *fn);
/* Resets convert_globals to its defaults. */
void   initialize_globals          ();
/* Parses argv into convert_globals; prints usage and exits on bad input. */
void   parse_command_line_options  (int argc, 
                                    char **argv);
/* Writes the name/version/author banner and the usage text to stream. */
void   print_usage                 (FILE *stream);

#endif

这是实现:

/*
 * convert.c
 */

#include "convert.h"

int main(int argc, char **argv)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> main()\n");
#endif

    pthread_t produce_gzip_chunk_thread = NULL;
    pthread_t consume_gzip_chunk_thread = NULL;
    pthread_t consume_gunzip_chunk_thread = NULL;
    pthread_data_t *thread_data = NULL;

    parse_command_line_options(argc, argv);

    /* initialize thread data */
    thread_data = malloc(sizeof(pthread_data_t));
    thread_data->n_in_bytes = 0;
    thread_data->n_in_bytes_written_to_gunzip = 0;
    thread_data->n_out_bytes_read_from_gunzip = 0;
    thread_data->in_eof = kFalse;
    thread_data->in_file_ptr = new_file_ptr(convert_globals.filenames[0]);
    pthread_mutex_init(&(thread_data->in_lock), NULL);
    pthread_cond_init(&(thread_data->in_cond), NULL);
    pthread_cond_init(&(thread_data->out_cond), NULL);

    /* parse input */
    if (convert_globals.input_format == kGzip) 
        {
            if (pthread_create(&produce_gzip_chunk_thread, NULL, produce_gzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gzip chunk production thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_create(&consume_gzip_chunk_thread, NULL, consume_gzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gzip chunk consumption thread\n");            
                return EXIT_FAILURE;
            }
            if (pthread_create(&consume_gunzip_chunk_thread, NULL, consume_gunzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gunzip chunk consumption thread\n");            
                return EXIT_FAILURE;
            }
            if (pthread_join(produce_gzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gzip chunk production thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_join(consume_gzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gzip chunk consumption thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_join(consume_gunzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gunzip chunk consumption thread\n");
                return EXIT_FAILURE;
            }
        }
    else
        {
            /* 
               handle text formats
            */
        }

    /* cleanup */
    delete_file_ptr(&thread_data->in_file_ptr);
    pthread_mutex_destroy(&(thread_data->in_lock));
    pthread_cond_destroy(&(thread_data->in_cond));
    pthread_cond_destroy(&(thread_data->out_cond));
    free(thread_data);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> main()\n");
#endif
    return EXIT_SUCCESS;
}

/*
 * Second consumer thread body: after consume_gzip_chunk() has written to the
 * gunzip child, reads whatever is available on the child's output descriptor
 * into d->in_line and (in DEBUG builds) prints it to stderr.
 *
 * t_data points to the shared pthread_data_t. Always returns NULL.
 */
void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        /* NOTE(review): unlike the wait in consume_gzip_chunk(), this
           predicate does not look at in_eof -- confirm the thread cannot
           block here after the last chunk. */
        while (d->n_in_bytes_written_to_gunzip == 0) {
            pthread_cond_wait(&d->out_cond, &d->in_lock);
        }
        if (d->n_in_bytes_written_to_gunzip) {
            /* The sleep runs with in_lock held. popen3() is called with
               nonblock_outerr = kTrue, which sets O_NONBLOCK on this read
               descriptor, so the read() below presumably fails right away
               when gunzip has produced nothing yet; the delay gives the child
               time to emit output -- confirm. */
            sleep(1);
            n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
            fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
            /* NOTE(review): %s needs a terminating NUL; a read() that fills
               all LINE_LENGTH_VALUE bytes would leave none -- confirm. */
            fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
            /* clears only up to the first NUL byte currently in the buffer */
            memset(d->in_line, 0, strlen(d->in_line));
            if (n_out_bytes_read_from_gunzip > 0)
                d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
            d->n_in_bytes_written_to_gunzip = 0;
            /* in_cond is shared: the producer and consume_gzip_chunk() both wait on it */
            pthread_cond_signal(&d->in_cond);
        }
        if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gunzip_chunk()\n");
#endif
    return NULL;
}

/*
 * First consumer thread body: takes each chunk published in d->in_buf and
 * writes it to the stdin descriptor of a gunzip child process, which is
 * spawned through popen3() when the first chunk arrives.
 *
 * t_data points to the shared pthread_data_t. Always returns NULL.
 */
void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        /* sleep until a chunk is available or the producer reported end of input */
        while (d->n_in_bytes == 0 && !d->in_eof)
            pthread_cond_wait(&d->in_cond, &d->in_lock);
        if (d->n_in_bytes) {
#ifdef DEBUG
            fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
            /* NOTE(review): relies on gunzip_ptr having been set to NULL by
               whoever created the struct -- confirm. */
            if (!d->gunzip_ptr) {
#ifdef DEBUG
                fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
                d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
                if (!d->gunzip_ptr) {
                    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
                    exit(EXIT_FAILURE);
                }

                /* both nonblock arguments are kTrue; the return value (child
                   pid or error) is not checked */
                popen3("gunzip -c -", 
                       &(d->gunzip_ptr->in), 
                       &(d->gunzip_ptr->out), 
                       &(d->gunzip_ptr->err), 
                       kTrue, 
                       kTrue);
                memset(d->in_line, 0, LINE_LENGTH_VALUE);
            }
            n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
            fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
            /* only a positive count is recorded; a failed write leaves the shared counter untouched */
            if (n_in_bytes_written_to_gunzip > 0)
                d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;

            /* the slot is marked empty regardless of how many bytes write() accepted */
            d->n_in_bytes = 0;
            /* pthread_cond_signal(&d->in_cond); */
            /* wakes consume_gunzip_chunk(), which waits on out_cond */
            pthread_cond_signal(&d->out_cond);
        }
        if (d->in_eof) 
            break;
    } 
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gzip_chunk()\n");
#endif
    return NULL;
}

/*
 * Producer thread body: reads chunks of up to BUF_LENGTH_VALUE bytes from
 * d->in_file_ptr and publishes each one through d->in_buf / d->n_in_bytes
 * for consume_gzip_chunk() to pick up.
 *
 * t_data points to the shared pthread_data_t. Always returns NULL.
 */
void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    /* private staging buffer; copied into d->in_buf once the shared slot is free */
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    /* NOTE(review): this store happens before in_lock is taken, while the
       consumer threads may already be reading in_eof -- confirm it is safe. */
    d->in_eof = kFalse;

    /* The mutex stays locked for the whole loop, including the fread();
       it is only released while blocked inside pthread_cond_wait(). */
    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
        if (n_in_bytes > 0) {
            /* Wait until the previous chunk was consumed and no gunzip output
               is recorded. NOTE(review): nothing in the code shown here resets
               n_out_bytes_read_from_gunzip to 0 after consume_gunzip_chunk()
               sets it, so this wait looks like it cannot finish once output
               has been read -- confirm. */
            while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
                pthread_cond_wait(&d->in_cond, &d->in_lock);
            memcpy(d->in_buf, in_buf, n_in_bytes);
            d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
            fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
            /* wakes one waiter on in_cond; consume_gzip_chunk() waits on it */
            pthread_cond_signal(&d->in_cond);
        }
        /* a zero-length read ends the loop on end-of-file or on a stream error */
        else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
            break;
    } 
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    /* lets a consumer blocked on in_cond observe in_eof */
    pthread_cond_signal(&d->in_cond);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> produce_gzip_chunk()\n");
#endif
    return NULL;
}

/*
 * Opens the input stream named by in_fn.
 *
 * in_fn  path of the file to read, or "-" to read from stdin.
 *
 * Returns the open stream. The file is opened in binary mode because the
 * program feeds it compressed data. On failure (including a NULL in_fn) an
 * error is printed and the program exits with EXIT_FAILURE, so the return
 * value is never NULL.
 */
FILE * new_file_ptr(const char *in_fn)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> new_file_ptr()\n");
#endif

    FILE *file_ptr = NULL;

    if (in_fn) {
        /* compare explicitly instead of storing strcmp()'s signed result in a boolean */
        const int is_stdin = (strcmp(in_fn, "-") == 0);
        file_ptr = is_stdin ? stdin : fopen(in_fn, "rb");
    }

    if (!file_ptr) {
        fprintf(stderr, "Error: Could not open input stream\n");
        exit(EXIT_FAILURE);
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> new_file_ptr()\n");
#endif
    return file_ptr;
}

/*
 * Closes the stream *file_ptr and resets the caller's pointer to NULL so it
 * cannot be used again by accident.
 *
 * file_ptr  address of the stream pointer; a NULL address or an already-NULL
 *           stream is accepted and ignored instead of being passed to
 *           fclose(), which would be undefined behaviour.
 */
void delete_file_ptr(FILE **file_ptr)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> delete_file_ptr()\n");
#endif

    if (file_ptr && *file_ptr) {
        fclose(*file_ptr);
        *file_ptr = NULL;
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> delete_file_ptr()\n");
#endif
}

pid_t popen3(const char *command, int *in_desc, int *out_desc, int *err_desc, boolean nonblock_in, boolean nonblock_outerr)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> popen3()\n");
#endif

    int p_stdin[2], p_stdout[2], p_stderr[2];
    pid_t pid;

    if (pipe(p_stdin) != 0 || pipe(p_stdout) != 0 || pipe(p_stderr) != 0)
        return -1;

    if (nonblock_in) {
        fcntl(p_stdin[POPEN3_WRITE], F_SETFL, fcntl(p_stdin[POPEN3_WRITE], F_GETFL) | O_NONBLOCK);
    }

    if (nonblock_outerr) {
        fcntl(p_stdout[POPEN3_READ], F_SETFL, fcntl(p_stdout[POPEN3_READ], F_GETFL) | O_NONBLOCK);
        fcntl(p_stderr[POPEN3_READ], F_SETFL, fcntl(p_stderr[POPEN3_READ], F_GETFL) | O_NONBLOCK);
    }

    pid = fork();
    if (pid < 0)
        return pid; /* error */

    if (pid == 0) {
        close(p_stdin[POPEN3_WRITE]);
        close(p_stdout[POPEN3_READ]);
        close(p_stderr[POPEN3_READ]);
        dup2(p_stdin[POPEN3_READ], fileno(stdin));
        dup2(p_stdout[POPEN3_WRITE], fileno(stderr));
        dup2(p_stdout[POPEN3_WRITE], fileno(stdout));
        execl("/bin/sh", "sh", "-c", command, NULL);
        fprintf(stderr, "Error: Could not execl [%s]\n", command);
        exit(EXIT_FAILURE);
    }

    if (in_desc == NULL)
        close(p_stdin[POPEN3_WRITE]);
    else
        *in_desc = p_stdin[POPEN3_WRITE];

    if (out_desc == NULL)
        close(p_stdout[POPEN3_READ]);
    else
        *out_desc = p_stdout[POPEN3_READ];

    if (err_desc == NULL)
        close(p_stderr[POPEN3_READ]);
    else
        *err_desc = p_stderr[POPEN3_READ];

#ifdef DEBUG
    fprintf(stderr, "Debug: New *in_desc  = %d\n", *in_desc);
    fprintf(stderr, "Debug: New *out_desc = %d\n", *out_desc);
    fprintf(stderr, "Debug: New *err_desc = %d\n", *err_desc);
#endif

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> popen3()\n");
#endif
    return pid;
}

/*
 * Returns the size in bytes of the file named fn, as reported by stat().
 *
 * Returns -1 when fn is NULL or stat() fails. A negative value can never be
 * a real size, whereas the previous failure value (EXIT_FAILURE, i.e. 1)
 * could not be told apart from a one-byte file.
 */
off_t fsize(const char *fn) 
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> fsize()\n");
#endif

    struct stat st; 
    off_t size = (off_t) -1;

    if (fn && stat(fn, &st) == 0) {
        size = st.st_size;
    }

    /* single exit so the "Leaving" trace is printed on success as well */
#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> fsize()\n");
#endif
    return size;
}

/*
 * Resets every member of convert_globals to its "nothing parsed yet" value:
 * no format string, unknown input format, no file names.
 */
void initialize_globals()
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> initialize_globals()\n");
#endif

    /* input_format_str was previously the only member left unset here */
    convert_globals.input_format_str = NULL;
    convert_globals.input_format = kUnknown;
    convert_globals.filenames = NULL;
    convert_globals.num_filenames = 0;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> initialize_globals()\n");
#endif
}

/*
 * Parses the command line into convert_globals.
 *
 * argc, argv  the arguments received by main().
 *
 * On return convert_globals holds the format string and its enum value, and
 * filenames/num_filenames describe the single input file argument.
 * -h and -? print the usage text to stdout and exit with EXIT_SUCCESS; an
 * unrecognised option takes the same path, because getopt_long() reports it
 * as '?'. A missing or unknown format, or a file count other than one,
 * prints an error plus the usage text to stderr and exits with EXIT_FAILURE.
 */
void parse_command_line_options(int argc, char **argv)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> parse_command_line_options()\n");
#endif

    int client_long_index = 0;
    int client_opt;
    char *in_format_str = NULL;

    /* Must be cleared before the first getopt_long() call; clearing it
       afterwards still lets getopt print its own message for the first
       option. */
    opterr = 0; /* disable error reporting by GNU getopt */
    initialize_globals();

    while ((client_opt = getopt_long(argc,
                                     argv,
                                     convert_client_opt_string,
                                     convert_client_long_options,
                                     &client_long_index)) != -1)
        {
            switch (client_opt) 
                {
                case 'f':
                    in_format_str = optarg;
                    break;
                case 'h':
                    print_usage(stdout);
                    exit(EXIT_SUCCESS);
                case '?':
                    print_usage(stdout);
                    exit(EXIT_SUCCESS);
                default:
                    break;
                }
        }

    convert_globals.filenames = argv + optind;
    convert_globals.num_filenames = argc - optind;    
    /* keep the raw string too; the struct has a member for it */
    convert_globals.input_format_str = in_format_str;

    if (!in_format_str) {
        fprintf(stderr, "Error: Specified input format was omitted; please specify one of required input formats\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }
    else if (convert_globals.num_filenames != 1) {
        fprintf(stderr, "Error: Please specify an input file (either a regular file or '-' for stdin)\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }

    /* map format string to setting */
    if (strcmp(in_format_str, "gzip") == 0)
        convert_globals.input_format = kGzip;
    else {
        fprintf(stderr, "Error: Specified input format is unknown; please specify one of required input formats\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> parse_command_line_options()\n");
#endif
}

/*
 * Writes the program banner (name, version, author) followed by the usage
 * text to the given stream.
 */
void print_usage(FILE *stream)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> print_usage()\n");
#endif

    /* one line each for name, version and author, then the usage block */
    fprintf(stream, "%s\n  version: %s\n  author:  %s\n%s\n", name, version, authors, usage);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> print_usage()\n");
#endif
}

这是构建过程:

$ mkdir -p objects
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline -c convert.c -o objects/convert.o -iquote${PWD}                                                        
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline objects/convert.o -o convert -lpthread

我能够在OS X和Linux主机上构建此测试代码,并具有合理的现代编译环境.

事先感谢您的任何有用建议!

推荐答案

我首先要说的是,我觉得这里并不真正需要pthreads的条件变量和互斥量,它们也不是应对您所描述问题的最佳方式.

我认为,您所描述的问题(即在不使用条件变量和互斥量的版本中遇到的问题),是忘记认真地close()管道各端所导致的症状:为子进程stdin供给数据的那条管道,其写入端文件描述符的副本泄漏了出去(泄漏到该子进程或其他子进程中),并一直保持打开.

于是,由于与stdin读取端相对应的写入端仍然存在,系统不会给出EOF,而是无限期地阻塞.

在您的情况下,您确实阻止了管道端文件描述符泄漏到新产生的子进程中(在popen3()里fork()的子进程一侧正确地调用了close(),不过您忘了在父进程一侧close()那些不该保留的管道端).但是,您并没有阻止它们泄漏到所有其他子进程中!如果您调用popen3()两次,第一组三个描述符不会泄漏到第一个子进程中,但由于父进程仍然持有它们,下一次调用popen3()时,fork()之后就有6个文件描述符需要关闭(旧的一组三个,加上您刚创建的一组三个).

因此,在您的情况下,应当在这些管道端上设置close-on-exec标志,如下所示:

fcntl(fdIn [PIPEWR], F_SETFD, fcntl(fdIn [PIPEWR], F_GETFD) | FD_CLOEXEC);
fcntl(fdOut[PIPERD], F_SETFD, fcntl(fdOut[PIPERD], F_GETFD) | FD_CLOEXEC);
fcntl(fdErr[PIPERD], F_SETFD, fcntl(fdErr[PIPERD], F_GETFD) | FD_CLOEXEC);

下面这段代码会产生6个线程和3个进程,它在内部先压缩输入、再解压缩,最终把输入原样传递到输出.它实际上实现了gzip -c - | XOR 0x55 | XOR 0x55 | gunzip -c - | cat,其中:

  1. 标准输入由线程srcThrd送入进程gzip.
  2. gzip的输出由线程a2xor0Thrd读取,并送入线程xor0Thrd.
  3. 线程xor0Thrd将其输入与0x55做异或,然后传给线程xor1Thrd.
  4. 线程xor1Thrd将其输入与0x55做异或,然后传给线程xor22BThrd.
  5. 线程xor22BThrd将其输入送入进程gunzip.
  6. 进程gunzip将其输出直接(不经过线程)送给cat.
  7. 进程cat的输出由线程dstThrd读取并打印到标准输出.

压缩通过进程间的管道通信完成,而异或运算通过进程内的管道通信完成.没有使用互斥量或条件变量.main()非常容易理解.这段代码应该很容易扩展到您的情况.

/* Includes */
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>



/* Defines */
#define PIPERD 0
#define PIPEWR 1




/* Data structures */
/*
 * Every pipe used by the program, each stored as the int[2] filled by
 * pipe() and indexed with PIPERD / PIPEWR. A = gzip, B = gunzip, C = cat,
 * matching the order in which main() passes them to popen4().
 */
typedef struct PIPESET{
    int Ain[2];     /* stdin of process A; written by srcThrdMain() */
    int Aout[2];    /* stdout of process A; read by a2xor0ThrdMain() */
    int Aerr[2];    /* stderr of process A */
    int xor0[2];    /* a2xor0ThrdMain() -> xor0ThrdMain() */
    int xor1[2];    /* xor0ThrdMain() -> xor1ThrdMain() */
    int xor2[2];    /* xor1ThrdMain() -> xor22BThrdMain() */
    int Bin[2];     /* stdin of process B; written by xor22BThrdMain() */
    int BoutCin[2]; /* stdout of process B, passed to popen4() as stdin of process C */
    int Berr[2];    /* stderr of process B */
    int Cout[2];    /* stdout of process C; read by dstThrdMain() */
    int Cerr[2];    /* stderr of process C */
} PIPESET;




/* Function Implementations */

/**
 * Source thread entry point.
 * 
 * Copies standard input, one byte at a time, into the write end of the Ain
 * pipe, then closes that write end once read() stops returning data.
 * 
 * @param [in]  arg  Pointer to the shared PIPESET.
 */

void* srcThrdMain(void* arg){
    PIPESET* const pipes = arg;
    const int      sink  = pipes->Ain[PIPEWR];
    char           byte;

    for(;;){
        if(read(0, &byte, 1) <= 0){
            break;
        }
        write(sink, &byte, 1);
    }

    /* Closing the write end is what lets the reader of Ain see end-of-file. */
    close(sink);

    pthread_exit(NULL);
}

/**
 * A-to-XOR0 thread entry point.
 * 
 * Moves data in blocks of up to 65536 bytes from the read end of the Aout
 * pipe to the write end of the xor0 pipe, then closes that write end.
 * 
 * @param [in]  arg  Pointer to the shared PIPESET.
 */

void* a2xor0ThrdMain(void* arg){
    PIPESET* const pipes = arg;
    const int      from  = pipes->Aout[PIPERD];
    const int      to    = pipes->xor0[PIPEWR];
    char           chunk[65536];
    ssize_t        got;

    for(got = read(from, chunk, sizeof chunk); got > 0; got = read(from, chunk, sizeof chunk)){
        write(to, chunk, got);
    }

    /* Closing the write end is what lets the reader of xor0 see end-of-file. */
    close(to);

    pthread_exit(NULL);
}

/**
 * XOR0 thread entry point.
 * 
 * Reads the xor0 pipe one byte at a time, XORs each byte with 0x55 and
 * writes it to the xor1 pipe, then closes the xor1 write end.
 * 
 * @param [in]  arg  Pointer to the shared PIPESET.
 */

void* xor0ThrdMain(void* arg){
    PIPESET* const pipes = arg;
    const int      from  = pipes->xor0[PIPERD];
    const int      to    = pipes->xor1[PIPEWR];
    char           byte;

    for(;;){
        if(read(from, &byte, 1) <= 0){
            break;
        }
        byte ^= 0x55;
        write(to, &byte, 1);
    }

    /* Closing the write end is what lets the reader of xor1 see end-of-file. */
    close(to);

    pthread_exit(NULL);
}

/**
 * XOR1 thread entry point.
 * 
 * Reads the xor1 pipe one byte at a time, XORs each byte with 0x55 and
 * writes it to the xor2 pipe, then closes the xor2 write end.
 * 
 * @param [in]  arg  Pointer to the shared PIPESET.
 */

void* xor1ThrdMain(void* arg){
    PIPESET* const pipes = arg;
    const int      from  = pipes->xor1[PIPERD];
    const int      to    = pipes->xor2[PIPEWR];
    char           byte;

    for(;;){
        if(read(from, &byte, 1) <= 0){
            break;
        }
        byte ^= 0x55;
        write(to, &byte, 1);
    }

    /* Closing the write end is what lets the reader of xor2 see end-of-file. */
    close(to);

    pthread_exit(NULL);
}

/**
 * XOR2-to-B thread entry point.
 * 
 * Moves data in blocks of up to 65536 bytes from the read end of the xor2
 * pipe to the write end of the Bin pipe, then closes that write end.
 * 
 * @param [in]  arg  Pointer to the shared PIPESET.
 */

void* xor22BThrdMain(void* arg){
    PIPESET* const pipes = arg;
    const int      from  = pipes->xor2[PIPERD];
    const int      to    = pipes->Bin[PIPEWR];
    char           chunk[65536];
    ssize_t        got;

    for(got = read(from, chunk, sizeof chunk); got > 0; got = read(from, chunk, sizeof chunk)){
        write(to, chunk, got);
    }

    /* Closing the write end is what lets the reader of Bin see end-of-file. */
    close(to);

    pthread_exit(NULL);
}

/**
 * Destination thread entry point.
 * 
 * Copies the read end of the Cout pipe to standard output, one byte at a
 * time, until read() stops returning data.
 * 
 * @param [in]  arg  Pointer to the shared PIPESET.
 */

void* dstThrdMain(void* arg){
    PIPESET* const pipes = arg;
    const int      from  = pipes->Cout[PIPERD];
    char           byte;

    for(;;){
        if(read(from, &byte, 1) <= 0){
            break;
        }
        write(1, &byte, 1);
    }

    pthread_exit(NULL);
}

/**
 * Set the close-on-exec flag on the given descriptor.
 * 
 * If the descriptor's flags cannot be read (for example because fd is not
 * an open descriptor) nothing is changed; the error value from F_GETFD is
 * never passed on to F_SETFD.
 * 
 * @param [in]  fd  The descriptor to mark close-on-exec.
 */

void setCloExecFlag(int fd){
    int fdFlags = fcntl(fd, F_GETFD);

    if(fdFlags < 0){
        return;
    }

    fcntl(fd, F_SETFD, fdFlags | FD_CLOEXEC);
}

/**
 * Clear the close-on-exec flag on the given descriptor.
 * 
 * If the descriptor's flags cannot be read (for example because fd is not
 * an open descriptor) nothing is changed; the error value from F_GETFD is
 * never passed on to F_SETFD.
 * 
 * @param [in]  fd  The descriptor that should survive an exec().
 */

void unsetCloExecFlag(int fd){
    int fdFlags = fcntl(fd, F_GETFD);

    if(fdFlags < 0){
        return;
    }

    fcntl(fd, F_SETFD, fdFlags & ~FD_CLOEXEC);
}

/**
 * Pipe4.
 * 
 * Create a pipe with some ends possibly marked close-on-exec.
 * 
 * @param [out]  fd     Receives the read end (fd[PIPERD]) and the write end
 *                      (fd[PIPEWR]) of the new pipe.
 * @param [in]   flags  Bitwise OR of PIPE4_FLAG_* values selecting which
 *                      ends get the close-on-exec flag.
 * @return The return value of pipe(): 0 on success, -1 on failure. On
 *         failure fd holds no valid descriptors and is left alone.
 */

#define PIPE4_FLAG_NONE       (0U)
#define PIPE4_FLAG_RD_CLOEXEC (1U << 0)
#define PIPE4_FLAG_WR_CLOEXEC (1U << 1)

int pipe4(int fd[2], int flags){
    int ret = pipe(fd);

    if(ret != 0){
        /* fd[] was not filled in; do not run fcntl() on stale values */
        return ret;
    }

    if((unsigned)flags & PIPE4_FLAG_RD_CLOEXEC){setCloExecFlag(fd[PIPERD]);}
    if((unsigned)flags & PIPE4_FLAG_WR_CLOEXEC){setCloExecFlag(fd[PIPEWR]);}

    return ret;
}

/**
 * Pipe4 explicit derivatives.
 */

#define pipe4_cloexec(fd)  pipe4((fd), PIPE4_FLAG_RD_CLOEXEC|PIPE4_FLAG_WR_CLOEXEC)

/**
 * Popen4.
 * 
 * General-case for spawning a process and tethering it with cloexec pipes on stdin,
 * stdout and stderr.
 * 
 * @param [in]      cmd    The command to execute.
 * @param [in/out]  pin    The pointer to the cloexec pipe for stdin.
 * @param [in/out]  pout   The pointer to the cloexec pipe for stdout.
 * @param [in/out]  perr   The pointer to the cloexec pipe for stderr.
 * @param [in]      flags  A bitwise OR of flags to this function. Available
 *                         flags are:
 * 
 *     POPEN4_FLAG_NONE:
 *         Explicitly specify no flags.
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDIN,
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDOUT,
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDERR:
 *         Don't close pin[PIPERD], pout[PIPEWR] and perr[PIPEWR] in the parent,
 *         respectively.
 *     POPEN4_FLAG_CLOSE_CHILD_STDIN,
 *     POPEN4_FLAG_CLOSE_CHILD_STDOUT,
 *     POPEN4_FLAG_CLOSE_CHILD_STDERR:
 *         Close the respective streams in the child. Ignores pin, pout and perr
 *         entirely. Overrides a NOCLOSE_PARENT flag for the same stream.
 */

#define POPEN4_FLAG_NONE                             (0U)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDIN        (1U << 0)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDOUT       (1U << 1)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDERR       (1U << 2)
#define POPEN4_FLAG_CLOSE_CHILD_STDIN           (1U << 3)
#define POPEN4_FLAG_CLOSE_CHILD_STDOUT          (1U << 4)
#define POPEN4_FLAG_CLOSE_CHILD_STDERR          (1U << 5)

/* Child-side helper: make fd available as descriptor number target.
 * dup2() never copies FD_CLOEXEC to the duplicate, so the original can keep
 * the flag and is closed by exec() instead of leaking into the new program.
 * Only when fd already is target, where dup2() changes nothing, does the
 * flag have to be cleared explicitly. */
static void popen4Redirect(int fd, int target){
    if(fd == target){
        unsetCloExecFlag(fd);
    }else{
        dup2(fd, target);
    }
}

/* Spawns cmd through "/bin/sh -c" with stdin, stdout and stderr attached to
 * pin, pout and perr as selected by flags (see the POPEN4_FLAG_* values).
 * Returns the child's pid in the parent, or a negative value if fork()
 * failed. */
pid_t popen4(const char* cmd, int pin[2], int pout[2], int perr[2], int flags){
    const unsigned opts = (unsigned)flags;

    /********************
     **  FORK PROCESS  **
     ********************/
    pid_t ret = fork();

    if(ret < 0){
        /**
         * Error in fork(), still in parent.
         */

        fprintf(stderr, "fork() failed!\n");
        return ret;
    }

    if(ret == 0){
        /**
         * Child-side of fork
         */

        if(opts & POPEN4_FLAG_CLOSE_CHILD_STDIN){
            close(0);
        }else{
            popen4Redirect(pin [PIPERD], 0);
        }
        if(opts & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
            close(1);
        }else{
            popen4Redirect(pout[PIPEWR], 1);
        }
        if(opts & POPEN4_FLAG_CLOSE_CHILD_STDERR){
            close(2);
        }else{
            popen4Redirect(perr[PIPEWR], 2);
        }

        execl("/bin/sh", "sh", "-c", cmd, (char*)NULL);

        fprintf(stderr, "exec() failed!\n");
        /* _exit(): a forked child must not flush stdio buffers or run
           atexit handlers inherited from the parent. Same status as before. */
        _exit(-1);
    }

    /**
     * Parent-side of fork
     */

    if(~opts & POPEN4_FLAG_NOCLOSE_PARENT_STDIN  &&
       ~opts & POPEN4_FLAG_CLOSE_CHILD_STDIN){
        close(pin [PIPERD]);
    }
    if(~opts & POPEN4_FLAG_NOCLOSE_PARENT_STDOUT &&
       ~opts & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
        close(pout[PIPEWR]);
    }
    if(~opts & POPEN4_FLAG_NOCLOSE_PARENT_STDERR &&
       ~opts & POPEN4_FLAG_CLOSE_CHILD_STDERR){
        close(perr[PIPEWR]);
    }

    return ret;
}

/**
 * Main Function.
 * 
 * Sets up the whole piping scheme: creates every pipe close-on-exec, spawns
 * gzip, gunzip and cat, starts the six relay threads and waits for them.
 * 
 * @return 0 on success, 1 if a pipe, a process or a thread could not be
 *         created.
 */

int main(int argc, char* argv[]){
    pthread_t srcThrd, a2xor0Thrd, xor0Thrd, xor1Thrd, xor22BThrd, dstThrd;
    pid_t     gzip, gunzip, cat;
    PIPESET   pipeset;

    /* The program takes no arguments. */
    (void)argc;
    (void)argv;

    if(pipe4_cloexec(pipeset.Ain)     != 0 ||
       pipe4_cloexec(pipeset.Aout)    != 0 ||
       pipe4_cloexec(pipeset.Aerr)    != 0 ||
       pipe4_cloexec(pipeset.Bin)     != 0 ||
       pipe4_cloexec(pipeset.BoutCin) != 0 ||
       pipe4_cloexec(pipeset.Berr)    != 0 ||
       pipe4_cloexec(pipeset.Cout)    != 0 ||
       pipe4_cloexec(pipeset.Cerr)    != 0 ||
       pipe4_cloexec(pipeset.xor0)    != 0 ||
       pipe4_cloexec(pipeset.xor1)    != 0 ||
       pipe4_cloexec(pipeset.xor2)    != 0){
        fprintf(stderr, "pipe() failed!\n");
        return 1;
    }

    /* Spawn processes */
    gzip   = popen4("gzip -c -",   pipeset.Ain,     pipeset.Aout,    pipeset.Aerr, POPEN4_FLAG_NONE);
    gunzip = popen4("gunzip -c -", pipeset.Bin,     pipeset.BoutCin, pipeset.Berr, POPEN4_FLAG_NONE);
    cat    = popen4("cat",         pipeset.BoutCin, pipeset.Cout,    pipeset.Cerr, POPEN4_FLAG_NONE);

    if(gzip < 0 || gunzip < 0 || cat < 0){
        /* popen4() has already reported the failing fork() */
        return 1;
    }

    /* Spawn threads */
    if(pthread_create(&srcThrd,    NULL, srcThrdMain,    &pipeset) != 0 ||
       pthread_create(&a2xor0Thrd, NULL, a2xor0ThrdMain, &pipeset) != 0 ||
       pthread_create(&xor0Thrd,   NULL, xor0ThrdMain,   &pipeset) != 0 ||
       pthread_create(&xor1Thrd,   NULL, xor1ThrdMain,   &pipeset) != 0 ||
       pthread_create(&xor22BThrd, NULL, xor22BThrdMain, &pipeset) != 0 ||
       pthread_create(&dstThrd,    NULL, dstThrdMain,    &pipeset) != 0){
        fprintf(stderr, "pthread_create() failed!\n");
        return 1;
    }

    pthread_join(srcThrd,    (void**)NULL);
    pthread_join(a2xor0Thrd, (void**)NULL);
    pthread_join(xor0Thrd,   (void**)NULL);
    pthread_join(xor1Thrd,   (void**)NULL);
    pthread_join(xor22BThrd, (void**)NULL);
    pthread_join(dstThrd,    (void**)NULL);
    return 0;
}

关于您自己的代码的评论

您的代码有很多问题,其中大多数与线程无关.

赞美

为了不显得过于消极,我想说的是,您的问题总体上不是由于滥用Pthreads API所致,而是由于错误的消费者逻辑和缺少close()调用.此外,您似乎明白pthread_cond_wait()可能会被虚假唤醒,所以您已经把它包在一个检查不变量的循环中.

将来:

我会使用管道,甚至在线程之间也使用.这使您不必实现自己的生产者-消费者方案;内核已经为您解决了这个问题,并提供了pipe(),read()和write()原语,要利用这种现成的解决方案,您只需要这些.这也使代码更简洁,并且不再需要互斥锁和条件变量.只需认真地关闭管道的各端,并且在存在fork()的情况下必须非常小心.规则很简单:

  • 只要管道的写入端仍有任何一个处于打开状态,对打开的读取端调用read()就不会返回EOF,而是阻塞或返回EAGAIN.
  • 如果管道的写入端都已关闭,则对打开的读取端调用read()将返回EOF.
  • 如果管道的读取端都已关闭,则对任何写入端调用write()都将导致SIGPIPE.
  • fork()会复制整个进程,包括所有描述符(除非在pthread_atfork()中做了什么疯狂的事情)!

其他推荐答案

啊.所以我想我误解了这个问题....对不起.

我以为您想运行gunzip,然后再运行另一个内部过滤器,并且想这样做"n"次.

看来,您真正想做的是运行许多过滤器阶段,一个接一个地……有些使用外部命令,一些(也许是?)是该程序的内部.因此,渴望管理一些阶段间缓冲.

所以...我又试了一次.目的是运行任意数量的阶段:从输入阶段开始,接着是外部命令或内部函数的"过滤器"阶段,最后是输出阶段.每个外部命令阶段有三个pthread - 分别用于stdin,stdout和stderr.每个内部函数阶段使用一个pthread,初始输入和最终输出也各使用一个pthread.阶段之间是一个小型管道结构(称为"稻草"),用于"双缓冲"并使各阶段解耦...我希望这更接近您的想法.

"稻草"是事物的本质:

/* A "straw": the small hand-off structure between two stages.  Lumps move
 * free list -> lump_acquire() -> lump_blow() -> ready list -> lump_suck()
 * -> lump_free() -> free list.
 */
struct straw
{
  /* Locked by lump_acquire/lump_blow/lump_suck/lump_free around every
   * access to the two lists and the *_waiting flags below. */
  pthread_mutex_t*  mutex ;

  /* Head of the list of empty lumps (linked through lump->next);
   * lump_acquire() takes from it, lump_free() appends to it. */
  struct lump*  free ;
  /* Waited on in lump_acquire(), signalled by lump_free(). */
  pthread_cond_t* free_cond ;
  /* True while a thread is blocked in lump_acquire(); lump_free() only
   * signals free_cond when this is set. */
  bool          free_waiting ;

  /* Head of the list of filled lumps (linked through lump->next);
   * lump_suck() takes from it, lump_blow() appends to it. */
  struct lump*  ready ;
  /* Waited on in lump_suck(), signalled by lump_blow(). */
  pthread_cond_t* ready_cond ;
  /* True while a thread is blocked in lump_suck(); lump_blow() only
   * signals ready_cond when this is set. */
  bool          ready_waiting ;

  /* The two lumps that belong to this straw; lump_blow() and lump_free()
   * assert that the lump they are given is one of these. */
  struct lump*  lumps[2] ;
} ;

其中struct lump包含一个缓冲区及其相关信息. "稻草"有两个这样的"肿块":在任何时刻,一个pthread可能正在填充其中一个肿块,而另一个pthread正在排空另一个.也可能两个肿块都空闲(在空闲列表上),或者两个都已填满,在就绪列表上等待.

然后要为一个空的肿块填充(例如,从管道上阅读时):

/* Take an empty lump off the straw's free list, blocking until one is
 * available.  The lump is returned unlinked, marked not done, and with
 * ptr and end both reset to the start of its buffer.
 */
static struct lump*
lump_acquire(struct straw* strw)
{
  struct lump* taken ;

  pthread_mutex_lock(strw->mutex) ;

  for (;;)
    {
      taken = strw->free ;
      if (taken != NULL)
        break ;

      /* Nothing free yet: advertise that we are waiting so that
       * lump_free() knows to signal, then sleep.  The loop re-checks
       * the list after every wake-up. */
      strw->free_waiting = true ;
      pthread_cond_wait(strw->free_cond, strw->mutex) ;
      strw->free_waiting = false ;
    }

  strw->free = taken->next ;            /* pop the head         */

  pthread_mutex_unlock(strw->mutex) ;

  /* The lump is off the list now, so it is reset outside the lock.     */
  taken->done = false ;
  taken->ptr  = taken->end = taken->buff ;
  taken->next = NULL ;

  return taken ;
}

然后将完整的肿块吹入稻草的(一端).

/* Hand a filled lump to the ready list of the straw it belongs to.
 *
 * Rewinds lmp->ptr to the start of the buffer, queues the lump as ready
 * and, if a thread is blocked in lump_suck(), signals it.
 */
static void
lump_blow(struct lump* lmp)
{
  struct straw* strw = lmp->strw ;

  qassert((lmp == strw->lumps[0]) || (lmp == strw->lumps[1])) ;
  qassert( (lmp->buff <= lmp->ptr)
                     && (lmp->ptr <= lmp->end)
                                 && (lmp->end <= lmp->limit) ) ;

  lmp->ptr = lmp->buff ;                /* rewind               */

  pthread_mutex_lock(strw->mutex) ;

  /* Queue at the head if the list is empty, otherwise directly behind
   * the current head. */
  if (strw->ready != NULL)
    strw->ready->next = lmp ;
  else
    strw->ready = lmp ;
  lmp->next = NULL ;

  if (strw->ready_waiting)
    pthread_cond_signal(strw->ready_cond) ;

  pthread_mutex_unlock(strw->mutex) ;
}

从稻草的另一端吮吸一个团块:

/* Take the next filled lump off the straw's ready list, blocking until
 * one is available.  The lump is returned unlinked, with ptr at the start
 * of the buffer so that its contents are ptr..end.
 */
static struct lump*
lump_suck(struct straw* strw)
{
  struct lump* lmp ;

  pthread_mutex_lock(strw->mutex) ;

  for (;;)
    {
      lmp = strw->ready ;
      if (lmp != NULL)
        break ;

      /* Nothing ready yet: advertise that we are waiting so that
       * lump_blow() knows to signal, then sleep.  The loop re-checks
       * the list after every wake-up. */
      strw->ready_waiting = true ;
      pthread_cond_wait(strw->ready_cond, strw->mutex) ;
      strw->ready_waiting = false ;
    }

  strw->ready = lmp->next ;             /* pop the head         */

  pthread_mutex_unlock(strw->mutex) ;

  qassert( (lmp->buff <= lmp->ptr)
                     && (lmp->ptr <= lmp->end)
                                 && (lmp->end <= lmp->limit) ) ;

  lmp->next = NULL ;                    /* unlink               */
  lmp->ptr  = lmp->buff ;               /* data is ptr..end     */

  return lmp ;
}

和最后一块,一旦排干了肿块:

/* Return a drained lump to the free list of the straw it belongs to.
 *
 * The lump is queued as free, marked not done, and its ptr and end are
 * reset to the start of the buffer; if a thread is blocked in
 * lump_acquire() it is signalled.
 */
static void
lump_free(struct lump* lmp)
{
  struct straw* strw = lmp->strw ;

  qassert((lmp == strw->lumps[0]) || (lmp == strw->lumps[1])) ;
  qassert( (lmp->buff <= lmp->ptr)
                     && (lmp->ptr <= lmp->end)
                                 && (lmp->end <= lmp->limit) ) ;

  pthread_mutex_lock(strw->mutex) ;

  /* Queue at the head if the list is empty, otherwise directly behind
   * the current head. */
  if (strw->free != NULL)
    strw->free->next = lmp ;
  else
    strw->free = lmp ;

  lmp->next = NULL ;                    /* last on the list     */
  lmp->done = false ;
  lmp->ptr  = lmp->end = lmp->buff ;    /* mark as empty        */

  if (strw->free_waiting)
    pthread_cond_signal(strw->free_cond) ;

  pthread_mutex_unlock(strw->mutex) ;
}

整个程序太大,无法放进答案中 - 请参阅: pipework.c,其开头如下:

/*==============================================================================
 * pipework.c
 *
 * Copyright (c) Chris Hall (GMCH) 2014, All rights reserved.
 *
 * Though you may do what you like with this, provided you recognise that
 * it is offered "as is", gratis, and may or may not be fit for any purpose
 * whatsoever -- you are on your own.
 *
 *------------------------------------------------------------------------------
 *
 * This will read from stdin, pass the data through an arbitrary number of
 * "filter" stages and finally write the result to stdout.
 *
 * A filter stage may be an external command taking a piped stdin and
 * outputting to a piped stdout.  Anything it says to stderr is collected
 * and output to the program's stderr.
 *
 * A filter stage may also be an internal function.
 *
 * The input, filter and output stages are implemented as a number of pthreads,
 * with internal, miniature pipes (called "straws") between them.  All I/O is
 * blocking.  This is an experiment in the use of pthreads to simplify things.
 *
 * ============================
 * This is v0.08 of  4-Jul-2014
 * ============================
 *
 * The 'main' below runs eight stages: input, 4 commands, 2 internal filters
 * and the output.  The output should be an exact copy of the input.
 *
 * In order to test the stderr handling, the following small perl script is
 * used as two of the command filters:
 *
 *   chatter.pl
 *   --------------------------------------------------------
     use strict ;
     use warnings ;

     my $line = 0 ;
     while (<STDIN>)
       {
         my $len = length($_) ;
         my $r   = rand ;

         $line += 1 ;
         print STDERR "|$line:$len:$r|" ;

         if (int($r * 100) == 0)
           {
             print STDERR "\n" ;
           } ;

         print $_ ;
       } ;
 *   --------------------------------------------------------
 *
 *------------------------------------------------------------------------------
 * Limitations
 *
 *   * this will crash if it gets some error it's not expecting or not
 *     designed to overcome.  Clearly, to be useful this needs to be more
 *     robust and more informative.
 *
 *   * some (possible/theoretical) errors are simply ignored.
 *
 *   * no attempt is made to collect up the child processes or to discover
 *     their return codes.  If the child process reports errors or anything
 *     else on stderr, then that will be visible.  But otherwise, if it just
 *     crashes then the pipeline will run to completion, but the result may
 *     be nonsense.
 *
 *   * if one of the child processes stalls, the whole thing stalls.
 *
 *   * an I/O error in a stage will send 'end' downstream, but the program
 *     will continue until everything upstream has completed.
 *
 *   * generally... not intended for production use !!
 */

和perl脚本可以: chatter.pl

hth

本文地址:https://www.itbaoku.cn/post/359376.html