C++11 并发学习(四) 发表于 2017-02-15 | 阅读次数 学习自大神博客 多线程下生产者消费者模型下面我们来看一下C++11多线程下生产者消费者模型 单生产者-单消费者模型123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRepositorySize = 10; //Item buffer sizestatic const int kItemsToProduce = 100; //How many items we plan to producestruct ItemRepository { int item_buffer[kItemRepositorySize]; //产品缓冲区, 配合模拟环形队列 size_t read_position; //消费者读取产品位置 size_t write_position; //生产者写入产品位置 std::mutex mtx; //互斥量,保护产品缓冲区 std::condition_variable repo_not_full; //条件变量,指示产品缓冲区不为满 std::condition_variable repo_not_empty; //条件变量,指示产品缓冲区不为空} gItemRepository; //产品库全局变量,生产者和消费者操作该变量typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { //buffer is full std::cout << "Producer is waiting for an empty slot...\n"; (ir->repo_not_full).wait(lock); //生产者等待"产品库缓冲区不为满"这一条件发生 } (ir->item_buffer)[ir->write_position] = item; //写入产品 (ir->write_position)++; //写入位置后移 if (ir->write_position == kItemRepositorySize) { //写入位置如果在队列最后 则需要重新设置为初始位置 ir->write_position = 0; } (ir->repo_not_empty).notify_all(); //通知消费者产品库不为空 lock.unlock();}int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); while (ir->write_position == ir->read_position) { std::cout << "Consumer is waiting for items...\n"; (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生 } data = (ir->item_buffer)[ir->read_position]; //读取产品 (ir->read_position)++; //读取位置后移 if (ir->read_position >= kItemRepositorySize) { //读取位置如果在队列最后 则需要重新设置为初始位置 ir->read_position = 0; } (ir->repo_not_full).notify_all(); lock.unlock(); return data;}//生产者任务void ProducerTask() { for (int i = 1; i < kItemsToProduce; ++i) { //sleep(1); std::cout << "Produce the" << i << "^th item..." << std::endl; ProduceItem(&gItemRepository, i); //循环生产 kItemsToProduce 个产品 }}//消费者任务void ConsumerTask() { static int cnt = 0; while(1) { sleep(1); int item = ConsumeItem(&gItemRepository); //消费一个产品 std::cout << "Consume the" << item << "^th item" << std::endl; if (++cnt == kItemsToProduce) { //如果产品消费个数为 kItemsToProduce, 则退出. break; } }}void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; ir->read_position = 0;}int main() { InitItemRepository(&gItemRepository); std::thread producer(ProducerTask); std::thread consumer(ConsumerTask); producer.join(); consumer.join();} 运行结果: 单生产者-多消费者模型123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123//与单生产者和单消费者模型不同的是,//单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。//所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRepositorySize = 10; //Item buffer sizestatic const int kItemsToProduce = 100; //How many items we plan to producestruct ItemRepository { int item_buffer[kItemRepositorySize]; //产品缓冲区, 配合模拟环形队列 size_t read_position; //消费者读取产品位置 size_t write_position; //生产者写入产品位置 size_t item_counter; std::mutex item_counter_mtx; std::mutex mtx; //互斥量,保护产品缓冲区 std::condition_variable repo_not_full; //条件变量,指示产品缓冲区不为满 std::condition_variable repo_not_empty; //条件变量,指示产品缓冲区不为空} gItemRepository; //产品库全局变量,生产者和消费者操作该变量typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { //buffer is full std::cout << "Producer is waiting for an empty slot...\n"; (ir->repo_not_full).wait(lock); //生产者等待"产品库缓冲区不为满"这一条件发生 } (ir->item_buffer)[ir->write_position] = item; //写入产品 (ir->write_position)++; //写入位置后移 if (ir->write_position == kItemRepositorySize) { //写入位置如果在队列最后 则需要重新设置为初始位置 ir->write_position = 0; } (ir->repo_not_empty).notify_all(); //通知消费者产品库不为空 lock.unlock();}int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); while (ir->write_position == ir->read_position) { std::cout << "Consumer is waiting for items...\n"; (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生 } data = (ir->item_buffer)[ir->read_position]; //读取产品 (ir->read_position)++; //读取位置后移 if (ir->read_position >= kItemRepositorySize) { //读取位置如果在队列最后 则需要重新设置为初始位置 ir->read_position = 0; } (ir->repo_not_full).notify_all(); lock.unlock(); return data;}//生产者任务void ProducerTask() { for (int i = 1; i < kItemsToProduce; ++i) { //sleep(1); std::cout << "Producer thread" << std::this_thread::get_id() << "Produce the" << i << "^th item..." << std::endl; ProduceItem(&gItemRepository, i); //循环生产 kItemsToProduce 个产品 } std::cout << "Producer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;}//消费者任务void ConsumerTask() { bool ready_to_exit = false; while(1) { sleep(1); std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx); if (gItemRepository.item_counter < kItemsToProduce) { int item = ConsumeItem(&gItemRepository); ++(gItemRepository.item_counter); std::cout << "Consumer thread" << std::this_thread::get_id() << "is consuming the" << item << "^th item"<< std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) { break; } } std::cout << "Consumer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;}void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; ir->read_position = 0; ir->item_counter = 0;}int main() { InitItemRepository(&gItemRepository); std::thread producer(ProducerTask); std::thread consumer1(ConsumerTask); std::thread consumer2(ConsumerTask); std::thread consumer3(ConsumerTask); std::thread consumer4(ConsumerTask); producer.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join();} 运行结果: 多生产者-单消费者模型123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126//与单生产者和单消费者模型不同的是,//单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。//所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRepositorySize = 10; //Item buffer sizestatic const int kItemsToProduce = 100; //How many items we plan to producestruct ItemRepository { int item_buffer[kItemRepositorySize]; //产品缓冲区, 配合模拟环形队列 size_t read_position; //消费者读取产品位置 size_t write_position; //生产者写入产品位置 size_t item_counter; std::mutex item_counter_mtx; std::mutex mtx; //互斥量,保护产品缓冲区 std::condition_variable repo_not_full; //条件变量,指示产品缓冲区不为满 std::condition_variable repo_not_empty; //条件变量,指示产品缓冲区不为空} gItemRepository; //产品库全局变量,生产者和消费者操作该变量typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { //buffer is full std::cout << "Producer is waiting for an empty slot...\n"; (ir->repo_not_full).wait(lock); //生产者等待"产品库缓冲区不为满"这一条件发生 } (ir->item_buffer)[ir->write_position] = item; //写入产品 (ir->write_position)++; //写入位置后移 if (ir->write_position == kItemRepositorySize) { //写入位置如果在队列最后 则需要重新设置为初始位置 ir->write_position = 0; } (ir->repo_not_empty).notify_all(); //通知消费者产品库不为空 lock.unlock();}int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); while (ir->write_position == ir->read_position) { std::cout << "Consumer is waiting for items...\n"; (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生 } data = (ir->item_buffer)[ir->read_position]; //读取产品 (ir->read_position)++; //读取位置后移 if (ir->read_position >= kItemRepositorySize) { //读取位置如果在队列最后 则需要重新设置为初始位置 ir->read_position = 0; } (ir->repo_not_full).notify_all(); lock.unlock(); return data;}//生产者任务void ProducerTask() { bool ready_to_exit = false; while (1) { sleep(1); std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx); if (gItemRepository.item_counter < kItemsToProduce) { ++(gItemRepository.item_counter); ProduceItem(&gItemRepository, gItemRepository.item_counter); std::cout << "Producer thread" << std::this_thread::get_id() << "is producing the" << gItemRepository.item_counter << "^th item..." << std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) { break; } } std::cout << "Producer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;}//消费者任务void ConsumerTask() { static int item_consumed = 0; while (1) { sleep(1); ++item_consumed; if (item_consumed <= kItemsToProduce) { int item = ConsumeItem(&gItemRepository); std::cout << "Consumer thread" << std::this_thread::get_id() << "is consuming the" << item << "^th item"<< std::endl; } else break; } std::cout << "Consumer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;}void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; ir->read_position = 0; ir->item_counter = 0;}int main() { InitItemRepository(&gItemRepository); std::thread producer1(ProducerTask); std::thread producer2(ProducerTask); std::thread producer3(ProducerTask); std::thread producer4(ProducerTask); std::thread consumer(ConsumerTask); producer1.join(); producer2.join(); producer3.join(); producer4.join(); consumer.join();} 运行结果: 多生产者-多消费者模型123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141//该模型可以说是前面两种模型的综合,程序需要维护两个计数器,//分别是生产者已生产产品的数目和消费者已取走产品的数目。//另外也需要保护产品库在多个生产者和多个消费者互斥地访问#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRepositorySize = 10; //Item buffer sizestatic const int kItemsToProduce = 100; //How many items we plan to producestruct ItemRepository { int item_buffer[kItemRepositorySize]; //产品缓冲区, 配合模拟环形队列 size_t read_position; //消费者读取产品位置 size_t write_position; //生产者写入产品位置 size_t produced_item_counter; size_t consumed_item_counter; std::mutex produced_item_counter_mtx; std::mutex consumed_item_counter_mtx; std::mutex mtx; //互斥量,保护产品缓冲区 std::condition_variable repo_not_full; //条件变量,指示产品缓冲区不为满 std::condition_variable repo_not_empty; //条件变量,指示产品缓冲区不为空} gItemRepository; //产品库全局变量,生产者和消费者操作该变量typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { //buffer is full std::cout << "Producer is waiting for an empty slot...\n"; (ir->repo_not_full).wait(lock); //生产者等待"产品库缓冲区不为满"这一条件发生 } (ir->item_buffer)[ir->write_position] = item; //写入产品 (ir->write_position)++; //写入位置后移 if (ir->write_position == kItemRepositorySize) { //写入位置如果在队列最后 则需要重新设置为初始位置 ir->write_position = 0; } (ir->repo_not_empty).notify_all(); //通知消费者产品库不为空 lock.unlock();}int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); while (ir->write_position == ir->read_position) { std::cout << "Consumer is waiting for items...\n"; (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生 } data = (ir->item_buffer)[ir->read_position]; //读取产品 (ir->read_position)++; //读取位置后移 if (ir->read_position >= kItemRepositorySize) { //读取位置如果在队列最后 则需要重新设置为初始位置 ir->read_position = 0; } (ir->repo_not_full).notify_all(); lock.unlock(); return data;}//生产者任务void ProducerTask() { bool ready_to_exit = false; while (1) { sleep(1); std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx); if (gItemRepository.produced_item_counter < kItemsToProduce) { ++(gItemRepository.produced_item_counter); ProduceItem(&gItemRepository, gItemRepository.produced_item_counter); std::cout << "Producer thread" << std::this_thread::get_id() << "is producing the" << gItemRepository.produced_item_counter << "^th item..." << std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) { break; } } std::cout << "Producer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;}//消费者任务void ConsumerTask() { bool ready_to_exit = false; while (1) { sleep(1); std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx); if (gItemRepository.consumed_item_counter <= kItemsToProduce) { int item = ConsumeItem(&gItemRepository); ++(gItemRepository.consumed_item_counter); std::cout << "Consumer thread" << std::this_thread::get_id() << "is consuming the" << gItemRepository.consumed_item_counter << "^th item"<< std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) { break; } } std::cout << "Consumer thread" << std::this_thread::get_id() << "is exiting..." << std::endl;}void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; ir->read_position = 0; ir->produced_item_counter = 0; ir->consumed_item_counter = 0;}int main() { InitItemRepository(&gItemRepository); std::thread producer1(ProducerTask); std::thread producer2(ProducerTask); std::thread producer3(ProducerTask); std::thread producer4(ProducerTask); std::thread consumer1(ConsumerTask); std::thread consumer2(ConsumerTask); std::thread consumer3(ConsumerTask); std::thread consumer4(ConsumerTask); producer1.join(); producer2.join(); producer3.join(); producer4.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join();} 运行结果: