#include #include #include #include #include #include #include auto constexpr PRODUCTION_DELAY = std::chrono::milliseconds{100}; auto constexpr CONSUMPTION_DELAY = std::chrono::milliseconds{10}; #ifdef DEBUG long constexpr ACCREPORTSTEPS = 10000; #endif struct buffer_type { static int constexpr size = 10; std::vector value; int n{0}; // Use a mutex to synchronise access. std::mutex mutex; buffer_type() : value(size) {} #ifdef DEBUG long _num_rd_acc = 0, _num_wr_acc = 0; long _num_rd = 0, _num_wr = 0; void _report() { // Print occasionally if ((_num_rd_acc + _num_wr_acc) % ACCREPORTSTEPS == 0) { std::clog << "current buffer size = " << n << ", rd tries = " << _num_rd_acc << ", wr tries = " << _num_wr_acc << ", rd done = " << _num_rd << ", wr done = " << _num_wr << std::endl; } } #endif }; // Code executed by consumers. void consumer(buffer_type &buffer); // Code executed by producers. void producer(buffer_type &buffer); int main(int argc, char *argv[]) { if (argc != 2) { std::cerr << "Give the number of threads in the command line.\n"; return 1; } int const num_threads = std::stoi(argv[1]); if (num_threads < 2) { std::cerr << "We need at least two threads (one producer, one consumer).\n"; return 2; } // Create the buffer for comunication. buffer_type buffer; // Create one consumer an the other threads are producers. std::vector threads; threads.emplace_back(consumer, std::ref(buffer)); for (int tid = 1; tid < num_threads; ++tid) { threads.emplace_back(producer, std::ref(buffer)); } std::cout << "All threads started and running..." << std::endl; // This never happens. Threads don't stop. for (auto &thd : threads) { thd.join(); } } int get_value(buffer_type &buffer) { while (true) { // Repeat until there is something in the buffer to get. { // For synchronisation, we aquire the mutex of the buffer. std::lock_guard guard(buffer.mutex); #ifdef DEBUG // For debug, register number of accesses ++buffer._num_rd_acc; // And print occasionally buffer._report(); #endif if (buffer.n > 0) { #ifdef DEBUG ++buffer._num_rd; #endif // Get last value in the buffer. buffer.n--; // If the code is right, this invariant should hold. assert(0 <= buffer.n && buffer.n < buffer.size); return buffer.value[buffer.n]; } } // We coudn't do our work. Let other threads try. std::this_thread::yield(); } return 0; } void solve_problem(int) { // Do something. std::this_thread::sleep_for(CONSUMPTION_DELAY); } void consumer(buffer_type &buffer) { // Forever read a value and use it. while (true) { // Get value from buffer. int value = get_value(buffer); // Do some useful work with the value. solve_problem(value); } } void put_value(buffer_type &buffer, int val) { while (true) { // Wait until there is place to put the value. { // For synchronisation, we aquire the mutex of the buffer. std::lock_guard guard(buffer.mutex); #ifdef DEBUG // For debug, register number of accesses ++buffer._num_wr_acc; // And print occasionally buffer._report(); #endif if (buffer.n < buffer.size) { #ifdef DEBUG ++buffer._num_wr; #endif // There is space. Put value at the end of the buffer. buffer.value[buffer.n] = val; buffer.n++; // If the code is right, this invariant must hold. assert(0 < buffer.n && buffer.n <= buffer.size); return; } } // We couldn't do our work. Let other thread try. std::this_thread::yield(); } } int compute_problem() { static std::mutex mut_value; static int value{0}; std::this_thread::sleep_for(PRODUCTION_DELAY); std::lock_guard lock(mut_value); return value++; } void producer(buffer_type &buffer) { // Forever compute a value and insert in buffer. while (true) { // Compute a value int value = compute_problem(); // Insert in buffer put_value(buffer, value); } }