国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

C++并發(fā)編程 -3.同步并發(fā)操作

這篇具有很好參考價值的文章主要介紹了C++并發(fā)編程 -3.同步并發(fā)操作。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

本文介紹如何使用條件變量控制并發(fā)的同步操作、C++ 并發(fā)三劍客,函數(shù)式編程

一.條件變量

1.概念

????????C++條件變量(condition variable)是一種多線程編程中常用的同步機制,用于線程間的通信和協(xié)調(diào)。它允許一個或多個線程等待某個條件的發(fā)生,當(dāng)條件滿足時,線程被喚醒并繼續(xù)執(zhí)行。

????????在C++中,條件變量通常與互斥鎖(mutex)一起使用,以確保線程之間的安全訪問共享資源?;コ怄i用于保護共享資源的訪問,而條件變量用于在某個條件滿足時通知等待的線程。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool isReady = false;

void workerThread()
{
    std::unique_lock<std::mutex> lock(mtx);
    while (!isReady)
    {
        cv.wait(lock); // 等待條件變量滿足
    }
    std::cout << "Worker thread is awake!" << std::endl;
}

int main()
{
    std::thread worker(workerThread);

    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模擬一些耗時操作

    {
        std::lock_guard<std::mutex> lock(mtx);
        isReady = true;
        cv.notify_one(); // 喚醒等待的線程
    }
    worker.join();
    return 0;
}

????????主線程創(chuàng)建了一個工作線程,并在工作線程中等待條件變量isReadytrue。主線程在等待一段時間后,將isReady設(shè)置為true,并通過cv.notify_one()喚醒工作線程。工作線程在等待期間調(diào)用cv.wait(lock),這會使線程進入等待狀態(tài),直到條件變量被喚醒。一旦條件滿足,工作線程被喚醒并輸出消息。

? ? ? ? 其中wait參數(shù)有三種使用方式:

wait wait_for wait_until
函數(shù)原型 void wait(std::unique_lock<std::mutex>& lock); template< class Rep, class Period > std::cv_status wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& rel_time); template< class Clock, class Duration > std::cv_status wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& abs_time);
功能 調(diào)用wait釋放鎖,等待notify。被喚醒后加鎖

1.調(diào)用wait釋放鎖,等待notify。被喚醒后加鎖。

2.等待時間超過指定的相對時間,超時后繼續(xù)加鎖。

1.調(diào)用wait釋放鎖,等待notify。被喚醒后加鎖。

2.等待時間超過指定的相對時間,無論是否超時仍加鎖。

3.等待超過指定時間

返回值

等待時間內(nèi)被喚醒,則返回std::cv_status::no_timeout;

如果等待時間超時,則返回std::cv_status::timeout

與waitfor一致
示例
cv.wait(lock, []{ return isDataReady; });
cv.wait_for(lock, std::chrono::seconds(5), []{ return isDataReady; }
    auto timeout = std::chrono::steady_clock::now() + std::chrono::seconds(5);
    cv.wait_until(lock, timeout, []{ return isDataReady; })

2.使用條件變量構(gòu)建線程安全的棧區(qū)

? ? ? ? 回顧

? ? ? ? 在第三章節(jié) -構(gòu)建線程安全的棧區(qū)? 為了避免數(shù)據(jù)競爭,對于pop操作在線程中調(diào)用empty判斷是否為空,如果不為空,則pop,因為empty和pop內(nèi)部分別加鎖,是兩個原子操作,導(dǎo)致pop時可能會因為其他線程提前pop導(dǎo)致隊列為空,從而引發(fā)崩潰。

void push(T new_value)
{
    std::lock_guard<std::mutex> lock(m);
    data.push(std::move(new_value));
}

T pop()
{
    std::lock_guard<std::mutex> lock(m);
    auto element = data.top();
    data.pop();
    return element;
}

threadsafe_stack1<int> safe_stack;
safe_stack.push(1);
std::thread t1([&safe_stack]() 
{
    if (!safe_stack.empty()) 
    {
       std::this_thread::sleep_for(std::chrono::seconds(1));
       safe_stack.pop();
    }
});
std::thread t2([&safe_stack]() 
{
    if (!safe_stack.empty()) 
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        safe_stack.pop();
    }
});

????????????????C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端?

????????通過添加《傳入引用》或《返回彈出元素的指針》來解決上面異常。但是兩種方式都存在一定的弊端:

????????傳入引用弊端是需要在函數(shù)外部構(gòu)建臨時變量影響效率且需要拋出空棧。

????????返回智能指針異常下會導(dǎo)致空指針產(chǎn)生。

????????這么都不是很友好,所以我們可以通過條件變量完善之前的程序,重新實現(xiàn)一個線程安全隊列。

