// Producer/consumer demo: one consumer thread and (N - 1) producer threads
// communicate through a bounded buffer protected by a mutex and two
// condition variables.

#include <cassert>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Time a producer takes to compute one value.
auto constexpr PRODUCTION_DELAY = std::chrono::milliseconds{100};
// Time the consumer takes to process one value.
auto constexpr CONSUMPTION_DELAY = std::chrono::milliseconds{10};

#ifdef DEBUG
// A debug report is printed every ACCREPORTSTEPS buffer accesses.
long constexpr ACCREPORTSTEPS = 10;
#endif

// Bounded buffer shared by all threads. Values are stored in value[0..n-1]
// and the most recently inserted value is the first one removed (LIFO).
// Every member except `size` must only be touched while holding `mutex`.
struct buffer_type {
  // Maximum number of values the buffer can hold.
  static int constexpr size = 10;

  // Storage for the values; only the first n slots are in use.
  std::vector<int> value;
  // Number of values currently stored.
  int n{0};

  // Use a mutex to synchronise access.
  std::mutex mutex;

  // Two conditions to wait for:
  // The buffer has some value(s).
  std::condition_variable has_value;
  // The buffer has some empty space(s).
  std::condition_variable has_space;

  buffer_type() : value(size) {}

#ifdef DEBUG
  // Attempted reads/writes (counted before waiting on the condition).
  long _num_rd_acc = 0, _num_wr_acc = 0;
  // Completed reads/writes (counted after the wait succeeded).
  long _num_rd = 0, _num_wr = 0;

  // Print the access statistics every ACCREPORTSTEPS attempts.
  // Must be called with `mutex` held.
  void _report() {
    // Print occasionally
    if ((_num_rd_acc + _num_wr_acc) % ACCREPORTSTEPS == 0) {
      std::clog << "current buffer size = " << n
                << ", rd tries = " << _num_rd_acc
                << ", wr tries = " << _num_wr_acc
                << ", rd done = " << _num_rd
                << ", wr done = " << _num_wr << std::endl;
    }
  }
#endif
};

// Code executed by consumers.
void consumer(buffer_type &buffer);

// Code executed by producers.
void producer(buffer_type &buffer);

// Entry point. Expects exactly one argument: the total number of threads
// (at least 2). Returns 1 on a usage error or a non-numeric argument and 2
// if fewer than two threads are requested; otherwise it never returns,
// because the worker threads loop forever.
int main(int argc, char *argv[]) {
  if (argc != 2) {
    std::cerr << "Give the number of threads in the command line.\n";
    return 1;
  }

  int num_threads = 0;
  try {
    num_threads = std::stoi(argv[1]);
  } catch (std::exception const &) {
    // std::stoi throws on non-numeric or out-of-range input; report it
    // instead of terminating through an uncaught exception.
    std::cerr << "The number of threads must be an integer.\n";
    return 1;
  }

  if (num_threads < 2) {
    std::cerr << "We need at least two threads (one producer, one consumer).\n";
    return 2;
  }

  // Create the buffer for communication.
  buffer_type buffer;

  // Create one consumer; all the other threads are producers.
  std::vector<std::thread> threads;
  threads.emplace_back(consumer, std::ref(buffer));
  for (int tid = 1; tid < num_threads; ++tid) {
    threads.emplace_back(producer, std::ref(buffer));
  }

  std::cout << "All threads started and running..." << std::endl;

  // Execution stops here: The threads never finish.
  for (auto &thd : threads) {
    thd.join();
  }
}

// Remove and return the most recently inserted value, blocking while the
// buffer is empty.
int get_value(buffer_type &buffer) {
  // For synchronisation, we acquire the mutex of the buffer.
  // unique_lock is required to wait on a condition variable.
  std::unique_lock<std::mutex> lock(buffer.mutex);

#ifdef DEBUG
  // For debug, register number of accesses
  ++buffer._num_rd_acc;
  // And print occasionally
  buffer._report();
#endif

  // First we wait until the buffer has some value to get.
  buffer.has_value.wait(lock, [&buffer] { return buffer.n > 0; });

#ifdef DEBUG
  ++buffer._num_rd;
#endif

  // There is some value, take the last one.
  --buffer.n;

  // If the code is right, this invariant should hold.
  assert(0 <= buffer.n && buffer.n < buffer.size);

  int const result = buffer.value[buffer.n];

  // A slot has just been freed, so wake one waiting producer. This is done
  // on every removal: notifying only on the full -> not-full transition can
  // leave producers asleep when several slots are freed before the single
  // woken producer gets to run.
  buffer.has_space.notify_one();

  return result;
}

// Simulate the work done with a consumed value.
void solve_problem(int) {
  // Do something.
  std::this_thread::sleep_for(CONSUMPTION_DELAY);
}

// Consumer thread body: never returns.
void consumer(buffer_type &buffer) {
  // Forever read a value and use it.
  while (true) {
    // Get value from buffer.
    int const value = get_value(buffer);
    // Do some useful work with the value.
    solve_problem(value);
  }
}

// Insert val into the buffer, blocking while the buffer is full.
void put_value(buffer_type &buffer, int val) {
  // For synchronisation, we acquire the mutex of the buffer.
  // unique_lock is required to wait on a condition variable.
  std::unique_lock<std::mutex> lock(buffer.mutex);

#ifdef DEBUG
  // Count access for debugging.
  ++buffer._num_wr_acc;
  // Print occasionally
  buffer._report();
#endif

  // First we wait until the buffer has some space where to put the value.
  buffer.has_space.wait(lock, [&buffer] { return buffer.n < buffer.size; });

#ifdef DEBUG
  ++buffer._num_wr;
#endif

  // Now that we have the space, we put the value there.
  buffer.value[buffer.n] = val;
  ++buffer.n;

  // If the code is right, this invariant must hold.
  assert(0 < buffer.n && buffer.n <= buffer.size);

  // A value is now available, so wake one waiting consumer. This is done on
  // every insertion so that no wake-up is lost if more than one thread is
  // ever waiting for values.
  buffer.has_value.notify_one();
}

// Simulate the computation of a value. Successive calls, from any thread,
// return 0, 1, 2, ...
int compute_problem() {
  static std::mutex mut_value;
  static int value{0};

  // The (simulated) work is done outside the lock so producers overlap.
  std::this_thread::sleep_for(PRODUCTION_DELAY);

  // The counter is shared by all producers, so protect the increment.
  std::lock_guard<std::mutex> lock(mut_value);
  return value++;
}

// Producer thread body: never returns.
void producer(buffer_type &buffer) {
  // Forever compute a value and insert in buffer.
  while (true) {
    // Compute a value
    int const value = compute_problem();
    // Insert in buffer
    put_value(buffer, value);
  }
}