Skip to content

condition_variable

C++11

condition_variable 用于线程间同步

cpp
#include <condition_variable>

// Synopsis of std::condition_variable (declared in <condition_variable>).
// It is an ordinary (non-template) class and works only with
// std::unique_lock<std::mutex>.
class condition_variable {
public:
    condition_variable();
    ~condition_variable();

    // Condition variables can be neither copied nor assigned.
    condition_variable(const condition_variable&) = delete;
    condition_variable& operator=(const condition_variable&) = delete;

    // Atomically release `lock` and block until notified (or spuriously
    // woken); `lock` is re-acquired before the call returns.
    void wait(std::unique_lock<std::mutex>& lock);
    // Same, but keeps waiting until pred() returns true; equivalent to
    // `while (!pred()) wait(lock);`.
    template <class Predicate>
    void wait(std::unique_lock<std::mutex>& lock, Predicate pred);

    // Wait for at most rel_time. The overload without a predicate reports
    // whether the timeout expired; the predicate overload returns the final
    // value of pred().
    template <class Rep, class Period>
    std::cv_status wait_for(std::unique_lock<std::mutex>& lock,
                            const std::chrono::duration<Rep, Period>& rel_time);
    template <class Rep, class Period, class Predicate>
    bool wait_for(std::unique_lock<std::mutex>& lock,
                  const std::chrono::duration<Rep, Period>& rel_time,
                  Predicate pred);

    // Wait until the absolute time point abs_time is reached at the latest.
    // Return values mirror those of wait_for.
    template <class Clock, class Duration>
    std::cv_status wait_until(std::unique_lock<std::mutex>& lock,
                              const std::chrono::time_point<Clock, Duration>& abs_time);
    template <class Clock, class Duration, class Predicate>
    bool wait_until(std::unique_lock<std::mutex>& lock,
                    const std::chrono::time_point<Clock, Duration>& abs_time,
                    Predicate pred);

    // Wake up one waiting thread.
    void notify_one() noexcept;
    // Wake up all waiting threads.
    void notify_all() noexcept;
};

注意事项

  • 调用wait、wait_for或wait_until之前,当前线程必须已经通过std::unique_lock<std::mutex>持有互斥锁;这些函数返回时线程重新持有该锁,处理完共享数据后应及时释放。
  • wait、wait_for和wait_until函数会在阻塞等待期间原子地释放互斥锁,并在被唤醒后、函数返回之前重新获取它。由于可能发生虚假唤醒,应使用带谓词的重载或在循环中重新检查条件。
  • notify_one唤醒一个等待的线程,而notify_all唤醒所有等待的线程。

示例

cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <chrono>
#include <condition_variable>

// Sizing of the example: queue capacity and number of worker threads.
constexpr size_t QUEUE_SIZE{2};    // maximum number of items held in the queue
constexpr int PRODUCER_COUNT{2};   // number of producer threads started by main()
constexpr int CONSUMER_COUNT{2};   // number of consumer threads started by main()

// State shared between the producer and consumer threads.
std::mutex mtx;                    // global mutex locked around queue access
std::condition_variable condition; // condition variable the threads wait on
std::queue<int> queue;             // the shared queue of produced values

std::mutex quit_mtx;               // global mutex, separate from mtx
int completed_count{0};            // incremented by each producer once it has finished

/// Producer thread body.
///
/// Produces three values (`id * 10 + i` for i = 0..2), pushing each into the
/// shared queue once there is room (fewer than QUEUE_SIZE items), and finally
/// records that this producer has finished.
///
/// @param id  Producer identifier; printed and used to derive the values.
void producer(int id)
{
    for (int i = 0; i < 3; i++)
    {
        // Simulate 1s of work. This is done before taking the lock so that
        // the other threads are not blocked for the whole duration.
        std::this_thread::sleep_for(std::chrono::seconds(1));

        {
            std::unique_lock<std::mutex> lock(mtx);
            // Block while the queue is full.
            condition.wait(lock, []
                           { return queue.size() < QUEUE_SIZE; });

            const int value = id * 10 + i;
            queue.push(value);
            // Printed under the lock so lines from different threads do not interleave.
            std::cout << "producer<" << id << "> " << value << std::endl;
        } // lock released here

        // Producers and consumers wait on the same condition variable, so
        // notify_one could wake another producer instead of a consumer.
        // Wake everyone and let each waiter re-check its own predicate.
        condition.notify_all();
    }

    {
        // completed_count is read by the consumers' wait predicate while they
        // hold mtx, so it must be modified under that same mutex; otherwise
        // the update races with the predicate and the wakeup can be lost.
        std::lock_guard<std::mutex> lock(mtx);
        completed_count++;
    }
    condition.notify_all();
}

/// Consumer thread body.
///
/// Repeatedly takes one value from the shared queue and prints it. The loop
/// ends only when the queue is empty and every producer has finished, so
/// values still queued at the time the last producer completes are consumed
/// rather than dropped.
///
/// @param id  Consumer identifier; used only in the printed output.
void comsumer(int id)
{
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(mtx);
            // Sleep until there is something to consume or no more items can arrive.
            condition.wait(lock, []
                           { return !queue.empty() || completed_count >= PRODUCER_COUNT; });

            // The predicate held and the queue is empty, therefore all
            // producer threads have completed: nothing is left to do.
            if (queue.empty())
            {
                break;
            }

            const int value = queue.front();
            queue.pop();

            // Printed under the lock so lines from different threads do not interleave.
            std::cout << "comsumer<" << id << "> " << value << std::endl;
        } // lock released here

        // A slot was freed. Producers and consumers share one condition
        // variable, so wake all waiters to make sure a blocked producer
        // (not just the other consumer) gets to re-check its predicate.
        condition.notify_all();
    }
}

int main()
{
    std::thread producer_threads[PRODUCER_COUNT];
    std::thread consumer_threads[CONSUMER_COUNT];

    // 拉起线程
    for (size_t i = 0; i < PRODUCER_COUNT; i++)
    {
        producer_threads[i] = std::thread(producer, i + 1);
    }

    for (size_t i = 0; i < CONSUMER_COUNT; i++)
    {
        consumer_threads[i] = std::thread(comsumer, i + 1);
    }

    // 等待线程执行完毕
    for (size_t i = 0; i < PRODUCER_COUNT; i++)
    {
        producer_threads[i].join();
    }

    for (size_t i = 0; i < CONSUMER_COUNT; i++)
    {
        consumer_threads[i].join();
    }
}

输出结果(线程调度顺序不确定,每次运行的实际输出顺序可能不同,以下仅为一次运行的示例)

shell
producer<1> 10
producer<1> 11
comsumer<2> 10
comsumer<2> 11
producer<1> 12
comsumer<2> 12
producer<2> 20
producer<2> 21
comsumer<1> 20
comsumer<1> 21
producer<2> 22
comsumer<1> 22