#include <iostream>
#include <thread>
#include <stack>
#include <memory>
#include <mutex>
#include <condition_variable>

using namespace std;

template<typename T>
class threadQueue
{
public:
    threadQueue(): mutx(), condVar(), stackQueue()
    {};
    ~threadQueue(){};

    void push(T value)
    {
        lock_guard<mutex> lk(mutx);
        stackQueue.push(value);
        condVar.notify_one();
    }

    void waitAndpop(T &stackQueue_)
    {
        unique_lock<mutex> lk(mutx);
        /*
           wait函數(shù)第二個參數(shù)確保隊列有數(shù)據(jù)才進行條件變量后續(xù)操作,相當(dāng)于增加一層判斷,更加準(zhǔn)確
        */
        condVar.wait(lk, [this](){return !stackQueue.empty();});
        stackQueue_ = stackQueue.top();
        stackQueue.pop();
    }
    shared_ptr<T> waitAndPop()
    {
        unique_lock<mutex> lk(mutx);
        condVar.wait(lk, [this](){return !stackQueue.empty();});
        shared_ptr<T> res = make_shared<T>(stackQueue.top());
        stackQueue.pop();
        return res;
    }

    bool tryPop(T &value)
    {
        lock_guard<mutex> lk(mutx);
            if(stackQueue.empty()){return false;}
        value = stackQueue.top();
        stackQueue.pop();
        return true;
    }
    shared_ptr<T> tryPop()
    {
        lock_guard<mutex> lk(mutx);
        if (stackQueue.empty())
        	return std::shared_ptr<T>();
        shared_ptr<T> res = make_shared<T>(stackQueue.top());
        stackQueue.pop();
        return res;
    }

private:
    mutable mutex mutx;
    condition_variable condVar;
    stack<T> stackQueue;
};

mutex mutxThread;

template<typename T>
void funcProducer(threadQueue<T> &thdQueue)
{
    for(;;)
    {
        for(size_t i = 0; i < __LONG_MAX__; i++)
        {
            thdQueue.push(i);
            lock_guard<mutex> lk(mutxThread);
            cout<<"funcProducer:"<<i<<endl;
            this_thread::sleep_for(chrono::milliseconds(100));
        }
    }
}

template<typename T>
void funcWorker1(threadQueue<T> &thdQueue)
{
    for(;;)
    {
        auto data = thdQueue.waitAndPop();

        lock_guard<mutex> lk(mutxThread);
        cout<<"funcWorker1 waitAndpop:"<<*(data)<<endl;
        // this_thread::sleep_for(chrono::milliseconds(500));
    }
}

template<typename T>
void funcWorker2(threadQueue<T> &thdQueue)
{
    for(;;)
    {
        auto data = thdQueue.tryPop();
        if(data != nullptr)
        {
            lock_guard<mutex> lk(mutxThread);
            cout<<"funcWorker2 waitAndpop:"<<*(data)<<endl;
        }
        // this_thread::sleep_for(chrono::milliseconds(500));
    }
}

int main()
{
    threadQueue<int> thdQueue;

    thread t1(funcProducer<int>, ref(thdQueue));
    thread t2(funcWorker1<int>, ref(thdQueue));
    thread t3(funcWorker2<int>, ref(thdQueue));

    t1.join();
    t2.join();
    t3.join();
}

C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端

二.C++ 并發(fā)三劍客

? ? ? ? 簡單介紹一下future, promise和async三者之間的關(guān)系。其具體實現(xiàn)請看本章節(jié)第四小節(jié)

C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端

1.async

????????std::async 是一個用于異步執(zhí)行函數(shù)的模板函數(shù),它返回一個 std::future 對象,該對象用于獲取函數(shù)的返回值。其主要應(yīng)用于I/O密集型任務(wù),如網(wǎng)絡(luò)請求或文件讀寫,其中操作可能需要等待,但CPU不需要做太多工作.(thread適用于CPU密集型任務(wù))

? ? ? ? async可選填參數(shù):

? ? ? ? 指定launch::async:表示任務(wù)執(zhí)行在另一線程。

? ? ? ? 指定launch::deferred:表示延遲執(zhí)行任務(wù),調(diào)用get或者wait時才會執(zhí)行,不會創(chuàng)建線程,惰性執(zhí)行在當(dāng)前線程。

? ? ? ? 不填寫或同時指定:屬于未定義行為,編譯器會根據(jù)實際情況決定采用哪種策略(通常創(chuàng)建線程)

1.1 async異步任務(wù)

? ? ? ? async啟用異步執(zhí)行后,在其內(nèi)部會創(chuàng)建一個線程,任務(wù)完成后線程自動銷毀,不需要join等操作。


#include <iostream>
#include <thread>
#include <memory>
#include <future>

using namespace std;

void func()
{
    int i = 0;
    for(;;)
    {
        cout<<"func"<<endl;
        this_thread::sleep_for(chrono::seconds(1));
        if(++i == 10) {break;}
    }
    cout<<" thread end"<<endl;
}

int main()
{
    std::future<void> the_answer=std::async(launch::async, func);//deferred

    for(int i = 0; i < 15; ++i)
    {
        cout<<"main run"<<endl;
        this_thread::sleep_for(chrono::seconds(1));
    }

    return 1;
}

? 剛運行時可以看到存在兩個線程

C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端

? 線程退出后:

C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端

1.2 async延時執(zhí)行

? ? ? ? 當(dāng)指定launch::deferred表示延遲執(zhí)行任務(wù),調(diào)用get或者wait時才會在調(diào)用線程中執(zhí)行。? ? ? ? ?????????std::future::get() 和 std::future::wait() 是 C++ 中用于處理異步任務(wù)的兩個方法.

1.2.1?future::get()

? ? ? ? 這是一個阻塞調(diào)用,用于獲取 std::future 對象表示的值或異常。如果異步任務(wù)還沒有完成,get() 會阻塞當(dāng)前線程,直到任務(wù)完成。如果任務(wù)已經(jīng)完成,get() 會立即返回任務(wù)的結(jié)果。get() 只能調(diào)用一次,因為它會移動或消耗掉 std::future 對象的狀態(tài)。一旦 get() 被調(diào)用,std::future 對象就不能再被用來獲取結(jié)果。如果調(diào)用兩次,會出現(xiàn)如下結(jié)果:

#include <iostream>
#include <thread>
#include <memory>
#include <future>

using namespace std;

void func()
{
    cout<<"func"<<endl;
}

int main()
{
    std::future<void> the_answer=std::async(launch::deferred, func);//deferred

    cout<<"main run"<<endl;
    this_thread::sleep_for(chrono::seconds(1));
    the_answer.get();
    the_answer.get();

    return 1;
}

????????????????C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端?

? ? ? ? 為避免多次調(diào)用get造成異常,盡量使用wait

1.2.2?future::wait()

????????std::future::wait() 也是一個阻塞調(diào)用,但它與 get() 的主要區(qū)別在于 wait() 不會返回任務(wù)的結(jié)果。它只是等待異步任務(wù)完成。如果任務(wù)已經(jīng)完成,wait() 會立即返回。如果任務(wù)還沒有完成,wait() 會阻塞當(dāng)前線程,直到任務(wù)完成。與 get() 不同,wait() 可以被多次調(diào)用,它不會消耗掉 std::future 對象的狀態(tài)。

#include <iostream>
#include <thread>
#include <memory>
#include <future>

using namespace std;

int func()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    cout<<"func"<<endl;
    return 10;
}

int main()
{
    future<int> the_answer = async(launch::deferred, func);
    cout<<"main run"<<endl;
    
    auto start = std::chrono::high_resolution_clock::now(); 
    the_answer.wait();
    the_answer.wait();
    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start); 
    cout << "the_answer.wait run time: " << duration.count() << " ms" << endl;

    return 1;
}

????????C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端?

可以看到只運行了1000ms, 當(dāng)?shù)谝粋€wait運行結(jié)束表示任務(wù)執(zhí)行結(jié)束,而第二個wait會立馬返回。

wait get
阻塞線程,知道任務(wù)運行完畢 阻塞線程,知道任務(wù)運行完畢
可多次調(diào)用,當(dāng)任務(wù)執(zhí)行完畢后wait將不在起作用,會立馬返回。 不可多次調(diào)用,異常
執(zhí)行任務(wù),不獲取結(jié)果 執(zhí)行任務(wù),獲取結(jié)果
  • ?也可以通過wait_for(std::chrono::seconds(0)(要求任務(wù)立馬返回),判斷任務(wù)是否已經(jīng)完成
if (the_answer.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
   cout<<"task already over"<<endl;
   return 0;
}
1.3?async使用注意事項
1.3.1 警惕未接受返回值阻塞
#include <iostream>
#include <thread>
#include <memory>
#include <future>

int main()
{
    cout<<"main start"<<endl;
    {
        async(launch::async, [](){
            cout<<"start run async"<<endl;
            this_thread::sleep_for(chrono::milliseconds(1000));
            cout<<"end run async"<<endl;
        });
    }
    cout<<"main end"<<endl;
}

????????C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端?

? ? ? ? 因為async返回一個右值類型的future,無論左值還是右值,future都要被析構(gòu),因為其處于一個局部作用域{}中。 當(dāng)編譯器執(zhí)行到}時會觸發(fā)future析構(gòu)。但是future析構(gòu)要保證其關(guān)聯(lián)的任務(wù)完成,所以需要等待任務(wù)完成future才被析構(gòu), 所以也就成了串行的效果了。

2.promise和packaged_task

? ? ? ? promise和packaged_task都是C++中的異步編程工具,用于在多線程環(huán)境下進行任務(wù)的分離和結(jié)果的傳遞。

? ? ? ? promise是一種用于在一個線程中產(chǎn)生結(jié)果,并在另一個線程中獲取結(jié)果的機制。它允許一個線程(稱為"提供者")在某個時間點設(shè)置一個值或異常,而另一個線程(稱為"消費者")在需要時獲取該值或異常。

????????packaged_task是一種將可調(diào)用對象(如函數(shù)、函數(shù)對象或Lambda表達式)封裝為一個可以異步執(zhí)行的任務(wù)的機制。

? ? ? ? 兩者都是與future結(jié)合使用,通過future獲取結(jié)果或者值。

2.1.future與packaged_task

????????packaged_task是一個可調(diào)用目標(biāo),它包裝了一個任務(wù),該任務(wù)可以在另一個線程上運行。它可以捕獲任務(wù)的返回值或異常,并將其存儲在future對象中,以便以后使用。

? ? ? ? 使用future和packaged_task通常的做法是:

1)? 創(chuàng)建一個std::packaged_task對象,該對象包裝了要執(zhí)行的任務(wù)。

funciton my_task
packaged_task<int()> task(my_task);

2)? 調(diào)用packaged_task對象的get_future()方法,該方法返回一個與任務(wù)關(guān)聯(lián)的future對象。

future<int> result = task.get_future();

3)? 在另一個線程上調(diào)用std::packaged_task對象的operator(),以執(zhí)行任務(wù)。

 thread t(move(task));

4)? ?在需要任務(wù)結(jié)果的地方,調(diào)用與任務(wù)關(guān)聯(lián)的future對象的get()方法,以獲取任務(wù)的返回值或異常。

int value = result.get();

示例:

#include <iostream>
#include <thread>
#include <memory>
#include <future>

using namespace std;

int my_task() {
    this_thread::sleep_for( chrono::seconds(5));
    cout << "my task run 5 s" <<  endl;
    return 42;
}
int main()
{
    // 創(chuàng)建一個包裝了任務(wù)的  packaged_task 對象  
    packaged_task<int()> task(my_task);
    // 獲取與任務(wù)關(guān)聯(lián)的  future 對象  
    future<int> result = task.get_future();
    // cout<<"task.get_future()"<<task.get_future().get()<<endl;
    // 在另一個線程上執(zhí)行任務(wù)  
    thread t(move(task));
    t.detach(); // 將線程與主線程分離,以便主線程可以等待任務(wù)完成  
    // 等待任務(wù)完成并獲取結(jié)果  
    cout << "will get result"<<endl;
    int value = result.get();
    cout << "The result is: " << value <<  endl;

}

?????????????????????????C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端

????????在上面的示例中,我們創(chuàng)建了一個包裝了任務(wù)的std::packaged_task對象,并獲取了與任務(wù)關(guān)聯(lián)的std::future對象。然后,我們在另一個線程上執(zhí)行任務(wù),并等待任務(wù)完成并獲取結(jié)果。最后,我們輸出結(jié)果。

2.2.future與promise
2.2.1 設(shè)置值

? ? ? ??packaged_task主要是異步過程中任務(wù),promise在異步過程中處理數(shù)值。

#include <iostream>
#include <thread>
#include <future>

void set_value(std::promise<int>& prom) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    // 設(shè)置結(jié)果為42
    prom.set_value(42);
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    
    std::thread t(set_value, std::ref(prom));
    // 在主線程中等待結(jié)果
    int result = fut.get();
    std::cout << "Result: " << result << std::endl;
    t.join();
    
    return 0;
}

????????值得注意的是在調(diào)用fut.get()方法時,如果promise的值還沒有被設(shè)置,則該方法會阻塞當(dāng)前線程,直到值被設(shè)置為止。

2.2.2 設(shè)置異常

????????promise還有一個set_exception()方法,用于設(shè)置異常。該方法接受std::exception_ptr參數(shù),該參數(shù)可以通過調(diào)用std::current_exception()方法獲取。


#include <iostream>
#include <thread>
#include <future>

void set_exception(std::promise<int>& prom) {
    try {
        // 模擬一些可能拋出異常的操作
        throw std::runtime_error("Something went wrong");
    } catch (...) {
        // 捕獲異常并設(shè)置到 promise 中
        prom.set_exception(std::current_exception());
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    
    std::thread t(set_exception, std::ref(prom));
    
    try {
        // 在主線程中等待結(jié)果
        int result = fut.get();
        std::cout << "Result: " << result << std::endl;
    } catch (const std::exception& e) {
        std::cout << "Exception caught: " << e.what() << std::endl;
    }
    
    t.join();
    
    return 0;
}

????????set_exception 函數(shù)中,我們模擬了一些可能拋出異常的操作,并使用prom.set_exception (std::current_exception()) 將當(dāng)前捕獲的異常設(shè)置到 promise 中。在主線程中,我們使用 fut.get() 等待并獲取結(jié)果。如果在另一個線程中發(fā)生了異常,我們可以在主線程中使用 try-catch 塊捕獲并處理異常。

3?future與shared_future

? ? ? ? 當(dāng)需要多個線程等待同一個執(zhí)行結(jié)果時,需要使用std::shared_future。

????????適合使用std::shared_future的場景,多個線程等待同一個異步操作的結(jié)果:

#include <iostream>
#include <thread>
#include <future>

void myFunction(std::promise<int>&& promise) 
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    promise.set_value(42); // 設(shè)置 promise 的值
}
void threadFunction(std::shared_future<int> future) 
{
    try {
        int result = future.get();
        std::cout << "Result: " << result<<"this thread id:"<<std::this_thread::get_id() << std::endl;
    }
    catch (const std::future_error& e) {
        std::cout << "Future error: " << e.what() << std::endl;
    }
}
int main() 
{
    std::promise<int> promise;
    std::shared_future<int> future = promise.get_future();
    std::thread myThread1(myFunction, std::move(promise)); // 將 promise 移動到線程中
    // 使用 share() 方法獲取新的 shared_future 對象  
    std::thread myThread2(threadFunction, future);
    std::thread myThread3(threadFunction, future);
    myThread1.join();
    myThread2.join();
    myThread3.join();

    return 0;
}

?????????C++并發(fā)編程 -3.同步并發(fā)操作,C++,c++,開發(fā)語言,linux,服務(wù)器,后端

? ? ? ? 在這個示例中,創(chuàng)建了一個std::promise<int>對象promise和一個與之關(guān)聯(lián)的std::shared_future<int>對象future。然后,我們將promise對象移動到另一個線程myThread1中,該線程將執(zhí)行myFunction函數(shù),并在完成后設(shè)置promise的值。我們還創(chuàng)建了兩個線程myThread2myThread3,它們將等待future對象的結(jié)果。如果myThread1成功地設(shè)置了promise的值,那么future.get()將返回該值。這些線程可以同時訪問和等待future對象的結(jié)果,而不會相互干擾。?

4. 自頂向下實現(xiàn)

4.1? 用packaged_task實現(xiàn)async

? ? ? ? async異步任務(wù)相當(dāng)于創(chuàng)建線程,不再予以實現(xiàn)。

? ? ? ? async延時執(zhí)行示例如下:

#include <iostream>
#include <thread>
#include <future>
#include <functional>

using Func = std::function<int(int)>;

std::future<int> myAsync(Func task, int i)
{
    std::packaged_task<int(int)> packaged(task);
    std::future<int> fu = packaged.get_future();

    std::thread t(std::move(packaged), i);
    t.detach();
    return fu;
}

int main()
{
    auto task = [](int i) { std::this_thread::sleep_for(std::chrono::seconds(1)); return i+100; };

    std::future<int> f = myAsync(task, 5);
    std::cout << f.get() << std::endl;
    return 0;
}

? ? ? ? ?async指定deferred 表示延時進行,其內(nèi)部與上述內(nèi)部實現(xiàn)類似,先使用packaged打包需要執(zhí)行的任務(wù),然后將任務(wù)與future綁定,將任務(wù)move到線程中,執(zhí)行完畢后通過future.get獲取任務(wù)執(zhí)行結(jié)果。如此便滿足第二章節(jié)圖片內(nèi)容結(jié)構(gòu)。

4.2? 使用promise實現(xiàn)packaged_task

? ? ? ? ? 通過promise設(shè)置值來完成packaged_task反饋。

#include <iostream>
#include <thread>
#include <future>
#include <functional>

//typename關(guān)鍵字在C++中的主要作用是告訴編譯器某個名稱是一個類型而不是變量或函數(shù)。
template<typename Type, typename... Args>
class myPackTask
{
public:
    myPackTask(std::function<Type(Args...)> _func):func_(_func){}

    std::future<Type> get_future()
    {
        return promise_.get_future();
    }

    void operator()(Args... args)
    {
        Type result = func_(args...);
        promise_.set_value(result);
    }

private:
    std::function<Type(Args...)> func_;
    std::promise<Type> promise_;
};

int add(int a, int b) {
    return a + b;
}

int main() {
    myPackTask<int, int, int> task(add);
    std::future<int> future = task.get_future(); 

    std::thread thread(std::move(task), 2, 3);  
    int result = future.get();  
    std::cout << "Result: " << result << std::endl;

    thread.join();  // 等待線程結(jié)束

    return 0;
}
?4.3? 實現(xiàn)promise

