国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Linux——生產(chǎn)者消費者模型和信號量

這篇具有很好參考價值的文章主要介紹了Linux——生產(chǎn)者消費者模型和信號量。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄???????

基于BlockingQueue的生產(chǎn)者消費者模型

概念

條件變量的第二個參數(shù)的作用

?鎖的作用

生產(chǎn)者消費者模型的高效性

生產(chǎn)者而言,向blockqueue里面放置任務(wù)

消費者而言,從blockqueue里面拿取任務(wù):

總結(jié)

完整代碼(不含存儲數(shù)據(jù)的線程)

完整代碼(含存儲線程)?

信號量

1、先發(fā)現(xiàn)我們之前寫的代碼不足的地方。

?關(guān)于臨界資源

總結(jié)

信號量的概念

申請信號量的本質(zhì)

申請信號量的情況

信號量作為計數(shù)器的操作

信號量相關(guān)函數(shù)

初始化信號量

等待信號量

發(fā)布信號量

基于RingQueue的生產(chǎn)者消費者模型

引入環(huán)形隊列

?生產(chǎn)和消費在什么情況下可能訪問同一個位置

?舉個例子

生產(chǎn)者而言

消費者而言

總結(jié)?

環(huán)形隊列的實現(xiàn)(代碼)


基于BlockingQueue的生產(chǎn)者消費者模型

概念

在多線程編程中阻塞隊列(Blocking Queue)是一種常用于實現(xiàn)生產(chǎn)者和消費者模型的數(shù)據(jù)結(jié)構(gòu)。
其與普通的隊列區(qū)別在于:
?? ?當(dāng)隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;
?? ?當(dāng)隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進(jìn)程操作時會被阻塞)。

條件變量的第二個參數(shù)的作用

?因為如果隊列為滿那么對應(yīng)進(jìn)來的線程將等待(被掛起)
由于自身已經(jīng)擁有鎖的緣故,鎖就不會被釋放了
將第二個參數(shù)的鎖傳過去的作用就是,如果該線程被掛起
那么對應(yīng)的鎖是會被釋放的,以便于后邊的線程可以申請鎖成功

?鎖的作用

在阻塞隊列中,因為有鎖的存在,無論外部的線程有多少,真正進(jìn)入到我們的阻塞隊列中生產(chǎn)或者消費的線程永遠(yuǎn)只有一個。

生產(chǎn)者消費者模型的高效性

生產(chǎn)者而言,向blockqueue里面放置任務(wù)

對于生產(chǎn)者,他的任務(wù)從哪里來的呢?它獲取任務(wù)和構(gòu)建任務(wù)要不要花時間??

生產(chǎn)者獲取任務(wù)是要花時間的:
生產(chǎn)活動,從數(shù)據(jù)庫?從網(wǎng)絡(luò),從外設(shè)??拿來的用戶數(shù)據(jù)??!

消費者而言,從blockqueue里面拿取任務(wù):


對于消費者,難道它把任務(wù)從任務(wù)隊列中拿出來就完了嗎??
消費者拿到任務(wù)之后,后續(xù)還有沒有任務(wù)??

消費者拿任務(wù)這個過程,也可能非常耗時!!(計算或處理或存儲)。


總結(jié)

可以在生產(chǎn)之前,和消費之后,讓線程并行執(zhí)行。

也就是說高效是體現(xiàn)在進(jìn)入阻塞隊列前的生產(chǎn)者并行的生產(chǎn)數(shù)據(jù),和出阻塞隊列后的消費者并行的消費數(shù)據(jù)。

完整代碼(不含存儲數(shù)據(jù)的線程)

里面含有?BlockQueue.hpp ?MainCp.cc ?Task.hpp(并且由詳細(xì)的注釋)

*********************************
BlockQueue.hpp
*********************************

#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <cstdio>

const int gmaxcap = 5;

template <class T>
class BlockQueue
{
public:
   // 構(gòu)造函數(shù) 主要完成阻塞隊列的大小、鎖、條件變量的初始化
    BlockQueue(const int &maxcap = gmaxcap) // 給一個缺省值(默認(rèn)為gmaxcap)
    : _maxcap(maxcap)
    {
        // 都是局部的
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    // 向隊列里面放數(shù)據(jù)
    void push(const T &in) // 輸入型參數(shù), const &
    {
        // 加鎖
        pthread_mutex_lock(&_mutex);

        // 1、判斷
        // 細(xì)節(jié)2:充當(dāng)條件判斷的語法必須是while,不能是if
        while (is_full()) // 如果隊列中滿的條件滿足了,就不能生產(chǎn)了
        {
            // 細(xì)節(jié)1:pthread_cond_wait這個函數(shù)的第二個參數(shù),必須是我們正在使用的互斥鎖!

            // 因為如果隊列為滿那么對應(yīng)進(jìn)來的線程將等待(被掛起)
            // 由于自身已經(jīng)擁有鎖的緣故,鎖就不會被釋放了
            // 將第二個參數(shù)的鎖傳過去的作用就是,如果該線程被掛起
            // 那么對應(yīng)的鎖是會被釋放的,以便于后邊的線程可以申請鎖成功

            // a、pthread_cond_wait:該函數(shù)調(diào)用的時候,會以原子性的方式,將鎖釋放,并將自己掛起
            // b、pthread_cond_wait:該函數(shù)在被喚醒返回的時候,會自動的重新獲取你傳入的鎖
            pthread_cond_wait(&_pcond, &_mutex); // 因為我們的生產(chǎn)條件不滿足,無法生產(chǎn),我們的生產(chǎn)者進(jìn)行等待
        }

        // 2、當(dāng)走到這里一定是沒有滿的
        // 將需要放入的數(shù)據(jù)push進(jìn)隊列
        _q.push(in);

        // 3、絕對能保證,阻塞隊列里面一定有數(shù)據(jù),因此將消費者進(jìn)行喚醒
        // 細(xì)節(jié)3:pthread_cond_signal:這個函數(shù),可以放在臨界區(qū)內(nèi)部,也可以放在外部
        pthread_cond_signal(&_ccond); // 這里可以有一定的策略(比如高于多少數(shù)據(jù)再消費等等)
        
        // 解鎖
        pthread_mutex_unlock(&_mutex);
    }


    // 向隊列里面拿數(shù)據(jù)
    void pop(T *out) // 輸出型參數(shù):*, //輸入型參數(shù):&
    {
        // 為保證數(shù)據(jù)的安全,肯定是先加鎖
        pthread_mutex_lock(&_mutex);
        // 1、判斷
        while (is_empty()) // 如果隊列中為空,就不能消費了
        {
            // 由于該函數(shù)可能調(diào)用失敗,所以需要進(jìn)行while循環(huán)以防調(diào)用失敗后就直接向后執(zhí)行了
            pthread_cond_wait(&_ccond, &_mutex);
        }
        // 2、走到這里我們能保證,一定不為空

        // 拿到頭部結(jié)點
        *out = _q.front();

        // 將該數(shù)據(jù)pop掉
        _q.pop();

        // 3、絕對能保證,阻塞隊列里面,至少有一個空的位置,因此將生產(chǎn)者進(jìn)行喚醒
        pthread_cond_signal(&_pcond); // 這里可以有一定的策略(比如低于多少數(shù)據(jù)再生產(chǎn)等等)
        pthread_mutex_unlock(&_mutex);
    }

    // 出作用域由于自動調(diào)用析構(gòu)函數(shù),因此自動銷毀
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);

        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

private:
    // 判斷隊列是否為空
    bool is_empty()
    {
        return _q.empty();
    }

    // 判斷隊列是否為滿
    bool is_full()
    {
        return _q.size() == _maxcap;
    }

    


private:

    // 定義一個隊列
    std::queue<T> _q;

    int _maxcap; // 表示隊列的上限

    pthread_mutex_t _mutex;// 這個阻塞隊列一定是臨界資源(因此需要鎖)

    pthread_cond_t _pcond; // 生產(chǎn)者對應(yīng)的條件變量

    pthread_cond_t _ccond; // 消費者對應(yīng)的條件變量
};



**********************************************
MainCp.cc
**********************************************
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <time.h>

const std::string oper = "+-*/%";

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;

    case '/':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
    break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
    break;
    default:
        break;
    }

    return result;
}

// 消費線程的消費動作
void *consumer(void *bq_)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(bq_);

    while (true)
    {
        // 消費活動
        // int data;
        Task t;
        bq->pop(&t); // 由于是輸出型參數(shù),因此是可以直接使用變量獲取傳入的變量的
        // std::cout << "消費數(shù)據(jù):" << t._x << "+" << t._y << "=" << t() << std::endl;
        // std::cout << "消費數(shù)據(jù):" << t.getx() << "+" << t.gety() << "=" << t() << std::endl;
        std::cout << "消費任務(wù):" << t() << std::endl;

        // sleep(1);
    }

    return nullptr;
}

// 生產(chǎn)線程的生產(chǎn)動作
void *prodecter(void *bq_)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(bq_);

    while (true)
    {
        // 生產(chǎn)活動
        int x = rand() % 10; // 在這里我們先用隨機數(shù),構(gòu)建一個數(shù)據(jù)
        int y = rand() % 5;

        int operCode = rand() % oper.size();
        // 定義一個Task對象
        Task t(x, y, oper[operCode], mymath);
        bq->push(t);
        std::cout << "生產(chǎn)任務(wù):" << t.toTaskString() << std::endl;
        // sleep(1);
    }

    return nullptr;
}

int main()
{
    // 生成隨機數(shù)種子:srand(time(nullptr));
    srand((unsigned long)time(nullptr) ^ getpid());

    // 為了讓兩個線程看到同一塊資源
    BlockQueue<Task> *bq = new BlockQueue<Task>();

    // 定義線程
    pthread_t c, p;

    // 創(chuàng)建一個消費線程
    pthread_create(&c, nullptr, consumer, bq);
    // 創(chuàng)建一個生產(chǎn)線程
    pthread_create(&p, nullptr, prodecter, bq);

    // 等待線程
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bq;
    return 0;
}


