国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

線程池-手寫(xiě)線程池Linux C簡(jiǎn)單版本(生產(chǎn)者-消費(fèi)者模型)

這篇具有很好參考價(jià)值的文章主要介紹了線程池-手寫(xiě)線程池Linux C簡(jiǎn)單版本(生產(chǎn)者-消費(fèi)者模型)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

簡(jiǎn)介

本線程池采用C語(yǔ)言實(shí)現(xiàn)

線程池的場(chǎng)景:

當(dāng)某些任務(wù)特別耗時(shí)(例如大量的IO讀寫(xiě)操作),嚴(yán)重影響線程其他的任務(wù)的執(zhí)行,可以使用線程池

線程池的一般特點(diǎn):

線程池通常是一個(gè)生產(chǎn)者-消費(fèi)者模型
生產(chǎn)者線程用于發(fā)布任務(wù),任務(wù)通常保存在任務(wù)隊(duì)列中
線程池作為消費(fèi)者,用于取出任務(wù),執(zhí)行任務(wù)

線程池中線程數(shù)量的選擇:

有一個(gè)經(jīng)驗(yàn)公式: 線程數(shù)量 =(io等待時(shí)間+cpu運(yùn)算時(shí)間)*核心數(shù)/cpu運(yùn)算時(shí)間

因此可以根據(jù)經(jīng)驗(yàn)公式得出下面兩種場(chǎng)景的線程數(shù)量:

  • cpu密集任務(wù):線程數(shù)量=核心數(shù)(即上面的公式假設(shè)cpu運(yùn)算時(shí)間>>io等待時(shí)間)
  • io密集任務(wù):線程數(shù)量=2*n+2

手寫(xiě)線程池

線程池代碼結(jié)構(gòu):

  • thread_pool_create:創(chuàng)建線程池所需要的資源,包含不限于任務(wù)隊(duì)列,子線程的創(chuàng)建。
  • thread_pool_post:用于任務(wù)的發(fā)布,將執(zhí)行任務(wù)存在任務(wù)隊(duì)列中。
  • thread_pool_destroy:用于線程池的退出,以及資源的銷毀。
  • wait_all_done:join線程池所有子線程,等待回收子線程。
  • thread_worker:用于任務(wù)執(zhí)行。

主要的核心點(diǎn)集中在thread_pool_post和thread_worker兩個(gè)函數(shù)中,這兩個(gè)函數(shù)也構(gòu)成了生產(chǎn)者-消費(fèi)者模型。本文采用隊(duì)列+互斥鎖+條件變量實(shí)現(xiàn)。

線程池結(jié)構(gòu)體分析

由于C語(yǔ)言不像C++可以用類封裝函數(shù),因此線程池會(huì)使用結(jié)構(gòu)體來(lái)封裝一些變量或者函數(shù)指針。

task_t

封裝任務(wù)的入口指針以及參數(shù)。

typedef struct task_t {
    handler_pt func;
    void * arg;
} task_t;

task_queue_t

封裝任務(wù)隊(duì)列,為了不頻繁移動(dòng)隊(duì)列中數(shù)據(jù),此處采用頭尾索引來(lái)標(biāo)記任務(wù)。

typedef struct task_queue_t {
    uint32_t head;
    uint32_t tail;
    uint32_t count;
    task_t *queue;
} task_queue_t;

thread_pool_t

包含互斥鎖,條件變量,任務(wù)隊(duì)列等信息

struct thread_pool_t {
    pthread_mutex_t mutex;
    pthread_cond_t condition; //條件變量
    pthread_t *threads; //線程
    task_queue_t task_queue; //任務(wù)隊(duì)列

    int closed; //是否關(guān)閉線程池執(zhí)行的標(biāo)志,為1表示關(guān)閉
    int started; // 當(dāng)前正在運(yùn)行的線程數(shù)

    int thrd_count; //線程數(shù)
    int queue_size; //任務(wù)隊(duì)列大小
};