? ? ? ? 其主要思想就是通過條件變量通知get所在線程調(diào)用的地方。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

template<typename T>
class Promise {
public:
    Promise() : value(nullptr), ready(false) {}

    void set(const T& val) {
        std::unique_lock<std::mutex> lock(mutex);
        value = std::make_shared<T>(val);
        ready = true;
        condition.notify_all();
    }

    T get() {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this] { return ready; });
        return *value;
    }

private:
    std::shared_ptr<T> value;
    bool ready;
    std::mutex mutex;
    std::condition_variable condition;
};

int main() {
    Promise<int> promise;

    std::thread producer([&promise]() {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        promise.set(42);
    });

    std::thread consumer([&promise]() {
        int result = promise.get();
        std::cout << "Got value: " << result << std::endl;
    });

    producer.join();
    consumer.join();

    return 0;
}

三.并行與函數(shù)式編程

以下內(nèi)容摘自戀戀風(fēng)塵,博主講解C++并發(fā)非常不錯,力薦

1 快速排序

? ? ? ? 先使用C++ 設(shè)計一個快速排序方法

#include <iostream>
#include <thread>
#include <memory>
#include <future>

template<typename T>
void quick_sort_recursive(T arr[], int start, int end) {
    if (start >= end) return;
    T key = arr[start];
    int left = start, right = end;
    while(left < right) {
        while (arr[right] >= key && left < right) right--;
        while (arr[left] <= key && left < right) left++;
        std::swap(arr[left], arr[right]);
    }
    if (arr[left] < key) {
        std::swap(arr[left], arr[start]);
    }
    quick_sort_recursive(arr, start, left - 1);
    quick_sort_recursive(arr, left + 1, end);
}
template<typename T>
void quick_sort(T arr[], int len) {
    quick_sort_recursive(arr, 0, len - 1);
}

int main() {
    int num_arr[] = { 5,3,7,6,4,1,0,2,9,10,8 };
    int length = sizeof(num_arr) / sizeof(int);
     quick_sort(num_arr, length );
    std::cout << "sorted result is ";
    for (int i = 0; i < length; i++) {
        std::cout << " " << num_arr[i];
    }
    std::cout << std::endl;    
}

2 使用函數(shù)式編程快速排序

????????函數(shù)式編程可以被看作是將函數(shù)視為數(shù)學(xué)中的公式。在函數(shù)式編程中,函數(shù)被視為一種純粹的數(shù)學(xué)映射,它接受輸入并產(chǎn)生輸出,沒有副作用或可變狀態(tài)。這種思想源于數(shù)學(xué)中的函數(shù)概念,其中函數(shù)被定義為將輸入映射到輸出的關(guān)系。

? ? ? ? 在第一小節(jié)實現(xiàn)快速排序是一種面向過程的編程方式,我們可以通過定義一個函數(shù)來實現(xiàn)快速排序算法。這個函數(shù)接受一個數(shù)組作為輸入,并通過一系列的步驟來實現(xiàn)快速排序的邏輯。這些步驟包括選擇基準(zhǔn)元素、劃分數(shù)組、遞歸調(diào)用等。

? ? ? ? 而使用函數(shù)式編程,強調(diào)使用純函數(shù)來構(gòu)建程序,通過函數(shù)的組合和轉(zhuǎn)換來處理數(shù)據(jù)。函數(shù)式編程避免使用可變狀態(tài)和共享狀態(tài)。

#include <iostream>
#include <thread>
#include <memory>
#include <future>
#include <list>
#include <algorithm>

template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
    if (input.empty())
    {
        return input;
    }
    std::list<T> result;
    //  ① 將input中的第一個元素放入result中,并且將這第一個元素從input中刪除
    result.splice(result.begin(), input, input.begin());  
    //  ② 取result的第一個元素,將來用這個元素做切割,切割input中的列表。
    T const& pivot = *result.begin();    
    //  ③std::partition 是一個標(biāo)準(zhǔn)庫函數(shù),用于將容器或數(shù)組中的元素按照指定的條件進行分區(qū),
    // 使得滿足條件的元素排在不滿足條件的元素之前。
    // 所以經(jīng)過計算divide_point指向的是input中第一個大于等于pivot的元素
        auto divide_point = std::partition(input.begin(), input.end(),
            [&](T const& t) {return t < pivot; });    
    // ④ 我們將小于pivot的元素放入lower_part中
    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(),
        divide_point);  
    // ⑤我們將lower_part傳遞給sequential_quick_sort 返回一個新的有序的從小到大的序列
    //lower_part 中都是小于divide_point的值
        auto new_lower(
            sequential_quick_sort(std::move(lower_part)));    
    // ⑥我們剩余的input列表傳遞給sequential_quick_sort遞歸調(diào)用,input中都是大于divide_point的值。
        auto new_higher(
            sequential_quick_sort(std::move(input)));    
        //⑦到此時new_higher和new_lower都是從小到大排序好的列表
        //將new_higher 拼接到result的尾部
        result.splice(result.end(), new_higher);    
        //將new_lower 拼接到result的頭部
        result.splice(result.begin(), new_lower);   
        return result;
}

