#include #include #include #include #include #include auto constexpr PRODUCTION_DELAY = std::chrono::milliseconds{100}; auto constexpr CONSUMPTION_DELAY = std::chrono::milliseconds{10}; #ifdef DEBUG long constexpr ACCREPORTSTEPS = 10000000; #endif struct buffer_type { static int constexpr size = 10; std::vector value; int n{0}; buffer_type() : value(size) {} #ifdef DEBUG long _num_rd_acc = 0, _num_wr_acc = 0; long _num_rd = 0, _num_wr = 0; void _report() { // Print occasionally if ((_num_rd_acc + _num_wr_acc) % ACCREPORTSTEPS == 0) { std::clog << "current buffer size = " << n << ", rd tries = " << _num_rd_acc << ", wr tries = " << _num_wr_acc << ", rd done = " << _num_rd << ", wr done = " << _num_wr << std::endl; } } #endif }; // Code executed by consumers. void consumer(buffer_type &buffer); // Code executed by producers. void producer(buffer_type &buffer); int main(int argc, char *argv[]) { if (argc != 2) { std::cerr << "Give the number of threads in the command line.\n"; return 1; } int const num_threads = std::stoi(argv[1]); if (num_threads < 2) { std::cerr << "We need at least two threads (one producer, one consumer).\n"; return 2; } // Create the buffer for comunication. buffer_type buffer; // Create one consumer an the other threads are producers. std::vector threads; threads.emplace_back(consumer, std::ref(buffer)); for (int tid = 1; tid < num_threads; ++tid) { threads.emplace_back(producer, std::ref(buffer)); } std::cout << "All threads started and running..." << std::endl; // This never happens. Threads don't stop. for (auto &thd : threads) { thd.join(); } } int get_value(buffer_type &buffer) { while (true) { // Repeat until there is something in the buffer to get. #ifdef DEBUG // For debug, register number of accesses ++buffer._num_rd_acc; // And print occasionally buffer._report(); #endif if (buffer.n > 0) { #ifdef DEBUG ++buffer._num_rd; #endif // Get last value in the buffer. --buffer.n; // If the code is right, this invariant should hold. assert(0 <= buffer.n && buffer.n < buffer.size); return buffer.value[buffer.n]; } } return 0; } void solve_problem(int) { // Do something. std::this_thread::sleep_for(CONSUMPTION_DELAY); } void consumer(buffer_type &buffer) { // Forever read a value and use it. while (true) { // Get value from buffer. int value = get_value(buffer); // Do some useful work with the value. solve_problem(value); } } void put_value(buffer_type &buffer, int val) { while (true) { // Wait until there is place to put the value. #ifdef DEBUG // Count access for debugging. ++buffer._num_wr_acc; // Print occasionally buffer._report(); #endif if (buffer.n < buffer.size) { #ifdef DEBUG ++buffer._num_wr; #endif // There is space. Put value at the end of the buffer. buffer.value[buffer.n] = val; ++buffer.n; // If the code is right, this invariant must hold. assert(0 < buffer.n && buffer.n <= buffer.size); return; } } } int compute_problem() { static int value{0}; std::this_thread::sleep_for(PRODUCTION_DELAY); return value++; } void producer(buffer_type &buffer) { // Forever compute a value and insert in buffer. while (true) { // Compute a value int value = compute_problem(); // Insert in buffer put_value(buffer, value); } }