One of these patterns is having a blocking queue between a producer thread and a consumer thread. The two threads share no objects directly (no need to lock), but the producer passes "completed" objects to the consumer to process via the queue. It's possible that the consumer may starve or that the producer may have the opposite problem and produce too much (spew?).
Java has elegant objects for concurrency, like the suite of BlockingQueues. A favorite of mine is the LinkedBlockingQueue, so I created a comparable class for C++11 called ConcurrentBlockingQueue. I did not implement every feature (some, like peek and poll, are awkward to express in C++11 because there is no "null" value to return when the queue is empty), but this should offer all that's needed to implement a safe producer/consumer model.
If the consumer starves, it will block until data is ready on the queue. With the new C++11 condition_variable, the blocking is incredibly efficient (the producer sends a notify signal when data is ready). If the producer produces like a good Quant (too much data to handle), then it will wait until the queue has space available. If the producer can't wait, it's possible to create an unbounded queue that expands in memory.
Apologies for the length of the code in the post. I plan to share source code files on github and will link here when available.
This was compiled with gcc version 4.8.4 using the following command:
g++ -std=c++11 cbq.cpp -lpthread -o cbq
Output was:
Put 1 Put 2 Put 3 Put 4 Put 5 Put 6 1 :: 2 :: 3 :: Put 7 Put 8 Put 9 4 :: 5 :: 6 :: Put 10 Put 11 Put 12 7 :: 8 :: 9 :: Put 13 Put 14 Put 15 10 :: 11 :: 12 :: Put 16 Put 17 Put 18 13 :: 14 :: 15 :: Put 19 Put 20 16 :: 17 :: 18 :: 19 :: 20 ::
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <iterator>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// A bounded (or unbounded) FIFO queue for passing objects between threads,
// loosely modelled on Java's LinkedBlockingQueue.
//
// A queue constructed with a size of 0 (or default-constructed) is unbounded.
// All public member functions may be called concurrently from any thread.
// Every access to the underlying list is guarded by the single mutex `mmutex`;
// the two condition variables are always waited on with that same mutex so a
// notification can never be missed between checking the predicate and waiting.
template <class T, class Alloc = std::allocator<T>>
class ConcurrentBlockingQueue {
public:
    // Creates an unbounded queue.
    ConcurrentBlockingQueue() {}

    // Creates a queue holding at most `size` elements (0 means unbounded).
    ConcurrentBlockingQueue(size_t size) : max_size(size) {}

    // Number of elements that can still be added without blocking;
    // numeric_limits<size_t>::max() for an unbounded queue.
    size_t remainingCapacity();

    // Number of elements currently queued.
    size_t size();

    // Removes every element and wakes any producers waiting for space.
    void clear();

    // Blocks until the queue is non-empty. Does not remove anything, so with
    // several consumers the queue may be empty again by the time the caller
    // acts on it.
    void empty_block();

    // Appends `e`, blocking for as long as the queue is full.
    void put(T e);

    // Appends `e` only if space is available right now; returns whether it
    // was inserted.
    bool offer(T e);

    // Appends `e`, waiting up to `sleep_duration` for space to become
    // available. A zero or negative duration does not wait at all.
    // Returns whether the element was inserted.
    template <class Rep, class Period>
    bool offer(T e, const std::chrono::duration<Rep, Period>& sleep_duration);

    // Removes and returns the head of the queue, blocking while it is empty.
    T take();

    // Moves every queued element to the back of `c` (anything usable with
    // std::back_inserter). Never blocks. Returns the number moved.
    template <class Container>
    int drainTo(Container& c);

    // As above but moves at most `maxElements` elements; 0 means "all".
    template <class Container>
    int drainTo(Container& c, size_t maxElements);

    // The C++ spec makes poll() awkward (can't return a null reference).
    // Perhaps it will be available in C++17 if they introduce ::optional.
    // For poll() functionality, use drainTo(container, 1) and check the
    // return value and/or container contents.

protected:
    // Free slots left. The caller must already hold `mmutex`.
    size_t remaining_locked() const {
        if (max_size == 0) {
            return std::numeric_limits<size_t>::max();
        }
        const size_t used = lst.size();
        return used >= max_size ? 0 : max_size - used;
    }

    size_t max_size = 0;
    std::condition_variable empty_cond, full_cond;
    // `mmutex` guards `lst`. `empty_mutex` and `full_mutex` are no longer
    // used; they are kept only so derived classes that name them still build.
    std::mutex mmutex, empty_mutex, full_mutex;
    std::list<T, Alloc> lst;
};

template <class T, class Alloc>
size_t ConcurrentBlockingQueue<T, Alloc>::remainingCapacity() {
    std::lock_guard<std::mutex> lck{mmutex};
    return remaining_locked();
}

template <class T, class Alloc>
size_t ConcurrentBlockingQueue<T, Alloc>::size() {
    std::lock_guard<std::mutex> lck{mmutex};
    return lst.size();
}

template <class T, class Alloc>
void ConcurrentBlockingQueue<T, Alloc>::clear() {
    std::lock_guard<std::mutex> lck{mmutex};
    lst.clear();
    full_cond.notify_all();
}

template <class T, class Alloc>
void ConcurrentBlockingQueue<T, Alloc>::put(T e) {
    std::unique_lock<std::mutex> lck{mmutex};
    // The predicate is evaluated under the same mutex the notifier holds, so
    // a wake-up cannot slip in between the capacity check and the wait.
    full_cond.wait(lck, [this] { return this->remaining_locked() > 0; });
    lst.push_back(std::move(e));
    empty_cond.notify_all();
}

template <class T, class Alloc>
bool ConcurrentBlockingQueue<T, Alloc>::offer(T e) {
    return offer(std::move(e), std::chrono::seconds{0});
}

template <class T, class Alloc>
template <class Rep, class Period>
bool ConcurrentBlockingQueue<T, Alloc>::offer(
        T e, const std::chrono::duration<Rep, Period>& sleep_duration) {
    std::unique_lock<std::mutex> lck{mmutex};
    const auto has_space = [this] { return this->remaining_locked() > 0; };

    if (sleep_duration > std::chrono::duration<Rep, Period>::zero()) {
        // wait_for measures against a steady clock, so adjusting the system
        // time cannot shorten or stretch the timeout.
        if (!full_cond.wait_for(lck, sleep_duration, has_space)) {
            return false;
        }
    } else if (!has_space()) {
        return false;
    }

    // Still holding the lock from the capacity check, so the bound holds.
    lst.push_back(std::move(e));
    empty_cond.notify_all();
    return true;
}

template <class T, class Alloc>
void ConcurrentBlockingQueue<T, Alloc>::empty_block() {
    std::unique_lock<std::mutex> lck{mmutex};
    empty_cond.wait(lck, [this] { return !this->lst.empty(); });
}

template <class T, class Alloc>
T ConcurrentBlockingQueue<T, Alloc>::take() {
    std::unique_lock<std::mutex> lck{mmutex};
    // Wait and pop under one lock: otherwise two consumers could both see a
    // single element and the loser would pop from an empty list.
    empty_cond.wait(lck, [this] { return !this->lst.empty(); });
    T e = std::move(lst.front());
    lst.pop_front();
    full_cond.notify_all();
    return e;
}

template <class T, class Alloc>
template <class Container>
int ConcurrentBlockingQueue<T, Alloc>::drainTo(Container& c, size_t maxElements) {
    // Detach the elements while holding the lock, then hand them to the
    // caller's container afterwards so other threads are not held up.
    std::list<T, Alloc> drained(lst.get_allocator());
    size_t count = 0;
    {
        std::lock_guard<std::mutex> lck{mmutex};
        count = lst.size();
        auto end_it = lst.end();
        if (maxElements > 0 && maxElements < count) {
            count = maxElements;
            end_it = lst.begin();
            std::advance(end_it, count);
        }
        drained.splice(drained.end(), lst, lst.begin(), end_it);
        if (count > 0) {
            full_cond.notify_all();
        }
    }
    std::move(drained.begin(), drained.end(), std::back_inserter(c));
    return static_cast<int>(count);
}

template <class T, class Alloc>
template <class Container>
int ConcurrentBlockingQueue<T, Alloc>::drainTo(Container& c) {
    return drainTo(c, 0);
}

/* The rest of the code is for demonstration of the ConcurrentBlockingQueue */

using std::thread;
using std::vector;
using std::string;
using std::cout;
using std::endl;

// Sentinel the producer enqueues last to tell the consumer to stop.
string Shutdown{"shutdown"};

// True when `s` is the shutdown sentinel.
bool isShutdown(const string& s) {
    return Shutdown.compare(s) == 0;
}

// Produces the strings "1".."20" followed by the shutdown sentinel.
// putMethod == 1 uses the non-blocking offer() (elements are dropped once the
// queue is full); any other value uses the blocking put().
// timeout_ms > 0 sleeps that many milliseconds before each element.
void producer_function(int putMethod, ConcurrentBlockingQueue<string>& q, int timeout_ms) {
    int count = 0;
    while (true) {
        if (timeout_ms > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds{timeout_ms});
        }
        string s{std::to_string(++count)};
        if (putMethod == 1) {
            cout << "Offer " << s;
            //bool in = q.offer(s, std::chrono::seconds{2});
            bool in = q.offer(s); //Can insert first N, then rest will immediately fail
            cout << " " << in << endl;
        } else {
            cout << "Put " << s << endl;
            q.put(s);
        }
        if (count >= 20) {
            q.put(Shutdown);
            break;
        }
    }
}

// Consumes until the shutdown sentinel is seen.
// takeMethod == 1 drains up to three elements at a time with drainTo(); any
// other value removes one element at a time with take().
// timeout_ms > 0 sleeps that many milliseconds before each round.
void consumer_function(int takeMethod, ConcurrentBlockingQueue<string>& q, int timeout_ms) {
    vector<string> v;
    bool shutdown = false;
    while (!shutdown) {
        if (timeout_ms > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds{timeout_ms});
        }
        if (takeMethod == 1) {
            v.clear();
            q.empty_block();
            q.drainTo(v, 3);
            for (const auto& s : v) {
                if (isShutdown(s)) {
                    shutdown = true;
                } else {
                    cout << s << " :: ";
                }
            }
            if (v.size() > 0) cout << endl;
        } else {
            string s = q.take();
            if (isShutdown(s)) {
                shutdown = true;
            } else {
                cout << "Took " << s << endl;
            }
        }
    }
}

// Define CBQ_NO_DEMO_MAIN to compile the queue and the demo helpers without
// the demo entry point (e.g. when embedding this file in a test harness).
#ifndef CBQ_NO_DEMO_MAIN
int main(int argc, char* argv[]) {
    (void)argc;
    (void)argv;
    ConcurrentBlockingQueue<string> q{5};
    thread producer{producer_function, 0, std::ref(q), 0};
    thread consumer{consumer_function, 1, std::ref(q), 1000};
    producer.join();
    consumer.join();
}
#endif
No comments:
Post a Comment