*************************************************
Task.hpp
*************************************************
#pragma once

#include <iostream>
#include <functional>
#include <cstdio>

class Task
{

    using func_t = std::function<int(int, int, char)>;
    // typedef std::function<int(int, int)> func_t;

public:
    Task()
    {
    }

    Task(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callback(func)
    {
    }

    // 該函數(shù)是消費任務(wù)的打印信息
    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }

// 該函數(shù)是生產(chǎn)任務(wù)的打印信息

    std::string toTaskString()
    {
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};






完整代碼(含存儲線程)?

里面含有?BlockQueue.hpp ?MainCp.cc ?Task.hpp(并且由詳細(xì)的注釋)

// BlockQueue.hpp

#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <cstdio>

const int gmaxcap = 5;

template <class T>
class BlockQueue
{
public:
   // 構(gòu)造函數(shù) 主要完成阻塞隊列的大小、鎖、條件變量的初始化
    BlockQueue(const int &maxcap = gmaxcap) // 給一個缺省值(默認(rèn)為gmaxcap)
    : _maxcap(maxcap)
    {
        // 都是局部的
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    // 向隊列里面放數(shù)據(jù)
    void push(const T &in) // 輸入型參數(shù), const &
    {
        // 加鎖
        pthread_mutex_lock(&_mutex);

        // 1、判斷
        // 細(xì)節(jié)2:充當(dāng)條件判斷的語法必須是while,不能是if
        while (is_full()) // 如果隊列中滿的條件滿足了,就不能生產(chǎn)了
        {
            // 細(xì)節(jié)1:pthread_cond_wait這個函數(shù)的第二個參數(shù),必須是我們正在使用的互斥鎖!

            // 因為如果隊列為滿那么對應(yīng)進(jìn)來的線程將等待(被掛起)
            // 由于自身已經(jīng)擁有鎖的緣故,鎖就不會被釋放了
            // 將第二個參數(shù)的鎖傳過去的作用就是,如果該線程被掛起
            // 那么對應(yīng)的鎖是會被釋放的,以便于后邊的線程可以申請鎖成功

            // a、pthread_cond_wait:該函數(shù)調(diào)用的時候,會以原子性的方式,將鎖釋放,并將自己掛起
            // b、pthread_cond_wait:該函數(shù)在被喚醒返回的時候,會自動的重新獲取你傳入的鎖
            pthread_cond_wait(&_pcond, &_mutex); // 因為我們的生產(chǎn)條件不滿足,無法生產(chǎn),我們的生產(chǎn)者進(jìn)行等待
        }

        // 2、當(dāng)走到這里一定是沒有滿的
        // 將需要放入的數(shù)據(jù)push進(jìn)隊列
        _q.push(in);

        // 3、絕對能保證,阻塞隊列里面一定有數(shù)據(jù),因此將消費者進(jìn)行喚醒
        // 細(xì)節(jié)3:pthread_cond_signal:這個函數(shù),可以放在臨界區(qū)內(nèi)部,也可以放在外部
        pthread_cond_signal(&_ccond); // 這里可以有一定的策略(比如高于多少數(shù)據(jù)再消費等等)
        
        // 解鎖
        pthread_mutex_unlock(&_mutex);
    }


    // 向隊列里面拿數(shù)據(jù)
    void pop(T *out) // 輸出型參數(shù):*, //輸入型參數(shù):&
    {
        // 為保證數(shù)據(jù)的安全,肯定是先加鎖
        pthread_mutex_lock(&_mutex);
        // 1、判斷
        while (is_empty()) // 如果隊列中為空,就不能消費了
        {
            // 由于該函數(shù)可能調(diào)用失敗,所以需要進(jìn)行while循環(huán)以防調(diào)用失敗后就直接向后執(zhí)行了
            pthread_cond_wait(&_ccond, &_mutex);
        }
        // 2、走到這里我們能保證,一定不為空

        // 拿到頭部結(jié)點
        *out = _q.front();

        // 將該數(shù)據(jù)pop掉
        _q.pop();

        // 3、絕對能保證,阻塞隊列里面,至少有一個空的位置,因此將生產(chǎn)者進(jìn)行喚醒
        pthread_cond_signal(&_pcond); // 這里可以有一定的策略(比如低于多少數(shù)據(jù)再生產(chǎn)等等)
        pthread_mutex_unlock(&_mutex);
    }

    // 出作用域由于自動調(diào)用析構(gòu)函數(shù),因此自動銷毀
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);

        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

private:
    // 判斷隊列是否為空
    bool is_empty()
    {
        return _q.empty();
    }

    // 判斷隊列是否為滿
    bool is_full()
    {
        return _q.size() == _maxcap;
    }

    


private:

    // 定義一個隊列
    std::queue<T> _q;

    int _maxcap; // 表示隊列的上限

    pthread_mutex_t _mutex;// 這個阻塞隊列一定是臨界資源(因此需要鎖)

    pthread_cond_t _pcond; // 生產(chǎn)者對應(yīng)的條件變量