其中closed:表示是否關(guān)閉線程池執(zhí)行的標(biāo)志,為1表示關(guān)閉。在線程的運(yùn)行函數(shù)中,用來(lái)判斷是否繼續(xù)循環(huán)等待執(zhí)行任務(wù)隊(duì)列中的任務(wù)。
started:表示當(dāng)前正在運(yùn)行的線程數(shù)。在thread_pool_destroy函數(shù)中銷毀線程池時(shí),需要等待所有線程停止才行,即started == 0

線程池函數(shù)分析

thread_pool_create

創(chuàng)建線程池,初始化一些線程池屬性
通過(guò)循環(huán)pthread_create函數(shù)創(chuàng)建子線程。

thread_pool_t *thread_pool_create(int thrd_count, int queue_size) {
    thread_pool_t *pool;

    if (thrd_count <= 0 || queue_size <= 0) {
        return NULL;
    }
    pool = (thread_pool_t*) malloc(sizeof(*pool));
    if (pool == NULL) {
        return NULL;
    }
    pool->thrd_count = 0;
    pool->queue_size = queue_size;
    pool->task_queue.head = 0;
    pool->task_queue.tail = 0;
    pool->task_queue.count = 0;

    pool->started = pool->closed = 0;

    pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
    if (pool->task_queue.queue == NULL) {
        // TODO: free pool
        return NULL;
    }

    pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
    if (pool->threads == NULL) {
        // TODO: free pool
        return NULL;
    }
    int i = 0;
    for (; i < thrd_count; i++) {
        if (pthread_create(&(pool ->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
            // TODO: free pool
            return NULL;
        }
        pool->thrd_count++;
        pool->started++;
    }
    return pool;
}

thread_pool_post

作為生產(chǎn)者,往任務(wù)隊(duì)列里面添加任務(wù)
通過(guò)pthread_cond_signal通知子喚醒子線程的pthread_cond_wait

int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
    if (pool == NULL || func == NULL) {
        return -1;
    }
    task_queue_t *task_queue = &(pool->task_queue);
//此處用自旋鎖會(huì)更節(jié)省消耗,因?yàn)殒i里面的邏輯比較簡(jiǎn)單
    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }

    if (pool->closed) {
        pthread_mutex_unlock(&(pool->mutex));
        return -3;
    }

    if (task_queue->count == pool->queue_size) {
        pthread_mutex_unlock(&(pool->mutex));
        return -4;
    }
//避免queue數(shù)據(jù)的變化,采用頭尾索引來(lái)標(biāo)識(shí)
    task_queue->queue[task_queue->tail].func = func;
    task_queue->queue[task_queue->tail].arg = arg;
    task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
    task_queue->count++;
//喚醒一個(gè)休眠的線程
    if (pthread_cond_signal(&(pool->condition)) != 0) {
        pthread_mutex_unlock(&(pool->mutex));
        return -5;
    }
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}

thread_worker

pthread_cond_wait等待任務(wù)的喚醒
作為消費(fèi)者, (*(task.func))(task.arg);執(zhí)行任務(wù)

static void *thread_worker(void *thrd_pool) {
    thread_pool_t *pool = (thread_pool_t*)thrd_pool;
    task_queue_t *que;
    task_t task;
    for (;;) {
        pthread_mutex_lock(&(pool->mutex));
        que = &pool->task_queue;
        while (que->count == 0 && pool->closed == 0) {
            // 阻塞在 condition,等待任務(wù)隊(duì)列添加任務(wù)
            pthread_cond_wait(&(pool->condition), &(pool->mutex));
        }
        if (pool->closed == 1 && que->count == 0) break;//沒(méi)有任務(wù),并且關(guān)閉標(biāo)志打開(kāi),即跳出循環(huán)
        task = que->queue[que->head];
        que->head = (que->head + 1) % pool->queue_size;
        que->count--;
        pthread_mutex_unlock(&(pool->mutex));
        (*(task.func))(task.arg);//執(zhí)行對(duì)應(yīng)任務(wù)函數(shù)
    }
    pool->started--;//跳出循環(huán)之后,運(yùn)行線程數(shù)需要減1
    pthread_mutex_unlock(&(pool->mutex));
    pthread_exit(NULL);
    return NULL;
}

thread_pool_destroy

銷毀釋放線程池,置 pool->closed = 1;
通過(guò)pthread_cond_broadcast喚醒線程池所有線程,這個(gè)和thread_pool_post里的pthread_cond_signal一樣,并且broadcast會(huì)通知到所有的線程