int main() {
    std::list<int> numlists = { 6,1,0,7,5,2,9,-1 };
    auto sort_result = sequential_quick_sort(numlists);
    std::cout << "sorted result is ";
    for (auto iter = sort_result.begin(); iter != sort_result.end(); iter++) {
        std::cout << " " << (*iter);
    }
    std::cout << std::endl;
}

3 并行處理快速排序

? ? ? ? 在快速排序中,我們以某個數(shù)字作為基準(zhǔn)(一般開頭), 分為前半部分和后半部分,然后前半部分和后半部分單獨計算。?

? ? ? ? 并行排序的思想是將前半部分(或后半部分)單獨拷貝一份,置于async中(不指定啟動方式,編譯器會根據(jù)計算次數(shù)決定什么時候異步執(zhí)行什么時候串行),進行并行計算文章來源地址http://www.zghlxwxcb.cn/news/detail-834591.html

#include <iostream>
#include <thread>
#include <memory>
#include <future>
#include <list>
#include <algorithm>

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if (input.empty())
    {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();
    auto divide_point = std::partition(input.begin(), input.end(),
        [&](T const& t) {return t < pivot; });
    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(),
        divide_point);
    // ①因為lower_part是副本,所以并行操作不會引發(fā)邏輯錯誤,這里可以啟動future做排序
    std::future<std::list<T>> new_lower(
        std::async(&parallel_quick_sort<T>, std::move(lower_part)));
    // ②
    auto new_higher(
        parallel_quick_sort(std::move(input)));    
        result.splice(result.end(), new_higher);    
        result.splice(result.begin(), new_lower.get());    
        return result;
}

int main() 
{
    std::list<int> numlists = { 6,1,0,7,5,2,9,-1 };
    auto sort_result = parallel_quick_sort(numlists);
    std::cout << "sorted result is ";
    for (auto iter = sort_result.begin(); iter != sort_result.end(); iter++) {
        std::cout << " " << (*iter);
    }
    std::cout << std::endl;
}

