Thursday, November 5, 2015

C++11 ConcurrentBlockingQueue

I love the new thread model introduced in C++11; it is so much cleaner than trying to wrangle pthreads. However, the existing model is still too low level in general. Concurrent programming should only involve mutexes and other synchronization primitives when absolutely necessary. In general, threading can be abstracted to a few patterns that ensure safety while offering efficiency.

One of these patterns is having a blocking queue between a producer thread and a consumer thread. The two threads share no objects directly (no need to lock), but the producer passes "completed" objects to the consumer to process via the queue. It's possible that the consumer may starve or that the producer may have the opposite problem and produce too much (spew?).

Java has elegant objects for concurrency, like the suite of BlockingQueues. A favorite of mine is the LinkedBlockingQueue, so I created a comparable class for C++11 called ConcurrentBlockingQueue. I did not implement every feature (some, like peek and poll, return null on an empty queue, which has no direct equivalent for value types in C++11), but this should offer all that's needed to implement a safe producer/consumer model.
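One way to get poll()-like behavior without a nullable return type is to return a bool and hand the element back through an out-parameter. The following is only a minimal sketch: PollableQueue, poll and poll_demo are hypothetical names for illustration and are not part of the ConcurrentBlockingQueue listed below.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical minimal queue, used only to illustrate a non-blocking poll().
template <class T>
class PollableQueue {
 public:
  void push(T value) {
    std::lock_guard<std::mutex> lck{m_};
    items_.push_back(std::move(value));
  }

  // Returns false immediately if the queue is empty; otherwise moves the
  // head element into `out`, removes it from the queue, and returns true.
  bool poll(T& out) {
    std::lock_guard<std::mutex> lck{m_};
    if (items_.empty()) return false;
    out = std::move(items_.front());
    items_.pop_front();
    return true;
  }

 private:
  std::mutex m_;
  std::deque<T> items_;
};

// Example use: emptiness is reported through the return value.
void poll_demo() {
  PollableQueue<std::string> q;
  std::string s;
  assert(!q.poll(s));  // empty queue: returns false without blocking
  q.push("hello");
  assert(q.poll(s) && s == "hello");
}
```

The caller has to supply a default-constructed object to receive the element, which is the main cost of this approach compared with returning null in Java.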

If the consumer starves, it will block until data is ready on the queue. With the new C++11 condition_variable, the blocking is efficient: the waiting thread sleeps instead of spinning, and the producer sends a notify signal when data is ready. If the producer produces like a good Quant (too much data to handle), then it will wait until the queue has space available. If the producer can't wait, it's possible to create an unbounded queue (the default constructor) that expands in memory.
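The wait/notify handshake described above can be reduced to a few lines. This is a sketch under stated assumptions rather than the class from this post: BoundedIntBuffer and sum_through_buffer are hypothetical names, the buffer holds plain ints, and there is exactly one producer and one consumer. The point it illustrates is that the condition is checked while holding the same mutex the other side holds when it changes the queue, so a notification cannot slip in between the check and the wait.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical bounded buffer, reduced to the wait/notify handshake.
class BoundedIntBuffer {
 public:
  explicit BoundedIntBuffer(std::size_t capacity) : capacity_(capacity) {}

  void put(int value) {
    std::unique_lock<std::mutex> lck{m_};
    // Sleeps until the consumer makes room; the predicate is re-checked
    // after every wakeup, which also covers spurious wakeups.
    not_full_.wait(lck, [this] { return items_.size() < capacity_; });
    items_.push_back(value);
    not_empty_.notify_one();
  }

  int take() {
    std::unique_lock<std::mutex> lck{m_};
    // Sleeps until the producer has queued at least one element.
    not_empty_.wait(lck, [this] { return !items_.empty(); });
    int value = items_.front();
    items_.pop_front();
    not_full_.notify_one();
    return value;
  }

 private:
  std::size_t capacity_;
  std::mutex m_;
  std::condition_variable not_full_, not_empty_;
  std::deque<int> items_;
};

// Pushes 1..n through a buffer of capacity 2 and returns the consumer's sum.
int sum_through_buffer(int n) {
  assert(n >= 0);
  BoundedIntBuffer buf{2};
  int sum = 0;
  std::thread producer{[&] { for (int i = 1; i <= n; ++i) buf.put(i); }};
  std::thread consumer{[&] { for (int i = 0; i < n; ++i) sum += buf.take(); }};
  producer.join();
  consumer.join();  // sum is only read after the consumer has finished
  return sum;
}
```

With a capacity of 2 the producer is forced to block repeatedly, yet every element still arrives exactly once and in order.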

Apologies for the length of the code in the post. I plan to share source code files on github and will link here when available.

This was compiled with gcc version 4.8.4 using the following command:
 g++ -std=c++11 cbq.cpp -lpthread -o cbq

Output was:
Put 1
Put 2
Put 3
Put 4
Put 5
Put 6
1 :: 2 :: 3 :: 
Put 7
Put 8
Put 9
4 :: 5 :: 6 :: 
Put 10
Put 11
Put 12
7 :: 8 :: 9 :: 
Put 13
Put 14
Put 15
10 :: 11 :: 12 :: 
Put 16
Put 17
Put 18
13 :: 14 :: 15 :: 
Put 19
Put 20
16 :: 17 :: 18 :: 
19 :: 20 :: 

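#include <algorithm>   // std::copy
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>  // std::ref
#include <iostream>
#include <iterator>    // std::advance, std::back_inserter, std::distance
#include <limits>
#include <list>
#include <memory>      // std::allocator
#include <mutex>
#include <string>
#include <thread>
#include <vector>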

template <class _Tp, class _Alloc = std::allocator<_Tp>>
class ConcurrentBlockingQueue {
 public:
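  // Default: unbounded queue (max_size stays 0).
  ConcurrentBlockingQueue(){}
  // Bounded queue holding at most `size` elements; 0 means unbounded.
  explicit ConcurrentBlockingQueue(size_t size){
   max_size = size;
  }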

  size_t remainingCapacity();
  size_t size();
  void clear();
  void empty_block();

  void put(_Tp e);
  bool offer(_Tp e);
  // Timed offer: waits up to timeout_duration for space, returns false on timeout.
  template<class Rep, class Period>
  bool offer(_Tp e, const std::chrono::duration<Rep, Period>& timeout_duration);

  _Tp take();
  template<class Container>
  int drainTo(Container & c);
  template<class Container>
  int drainTo(Container & c, size_t maxElements);

  //poll() is not provided: C++11 has no standard way to return "no value" for an arbitrary _Tp
  //(a reference cannot be null). std::optional, added in C++17, would make it straightforward.
  //For poll() functionality, use drainTo(container, 1), check return value and/or container contents
 protected:
  size_t max_size = 0;

  std::condition_variable empty_cond, full_cond;
  std::mutex mmutex, empty_mutex, full_mutex;

  std::list<_Tp, _Alloc> lst;
};


template <class _Tp, class _Alloc>
size_t ConcurrentBlockingQueue<_Tp, _Alloc>::remainingCapacity(){
 if (max_size == 0) return std::numeric_limits<size_t>::max();
 return max_size - size();
}

template <class _Tp, class _Alloc>
size_t ConcurrentBlockingQueue<_Tp, _Alloc>::size(){
 std::unique_lock<std::mutex> lck {mmutex};
 return lst.size();
}

template <class _Tp, class _Alloc>
void ConcurrentBlockingQueue<_Tp, _Alloc>::clear(){
 std::unique_lock<std::mutex> full_lock {mmutex};
 lst.clear();
 full_cond.notify_all();
}

// Blocks until there is room, then appends e.
// The capacity check, the wait, and the insert all happen under mmutex, the
// same mutex take(), drainTo() and clear() hold when they notify full_cond,
// so a wakeup cannot be missed between the check and the wait.
template <class _Tp, class _Alloc>
void ConcurrentBlockingQueue<_Tp, _Alloc>::put(_Tp e){
 std::unique_lock<std::mutex> lck {mmutex};
 // max_size == 0 means unbounded
 full_cond.wait(lck, [this]{ return max_size == 0 || lst.size() < max_size; });
 lst.push_back(e);
 empty_cond.notify_all();
}

// Non-blocking offer: returns false immediately if the queue is full.
template <class _Tp, class _Alloc>
bool ConcurrentBlockingQueue<_Tp, _Alloc>::offer(_Tp e){
 return offer(e, std::chrono::seconds{0});
}

// Timed offer: waits up to timeout_duration for space to become available.
template <class _Tp, class _Alloc>
template<class Rep, class Period>
bool ConcurrentBlockingQueue<_Tp, _Alloc>::offer(_Tp e, const std::chrono::duration<Rep, Period>& timeout_duration){
 std::unique_lock<std::mutex> lck {mmutex};
 // wait_for returns the final value of the predicate; with a zero timeout
 // it amounts to a single capacity check.
 bool has_room = full_cond.wait_for(lck, timeout_duration,
  [this]{ return max_size == 0 || lst.size() < max_size; });
 if(!has_room){
  return false;
 }
 lst.push_back(e);
 empty_cond.notify_all();
 return true;
}

// Blocks until the queue is non-empty. Another consumer may still take the
// element before the caller does, so this is only safe with a single consumer.
template <class _Tp, class _Alloc>
void ConcurrentBlockingQueue<_Tp, _Alloc>::empty_block(){
 std::unique_lock<std::mutex> lck {mmutex};
 empty_cond.wait(lck, [this]{ return !lst.empty(); });
}

// Blocks until an element is available, then removes and returns it.
// The wait and the pop share one lock, so two consumers cannot both claim
// the same element.
template <class _Tp, class _Alloc>
_Tp ConcurrentBlockingQueue<_Tp, _Alloc>::take(){
 std::unique_lock<std::mutex> lck {mmutex};
 empty_cond.wait(lck, [this]{ return !lst.empty(); });
 _Tp e = lst.front();
 lst.pop_front();
 full_cond.notify_all();
 return e;
}

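// Moves up to maxElements elements into c without blocking and returns the
// number of elements transferred. maxElements == 0 means "drain everything".
template <class _Tp, class _Alloc>
template <class Container>
int ConcurrentBlockingQueue<_Tp, _Alloc>::drainTo(Container & c, size_t maxElements){
 std::unique_lock<std::mutex> lck {mmutex};
 auto begin_it = lst.begin();
 auto end_it = lst.end();
 if(maxElements > 0 && maxElements <= lst.size()){
  end_it = lst.begin();
  std::advance(end_it, maxElements);
 }
 int drained = static_cast<int>(std::distance(begin_it, end_it));
 std::copy(begin_it, end_it, std::back_inserter(c));
 lst.erase(begin_it, end_it);
 full_cond.notify_all();
 return drained;
}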

template <class _Tp, class _Alloc>
template <class Container>
int ConcurrentBlockingQueue<_Tp, _Alloc>::drainTo(Container & c){
 return drainTo(c, 0);
}

/*

The rest of the code is for demonstration of the ConcurrentBlockingQueue

*/

using std::thread;
using std::vector;
using std::string;
using std::cout;
using std::endl;

string Shutdown {"shutdown"};
bool isShutdown(string s){
 return Shutdown.compare(s) == 0;
}

void producer_function(int putMethod, ConcurrentBlockingQueue<string> & q, int timeout_ms){
 int count = 0;
 
 while(true){
  if(timeout_ms > 0){
   std::this_thread::sleep_for(std::chrono::milliseconds{timeout_ms});
  }
  
  string s {std::to_string(++count)};
  if(putMethod == 1){
   cout << "Offer " << s;
   //bool in = q.offer(s, std::chrono::seconds{2});
   bool in = q.offer(s); //Can insert first N, then rest will immediately fail
   cout << " " << in << endl;
  } else {
   cout << "Put " << s << endl;
   q.put(s);
  }
  if(count >= 20){
   q.put(Shutdown);
   break;
  }
 }
}

void consumer_function(int takeMethod, ConcurrentBlockingQueue<string> & q, int timeout_ms){
 vector<string> v;
 bool shutdown = false;
 while(!shutdown){
  if(timeout_ms > 0){
   std::this_thread::sleep_for(std::chrono::milliseconds{timeout_ms});
  }

  if(takeMethod == 1){
   v.clear();
   q.empty_block();
   q.drainTo(v, 3);
   for(auto s : v){
    if(isShutdown(s)){
     shutdown = true;
    } else {
     cout << s << " :: ";
    }
   }
   if (v.size() > 0) cout << endl;
  }
  else {
   string s = q.take();
   if(isShutdown(s)){
    shutdown = true;
   } else {
    cout << "Took " << s << endl;
   }
  }
 }
}

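int main(){
 // Bounded queue of 5. The producer uses put() (method 0) with no delay;
 // the consumer uses drainTo(v, 3) (method 1) once per second.
 ConcurrentBlockingQueue<string> q{5};
 thread producer {producer_function, 0, std::ref(q), 0};
 thread consumer {consumer_function, 1, std::ref(q), 1000};
 producer.join();
 consumer.join();
}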