int thread_pool_destroy(thread_pool_t *pool) {
    if (pool == NULL) {
        return -1;
    }

    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }

    if (pool->closed) {
        thread_pool_free(pool);
        return -3;
    }
    pool->closed = 1;
//廣播形式,通知所有阻塞在condition的線程接觸阻塞
    if (pthread_cond_broadcast(&(pool->condition)) != 0 || 
            pthread_mutex_unlock(&(pool->mutex)) != 0) {
        thread_pool_free(pool);
        return -4;
    }
    wait_all_done(pool);
    thread_pool_free(pool);
    return 0;
}

wait_all_done

將所有線程通過(guò)pthread_join回收,所有子線程任務(wù)執(zhí)行完畢,回收線程

int wait_all_done(thread_pool_t *pool) {
    printf("wait_all_done start!pool->thrd_count:%d\n", pool->thrd_count);
    int i, ret=0;
    for (i=0; i < pool->thrd_count; i++) {
        printf("wait_all_done doing! i:%d\n", i);
        if (pthread_join(pool->threads[i], NULL) != 0) {
            ret=1;
        }
        
    }
    printf("wait_all_done end!\n");
    return ret;
}

thread_pool_free

釋放線程池空間

static void thread_pool_free(thread_pool_t *pool) {
    if (pool == NULL || pool->started > 0) {
        return;
    }

    if (pool->threads) {
        free(pool->threads);
        pool->threads = NULL;

        pthread_mutex_lock(&(pool->mutex));
        pthread_mutex_destroy(&pool->mutex);
        pthread_cond_destroy(&pool->condition);
    }

    if (pool->task_queue.queue) {
        free(pool->task_queue.queue);
        pool->task_queue.queue = NULL;
    }
    free(pool);
}

主函數(shù)調(diào)用

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#include "thrd_pool.h"

int nums = 0;
int done = 0;
int task_num = 100;

pthread_mutex_t lock;

void do_task(void *arg) {
    usleep(10000);
    pthread_mutex_lock(&lock);
    done++;
    printf("doing %d task\n", done);
    pthread_mutex_unlock(&lock);
}

int main(int argc, char **argv) {
    int threads = 8;
    int queue_size = 256;

    if (argc == 2) {
        threads = atoi(argv[1]);
        if (threads <= 0) {
            printf("threads number error: %d\n", threads);
            return 1;
        }
    } else if (argc > 2) {
        threads = atoi(argv[1]);
        queue_size = atoi(argv[1]);
        if (threads <= 0 || queue_size <= 0) {
            printf("threads number or queue size error: %d,%d\n", threads, queue_size);
            return 1;
        }
    }

    thread_pool_t *pool = thread_pool_create(threads, queue_size);
    if (pool == NULL) {
        printf("thread pool create error!\n");
        return 1;
    }

    while (thread_pool_post(pool, &do_task, NULL) == 0) {
        pthread_mutex_lock(&lock);
        nums++;
        pthread_mutex_unlock(&lock);
        if (nums > task_num) break;
    }

    printf("add %d tasks\n", nums);
    usleep(1000000);//延時(shí)等待所有的作業(yè)完成

    printf("did %d tasks\n", done);
    thread_pool_destroy(pool);
    return 0;
}

運(yùn)行結(jié)果

使用指令編譯文件:

gcc main.c thrd_pool.c -o main -lpthread

運(yùn)行執(zhí)行文件得到運(yùn)行結(jié)果
線程池-手寫(xiě)線程池Linux C簡(jiǎn)單版本(生產(chǎn)者-消費(fèi)者模型),線程池,linux,c語(yǔ)言,c++線程池-手寫(xiě)線程池Linux C簡(jiǎn)單版本(生產(chǎn)者-消費(fèi)者模型),線程池,linux,c語(yǔ)言,c++

完整代碼下載線程池Linux C語(yǔ)言簡(jiǎn)單版本文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-627157.html

到了這里,關(guān)于線程池-手寫(xiě)線程池Linux C簡(jiǎn)單版本(生產(chǎn)者-消費(fèi)者模型)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包