Master Core  v0.0.9 - 2abfd2849db8ba7a83957c64eb976b406713c123
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012 The Bitcoin developers
2 // Distributed under the MIT/X11 software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef CHECKQUEUE_H
6 #define CHECKQUEUE_H
7 
8 #include <algorithm>
9 #include <vector>
10 
11 #include <boost/foreach.hpp>
12 #include <boost/thread/condition_variable.hpp>
13 #include <boost/thread/locks.hpp>
14 #include <boost/thread/mutex.hpp>
15 
16 template<typename T> class CCheckQueueControl;
17 
27 template<typename T> class CCheckQueue {
28 private:
29  // Mutex to protect the inner state
30  boost::mutex mutex;
31 
32  // Worker threads block on this when out of work
33  boost::condition_variable condWorker;
34 
35  // Master thread blocks on this when out of work
36  boost::condition_variable condMaster;
37 
38  // The queue of elements to be processed.
39  // As the order of booleans doesn't matter, it is used as a LIFO (stack)
40  std::vector<T> queue;
41 
42  // The number of workers (including the master) that are idle.
43  int nIdle;
44 
45  // The total number of workers (including the master).
46  int nTotal;
47 
48  // The temporary evaluation result.
49  bool fAllOk;
50 
51  // Number of verifications that haven't completed yet.
52  // This includes elements that are not anymore in queue, but still in
53  // worker's own batches.
54  unsigned int nTodo;
55 
56  // Whether we're shutting down.
57  bool fQuit;
58 
59  // The maximum number of elements to be processed in one batch
60  unsigned int nBatchSize;
61 
62  // Internal function that does bulk of the verification work.
63  bool Loop(bool fMaster = false) {
64  boost::condition_variable &cond = fMaster ? condMaster : condWorker;
65  std::vector<T> vChecks;
66  vChecks.reserve(nBatchSize);
67  unsigned int nNow = 0;
68  bool fOk = true;
69  do {
70  {
71  boost::unique_lock<boost::mutex> lock(mutex);
72  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
73  if (nNow) {
74  fAllOk &= fOk;
75  nTodo -= nNow;
76  if (nTodo == 0 && !fMaster)
77  // We processed the last element; inform the master he can exit and return the result
78  condMaster.notify_one();
79  } else {
80  // first iteration
81  nTotal++;
82  }
83  // logically, the do loop starts here
84  while (queue.empty()) {
85  if ((fMaster || fQuit) && nTodo == 0) {
86  nTotal--;
87  bool fRet = fAllOk;
88  // reset the status for new work later
89  if (fMaster)
90  fAllOk = true;
91  // return the current status
92  return fRet;
93  }
94  nIdle++;
95  cond.wait(lock); // wait
96  nIdle--;
97  }
98  // Decide how many work units to process now.
99  // * Do not try to do everything at once, but aim for increasingly smaller batches so
100  // all workers finish approximately simultaneously.
101  // * Try to account for idle jobs which will instantly start helping.
102  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
103  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
104  vChecks.resize(nNow);
105  for (unsigned int i = 0; i < nNow; i++) {
106  // We want the lock on the mutex to be as short as possible, so swap jobs from the global
107  // queue to the local batch vector instead of copying.
108  vChecks[i].swap(queue.back());
109  queue.pop_back();
110  }
111  // Check whether we need to do work at all
112  fOk = fAllOk;
113  }
114  // execute work
115  BOOST_FOREACH(T &check, vChecks)
116  if (fOk)
117  fOk = check();
118  vChecks.clear();
119  } while(true);
120  }
121 
122 public:
123  // Create a new check queue
124  CCheckQueue(unsigned int nBatchSizeIn) :
125  nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
126 
127  // Worker thread
128  void Thread() {
129  Loop();
130  }
131 
132  // Wait until execution finishes, and return whether all evaluations where succesful.
133  bool Wait() {
134  return Loop(true);
135  }
136 
137  // Add a batch of checks to the queue
138  void Add(std::vector<T> &vChecks) {
139  boost::unique_lock<boost::mutex> lock(mutex);
140  BOOST_FOREACH(T &check, vChecks) {
141  queue.push_back(T());
142  check.swap(queue.back());
143  }
144  nTodo += vChecks.size();
145  if (vChecks.size() == 1)
146  condWorker.notify_one();
147  else if (vChecks.size() > 1)
148  condWorker.notify_all();
149  }
150 
152  }
153 
154  friend class CCheckQueueControl<T>;
155 };
156 
160 template<typename T> class CCheckQueueControl {
161 private:
163  bool fDone;
164 
165 public:
166  CCheckQueueControl(CCheckQueue<T> *pqueueIn) : pqueue(pqueueIn), fDone(false) {
167  // passed queue is supposed to be unused, or NULL
168  if (pqueue != NULL) {
169  assert(pqueue->nTotal == pqueue->nIdle);
170  assert(pqueue->nTodo == 0);
171  assert(pqueue->fAllOk == true);
172  }
173  }
174 
175  bool Wait() {
176  if (pqueue == NULL)
177  return true;
178  bool fRet = pqueue->Wait();
179  fDone = true;
180  return fRet;
181  }
182 
183  void Add(std::vector<T> &vChecks) {
184  if (pqueue != NULL)
185  pqueue->Add(vChecks);
186  }
187 
189  if (!fDone)
190  Wait();
191  }
192 };
193 
194 #endif
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:183
boost::condition_variable condWorker
Definition: checkqueue.h:33
boost::mutex mutex
Definition: checkqueue.h:30
boost::condition_variable condMaster
Definition: checkqueue.h:36
bool Loop(bool fMaster=false)
Definition: checkqueue.h:63
void Thread()
Definition: checkqueue.h:128
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:16
CCheckQueueControl(CCheckQueue< T > *pqueueIn)
Definition: checkqueue.h:166
CCheckQueue(unsigned int nBatchSizeIn)
Definition: checkqueue.h:124
std::vector< T > queue
Definition: checkqueue.h:40
bool fAllOk
Definition: checkqueue.h:49
Queue for verifications that have to be performed.
Definition: checkqueue.h:27
bool Wait()
Definition: checkqueue.h:133
bool fQuit
Definition: checkqueue.h:57
unsigned int nTodo
Definition: checkqueue.h:54
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:138
unsigned int nBatchSize
Definition: checkqueue.h:60
CCheckQueue< T > * pqueue
Definition: checkqueue.h:162