到了這里,關(guān)于C++并發(fā)編程 -3.同步并發(fā)操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 基于linux下的高并發(fā)服務(wù)器開發(fā)(第一章)- 目錄操作函數(shù)

    基于linux下的高并發(fā)服務(wù)器開發(fā)(第一章)- 目錄操作函數(shù)

    ?(1)int mkdir(const char* pathname,mode_t mode); #include sys/stat.h #include sys/types.h int mkdir(const char *pathname, mode_t mode); ?? ?作用:創(chuàng)建一個目錄 ?? ?參數(shù): ?? ??? ? pathname: 創(chuàng)建的目錄的路徑 ?? ??? ?mode: 權(quán)限,八進制的數(shù) ?? ?返回值: ?? ??? ? 成功返回0, 失敗返回-1 ?(

    2024年02月16日
    瀏覽(27)
  • C++并發(fā)操作解密:輕松搞定數(shù)據(jù)同步

    C++并發(fā)操作解密:輕松搞定數(shù)據(jù)同步

    ? 概述: 在C++中,通過互斥鎖解決并發(fā)數(shù)據(jù)同步問題。定義共享數(shù)據(jù)和互斥鎖,編寫線程函數(shù),使用互斥鎖確保操作的原子性。主函數(shù)中創(chuàng)建并啟動線程,保障線程安全。實例源代碼演示了簡單而有效的同步機制。 在C++中解決并發(fā)操作時的數(shù)據(jù)同步問題通常需要使用互斥鎖

    2024年02月04日
    瀏覽(21)
  • linux并發(fā)服務(wù)器 —— linux網(wǎng)絡(luò)編程(七)

    linux并發(fā)服務(wù)器 —— linux網(wǎng)絡(luò)編程(七)

    C/S結(jié)構(gòu) - 客戶機/服務(wù)器;采用兩層結(jié)構(gòu),服務(wù)器負責(zé)數(shù)據(jù)的管理,客戶機負責(zé)完成與用戶的交互;C/S結(jié)構(gòu)中,服務(wù)器 - 后臺服務(wù),客戶機 - 前臺功能; 優(yōu)點 1. 充分發(fā)揮客戶端PC處理能力,先在客戶端處理再提交服務(wù)器,響應(yīng)速度快; 2. 操作界面好看,滿足個性化需求; 3.

    2024年02月09日
    瀏覽(23)
  • C++并發(fā)編程 | 原子操作std::atomic

    C++并發(fā)編程 | 原子操作std::atomic

    目錄 1、原子操作std::atomic相關(guān)概念 2、不加鎖情況 3、加鎖情況 ?4、原子操作 5、總結(jié) 原子操作: 更小的代碼片段,并且該片段必定是連續(xù)執(zhí)行的,不可分割。 1.1 原子操作std::atomic與互斥量的區(qū)別 1) 互斥量 :類模板,保護一段共享代碼段,可以是一段代碼,也可以是一個

    2023年04月26日
    瀏覽(25)
  • Linux學(xué)習(xí)之網(wǎng)絡(luò)編程3(高并發(fā)服務(wù)器)

    Linux學(xué)習(xí)之網(wǎng)絡(luò)編程3(高并發(fā)服務(wù)器)

    Linux網(wǎng)絡(luò)編程我是看視頻學(xué)的,Linux網(wǎng)絡(luò)編程,看完這個視頻大概網(wǎng)絡(luò)編程的基礎(chǔ)差不多就掌握了。這個系列是我看這個Linux網(wǎng)絡(luò)編程視頻寫的筆記總結(jié)。 問題: 根據(jù)上一個筆記,我們可以寫出一個簡單的服務(wù)端和客戶端通信,但是我們發(fā)現(xiàn)一個問題——服務(wù)器只能連接一個

    2024年02月01日
    瀏覽(28)
  • Linux網(wǎng)絡(luò)編程:多進程 多線程_并發(fā)服務(wù)器

    文章目錄: 一:wrap常用函數(shù)封裝 wrap.h? wrap.c server.c封裝實現(xiàn) client.c封裝實現(xiàn) 二:多進程process并發(fā)服務(wù)器 server.c服務(wù)器 實現(xiàn)思路 代碼邏輯? client.c客戶端 三:多線程thread并發(fā)服務(wù)器 server.c服務(wù)器 實現(xiàn)思路 代碼邏輯? client.c客戶端 ???? ??read 函數(shù)的返回值 wrap.h? wrap

    2024年02月12日
    瀏覽(31)
  • 多進程并發(fā)TCP服務(wù)器模型(含客戶端)(網(wǎng)絡(luò)編程 C語言實現(xiàn))

    摘要 :大家都知道不同pc間的通信需要用到套接字sockte來實現(xiàn),但是服務(wù)器一次只能收到一個客戶端發(fā)來的消息,所以為了能讓服務(wù)器可以接收多個客戶端的連接與消息的傳遞,我們就引入了多進程并發(fā)這樣一個概念。聽名字就可以知道--需要用到進程,當(dāng)然也有多線程并發(fā)

    2024年02月17日
    瀏覽(104)
  • 【Linux網(wǎng)絡(luò)編程】高并發(fā)服務(wù)器框架 線程池介紹+線程池封裝

    【Linux網(wǎng)絡(luò)編程】高并發(fā)服務(wù)器框架 線程池介紹+線程池封裝

    前言 一、線程池介紹 ??線程池基本概念 ??線程池組成部分 ??線程池工作原理? 二、線程池代碼封裝 ??main.cpp ??ThreadPool.h ??ThreadPool.cpp ??ChildTask.h? ??ChildTask.cpp ??BaseTask.h ??BaseTask.cpp 三、測試效果 四、總結(jié) ??創(chuàng)建線程池的好處 本文主要學(xué)習(xí) Linux內(nèi)核編程 ,結(jié)合

    2024年01月16日
    瀏覽(33)
  • 【Linux】C++項目實戰(zhàn)-高并發(fā)服務(wù)器詳析

    【Linux】C++項目實戰(zhàn)-高并發(fā)服務(wù)器詳析

    橙色 server_process.c文件內(nèi)容如下: 注意第70行的if(errno == EINTR),如果沒有這個if判斷的話,當(dāng)同時多個客戶端鏈接進來,停掉一個客戶端,然后再啟動一個客戶端,就會發(fā)現(xiàn)沒法連接了,accept會報一個錯誤。因為一個客戶端停掉,在服務(wù)器端就相當(dāng)于一個子進程終止執(zhí)行,會發(fā)

    2024年02月09日
    瀏覽(20)
  • Linux中 socket編程中多進程/多線程TCP并發(fā)服務(wù)器模型

    一次只能處理一個客戶端的請求,等這個客戶端退出后,才能處理下一個客戶端。 缺點:循環(huán)服務(wù)器所處理的客戶端不能有耗時操作。 模型 源碼 可以同時處理多個客戶端請求 父進程 / 主線程專門用于負責(zé)連接,創(chuàng)建子進程 / 分支線程用來與客戶端交互。 模型 源碼 模型 源

    2024年02月12日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包