Line data Source code
1 : // Copyright (c) 2012-2014 The Bitcoin Core developers
2 : // Distributed under the MIT software license, see the accompanying
3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 :
5 : #ifndef BITCOIN_CHECKQUEUE_H
6 : #define BITCOIN_CHECKQUEUE_H
7 :
8 : #include <algorithm>
9 : #include <vector>
10 :
11 : #include <boost/foreach.hpp>
12 : #include <boost/thread/condition_variable.hpp>
13 : #include <boost/thread/locks.hpp>
14 : #include <boost/thread/mutex.hpp>
15 :
16 : template <typename T>
17 : class CCheckQueueControl;
18 :
19 : /**
20 : * Queue for verifications that have to be performed.
21 : * The verifications are represented by a type T, which must provide an
22 : * operator(), returning a bool.
23 : *
24 : * One thread (the master) is assumed to push batches of verifications
25 : * onto the queue, where they are processed by N-1 worker threads. When
26 : * the master is done adding work, it temporarily joins the worker pool
27 : * as an N'th worker, until all jobs are done.
28 : */
29 : template <typename T>
30 : class CCheckQueue
31 : {
32 : private:
33 : //! Mutex to protect the inner state
34 : boost::mutex mutex;
35 :
36 : //! Worker threads block on this when out of work
37 : boost::condition_variable condWorker;
38 :
39 : //! Master thread blocks on this when out of work
40 : boost::condition_variable condMaster;
41 :
42 : //! The queue of elements to be processed.
43 : //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
44 : std::vector<T> queue;
45 :
46 : //! The number of workers (including the master) that are idle.
47 : int nIdle;
48 :
49 : //! The total number of workers (including the master).
50 : int nTotal;
51 :
52 : //! The temporary evaluation result.
53 : bool fAllOk;
54 :
55 : /**
56 : * Number of verifications that haven't completed yet.
57 : * This includes elements that are no longer queued, but still in the
58 : * worker's own batches.
59 : */
60 : unsigned int nTodo;
61 :
62 : //! Whether we're shutting down.
63 : bool fQuit;
64 :
65 : //! The maximum number of elements to be processed in one batch
66 : unsigned int nBatchSize;
67 :
68 : /** Internal function that does bulk of the verification work. */
69 7140 : bool Loop(bool fMaster = false)
70 : {
71 7140 : boost::condition_variable& cond = fMaster ? condMaster : condWorker;
72 : std::vector<T> vChecks;
73 7140 : vChecks.reserve(nBatchSize);
74 : unsigned int nNow = 0;
75 : bool fOk = true;
76 : do {
77 : {
78 8727 : boost::unique_lock<boost::mutex> lock(mutex);
79 : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
80 8727 : if (nNow) {
81 1586 : fAllOk &= fOk;
82 1586 : nTodo -= nNow;
83 1586 : if (nTodo == 0 && !fMaster)
84 : // We processed the last element; inform the master it can exit and return the result
85 215 : condMaster.notify_one();
86 : } else {
87 : // first iteration
88 7141 : nTotal++;
89 : }
90 : // logically, the do loop starts here
91 19004 : while (queue.empty()) {
92 7916 : if ((fMaster || fQuit) && nTodo == 0) {
93 6809 : nTotal--;
94 6809 : bool fRet = fAllOk;
95 : // reset the status for new work later
96 6809 : if (fMaster)
97 6809 : fAllOk = true;
98 : // return the current status
99 13618 : return fRet;
100 : }
101 1107 : nIdle++;
102 1107 : cond.wait(lock); // wait
103 775 : nIdle--;
104 : }
105 : // Decide how many work units to process now.
106 : // * Do not try to do everything at once, but aim for increasingly smaller batches so
107 : // all workers finish approximately simultaneously.
108 : // * Try to account for idle jobs which will instantly start helping.
109 : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
110 6344 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
111 3172 : vChecks.resize(nNow);
112 4071 : for (unsigned int i = 0; i < nNow; i++) {
113 : // We want the lock on the mutex to be as short as possible, so swap jobs from the global
114 : // queue to the local batch vector instead of copying.
115 7455 : vChecks[i].swap(queue.back());
116 2485 : queue.pop_back();
117 : }
118 : // Check whether we need to do work at all
119 1586 : fOk = fAllOk;
120 : }
121 : // execute work
122 24425 : BOOST_FOREACH (T& check, vChecks)
123 2485 : if (fOk)
124 2484 : fOk = check();
125 : vChecks.clear();
126 7141 : } while (true);
127 : }
128 :
129 : public:
130 : //! Create a new check queue
131 192 : CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
132 :
133 : //! Worker thread
134 : void Thread()
135 : {
136 331 : Loop();
137 : }
138 :
139 : //! Wait until execution finishes, and return whether all evaluations were successful.
140 : bool Wait()
141 : {
142 6809 : return Loop(true);
143 : }
144 :
145 : //! Add a batch of checks to the queue
146 1870 : void Add(std::vector<T>& vChecks)
147 : {
148 1870 : boost::unique_lock<boost::mutex> lock(mutex);
149 26130 : BOOST_FOREACH (T& check, vChecks) {
150 4970 : queue.push_back(T());
151 4970 : check.swap(queue.back());
152 : }
153 3740 : nTodo += vChecks.size();
154 1870 : if (vChecks.size() == 1)
155 1796 : condWorker.notify_one();
156 74 : else if (vChecks.size() > 1)
157 74 : condWorker.notify_all();
158 1870 : }
159 :
160 96 : ~CCheckQueue()
161 : {
162 96 : }
163 :
164 6809 : bool IsIdle()
165 : {
166 6809 : boost::unique_lock<boost::mutex> lock(mutex);
167 13618 : return (nTotal == nIdle && nTodo == 0 && fAllOk == true);
168 : }
169 :
170 : };
171 :
172 : /**
173 : * RAII-style controller object for a CCheckQueue that guarantees the passed
174 : * queue is finished before continuing.
175 : */
176 : template <typename T>
177 : class CCheckQueueControl
178 : {
179 : private:
180 : CCheckQueue<T>* pqueue;
181 : bool fDone;
182 :
183 : public:
184 6809 : CCheckQueueControl(CCheckQueue<T>* pqueueIn) : pqueue(pqueueIn), fDone(false)
185 : {
186 : // passed queue is supposed to be unused, or NULL
187 6809 : if (pqueue != NULL) {
188 6809 : bool isIdle = pqueue->IsIdle();
189 6809 : assert(isIdle);
190 : }
191 6809 : }
192 :
193 : bool Wait()
194 : {
195 6809 : if (pqueue == NULL)
196 : return true;
197 13618 : bool fRet = pqueue->Wait();
198 6809 : fDone = true;
199 : return fRet;
200 : }
201 :
202 0 : void Add(std::vector<T>& vChecks)
203 : {
204 1870 : if (pqueue != NULL)
205 1870 : pqueue->Add(vChecks);
206 0 : }
207 :
208 6809 : ~CCheckQueueControl()
209 : {
210 6809 : if (!fDone)
211 : Wait();
212 6809 : }
213 : };
214 :
215 : #endif // BITCOIN_CHECKQUEUE_H
|