    pthread_cond_t _ccond; // 消費者對應(yīng)的條件變量
};

// MainCp.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <time.h>

// 創(chuàng)建一個類
// 里面包含了BlockQueue類型的對象
// C:計算
// S:存儲
template <class C, class S>
class BlockQueues
{
public:
    BlockQueue<C> *c_bq;
    BlockQueue<S> *s_bq;
};

// 消費線程的消費動作
void *consumer(void *bq_)
{
    // 先把傳入的參數(shù)強轉(zhuǎn)成BlockQueues<CalTask, SaveTask> *  類型的
    // 然后再指向自己的成員
    BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bq_))->c_bq;

    BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bq_))->s_bq;

    while (true)
    {
        // 消費活動 
        // int data;
        CalTask t;
        bq->pop(&t);
        std::string result = t(); // 任務(wù)非常耗時??! 也是有可能的
        std::cout << "cal thread, 完成計算任務(wù):" << result << " ... done" << std::endl;

        // // 調(diào)用SaveTask類中有參的構(gòu)造函數(shù)來初始化對象save
        // SaveTask save(result, Save);
        // // 將savepush到save_bq存儲隊列里面
        // save_bq->push(save);

        // std::cout << "cal thread, 推送存儲任務(wù)完成..." << std::endl;

        // sleep(1);
    }

    return nullptr;
}

// 生產(chǎn)線程的生產(chǎn)動作
void *prodecter(void *bq_)
{
    BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bq_))->c_bq;
    while (true)
    {
        // 生產(chǎn)活動,從數(shù)據(jù)庫?從網(wǎng)絡(luò),從外設(shè)??拿來的用戶數(shù)據(jù)?。?
        // 在這里x不可能為0,因此計算/和%的時候不需要考慮x為0的情況
        int x = rand() % 10 + 1; // 在這里我們先用隨機數(shù),構(gòu)建一個數(shù)據(jù)
        int y = rand() % 5;

        int operCode = rand() % oper.size();
        // 定義一個Task對象
        CalTask t(x, y, oper[operCode], mymath);
        bq->push(t);
        std::cout << "prodecter thread, 生產(chǎn)計算任務(wù):" << t.toTaskString() << std::endl;
        sleep(1);
    }

    return nullptr;
}

void *saver(void *bqs_)
{
    BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;

    while (true)
    {
        SaveTask t;
        save_bq->pop(&t);

        // 仿函數(shù)調(diào)用了Save方法
        // 因此保存了任務(wù)
        t();

        std::cout << "save thread, 保存任務(wù)完成..." << std::endl;
    }
}

int main()
{
    // 生成隨機數(shù)種子:srand(time(nullptr));
    srand((unsigned long)time(nullptr) ^ getpid());

    BlockQueues<CalTask, SaveTask> bqs;

    // 為了讓兩個線程看到同一塊資源
    bqs.c_bq = new BlockQueue<CalTask>();
    bqs.s_bq = new BlockQueue<SaveTask>();

    // 定義線程
    pthread_t c[2], p[3], s;

    // 創(chuàng)建消費線程
    pthread_create(c, nullptr, consumer, &bqs);
    pthread_create(c + 1, nullptr, consumer, &bqs);
    // 創(chuàng)建生產(chǎn)線程
    pthread_create(p, nullptr, prodecter, &bqs);
    pthread_create(p + 1, nullptr, prodecter, &bqs);
    pthread_create(p + 2, nullptr, prodecter, &bqs);
    // 保存一個線程
    // pthread_create(&s, nullptr, saver, &bqs);

    // 等待線程
    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);
    // pthread_join(s, nullptr);

    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

// Task.hpp

#pragma once

#include <iostream>
#include <functional>
#include <string>
#include <cstdio>

const std::string oper = "+-*/%";

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;

    case '/':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
    break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
    break;
    default:
        break;
    }

    return result;
}

// 計算數(shù)據(jù)的類
class CalTask
{

    using func_t = std::function<int(int, int, char)>;
    // typedef std::function<int(int, int)> func_t;

public:
    CalTask()
    {
    }

    CalTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callback(func)
    {
    }

    // 該函數(shù)是消費任務(wù)的打印信息
    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }

    // 該函數(shù)是生產(chǎn)任務(wù)的打印信息

    std::string toTaskString()
    {
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

// 存儲數(shù)據(jù)的類
class SaveTask
{
    typedef std::function<void(const std::string &)> func_t;

public:
    SaveTask()
    {
    }

    SaveTask(const std::string &message, func_t func)
        : _message(message), _func(func)
    {
    }

    // 其本質(zhì)就是將任務(wù)存儲到文件當(dāng)中 -- 調(diào)用的是Save方法
    void operator()()
    {
        _func(_message);
    }

private:
    std::string _message;
    func_t _func;
};

void Save(const std::string &message)
{
    const std::string target = "./log.txt";
    // c_str() 函數(shù)可以將 const string* 類型 轉(zhuǎn)化為 const char* 類型
    // 因為在c語言中沒有string類型,必須通過string類對象的成員函數(shù) c_str() 把 string 轉(zhuǎn)換成c中的字符串樣式
    FILE *fp = fopen(target.c_str(), "a+");
    if (!fp)
    {
        std::cerr << "fopen err" << std::endl;
        return;
    }
    fputs(message.c_str(), fp);

    fputs("\n", fp);
    fclose(fp);
}


信號量

1、先發(fā)現(xiàn)我們之前寫的代碼不足的地方。



// 加鎖
        pthread_mutex_lock(&_mutex);

        while (is_full()) // 如果隊列中滿的條件滿足了,就不能生產(chǎn)了
        {

            pthread_cond_wait(&_pcond, &_mutex); // 因為我們的生產(chǎn)條件不滿足,無法生產(chǎn),我們的生產(chǎn)者進(jìn)行等待
        }

       
        _q.push(in);


        pthread_cond_signal(&_ccond); // 這里可以有一定的策略(比如高于多少數(shù)據(jù)再消費等等)
        
        // 解鎖
        pthread_mutex_unlock(&_mutex);

?關(guān)于臨界資源

1.一個線程,在操作臨界資源的時候,必須臨界資源是滿足條件的!
2.可是,公共資源是否滿足生產(chǎn)或者消費條件,我們無法直接得知【我們不能事前得知【在沒有訪問之前,無法得知】】
3.只能先加鎖,再檢測(while),再操作,再解鎖。

因為你要檢測,本質(zhì):也是在訪問臨界資源!

總結(jié)

因為我們在操作臨界資源的時候,有可能不就緒,但是,我們無法提前得知,所以,只能先加鎖,再檢測,根據(jù)檢測結(jié)果,決定下一步怎么走!

只要我們對資源進(jìn)行整體加鎖,就默認(rèn)了,我們對這個資源整體使用。

實際情況可能存在:
一份公共資源,但是允許同時訪問不同的區(qū)域!

無論誰想訪問這一份公共資源 -- 必須先申請信號量!
只要申請信號量成功,那么就一定保證會有一個位置為你預(yù)留。

程序員編碼保證不同的線程可以并發(fā)訪問公共資源的不同區(qū)域!

信號量的概念

a.信號量本質(zhì)是一把計數(shù)器 -- 衡量臨界資源中資源數(shù)量多少的計數(shù)器


b.只要擁有信號量,就在未來一定能夠擁有臨界資源的一部分。

申請信號量的本質(zhì)

對臨界資源中特定小塊資源的 ?預(yù)訂機制 -> 有可能,我們在訪問真正的臨界資源之前,我們其實就可以提前知道臨界資源的使用情況?。?!

申請信號量的情況

只要申請成功,就一定有你的資源。
只要申請失敗,就說明條件不就緒,你只能等!!

因此在申請資源之前就不需要在判斷了?。?!

由于多個線程要訪問臨界資源中的某一區(qū)域,多個線程必須得先看到信號量,那么可以推斷出,信號量本身必須是公共資源。

信號量作為計數(shù)器的操作

sem --; ?--- 申請資源 --- 必須保證操作的原子性? --? P


sem ++; ?--- 歸還資源 --- 必須保證操作的原子性 -- V

信號量核心操作: PV原語(原是原子性,語是語句)

信號量相關(guān)函數(shù)

初始化信號量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

參數(shù):

pshared:0表示線程間共享,非零表示進(jìn)程間共享
value:信號量初始值

銷毀信號量

int sem_destroy(sem_t *sem);


等待信號量

功能:

等待信號量,會將信號量的值減1

int sem_wait(sem_t *sem); //P()


發(fā)布信號量

功能:

發(fā)布信號量,表示資源使用完畢,可以歸還資源了。將信號量值加1。

int sem_post(sem_t *sem);// V()

基于RingQueue的生產(chǎn)者消費者模型

引入環(huán)形隊列

Linux——生產(chǎn)者消費者模型和信號量

?生產(chǎn)和消費在什么情況下可能訪問同一個位置

1.空的時候
2.滿的時候
3.其他情況,生產(chǎn)者和消費者,根本訪問的就是不同的區(qū)域!

?舉個例子

把生產(chǎn)者和消費者比作兩個小朋友在這個循環(huán)隊列里面放蘋果和拿蘋果,一個小朋友只放蘋果,一個小朋友只拿蘋果。

當(dāng)然,倘若盤子里面沒蘋果了,拿蘋果那個小朋友肯定不能再拿了,除了讓放蘋果的小朋友進(jìn)行放蘋果。

倘若這個循環(huán)隊列里面蘋果放滿了,放蘋果的小朋友肯定不能再放了,除了讓拿蘋果的小朋友進(jìn)行拿。

循環(huán)隊列里面中盤子的情況?

?a.盤子全為空
兩個小朋友站在一起:讓誰先運行呢?(生產(chǎn)者)

b.盤子上全都是蘋果(滿)
我們兩個站在一起:讓誰先運行呢?(消費者)

c.其他情況,兩個小朋友在的是不同的位置!

在環(huán)形隊列中,大部分情況下,單生產(chǎn)和單消費是可以并發(fā)執(zhí)行的!只有在滿,或者空的時候,才有互斥和同步問題??! ?

為了完成喚醒隊列cp問題,我們要做的核心工作是什么?

1.你不能超過我
2.我不能把你套一個圈以上
3.我們兩個什么情況會在一起


信號量是用來衡量臨界資源中資源數(shù)量的

1.對于生產(chǎn)者而言,看中的是什么?

隊列中的剩余空間 --- 空間資源定義一個信號量

2.對于消費者而言,看中的是什么?

放入隊列中的數(shù)據(jù)!--- 數(shù)據(jù)資源定義一個信號量

生產(chǎn)者而言

prodocter_sem:10

// 申請成功,你就可以繼續(xù)向下運行
// 申請失敗,當(dāng)前執(zhí)行流,阻塞在申請?zhí)?br> P(producter_sem);


// 從事生產(chǎn)活動 -- 把數(shù)據(jù)放入到隊列中

V(comsumer_sem);?

消費者而言

comsumer_sem: 0


P(comsumer_sem);

// 從事消費活動


V(producter_sem);

總結(jié)?

為滿的時候,生產(chǎn)和消費同時到來

1、生產(chǎn)者無法到臨界區(qū)來,因為它申請信號量無法成功

2、生產(chǎn)者和消費者同時來的時候,一定能保證消費者先消費


未來,生產(chǎn)和消費的位置我們要想清楚

1.其實就是隊列中的下標(biāo)
2.一定是兩個下標(biāo)(生產(chǎn)者一個下標(biāo),消費者一個下標(biāo),他們互不影響)
3.為空或者為滿,下標(biāo)相同

環(huán)形隊列的實現(xiàn)(代碼)

代碼包含main.cc RingQueue.hpp Task.hpp? (里面有詳細(xì)注釋)文章來源地址http://www.zghlxwxcb.cn/news/detail-469398.html

// main.cc 
#include "RingQueue.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"

std::string SelfName()
{
    char name[128];
    snprintf(name, sizeof(name), "thread[0x%x]", pthread_self());
    return name;
}

void *ProductorRoutine(void *rq)
{
    RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);

    while (true)
    {
        // // sleep(3);
        // // version1
        // int data = rand() % 10 + 1;
        // ringqueue->Push(data);
        // std::cout << "生產(chǎn)完成,生產(chǎn)的數(shù)據(jù)是:" << data << std::endl;
        // // sleep(1);

        // version2
        // 構(gòu)建or獲取一個任務(wù)
        int x = rand() % 20;

        int y = rand() % 10;

        char op = oper[rand() % oper.size()];
        Task t(x, y, op, mymath);// 生產(chǎn)是要花費時間的
        // 生產(chǎn)任務(wù)
        ringqueue->Push(t);
        // 輸出提示
        std::cout << SelfName() << ", 生產(chǎn)者派發(fā)了一個任務(wù):" << t.toTaskString() << std::endl;
        // sleep(2);
    }
}
void *ConsumerRoutine(void *rq)
{
    RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);

    while (true)
    {
        // // version1
        // int data;
        // ringqueue->Pop(&data);
        // std::cout << "消費完成,消費的數(shù)據(jù)時:" << data << std::endl;
        // sleep(1);

        // version2
        Task t;
        // 消費任務(wù)
        ringqueue->Pop(&t);

        std::string result = t(); // 消費是要花費時間的

        std::cout << SelfName() << ", 消費者消費了一個任務(wù):" << result << std::endl;
    }
}

int main()
{
    // 埋下隨機數(shù)種子
    srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self());

    // RingQueue<int> *rq = new RingQueue<int >();
    RingQueue<Task> *rq = new RingQueue<Task>();

    // 單生產(chǎn),但消費,多生產(chǎn),多消費 -->
    // 只要保證,最終進(jìn)入臨界區(qū)的是一個生產(chǎn),一個消費就行!

    // 多生產(chǎn),多消費的意義?
    pthread_t p[4], c[8];

    for (int i = 0; i < 4; i++)
    {
        pthread_create(p + i, nullptr, ProductorRoutine, rq);
    }

    for (int i = 0; i < 8; i++)
    {
        pthread_create(c + i, nullptr, ConsumerRoutine, rq);
    }

    for (int i = 0; i < 4; i++)
    {
        pthread_join(p[i], nullptr);
    }

    for (int i = 0; i < 8; i++)
    {
        pthread_join(c[i], nullptr);
    }

    // pthread_create(&p, nullptr, ProductorRoutine, rq);
    // pthread_create(&c, nullptr, ConsumerRoutine, rq);

    // pthread_join(p, nullptr);
    // pthread_join(c, nullptr);

    return 0;
}

// RingQueue.hpp
#pragma once

#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>

static const int gcap = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &sem)
    {
        // 等待信號量,會將信號量的值減1
        int n = sem_wait(&sem);
        assert(n == 0);
        (void)n;
    }
    void V(sem_t &sem)
    {
        // 發(fā)布信號量,表示資源使用完畢,可以歸還資源了。將信號量值加1。
        int n = sem_post(&sem);
        assert(n == 0);
        (void)n;
    }

public:
    // 信號量的初始化
    RingQueue(const int &cap = gcap)
        : _queue(cap), _cap(cap)
    {
        int n = sem_init(&_spaceSem, 0, _cap);
        assert(n == 0);
        n = sem_init(&_dataSem, 0, 0);
        assert(n == 0);

        _productorStep = _consumerStep = 0;

        pthread_mutex_init(&_pmutex, nullptr);
        pthread_mutex_init(&_cmutex, nullptr);
    }

    void Push(const T &in)
    {

        // ?:先加鎖,后申請信號量,還是先申請信號量,再加鎖?

        // 空間資源--
        P(_spaceSem); // 申請到了空間信號量,意味著,我一定能進(jìn)行正常的生產(chǎn)

        pthread_mutex_lock(&_pmutex);

        // 環(huán)形隊列生產(chǎn)者下標(biāo)++
        _queue[_productorStep++] = in;

        // 將下標(biāo)取模一下,以防超過隊列的長度
        _productorStep %= _cap;

        pthread_mutex_unlock(&_pmutex);

        // 數(shù)據(jù)資源++
        V(_dataSem);
    }

    void Pop(T *out)
    {

        // 數(shù)據(jù)資源--
        P(_dataSem); // 申請到了數(shù)據(jù)信號量,意味著,我一定能進(jìn)行正常的消費

        pthread_mutex_lock(&_cmutex);
        
        // 環(huán)形隊列消費者下標(biāo)++
        *out = _queue[_consumerStep++];

        // 將下標(biāo)取模一下,以防超過隊列的長度
        _consumerStep %= _cap;

        pthread_mutex_unlock(&_cmutex);

        // 空間資源++
        V(_spaceSem);
    }

    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);

        // 銷毀鎖
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }

private:
    std::vector<T> _queue;
    int _cap;
    sem_t _spaceSem; // 生產(chǎn)者 想生產(chǎn),看中的是什么資源呢? 空間資源
    sem_t _dataSem;  // 消費者 想消費, 看中的是什么資源呢? 數(shù)據(jù)資源

    int _productorStep;
    int _consumerStep;

    // 定義鎖變量
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};


// Task.hpp
#pragma once

#include <iostream>
#include <functional>
#include <string>
#include <cstdio>

const std::string oper = "+-*/%";

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;

    case '/':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
    break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
    break;
    default:
        break;
    }

    return result;
}

// 計算數(shù)據(jù)的類
class Task
{

    using func_t = std::function<int(int, int, char)>;
    // typedef std::function<int(int, int)> func_t;

public:
    Task()
    {
    }

    Task(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callback(func)
    {
    }

    // 該函數(shù)是消費任務(wù)的打印信息
    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }

    // 該函數(shù)是生產(chǎn)任務(wù)的打印信息

    std::string toTaskString()
    {
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};



到了這里,關(guān)于Linux——生產(chǎn)者消費者模型和信號量的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【Linux】基于環(huán)形隊列的生產(chǎn)者消費者模型的實現(xiàn)

    【Linux】基于環(huán)形隊列的生產(chǎn)者消費者模型的實現(xiàn)

    文章目錄 前言 一、基于環(huán)形隊列的生產(chǎn)者消費者模型的實現(xiàn) 上一篇文章我們講了信號量的幾個接口和基于環(huán)形隊列的生產(chǎn)者消費者模型,下面我們就快速來實現(xiàn)。 首先我們創(chuàng)建三個文件,分別是makefile,RingQueue.hpp,以及main.cc。我們先簡單搭建一下環(huán)形隊列的框架: 首先我們

    2024年02月11日
    瀏覽(26)
  • 【Linux】生產(chǎn)者消費者模型代碼實現(xiàn)和信號量

    【Linux】生產(chǎn)者消費者模型代碼實現(xiàn)和信號量

    一定要先理解生產(chǎn)者消費者模型的原理~ 文章目錄 一、生產(chǎn)者消費者模型實現(xiàn)代碼 二、信號量 1.基于環(huán)形隊列的生產(chǎn)者消費者模型 總結(jié) 下面我們實現(xiàn)基于阻塞隊列的生產(chǎn)消費模型: 在多線程編程中阻塞隊列 (Blocking Queue) 是一種常用于實現(xiàn)生產(chǎn)者和消費者模型的數(shù)據(jù)結(jié)構(gòu)。其

    2024年02月11日
    瀏覽(20)
  • 【linux】線程同步+基于BlockingQueue的生產(chǎn)者消費者模型

    【linux】線程同步+基于BlockingQueue的生產(chǎn)者消費者模型

    喜歡的點贊,收藏,關(guān)注一下把! 在線程互斥寫了一份搶票的代碼,我們發(fā)現(xiàn)雖然加鎖解決了搶到負(fù)數(shù)票的問題,但是一直都是一個線程在搶票,它錯了嗎,它沒錯但是不合理。那我們應(yīng)該如何安全合理的搶票呢? 講個小故事。 假設(shè)學(xué)校有一個VIP學(xué)霸自習(xí)室,這個自習(xí)室有

    2024年02月03日
    瀏覽(24)
  • Linux之信號量 | 消費者生產(chǎn)者模型的循環(huán)隊列

    Linux之信號量 | 消費者生產(chǎn)者模型的循環(huán)隊列

    目錄 一、信號量 1、概念 2、信號量操作函數(shù) 二、基于環(huán)形隊列的生產(chǎn)者消費者模型 1、模型分析 2、代碼實現(xiàn) 1、單生產(chǎn)單消費的生產(chǎn)者消費者模型 2、多生產(chǎn)多消費的生產(chǎn)者消費者模型 引入:前面我們講到了,對臨界資源進(jìn)行訪問時,為了保證數(shù)據(jù)的一致性,我們需要對臨

    2024年04月17日
    瀏覽(26)
  • 『Linux』第九講:Linux多線程詳解(四)_ 生產(chǎn)者消費者模型

    『Linux』第九講:Linux多線程詳解(四)_ 生產(chǎn)者消費者模型

    「前言」文章是關(guān)于Linux多線程方面的知識,上一篇是?Linux多線程詳解(三),今天這篇是 Linux多線程詳解(四),內(nèi)容大致是生產(chǎn)消費者模型,講解下面開始! 「歸屬專欄」Linux系統(tǒng)編程 「主頁鏈接」個人主頁 「筆者」楓葉先生(fy) 「楓葉先生有點文青病」「每篇一句」

    2024年02月07日
    瀏覽(20)
  • 【Linux】線程同步 -- 條件變量 | 生產(chǎn)者消費者模型 | 自旋鎖 |讀寫鎖

    【Linux】線程同步 -- 條件變量 | 生產(chǎn)者消費者模型 | 自旋鎖 |讀寫鎖

    舉一個例子: 學(xué)生去超市消費的時候,與廠家生產(chǎn)的時候,兩者互不相沖突。 生產(chǎn)的過程與消費的過程 – 解耦 臨時的保存產(chǎn)品的場所(超時) – 緩沖區(qū) 模型總結(jié)“321”原則: 3種關(guān)系:生產(chǎn)者和生產(chǎn)者(互斥),消費者和消費者(互斥),生產(chǎn)者和消費者(互斥[保證共享資

    2024年02月14日
    瀏覽(25)
  • Linux之【多線程】生產(chǎn)者與消費者模型&BlockQueue(阻塞隊列)

    Linux之【多線程】生產(chǎn)者與消費者模型&BlockQueue(阻塞隊列)

    舉個例子:學(xué)生要買東西,一般情況下都會直接聯(lián)系廠商,因為買的商品不多,對于供貨商來說交易成本太高,所以有了交易場所超市這個媒介的存在。目的就是為了集中需求,分發(fā)產(chǎn)品。 消費者與生產(chǎn)者之間通過了超市進(jìn)行交易。當(dāng)生產(chǎn)者不需要的時候,廠商可以繼續(xù)生產(chǎn)

    2024年02月02日
    瀏覽(28)
  • 《Linux從練氣到飛升》No.29 生產(chǎn)者消費者模型

    ??作者: 主頁 我的專欄 C語言從0到1 探秘C++ 數(shù)據(jù)結(jié)構(gòu)從0到1 探秘Linux 菜鳥刷題集 ??歡迎關(guān)注:??點贊??收藏??留言 ?? 碼字不易,你的??點贊??收藏??關(guān)注對我真的很重要,有問題可在評論區(qū)提出,感謝閱讀?。?!

    2024年02月05日
    瀏覽(20)
  • 【Linux】POSIX信號量 | 基于環(huán)形隊列的生產(chǎn)者消費者模型

    【Linux】POSIX信號量 | 基于環(huán)形隊列的生產(chǎn)者消費者模型

    ??? 作者:@阿亮joy. ?? 專欄: 《學(xué)會Linux》 ?? 座右銘:每個優(yōu)秀的人都有一段沉默的時光,那段時光是付出了很多努力卻得不到結(jié)果的日子,我們把它叫做扎根 POSIX 信號量和 SystemV 信號量作用相同,都是用于同步操作,達(dá)到無沖突的訪問共享資源目的。 但 POSIX 可以用于

    2023年04月08日
    瀏覽(43)
  • 線程池-手寫線程池Linux C簡單版本(生產(chǎn)者-消費者模型)

    線程池-手寫線程池Linux C簡單版本(生產(chǎn)者-消費者模型)

    本線程池采用C語言實現(xiàn) 線程池的場景: 當(dāng)某些任務(wù)特別耗時(例如大量的IO讀寫操作),嚴(yán)重影響線程其他的任務(wù)的執(zhí)行,可以使用線程池 線程池的一般特點: 線程池通常是一個生產(chǎn)者-消費者模型 生產(chǎn)者線程用于發(fā)布任務(wù),任務(wù)通常保存在任務(wù)隊列中 線程池作為消費者,

    2024年02月14日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包